From 9493dd29689253a30ad4b6a593d4201ce22deec3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sun, 11 Feb 2024 16:20:45 +0800 Subject: [PATCH 01/12] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/broker/BrokerContextImpl.java | 26 ++--- .../smartboot/mqtt/broker/MqttSession.java | 84 +++++++++------- ...terSubscriber.java => SubscribeTopic.java} | 11 ++- .../mqtt/broker/eventbus/EventType.java | 8 +- .../broker/processor/PublishProcessor.java | 13 +-- .../broker/provider/SubscribeProvider.java | 2 +- .../broker/topic/AbstractConsumerRecord.java | 72 ++++++++++++++ .../mqtt/broker/topic/BrokerTopic.java | 28 ++++-- .../mqtt/broker/topic/SubscriberGroup.java | 47 +++++++++ .../broker/topic/SubscriberSharedGroup.java | 46 +++++++++ .../topic/TopicConsumerOrderShareRecord.java | 99 +++++++++++++++++++ ...bscriber.java => TopicConsumerRecord.java} | 71 +++---------- .../mqtt/broker/topic/TopicSubscribeTree.java | 29 ++++-- .../org/smartboot/mqtt/common/TopicToken.java | 17 +++- 14 files changed, 408 insertions(+), 145 deletions(-) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{TopicFilterSubscriber.java => SubscribeTopic.java} (79%) create mode 100644 smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/AbstractConsumerRecord.java create mode 100644 smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java create mode 100644 smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java create mode 100644 smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/{TopicSubscriber.java => TopicConsumerRecord.java} (64%) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 1acdf57a..9606d7d5 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -34,6 +34,7 @@ import org.smartboot.mqtt.broker.processor.SubscribeProcessor; import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor; import org.smartboot.mqtt.broker.provider.Providers; import org.smartboot.mqtt.broker.topic.BrokerTopic; +import org.smartboot.mqtt.broker.topic.TopicConsumerRecord; import org.smartboot.mqtt.broker.topic.TopicPublishTree; import org.smartboot.mqtt.broker.topic.TopicSubscribeTree; import org.smartboot.mqtt.common.AsyncTask; @@ -223,25 +224,26 @@ public class BrokerContextImpl implements BrokerContext { }); //一个新的订阅建立时,对每个匹配的主题名,如果存在最近保留的消息,它必须被发送给这个订阅者 - eventBus.subscribe(EventType.SUBSCRIBE_TOPIC, (eventType, subscriber) -> retainPushThreadPool.execute(new AsyncTask() { + eventBus.subscribe(EventType.SUBSCRIBE_TOPIC, (eventType, eventObject) -> retainPushThreadPool.execute(new AsyncTask() { @Override public void execute() { - BrokerTopic topic = subscriber.getTopic(); + TopicConsumerRecord consumerRecord = eventObject.getObject(); + BrokerTopic topic = consumerRecord.getTopic(); Message retainMessage = topic.getRetainMessage(); - if (retainMessage == null || retainMessage.getCreateTime() > subscriber.getLatestSubscribeTime()) { - topic.addSubscriber(subscriber); + if (retainMessage == null || retainMessage.getCreateTime() > consumerRecord.getLatestSubscribeTime()) { + topic.addSubscriber(consumerRecord); return; } - MqttSession session = subscriber.getMqttSession(); + MqttSession session = eventObject.getSession(); - MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(retainMessage.getPayload()).qos(subscriber.getMqttQoS()).topicName(retainMessage.getTopic()); + MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(retainMessage.getPayload()).qos(consumerRecord.getMqttQoS()).topicName(retainMessage.getTopic()); if (session.getMqttVersion() == MqttVersion.MQTT_5) { publishBuilder.publishProperties(new PublishProperties()); } // Qos0不走飞行窗口 - if (subscriber.getMqttQoS() == MqttQoS.AT_MOST_ONCE) { + if (consumerRecord.getMqttQoS() == MqttQoS.AT_MOST_ONCE) { session.write(publishBuilder.build()); - topic.addSubscriber(subscriber); + topic.addSubscriber(consumerRecord); return; } InflightQueue inflightQueue = session.getInflightQueue(); @@ -249,17 +251,17 @@ public class BrokerContextImpl implements BrokerContext { CompletableFuture> future = inflightQueue.offer(publishBuilder); future.whenComplete((mqttPacketIdentifierMessage, throwable) -> { LOGGER.info("publish retain to client:{} success ", session.getClientId()); - topic.addSubscriber(subscriber); + topic.addSubscriber(consumerRecord); }); session.flush(); } })); - eventBus.subscribe(EventType.TOPIC_CREATE, (eventType, object) -> subscribeTopicTree.match(object, (session, topicFilterSubscriber) -> { - if (!providers.getSubscribeProvider().subscribeTopic(object.getTopic(), session)) { + eventBus.subscribe(EventType.TOPIC_CREATE, (eventType, brokerTopic) -> subscribeTopicTree.refreshMatchRelation(brokerTopic, (session, topicFilterSubscriber) -> { + if (!providers.getSubscribeProvider().subscribeTopic(brokerTopic.getTopic(), session)) { return; } - session.subscribeSuccess(topicFilterSubscriber.getMqttQoS(), topicFilterSubscriber.getTopicFilterToken(), object); + session.subscribeSuccess(topicFilterSubscriber, brokerTopic); })); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index 583ff4b3..dbd27464 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -16,8 +16,10 @@ import org.smartboot.mqtt.broker.eventbus.EventBus; import org.smartboot.mqtt.broker.eventbus.EventObject; import org.smartboot.mqtt.broker.eventbus.EventType; import org.smartboot.mqtt.broker.provider.impl.session.SessionState; +import org.smartboot.mqtt.broker.topic.AbstractConsumerRecord; import org.smartboot.mqtt.broker.topic.BrokerTopic; -import org.smartboot.mqtt.broker.topic.TopicSubscriber; +import org.smartboot.mqtt.broker.topic.SubscriberGroup; +import org.smartboot.mqtt.broker.topic.TopicConsumerRecord; import org.smartboot.mqtt.common.AbstractSession; import org.smartboot.mqtt.common.AsyncTask; import org.smartboot.mqtt.common.MqttWriter; @@ -47,7 +49,7 @@ public class MqttSession extends AbstractSession { /** * 当前连接订阅的Topic的消费信息 */ - private final Map subscribers = new ConcurrentHashMap<>(); + private final Map subscribers = new ConcurrentHashMap<>(); private final BrokerContext mqttContext; private String username; @@ -169,71 +171,79 @@ public class MqttSession extends AbstractSession { } private void subscribe0(String topicFilter, MqttQoS mqttQoS) { - TopicFilterSubscriber subscriber = subscribers.get(topicFilter); + SubscribeTopic subscriber = subscribers.get(topicFilter); + //订阅topic已存在,可能只是更新了Qos if (subscriber != null) { subscriber.setMqttQoS(mqttQoS); - subscriber.getTopicSubscribers().values().forEach(sub -> sub.setMqttQoS(mqttQoS)); return; } TopicToken topicToken = new TopicToken(topicFilter); if (!topicToken.isWildcards()) { mqttContext.getOrCreateTopic(topicFilter); } - subscriber = new TopicFilterSubscriber(topicToken, mqttQoS); - TopicFilterSubscriber preSubscriber = subscribers.put(topicFilter, subscriber); + SubscribeTopic newSubscriber = new SubscribeTopic(topicToken, mqttQoS); + SubscribeTopic preSubscriber = subscribers.put(topicFilter, newSubscriber); ValidateUtils.isTrue(preSubscriber == null, "duplicate topic filter"); - mqttContext.getTopicSubscribeTree().subscribeTopic(this, subscriber); - mqttContext.getPublishTopicTree().match(topicToken, topic -> subscribeSuccess(mqttQoS, topicToken, topic)); + mqttContext.getTopicSubscribeTree().subscribeTopic(this, newSubscriber); + mqttContext.getPublishTopicTree().match(topicToken, topic -> subscribeSuccess(newSubscriber, topic)); } - public void subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic topic) { + public void subscribeSuccess(SubscribeTopic subscribeTopic, BrokerTopic topic) { + TopicToken topicToken = subscribeTopic.getTopicFilterToken(); if (!mqttContext.getProviders().getSubscribeProvider().matchTopic(topic, this)) { return; } - TopicSubscriber topicSubscriber = topic.getConsumeOffsets().get(this); - if (topicSubscriber != null) { + SubscriberGroup subscriberGroup = topic.getSubscriberGroup(subscribeTopic.getTopicFilterToken()); + AbstractConsumerRecord consumerRecord = subscriberGroup.getSubscriber(this); + if (consumerRecord != null) { //此前的订阅关系 - TopicToken preToken = topicSubscriber.getTopicFilterToken(); - if (preToken.isWildcards()) { + TopicToken preToken = consumerRecord.getTopicFilterToken(); + //此前为统配订阅或者未共享订阅,则更新订阅关系 + if (topicToken.isShared()) { + ValidateUtils.isTrue(preToken.getTopicFilter().equals(subscribeTopic.getTopicFilterToken().getTopicFilter()), "invalid subscriber"); + TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, subscribeTopic, topic.getMessageQueue().getLatestOffset() + 1); + subscriberGroup.addSubscriber(record); + } else if (preToken.isWildcards()) { if (!topicToken.isWildcards() || topicToken.getTopicFilter().length() > preToken.getTopicFilter().length()) { //解除旧的订阅关系 - TopicSubscriber preSubscription = subscribers.get(preToken.getTopicFilter()).getTopicSubscribers().remove(topic); - preSubscription.setMqttQoS(mqttQoS); - preSubscription.setTopicFilterToken(topicToken); + TopicConsumerRecord preRecord = subscribers.get(preToken.getTopicFilter()).getTopicSubscribers().remove(topic); + ValidateUtils.isTrue(preRecord == consumerRecord, "invalid consumerRecord"); + preRecord.disable(); + //绑定新的订阅关系 - subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, preSubscription); - mqttContext.getEventBus().publish(EventType.SUBSCRIBE_REFRESH_TOPIC, preSubscription); + TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, subscribeTopic, preRecord.getNextConsumerOffset()); + subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, record); + mqttContext.getEventBus().publish(EventType.SUBSCRIBE_REFRESH_TOPIC, record); } } - return; + } else { + TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, subscribeTopic, topic.getMessageQueue().getLatestOffset() + 1); + mqttContext.getEventBus().publish(EventType.SUBSCRIBE_TOPIC, EventObject.newEventObject(this, record)); + subscriberGroup.addSubscriber(record); + subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, record); } - //以当前消息队列的最新点位为起始点位 - TopicSubscriber subscription = new TopicSubscriber(topic, MqttSession.this, mqttQoS, topic.getMessageQueue().getLatestOffset() + 1); - mqttContext.getEventBus().publish(EventType.SUBSCRIBE_TOPIC, subscription); - subscription.setTopicFilterToken(topicToken); - topic.getConsumeOffsets().put(MqttSession.this, subscription); - subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, subscription); } public void resubscribe() { - subscribers.values().stream().filter(subscriber -> subscriber.getTopicFilterToken().isWildcards()).forEach(subscriber -> { - mqttContext.getPublishTopicTree().match(subscriber.getTopicFilterToken(), topic -> subscribeSuccess(subscriber.getMqttQoS(), subscriber.getTopicFilterToken(), topic)); - }); + subscribers.values().stream().filter(subscriber -> subscriber.getTopicFilterToken().isWildcards()).forEach(subscriber -> mqttContext.getPublishTopicTree().match(subscriber.getTopicFilterToken(), topic -> subscribeSuccess(subscriber, topic))); } public void unsubscribe(String topicFilter) { - TopicFilterSubscriber filterSubscriber = subscribers.remove(topicFilter); + //移除当前Session的映射关系 + SubscribeTopic filterSubscriber = subscribers.remove(topicFilter); if (filterSubscriber == null) { return; } - filterSubscriber.getTopicSubscribers().values().forEach(subscriber -> { - TopicSubscriber removeSubscriber = subscriber.getTopic().getConsumeOffsets().remove(this); - if (subscriber == removeSubscriber) { - removeSubscriber.disable(); - mqttContext.getEventBus().publish(EventType.UNSUBSCRIBE_TOPIC, removeSubscriber); - LOGGER.debug("remove subscriber:{} success!", subscriber.getTopic().getTopic()); + //移除关联Broker中的映射关系 + filterSubscriber.getTopicSubscribers().forEach((brokerTopic, subscriber) -> { + SubscriberGroup subscriberGroup = brokerTopic.getSubscriberGroup(filterSubscriber.getTopicFilterToken()); + TopicConsumerRecord consumerRecord = subscriberGroup.removeSubscriber(this); + if (subscriber == consumerRecord) { + consumerRecord.disable(); + mqttContext.getEventBus().publish(EventType.UNSUBSCRIBE_TOPIC, consumerRecord); + LOGGER.debug("remove subscriber:{} success!", brokerTopic.getTopic()); } else { - LOGGER.error("remove subscriber:{} error!", removeSubscriber); + LOGGER.error("remove subscriber:{} error!", subscriberGroup); } }); mqttContext.getTopicSubscribeTree().unsubscribe(this, filterSubscriber); @@ -256,7 +266,7 @@ public class MqttSession extends AbstractSession { return mqttContext; } - public Map getSubscribers() { + public Map getSubscribers() { return subscribers; } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/SubscribeTopic.java similarity index 79% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/SubscribeTopic.java index a36e4dda..589d3ecb 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/SubscribeTopic.java @@ -11,7 +11,7 @@ package org.smartboot.mqtt.broker; import org.smartboot.mqtt.broker.topic.BrokerTopic; -import org.smartboot.mqtt.broker.topic.TopicSubscriber; +import org.smartboot.mqtt.broker.topic.TopicConsumerRecord; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; @@ -24,7 +24,7 @@ import java.util.Map; * @author 三刀(zhengjunweimail@163.com) * @version V1.0 , 2022/7/13 */ -public class TopicFilterSubscriber { +public class SubscribeTopic { private final TopicToken topicFilterToken; private MqttQoS mqttQoS; @@ -32,9 +32,9 @@ public class TopicFilterSubscriber { /** * 客户端订阅所匹配的Topic。通配符订阅时可能有多个 */ - private final Map topicSubscribers = new HashMap<>(); + private final Map topicSubscribers = new HashMap<>(); - TopicFilterSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS) { + SubscribeTopic(TopicToken topicFilterToken, MqttQoS mqttQoS) { this.topicFilterToken = topicFilterToken; this.mqttQoS = mqttQoS; } @@ -51,7 +51,8 @@ public class TopicFilterSubscriber { this.mqttQoS = mqttQoS; } - public Map getTopicSubscribers() { + public Map getTopicSubscribers() { return topicSubscribers; } } + diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/EventType.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/EventType.java index a9b9452f..b2cf83dd 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/EventType.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/EventType.java @@ -14,7 +14,7 @@ import org.smartboot.mqtt.broker.BrokerConfigure; import org.smartboot.mqtt.broker.BrokerContext; import org.smartboot.mqtt.broker.MqttSession; import org.smartboot.mqtt.broker.topic.BrokerTopic; -import org.smartboot.mqtt.broker.topic.TopicSubscriber; +import org.smartboot.mqtt.broker.topic.TopicConsumerRecord; import org.smartboot.mqtt.common.AbstractSession; import org.smartboot.mqtt.common.message.MqttConnAckMessage; import org.smartboot.mqtt.common.message.MqttConnectMessage; @@ -79,17 +79,17 @@ public class EventType { /** * 客户端订阅Topic */ - public static final EventType SUBSCRIBE_TOPIC = new EventType<>("subscribeTopic"); + public static final EventType> SUBSCRIBE_TOPIC = new EventType<>("subscribeTopic"); /** * 客户端取消订阅Topic */ - public static final EventType UNSUBSCRIBE_TOPIC = new EventType<>("unsubscribe_topic"); + public static final EventType UNSUBSCRIBE_TOPIC = new EventType<>("unsubscribe_topic"); /** * 客户端订阅Topic */ - public static final EventType SUBSCRIBE_REFRESH_TOPIC = new EventType<>("subscribe_refresh_topic"); + public static final EventType SUBSCRIBE_REFRESH_TOPIC = new EventType<>("subscribe_refresh_topic"); /** * 客户端连接请求 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java index e0fd08f7..29ffcd0d 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java @@ -16,7 +16,6 @@ import org.smartboot.mqtt.broker.BrokerContext; import org.smartboot.mqtt.broker.MqttSession; import org.smartboot.mqtt.broker.topic.BrokerTopic; import org.smartboot.mqtt.common.enums.MqttQoS; -import org.smartboot.mqtt.common.enums.MqttReasonCode; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPubAckMessage; import org.smartboot.mqtt.common.message.MqttPubRecMessage; @@ -77,9 +76,10 @@ public class PublishProcessor extends AuthorizedMqttProcessor consumeOffsets = new ConcurrentHashMap<>(); + private final Map subscribers = new ConcurrentHashMap<>(); /** * 当前Topic是否圈闭推送完成 */ @@ -47,7 +48,7 @@ public class BrokerTopic { private final AsyncTask asyncTask = new AsyncTask() { @Override public void execute() { - TopicSubscriber subscriber; + AbstractConsumerRecord subscriber; int i = 0; while (++i < 1000 && (subscriber = queue.poll()) != null) { try { @@ -72,8 +73,7 @@ public class BrokerTopic { /** * 当前Topic处于监听状态的订阅者 */ - private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); - + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); public BrokerTopic(String topic) { this(topic, null); @@ -84,11 +84,21 @@ public class BrokerTopic { this.executorService = executorService; } - public Map getConsumeOffsets() { - return consumeOffsets; + + public SubscriberGroup getSubscriberGroup(TopicToken topicToken) { + if (topicToken.isShared()) { + return subscribers.computeIfAbsent(topicToken.getTopicFilter(), s -> new SubscriberSharedGroup(topicToken, BrokerTopic.this)); + } else { + return defaultGroup; + } } - public void addSubscriber(TopicSubscriber subscriber) { + public void removeShareGroup(String topicFilter) { + subscribers.remove(topicFilter); + } + + + public void addSubscriber(AbstractConsumerRecord subscriber) { queue.offer(subscriber); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java new file mode 100644 index 00000000..ecb5978e --- /dev/null +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java @@ -0,0 +1,47 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.broker.topic; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.broker.MqttSession; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SubscriberGroup { + private static final Logger LOGGER = LoggerFactory.getLogger(SubscriberGroup.class); + + protected final Map subscribers = new ConcurrentHashMap<>(); + + + /** + * 最近一次订阅时间 + */ + private final long latestSubscribeTime = System.currentTimeMillis(); + + + public long getLatestSubscribeTime() { + return latestSubscribeTime; + } + + public AbstractConsumerRecord getSubscriber(MqttSession session) { + return subscribers.get(session); + } + + public TopicConsumerRecord removeSubscriber(MqttSession session) { + return subscribers.remove(session); + } + + public void addSubscriber(TopicConsumerRecord subscriber) { + subscribers.put(subscriber.getMqttSession(), subscriber); + } +} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java new file mode 100644 index 00000000..823a756a --- /dev/null +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.broker.topic; + +import org.smartboot.mqtt.broker.MqttSession; +import org.smartboot.mqtt.common.TopicToken; + +import java.util.concurrent.ConcurrentLinkedQueue; + +public class SubscriberSharedGroup extends SubscriberGroup { + private final TopicConsumerOrderShareRecord record; + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + + public SubscriberSharedGroup(TopicToken topicFilterToken, BrokerTopic brokerTopic) { + record = new TopicConsumerOrderShareRecord(brokerTopic, topicFilterToken, queue); + } + + @Override + public void addSubscriber(TopicConsumerRecord subscriber) { + super.addSubscriber(subscriber); + queue.add(subscriber); + } + + @Override + public AbstractConsumerRecord getSubscriber(MqttSession session) { + return record; + } + + @Override + public TopicConsumerRecord removeSubscriber(MqttSession session) { + TopicConsumerRecord consumerRecord = super.removeSubscriber(session); + if (subscribers.isEmpty()) { + record.disable(); + record.topic.removeShareGroup(record.getTopicFilterToken().getTopicFilter()); + } + return consumerRecord; + } +} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java new file mode 100644 index 00000000..68939eea --- /dev/null +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java @@ -0,0 +1,99 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.broker.topic; + +import org.smartboot.mqtt.broker.eventbus.messagebus.Message; +import org.smartboot.mqtt.common.TopicToken; +import org.smartboot.mqtt.common.enums.MqttQoS; +import org.smartboot.mqtt.common.enums.MqttVersion; +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; +import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; +import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; +import org.smartboot.mqtt.common.util.MqttMessageBuilders; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * 顺序共享订阅 + */ +class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { + private final ConcurrentLinkedQueue queue; + + private final AtomicBoolean semaphore = new AtomicBoolean(false); + + public TopicConsumerOrderShareRecord(BrokerTopic topic, TopicToken topicFilterToken, ConcurrentLinkedQueue queue) { + super(topic, topicFilterToken, topic.getMessageQueue().getLatestOffset() + 1); + this.queue = queue; + topic.addSubscriber(this); + } + + @Override + public void pushToClient() { + if (semaphore.compareAndSet(false, true)) { + push0(); + } + } + + public void push0() { + int i = 10000; + while (i-- > 0) { + Message message = topic.getMessageQueue().get(nextConsumerOffset); + if (message == null) { + if (semaphore.compareAndSet(true, false)) { + topic.addSubscriber(this); + if (topic.getMessageQueue().get(nextConsumerOffset) != null && !queue.isEmpty()) { + topic.push(); + } + } + return; + } + TopicConsumerRecord record = queue.poll(); + if (record == null) { + if (semaphore.compareAndSet(true, false)) { + if (topic.getMessageQueue().get(nextConsumerOffset) != null && !queue.isEmpty()) { + topic.addSubscriber(this); + topic.push(); + } + } + return; + } + + MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(message.getPayload()).qos(record.getMqttQoS()).topicName(message.getTopic()); + if (record.getMqttSession().getMqttVersion() == MqttVersion.MQTT_5) { + publishBuilder.publishProperties(new PublishProperties()); + } + + //Qos0直接发送 + if (record.getMqttQoS() == MqttQoS.AT_MOST_ONCE) { + nextConsumerOffset++; + record.getMqttSession().write(publishBuilder.build()); + queue.offer(record); + continue; + } + + CompletableFuture> future = record.getMqttSession().getInflightQueue().offer(publishBuilder, () -> { + queue.offer(record); + }); + if (future != null) { + nextConsumerOffset++; + queue.offer(record); + } + } + if (semaphore.compareAndSet(true, false)) { + topic.addSubscriber(this); + if (topic.getMessageQueue().get(nextConsumerOffset) != null && !queue.isEmpty()) { + topic.push(); + } + } + } +} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerRecord.java similarity index 64% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscriber.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerRecord.java index 28c06a3e..e4b329fc 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerRecord.java @@ -13,8 +13,8 @@ package org.smartboot.mqtt.broker.topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.broker.MqttSession; +import org.smartboot.mqtt.broker.SubscribeTopic; import org.smartboot.mqtt.broker.eventbus.messagebus.Message; -import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; @@ -31,45 +31,24 @@ import java.util.concurrent.atomic.AtomicBoolean; * @author 三刀(zhengjunweimail@163.com) * @version V1.0 , 2022/3/25 */ -public class TopicSubscriber { - private static final Logger LOGGER = LoggerFactory.getLogger(TopicSubscriber.class); +public class TopicConsumerRecord extends AbstractConsumerRecord { + private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumerRecord.class); private final MqttSession mqttSession; - /** - * 定义消息主题 - */ - private final BrokerTopic topic; - /** - * 服务端向客户端发送应用消息所允许的最大 QoS 等级 - */ - private MqttQoS mqttQoS; - - /** - * 期望消费的点位 - */ - private long nextConsumerOffset; - - /** - * 最近一次订阅时间 - */ - private final long latestSubscribeTime = System.currentTimeMillis(); - - private TopicToken topicFilterToken; - private final AtomicBoolean semaphore = new AtomicBoolean(false); + protected final AtomicBoolean semaphore = new AtomicBoolean(false); - private boolean enable = true; + private final SubscribeTopic subscribeTopic; - public TopicSubscriber(BrokerTopic topic, MqttSession session, MqttQoS mqttQoS, long nextConsumerOffset) { - this.topic = topic; + public TopicConsumerRecord(BrokerTopic topic, MqttSession session, SubscribeTopic subscribeTopic, long nextConsumerOffset) { + super(topic, subscribeTopic.getTopicFilterToken(), nextConsumerOffset); this.mqttSession = session; - this.mqttQoS = mqttQoS; - this.nextConsumerOffset = nextConsumerOffset; + this.subscribeTopic = subscribeTopic; } /** * 推送消息到客户端 */ - void pushToClient() { + public void pushToClient() { if (mqttSession.isDisconnect() || !enable) { return; } @@ -91,14 +70,14 @@ public class TopicSubscriber { return; } - MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(message.getPayload()).qos(mqttQoS).topicName(message.getTopic()); + MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(message.getPayload()).qos(subscribeTopic.getMqttQoS()).topicName(message.getTopic()); if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) { publishBuilder.publishProperties(new PublishProperties()); } nextConsumerOffset = message.getOffset() + 1; //Qos0直接发送 - if (mqttQoS == MqttQoS.AT_MOST_ONCE) { + if (subscribeTopic.getMqttQoS() == MqttQoS.AT_MOST_ONCE) { mqttSession.write(publishBuilder.build(), false); push0(); return; @@ -106,7 +85,7 @@ public class TopicSubscriber { CompletableFuture> future = mqttSession.getInflightQueue().offer(publishBuilder, () -> { if (semaphore.compareAndSet(true, false)) { - topic.addSubscriber(TopicSubscriber.this); + topic.addSubscriber(this); } topic.push(); }); @@ -118,35 +97,11 @@ public class TopicSubscriber { push0(); } - public BrokerTopic getTopic() { - return topic; - } - public MqttSession getMqttSession() { return mqttSession; } public MqttQoS getMqttQoS() { - return mqttQoS; - } - - public long getLatestSubscribeTime() { - return latestSubscribeTime; - } - - public TopicToken getTopicFilterToken() { - return topicFilterToken; - } - - public void setTopicFilterToken(TopicToken topicFilterToken) { - this.topicFilterToken = topicFilterToken; - } - - public void disable() { - this.enable = false; - } - - public void setMqttQoS(MqttQoS mqttQoS) { - this.mqttQoS = mqttQoS; + return subscribeTopic.getMqttQoS(); } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java index 26e241ec..d7d45141 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java @@ -11,7 +11,7 @@ package org.smartboot.mqtt.broker.topic; import org.smartboot.mqtt.broker.MqttSession; -import org.smartboot.mqtt.broker.TopicFilterSubscriber; +import org.smartboot.mqtt.broker.SubscribeTopic; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.util.ValidateUtils; @@ -24,10 +24,13 @@ import java.util.function.BiConsumer; * @version V1.0 , 5/28/23 */ public class TopicSubscribeTree { - private final Map subscribers = new ConcurrentHashMap<>(); + private final Map subscribers = new ConcurrentHashMap<>(); private final ConcurrentHashMap subNode = new ConcurrentHashMap<>(); - public void subscribeTopic(MqttSession session, TopicFilterSubscriber subscriber) { + /** + * 将此订阅注册到订阅树 + */ + public void subscribeTopic(MqttSession session, SubscribeTopic subscriber) { TopicSubscribeTree treeNode = this; TopicToken token = subscriber.getTopicFilterToken(); do { @@ -36,7 +39,7 @@ public class TopicSubscribeTree { treeNode.subscribers.put(session, subscriber); } - public void unsubscribe(MqttSession session, TopicFilterSubscriber subscriber) { + public void unsubscribe(MqttSession session, SubscribeTopic subscriber) { TopicSubscribeTree subscribeTree = this; TopicToken topicToken = subscriber.getTopicFilterToken(); while (true) { @@ -49,18 +52,26 @@ public class TopicSubscribeTree { subscribeTree.subscribers.remove(session); } - public void match(BrokerTopic topicToken, BiConsumer consumer) { - match(topicToken.getTopicToken(), consumer); + /** + * 新增的Topic触发与订阅树匹配关系的刷新 + */ + public void refreshMatchRelation(BrokerTopic topicToken, BiConsumer consumer) { + //遍历共享订阅 + TopicSubscribeTree shareTree = subNode.get("$share"); + if (shareTree != null) { + shareTree.subNode.values().forEach(tree -> tree.match0(topicToken.getTopicToken(), consumer)); + } + match0(topicToken.getTopicToken(), consumer); } - private void match(TopicToken topicToken, BiConsumer consumer) { + private void match0(TopicToken topicToken, BiConsumer consumer) { //精确匹配 TopicSubscribeTree subscribeTree = subNode.get(topicToken.getNode()); if (subscribeTree != null) { if (topicToken.getNextNode() == null) { subscribers.forEach(consumer); } else { - subscribeTree.match(topicToken.getNextNode(), consumer); + subscribeTree.match0(topicToken.getNextNode(), consumer); } } subscribeTree = subNode.get("#"); @@ -74,7 +85,7 @@ public class TopicSubscribeTree { if (topicToken.getNextNode() == null) { subscribers.forEach(consumer); } else { - subscribeTree.subNode.values().forEach(t -> match(topicToken.getNextNode(), consumer)); + subscribeTree.subNode.values().forEach(t -> match0(topicToken.getNextNode(), consumer)); } } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/TopicToken.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/TopicToken.java index fc007c60..76d1b122 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/TopicToken.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/TopicToken.java @@ -18,26 +18,31 @@ import org.smartboot.mqtt.common.util.ValidateUtils; */ public class TopicToken { private final String node; - private String topicFilter; - private TopicToken nextNode; + private final String topicFilter; + private final TopicToken nextNode; public TopicToken(String node) { this(node, 0); - this.topicFilter = node; } - public TopicToken(String node, int offset) { + TopicToken(String node, int offset) { int index = node.indexOf('/', offset); if (index == -1) { this.node = node.substring(offset); ValidateUtils.isTrue(this.node.indexOf('#') == -1 || this.node.length() == 1, "invalid topic filter"); ValidateUtils.isTrue(this.node.indexOf('+') == -1 || this.node.length() == 1, "invalid topic filter"); + this.nextNode = null; } else { this.node = node.substring(offset, index); ValidateUtils.isTrue(this.node.indexOf('#') == -1, "invalid topic filter"); ValidateUtils.isTrue(this.node.indexOf('+') == -1 || this.node.length() == 1, "invalid topic filter"); this.nextNode = new TopicToken(node, index + 1); } + if (offset == 0) { + this.topicFilter = node; + } else { + this.topicFilter = null; + } } public String getNode() { @@ -58,4 +63,8 @@ public class TopicToken { } return nextNode != null && nextNode.isWildcards(); } + + public boolean isShared() { + return this.node.equals("$share"); + } } -- Gitee From ba62c854048a9a515710c5cc54ff8b9078e8ac21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sun, 11 Feb 2024 20:52:20 +0800 Subject: [PATCH 02/12] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../broker/MqttBrokerMessageProcessor.java | 2 ++ .../smartboot/mqtt/broker/MqttSession.java | 1 + .../broker/processor/PublishProcessor.java | 1 + .../mqtt/broker/topic/BrokerTopic.java | 21 ++++++++++++++++--- .../topic/TopicConsumerOrderShareRecord.java | 4 +++- 5 files changed, 25 insertions(+), 4 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index 0b034cd2..51ad428c 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -71,6 +71,8 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor preToken.getTopicFilter().length()) { //解除旧的订阅关系 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java index 29ffcd0d..ad5a3fb9 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java @@ -63,6 +63,7 @@ public class PublishProcessor extends AuthorizedMqttProcessor queue = new ConcurrentLinkedQueue<>(); + private static final AbstractConsumerRecord BREAK = new AbstractConsumerRecord(null, null, -1) { + @Override + public void pushToClient() { + throw new UnsupportedOperationException(); + } + }; + public BrokerTopic(String topic) { this(topic, null); } @@ -137,6 +148,10 @@ public class BrokerTopic { } } + public LongAdder getVersion() { + return version; + } + public void disable() { this.enabled = false; } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java index 68939eea..f04a4a6a 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java @@ -60,12 +60,14 @@ class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { TopicConsumerRecord record = queue.poll(); if (record == null) { if (semaphore.compareAndSet(true, false)) { + topic.addSubscriber(this); if (topic.getMessageQueue().get(nextConsumerOffset) != null && !queue.isEmpty()) { - topic.addSubscriber(this); topic.push(); } } return; + } else if (!record.enable) { + continue; } MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(message.getPayload()).qos(record.getMqttQoS()).topicName(message.getTopic()); -- Gitee From 9598497363f6c5039249a1c9d3b0ee7b70d8b7ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 12 Feb 2024 12:34:51 +0800 Subject: [PATCH 03/12] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartboot/mqtt/broker/MqttSession.java | 55 +++++++++++-------- .../mqtt/broker/eventbus/EventType.java | 3 +- .../mqtt/broker/topic/SubscriberGroup.java | 2 +- .../broker/topic/SubscriberSharedGroup.java | 19 +++---- .../topic/TopicConsumerOrderShareRecord.java | 43 +++++++-------- 5 files changed, 63 insertions(+), 59 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index 2dc868e1..f66562bc 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -195,34 +195,41 @@ public class MqttSession extends AbstractSession { } SubscriberGroup subscriberGroup = topic.getSubscriberGroup(subscribeTopic.getTopicFilterToken()); AbstractConsumerRecord consumerRecord = subscriberGroup.getSubscriber(this); - if (consumerRecord != null) { - //此前的订阅关系 - TopicToken preToken = consumerRecord.getTopicFilterToken(); - //此前为统配订阅或者未共享订阅,则更新订阅关系 - if (topicToken.isShared()) { - ValidateUtils.isTrue(preToken.getTopicFilter().equals(subscribeTopic.getTopicFilterToken().getTopicFilter()), "invalid subscriber"); - TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, subscribeTopic, topic.getMessageQueue().getLatestOffset() + 1); - subscriberGroup.addSubscriber(record); - subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, record); - } else if (preToken.isWildcards()) { - if (!topicToken.isWildcards() || topicToken.getTopicFilter().length() > preToken.getTopicFilter().length()) { - //解除旧的订阅关系 - TopicConsumerRecord preRecord = subscribers.get(preToken.getTopicFilter()).getTopicSubscribers().remove(topic); - ValidateUtils.isTrue(preRecord == consumerRecord, "invalid consumerRecord"); - preRecord.disable(); - - //绑定新的订阅关系 - TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, subscribeTopic, preRecord.getNextConsumerOffset()); - subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, record); - mqttContext.getEventBus().publish(EventType.SUBSCRIBE_REFRESH_TOPIC, record); - } - } - } else { + //共享订阅不会为null + if (consumerRecord == null) { TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, subscribeTopic, topic.getMessageQueue().getLatestOffset() + 1); mqttContext.getEventBus().publish(EventType.SUBSCRIBE_TOPIC, EventObject.newEventObject(this, record)); subscriberGroup.addSubscriber(record); subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, record); + return; + } + //此前的订阅关系 + TopicToken preToken = consumerRecord.getTopicFilterToken(); + //此前为统配订阅或者未共享订阅,则更新订阅关系 + if (topicToken.isShared()) { + ValidateUtils.isTrue(preToken.getTopicFilter().equals(subscribeTopic.getTopicFilterToken().getTopicFilter()), "invalid subscriber"); + TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, subscribeTopic, topic.getMessageQueue().getLatestOffset() + 1) { + @Override + public void pushToClient() { + throw new IllegalStateException(); + } + }; + subscriberGroup.addSubscriber(record); + subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, record); + } else if (preToken.isWildcards()) { + if (!topicToken.isWildcards() || topicToken.getTopicFilter().length() > preToken.getTopicFilter().length()) { + //解除旧的订阅关系 + TopicConsumerRecord preRecord = subscribers.get(preToken.getTopicFilter()).getTopicSubscribers().remove(topic); + ValidateUtils.isTrue(preRecord == consumerRecord, "invalid consumerRecord"); + preRecord.disable(); + + //绑定新的订阅关系 + TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, subscribeTopic, preRecord.getNextConsumerOffset()); + subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, record); + mqttContext.getEventBus().publish(EventType.SUBSCRIBE_REFRESH_TOPIC, record); + } } + } public void resubscribe() { @@ -238,7 +245,7 @@ public class MqttSession extends AbstractSession { //移除关联Broker中的映射关系 filterSubscriber.getTopicSubscribers().forEach((brokerTopic, subscriber) -> { SubscriberGroup subscriberGroup = brokerTopic.getSubscriberGroup(filterSubscriber.getTopicFilterToken()); - TopicConsumerRecord consumerRecord = subscriberGroup.removeSubscriber(this); + AbstractConsumerRecord consumerRecord = subscriberGroup.removeSubscriber(this); if (subscriber == consumerRecord) { consumerRecord.disable(); mqttContext.getEventBus().publish(EventType.UNSUBSCRIBE_TOPIC, consumerRecord); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/EventType.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/EventType.java index b2cf83dd..bba1455e 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/EventType.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/EventType.java @@ -13,6 +13,7 @@ package org.smartboot.mqtt.broker.eventbus; import org.smartboot.mqtt.broker.BrokerConfigure; import org.smartboot.mqtt.broker.BrokerContext; import org.smartboot.mqtt.broker.MqttSession; +import org.smartboot.mqtt.broker.topic.AbstractConsumerRecord; import org.smartboot.mqtt.broker.topic.BrokerTopic; import org.smartboot.mqtt.broker.topic.TopicConsumerRecord; import org.smartboot.mqtt.common.AbstractSession; @@ -84,7 +85,7 @@ public class EventType { /** * 客户端取消订阅Topic */ - public static final EventType UNSUBSCRIBE_TOPIC = new EventType<>("unsubscribe_topic"); + public static final EventType UNSUBSCRIBE_TOPIC = new EventType<>("unsubscribe_topic"); /** * 客户端订阅Topic diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java index ecb5978e..7263776b 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java @@ -37,7 +37,7 @@ public class SubscriberGroup { return subscribers.get(session); } - public TopicConsumerRecord removeSubscriber(MqttSession session) { + public AbstractConsumerRecord removeSubscriber(MqttSession session) { return subscribers.remove(session); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java index 823a756a..974f801b 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java @@ -13,30 +13,27 @@ package org.smartboot.mqtt.broker.topic; import org.smartboot.mqtt.broker.MqttSession; import org.smartboot.mqtt.common.TopicToken; -import java.util.concurrent.ConcurrentLinkedQueue; - public class SubscriberSharedGroup extends SubscriberGroup { private final TopicConsumerOrderShareRecord record; - private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); public SubscriberSharedGroup(TopicToken topicFilterToken, BrokerTopic brokerTopic) { - record = new TopicConsumerOrderShareRecord(brokerTopic, topicFilterToken, queue); + record = new TopicConsumerOrderShareRecord(brokerTopic, topicFilterToken); } @Override - public void addSubscriber(TopicConsumerRecord subscriber) { - super.addSubscriber(subscriber); - queue.add(subscriber); + public AbstractConsumerRecord getSubscriber(MqttSession session) { + return record; } @Override - public AbstractConsumerRecord getSubscriber(MqttSession session) { - return record; + public void addSubscriber(TopicConsumerRecord subscriber) { + super.addSubscriber(subscriber); + record.getQueue().offer(subscriber); } @Override - public TopicConsumerRecord removeSubscriber(MqttSession session) { - TopicConsumerRecord consumerRecord = super.removeSubscriber(session); + public AbstractConsumerRecord removeSubscriber(MqttSession session) { + AbstractConsumerRecord consumerRecord = super.removeSubscriber(session); if (subscribers.isEmpty()) { record.disable(); record.topic.removeShareGroup(record.getTopicFilterToken().getTopicFilter()); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java index f04a4a6a..c6945c2d 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java @@ -10,6 +10,8 @@ package org.smartboot.mqtt.broker.topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.smartboot.mqtt.broker.eventbus.messagebus.Message; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; @@ -27,20 +29,32 @@ import java.util.concurrent.atomic.AtomicBoolean; * 顺序共享订阅 */ class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { - private final ConcurrentLinkedQueue queue; + private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumerOrderShareRecord.class); + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); private final AtomicBoolean semaphore = new AtomicBoolean(false); - public TopicConsumerOrderShareRecord(BrokerTopic topic, TopicToken topicFilterToken, ConcurrentLinkedQueue queue) { + public TopicConsumerOrderShareRecord(BrokerTopic topic, TopicToken topicFilterToken) { super(topic, topicFilterToken, topic.getMessageQueue().getLatestOffset() + 1); - this.queue = queue; topic.addSubscriber(this); } + public ConcurrentLinkedQueue getQueue() { + return queue; + } + @Override public void pushToClient() { if (semaphore.compareAndSet(false, true)) { - push0(); + try { + push0(); + } finally { + semaphore.set(false); + topic.addSubscriber(this); + if (topic.getMessageQueue().get(nextConsumerOffset) != null && !queue.isEmpty()) { + topic.push(); + } + } } } @@ -49,22 +63,10 @@ class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { while (i-- > 0) { Message message = topic.getMessageQueue().get(nextConsumerOffset); if (message == null) { - if (semaphore.compareAndSet(true, false)) { - topic.addSubscriber(this); - if (topic.getMessageQueue().get(nextConsumerOffset) != null && !queue.isEmpty()) { - topic.push(); - } - } return; } TopicConsumerRecord record = queue.poll(); if (record == null) { - if (semaphore.compareAndSet(true, false)) { - topic.addSubscriber(this); - if (topic.getMessageQueue().get(nextConsumerOffset) != null && !queue.isEmpty()) { - topic.push(); - } - } return; } else if (!record.enable) { continue; @@ -80,6 +82,7 @@ class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { nextConsumerOffset++; record.getMqttSession().write(publishBuilder.build()); queue.offer(record); + LOGGER.info("publish share subscribe"); continue; } @@ -88,14 +91,10 @@ class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { }); if (future != null) { nextConsumerOffset++; + record.getMqttSession().flush(); queue.offer(record); } } - if (semaphore.compareAndSet(true, false)) { - topic.addSubscriber(this); - if (topic.getMessageQueue().get(nextConsumerOffset) != null && !queue.isEmpty()) { - topic.push(); - } - } + } } -- Gitee From e508aacb2fdb0a168acb024d22330d01dfda3433 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 12 Feb 2024 16:44:38 +0800 Subject: [PATCH 04/12] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartboot/mqtt/broker/topic/AbstractConsumerRecord.java | 2 +- .../mqtt/broker/topic/TopicConsumerOrderShareRecord.java | 2 +- .../org/smartboot/mqtt/broker/topic/TopicPublishTree.java | 6 +++++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/AbstractConsumerRecord.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/AbstractConsumerRecord.java index 353b940b..606e57f4 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/AbstractConsumerRecord.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/AbstractConsumerRecord.java @@ -34,7 +34,7 @@ public abstract class AbstractConsumerRecord { */ private final long latestSubscribeTime = System.currentTimeMillis(); - private final TopicToken topicFilterToken; + protected final TopicToken topicFilterToken; protected boolean enable = true; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java index c6945c2d..53adf755 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java @@ -82,7 +82,7 @@ class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { nextConsumerOffset++; record.getMqttSession().write(publishBuilder.build()); queue.offer(record); - LOGGER.info("publish share subscribe"); + LOGGER.debug("publish share subscribe:{}", topicFilterToken.getTopicFilter()); continue; } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java index 6d0125af..7d8a9a33 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java @@ -38,7 +38,11 @@ public class TopicPublishTree { } public void match(TopicToken topicToken, Consumer consumer) { - match(this, topicToken, consumer); + if (topicToken.isShared()) { + match(this, topicToken.getNextNode().getNextNode(), consumer); + } else { + match(this, topicToken, consumer); + } } private void match(TopicPublishTree treeNode, TopicToken topicToken, Consumer consumer) { -- Gitee From 874c549c1fc06ef5bfb0b1d053ecefe1f9e93358 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 12 Feb 2024 17:08:11 +0800 Subject: [PATCH 05/12] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/broker/topic/TopicConsumerOrderShareRecord.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java index 53adf755..4e495311 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java @@ -68,7 +68,7 @@ class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { TopicConsumerRecord record = queue.poll(); if (record == null) { return; - } else if (!record.enable) { + } else if (!record.enable || record.getMqttSession().isDisconnect()) { continue; } -- Gitee From 67082cca32369b7a712e50f0a782595332b8c498 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 12 Feb 2024 21:01:33 +0800 Subject: [PATCH 06/12] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartboot/mqtt/broker/MqttSession.java | 31 +++++++++---------- ...bscribeTopic.java => TopicSubscriber.java} | 6 ++-- .../topic/TopicConsumerOrderShareRecord.java | 2 +- .../broker/topic/TopicConsumerRecord.java | 16 +++++----- .../mqtt/broker/topic/TopicSubscribeTree.java | 12 +++---- 5 files changed, 33 insertions(+), 34 deletions(-) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{SubscribeTopic.java => TopicSubscriber.java} (89%) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index f66562bc..8b48836e 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -49,7 +49,7 @@ public class MqttSession extends AbstractSession { /** * 当前连接订阅的Topic的消费信息 */ - private final Map subscribers = new ConcurrentHashMap<>(); + private final Map subscribers = new ConcurrentHashMap<>(); private final BrokerContext mqttContext; private String username; @@ -171,33 +171,32 @@ public class MqttSession extends AbstractSession { } private void subscribe0(String topicFilter, MqttQoS mqttQoS) { - SubscribeTopic subscriber = subscribers.get(topicFilter); + TopicSubscriber preSubscriber = subscribers.get(topicFilter); //订阅topic已存在,可能只是更新了Qos - if (subscriber != null) { - subscriber.setMqttQoS(mqttQoS); + if (preSubscriber != null) { + preSubscriber.setMqttQoS(mqttQoS); return; } TopicToken topicToken = new TopicToken(topicFilter); if (!topicToken.isWildcards()) { mqttContext.getOrCreateTopic(topicFilter); } - SubscribeTopic newSubscriber = new SubscribeTopic(topicToken, mqttQoS); - SubscribeTopic preSubscriber = subscribers.put(topicFilter, newSubscriber); - ValidateUtils.isTrue(preSubscriber == null, "duplicate topic filter"); + TopicSubscriber newSubscriber = new TopicSubscriber(topicToken, mqttQoS); + ValidateUtils.isTrue(subscribers.put(topicFilter, newSubscriber) == null, "duplicate topic filter"); mqttContext.getTopicSubscribeTree().subscribeTopic(this, newSubscriber); mqttContext.getPublishTopicTree().match(topicToken, topic -> subscribeSuccess(newSubscriber, topic)); } - public void subscribeSuccess(SubscribeTopic subscribeTopic, BrokerTopic topic) { - TopicToken topicToken = subscribeTopic.getTopicFilterToken(); + public void subscribeSuccess(TopicSubscriber topicSubscriber, BrokerTopic topic) { + TopicToken topicToken = topicSubscriber.getTopicFilterToken(); if (!mqttContext.getProviders().getSubscribeProvider().matchTopic(topic, this)) { return; } - SubscriberGroup subscriberGroup = topic.getSubscriberGroup(subscribeTopic.getTopicFilterToken()); + SubscriberGroup subscriberGroup = topic.getSubscriberGroup(topicSubscriber.getTopicFilterToken()); AbstractConsumerRecord consumerRecord = subscriberGroup.getSubscriber(this); //共享订阅不会为null if (consumerRecord == null) { - TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, subscribeTopic, topic.getMessageQueue().getLatestOffset() + 1); + TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, topicSubscriber, topic.getMessageQueue().getLatestOffset() + 1); mqttContext.getEventBus().publish(EventType.SUBSCRIBE_TOPIC, EventObject.newEventObject(this, record)); subscriberGroup.addSubscriber(record); subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, record); @@ -207,8 +206,8 @@ public class MqttSession extends AbstractSession { TopicToken preToken = consumerRecord.getTopicFilterToken(); //此前为统配订阅或者未共享订阅,则更新订阅关系 if (topicToken.isShared()) { - ValidateUtils.isTrue(preToken.getTopicFilter().equals(subscribeTopic.getTopicFilterToken().getTopicFilter()), "invalid subscriber"); - TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, subscribeTopic, topic.getMessageQueue().getLatestOffset() + 1) { + ValidateUtils.isTrue(preToken.getTopicFilter().equals(topicSubscriber.getTopicFilterToken().getTopicFilter()), "invalid subscriber"); + TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, topicSubscriber, topic.getMessageQueue().getLatestOffset() + 1) { @Override public void pushToClient() { throw new IllegalStateException(); @@ -224,7 +223,7 @@ public class MqttSession extends AbstractSession { preRecord.disable(); //绑定新的订阅关系 - TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, subscribeTopic, preRecord.getNextConsumerOffset()); + TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, topicSubscriber, preRecord.getNextConsumerOffset()); subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, record); mqttContext.getEventBus().publish(EventType.SUBSCRIBE_REFRESH_TOPIC, record); } @@ -238,7 +237,7 @@ public class MqttSession extends AbstractSession { public void unsubscribe(String topicFilter) { //移除当前Session的映射关系 - SubscribeTopic filterSubscriber = subscribers.remove(topicFilter); + TopicSubscriber filterSubscriber = subscribers.remove(topicFilter); if (filterSubscriber == null) { return; } @@ -274,7 +273,7 @@ public class MqttSession extends AbstractSession { return mqttContext; } - public Map getSubscribers() { + public Map getSubscribers() { return subscribers; } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/SubscribeTopic.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java similarity index 89% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/SubscribeTopic.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 589d3ecb..22d91482 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/SubscribeTopic.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -24,7 +24,7 @@ import java.util.Map; * @author 三刀(zhengjunweimail@163.com) * @version V1.0 , 2022/7/13 */ -public class SubscribeTopic { +public class TopicSubscriber { private final TopicToken topicFilterToken; private MqttQoS mqttQoS; @@ -34,7 +34,7 @@ public class SubscribeTopic { */ private final Map topicSubscribers = new HashMap<>(); - SubscribeTopic(TopicToken topicFilterToken, MqttQoS mqttQoS) { + TopicSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS) { this.topicFilterToken = topicFilterToken; this.mqttQoS = mqttQoS; } @@ -51,7 +51,7 @@ public class SubscribeTopic { this.mqttQoS = mqttQoS; } - public Map getTopicSubscribers() { + Map getTopicSubscribers() { return topicSubscribers; } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java index 4e495311..47d05f6c 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java @@ -82,7 +82,7 @@ class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { nextConsumerOffset++; record.getMqttSession().write(publishBuilder.build()); queue.offer(record); - LOGGER.debug("publish share subscribe:{}", topicFilterToken.getTopicFilter()); + LOGGER.debug("publish share subscribe:{} to {}", topicFilterToken.getTopicFilter(), record.getMqttSession().getClientId()); continue; } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerRecord.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerRecord.java index e4b329fc..cc624a15 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerRecord.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerRecord.java @@ -13,7 +13,7 @@ package org.smartboot.mqtt.broker.topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.broker.MqttSession; -import org.smartboot.mqtt.broker.SubscribeTopic; +import org.smartboot.mqtt.broker.TopicSubscriber; import org.smartboot.mqtt.broker.eventbus.messagebus.Message; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; @@ -37,12 +37,12 @@ public class TopicConsumerRecord extends AbstractConsumerRecord { protected final AtomicBoolean semaphore = new AtomicBoolean(false); - private final SubscribeTopic subscribeTopic; + private final TopicSubscriber topicSubscriber; - public TopicConsumerRecord(BrokerTopic topic, MqttSession session, SubscribeTopic subscribeTopic, long nextConsumerOffset) { - super(topic, subscribeTopic.getTopicFilterToken(), nextConsumerOffset); + public TopicConsumerRecord(BrokerTopic topic, MqttSession session, TopicSubscriber topicSubscriber, long nextConsumerOffset) { + super(topic, topicSubscriber.getTopicFilterToken(), nextConsumerOffset); this.mqttSession = session; - this.subscribeTopic = subscribeTopic; + this.topicSubscriber = topicSubscriber; } /** @@ -70,14 +70,14 @@ public class TopicConsumerRecord extends AbstractConsumerRecord { return; } - MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(message.getPayload()).qos(subscribeTopic.getMqttQoS()).topicName(message.getTopic()); + MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(message.getPayload()).qos(topicSubscriber.getMqttQoS()).topicName(message.getTopic()); if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) { publishBuilder.publishProperties(new PublishProperties()); } nextConsumerOffset = message.getOffset() + 1; //Qos0直接发送 - if (subscribeTopic.getMqttQoS() == MqttQoS.AT_MOST_ONCE) { + if (topicSubscriber.getMqttQoS() == MqttQoS.AT_MOST_ONCE) { mqttSession.write(publishBuilder.build(), false); push0(); return; @@ -102,6 +102,6 @@ public class TopicConsumerRecord extends AbstractConsumerRecord { } public MqttQoS getMqttQoS() { - return subscribeTopic.getMqttQoS(); + return topicSubscriber.getMqttQoS(); } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java index d7d45141..89b0d30b 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java @@ -11,7 +11,7 @@ package org.smartboot.mqtt.broker.topic; import org.smartboot.mqtt.broker.MqttSession; -import org.smartboot.mqtt.broker.SubscribeTopic; +import org.smartboot.mqtt.broker.TopicSubscriber; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.util.ValidateUtils; @@ -24,13 +24,13 @@ import java.util.function.BiConsumer; * @version V1.0 , 5/28/23 */ public class TopicSubscribeTree { - private final Map subscribers = new ConcurrentHashMap<>(); + private final Map subscribers = new ConcurrentHashMap<>(); private final ConcurrentHashMap subNode = new ConcurrentHashMap<>(); /** * 将此订阅注册到订阅树 */ - public void subscribeTopic(MqttSession session, SubscribeTopic subscriber) { + public void subscribeTopic(MqttSession session, TopicSubscriber subscriber) { TopicSubscribeTree treeNode = this; TopicToken token = subscriber.getTopicFilterToken(); do { @@ -39,7 +39,7 @@ public class TopicSubscribeTree { treeNode.subscribers.put(session, subscriber); } - public void unsubscribe(MqttSession session, SubscribeTopic subscriber) { + public void unsubscribe(MqttSession session, TopicSubscriber subscriber) { TopicSubscribeTree subscribeTree = this; TopicToken topicToken = subscriber.getTopicFilterToken(); while (true) { @@ -55,7 +55,7 @@ public class TopicSubscribeTree { /** * 新增的Topic触发与订阅树匹配关系的刷新 */ - public void refreshMatchRelation(BrokerTopic topicToken, BiConsumer consumer) { + public void refreshMatchRelation(BrokerTopic topicToken, BiConsumer consumer) { //遍历共享订阅 TopicSubscribeTree shareTree = subNode.get("$share"); if (shareTree != null) { @@ -64,7 +64,7 @@ public class TopicSubscribeTree { match0(topicToken.getTopicToken(), consumer); } - private void match0(TopicToken topicToken, BiConsumer consumer) { + private void match0(TopicToken topicToken, BiConsumer consumer) { //精确匹配 TopicSubscribeTree subscribeTree = subNode.get(topicToken.getNode()); if (subscribeTree != null) { -- Gitee From e98e3b0a15d00800f37a47cc74875a9d668806cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Tue, 13 Feb 2024 14:04:17 +0800 Subject: [PATCH 07/12] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartboot/mqtt/broker/MqttSession.java | 23 ++++++----------- .../mqtt/broker/topic/BrokerTopic.java | 4 +-- .../topic/TopicConsumerOrderShareRecord.java | 25 +++++++++++-------- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index 8b48836e..f7fc7270 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -52,7 +52,6 @@ public class MqttSession extends AbstractSession { private final Map subscribers = new ConcurrentHashMap<>(); private final BrokerContext mqttContext; - private String username; /** * 已授权 */ @@ -153,13 +152,6 @@ public class MqttSession extends AbstractSession { this.clientId = clientId; } - public String getUsername() { - return username; - } - - public void setUsername(String username) { - this.username = username; - } public MqttQoS subscribe(String topicFilter, MqttQoS mqttQoS) { if (mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topicFilter, this)) { @@ -239,6 +231,7 @@ public class MqttSession extends AbstractSession { //移除当前Session的映射关系 TopicSubscriber filterSubscriber = subscribers.remove(topicFilter); if (filterSubscriber == null) { + LOGGER.warn("unsubscribe waring! topic:{} is not exists", topicFilter); return; } //移除关联Broker中的映射关系 @@ -269,13 +262,13 @@ public class MqttSession extends AbstractSession { this.willMessage = willMessage; } - public BrokerContext getMqttContext() { - return mqttContext; - } - - public Map getSubscribers() { - return subscribers; - } +// public BrokerContext getMqttContext() { +// return mqttContext; +// } +// +// public Map getSubscribers() { +// return subscribers; +// } public long getLatestReceiveMessageTime() { return latestReceiveMessageTime; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/BrokerTopic.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/BrokerTopic.java index 7e610ecb..8ad108a6 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/BrokerTopic.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/BrokerTopic.java @@ -53,7 +53,7 @@ public class BrokerTopic { public void execute() { AbstractConsumerRecord subscriber; queue.offer(BREAK); - int preVersion = version.intValue(); + int mark = version.intValue(); while ((subscriber = queue.poll()) != BREAK) { try { subscriber.pushToClient(); @@ -62,7 +62,7 @@ public class BrokerTopic { } } semaphore.release(); - if (preVersion != version.intValue() && !queue.isEmpty()) { + if (mark != version.intValue() && !queue.isEmpty()) { push(); } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java index 47d05f6c..26f44cf0 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java @@ -23,7 +23,7 @@ import org.smartboot.mqtt.common.util.MqttMessageBuilders; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Semaphore; /** * 顺序共享订阅 @@ -32,7 +32,7 @@ class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumerOrderShareRecord.class); private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); - private final AtomicBoolean semaphore = new AtomicBoolean(false); + private final Semaphore semaphore = new Semaphore(1); public TopicConsumerOrderShareRecord(BrokerTopic topic, TopicToken topicFilterToken) { super(topic, topicFilterToken, topic.getMessageQueue().getLatestOffset() + 1); @@ -45,20 +45,21 @@ class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { @Override public void pushToClient() { - if (semaphore.compareAndSet(false, true)) { + if (semaphore.tryAcquire()) { try { push0(); } finally { - semaphore.set(false); - topic.addSubscriber(this); - if (topic.getMessageQueue().get(nextConsumerOffset) != null && !queue.isEmpty()) { - topic.push(); - } + semaphore.release(); + } + topic.addSubscriber(this); + if (topic.getMessageQueue().get(nextConsumerOffset) != null && !queue.isEmpty()) { + //触发下一轮推送 + topic.getVersion().increment(); } } } - public void push0() { + private void push0() { int i = 10000; while (i-- > 0) { Message message = topic.getMessageQueue().get(nextConsumerOffset); @@ -66,9 +67,12 @@ class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { return; } TopicConsumerRecord record = queue.poll(); + //共享订阅列表无可用通道 if (record == null) { return; - } else if (!record.enable || record.getMqttSession().isDisconnect()) { + } + + if (!record.enable || record.getMqttSession().isDisconnect()) { continue; } @@ -95,6 +99,5 @@ class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { queue.offer(record); } } - } } -- Gitee From ebc7b609209733c5776ef5ef2025b63e06b00192 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Tue, 13 Feb 2024 16:13:08 +0800 Subject: [PATCH 08/12] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/smartboot/mqtt/broker/topic/BrokerTopic.java | 2 +- .../smartboot/mqtt/broker/topic/SubscriberGroup.java | 11 ----------- .../mqtt/broker/topic/SubscriberSharedGroup.java | 2 +- 3 files changed, 2 insertions(+), 13 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/BrokerTopic.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/BrokerTopic.java index 8ad108a6..54761db0 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/BrokerTopic.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/BrokerTopic.java @@ -104,7 +104,7 @@ public class BrokerTopic { } } - public void removeShareGroup(String topicFilter) { + void removeShareGroup(String topicFilter) { subscribers.remove(topicFilter); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java index 7263776b..66879f11 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java @@ -22,17 +22,6 @@ public class SubscriberGroup { protected final Map subscribers = new ConcurrentHashMap<>(); - - /** - * 最近一次订阅时间 - */ - private final long latestSubscribeTime = System.currentTimeMillis(); - - - public long getLatestSubscribeTime() { - return latestSubscribeTime; - } - public AbstractConsumerRecord getSubscriber(MqttSession session) { return subscribers.get(session); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java index 974f801b..351fbfee 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java @@ -13,7 +13,7 @@ package org.smartboot.mqtt.broker.topic; import org.smartboot.mqtt.broker.MqttSession; import org.smartboot.mqtt.common.TopicToken; -public class SubscriberSharedGroup extends SubscriberGroup { +class SubscriberSharedGroup extends SubscriberGroup { private final TopicConsumerOrderShareRecord record; public SubscriberSharedGroup(TopicToken topicFilterToken, BrokerTopic brokerTopic) { -- Gitee From 6ddfebc5dd163872f28c3d67938544de4370651d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Wed, 14 Feb 2024 13:19:27 +0800 Subject: [PATCH 09/12] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../broker/processor/PublishProcessor.java | 2 +- .../mqtt/broker/topic/BrokerTopic.java | 19 +++++++++++-------- .../topic/TopicConsumerOrderShareRecord.java | 2 +- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java index ad5a3fb9..2710c112 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java @@ -63,7 +63,7 @@ public class PublishProcessor extends AuthorizedMqttProcessor subscribers = new ConcurrentHashMap<>(); + private final Map shareSubscribers = new ConcurrentHashMap<>(); /** * 当前Topic是否圈闭推送完成 */ @@ -46,14 +49,14 @@ public class BrokerTopic { private boolean enabled = true; - private final LongAdder version = new LongAdder(); + private final AtomicInteger version = new AtomicInteger(); private final AsyncTask asyncTask = new AsyncTask() { @Override public void execute() { AbstractConsumerRecord subscriber; queue.offer(BREAK); - int mark = version.intValue(); + int mark = version.get(); while ((subscriber = queue.poll()) != BREAK) { try { subscriber.pushToClient(); @@ -62,7 +65,7 @@ public class BrokerTopic { } } semaphore.release(); - if (mark != version.intValue() && !queue.isEmpty()) { + if (mark != version.get() && !queue.isEmpty()) { push(); } } @@ -98,14 +101,14 @@ public class BrokerTopic { public SubscriberGroup getSubscriberGroup(TopicToken topicToken) { if (topicToken.isShared()) { - return subscribers.computeIfAbsent(topicToken.getTopicFilter(), s -> new SubscriberSharedGroup(topicToken, BrokerTopic.this)); + return shareSubscribers.computeIfAbsent(topicToken.getTopicFilter(), s -> new SubscriberSharedGroup(topicToken, BrokerTopic.this)); } else { return defaultGroup; } } void removeShareGroup(String topicFilter) { - subscribers.remove(topicFilter); + shareSubscribers.remove(topicFilter); } @@ -148,7 +151,7 @@ public class BrokerTopic { } } - public LongAdder getVersion() { + public AtomicInteger getVersion() { return version; } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java index 26f44cf0..32d14c2e 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java @@ -54,7 +54,7 @@ class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { topic.addSubscriber(this); if (topic.getMessageQueue().get(nextConsumerOffset) != null && !queue.isEmpty()) { //触发下一轮推送 - topic.getVersion().increment(); + topic.getVersion().incrementAndGet(); } } } -- Gitee From 2189065149591870fa7c51aa2b19bbc36295379d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Wed, 14 Feb 2024 13:25:35 +0800 Subject: [PATCH 10/12] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/mqtt/broker/BrokerContextImpl.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 9606d7d5..64727d46 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -174,9 +174,10 @@ public class BrokerContextImpl implements BrokerContext { eventBus.publish(EventType.BROKER_STARTED, this); //释放内存 configJson = null; - System.out.println(BrokerConfigure.BANNER + "\r\n :: smart-mqtt broker" + "::\t(" + BrokerConfigure.VERSION + ")"); - System.out.println("❤️Gitee: https://gitee.com/smartboot/smart-mqtt"); + System.out.println(BrokerConfigure.BANNER + "\r\n ::smart-mqtt broker" + "::\t(" + BrokerConfigure.VERSION + ")"); + System.out.println("Gitee: https://gitee.com/smartboot/smart-mqtt"); System.out.println("Github: https://github.com/smartboot/smart-mqtt"); + System.out.println("Document: https://smartboot.tech/smart-mqtt"); System.out.println("Support: zhengjunweimail@163.com"); if (StringUtils.isBlank(brokerConfigure.getHost())) { System.out.println("\uD83C\uDF89start smart-mqtt success! [port:" + brokerConfigure.getPort() + "]"); @@ -405,7 +406,7 @@ public class BrokerContextImpl implements BrokerContext { if (StringUtils.isBlank(brokerConfig)) { inputStream = BrokerContext.class.getClassLoader().getResourceAsStream("smart-mqtt.yaml"); - LOGGER.info("load smart-mqtt.yaml from classpath."); + LOGGER.debug("load smart-mqtt.yaml from classpath."); } else { inputStream = Files.newInputStream(Paths.get(brokerConfig)); LOGGER.info("load external yaml config."); -- Gitee From 19fb82305ffcab4a68fa1692cbfd644f54f1ec9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Wed, 14 Feb 2024 13:51:46 +0800 Subject: [PATCH 11/12] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 64727d46..5350f7ab 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -409,7 +409,7 @@ public class BrokerContextImpl implements BrokerContext { LOGGER.debug("load smart-mqtt.yaml from classpath."); } else { inputStream = Files.newInputStream(Paths.get(brokerConfig)); - LOGGER.info("load external yaml config."); + LOGGER.debug("load external yaml config."); } Yaml yaml = new Yaml(); Object object = yaml.load(inputStream); -- Gitee From 5fd08f9133f31566c592472365af4eea054134ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Wed, 14 Feb 2024 13:55:47 +0800 Subject: [PATCH 12/12] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- smart-mqtt-broker/src/main/resources/logback.xml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/smart-mqtt-broker/src/main/resources/logback.xml b/smart-mqtt-broker/src/main/resources/logback.xml index 03eecb5d..0929c482 100644 --- a/smart-mqtt-broker/src/main/resources/logback.xml +++ b/smart-mqtt-broker/src/main/resources/logback.xml @@ -11,7 +11,8 @@ - [%d{YYYY-MM-dd HH:mm:ss.SSS}] [%thread] %-5level [%logger{0}:%L] - %msg%n + + [%d{YYYY-MM-dd HH:mm:ss.SSS}] %-5level- %msg%n -- Gitee