From 10c16477c5e651f15193b402f43d5f734a4c6ac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sun, 4 Jun 2023 14:42:22 +0800 Subject: [PATCH 1/9] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugins/pom.xml | 2 +- plugins/redis-bridge-plugin/pom.xml | 2 +- pom.xml | 4 +-- smart-mqtt-broker/pom.xml | 2 +- .../mqtt/broker/BrokerConfigure.java | 2 +- .../mqtt/broker/BrokerContextImpl.java | 2 +- .../mqtt/broker/TopicSubscriber.java | 34 +++++++------------ .../mqtt/broker/topic/TopicSubscribeTree.java | 6 +++- smart-mqtt-client/pom.xml | 2 +- .../org/smartboot/mqtt/client/MqttClient.java | 10 ++---- smart-mqtt-common/pom.xml | 2 +- .../smartboot/mqtt/common/InflightQueue.java | 7 ---- 12 files changed, 29 insertions(+), 46 deletions(-) diff --git a/plugins/pom.xml b/plugins/pom.xml index bee09121..8507d7b9 100644 --- a/plugins/pom.xml +++ b/plugins/pom.xml @@ -16,7 +16,7 @@ org.smartboot.mqtt smart-mqtt - 0.21 + 0.22 ../pom.xml pom diff --git a/plugins/redis-bridge-plugin/pom.xml b/plugins/redis-bridge-plugin/pom.xml index f4e16694..25c88e24 100644 --- a/plugins/redis-bridge-plugin/pom.xml +++ b/plugins/redis-bridge-plugin/pom.xml @@ -3,7 +3,7 @@ plugins org.smartboot.mqtt - 0.21 + 0.22 ../pom.xml 4.0.0 diff --git a/pom.xml b/pom.xml index 6e52628a..f5af912e 100644 --- a/pom.xml +++ b/pom.xml @@ -4,12 +4,12 @@ org.smartboot.mqtt smart-mqtt smart-mqtt - 0.21 + 0.22 4.0.0 mqtt broker - 0.21 + 0.22 1.5.29 1.1.22 2.6 diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml index aacc0788..5a2e35b2 100644 --- a/smart-mqtt-broker/pom.xml +++ b/smart-mqtt-broker/pom.xml @@ -5,7 +5,7 @@ org.smartboot.mqtt smart-mqtt - 0.21 + 0.22 ../pom.xml 4.0.0 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java index 408aa30e..8f4f2c33 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java @@ -42,7 +42,7 @@ public class BrokerConfigure extends ToString { /** * 当前smart-mqtt */ - public static final String VERSION = "v0.21"; + public static final String VERSION = "v0.22"; static final Map SystemEnvironments = new HashMap<>(); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index bdf8b6fa..23aea81f 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -360,7 +360,7 @@ public class BrokerContextImpl implements BrokerContext { } }); - eventBus.subscribe(ServerEventType.TOPIC_CREATE, (eventType, object) -> subscribeTopicTree.match(object.getTopicToken(), (session, topicFilterSubscriber) -> { + eventBus.subscribe(ServerEventType.TOPIC_CREATE, (eventType, object) -> subscribeTopicTree.match(object, (session, topicFilterSubscriber) -> { if (!providers.getSubscribeProvider().subscribeTopic(object.getTopic(), session)) { return; } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 09378fd5..38870a01 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -79,27 +79,22 @@ public class TopicSubscriber { return; } semaphore.release(); - int i = 16; - while (publishAvailable(brokerContext)) { - if (i-- == 0) { - if (semaphore.tryAcquire()) { - topic.getQueue().offer(this); - topic.getVersion().incrementAndGet(); - } - break; - } - } + publishAvailable(brokerContext); mqttSession.flush(); } - private boolean publishAvailable(BrokerContext brokerContext) { + private void publishAvailable(BrokerContext brokerContext) { PersistenceProvider persistenceProvider = brokerContext.getProviders().getPersistenceProvider(); PersistenceMessage persistenceMessage = persistenceProvider.get(topic.getTopic(), nextConsumerOffset); if (persistenceMessage == null) { if (semaphore.tryAcquire()) { topic.getQueue().offer(this); + if (persistenceProvider.get(topic.getTopic(), nextConsumerOffset) != null) { + topic.getVersion().incrementAndGet(); + brokerContext.getEventBus().publish(ServerEventType.NOTIFY_TOPIC_PUSH, topic); + } } - return false; + return; } MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(persistenceMessage.getPayload()).qos(mqttQoS).topicName(persistenceMessage.getTopic()); @@ -114,18 +109,13 @@ public class TopicSubscriber { //Qos0直接发送 if (mqttQoS == MqttQoS.AT_MOST_ONCE) { mqttSession.write(publishBuilder.build(), false); - return true; + publishAvailable(brokerContext); + return; } - CompletableFuture> future = inflightQueue.offer(publishBuilder, () -> { - if (semaphore.tryAcquire()) { - topic.getQueue().offer(this); - topic.getVersion().incrementAndGet(); - } - brokerContext.getEventBus().publish(ServerEventType.NOTIFY_TOPIC_PUSH, topic); - }); + CompletableFuture> future = inflightQueue.offer(publishBuilder); if (future == null) { - return false; + return; } future.whenComplete((mqttPacketIdentifierMessage, throwable) -> { //最早发送的消息若收到响应,则更新点位 @@ -137,7 +127,7 @@ public class TopicSubscriber { publishAvailable(brokerContext); }); - return true; + publishAvailable(brokerContext); } public BrokerTopic getTopic() { diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java index 1ab0df22..2bab99d2 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java @@ -10,6 +10,7 @@ package org.smartboot.mqtt.broker.topic; +import org.smartboot.mqtt.broker.BrokerTopic; import org.smartboot.mqtt.broker.MqttSession; import org.smartboot.mqtt.broker.TopicFilterSubscriber; import org.smartboot.mqtt.common.TopicToken; @@ -49,8 +50,11 @@ public class TopicSubscribeTree { subscribeTree.subscribers.remove(session); } + public void match(BrokerTopic topicToken, BiConsumer consumer) { + match(topicToken.getTopicToken(), consumer); + } - public void match(TopicToken topicToken, BiConsumer consumer) { + private void match(TopicToken topicToken, BiConsumer consumer) { //精确匹配 TopicSubscribeTree subscribeTree = subNode.get(topicToken.getNode()); if (subscribeTree != null) { diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml index aa4dfbf9..68fcd766 100644 --- a/smart-mqtt-client/pom.xml +++ b/smart-mqtt-client/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.21 + 0.22 ../pom.xml 4.0.0 diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index c4160ddb..fa8f0377 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -310,8 +310,9 @@ public class MqttClient extends AbstractSession { unsubscribeBuilder.properties(properties); } // wait ack message. - CompletableFuture> future = getInflightQueue().offer(unsubscribeBuilder, () -> registeredTasks.offer(() -> unsubscribe0(topics))); + CompletableFuture> future = getInflightQueue().offer(unsubscribeBuilder); if (future == null) { + registeredTasks.offer(() -> unsubscribe0(topics)); return; } future.whenComplete((message, throwable) -> { @@ -360,12 +361,7 @@ public class MqttClient extends AbstractSession { } MqttSubscribeMessage subscribeMessage = subscribeBuilder.build(); - CompletableFuture> future = getInflightQueue().offer(subscribeBuilder, new Runnable() { - @Override - public void run() { - - } - }); + CompletableFuture> future = getInflightQueue().offer(subscribeBuilder); if (future == null) { registeredTasks.offer(() -> subscribe0(topic, qos, consumer, subAckConsumer)); return; diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml index 2a2c23d2..e94e8513 100644 --- a/smart-mqtt-common/pom.xml +++ b/smart-mqtt-common/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.21 + 0.22 ../pom.xml 4.0.0 diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 9859587e..6df9af87 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -80,12 +80,6 @@ public class InflightQueue { } public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder) { - return offer(publishBuilder, () -> { - - }); - } - - public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Runnable runnable) { final ReentrantLock lock = this.lock; lock.lock(); try { @@ -94,7 +88,6 @@ public class InflightQueue { if (i < 0) { i = queue.length - 1; } - queue[i].getFuture().thenAccept(mqttPacketIdentifierMessage -> runnable.run()); return null; } else { return enqueue(publishBuilder); -- Gitee From 21be363e2ad76c63918e8e236fd8ad9d3a29e24d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sun, 4 Jun 2023 15:08:31 +0800 Subject: [PATCH 2/9] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/smartboot/mqtt/broker/TopicSubscriber.java | 2 -- .../main/java/org/smartboot/mqtt/common/InflightQueue.java | 4 ---- .../java/org/smartboot/mqtt/common/eventbus/EventType.java | 5 ----- 3 files changed, 11 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 38870a01..87980157 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -19,7 +19,6 @@ import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; -import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; @@ -105,7 +104,6 @@ public class TopicSubscriber { InflightQueue inflightQueue = mqttSession.getInflightQueue(); long offset = persistenceMessage.getOffset(); nextConsumerOffset = offset + 1; - brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); //Qos0直接发送 if (mqttQoS == MqttQoS.AT_MOST_ONCE) { mqttSession.write(publishBuilder.build(), false); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 6df9af87..e2741863 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -84,10 +84,6 @@ public class InflightQueue { lock.lock(); try { if (count == queue.length) { - int i = putIndex - 1; - if (i < 0) { - i = queue.length - 1; - } return null; } else { return enqueue(publishBuilder); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventType.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventType.java index f3e02c45..05fb32de 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventType.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventType.java @@ -31,11 +31,6 @@ public class EventType { //连接断开 public static final EventType DISCONNECT = new EventType<>("disconnect"); - /** - * Broker推送消息至客户端 - */ - public static final EventType PUSH_PUBLISH_MESSAGE = new EventType<>("pushPublishMessage"); - /** * 接收到客户端发送的任何消息 */ -- Gitee From 3c50e96d8708233657b556854c39af29dfd50f14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 10 Jun 2023 21:24:40 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 4 +-- .../smartboot/mqtt/broker/BrokerContext.java | 4 +++ .../mqtt/broker/BrokerContextImpl.java | 12 +++++++++ .../org/smartboot/mqtt/client/MqttClient.java | 27 +++++++++++++++++-- 4 files changed, 43 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index f5af912e..06f0b048 100644 --- a/pom.xml +++ b/pom.xml @@ -10,8 +10,8 @@ 0.22 - 1.5.29 - 1.1.22 + 1.5.30 + 1.2.3 2.6 4.3 4.13.2 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java index 1e683933..8e3fb139 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java @@ -92,4 +92,8 @@ public interface BrokerContext { TopicPublishTree getPublishTopicTree(); TopicSubscribeTree getTopicSubscribeTree(); + + void bundle(String key, T resource); + + T getBundle(String key); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 23aea81f..f3a72b27 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -80,6 +80,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Hashtable; import java.util.List; import java.util.Map; import java.util.Properties; @@ -141,6 +142,7 @@ public class BrokerContextImpl implements BrokerContext { private AsynchronousChannelGroup asynchronousChannelGroup; + private final Map resources = new Hashtable<>(); private final Map, MqttProcessor> processors; { @@ -527,6 +529,16 @@ public class BrokerContextImpl implements BrokerContext { return subscribeTopicTree; } + @Override + public void bundle(String key, T resource) { + resources.put(key, resource); + } + + @Override + public T getBundle(String key) { + return (T) resources.get(key); + } + public void loadYamlConfig() throws IOException { String brokerConfig = System.getProperty(BrokerConfigure.SystemProperty.BrokerConfig); InputStream inputStream = null; diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index fa8f0377..fecff91d 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -11,6 +11,7 @@ package org.smartboot.mqtt.client; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.NumberUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.AbstractSession; @@ -46,6 +47,7 @@ import org.smartboot.mqtt.common.message.variable.properties.SubscribeProperties import org.smartboot.mqtt.common.message.variable.properties.WillProperties; import org.smartboot.mqtt.common.protocol.MqttProtocol; import org.smartboot.mqtt.common.util.MqttMessageBuilders; +import org.smartboot.mqtt.common.util.MqttUtil; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.buffer.BufferPagePool; import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider; @@ -112,14 +114,34 @@ public class MqttClient extends AbstractSession { private Consumer reconnectConsumer; private boolean pingTimeout = false; + public MqttClient(String uri) { + this(uri, MqttUtil.createClientId()); + } + + public MqttClient(String uri, String clientId) { + this(uri, clientId, MqttVersion.MQTT_3_1_1); + } + public MqttClient(String host, int port, String clientId) { this(host, port, clientId, MqttVersion.MQTT_3_1_1); } public MqttClient(String host, int port, String clientId, MqttVersion mqttVersion) { + this("mqtt://" + host + ":" + port, clientId, mqttVersion); + } + + public MqttClient(String uri, String clientId, MqttVersion mqttVersion) { super(new EventBusImpl(EventType.types())); - clientConfigure.setHost(host); - clientConfigure.setPort(port); + + String[] array = uri.split(":"); + if (array[0].startsWith("mqtts://")) { + clientConfigure.setHost(array[0].substring(8)); + } else if (array[0].startsWith("mqtt://")) { + clientConfigure.setHost(array[0].substring(7)); + } else { + throw new IllegalStateException("invalid URI Scheme, uri: " + uri); + } + clientConfigure.setPort(NumberUtils.toInt(array[1])); clientConfigure.setMqttVersion(mqttVersion); this.clientId = clientId; //ping-pong消息超时监听 @@ -135,6 +157,7 @@ public class MqttClient extends AbstractSession { }); } + public void connect() { try { asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(2, new ThreadFactory() { -- Gitee From 3be26726841b246c4abda1865773f9b991bedd5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sun, 11 Jun 2023 17:57:53 +0800 Subject: [PATCH 4/9] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/smartboot/mqtt/client/MqttClient.java | 71 ++++++++++--------- 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index fecff91d..715c4255 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -15,6 +15,7 @@ import org.apache.commons.lang.math.NumberUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.AbstractSession; +import org.smartboot.mqtt.common.AsyncTask; import org.smartboot.mqtt.common.DefaultMqttWriter; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.QosRetryPlugin; @@ -134,14 +135,15 @@ public class MqttClient extends AbstractSession { super(new EventBusImpl(EventType.types())); String[] array = uri.split(":"); - if (array[0].startsWith("mqtts://")) { - clientConfigure.setHost(array[0].substring(8)); - } else if (array[0].startsWith("mqtt://")) { - clientConfigure.setHost(array[0].substring(7)); + if (array[0].equals("mqtts")) { + clientConfigure.setHost(array[1].substring(2)); + //加密通信 + } else if (array[0].equals("mqtt")) { + clientConfigure.setHost(array[1].substring(2)); } else { throw new IllegalStateException("invalid URI Scheme, uri: " + uri); } - clientConfigure.setPort(NumberUtils.toInt(array[1])); + clientConfigure.setPort(NumberUtils.toInt(array[2])); clientConfigure.setMqttVersion(mqttVersion); this.clientId = clientId; //ping-pong消息超时监听 @@ -192,42 +194,23 @@ public class MqttClient extends AbstractSession { public void connect(AsynchronousChannelGroup asynchronousChannelGroup, BufferPagePool bufferPagePool, Consumer consumer) { //设置 connect ack 回调事件 - this.connectConsumer = mqttConnAckMessage -> { - if (!clientConfigure.isAutomaticReconnect()) { - gcConfigure(); - } - - //连接成功,注册订阅消息 - if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { - setInflightQueue(new InflightQueue(this, 16)); - connected = true; - //重连情况下重新触发订阅逻辑 - subscribes.forEach((k, v) -> { - subscribe(k, v.getQoS(), v.getConsumer()); - }); - consumeTask(); - } - //客户端设置清理会话(CleanSession)标志为 0 重连时,客户端和服务端必须使用原始的报文标识符重发 - //任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 [MQTT-4.4.0-1]。这是唯一要求客户端或 - //服务端重发消息的情况。 - if (!clientConfigure.isCleanSession()) { - //todo - } - consumer.accept(mqttConnAckMessage); - connected = true; - }; + this.connectConsumer = consumer; //启动心跳插件 if (clientConfigure.getKeepAliveInterval() > 0) { - QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new Runnable() { + QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new AsyncTask() { @Override - public void run() { + public void execute() { //客户端发送了 PINGREQ 报文之后,如果在合理的时间内仍没有收到 PINGRESP 报文, // 它应该关闭到服务端的网络连接。 if (pingTimeout) { pingTimeout = false; client.shutdown(); } - if (session.isInvalid()) { + if (session == null || session.isInvalid()) { + if (client != null) { + client.shutdown(); + client = null; + } if (clientConfigure.isAutomaticReconnect()) { LOGGER.warn("mqtt client is disconnect, try to reconnect..."); connect(asynchronousChannelGroup, reconnectConsumer == null ? consumer : reconnectConsumer); @@ -237,9 +220,9 @@ public class MqttClient extends AbstractSession { long delay = System.currentTimeMillis() - getLatestSendMessageTime() - clientConfigure.getKeepAliveInterval() * 1000L; //gap 10ms if (delay > -10) { + QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, clientConfigure.getKeepAliveInterval(), TimeUnit.SECONDS); MqttPingReqMessage pingReqMessage = new MqttPingReqMessage(); write(pingReqMessage); - QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, clientConfigure.getKeepAliveInterval(), TimeUnit.SECONDS); } else { QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, -delay, TimeUnit.MILLISECONDS); } @@ -416,7 +399,28 @@ public class MqttClient extends AbstractSession { } public void notifyResponse(MqttConnAckMessage connAckMessage) { + if (!clientConfigure.isAutomaticReconnect()) { + gcConfigure(); + } + + //连接成功,注册订阅消息 + if (connAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { + setInflightQueue(new InflightQueue(this, 16)); + connected = true; + //重连情况下重新触发订阅逻辑 + subscribes.forEach((k, v) -> { + subscribe(k, v.getQoS(), v.getConsumer()); + }); + consumeTask(); + } + //客户端设置清理会话(CleanSession)标志为 0 重连时,客户端和服务端必须使用原始的报文标识符重发 + //任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 [MQTT-4.4.0-1]。这是唯一要求客户端或 + //服务端重发消息的情况。 + if (!clientConfigure.isCleanSession()) { + //todo + } connectConsumer.accept(connAckMessage); + connected = true; } @@ -512,6 +516,7 @@ public class MqttClient extends AbstractSession { clientConfigure.setAutomaticReconnect(false); disconnect = true; client.shutdown(); + client = null; } public void setReconnectConsumer(Consumer reconnectConsumer) { -- Gitee From e5888b6c944269c51ca0eb32afbbb6da9d33936f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Fri, 16 Jun 2023 23:29:45 +0800 Subject: [PATCH 5/9] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/broker/BrokerConfigure.java | 22 ++++++++++++++ .../mqtt/broker/BrokerContextImpl.java | 30 ++++++++----------- .../smartboot/mqtt/broker/MqttSession.java | 3 ++ .../mqtt/broker/provider/Providers.java | 3 +- .../broker/provider/SubscribeProvider.java | 11 ++++++- 5 files changed, 50 insertions(+), 19 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java index 8f4f2c33..d1f2213f 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java @@ -12,8 +12,10 @@ package org.smartboot.mqtt.broker; import org.smartboot.mqtt.common.ToString; import org.smartboot.mqtt.common.message.MqttMessage; +import org.smartboot.socket.buffer.BufferPagePool; import org.smartboot.socket.extension.plugins.Plugin; +import java.nio.channels.AsynchronousChannelGroup; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -106,6 +108,10 @@ public class BrokerConfigure extends ToString { private final List> plugins = new LinkedList<>(); + private AsynchronousChannelGroup channelGroup; + + private BufferPagePool bufferPagePool; + public int getPort() { return port; } @@ -203,6 +209,22 @@ public class BrokerConfigure extends ToString { return plugins; } + public AsynchronousChannelGroup getChannelGroup() { + return channelGroup; + } + + public void setChannelGroup(AsynchronousChannelGroup channelGroup) { + this.channelGroup = channelGroup; + } + + public BufferPagePool getBufferPagePool() { + return bufferPagePool; + } + + public void setBufferPagePool(BufferPagePool bufferPagePool) { + this.bufferPagePool = bufferPagePool; + } + @Override public String toString() { return "BrokerConfigure{" + diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index f3a72b27..c6da2bfa 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -72,7 +72,6 @@ import org.yaml.snakeyaml.Yaml; import java.io.IOException; import java.io.InputStream; import java.lang.management.ManagementFactory; -import java.nio.channels.AsynchronousChannelGroup; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; @@ -133,15 +132,12 @@ public class BrokerContextImpl implements BrokerContext { * Broker Server */ private AioQuickServer server; - private BufferPagePool pagePool; private final MqttBrokerMessageProcessor processor = new MqttBrokerMessageProcessor(this); //配置文件内容 private String configJson; private final static BrokerTopic SHUTDOWN_TOPIC = new BrokerTopic(""); - private AsynchronousChannelGroup asynchronousChannelGroup; - private final Map resources = new Hashtable<>(); private final Map, MqttProcessor> processors; @@ -175,20 +171,11 @@ public class BrokerContextImpl implements BrokerContext { try { - asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { - int i; - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "smart-mqtt-broker-" + (++i)); - } - }); - pagePool = new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true); brokerConfigure.addPlugin(new QosRetryPlugin()); brokerConfigure.getPlugins().forEach(processor::addPlugin); server = new AioQuickServer(brokerConfigure.getHost(), brokerConfigure.getPort(), new MqttProtocol(brokerConfigure.getMaxPacketSize()), processor); - server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(Math.max(2, brokerConfigure.getThreadNum())); - server.start(asynchronousChannelGroup); + server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(brokerConfigure.getBufferPagePool()).setThreadNum(Math.max(2, brokerConfigure.getThreadNum())); + server.start(brokerConfigure.getChannelGroup()); System.out.println(BrokerConfigure.BANNER + "\r\n :: smart-mqtt broker" + "::\t(" + BrokerConfigure.VERSION + ")"); System.out.println("❤️Gitee: https://gitee.com/smartboot/smart-mqtt"); System.out.println("Github: https://github.com/smartboot/smart-mqtt"); @@ -418,6 +405,15 @@ public class BrokerContextImpl implements BrokerContext { brokerConfigure.setHost("0.0.0.0"); } + brokerConfigure.setChannelGroup(new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { + int i; + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "smart-mqtt-broker-" + (++i)); + } + })); + brokerConfigure.setBufferPagePool(new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true)); // System.out.println("brokerConfigure: " + brokerConfigure); } @@ -573,8 +569,8 @@ public class BrokerContextImpl implements BrokerContext { pushTopicQueue.offer(SHUTDOWN_TOPIC); pushThreadPool.shutdown(); server.shutdown(); - asynchronousChannelGroup.shutdown(); - pagePool.release(); + brokerConfigure.getChannelGroup().shutdown(); + brokerConfigure.getBufferPagePool().release(); //卸载插件 plugins.forEach(Plugin::uninstall); plugins.clear(); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index 02a990fc..857f8eac 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -158,6 +158,9 @@ public class MqttSession extends AbstractSession { } public void subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic topic) { + if (!mqttContext.getProviders().getSubscribeProvider().matchTopic(topic, this)) { + return; + } TopicSubscriber topicSubscriber = topic.getConsumeOffsets().get(this); if (topicSubscriber != null) { //此前的订阅关系 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java index 0a4b9fed..e475b402 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java @@ -24,7 +24,8 @@ public class Providers { private PersistenceProvider retainMessageProvider = new MemoryPersistenceProvider(); private PersistenceProvider persistenceProvider = new MemoryPersistenceProvider(); - private SubscribeProvider subscribeProvider = (topicFilter, session) -> true; + private SubscribeProvider subscribeProvider = new SubscribeProvider() { + }; /** * OpenAPI 处理器 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java index 21b517b4..de44e37b 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java @@ -10,6 +10,7 @@ package org.smartboot.mqtt.broker.provider; +import org.smartboot.mqtt.broker.BrokerTopic; import org.smartboot.mqtt.broker.MqttSession; /** @@ -19,5 +20,13 @@ import org.smartboot.mqtt.broker.MqttSession; * @version V1.0 , 2022/12/28 */ public interface SubscribeProvider { - boolean subscribeTopic(String topicFilter, MqttSession session); + default boolean subscribeTopic(String topicFilter, MqttSession session) { + //4.7.2 应用不能使用 $ 字符开头的主题 + return !topicFilter.startsWith("$"); + } + + default boolean matchTopic(BrokerTopic brokerTopic, MqttSession session) { + return !brokerTopic.getTopic().startsWith("$SYS/"); + } + } -- Gitee From 31ef543c7f368cb4249ffddd8dd5c6e8a6467782 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Fri, 16 Jun 2023 23:38:14 +0800 Subject: [PATCH 6/9] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dashboard/package.json | 2 +- dashboard/src/mockjs/user.ts | 5 + dashboard/src/router/module/base-routes.ts | 5 + dashboard/src/views/System/cluster.vue | 211 +++++++++++++++++++++ dashboard/vite.config.ts | 4 +- 5 files changed, 224 insertions(+), 3 deletions(-) create mode 100644 dashboard/src/views/System/cluster.vue diff --git a/dashboard/package.json b/dashboard/package.json index 89589702..677d7af0 100644 --- a/dashboard/package.json +++ b/dashboard/package.json @@ -8,7 +8,7 @@ "serve": "vite preview" }, "dependencies": { - "@layui/layui-vue": "1.11.4", + "@layui/layui-vue": "2.3.0", "axios": "^1.2.1", "chart.js": "^4.2.1", "echarts": "^5.4.1", diff --git a/dashboard/src/mockjs/user.ts b/dashboard/src/mockjs/user.ts index 6969196f..e741887a 100644 --- a/dashboard/src/mockjs/user.ts +++ b/dashboard/src/mockjs/user.ts @@ -62,6 +62,11 @@ const menus = [ title: "用户", icon:"layui-icon-group" }, + { + id: "/system/cluster", + title: "集群管理", + icon: "layui-icon-slider" + }, { id: "/system/setting", title: "设置", diff --git a/dashboard/src/router/module/base-routes.ts b/dashboard/src/router/module/base-routes.ts index 4047a9df..5ad8e03e 100644 --- a/dashboard/src/router/module/base-routes.ts +++ b/dashboard/src/router/module/base-routes.ts @@ -84,6 +84,11 @@ export default [ component: () => import('../../views/System/user.vue'), meta: {title: '用户', requireAuth: true}, }, + { + path: '/system/cluster', + component: () => import('../../views/System/cluster.vue'), + meta: {title: '集群管理', requireAuth: true}, + }, { path: '/system/setting', component: () => import('../../views/System/setting.vue'), diff --git a/dashboard/src/views/System/cluster.vue b/dashboard/src/views/System/cluster.vue new file mode 100644 index 00000000..f4d53f6c --- /dev/null +++ b/dashboard/src/views/System/cluster.vue @@ -0,0 +1,211 @@ + + + \ No newline at end of file diff --git a/dashboard/vite.config.ts b/dashboard/vite.config.ts index e00bebd6..e13d05e7 100644 --- a/dashboard/vite.config.ts +++ b/dashboard/vite.config.ts @@ -11,8 +11,8 @@ export default defineConfig({ server:{ proxy:{ '/api': { - target: 'http://127.0.0.1:18083/api/', - // target: 'http://82.157.162.230:8083/api/', + // target: 'http://127.0.0.1:18083/api/', + target: 'http://82.157.162.230:8083/api/', changeOrigin: true, rewrite: path => path.replace(/^\/api/, '') } -- Gitee From 2f30f385cb86f9b5ee87a3dd965d8112c91da833 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 17 Jun 2023 16:29:03 +0800 Subject: [PATCH 7/9] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/smartboot/mqtt/broker/TopicSubscriber.java | 8 +++++++- .../java/org/smartboot/mqtt/common/InflightQueue.java | 11 +++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 87980157..ae808fbc 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -111,7 +111,13 @@ public class TopicSubscriber { return; } - CompletableFuture> future = inflightQueue.offer(publishBuilder); + CompletableFuture> future = inflightQueue.offer(publishBuilder, () -> { + if (semaphore.tryAcquire()) { + topic.getQueue().offer(this); + topic.getVersion().incrementAndGet(); + } + brokerContext.getEventBus().publish(ServerEventType.NOTIFY_TOPIC_PUSH, topic); + }); if (future == null) { return; } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index e2741863..9859587e 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -80,10 +80,21 @@ public class InflightQueue { } public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder) { + return offer(publishBuilder, () -> { + + }); + } + + public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Runnable runnable) { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == queue.length) { + int i = putIndex - 1; + if (i < 0) { + i = queue.length - 1; + } + queue[i].getFuture().thenAccept(mqttPacketIdentifierMessage -> runnable.run()); return null; } else { return enqueue(publishBuilder); -- Gitee From 3d843f2f162879e5a09a264dc965bf109cc644ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 17 Jun 2023 16:56:52 +0800 Subject: [PATCH 8/9] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/mqtt/broker/TopicSubscriber.java | 4 ++-- .../main/java/org/smartboot/mqtt/common/InflightQueue.java | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index ae808fbc..be2e1b77 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -111,9 +111,9 @@ public class TopicSubscriber { return; } - CompletableFuture> future = inflightQueue.offer(publishBuilder, () -> { + CompletableFuture> future = inflightQueue.offer(publishBuilder, mqttPacketIdentifierMessage -> { if (semaphore.tryAcquire()) { - topic.getQueue().offer(this); + topic.getQueue().offer(TopicSubscriber.this); topic.getVersion().incrementAndGet(); } brokerContext.getEventBus().publish(ServerEventType.NOTIFY_TOPIC_PUSH, topic); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 9859587e..846240e7 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; /** * @author 三刀(zhengjunweimail@163.com) @@ -80,12 +81,12 @@ public class InflightQueue { } public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder) { - return offer(publishBuilder, () -> { + return offer(publishBuilder, mqttPacketIdentifierMessage -> { }); } - public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Runnable runnable) { + public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { final ReentrantLock lock = this.lock; lock.lock(); try { @@ -94,7 +95,7 @@ public class InflightQueue { if (i < 0) { i = queue.length - 1; } - queue[i].getFuture().thenAccept(mqttPacketIdentifierMessage -> runnable.run()); + queue[i].getFuture().thenAccept(consumer); return null; } else { return enqueue(publishBuilder); -- Gitee From ff2b477e34240de8229d87752ce0ad2ea2311b5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 17 Jun 2023 19:43:14 +0800 Subject: [PATCH 9/9] =?UTF-8?q?=E5=8D=87=E7=BA=A7smart-http=E8=87=B31.2.4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 06f0b048..f5c21f88 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ 0.22 1.5.30 - 1.2.3 + 1.2.4 2.6 4.3 4.13.2 -- Gitee