diff --git a/dashboard/package.json b/dashboard/package.json index 895897027d35ced40e031149b5ca2ae81a989e1d..677d7af0486a9f7a0b7597e02781e2d5d2c33549 100644 --- a/dashboard/package.json +++ b/dashboard/package.json @@ -8,7 +8,7 @@ "serve": "vite preview" }, "dependencies": { - "@layui/layui-vue": "1.11.4", + "@layui/layui-vue": "2.3.0", "axios": "^1.2.1", "chart.js": "^4.2.1", "echarts": "^5.4.1", diff --git a/dashboard/src/mockjs/user.ts b/dashboard/src/mockjs/user.ts index 6969196fd5a92c7ee023495545c87bdb7afba11f..e741887af4c0ce7557139ca47e3f73189e6230b6 100644 --- a/dashboard/src/mockjs/user.ts +++ b/dashboard/src/mockjs/user.ts @@ -62,6 +62,11 @@ const menus = [ title: "用户", icon:"layui-icon-group" }, + { + id: "/system/cluster", + title: "集群管理", + icon: "layui-icon-slider" + }, { id: "/system/setting", title: "设置", diff --git a/dashboard/src/router/module/base-routes.ts b/dashboard/src/router/module/base-routes.ts index 4047a9df0cfdce3d752651cb7d388c88a9c2e1f3..5ad8e03ee662f45ee48f322ef312779084ee1662 100644 --- a/dashboard/src/router/module/base-routes.ts +++ b/dashboard/src/router/module/base-routes.ts @@ -84,6 +84,11 @@ export default [ component: () => import('../../views/System/user.vue'), meta: {title: '用户', requireAuth: true}, }, + { + path: '/system/cluster', + component: () => import('../../views/System/cluster.vue'), + meta: {title: '集群管理', requireAuth: true}, + }, { path: '/system/setting', component: () => import('../../views/System/setting.vue'), diff --git a/dashboard/src/views/System/cluster.vue b/dashboard/src/views/System/cluster.vue new file mode 100644 index 0000000000000000000000000000000000000000..f4d53f6cfe12f26becc59069526c5815da594ed5 --- /dev/null +++ b/dashboard/src/views/System/cluster.vue @@ -0,0 +1,211 @@ + + + \ No newline at end of file diff --git a/dashboard/vite.config.ts b/dashboard/vite.config.ts index e00bebd66ac319efd365b09ffe4f83094c8512ac..e13d05e73dad8be5a2853ac9855d23ecbc3160b8 100644 --- a/dashboard/vite.config.ts +++ b/dashboard/vite.config.ts @@ -11,8 +11,8 @@ export default defineConfig({ server:{ proxy:{ '/api': { - target: 'http://127.0.0.1:18083/api/', - // target: 'http://82.157.162.230:8083/api/', + // target: 'http://127.0.0.1:18083/api/', + target: 'http://82.157.162.230:8083/api/', changeOrigin: true, rewrite: path => path.replace(/^\/api/, '') } diff --git a/plugins/pom.xml b/plugins/pom.xml index bee091216462c50f10872c5e2ef5c1d2c60dc838..8507d7b91c8d46441c9fbb708975f879f3b2966a 100644 --- a/plugins/pom.xml +++ b/plugins/pom.xml @@ -16,7 +16,7 @@ org.smartboot.mqtt smart-mqtt - 0.21 + 0.22 ../pom.xml pom diff --git a/plugins/redis-bridge-plugin/pom.xml b/plugins/redis-bridge-plugin/pom.xml index f4e16694b6ad5b6f2d9a1e9c84def2e413a8d367..25c88e241a08468b9fbbe0a8e47517d74b331a1c 100644 --- a/plugins/redis-bridge-plugin/pom.xml +++ b/plugins/redis-bridge-plugin/pom.xml @@ -3,7 +3,7 @@ plugins org.smartboot.mqtt - 0.21 + 0.22 ../pom.xml 4.0.0 diff --git a/pom.xml b/pom.xml index 6e52628a2764665e6846beeb173b3cd5bfafc3f5..f5c21f88f006c17baeaa1cc9095c5f220bde9adf 100644 --- a/pom.xml +++ b/pom.xml @@ -4,14 +4,14 @@ org.smartboot.mqtt smart-mqtt smart-mqtt - 0.21 + 0.22 4.0.0 mqtt broker - 0.21 - 1.5.29 - 1.1.22 + 0.22 + 1.5.30 + 1.2.4 2.6 4.3 4.13.2 diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml index aacc07888dd035fbdb3ae17357429d74f670f55f..5a2e35b20cc4771b5c5026fec85ea32138feef88 100644 --- a/smart-mqtt-broker/pom.xml +++ b/smart-mqtt-broker/pom.xml @@ -5,7 +5,7 @@ org.smartboot.mqtt smart-mqtt - 0.21 + 0.22 ../pom.xml 4.0.0 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java index 408aa30e2b43cdd674a07353d5dd388bbab7f51e..d1f2213f3a285488fb798c85b8ab911d165936db 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java @@ -12,8 +12,10 @@ package org.smartboot.mqtt.broker; import org.smartboot.mqtt.common.ToString; import org.smartboot.mqtt.common.message.MqttMessage; +import org.smartboot.socket.buffer.BufferPagePool; import org.smartboot.socket.extension.plugins.Plugin; +import java.nio.channels.AsynchronousChannelGroup; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -42,7 +44,7 @@ public class BrokerConfigure extends ToString { /** * 当前smart-mqtt */ - public static final String VERSION = "v0.21"; + public static final String VERSION = "v0.22"; static final Map SystemEnvironments = new HashMap<>(); @@ -106,6 +108,10 @@ public class BrokerConfigure extends ToString { private final List> plugins = new LinkedList<>(); + private AsynchronousChannelGroup channelGroup; + + private BufferPagePool bufferPagePool; + public int getPort() { return port; } @@ -203,6 +209,22 @@ public class BrokerConfigure extends ToString { return plugins; } + public AsynchronousChannelGroup getChannelGroup() { + return channelGroup; + } + + public void setChannelGroup(AsynchronousChannelGroup channelGroup) { + this.channelGroup = channelGroup; + } + + public BufferPagePool getBufferPagePool() { + return bufferPagePool; + } + + public void setBufferPagePool(BufferPagePool bufferPagePool) { + this.bufferPagePool = bufferPagePool; + } + @Override public String toString() { return "BrokerConfigure{" + diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java index 1e68393344379b50b4def018e9ae3b9f2992a931..8e3fb139e3fe5f07370c1f3f72ac2abe5282fd13 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java @@ -92,4 +92,8 @@ public interface BrokerContext { TopicPublishTree getPublishTopicTree(); TopicSubscribeTree getTopicSubscribeTree(); + + void bundle(String key, T resource); + + T getBundle(String key); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index bdf8b6fade4e74403fa5accc3c6c2f847d6873d2..c6da2bfab548b79cd104b78d897932a1fe620d82 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -72,7 +72,6 @@ import org.yaml.snakeyaml.Yaml; import java.io.IOException; import java.io.InputStream; import java.lang.management.ManagementFactory; -import java.nio.channels.AsynchronousChannelGroup; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; @@ -80,6 +79,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Hashtable; import java.util.List; import java.util.Map; import java.util.Properties; @@ -132,15 +132,13 @@ public class BrokerContextImpl implements BrokerContext { * Broker Server */ private AioQuickServer server; - private BufferPagePool pagePool; private final MqttBrokerMessageProcessor processor = new MqttBrokerMessageProcessor(this); //配置文件内容 private String configJson; private final static BrokerTopic SHUTDOWN_TOPIC = new BrokerTopic(""); - private AsynchronousChannelGroup asynchronousChannelGroup; - + private final Map resources = new Hashtable<>(); private final Map, MqttProcessor> processors; { @@ -173,20 +171,11 @@ public class BrokerContextImpl implements BrokerContext { try { - asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { - int i; - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "smart-mqtt-broker-" + (++i)); - } - }); - pagePool = new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true); brokerConfigure.addPlugin(new QosRetryPlugin()); brokerConfigure.getPlugins().forEach(processor::addPlugin); server = new AioQuickServer(brokerConfigure.getHost(), brokerConfigure.getPort(), new MqttProtocol(brokerConfigure.getMaxPacketSize()), processor); - server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(Math.max(2, brokerConfigure.getThreadNum())); - server.start(asynchronousChannelGroup); + server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(brokerConfigure.getBufferPagePool()).setThreadNum(Math.max(2, brokerConfigure.getThreadNum())); + server.start(brokerConfigure.getChannelGroup()); System.out.println(BrokerConfigure.BANNER + "\r\n :: smart-mqtt broker" + "::\t(" + BrokerConfigure.VERSION + ")"); System.out.println("❤️Gitee: https://gitee.com/smartboot/smart-mqtt"); System.out.println("Github: https://github.com/smartboot/smart-mqtt"); @@ -360,7 +349,7 @@ public class BrokerContextImpl implements BrokerContext { } }); - eventBus.subscribe(ServerEventType.TOPIC_CREATE, (eventType, object) -> subscribeTopicTree.match(object.getTopicToken(), (session, topicFilterSubscriber) -> { + eventBus.subscribe(ServerEventType.TOPIC_CREATE, (eventType, object) -> subscribeTopicTree.match(object, (session, topicFilterSubscriber) -> { if (!providers.getSubscribeProvider().subscribeTopic(object.getTopic(), session)) { return; } @@ -416,6 +405,15 @@ public class BrokerContextImpl implements BrokerContext { brokerConfigure.setHost("0.0.0.0"); } + brokerConfigure.setChannelGroup(new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { + int i; + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "smart-mqtt-broker-" + (++i)); + } + })); + brokerConfigure.setBufferPagePool(new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true)); // System.out.println("brokerConfigure: " + brokerConfigure); } @@ -527,6 +525,16 @@ public class BrokerContextImpl implements BrokerContext { return subscribeTopicTree; } + @Override + public void bundle(String key, T resource) { + resources.put(key, resource); + } + + @Override + public T getBundle(String key) { + return (T) resources.get(key); + } + public void loadYamlConfig() throws IOException { String brokerConfig = System.getProperty(BrokerConfigure.SystemProperty.BrokerConfig); InputStream inputStream = null; @@ -561,8 +569,8 @@ public class BrokerContextImpl implements BrokerContext { pushTopicQueue.offer(SHUTDOWN_TOPIC); pushThreadPool.shutdown(); server.shutdown(); - asynchronousChannelGroup.shutdown(); - pagePool.release(); + brokerConfigure.getChannelGroup().shutdown(); + brokerConfigure.getBufferPagePool().release(); //卸载插件 plugins.forEach(Plugin::uninstall); plugins.clear(); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index 02a990fcbdeeb3226bbf681b34cd86a7a5a22ae1..857f8eac867a374961e0ad1bc3ce6cc20e143f12 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -158,6 +158,9 @@ public class MqttSession extends AbstractSession { } public void subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic topic) { + if (!mqttContext.getProviders().getSubscribeProvider().matchTopic(topic, this)) { + return; + } TopicSubscriber topicSubscriber = topic.getConsumeOffsets().get(this); if (topicSubscriber != null) { //此前的订阅关系 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 09378fd580f6cd43bb63f8716df9b1eb2d20d362..be2e1b77c9b36e0e0f5281b89a80240b28920b17 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -19,7 +19,6 @@ import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; -import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; @@ -79,27 +78,22 @@ public class TopicSubscriber { return; } semaphore.release(); - int i = 16; - while (publishAvailable(brokerContext)) { - if (i-- == 0) { - if (semaphore.tryAcquire()) { - topic.getQueue().offer(this); - topic.getVersion().incrementAndGet(); - } - break; - } - } + publishAvailable(brokerContext); mqttSession.flush(); } - private boolean publishAvailable(BrokerContext brokerContext) { + private void publishAvailable(BrokerContext brokerContext) { PersistenceProvider persistenceProvider = brokerContext.getProviders().getPersistenceProvider(); PersistenceMessage persistenceMessage = persistenceProvider.get(topic.getTopic(), nextConsumerOffset); if (persistenceMessage == null) { if (semaphore.tryAcquire()) { topic.getQueue().offer(this); + if (persistenceProvider.get(topic.getTopic(), nextConsumerOffset) != null) { + topic.getVersion().incrementAndGet(); + brokerContext.getEventBus().publish(ServerEventType.NOTIFY_TOPIC_PUSH, topic); + } } - return false; + return; } MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(persistenceMessage.getPayload()).qos(mqttQoS).topicName(persistenceMessage.getTopic()); @@ -110,22 +104,22 @@ public class TopicSubscriber { InflightQueue inflightQueue = mqttSession.getInflightQueue(); long offset = persistenceMessage.getOffset(); nextConsumerOffset = offset + 1; - brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); //Qos0直接发送 if (mqttQoS == MqttQoS.AT_MOST_ONCE) { mqttSession.write(publishBuilder.build(), false); - return true; + publishAvailable(brokerContext); + return; } - CompletableFuture> future = inflightQueue.offer(publishBuilder, () -> { + CompletableFuture> future = inflightQueue.offer(publishBuilder, mqttPacketIdentifierMessage -> { if (semaphore.tryAcquire()) { - topic.getQueue().offer(this); + topic.getQueue().offer(TopicSubscriber.this); topic.getVersion().incrementAndGet(); } brokerContext.getEventBus().publish(ServerEventType.NOTIFY_TOPIC_PUSH, topic); }); if (future == null) { - return false; + return; } future.whenComplete((mqttPacketIdentifierMessage, throwable) -> { //最早发送的消息若收到响应,则更新点位 @@ -137,7 +131,7 @@ public class TopicSubscriber { publishAvailable(brokerContext); }); - return true; + publishAvailable(brokerContext); } public BrokerTopic getTopic() { diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java index 0a4b9feda17853b73eccef1ed51eee297e6e5ef0..e475b402ef8e02ceca66f861aab3d4f262f99465 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java @@ -24,7 +24,8 @@ public class Providers { private PersistenceProvider retainMessageProvider = new MemoryPersistenceProvider(); private PersistenceProvider persistenceProvider = new MemoryPersistenceProvider(); - private SubscribeProvider subscribeProvider = (topicFilter, session) -> true; + private SubscribeProvider subscribeProvider = new SubscribeProvider() { + }; /** * OpenAPI 处理器 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java index 21b517b4d11a8c170882d8f7527f52118b1577ad..de44e37b54ed4b1a9baf8594ef104e65e25c46d3 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java @@ -10,6 +10,7 @@ package org.smartboot.mqtt.broker.provider; +import org.smartboot.mqtt.broker.BrokerTopic; import org.smartboot.mqtt.broker.MqttSession; /** @@ -19,5 +20,13 @@ import org.smartboot.mqtt.broker.MqttSession; * @version V1.0 , 2022/12/28 */ public interface SubscribeProvider { - boolean subscribeTopic(String topicFilter, MqttSession session); + default boolean subscribeTopic(String topicFilter, MqttSession session) { + //4.7.2 应用不能使用 $ 字符开头的主题 + return !topicFilter.startsWith("$"); + } + + default boolean matchTopic(BrokerTopic brokerTopic, MqttSession session) { + return !brokerTopic.getTopic().startsWith("$SYS/"); + } + } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java index 1ab0df22f8732e3756f44a638c20456bc5220e68..2bab99d2489d2111b1c7607d3d6431d68db911df 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java @@ -10,6 +10,7 @@ package org.smartboot.mqtt.broker.topic; +import org.smartboot.mqtt.broker.BrokerTopic; import org.smartboot.mqtt.broker.MqttSession; import org.smartboot.mqtt.broker.TopicFilterSubscriber; import org.smartboot.mqtt.common.TopicToken; @@ -49,8 +50,11 @@ public class TopicSubscribeTree { subscribeTree.subscribers.remove(session); } + public void match(BrokerTopic topicToken, BiConsumer consumer) { + match(topicToken.getTopicToken(), consumer); + } - public void match(TopicToken topicToken, BiConsumer consumer) { + private void match(TopicToken topicToken, BiConsumer consumer) { //精确匹配 TopicSubscribeTree subscribeTree = subNode.get(topicToken.getNode()); if (subscribeTree != null) { diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml index aa4dfbf9cd12a4aad0506a89a2763d9f4ebb1af4..68fcd766981766d5abbad959872f613a76a7da45 100644 --- a/smart-mqtt-client/pom.xml +++ b/smart-mqtt-client/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.21 + 0.22 ../pom.xml 4.0.0 diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index c4160ddb4f2409b70d23c0d6f68f0687f21aaac6..715c4255104c33fe9b582594f645e1db6121902f 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -11,9 +11,11 @@ package org.smartboot.mqtt.client; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.NumberUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.AbstractSession; +import org.smartboot.mqtt.common.AsyncTask; import org.smartboot.mqtt.common.DefaultMqttWriter; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.QosRetryPlugin; @@ -46,6 +48,7 @@ import org.smartboot.mqtt.common.message.variable.properties.SubscribeProperties import org.smartboot.mqtt.common.message.variable.properties.WillProperties; import org.smartboot.mqtt.common.protocol.MqttProtocol; import org.smartboot.mqtt.common.util.MqttMessageBuilders; +import org.smartboot.mqtt.common.util.MqttUtil; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.buffer.BufferPagePool; import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider; @@ -112,14 +115,35 @@ public class MqttClient extends AbstractSession { private Consumer reconnectConsumer; private boolean pingTimeout = false; + public MqttClient(String uri) { + this(uri, MqttUtil.createClientId()); + } + + public MqttClient(String uri, String clientId) { + this(uri, clientId, MqttVersion.MQTT_3_1_1); + } + public MqttClient(String host, int port, String clientId) { this(host, port, clientId, MqttVersion.MQTT_3_1_1); } public MqttClient(String host, int port, String clientId, MqttVersion mqttVersion) { + this("mqtt://" + host + ":" + port, clientId, mqttVersion); + } + + public MqttClient(String uri, String clientId, MqttVersion mqttVersion) { super(new EventBusImpl(EventType.types())); - clientConfigure.setHost(host); - clientConfigure.setPort(port); + + String[] array = uri.split(":"); + if (array[0].equals("mqtts")) { + clientConfigure.setHost(array[1].substring(2)); + //加密通信 + } else if (array[0].equals("mqtt")) { + clientConfigure.setHost(array[1].substring(2)); + } else { + throw new IllegalStateException("invalid URI Scheme, uri: " + uri); + } + clientConfigure.setPort(NumberUtils.toInt(array[2])); clientConfigure.setMqttVersion(mqttVersion); this.clientId = clientId; //ping-pong消息超时监听 @@ -135,6 +159,7 @@ public class MqttClient extends AbstractSession { }); } + public void connect() { try { asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(2, new ThreadFactory() { @@ -169,42 +194,23 @@ public class MqttClient extends AbstractSession { public void connect(AsynchronousChannelGroup asynchronousChannelGroup, BufferPagePool bufferPagePool, Consumer consumer) { //设置 connect ack 回调事件 - this.connectConsumer = mqttConnAckMessage -> { - if (!clientConfigure.isAutomaticReconnect()) { - gcConfigure(); - } - - //连接成功,注册订阅消息 - if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { - setInflightQueue(new InflightQueue(this, 16)); - connected = true; - //重连情况下重新触发订阅逻辑 - subscribes.forEach((k, v) -> { - subscribe(k, v.getQoS(), v.getConsumer()); - }); - consumeTask(); - } - //客户端设置清理会话(CleanSession)标志为 0 重连时,客户端和服务端必须使用原始的报文标识符重发 - //任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 [MQTT-4.4.0-1]。这是唯一要求客户端或 - //服务端重发消息的情况。 - if (!clientConfigure.isCleanSession()) { - //todo - } - consumer.accept(mqttConnAckMessage); - connected = true; - }; + this.connectConsumer = consumer; //启动心跳插件 if (clientConfigure.getKeepAliveInterval() > 0) { - QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new Runnable() { + QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new AsyncTask() { @Override - public void run() { + public void execute() { //客户端发送了 PINGREQ 报文之后,如果在合理的时间内仍没有收到 PINGRESP 报文, // 它应该关闭到服务端的网络连接。 if (pingTimeout) { pingTimeout = false; client.shutdown(); } - if (session.isInvalid()) { + if (session == null || session.isInvalid()) { + if (client != null) { + client.shutdown(); + client = null; + } if (clientConfigure.isAutomaticReconnect()) { LOGGER.warn("mqtt client is disconnect, try to reconnect..."); connect(asynchronousChannelGroup, reconnectConsumer == null ? consumer : reconnectConsumer); @@ -214,9 +220,9 @@ public class MqttClient extends AbstractSession { long delay = System.currentTimeMillis() - getLatestSendMessageTime() - clientConfigure.getKeepAliveInterval() * 1000L; //gap 10ms if (delay > -10) { + QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, clientConfigure.getKeepAliveInterval(), TimeUnit.SECONDS); MqttPingReqMessage pingReqMessage = new MqttPingReqMessage(); write(pingReqMessage); - QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, clientConfigure.getKeepAliveInterval(), TimeUnit.SECONDS); } else { QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, -delay, TimeUnit.MILLISECONDS); } @@ -310,8 +316,9 @@ public class MqttClient extends AbstractSession { unsubscribeBuilder.properties(properties); } // wait ack message. - CompletableFuture> future = getInflightQueue().offer(unsubscribeBuilder, () -> registeredTasks.offer(() -> unsubscribe0(topics))); + CompletableFuture> future = getInflightQueue().offer(unsubscribeBuilder); if (future == null) { + registeredTasks.offer(() -> unsubscribe0(topics)); return; } future.whenComplete((message, throwable) -> { @@ -360,12 +367,7 @@ public class MqttClient extends AbstractSession { } MqttSubscribeMessage subscribeMessage = subscribeBuilder.build(); - CompletableFuture> future = getInflightQueue().offer(subscribeBuilder, new Runnable() { - @Override - public void run() { - - } - }); + CompletableFuture> future = getInflightQueue().offer(subscribeBuilder); if (future == null) { registeredTasks.offer(() -> subscribe0(topic, qos, consumer, subAckConsumer)); return; @@ -397,7 +399,28 @@ public class MqttClient extends AbstractSession { } public void notifyResponse(MqttConnAckMessage connAckMessage) { + if (!clientConfigure.isAutomaticReconnect()) { + gcConfigure(); + } + + //连接成功,注册订阅消息 + if (connAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { + setInflightQueue(new InflightQueue(this, 16)); + connected = true; + //重连情况下重新触发订阅逻辑 + subscribes.forEach((k, v) -> { + subscribe(k, v.getQoS(), v.getConsumer()); + }); + consumeTask(); + } + //客户端设置清理会话(CleanSession)标志为 0 重连时,客户端和服务端必须使用原始的报文标识符重发 + //任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 [MQTT-4.4.0-1]。这是唯一要求客户端或 + //服务端重发消息的情况。 + if (!clientConfigure.isCleanSession()) { + //todo + } connectConsumer.accept(connAckMessage); + connected = true; } @@ -493,6 +516,7 @@ public class MqttClient extends AbstractSession { clientConfigure.setAutomaticReconnect(false); disconnect = true; client.shutdown(); + client = null; } public void setReconnectConsumer(Consumer reconnectConsumer) { diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml index 2a2c23d2c2580a00d988208b2f9448fd8f9a8c46..e94e85134d224334dbedb035d70814116233078d 100644 --- a/smart-mqtt-common/pom.xml +++ b/smart-mqtt-common/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.21 + 0.22 ../pom.xml 4.0.0 diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 9859587effad42794871fdf5ccd7b9b51e913b81..846240e7f81a47744cafbba066b5875fa7d2dd6c 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; /** * @author 三刀(zhengjunweimail@163.com) @@ -80,12 +81,12 @@ public class InflightQueue { } public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder) { - return offer(publishBuilder, () -> { + return offer(publishBuilder, mqttPacketIdentifierMessage -> { }); } - public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Runnable runnable) { + public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { final ReentrantLock lock = this.lock; lock.lock(); try { @@ -94,7 +95,7 @@ public class InflightQueue { if (i < 0) { i = queue.length - 1; } - queue[i].getFuture().thenAccept(mqttPacketIdentifierMessage -> runnable.run()); + queue[i].getFuture().thenAccept(consumer); return null; } else { return enqueue(publishBuilder); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventType.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventType.java index f3e02c4562719686cfb82dfa560d24e611957672..05fb32def09be827015098c72771a8e5362b083b 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventType.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventType.java @@ -31,11 +31,6 @@ public class EventType { //连接断开 public static final EventType DISCONNECT = new EventType<>("disconnect"); - /** - * Broker推送消息至客户端 - */ - public static final EventType PUSH_PUBLISH_MESSAGE = new EventType<>("pushPublishMessage"); - /** * 接收到客户端发送的任何消息 */