diff --git a/dashboard/components.d.ts b/dashboard/components.d.ts
index a3e444698b9f33b33bab831bd2af74a49e5b4248..990f1edbbae2aa74f40dc1f9039e3873604a22d9 100644
--- a/dashboard/components.d.ts
+++ b/dashboard/components.d.ts
@@ -10,6 +10,7 @@ declare module '@vue/runtime-core' {
LayAffix: typeof import('@layui/layui-vue')['LayAffix']
LayAvatar: typeof import('@layui/layui-vue')['LayAvatar']
LayBadge: typeof import('@layui/layui-vue')['LayBadge']
+ LayBarcode: typeof import('@layui/layui-vue')['LayBarcode']
LayBody: typeof import('@layui/layui-vue')['LayBody']
LayBreadcrumb: typeof import('@layui/layui-vue')['LayBreadcrumb']
LayBreadcrumbItem: typeof import('@layui/layui-vue')['LayBreadcrumbItem']
@@ -39,6 +40,7 @@ declare module '@vue/runtime-core' {
LayMenuItem: typeof import('@layui/layui-vue')['LayMenuItem']
LayPanel: typeof import('@layui/layui-vue')['LayPanel']
LayProgress: typeof import('@layui/layui-vue')['LayProgress']
+ LayQrcode: typeof import('@layui/layui-vue')['LayQrcode']
LayResult: typeof import('@layui/layui-vue')['LayResult']
LayRow: typeof import('@layui/layui-vue')['LayRow']
LayScroll: typeof import('@layui/layui-vue')['LayScroll']
diff --git a/dashboard/vite.config.ts b/dashboard/vite.config.ts
index b3fba4182710bac688202f3b7789c930f8016691..e13d05e73dad8be5a2853ac9855d23ecbc3160b8 100644
--- a/dashboard/vite.config.ts
+++ b/dashboard/vite.config.ts
@@ -11,7 +11,8 @@ export default defineConfig({
server:{
proxy:{
'/api': {
- target: 'http://127.0.0.1:18083/api/',
+ // target: 'http://127.0.0.1:18083/api/',
+ target: 'http://82.157.162.230:8083/api/',
changeOrigin: true,
rewrite: path => path.replace(/^\/api/, '')
}
diff --git a/docker-compose.yml b/docker-compose.yml
index 6e8aacf41b61a16d79e6b61f286f4e1dcce50b1d..66659d7a342b3c2ffc645be2d78fd0720c3ee5ab 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -53,6 +53,6 @@ services:
options:
max-size: "100m"
max-file: "1"
- command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe
-# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=1000 -Dqos=2 -Dcount=3 -Dpayload=128 org.smartboot.bench.mqtt.Publish
+ command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe
+# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dcount=10 -Dpayload=128 org.smartboot.bench.mqtt.Publish
version: '3.7'
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 55e604a72b255369e3828f36a5a59e6f8002ac38..8e5c86a003feedec69a977d10c0984f3e0f439ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,12 +4,12 @@
org.smartboot.mqtt
smart-mqtt
smart-mqtt
- 0.18
+ 0.19
4.0.0
mqtt broker
- 0.18
+ 0.19
1.5.26
1.1.22
2.6
diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml
index 720ebb4d95e282180cf0bee95fbfd9643d3c1112..238477629a38cac025e9ec397ccc3e4eb4700982 100644
--- a/smart-mqtt-broker/pom.xml
+++ b/smart-mqtt-broker/pom.xml
@@ -5,7 +5,7 @@
org.smartboot.mqtt
smart-mqtt
- 0.18
+ 0.19
../pom.xml
4.0.0
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
index 223c0e2b75bc730bfdacb7b5ef296ad2b47f8178..f26c8480823b55771262995a55e1c11a89e671b7 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
@@ -38,7 +38,7 @@ public class BrokerConfigure extends ToString {
/**
* 当前smart-mqtt
*/
- public static final String VERSION = "v0.18";
+ public static final String VERSION = "v0.19";
static final Map SystemEnvironments = new HashMap<>();
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
index 773d7036172c75aadb1d68c9648d57a20b8b28a8..e0239b09012b0ea582ee3cedf0e595fd88053c10 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
@@ -17,6 +17,7 @@ import org.smartboot.mqtt.broker.processor.DisConnectProcessor;
import org.smartboot.mqtt.broker.processor.MqttAckProcessor;
import org.smartboot.mqtt.broker.processor.MqttProcessor;
import org.smartboot.mqtt.broker.processor.PingReqProcessor;
+import org.smartboot.mqtt.broker.processor.PubRelProcessor;
import org.smartboot.mqtt.broker.processor.PublishProcessor;
import org.smartboot.mqtt.broker.processor.SubscribeProcessor;
import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor;
@@ -68,10 +69,11 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor());
- processorMap.put(MqttPubRelMessage.class, new MqttAckProcessor<>());
+ processorMap.put(MqttPubRelMessage.class, new PubRelProcessor());
processorMap.put(MqttPubRecMessage.class, new MqttAckProcessor<>());
processorMap.put(MqttPubCompMessage.class, new MqttAckProcessor<>());
processorMap.put(MqttDisconnectMessage.class, new DisConnectProcessor());
+// addPlugin(new RateLimiterPlugin<>(1024 * 512, 1024 * 512));
}
public MqttBrokerMessageProcessor(BrokerContext mqttContext) {
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java
index 22d8ee88bde4f35f8101caec03696745623d3ed9..6196f641dce5bdf881c4e0443fc5be0674577bf1 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java
@@ -21,6 +21,6 @@ import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
public class MqttAckProcessor extends AuthorizedMqttProcessor {
@Override
public void process0(BrokerContext context, MqttSession session, T t) {
- session.notifyResponse(t);
+ session.getInflightQueue().notify(t);
}
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PubRelProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PubRelProcessor.java
new file mode 100644
index 0000000000000000000000000000000000000000..7b8573e20e2e02279845bf8ae259a8379b4f3a3a
--- /dev/null
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PubRelProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
+ *
+ * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
+ *
+ * Enterprise users are required to use this project reasonably
+ * and legally in accordance with the AGPL-3.0 open source agreement
+ * without special permission from the smartboot organization.
+ */
+
+package org.smartboot.mqtt.broker.processor;
+
+import org.smartboot.mqtt.broker.BrokerContext;
+import org.smartboot.mqtt.broker.MqttSession;
+import org.smartboot.mqtt.common.message.MqttPubCompMessage;
+import org.smartboot.mqtt.common.message.MqttPubRelMessage;
+import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
+import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
+
+/**
+ * @author 三刀(zhengjunweimail@163.com)
+ * @version V1.0 , 4/22/23
+ */
+public class PubRelProcessor extends AuthorizedMqttProcessor {
+ @Override
+ public void process0(BrokerContext context, MqttSession session, MqttPubRelMessage message) {
+ //发送pubRel消息。
+ //todo
+ MqttPubQosVariableHeader qosVariableHeader;
+ //todo
+ byte code = 0;
+ if (code != 0) {
+ ReasonProperties properties = new ReasonProperties();
+ qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties);
+ qosVariableHeader.setReasonCode(code);
+ } else {
+ qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), null);
+ }
+ MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader);
+ session.write(pubRelMessage, false);
+ session.notifyPubComp(message.getVariableHeader().getPacketId());
+ }
+}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java
index eaf5f6f1edef54c630636ebb88f5b83042e9c8a0..bdd518f14642bfe8d12fc6becbb3a59292e263ae 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java
@@ -21,13 +21,10 @@ import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttReasonCode;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.message.MqttPubAckMessage;
-import org.smartboot.mqtt.common.message.MqttPubCompMessage;
import org.smartboot.mqtt.common.message.MqttPubRecMessage;
-import org.smartboot.mqtt.common.message.MqttPubRelMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
-import org.smartboot.mqtt.common.util.ValidateUtils;
/**
* 发布Topic
@@ -112,24 +109,6 @@ public class PublishProcessor extends AuthorizedMqttProcessor {
- ValidateUtils.isTrue(message instanceof MqttPubRelMessage, "invalid message");
- //发送pubRel消息。
- //todo
- MqttPubQosVariableHeader qosVariableHeader;
- //todo
- byte code = 0;
- if (code != 0) {
- ReasonProperties properties = new ReasonProperties();
- qosVariableHeader = new MqttPubQosVariableHeader(mqttPublishMessage.getVariableHeader().getPacketId(), properties);
- qosVariableHeader.setReasonCode(code);
- } else {
- qosVariableHeader = new MqttPubQosVariableHeader(mqttPublishMessage.getVariableHeader().getPacketId(), null);
- }
- MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader);
- session.write(pubRelMessage, false);
- // 消息投递至消息总线
- context.getEventBus().publish(ServerEventType.RECEIVE_PUBLISH_MESSAGE, EventObject.newEventObject(session, mqttPublishMessage));
- });
+ session.write(pubRecMessage, () -> context.getEventBus().publish(ServerEventType.RECEIVE_PUBLISH_MESSAGE, EventObject.newEventObject(session, mqttPublishMessage)));
}
}
diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml
index 6350be7a88bbd8302f9e726ae31454c91906bd2e..1da20a7d84c1069540582c412cf581778e8b4d67 100644
--- a/smart-mqtt-client/pom.xml
+++ b/smart-mqtt-client/pom.xml
@@ -5,7 +5,7 @@
smart-mqtt
org.smartboot.mqtt
- 0.18
+ 0.19
../pom.xml
4.0.0
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java
index cb87d9b03be49ff9522a264e072fb0ecc5adf9b6..b15876b6258c72556a0efc6d6447efed0427feee 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java
@@ -16,6 +16,7 @@ import org.smartboot.mqtt.client.processor.ConnAckProcessor;
import org.smartboot.mqtt.client.processor.MqttAckProcessor;
import org.smartboot.mqtt.client.processor.MqttPingRespProcessor;
import org.smartboot.mqtt.client.processor.MqttProcessor;
+import org.smartboot.mqtt.client.processor.PubRelProcessor;
import org.smartboot.mqtt.client.processor.PublishProcessor;
import org.smartboot.mqtt.common.eventbus.EventObject;
import org.smartboot.mqtt.common.eventbus.EventType;
@@ -50,7 +51,7 @@ public class MqttClientProcessor extends AbstractMessageProcessor {
processors.put(MqttPublishMessage.class, new PublishProcessor());
processors.put(MqttPubRecMessage.class, new MqttAckProcessor());
processors.put(MqttPubCompMessage.class, new MqttAckProcessor());
- processors.put(MqttPubRelMessage.class, new MqttAckProcessor());
+ processors.put(MqttPubRelMessage.class, new PubRelProcessor());
processors.put(MqttSubAckMessage.class, new MqttAckProcessor());
processors.put(MqttPingRespMessage.class, new MqttPingRespProcessor());
}
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java
index cb398df7d0c9bfb099a6026ae0cf911a7b492a81..302f4fee711a48d92cb29e7f28c8279d488461a6 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java
@@ -20,6 +20,6 @@ import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
public class MqttAckProcessor implements MqttProcessor {
@Override
public void process(MqttClient mqttClient, T message) {
- mqttClient.notifyResponse(message);
+ mqttClient.getInflightQueue().notify(message);
}
}
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PubRelProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PubRelProcessor.java
new file mode 100644
index 0000000000000000000000000000000000000000..fd8dfb9e89470b73034925e1c08d34f8b88aaa0a
--- /dev/null
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PubRelProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
+ *
+ * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
+ *
+ * Enterprise users are required to use this project reasonably
+ * and legally in accordance with the AGPL-3.0 open source agreement
+ * without special permission from the smartboot organization.
+ */
+
+package org.smartboot.mqtt.client.processor;
+
+import org.smartboot.mqtt.client.MqttClient;
+import org.smartboot.mqtt.common.message.MqttPubCompMessage;
+import org.smartboot.mqtt.common.message.MqttPubRelMessage;
+import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
+import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
+
+/**
+ * @author 三刀(zhengjunweimail@163.com)
+ * @version V1.0 , 4/22/23
+ */
+public class PubRelProcessor implements MqttProcessor {
+
+ @Override
+ public void process(MqttClient mqttClient, MqttPubRelMessage message) {
+ //发送pubRel消息。
+ //todo
+ MqttPubQosVariableHeader qosVariableHeader;
+ //todo
+ byte code = 0;
+ if (code != 0) {
+ ReasonProperties properties = new ReasonProperties();
+ qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties);
+ qosVariableHeader.setReasonCode(code);
+ } else {
+ qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), null);
+ }
+ MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader);
+ mqttClient.write(pubRelMessage, false);
+ mqttClient.notifyPubComp(message.getVariableHeader().getPacketId());
+ }
+}
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java
index f4559f9cc928eef254523d3c57f83b063b8d0611..3b053b0718d41ca3ac91eee311f8014845e39654 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java
@@ -18,15 +18,12 @@ import org.smartboot.mqtt.common.TopicToken;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.message.MqttPubAckMessage;
-import org.smartboot.mqtt.common.message.MqttPubCompMessage;
import org.smartboot.mqtt.common.message.MqttPubRecMessage;
-import org.smartboot.mqtt.common.message.MqttPubRelMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttPublishVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
import org.smartboot.mqtt.common.util.MqttUtil;
-import org.smartboot.mqtt.common.util.ValidateUtils;
/**
* 发布Topic
@@ -104,19 +101,7 @@ public class PublishProcessor implements MqttProcessor {
MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(messageId, properties);
MqttPubRecMessage pubRecMessage = new MqttPubRecMessage(variableHeader);
- session.write(pubRecMessage, message -> {
- ValidateUtils.isTrue(message instanceof MqttPubRelMessage, "invalid message");
- //todo
- ReasonProperties reasonProperties = null;
- if (mqttPublishMessage.getVersion() == MqttVersion.MQTT_5) {
- reasonProperties = new ReasonProperties();
- }
- MqttPubQosVariableHeader qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), reasonProperties);
- MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader);
- session.write(pubRelMessage, false);
-
- processPublishMessage(mqttPublishMessage, session);
- });
+ session.write(pubRecMessage, () -> processPublishMessage(mqttPublishMessage, session));
}
}
diff --git a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java
index e29e0080ed1aaeda946fd4458f7f9afc4a88f3bd..e7a7de8c8a5ccc5a10fbc24c744340c4c3f56fab 100644
--- a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java
+++ b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java
@@ -31,20 +31,22 @@ public class MqttClientBootstrap {
//订阅主题
client.subscribe("test", MqttQoS.AT_MOST_ONCE, (mqttClient, publishMessage) -> {
System.out.println("subscribe message:" + new String(publishMessage.getPayload().getPayload()));
+ }, (mqttClient, mqttQoS) -> {
+ //最多分发一次
+ client.publish("test", MqttQoS.AT_MOST_ONCE, "aa".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId));
+ //至少分发一次
+ client.publish("test", MqttQoS.AT_LEAST_ONCE, "bb".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId));
+ //只分发一次
+ client.publish("test", MqttQoS.EXACTLY_ONCE, "cc".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId));
});
client.subscribe("test/#", MqttQoS.AT_MOST_ONCE, (mqttClient, publishMessage) -> {
System.out.println("subscribe test/# message:" + new String(publishMessage.getPayload().getPayload()));
+ }, (mqttClient, mqttQoS) -> {
+ //只分发一次
+ client.publish("test/dd", MqttQoS.EXACTLY_ONCE, "dd".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId));
});
- //最多分发一次
- client.publish("test", MqttQoS.AT_MOST_ONCE, "aa".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId));
- //至少分发一次
- client.publish("test", MqttQoS.AT_LEAST_ONCE, "bb".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId));
- //只分发一次
- client.publish("test", MqttQoS.EXACTLY_ONCE, "cc".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId));
- //只分发一次
- client.publish("test/dd", MqttQoS.EXACTLY_ONCE, "dd".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId));
}
}
diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml
index c8d8bc1ca84b60ed6048a499892d863687eb04c5..60c42c94cfdd00c45041281851e1e04f74a17f2e 100644
--- a/smart-mqtt-common/pom.xml
+++ b/smart-mqtt-common/pom.xml
@@ -5,7 +5,7 @@
smart-mqtt
org.smartboot.mqtt
- 0.18
+ 0.19
../pom.xml
4.0.0
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java
index 69e2ddc13dedce40c282df7ebd357c15e4dca2e0..8c7d78a6158197de8d9f761f10daff318e8d2335 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java
@@ -12,29 +12,21 @@ package org.smartboot.mqtt.common;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.smartboot.mqtt.common.enums.MqttMessageType;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventBus;
import org.smartboot.mqtt.common.eventbus.EventObject;
import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.mqtt.common.message.MqttMessage;
-import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
import org.smartboot.mqtt.common.message.MqttPubRecMessage;
-import org.smartboot.mqtt.common.message.MqttSubscribeMessage;
-import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage;
-import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.protocol.MqttProtocol;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.transport.AioSession;
import org.smartboot.socket.util.Attachment;
-import org.smartboot.socket.util.QuickTimerTask;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
/**
* @author 三刀(zhengjunweimail@163.com)
@@ -63,48 +55,25 @@ public abstract class AbstractSession {
private MqttVersion mqttVersion;
private InflightQueue inflightQueue;
- private final Map ackMessageCacheMap = new ConcurrentHashMap<>();
+ private final Map ackMessageCacheMap = new ConcurrentHashMap<>();
public AbstractSession(EventBus eventBus) {
this.eventBus = eventBus;
}
- public final void write(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> mqttMessage, Consumer> consumer) {
- QosMessage ackMessage = new QosMessage(mqttMessage, consumer);
- switch (mqttMessage.getFixedHeader().getQosLevel()) {
- case AT_MOST_ONCE:
- ValidateUtils.isTrue(mqttMessage instanceof MqttPubRecMessage, "invalid message instance");
- //超时移除即可,
- break;
- case AT_LEAST_ONCE:
- ValidateUtils.isTrue(mqttMessage instanceof MqttSubscribeMessage || mqttMessage instanceof MqttUnsubscribeMessage, "invalid message instance");
- //重新发送subscribe或unSubscribe消息
- QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(() -> {
- if (!ackMessage.isCommit()) {
- write(mqttMessage, consumer);
- }
- }, 1, TimeUnit.SECONDS);
- default:
- throw new UnsupportedOperationException();
- }
- ackMessageCacheMap.put(mqttMessage.getVariableHeader().getPacketId(), ackMessage);
+ public final void write(MqttPubRecMessage mqttMessage, Runnable callback) {
+ ackMessageCacheMap.put(mqttMessage.getVariableHeader().getPacketId(), callback);
write(mqttMessage, false);
}
- public final void notifyResponse(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message) {
- if (message.getFixedHeader().getMessageType() != MqttMessageType.PUBREL) {
- inflightQueue.notify(message);
- } else {
- QosMessage qosMessage = ackMessageCacheMap.remove(message.getVariableHeader().getPacketId());
- if (qosMessage != null) {
- qosMessage.setCommit(true);
- qosMessage.getConsumer().accept(message);
- } else {
- LOGGER.info("message is null");
- }
+ public final void notifyPubComp(int packetId) {
+ Runnable consumer = ackMessageCacheMap.remove(packetId);
+ if (consumer != null) {
+ consumer.run();
}
}
+
public final synchronized void write(MqttMessage mqttMessage, boolean autoFlush) {
try {
if (disconnect) {
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
index af08078ad302f9ebe9654acbb760b1a02d248b49..992eac6edf13b580630d8f6149024099b7f18ec0 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
@@ -14,6 +14,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.enums.MqttMessageType;
import org.smartboot.mqtt.common.enums.MqttVersion;
+import org.smartboot.mqtt.common.message.MqttMessage;
import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
import org.smartboot.mqtt.common.message.MqttPubRelMessage;
import org.smartboot.mqtt.common.message.MqttVariableMessage;
@@ -49,7 +50,6 @@ public class InflightQueue {
private final AbstractSession session;
private final ConcurrentLinkedQueue runnables = new ConcurrentLinkedQueue<>();
-
public InflightQueue(AbstractSession session, int size) {
ValidateUtils.isTrue(size > 0, "inflight must >0");
this.queue = new InflightMessage[size];
@@ -95,7 +95,7 @@ public class InflightQueue {
/**
* 超时重发
*/
- void retry(InflightMessage inflightMessage) {
+ private void retry(InflightMessage inflightMessage) {
if (inflightMessage.isCommit() || session.isDisconnect()) {
return;
}
@@ -117,11 +117,13 @@ public class InflightQueue {
return;
}
inflightMessage.setLatestTime(System.currentTimeMillis());
- LOGGER.info("message:{} time out,retry...", inflightMessage.getOriginalMessage().getFixedHeader());
+ LOGGER.info("message:{} time out,retry...", inflightMessage.getExpectMessageType());
switch (inflightMessage.getExpectMessageType()) {
case PUBACK:
case PUBREC:
- session.write(inflightMessage.getOriginalMessage());
+ MqttMessage mqttMessage = inflightMessage.getOriginalMessage();
+ mqttMessage.getFixedHeader().setDup(true);
+ session.write(mqttMessage);
break;
case PUBCOMP:
ReasonProperties properties = null;
@@ -131,6 +133,7 @@ public class InflightQueue {
MqttVariableMessage extends MqttPacketIdVariableHeader> message = inflightMessage.getOriginalMessage();
MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties);
MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader);
+ pubRelMessage.getFixedHeader().setDup(true);
session.write(pubRelMessage);
break;
default:
@@ -148,19 +151,28 @@ public class InflightQueue {
*/
public void notify(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message) {
InflightMessage inflightMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length];
- ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == inflightMessage.getExpectMessageType(), "invalid message type");
- ValidateUtils.isTrue(message.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message packetId " + message.getVariableHeader().getPacketId() + " " + inflightMessage.getAssignedPacketId());
- inflightMessage.setResponseMessage(message);
- inflightMessage.setLatestTime(System.currentTimeMillis());
switch (message.getFixedHeader().getMessageType()) {
case SUBACK:
case UNSUBACK:
case PUBACK:
case PUBCOMP: {
+ if (message.getFixedHeader().getMessageType() != inflightMessage.getExpectMessageType() || message.getVariableHeader().getPacketId() != inflightMessage.getAssignedPacketId()) {
+// System.out.println("maybe dup ack,ignore:" + message.getFixedHeader().getMessageType());
+ break;
+ }
+ inflightMessage.setResponseMessage(message);
+ inflightMessage.setLatestTime(System.currentTimeMillis());
commit(inflightMessage);
break;
}
case PUBREC:
+ //说明此前出现过重复publish,切已经收到过REC,并发送过REL消息
+ if (message.getFixedHeader().getMessageType() != inflightMessage.getExpectMessageType() || message.getVariableHeader().getPacketId() != inflightMessage.getAssignedPacketId()) {
+// System.out.println("maybe dup pubRec,ignore");
+ break;
+ }
+ inflightMessage.setResponseMessage(message);
+ inflightMessage.setLatestTime(System.currentTimeMillis());
inflightMessage.setExpectMessageType(MqttMessageType.PUBCOMP);
//todo
ReasonProperties properties = null;
@@ -172,7 +184,7 @@ public class InflightQueue {
session.write(pubRelMessage, false);
break;
default:
- throw new RuntimeException();
+ throw new RuntimeException(message.toString());
}
}
@@ -206,6 +218,7 @@ public class InflightQueue {
InflightMessage monitorMessage = queue[takeIndex];
attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(monitorMessage));
}
+
while (count < queue.length) {
Runnable runnable = runnables.poll();
if (runnable != null) {
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java
index 681477635c35701009ffde1d182dbeb3c8cf9438..cf0c3a359645c0adc3743910b1430b87689f6272 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java
@@ -36,10 +36,13 @@ public class MqttFixedHeader extends ToString {
public static final MqttFixedHeader PUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.PUBACK, MqttQoS.AT_MOST_ONCE);
public static final MqttFixedHeader PUB_REC_HEADER = new MqttFixedHeader(MqttMessageType.PUBREC, MqttQoS.AT_MOST_ONCE);
public static final MqttFixedHeader PUB_REL_HEADER = new MqttFixedHeader(MqttMessageType.PUBREL, MqttQoS.AT_LEAST_ONCE);
+ public static final MqttFixedHeader PUB_REL_HEADER_DUP = new MqttFixedHeader(MqttMessageType.PUBREL, true, MqttQoS.AT_LEAST_ONCE, false);
public static final MqttFixedHeader PUB_COMP_HEADER = new MqttFixedHeader(MqttMessageType.PUBCOMP, MqttQoS.AT_MOST_ONCE);
public static final MqttFixedHeader SUBSCRIBE_HEADER = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, MqttQoS.AT_LEAST_ONCE);
+ public static final MqttFixedHeader SUBSCRIBE_HEADER_DUP = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, true, MqttQoS.AT_LEAST_ONCE, false);
public static final MqttFixedHeader SUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.SUBACK, MqttQoS.AT_MOST_ONCE);
public static final MqttFixedHeader UNSUBSCRIBE_HEADER = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, MqttQoS.AT_LEAST_ONCE);
+ public static final MqttFixedHeader UNSUBSCRIBE_HEADER_DUP = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, true, MqttQoS.AT_LEAST_ONCE, false);
public static final MqttFixedHeader UNSUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.UNSUBACK, MqttQoS.AT_MOST_ONCE);
public static final MqttFixedHeader PING_REQ_HEADER = new MqttFixedHeader(MqttMessageType.PINGREQ, MqttQoS.AT_MOST_ONCE);
public static final MqttFixedHeader PING_RESP_HEADER = new MqttFixedHeader(MqttMessageType.PINGRESP, MqttQoS.AT_MOST_ONCE);
@@ -55,7 +58,7 @@ public class MqttFixedHeader extends ToString {
/**
* 重发标志
*/
- private final boolean dup;
+ private boolean dup;
private final MqttQoS qosLevel;
/**
* 保留标志,是否存储消息
@@ -81,6 +84,10 @@ public class MqttFixedHeader extends ToString {
return dup;
}
+ public void setDup(boolean dup) {
+ this.dup = dup;
+ }
+
public MqttQoS getQosLevel() {
return qosLevel;
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java
index 7d11ade5f406849df82a70b0d982a324ec58cde0..4734a4dd06063f4b354e3e5f6eb2ef14ad5ccde6 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java
@@ -41,13 +41,13 @@ final class MqttMessageFactory {
case CONNACK:
return MqttFixedHeader.CONN_ACK_HEADER;
case SUBSCRIBE:
- return MqttFixedHeader.SUBSCRIBE_HEADER;
+ return dup ? MqttFixedHeader.SUBSCRIBE_HEADER_DUP : MqttFixedHeader.SUBSCRIBE_HEADER;
case SUBACK:
return MqttFixedHeader.SUB_ACK_HEADER;
case UNSUBACK:
return MqttFixedHeader.UNSUB_ACK_HEADER;
case UNSUBSCRIBE:
- return MqttFixedHeader.UNSUBSCRIBE_HEADER;
+ return dup ? MqttFixedHeader.UNSUBSCRIBE_HEADER_DUP : MqttFixedHeader.UNSUBSCRIBE_HEADER;
case PUBLISH:
if (dup || retain) {
return new MqttFixedHeader(messageType, dup, MqttQoS.valueOf(qosLevel), retain);
@@ -67,7 +67,7 @@ final class MqttMessageFactory {
case PUBREC:
return MqttFixedHeader.PUB_REC_HEADER;
case PUBREL:
- return MqttFixedHeader.PUB_REL_HEADER;
+ return dup ? MqttFixedHeader.PUB_REL_HEADER_DUP : MqttFixedHeader.PUB_REL_HEADER;
case PUBCOMP:
return MqttFixedHeader.PUB_COMP_HEADER;
case PINGREQ:
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java
index 7f7518bac6dce9a30a3d336c1b06eb69191bdd14..e88fd486f603656268722eff1f769643bac71688 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java
@@ -110,9 +110,6 @@ public final class MqttMessageBuilders {
public MqttPublishMessage build() {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained);
- if (qos != MqttQoS.AT_LEAST_ONCE && qos != MqttQoS.EXACTLY_ONCE) {
- packetId = -packetId;
- }
MqttPublishVariableHeader mqttVariableHeader = new MqttPublishVariableHeader(packetId, topic, publishProperties);
return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, payload);
}