diff --git a/dashboard/package.json b/dashboard/package.json
index 895897027d35ced40e031149b5ca2ae81a989e1d..677d7af0486a9f7a0b7597e02781e2d5d2c33549 100644
--- a/dashboard/package.json
+++ b/dashboard/package.json
@@ -8,7 +8,7 @@
"serve": "vite preview"
},
"dependencies": {
- "@layui/layui-vue": "1.11.4",
+ "@layui/layui-vue": "2.3.0",
"axios": "^1.2.1",
"chart.js": "^4.2.1",
"echarts": "^5.4.1",
diff --git a/dashboard/src/mockjs/user.ts b/dashboard/src/mockjs/user.ts
index 6969196fd5a92c7ee023495545c87bdb7afba11f..e741887af4c0ce7557139ca47e3f73189e6230b6 100644
--- a/dashboard/src/mockjs/user.ts
+++ b/dashboard/src/mockjs/user.ts
@@ -62,6 +62,11 @@ const menus = [
title: "用户",
icon:"layui-icon-group"
},
+ {
+ id: "/system/cluster",
+ title: "集群管理",
+ icon: "layui-icon-slider"
+ },
{
id: "/system/setting",
title: "设置",
diff --git a/dashboard/src/router/module/base-routes.ts b/dashboard/src/router/module/base-routes.ts
index 4047a9df0cfdce3d752651cb7d388c88a9c2e1f3..5ad8e03ee662f45ee48f322ef312779084ee1662 100644
--- a/dashboard/src/router/module/base-routes.ts
+++ b/dashboard/src/router/module/base-routes.ts
@@ -84,6 +84,11 @@ export default [
component: () => import('../../views/System/user.vue'),
meta: {title: '用户', requireAuth: true},
},
+ {
+ path: '/system/cluster',
+ component: () => import('../../views/System/cluster.vue'),
+ meta: {title: '集群管理', requireAuth: true},
+ },
{
path: '/system/setting',
component: () => import('../../views/System/setting.vue'),
diff --git a/dashboard/src/views/System/cluster.vue b/dashboard/src/views/System/cluster.vue
new file mode 100644
index 0000000000000000000000000000000000000000..f4d53f6cfe12f26becc59069526c5815da594ed5
--- /dev/null
+++ b/dashboard/src/views/System/cluster.vue
@@ -0,0 +1,211 @@
+
+
+
+
+
+ {{ data.name }}
+
+
+
+
+
+ 运行中
+
+
+
+ 已离线
+
+
+
+ 未知
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dashboard/vite.config.ts b/dashboard/vite.config.ts
index e00bebd66ac319efd365b09ffe4f83094c8512ac..e13d05e73dad8be5a2853ac9855d23ecbc3160b8 100644
--- a/dashboard/vite.config.ts
+++ b/dashboard/vite.config.ts
@@ -11,8 +11,8 @@ export default defineConfig({
server:{
proxy:{
'/api': {
- target: 'http://127.0.0.1:18083/api/',
- // target: 'http://82.157.162.230:8083/api/',
+ // target: 'http://127.0.0.1:18083/api/',
+ target: 'http://82.157.162.230:8083/api/',
changeOrigin: true,
rewrite: path => path.replace(/^\/api/, '')
}
diff --git a/plugins/pom.xml b/plugins/pom.xml
index bee091216462c50f10872c5e2ef5c1d2c60dc838..8507d7b91c8d46441c9fbb708975f879f3b2966a 100644
--- a/plugins/pom.xml
+++ b/plugins/pom.xml
@@ -16,7 +16,7 @@
org.smartboot.mqtt
smart-mqtt
- 0.21
+ 0.22
../pom.xml
pom
diff --git a/plugins/redis-bridge-plugin/pom.xml b/plugins/redis-bridge-plugin/pom.xml
index f4e16694b6ad5b6f2d9a1e9c84def2e413a8d367..25c88e241a08468b9fbbe0a8e47517d74b331a1c 100644
--- a/plugins/redis-bridge-plugin/pom.xml
+++ b/plugins/redis-bridge-plugin/pom.xml
@@ -3,7 +3,7 @@
plugins
org.smartboot.mqtt
- 0.21
+ 0.22
../pom.xml
4.0.0
diff --git a/pom.xml b/pom.xml
index 6e52628a2764665e6846beeb173b3cd5bfafc3f5..f5c21f88f006c17baeaa1cc9095c5f220bde9adf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,14 +4,14 @@
org.smartboot.mqtt
smart-mqtt
smart-mqtt
- 0.21
+ 0.22
4.0.0
mqtt broker
- 0.21
- 1.5.29
- 1.1.22
+ 0.22
+ 1.5.30
+ 1.2.4
2.6
4.3
4.13.2
diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml
index aacc07888dd035fbdb3ae17357429d74f670f55f..5a2e35b20cc4771b5c5026fec85ea32138feef88 100644
--- a/smart-mqtt-broker/pom.xml
+++ b/smart-mqtt-broker/pom.xml
@@ -5,7 +5,7 @@
org.smartboot.mqtt
smart-mqtt
- 0.21
+ 0.22
../pom.xml
4.0.0
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
index 408aa30e2b43cdd674a07353d5dd388bbab7f51e..d1f2213f3a285488fb798c85b8ab911d165936db 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
@@ -12,8 +12,10 @@ package org.smartboot.mqtt.broker;
import org.smartboot.mqtt.common.ToString;
import org.smartboot.mqtt.common.message.MqttMessage;
+import org.smartboot.socket.buffer.BufferPagePool;
import org.smartboot.socket.extension.plugins.Plugin;
+import java.nio.channels.AsynchronousChannelGroup;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -42,7 +44,7 @@ public class BrokerConfigure extends ToString {
/**
* 当前smart-mqtt
*/
- public static final String VERSION = "v0.21";
+ public static final String VERSION = "v0.22";
static final Map SystemEnvironments = new HashMap<>();
@@ -106,6 +108,10 @@ public class BrokerConfigure extends ToString {
private final List> plugins = new LinkedList<>();
+ private AsynchronousChannelGroup channelGroup;
+
+ private BufferPagePool bufferPagePool;
+
public int getPort() {
return port;
}
@@ -203,6 +209,22 @@ public class BrokerConfigure extends ToString {
return plugins;
}
+ public AsynchronousChannelGroup getChannelGroup() {
+ return channelGroup;
+ }
+
+ public void setChannelGroup(AsynchronousChannelGroup channelGroup) {
+ this.channelGroup = channelGroup;
+ }
+
+ public BufferPagePool getBufferPagePool() {
+ return bufferPagePool;
+ }
+
+ public void setBufferPagePool(BufferPagePool bufferPagePool) {
+ this.bufferPagePool = bufferPagePool;
+ }
+
@Override
public String toString() {
return "BrokerConfigure{" +
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java
index 1e68393344379b50b4def018e9ae3b9f2992a931..8e3fb139e3fe5f07370c1f3f72ac2abe5282fd13 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java
@@ -92,4 +92,8 @@ public interface BrokerContext {
TopicPublishTree getPublishTopicTree();
TopicSubscribeTree getTopicSubscribeTree();
+
+ void bundle(String key, T resource);
+
+ T getBundle(String key);
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
index bdf8b6fade4e74403fa5accc3c6c2f847d6873d2..c6da2bfab548b79cd104b78d897932a1fe620d82 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
@@ -72,7 +72,6 @@ import org.yaml.snakeyaml.Yaml;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
-import java.nio.channels.AsynchronousChannelGroup;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -80,6 +79,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -132,15 +132,13 @@ public class BrokerContextImpl implements BrokerContext {
* Broker Server
*/
private AioQuickServer server;
- private BufferPagePool pagePool;
private final MqttBrokerMessageProcessor processor = new MqttBrokerMessageProcessor(this);
//配置文件内容
private String configJson;
private final static BrokerTopic SHUTDOWN_TOPIC = new BrokerTopic("");
- private AsynchronousChannelGroup asynchronousChannelGroup;
-
+ private final Map resources = new Hashtable<>();
private final Map, MqttProcessor>> processors;
{
@@ -173,20 +171,11 @@ public class BrokerContextImpl implements BrokerContext {
try {
- asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
- int i;
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "smart-mqtt-broker-" + (++i));
- }
- });
- pagePool = new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true);
brokerConfigure.addPlugin(new QosRetryPlugin());
brokerConfigure.getPlugins().forEach(processor::addPlugin);
server = new AioQuickServer(brokerConfigure.getHost(), brokerConfigure.getPort(), new MqttProtocol(brokerConfigure.getMaxPacketSize()), processor);
- server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(Math.max(2, brokerConfigure.getThreadNum()));
- server.start(asynchronousChannelGroup);
+ server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(brokerConfigure.getBufferPagePool()).setThreadNum(Math.max(2, brokerConfigure.getThreadNum()));
+ server.start(brokerConfigure.getChannelGroup());
System.out.println(BrokerConfigure.BANNER + "\r\n :: smart-mqtt broker" + "::\t(" + BrokerConfigure.VERSION + ")");
System.out.println("❤️Gitee: https://gitee.com/smartboot/smart-mqtt");
System.out.println("Github: https://github.com/smartboot/smart-mqtt");
@@ -360,7 +349,7 @@ public class BrokerContextImpl implements BrokerContext {
}
});
- eventBus.subscribe(ServerEventType.TOPIC_CREATE, (eventType, object) -> subscribeTopicTree.match(object.getTopicToken(), (session, topicFilterSubscriber) -> {
+ eventBus.subscribe(ServerEventType.TOPIC_CREATE, (eventType, object) -> subscribeTopicTree.match(object, (session, topicFilterSubscriber) -> {
if (!providers.getSubscribeProvider().subscribeTopic(object.getTopic(), session)) {
return;
}
@@ -416,6 +405,15 @@ public class BrokerContextImpl implements BrokerContext {
brokerConfigure.setHost("0.0.0.0");
}
+ brokerConfigure.setChannelGroup(new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
+ int i;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "smart-mqtt-broker-" + (++i));
+ }
+ }));
+ brokerConfigure.setBufferPagePool(new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true));
// System.out.println("brokerConfigure: " + brokerConfigure);
}
@@ -527,6 +525,16 @@ public class BrokerContextImpl implements BrokerContext {
return subscribeTopicTree;
}
+ @Override
+ public void bundle(String key, T resource) {
+ resources.put(key, resource);
+ }
+
+ @Override
+ public T getBundle(String key) {
+ return (T) resources.get(key);
+ }
+
public void loadYamlConfig() throws IOException {
String brokerConfig = System.getProperty(BrokerConfigure.SystemProperty.BrokerConfig);
InputStream inputStream = null;
@@ -561,8 +569,8 @@ public class BrokerContextImpl implements BrokerContext {
pushTopicQueue.offer(SHUTDOWN_TOPIC);
pushThreadPool.shutdown();
server.shutdown();
- asynchronousChannelGroup.shutdown();
- pagePool.release();
+ brokerConfigure.getChannelGroup().shutdown();
+ brokerConfigure.getBufferPagePool().release();
//卸载插件
plugins.forEach(Plugin::uninstall);
plugins.clear();
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java
index 02a990fcbdeeb3226bbf681b34cd86a7a5a22ae1..857f8eac867a374961e0ad1bc3ce6cc20e143f12 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java
@@ -158,6 +158,9 @@ public class MqttSession extends AbstractSession {
}
public void subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic topic) {
+ if (!mqttContext.getProviders().getSubscribeProvider().matchTopic(topic, this)) {
+ return;
+ }
TopicSubscriber topicSubscriber = topic.getConsumeOffsets().get(this);
if (topicSubscriber != null) {
//此前的订阅关系
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
index 09378fd580f6cd43bb63f8716df9b1eb2d20d362..be2e1b77c9b36e0e0f5281b89a80240b28920b17 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
@@ -19,7 +19,6 @@ import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.TopicToken;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
-import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
@@ -79,27 +78,22 @@ public class TopicSubscriber {
return;
}
semaphore.release();
- int i = 16;
- while (publishAvailable(brokerContext)) {
- if (i-- == 0) {
- if (semaphore.tryAcquire()) {
- topic.getQueue().offer(this);
- topic.getVersion().incrementAndGet();
- }
- break;
- }
- }
+ publishAvailable(brokerContext);
mqttSession.flush();
}
- private boolean publishAvailable(BrokerContext brokerContext) {
+ private void publishAvailable(BrokerContext brokerContext) {
PersistenceProvider persistenceProvider = brokerContext.getProviders().getPersistenceProvider();
PersistenceMessage persistenceMessage = persistenceProvider.get(topic.getTopic(), nextConsumerOffset);
if (persistenceMessage == null) {
if (semaphore.tryAcquire()) {
topic.getQueue().offer(this);
+ if (persistenceProvider.get(topic.getTopic(), nextConsumerOffset) != null) {
+ topic.getVersion().incrementAndGet();
+ brokerContext.getEventBus().publish(ServerEventType.NOTIFY_TOPIC_PUSH, topic);
+ }
}
- return false;
+ return;
}
MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(persistenceMessage.getPayload()).qos(mqttQoS).topicName(persistenceMessage.getTopic());
@@ -110,22 +104,22 @@ public class TopicSubscriber {
InflightQueue inflightQueue = mqttSession.getInflightQueue();
long offset = persistenceMessage.getOffset();
nextConsumerOffset = offset + 1;
- brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession);
//Qos0直接发送
if (mqttQoS == MqttQoS.AT_MOST_ONCE) {
mqttSession.write(publishBuilder.build(), false);
- return true;
+ publishAvailable(brokerContext);
+ return;
}
- CompletableFuture> future = inflightQueue.offer(publishBuilder, () -> {
+ CompletableFuture> future = inflightQueue.offer(publishBuilder, mqttPacketIdentifierMessage -> {
if (semaphore.tryAcquire()) {
- topic.getQueue().offer(this);
+ topic.getQueue().offer(TopicSubscriber.this);
topic.getVersion().incrementAndGet();
}
brokerContext.getEventBus().publish(ServerEventType.NOTIFY_TOPIC_PUSH, topic);
});
if (future == null) {
- return false;
+ return;
}
future.whenComplete((mqttPacketIdentifierMessage, throwable) -> {
//最早发送的消息若收到响应,则更新点位
@@ -137,7 +131,7 @@ public class TopicSubscriber {
publishAvailable(brokerContext);
});
- return true;
+ publishAvailable(brokerContext);
}
public BrokerTopic getTopic() {
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java
index 0a4b9feda17853b73eccef1ed51eee297e6e5ef0..e475b402ef8e02ceca66f861aab3d4f262f99465 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java
@@ -24,7 +24,8 @@ public class Providers {
private PersistenceProvider retainMessageProvider = new MemoryPersistenceProvider();
private PersistenceProvider persistenceProvider = new MemoryPersistenceProvider();
- private SubscribeProvider subscribeProvider = (topicFilter, session) -> true;
+ private SubscribeProvider subscribeProvider = new SubscribeProvider() {
+ };
/**
* OpenAPI 处理器
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java
index 21b517b4d11a8c170882d8f7527f52118b1577ad..de44e37b54ed4b1a9baf8594ef104e65e25c46d3 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java
@@ -10,6 +10,7 @@
package org.smartboot.mqtt.broker.provider;
+import org.smartboot.mqtt.broker.BrokerTopic;
import org.smartboot.mqtt.broker.MqttSession;
/**
@@ -19,5 +20,13 @@ import org.smartboot.mqtt.broker.MqttSession;
* @version V1.0 , 2022/12/28
*/
public interface SubscribeProvider {
- boolean subscribeTopic(String topicFilter, MqttSession session);
+ default boolean subscribeTopic(String topicFilter, MqttSession session) {
+ //4.7.2 应用不能使用 $ 字符开头的主题
+ return !topicFilter.startsWith("$");
+ }
+
+ default boolean matchTopic(BrokerTopic brokerTopic, MqttSession session) {
+ return !brokerTopic.getTopic().startsWith("$SYS/");
+ }
+
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java
index 1ab0df22f8732e3756f44a638c20456bc5220e68..2bab99d2489d2111b1c7607d3d6431d68db911df 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java
@@ -10,6 +10,7 @@
package org.smartboot.mqtt.broker.topic;
+import org.smartboot.mqtt.broker.BrokerTopic;
import org.smartboot.mqtt.broker.MqttSession;
import org.smartboot.mqtt.broker.TopicFilterSubscriber;
import org.smartboot.mqtt.common.TopicToken;
@@ -49,8 +50,11 @@ public class TopicSubscribeTree {
subscribeTree.subscribers.remove(session);
}
+ public void match(BrokerTopic topicToken, BiConsumer consumer) {
+ match(topicToken.getTopicToken(), consumer);
+ }
- public void match(TopicToken topicToken, BiConsumer consumer) {
+ private void match(TopicToken topicToken, BiConsumer consumer) {
//精确匹配
TopicSubscribeTree subscribeTree = subNode.get(topicToken.getNode());
if (subscribeTree != null) {
diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml
index aa4dfbf9cd12a4aad0506a89a2763d9f4ebb1af4..68fcd766981766d5abbad959872f613a76a7da45 100644
--- a/smart-mqtt-client/pom.xml
+++ b/smart-mqtt-client/pom.xml
@@ -5,7 +5,7 @@
smart-mqtt
org.smartboot.mqtt
- 0.21
+ 0.22
../pom.xml
4.0.0
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
index c4160ddb4f2409b70d23c0d6f68f0687f21aaac6..715c4255104c33fe9b582594f645e1db6121902f 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
@@ -11,9 +11,11 @@
package org.smartboot.mqtt.client;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.AbstractSession;
+import org.smartboot.mqtt.common.AsyncTask;
import org.smartboot.mqtt.common.DefaultMqttWriter;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.QosRetryPlugin;
@@ -46,6 +48,7 @@ import org.smartboot.mqtt.common.message.variable.properties.SubscribeProperties
import org.smartboot.mqtt.common.message.variable.properties.WillProperties;
import org.smartboot.mqtt.common.protocol.MqttProtocol;
import org.smartboot.mqtt.common.util.MqttMessageBuilders;
+import org.smartboot.mqtt.common.util.MqttUtil;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.buffer.BufferPagePool;
import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider;
@@ -112,14 +115,35 @@ public class MqttClient extends AbstractSession {
private Consumer reconnectConsumer;
private boolean pingTimeout = false;
+ public MqttClient(String uri) {
+ this(uri, MqttUtil.createClientId());
+ }
+
+ public MqttClient(String uri, String clientId) {
+ this(uri, clientId, MqttVersion.MQTT_3_1_1);
+ }
+
public MqttClient(String host, int port, String clientId) {
this(host, port, clientId, MqttVersion.MQTT_3_1_1);
}
public MqttClient(String host, int port, String clientId, MqttVersion mqttVersion) {
+ this("mqtt://" + host + ":" + port, clientId, mqttVersion);
+ }
+
+ public MqttClient(String uri, String clientId, MqttVersion mqttVersion) {
super(new EventBusImpl(EventType.types()));
- clientConfigure.setHost(host);
- clientConfigure.setPort(port);
+
+ String[] array = uri.split(":");
+ if (array[0].equals("mqtts")) {
+ clientConfigure.setHost(array[1].substring(2));
+ //加密通信
+ } else if (array[0].equals("mqtt")) {
+ clientConfigure.setHost(array[1].substring(2));
+ } else {
+ throw new IllegalStateException("invalid URI Scheme, uri: " + uri);
+ }
+ clientConfigure.setPort(NumberUtils.toInt(array[2]));
clientConfigure.setMqttVersion(mqttVersion);
this.clientId = clientId;
//ping-pong消息超时监听
@@ -135,6 +159,7 @@ public class MqttClient extends AbstractSession {
});
}
+
public void connect() {
try {
asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(2, new ThreadFactory() {
@@ -169,42 +194,23 @@ public class MqttClient extends AbstractSession {
public void connect(AsynchronousChannelGroup asynchronousChannelGroup, BufferPagePool bufferPagePool, Consumer consumer) {
//设置 connect ack 回调事件
- this.connectConsumer = mqttConnAckMessage -> {
- if (!clientConfigure.isAutomaticReconnect()) {
- gcConfigure();
- }
-
- //连接成功,注册订阅消息
- if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
- setInflightQueue(new InflightQueue(this, 16));
- connected = true;
- //重连情况下重新触发订阅逻辑
- subscribes.forEach((k, v) -> {
- subscribe(k, v.getQoS(), v.getConsumer());
- });
- consumeTask();
- }
- //客户端设置清理会话(CleanSession)标志为 0 重连时,客户端和服务端必须使用原始的报文标识符重发
- //任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 [MQTT-4.4.0-1]。这是唯一要求客户端或
- //服务端重发消息的情况。
- if (!clientConfigure.isCleanSession()) {
- //todo
- }
- consumer.accept(mqttConnAckMessage);
- connected = true;
- };
+ this.connectConsumer = consumer;
//启动心跳插件
if (clientConfigure.getKeepAliveInterval() > 0) {
- QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new Runnable() {
+ QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new AsyncTask() {
@Override
- public void run() {
+ public void execute() {
//客户端发送了 PINGREQ 报文之后,如果在合理的时间内仍没有收到 PINGRESP 报文,
// 它应该关闭到服务端的网络连接。
if (pingTimeout) {
pingTimeout = false;
client.shutdown();
}
- if (session.isInvalid()) {
+ if (session == null || session.isInvalid()) {
+ if (client != null) {
+ client.shutdown();
+ client = null;
+ }
if (clientConfigure.isAutomaticReconnect()) {
LOGGER.warn("mqtt client is disconnect, try to reconnect...");
connect(asynchronousChannelGroup, reconnectConsumer == null ? consumer : reconnectConsumer);
@@ -214,9 +220,9 @@ public class MqttClient extends AbstractSession {
long delay = System.currentTimeMillis() - getLatestSendMessageTime() - clientConfigure.getKeepAliveInterval() * 1000L;
//gap 10ms
if (delay > -10) {
+ QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, clientConfigure.getKeepAliveInterval(), TimeUnit.SECONDS);
MqttPingReqMessage pingReqMessage = new MqttPingReqMessage();
write(pingReqMessage);
- QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, clientConfigure.getKeepAliveInterval(), TimeUnit.SECONDS);
} else {
QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, -delay, TimeUnit.MILLISECONDS);
}
@@ -310,8 +316,9 @@ public class MqttClient extends AbstractSession {
unsubscribeBuilder.properties(properties);
}
// wait ack message.
- CompletableFuture> future = getInflightQueue().offer(unsubscribeBuilder, () -> registeredTasks.offer(() -> unsubscribe0(topics)));
+ CompletableFuture> future = getInflightQueue().offer(unsubscribeBuilder);
if (future == null) {
+ registeredTasks.offer(() -> unsubscribe0(topics));
return;
}
future.whenComplete((message, throwable) -> {
@@ -360,12 +367,7 @@ public class MqttClient extends AbstractSession {
}
MqttSubscribeMessage subscribeMessage = subscribeBuilder.build();
- CompletableFuture> future = getInflightQueue().offer(subscribeBuilder, new Runnable() {
- @Override
- public void run() {
-
- }
- });
+ CompletableFuture> future = getInflightQueue().offer(subscribeBuilder);
if (future == null) {
registeredTasks.offer(() -> subscribe0(topic, qos, consumer, subAckConsumer));
return;
@@ -397,7 +399,28 @@ public class MqttClient extends AbstractSession {
}
public void notifyResponse(MqttConnAckMessage connAckMessage) {
+ if (!clientConfigure.isAutomaticReconnect()) {
+ gcConfigure();
+ }
+
+ //连接成功,注册订阅消息
+ if (connAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
+ setInflightQueue(new InflightQueue(this, 16));
+ connected = true;
+ //重连情况下重新触发订阅逻辑
+ subscribes.forEach((k, v) -> {
+ subscribe(k, v.getQoS(), v.getConsumer());
+ });
+ consumeTask();
+ }
+ //客户端设置清理会话(CleanSession)标志为 0 重连时,客户端和服务端必须使用原始的报文标识符重发
+ //任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 [MQTT-4.4.0-1]。这是唯一要求客户端或
+ //服务端重发消息的情况。
+ if (!clientConfigure.isCleanSession()) {
+ //todo
+ }
connectConsumer.accept(connAckMessage);
+ connected = true;
}
@@ -493,6 +516,7 @@ public class MqttClient extends AbstractSession {
clientConfigure.setAutomaticReconnect(false);
disconnect = true;
client.shutdown();
+ client = null;
}
public void setReconnectConsumer(Consumer reconnectConsumer) {
diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml
index 2a2c23d2c2580a00d988208b2f9448fd8f9a8c46..e94e85134d224334dbedb035d70814116233078d 100644
--- a/smart-mqtt-common/pom.xml
+++ b/smart-mqtt-common/pom.xml
@@ -5,7 +5,7 @@
smart-mqtt
org.smartboot.mqtt
- 0.21
+ 0.22
../pom.xml
4.0.0
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
index 9859587effad42794871fdf5ccd7b9b51e913b81..846240e7f81a47744cafbba066b5875fa7d2dd6c 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
/**
* @author 三刀(zhengjunweimail@163.com)
@@ -80,12 +81,12 @@ public class InflightQueue {
}
public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder) {
- return offer(publishBuilder, () -> {
+ return offer(publishBuilder, mqttPacketIdentifierMessage -> {
});
}
- public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Runnable runnable) {
+ public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
@@ -94,7 +95,7 @@ public class InflightQueue {
if (i < 0) {
i = queue.length - 1;
}
- queue[i].getFuture().thenAccept(mqttPacketIdentifierMessage -> runnable.run());
+ queue[i].getFuture().thenAccept(consumer);
return null;
} else {
return enqueue(publishBuilder);
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventType.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventType.java
index f3e02c4562719686cfb82dfa560d24e611957672..05fb32def09be827015098c72771a8e5362b083b 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventType.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventType.java
@@ -31,11 +31,6 @@ public class EventType {
//连接断开
public static final EventType DISCONNECT = new EventType<>("disconnect");
- /**
- * Broker推送消息至客户端
- */
- public static final EventType PUSH_PUBLISH_MESSAGE = new EventType<>("pushPublishMessage");
-
/**
* 接收到客户端发送的任何消息
*/