diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 1acdf57ac7e4cbed2d8b6fe5cb9a26ae55c71d83..5350f7ab1183e6b244924ec253186f5c3212ded6 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -34,6 +34,7 @@ import org.smartboot.mqtt.broker.processor.SubscribeProcessor; import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor; import org.smartboot.mqtt.broker.provider.Providers; import org.smartboot.mqtt.broker.topic.BrokerTopic; +import org.smartboot.mqtt.broker.topic.TopicConsumerRecord; import org.smartboot.mqtt.broker.topic.TopicPublishTree; import org.smartboot.mqtt.broker.topic.TopicSubscribeTree; import org.smartboot.mqtt.common.AsyncTask; @@ -173,9 +174,10 @@ public class BrokerContextImpl implements BrokerContext { eventBus.publish(EventType.BROKER_STARTED, this); //释放内存 configJson = null; - System.out.println(BrokerConfigure.BANNER + "\r\n :: smart-mqtt broker" + "::\t(" + BrokerConfigure.VERSION + ")"); - System.out.println("❤️Gitee: https://gitee.com/smartboot/smart-mqtt"); + System.out.println(BrokerConfigure.BANNER + "\r\n ::smart-mqtt broker" + "::\t(" + BrokerConfigure.VERSION + ")"); + System.out.println("Gitee: https://gitee.com/smartboot/smart-mqtt"); System.out.println("Github: https://github.com/smartboot/smart-mqtt"); + System.out.println("Document: https://smartboot.tech/smart-mqtt"); System.out.println("Support: zhengjunweimail@163.com"); if (StringUtils.isBlank(brokerConfigure.getHost())) { System.out.println("\uD83C\uDF89start smart-mqtt success! [port:" + brokerConfigure.getPort() + "]"); @@ -223,25 +225,26 @@ public class BrokerContextImpl implements BrokerContext { }); //一个新的订阅建立时,对每个匹配的主题名,如果存在最近保留的消息,它必须被发送给这个订阅者 - eventBus.subscribe(EventType.SUBSCRIBE_TOPIC, (eventType, subscriber) -> retainPushThreadPool.execute(new AsyncTask() { + eventBus.subscribe(EventType.SUBSCRIBE_TOPIC, (eventType, eventObject) -> retainPushThreadPool.execute(new AsyncTask() { @Override public void execute() { - BrokerTopic topic = subscriber.getTopic(); + TopicConsumerRecord consumerRecord = eventObject.getObject(); + BrokerTopic topic = consumerRecord.getTopic(); Message retainMessage = topic.getRetainMessage(); - if (retainMessage == null || retainMessage.getCreateTime() > subscriber.getLatestSubscribeTime()) { - topic.addSubscriber(subscriber); + if (retainMessage == null || retainMessage.getCreateTime() > consumerRecord.getLatestSubscribeTime()) { + topic.addSubscriber(consumerRecord); return; } - MqttSession session = subscriber.getMqttSession(); + MqttSession session = eventObject.getSession(); - MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(retainMessage.getPayload()).qos(subscriber.getMqttQoS()).topicName(retainMessage.getTopic()); + MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(retainMessage.getPayload()).qos(consumerRecord.getMqttQoS()).topicName(retainMessage.getTopic()); if (session.getMqttVersion() == MqttVersion.MQTT_5) { publishBuilder.publishProperties(new PublishProperties()); } // Qos0不走飞行窗口 - if (subscriber.getMqttQoS() == MqttQoS.AT_MOST_ONCE) { + if (consumerRecord.getMqttQoS() == MqttQoS.AT_MOST_ONCE) { session.write(publishBuilder.build()); - topic.addSubscriber(subscriber); + topic.addSubscriber(consumerRecord); return; } InflightQueue inflightQueue = session.getInflightQueue(); @@ -249,17 +252,17 @@ public class BrokerContextImpl implements BrokerContext { CompletableFuture> future = inflightQueue.offer(publishBuilder); future.whenComplete((mqttPacketIdentifierMessage, throwable) -> { LOGGER.info("publish retain to client:{} success ", session.getClientId()); - topic.addSubscriber(subscriber); + topic.addSubscriber(consumerRecord); }); session.flush(); } })); - eventBus.subscribe(EventType.TOPIC_CREATE, (eventType, object) -> subscribeTopicTree.match(object, (session, topicFilterSubscriber) -> { - if (!providers.getSubscribeProvider().subscribeTopic(object.getTopic(), session)) { + eventBus.subscribe(EventType.TOPIC_CREATE, (eventType, brokerTopic) -> subscribeTopicTree.refreshMatchRelation(brokerTopic, (session, topicFilterSubscriber) -> { + if (!providers.getSubscribeProvider().subscribeTopic(brokerTopic.getTopic(), session)) { return; } - session.subscribeSuccess(topicFilterSubscriber.getMqttQoS(), topicFilterSubscriber.getTopicFilterToken(), object); + session.subscribeSuccess(topicFilterSubscriber, brokerTopic); })); } @@ -403,10 +406,10 @@ public class BrokerContextImpl implements BrokerContext { if (StringUtils.isBlank(brokerConfig)) { inputStream = BrokerContext.class.getClassLoader().getResourceAsStream("smart-mqtt.yaml"); - LOGGER.info("load smart-mqtt.yaml from classpath."); + LOGGER.debug("load smart-mqtt.yaml from classpath."); } else { inputStream = Files.newInputStream(Paths.get(brokerConfig)); - LOGGER.info("load external yaml config."); + LOGGER.debug("load external yaml config."); } Yaml yaml = new Yaml(); Object object = yaml.load(inputStream); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index 0b034cd21a62932cf68c2b338f960d651dea79ef..51ad428c7655a714758e33fbf45e2f8de6f67a2e 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -71,6 +71,8 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor subscribers = new ConcurrentHashMap<>(); + private final Map subscribers = new ConcurrentHashMap<>(); private final BrokerContext mqttContext; - private String username; /** * 已授权 */ @@ -151,13 +152,6 @@ public class MqttSession extends AbstractSession { this.clientId = clientId; } - public String getUsername() { - return username; - } - - public void setUsername(String username) { - this.username = username; - } public MqttQoS subscribe(String topicFilter, MqttQoS mqttQoS) { if (mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topicFilter, this)) { @@ -169,71 +163,87 @@ public class MqttSession extends AbstractSession { } private void subscribe0(String topicFilter, MqttQoS mqttQoS) { - TopicFilterSubscriber subscriber = subscribers.get(topicFilter); - if (subscriber != null) { - subscriber.setMqttQoS(mqttQoS); - subscriber.getTopicSubscribers().values().forEach(sub -> sub.setMqttQoS(mqttQoS)); + TopicSubscriber preSubscriber = subscribers.get(topicFilter); + //订阅topic已存在,可能只是更新了Qos + if (preSubscriber != null) { + preSubscriber.setMqttQoS(mqttQoS); return; } TopicToken topicToken = new TopicToken(topicFilter); if (!topicToken.isWildcards()) { mqttContext.getOrCreateTopic(topicFilter); } - subscriber = new TopicFilterSubscriber(topicToken, mqttQoS); - TopicFilterSubscriber preSubscriber = subscribers.put(topicFilter, subscriber); - ValidateUtils.isTrue(preSubscriber == null, "duplicate topic filter"); - mqttContext.getTopicSubscribeTree().subscribeTopic(this, subscriber); - mqttContext.getPublishTopicTree().match(topicToken, topic -> subscribeSuccess(mqttQoS, topicToken, topic)); + TopicSubscriber newSubscriber = new TopicSubscriber(topicToken, mqttQoS); + ValidateUtils.isTrue(subscribers.put(topicFilter, newSubscriber) == null, "duplicate topic filter"); + mqttContext.getTopicSubscribeTree().subscribeTopic(this, newSubscriber); + mqttContext.getPublishTopicTree().match(topicToken, topic -> subscribeSuccess(newSubscriber, topic)); } - public void subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic topic) { + public void subscribeSuccess(TopicSubscriber topicSubscriber, BrokerTopic topic) { + TopicToken topicToken = topicSubscriber.getTopicFilterToken(); if (!mqttContext.getProviders().getSubscribeProvider().matchTopic(topic, this)) { return; } - TopicSubscriber topicSubscriber = topic.getConsumeOffsets().get(this); - if (topicSubscriber != null) { - //此前的订阅关系 - TopicToken preToken = topicSubscriber.getTopicFilterToken(); - if (preToken.isWildcards()) { - if (!topicToken.isWildcards() || topicToken.getTopicFilter().length() > preToken.getTopicFilter().length()) { - //解除旧的订阅关系 - TopicSubscriber preSubscription = subscribers.get(preToken.getTopicFilter()).getTopicSubscribers().remove(topic); - preSubscription.setMqttQoS(mqttQoS); - preSubscription.setTopicFilterToken(topicToken); - //绑定新的订阅关系 - subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, preSubscription); - mqttContext.getEventBus().publish(EventType.SUBSCRIBE_REFRESH_TOPIC, preSubscription); + SubscriberGroup subscriberGroup = topic.getSubscriberGroup(topicSubscriber.getTopicFilterToken()); + AbstractConsumerRecord consumerRecord = subscriberGroup.getSubscriber(this); + //共享订阅不会为null + if (consumerRecord == null) { + TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, topicSubscriber, topic.getMessageQueue().getLatestOffset() + 1); + mqttContext.getEventBus().publish(EventType.SUBSCRIBE_TOPIC, EventObject.newEventObject(this, record)); + subscriberGroup.addSubscriber(record); + subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, record); + return; + } + //此前的订阅关系 + TopicToken preToken = consumerRecord.getTopicFilterToken(); + //此前为统配订阅或者未共享订阅,则更新订阅关系 + if (topicToken.isShared()) { + ValidateUtils.isTrue(preToken.getTopicFilter().equals(topicSubscriber.getTopicFilterToken().getTopicFilter()), "invalid subscriber"); + TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, topicSubscriber, topic.getMessageQueue().getLatestOffset() + 1) { + @Override + public void pushToClient() { + throw new IllegalStateException(); } + }; + subscriberGroup.addSubscriber(record); + subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, record); + } else if (preToken.isWildcards()) { + if (!topicToken.isWildcards() || topicToken.getTopicFilter().length() > preToken.getTopicFilter().length()) { + //解除旧的订阅关系 + TopicConsumerRecord preRecord = subscribers.get(preToken.getTopicFilter()).getTopicSubscribers().remove(topic); + ValidateUtils.isTrue(preRecord == consumerRecord, "invalid consumerRecord"); + preRecord.disable(); + + //绑定新的订阅关系 + TopicConsumerRecord record = new TopicConsumerRecord(topic, MqttSession.this, topicSubscriber, preRecord.getNextConsumerOffset()); + subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, record); + mqttContext.getEventBus().publish(EventType.SUBSCRIBE_REFRESH_TOPIC, record); } - return; } - //以当前消息队列的最新点位为起始点位 - TopicSubscriber subscription = new TopicSubscriber(topic, MqttSession.this, mqttQoS, topic.getMessageQueue().getLatestOffset() + 1); - mqttContext.getEventBus().publish(EventType.SUBSCRIBE_TOPIC, subscription); - subscription.setTopicFilterToken(topicToken); - topic.getConsumeOffsets().put(MqttSession.this, subscription); - subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, subscription); + } public void resubscribe() { - subscribers.values().stream().filter(subscriber -> subscriber.getTopicFilterToken().isWildcards()).forEach(subscriber -> { - mqttContext.getPublishTopicTree().match(subscriber.getTopicFilterToken(), topic -> subscribeSuccess(subscriber.getMqttQoS(), subscriber.getTopicFilterToken(), topic)); - }); + subscribers.values().stream().filter(subscriber -> subscriber.getTopicFilterToken().isWildcards()).forEach(subscriber -> mqttContext.getPublishTopicTree().match(subscriber.getTopicFilterToken(), topic -> subscribeSuccess(subscriber, topic))); } public void unsubscribe(String topicFilter) { - TopicFilterSubscriber filterSubscriber = subscribers.remove(topicFilter); + //移除当前Session的映射关系 + TopicSubscriber filterSubscriber = subscribers.remove(topicFilter); if (filterSubscriber == null) { + LOGGER.warn("unsubscribe waring! topic:{} is not exists", topicFilter); return; } - filterSubscriber.getTopicSubscribers().values().forEach(subscriber -> { - TopicSubscriber removeSubscriber = subscriber.getTopic().getConsumeOffsets().remove(this); - if (subscriber == removeSubscriber) { - removeSubscriber.disable(); - mqttContext.getEventBus().publish(EventType.UNSUBSCRIBE_TOPIC, removeSubscriber); - LOGGER.debug("remove subscriber:{} success!", subscriber.getTopic().getTopic()); + //移除关联Broker中的映射关系 + filterSubscriber.getTopicSubscribers().forEach((brokerTopic, subscriber) -> { + SubscriberGroup subscriberGroup = brokerTopic.getSubscriberGroup(filterSubscriber.getTopicFilterToken()); + AbstractConsumerRecord consumerRecord = subscriberGroup.removeSubscriber(this); + if (subscriber == consumerRecord) { + consumerRecord.disable(); + mqttContext.getEventBus().publish(EventType.UNSUBSCRIBE_TOPIC, consumerRecord); + LOGGER.debug("remove subscriber:{} success!", brokerTopic.getTopic()); } else { - LOGGER.error("remove subscriber:{} error!", removeSubscriber); + LOGGER.error("remove subscriber:{} error!", subscriberGroup); } }); mqttContext.getTopicSubscribeTree().unsubscribe(this, filterSubscriber); @@ -252,13 +262,13 @@ public class MqttSession extends AbstractSession { this.willMessage = willMessage; } - public BrokerContext getMqttContext() { - return mqttContext; - } - - public Map getSubscribers() { - return subscribers; - } +// public BrokerContext getMqttContext() { +// return mqttContext; +// } +// +// public Map getSubscribers() { +// return subscribers; +// } public long getLatestReceiveMessageTime() { return latestReceiveMessageTime; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java similarity index 79% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index a36e4dda3f677d07895214b0785255eb86df48f9..22d91482f3db351216d96896772b11cbce8c9cd8 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -11,7 +11,7 @@ package org.smartboot.mqtt.broker; import org.smartboot.mqtt.broker.topic.BrokerTopic; -import org.smartboot.mqtt.broker.topic.TopicSubscriber; +import org.smartboot.mqtt.broker.topic.TopicConsumerRecord; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; @@ -24,7 +24,7 @@ import java.util.Map; * @author 三刀(zhengjunweimail@163.com) * @version V1.0 , 2022/7/13 */ -public class TopicFilterSubscriber { +public class TopicSubscriber { private final TopicToken topicFilterToken; private MqttQoS mqttQoS; @@ -32,9 +32,9 @@ public class TopicFilterSubscriber { /** * 客户端订阅所匹配的Topic。通配符订阅时可能有多个 */ - private final Map topicSubscribers = new HashMap<>(); + private final Map topicSubscribers = new HashMap<>(); - TopicFilterSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS) { + TopicSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS) { this.topicFilterToken = topicFilterToken; this.mqttQoS = mqttQoS; } @@ -51,7 +51,8 @@ public class TopicFilterSubscriber { this.mqttQoS = mqttQoS; } - public Map getTopicSubscribers() { + Map getTopicSubscribers() { return topicSubscribers; } } + diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/EventType.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/EventType.java index a9b9452f7fcfda7b87645887d3db1534cdb43f0b..bba1455e1b1fbdac726572b3e639f6ff40fdb376 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/EventType.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/EventType.java @@ -13,8 +13,9 @@ package org.smartboot.mqtt.broker.eventbus; import org.smartboot.mqtt.broker.BrokerConfigure; import org.smartboot.mqtt.broker.BrokerContext; import org.smartboot.mqtt.broker.MqttSession; +import org.smartboot.mqtt.broker.topic.AbstractConsumerRecord; import org.smartboot.mqtt.broker.topic.BrokerTopic; -import org.smartboot.mqtt.broker.topic.TopicSubscriber; +import org.smartboot.mqtt.broker.topic.TopicConsumerRecord; import org.smartboot.mqtt.common.AbstractSession; import org.smartboot.mqtt.common.message.MqttConnAckMessage; import org.smartboot.mqtt.common.message.MqttConnectMessage; @@ -79,17 +80,17 @@ public class EventType { /** * 客户端订阅Topic */ - public static final EventType SUBSCRIBE_TOPIC = new EventType<>("subscribeTopic"); + public static final EventType> SUBSCRIBE_TOPIC = new EventType<>("subscribeTopic"); /** * 客户端取消订阅Topic */ - public static final EventType UNSUBSCRIBE_TOPIC = new EventType<>("unsubscribe_topic"); + public static final EventType UNSUBSCRIBE_TOPIC = new EventType<>("unsubscribe_topic"); /** * 客户端订阅Topic */ - public static final EventType SUBSCRIBE_REFRESH_TOPIC = new EventType<>("subscribe_refresh_topic"); + public static final EventType SUBSCRIBE_REFRESH_TOPIC = new EventType<>("subscribe_refresh_topic"); /** * 客户端连接请求 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java index e0fd08f78ade634e050adb4f2101ba07dd8ee370..2710c112bb294d43a2127fb3e2ebe0fd69d89581 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java @@ -16,7 +16,6 @@ import org.smartboot.mqtt.broker.BrokerContext; import org.smartboot.mqtt.broker.MqttSession; import org.smartboot.mqtt.broker.topic.BrokerTopic; import org.smartboot.mqtt.common.enums.MqttQoS; -import org.smartboot.mqtt.common.enums.MqttReasonCode; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPubAckMessage; import org.smartboot.mqtt.common.message.MqttPubRecMessage; @@ -64,6 +63,7 @@ public class PublishProcessor extends AuthorizedMqttProcessor consumeOffsets = new ConcurrentHashMap<>(); + private final Map shareSubscribers = new ConcurrentHashMap<>(); /** * 当前Topic是否圈闭推送完成 */ @@ -44,12 +49,15 @@ public class BrokerTopic { private boolean enabled = true; + private final AtomicInteger version = new AtomicInteger(); + private final AsyncTask asyncTask = new AsyncTask() { @Override public void execute() { - TopicSubscriber subscriber; - int i = 0; - while (++i < 1000 && (subscriber = queue.poll()) != null) { + AbstractConsumerRecord subscriber; + queue.offer(BREAK); + int mark = version.get(); + while ((subscriber = queue.poll()) != BREAK) { try { subscriber.pushToClient(); } catch (Exception e) { @@ -57,7 +65,7 @@ public class BrokerTopic { } } semaphore.release(); - if (!queue.isEmpty()) { + if (mark != version.get() && !queue.isEmpty()) { push(); } } @@ -72,8 +80,14 @@ public class BrokerTopic { /** * 当前Topic处于监听状态的订阅者 */ - private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private static final AbstractConsumerRecord BREAK = new AbstractConsumerRecord(null, null, -1) { + @Override + public void pushToClient() { + throw new UnsupportedOperationException(); + } + }; public BrokerTopic(String topic) { this(topic, null); @@ -84,11 +98,21 @@ public class BrokerTopic { this.executorService = executorService; } - public Map getConsumeOffsets() { - return consumeOffsets; + + public SubscriberGroup getSubscriberGroup(TopicToken topicToken) { + if (topicToken.isShared()) { + return shareSubscribers.computeIfAbsent(topicToken.getTopicFilter(), s -> new SubscriberSharedGroup(topicToken, BrokerTopic.this)); + } else { + return defaultGroup; + } + } + + void removeShareGroup(String topicFilter) { + shareSubscribers.remove(topicFilter); } - public void addSubscriber(TopicSubscriber subscriber) { + + public void addSubscriber(AbstractConsumerRecord subscriber) { queue.offer(subscriber); } @@ -127,6 +151,10 @@ public class BrokerTopic { } } + public AtomicInteger getVersion() { + return version; + } + public void disable() { this.enabled = false; } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java new file mode 100644 index 0000000000000000000000000000000000000000..66879f111d10225ef694162d7d8bea757360ffeb --- /dev/null +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberGroup.java @@ -0,0 +1,36 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.broker.topic; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.broker.MqttSession; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SubscriberGroup { + private static final Logger LOGGER = LoggerFactory.getLogger(SubscriberGroup.class); + + protected final Map subscribers = new ConcurrentHashMap<>(); + + public AbstractConsumerRecord getSubscriber(MqttSession session) { + return subscribers.get(session); + } + + public AbstractConsumerRecord removeSubscriber(MqttSession session) { + return subscribers.remove(session); + } + + public void addSubscriber(TopicConsumerRecord subscriber) { + subscribers.put(subscriber.getMqttSession(), subscriber); + } +} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java new file mode 100644 index 0000000000000000000000000000000000000000..351fbfee7ae5392443f44ab8742283611ed8202e --- /dev/null +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/SubscriberSharedGroup.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.broker.topic; + +import org.smartboot.mqtt.broker.MqttSession; +import org.smartboot.mqtt.common.TopicToken; + +class SubscriberSharedGroup extends SubscriberGroup { + private final TopicConsumerOrderShareRecord record; + + public SubscriberSharedGroup(TopicToken topicFilterToken, BrokerTopic brokerTopic) { + record = new TopicConsumerOrderShareRecord(brokerTopic, topicFilterToken); + } + + @Override + public AbstractConsumerRecord getSubscriber(MqttSession session) { + return record; + } + + @Override + public void addSubscriber(TopicConsumerRecord subscriber) { + super.addSubscriber(subscriber); + record.getQueue().offer(subscriber); + } + + @Override + public AbstractConsumerRecord removeSubscriber(MqttSession session) { + AbstractConsumerRecord consumerRecord = super.removeSubscriber(session); + if (subscribers.isEmpty()) { + record.disable(); + record.topic.removeShareGroup(record.getTopicFilterToken().getTopicFilter()); + } + return consumerRecord; + } +} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java new file mode 100644 index 0000000000000000000000000000000000000000..32d14c2ebfba8b9d194ec6e961e2260e5cf0b773 --- /dev/null +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerOrderShareRecord.java @@ -0,0 +1,103 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.broker.topic; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.broker.eventbus.messagebus.Message; +import org.smartboot.mqtt.common.TopicToken; +import org.smartboot.mqtt.common.enums.MqttQoS; +import org.smartboot.mqtt.common.enums.MqttVersion; +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; +import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; +import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; +import org.smartboot.mqtt.common.util.MqttMessageBuilders; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; + +/** + * 顺序共享订阅 + */ +class TopicConsumerOrderShareRecord extends AbstractConsumerRecord { + private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumerOrderShareRecord.class); + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + + private final Semaphore semaphore = new Semaphore(1); + + public TopicConsumerOrderShareRecord(BrokerTopic topic, TopicToken topicFilterToken) { + super(topic, topicFilterToken, topic.getMessageQueue().getLatestOffset() + 1); + topic.addSubscriber(this); + } + + public ConcurrentLinkedQueue getQueue() { + return queue; + } + + @Override + public void pushToClient() { + if (semaphore.tryAcquire()) { + try { + push0(); + } finally { + semaphore.release(); + } + topic.addSubscriber(this); + if (topic.getMessageQueue().get(nextConsumerOffset) != null && !queue.isEmpty()) { + //触发下一轮推送 + topic.getVersion().incrementAndGet(); + } + } + } + + private void push0() { + int i = 10000; + while (i-- > 0) { + Message message = topic.getMessageQueue().get(nextConsumerOffset); + if (message == null) { + return; + } + TopicConsumerRecord record = queue.poll(); + //共享订阅列表无可用通道 + if (record == null) { + return; + } + + if (!record.enable || record.getMqttSession().isDisconnect()) { + continue; + } + + MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(message.getPayload()).qos(record.getMqttQoS()).topicName(message.getTopic()); + if (record.getMqttSession().getMqttVersion() == MqttVersion.MQTT_5) { + publishBuilder.publishProperties(new PublishProperties()); + } + + //Qos0直接发送 + if (record.getMqttQoS() == MqttQoS.AT_MOST_ONCE) { + nextConsumerOffset++; + record.getMqttSession().write(publishBuilder.build()); + queue.offer(record); + LOGGER.debug("publish share subscribe:{} to {}", topicFilterToken.getTopicFilter(), record.getMqttSession().getClientId()); + continue; + } + + CompletableFuture> future = record.getMqttSession().getInflightQueue().offer(publishBuilder, () -> { + queue.offer(record); + }); + if (future != null) { + nextConsumerOffset++; + record.getMqttSession().flush(); + queue.offer(record); + } + } + } +} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerRecord.java similarity index 64% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscriber.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerRecord.java index 28c06a3e6964814e7552739b62c53194c2f15862..cc624a155760cf18041036a87534773c43e552f5 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicConsumerRecord.java @@ -13,8 +13,8 @@ package org.smartboot.mqtt.broker.topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.broker.MqttSession; +import org.smartboot.mqtt.broker.TopicSubscriber; import org.smartboot.mqtt.broker.eventbus.messagebus.Message; -import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; @@ -31,45 +31,24 @@ import java.util.concurrent.atomic.AtomicBoolean; * @author 三刀(zhengjunweimail@163.com) * @version V1.0 , 2022/3/25 */ -public class TopicSubscriber { - private static final Logger LOGGER = LoggerFactory.getLogger(TopicSubscriber.class); +public class TopicConsumerRecord extends AbstractConsumerRecord { + private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumerRecord.class); private final MqttSession mqttSession; - /** - * 定义消息主题 - */ - private final BrokerTopic topic; - /** - * 服务端向客户端发送应用消息所允许的最大 QoS 等级 - */ - private MqttQoS mqttQoS; - - /** - * 期望消费的点位 - */ - private long nextConsumerOffset; - - /** - * 最近一次订阅时间 - */ - private final long latestSubscribeTime = System.currentTimeMillis(); - - private TopicToken topicFilterToken; - private final AtomicBoolean semaphore = new AtomicBoolean(false); + protected final AtomicBoolean semaphore = new AtomicBoolean(false); - private boolean enable = true; + private final TopicSubscriber topicSubscriber; - public TopicSubscriber(BrokerTopic topic, MqttSession session, MqttQoS mqttQoS, long nextConsumerOffset) { - this.topic = topic; + public TopicConsumerRecord(BrokerTopic topic, MqttSession session, TopicSubscriber topicSubscriber, long nextConsumerOffset) { + super(topic, topicSubscriber.getTopicFilterToken(), nextConsumerOffset); this.mqttSession = session; - this.mqttQoS = mqttQoS; - this.nextConsumerOffset = nextConsumerOffset; + this.topicSubscriber = topicSubscriber; } /** * 推送消息到客户端 */ - void pushToClient() { + public void pushToClient() { if (mqttSession.isDisconnect() || !enable) { return; } @@ -91,14 +70,14 @@ public class TopicSubscriber { return; } - MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(message.getPayload()).qos(mqttQoS).topicName(message.getTopic()); + MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(message.getPayload()).qos(topicSubscriber.getMqttQoS()).topicName(message.getTopic()); if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) { publishBuilder.publishProperties(new PublishProperties()); } nextConsumerOffset = message.getOffset() + 1; //Qos0直接发送 - if (mqttQoS == MqttQoS.AT_MOST_ONCE) { + if (topicSubscriber.getMqttQoS() == MqttQoS.AT_MOST_ONCE) { mqttSession.write(publishBuilder.build(), false); push0(); return; @@ -106,7 +85,7 @@ public class TopicSubscriber { CompletableFuture> future = mqttSession.getInflightQueue().offer(publishBuilder, () -> { if (semaphore.compareAndSet(true, false)) { - topic.addSubscriber(TopicSubscriber.this); + topic.addSubscriber(this); } topic.push(); }); @@ -118,35 +97,11 @@ public class TopicSubscriber { push0(); } - public BrokerTopic getTopic() { - return topic; - } - public MqttSession getMqttSession() { return mqttSession; } public MqttQoS getMqttQoS() { - return mqttQoS; - } - - public long getLatestSubscribeTime() { - return latestSubscribeTime; - } - - public TopicToken getTopicFilterToken() { - return topicFilterToken; - } - - public void setTopicFilterToken(TopicToken topicFilterToken) { - this.topicFilterToken = topicFilterToken; - } - - public void disable() { - this.enable = false; - } - - public void setMqttQoS(MqttQoS mqttQoS) { - this.mqttQoS = mqttQoS; + return topicSubscriber.getMqttQoS(); } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java index 6d0125af10f112088a4d2b61e611e40d1fc6f01b..7d8a9a336f227497914ea6f8a92e3d4c1973c2a4 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java @@ -38,7 +38,11 @@ public class TopicPublishTree { } public void match(TopicToken topicToken, Consumer consumer) { - match(this, topicToken, consumer); + if (topicToken.isShared()) { + match(this, topicToken.getNextNode().getNextNode(), consumer); + } else { + match(this, topicToken, consumer); + } } private void match(TopicPublishTree treeNode, TopicToken topicToken, Consumer consumer) { diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java index 26e241ec63206e675e2d311c9a8011f85f21d895..89b0d30bd442e0c6d76757e8a5224594c21aeb16 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java @@ -11,7 +11,7 @@ package org.smartboot.mqtt.broker.topic; import org.smartboot.mqtt.broker.MqttSession; -import org.smartboot.mqtt.broker.TopicFilterSubscriber; +import org.smartboot.mqtt.broker.TopicSubscriber; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.util.ValidateUtils; @@ -24,10 +24,13 @@ import java.util.function.BiConsumer; * @version V1.0 , 5/28/23 */ public class TopicSubscribeTree { - private final Map subscribers = new ConcurrentHashMap<>(); + private final Map subscribers = new ConcurrentHashMap<>(); private final ConcurrentHashMap subNode = new ConcurrentHashMap<>(); - public void subscribeTopic(MqttSession session, TopicFilterSubscriber subscriber) { + /** + * 将此订阅注册到订阅树 + */ + public void subscribeTopic(MqttSession session, TopicSubscriber subscriber) { TopicSubscribeTree treeNode = this; TopicToken token = subscriber.getTopicFilterToken(); do { @@ -36,7 +39,7 @@ public class TopicSubscribeTree { treeNode.subscribers.put(session, subscriber); } - public void unsubscribe(MqttSession session, TopicFilterSubscriber subscriber) { + public void unsubscribe(MqttSession session, TopicSubscriber subscriber) { TopicSubscribeTree subscribeTree = this; TopicToken topicToken = subscriber.getTopicFilterToken(); while (true) { @@ -49,18 +52,26 @@ public class TopicSubscribeTree { subscribeTree.subscribers.remove(session); } - public void match(BrokerTopic topicToken, BiConsumer consumer) { - match(topicToken.getTopicToken(), consumer); + /** + * 新增的Topic触发与订阅树匹配关系的刷新 + */ + public void refreshMatchRelation(BrokerTopic topicToken, BiConsumer consumer) { + //遍历共享订阅 + TopicSubscribeTree shareTree = subNode.get("$share"); + if (shareTree != null) { + shareTree.subNode.values().forEach(tree -> tree.match0(topicToken.getTopicToken(), consumer)); + } + match0(topicToken.getTopicToken(), consumer); } - private void match(TopicToken topicToken, BiConsumer consumer) { + private void match0(TopicToken topicToken, BiConsumer consumer) { //精确匹配 TopicSubscribeTree subscribeTree = subNode.get(topicToken.getNode()); if (subscribeTree != null) { if (topicToken.getNextNode() == null) { subscribers.forEach(consumer); } else { - subscribeTree.match(topicToken.getNextNode(), consumer); + subscribeTree.match0(topicToken.getNextNode(), consumer); } } subscribeTree = subNode.get("#"); @@ -74,7 +85,7 @@ public class TopicSubscribeTree { if (topicToken.getNextNode() == null) { subscribers.forEach(consumer); } else { - subscribeTree.subNode.values().forEach(t -> match(topicToken.getNextNode(), consumer)); + subscribeTree.subNode.values().forEach(t -> match0(topicToken.getNextNode(), consumer)); } } } diff --git a/smart-mqtt-broker/src/main/resources/logback.xml b/smart-mqtt-broker/src/main/resources/logback.xml index 03eecb5dc232710b20ae685304d0f57db980207d..0929c4828b06a4e6ff37651a8b0c5ac4c011b58c 100644 --- a/smart-mqtt-broker/src/main/resources/logback.xml +++ b/smart-mqtt-broker/src/main/resources/logback.xml @@ -11,7 +11,8 @@ - [%d{YYYY-MM-dd HH:mm:ss.SSS}] [%thread] %-5level [%logger{0}:%L] - %msg%n + + [%d{YYYY-MM-dd HH:mm:ss.SSS}] %-5level- %msg%n diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/TopicToken.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/TopicToken.java index fc007c603e787b3334c1e5f07909ce051a49f83a..76d1b12232f890f3970aac2baac1a4ce56ecd74e 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/TopicToken.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/TopicToken.java @@ -18,26 +18,31 @@ import org.smartboot.mqtt.common.util.ValidateUtils; */ public class TopicToken { private final String node; - private String topicFilter; - private TopicToken nextNode; + private final String topicFilter; + private final TopicToken nextNode; public TopicToken(String node) { this(node, 0); - this.topicFilter = node; } - public TopicToken(String node, int offset) { + TopicToken(String node, int offset) { int index = node.indexOf('/', offset); if (index == -1) { this.node = node.substring(offset); ValidateUtils.isTrue(this.node.indexOf('#') == -1 || this.node.length() == 1, "invalid topic filter"); ValidateUtils.isTrue(this.node.indexOf('+') == -1 || this.node.length() == 1, "invalid topic filter"); + this.nextNode = null; } else { this.node = node.substring(offset, index); ValidateUtils.isTrue(this.node.indexOf('#') == -1, "invalid topic filter"); ValidateUtils.isTrue(this.node.indexOf('+') == -1 || this.node.length() == 1, "invalid topic filter"); this.nextNode = new TopicToken(node, index + 1); } + if (offset == 0) { + this.topicFilter = node; + } else { + this.topicFilter = null; + } } public String getNode() { @@ -58,4 +63,8 @@ public class TopicToken { } return nextNode != null && nextNode.isWildcards(); } + + public boolean isShared() { + return this.node.equals("$share"); + } }