diff --git a/dashboard/components.d.ts b/dashboard/components.d.ts index a3e444698b9f33b33bab831bd2af74a49e5b4248..990f1edbbae2aa74f40dc1f9039e3873604a22d9 100644 --- a/dashboard/components.d.ts +++ b/dashboard/components.d.ts @@ -10,6 +10,7 @@ declare module '@vue/runtime-core' { LayAffix: typeof import('@layui/layui-vue')['LayAffix'] LayAvatar: typeof import('@layui/layui-vue')['LayAvatar'] LayBadge: typeof import('@layui/layui-vue')['LayBadge'] + LayBarcode: typeof import('@layui/layui-vue')['LayBarcode'] LayBody: typeof import('@layui/layui-vue')['LayBody'] LayBreadcrumb: typeof import('@layui/layui-vue')['LayBreadcrumb'] LayBreadcrumbItem: typeof import('@layui/layui-vue')['LayBreadcrumbItem'] @@ -39,6 +40,7 @@ declare module '@vue/runtime-core' { LayMenuItem: typeof import('@layui/layui-vue')['LayMenuItem'] LayPanel: typeof import('@layui/layui-vue')['LayPanel'] LayProgress: typeof import('@layui/layui-vue')['LayProgress'] + LayQrcode: typeof import('@layui/layui-vue')['LayQrcode'] LayResult: typeof import('@layui/layui-vue')['LayResult'] LayRow: typeof import('@layui/layui-vue')['LayRow'] LayScroll: typeof import('@layui/layui-vue')['LayScroll'] diff --git a/dashboard/vite.config.ts b/dashboard/vite.config.ts index b3fba4182710bac688202f3b7789c930f8016691..e13d05e73dad8be5a2853ac9855d23ecbc3160b8 100644 --- a/dashboard/vite.config.ts +++ b/dashboard/vite.config.ts @@ -11,7 +11,8 @@ export default defineConfig({ server:{ proxy:{ '/api': { - target: 'http://127.0.0.1:18083/api/', + // target: 'http://127.0.0.1:18083/api/', + target: 'http://82.157.162.230:8083/api/', changeOrigin: true, rewrite: path => path.replace(/^\/api/, '') } diff --git a/docker-compose.yml b/docker-compose.yml index 6e8aacf41b61a16d79e6b61f286f4e1dcce50b1d..66659d7a342b3c2ffc645be2d78fd0720c3ee5ab 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -53,6 +53,6 @@ services: options: max-size: "100m" max-file: "1" - command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe -# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=1000 -Dqos=2 -Dcount=3 -Dpayload=128 org.smartboot.bench.mqtt.Publish + command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe +# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dcount=10 -Dpayload=128 org.smartboot.bench.mqtt.Publish version: '3.7' \ No newline at end of file diff --git a/pom.xml b/pom.xml index 55e604a72b255369e3828f36a5a59e6f8002ac38..8e5c86a003feedec69a977d10c0984f3e0f439ed 100644 --- a/pom.xml +++ b/pom.xml @@ -4,12 +4,12 @@ org.smartboot.mqtt smart-mqtt smart-mqtt - 0.18 + 0.19 4.0.0 mqtt broker - 0.18 + 0.19 1.5.26 1.1.22 2.6 diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml index 720ebb4d95e282180cf0bee95fbfd9643d3c1112..238477629a38cac025e9ec397ccc3e4eb4700982 100644 --- a/smart-mqtt-broker/pom.xml +++ b/smart-mqtt-broker/pom.xml @@ -5,7 +5,7 @@ org.smartboot.mqtt smart-mqtt - 0.18 + 0.19 ../pom.xml 4.0.0 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java index 223c0e2b75bc730bfdacb7b5ef296ad2b47f8178..f26c8480823b55771262995a55e1c11a89e671b7 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java @@ -38,7 +38,7 @@ public class BrokerConfigure extends ToString { /** * 当前smart-mqtt */ - public static final String VERSION = "v0.18"; + public static final String VERSION = "v0.19"; static final Map SystemEnvironments = new HashMap<>(); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index 773d7036172c75aadb1d68c9648d57a20b8b28a8..e0239b09012b0ea582ee3cedf0e595fd88053c10 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -17,6 +17,7 @@ import org.smartboot.mqtt.broker.processor.DisConnectProcessor; import org.smartboot.mqtt.broker.processor.MqttAckProcessor; import org.smartboot.mqtt.broker.processor.MqttProcessor; import org.smartboot.mqtt.broker.processor.PingReqProcessor; +import org.smartboot.mqtt.broker.processor.PubRelProcessor; import org.smartboot.mqtt.broker.processor.PublishProcessor; import org.smartboot.mqtt.broker.processor.SubscribeProcessor; import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor; @@ -68,10 +69,11 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor()); - processorMap.put(MqttPubRelMessage.class, new MqttAckProcessor<>()); + processorMap.put(MqttPubRelMessage.class, new PubRelProcessor()); processorMap.put(MqttPubRecMessage.class, new MqttAckProcessor<>()); processorMap.put(MqttPubCompMessage.class, new MqttAckProcessor<>()); processorMap.put(MqttDisconnectMessage.class, new DisConnectProcessor()); +// addPlugin(new RateLimiterPlugin<>(1024 * 512, 1024 * 512)); } public MqttBrokerMessageProcessor(BrokerContext mqttContext) { diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java index 22d8ee88bde4f35f8101caec03696745623d3ed9..6196f641dce5bdf881c4e0443fc5be0674577bf1 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java @@ -21,6 +21,6 @@ import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; public class MqttAckProcessor extends AuthorizedMqttProcessor { @Override public void process0(BrokerContext context, MqttSession session, T t) { - session.notifyResponse(t); + session.getInflightQueue().notify(t); } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PubRelProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PubRelProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..7b8573e20e2e02279845bf8ae259a8379b4f3a3a --- /dev/null +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PubRelProcessor.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.broker.processor; + +import org.smartboot.mqtt.broker.BrokerContext; +import org.smartboot.mqtt.broker.MqttSession; +import org.smartboot.mqtt.common.message.MqttPubCompMessage; +import org.smartboot.mqtt.common.message.MqttPubRelMessage; +import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; +import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; + +/** + * @author 三刀(zhengjunweimail@163.com) + * @version V1.0 , 4/22/23 + */ +public class PubRelProcessor extends AuthorizedMqttProcessor { + @Override + public void process0(BrokerContext context, MqttSession session, MqttPubRelMessage message) { + //发送pubRel消息。 + //todo + MqttPubQosVariableHeader qosVariableHeader; + //todo + byte code = 0; + if (code != 0) { + ReasonProperties properties = new ReasonProperties(); + qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties); + qosVariableHeader.setReasonCode(code); + } else { + qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), null); + } + MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader); + session.write(pubRelMessage, false); + session.notifyPubComp(message.getVariableHeader().getPacketId()); + } +} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java index eaf5f6f1edef54c630636ebb88f5b83042e9c8a0..bdd518f14642bfe8d12fc6becbb3a59292e263ae 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java @@ -21,13 +21,10 @@ import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttReasonCode; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPubAckMessage; -import org.smartboot.mqtt.common.message.MqttPubCompMessage; import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; -import org.smartboot.mqtt.common.util.ValidateUtils; /** * 发布Topic @@ -112,24 +109,6 @@ public class PublishProcessor extends AuthorizedMqttProcessor { - ValidateUtils.isTrue(message instanceof MqttPubRelMessage, "invalid message"); - //发送pubRel消息。 - //todo - MqttPubQosVariableHeader qosVariableHeader; - //todo - byte code = 0; - if (code != 0) { - ReasonProperties properties = new ReasonProperties(); - qosVariableHeader = new MqttPubQosVariableHeader(mqttPublishMessage.getVariableHeader().getPacketId(), properties); - qosVariableHeader.setReasonCode(code); - } else { - qosVariableHeader = new MqttPubQosVariableHeader(mqttPublishMessage.getVariableHeader().getPacketId(), null); - } - MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader); - session.write(pubRelMessage, false); - // 消息投递至消息总线 - context.getEventBus().publish(ServerEventType.RECEIVE_PUBLISH_MESSAGE, EventObject.newEventObject(session, mqttPublishMessage)); - }); + session.write(pubRecMessage, () -> context.getEventBus().publish(ServerEventType.RECEIVE_PUBLISH_MESSAGE, EventObject.newEventObject(session, mqttPublishMessage))); } } diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml index 6350be7a88bbd8302f9e726ae31454c91906bd2e..1da20a7d84c1069540582c412cf581778e8b4d67 100644 --- a/smart-mqtt-client/pom.xml +++ b/smart-mqtt-client/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.18 + 0.19 ../pom.xml 4.0.0 diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java index cb87d9b03be49ff9522a264e072fb0ecc5adf9b6..b15876b6258c72556a0efc6d6447efed0427feee 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java @@ -16,6 +16,7 @@ import org.smartboot.mqtt.client.processor.ConnAckProcessor; import org.smartboot.mqtt.client.processor.MqttAckProcessor; import org.smartboot.mqtt.client.processor.MqttPingRespProcessor; import org.smartboot.mqtt.client.processor.MqttProcessor; +import org.smartboot.mqtt.client.processor.PubRelProcessor; import org.smartboot.mqtt.client.processor.PublishProcessor; import org.smartboot.mqtt.common.eventbus.EventObject; import org.smartboot.mqtt.common.eventbus.EventType; @@ -50,7 +51,7 @@ public class MqttClientProcessor extends AbstractMessageProcessor { processors.put(MqttPublishMessage.class, new PublishProcessor()); processors.put(MqttPubRecMessage.class, new MqttAckProcessor()); processors.put(MqttPubCompMessage.class, new MqttAckProcessor()); - processors.put(MqttPubRelMessage.class, new MqttAckProcessor()); + processors.put(MqttPubRelMessage.class, new PubRelProcessor()); processors.put(MqttSubAckMessage.class, new MqttAckProcessor()); processors.put(MqttPingRespMessage.class, new MqttPingRespProcessor()); } diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java index cb398df7d0c9bfb099a6026ae0cf911a7b492a81..302f4fee711a48d92cb29e7f28c8279d488461a6 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java @@ -20,6 +20,6 @@ import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; public class MqttAckProcessor implements MqttProcessor { @Override public void process(MqttClient mqttClient, T message) { - mqttClient.notifyResponse(message); + mqttClient.getInflightQueue().notify(message); } } diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PubRelProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PubRelProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..fd8dfb9e89470b73034925e1c08d34f8b88aaa0a --- /dev/null +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PubRelProcessor.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.client.processor; + +import org.smartboot.mqtt.client.MqttClient; +import org.smartboot.mqtt.common.message.MqttPubCompMessage; +import org.smartboot.mqtt.common.message.MqttPubRelMessage; +import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; +import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; + +/** + * @author 三刀(zhengjunweimail@163.com) + * @version V1.0 , 4/22/23 + */ +public class PubRelProcessor implements MqttProcessor { + + @Override + public void process(MqttClient mqttClient, MqttPubRelMessage message) { + //发送pubRel消息。 + //todo + MqttPubQosVariableHeader qosVariableHeader; + //todo + byte code = 0; + if (code != 0) { + ReasonProperties properties = new ReasonProperties(); + qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties); + qosVariableHeader.setReasonCode(code); + } else { + qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), null); + } + MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader); + mqttClient.write(pubRelMessage, false); + mqttClient.notifyPubComp(message.getVariableHeader().getPacketId()); + } +} diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java index f4559f9cc928eef254523d3c57f83b063b8d0611..3b053b0718d41ca3ac91eee311f8014845e39654 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java @@ -18,15 +18,12 @@ import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPubAckMessage; -import org.smartboot.mqtt.common.message.MqttPubCompMessage; import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; import org.smartboot.mqtt.common.message.variable.MqttPublishVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; import org.smartboot.mqtt.common.util.MqttUtil; -import org.smartboot.mqtt.common.util.ValidateUtils; /** * 发布Topic @@ -104,19 +101,7 @@ public class PublishProcessor implements MqttProcessor { MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(messageId, properties); MqttPubRecMessage pubRecMessage = new MqttPubRecMessage(variableHeader); - session.write(pubRecMessage, message -> { - ValidateUtils.isTrue(message instanceof MqttPubRelMessage, "invalid message"); - //todo - ReasonProperties reasonProperties = null; - if (mqttPublishMessage.getVersion() == MqttVersion.MQTT_5) { - reasonProperties = new ReasonProperties(); - } - MqttPubQosVariableHeader qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), reasonProperties); - MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader); - session.write(pubRelMessage, false); - - processPublishMessage(mqttPublishMessage, session); - }); + session.write(pubRecMessage, () -> processPublishMessage(mqttPublishMessage, session)); } } diff --git a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java index e29e0080ed1aaeda946fd4458f7f9afc4a88f3bd..e7a7de8c8a5ccc5a10fbc24c744340c4c3f56fab 100644 --- a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java +++ b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java @@ -31,20 +31,22 @@ public class MqttClientBootstrap { //订阅主题 client.subscribe("test", MqttQoS.AT_MOST_ONCE, (mqttClient, publishMessage) -> { System.out.println("subscribe message:" + new String(publishMessage.getPayload().getPayload())); + }, (mqttClient, mqttQoS) -> { + //最多分发一次 + client.publish("test", MqttQoS.AT_MOST_ONCE, "aa".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); + //至少分发一次 + client.publish("test", MqttQoS.AT_LEAST_ONCE, "bb".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); + //只分发一次 + client.publish("test", MqttQoS.EXACTLY_ONCE, "cc".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); }); client.subscribe("test/#", MqttQoS.AT_MOST_ONCE, (mqttClient, publishMessage) -> { System.out.println("subscribe test/# message:" + new String(publishMessage.getPayload().getPayload())); + }, (mqttClient, mqttQoS) -> { + //只分发一次 + client.publish("test/dd", MqttQoS.EXACTLY_ONCE, "dd".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); }); - //最多分发一次 - client.publish("test", MqttQoS.AT_MOST_ONCE, "aa".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); - //至少分发一次 - client.publish("test", MqttQoS.AT_LEAST_ONCE, "bb".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); - //只分发一次 - client.publish("test", MqttQoS.EXACTLY_ONCE, "cc".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); - //只分发一次 - client.publish("test/dd", MqttQoS.EXACTLY_ONCE, "dd".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); } } diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml index c8d8bc1ca84b60ed6048a499892d863687eb04c5..60c42c94cfdd00c45041281851e1e04f74a17f2e 100644 --- a/smart-mqtt-common/pom.xml +++ b/smart-mqtt-common/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.18 + 0.19 ../pom.xml 4.0.0 diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java index 69e2ddc13dedce40c282df7ebd357c15e4dca2e0..8c7d78a6158197de8d9f761f10daff318e8d2335 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java @@ -12,29 +12,21 @@ package org.smartboot.mqtt.common; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventBus; import org.smartboot.mqtt.common.eventbus.EventObject; import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttSubscribeMessage; -import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; -import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.protocol.MqttProtocol; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.transport.AioSession; import org.smartboot.socket.util.Attachment; -import org.smartboot.socket.util.QuickTimerTask; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; /** * @author 三刀(zhengjunweimail@163.com) @@ -63,48 +55,25 @@ public abstract class AbstractSession { private MqttVersion mqttVersion; private InflightQueue inflightQueue; - private final Map ackMessageCacheMap = new ConcurrentHashMap<>(); + private final Map ackMessageCacheMap = new ConcurrentHashMap<>(); public AbstractSession(EventBus eventBus) { this.eventBus = eventBus; } - public final void write(MqttPacketIdentifierMessage mqttMessage, Consumer> consumer) { - QosMessage ackMessage = new QosMessage(mqttMessage, consumer); - switch (mqttMessage.getFixedHeader().getQosLevel()) { - case AT_MOST_ONCE: - ValidateUtils.isTrue(mqttMessage instanceof MqttPubRecMessage, "invalid message instance"); - //超时移除即可, - break; - case AT_LEAST_ONCE: - ValidateUtils.isTrue(mqttMessage instanceof MqttSubscribeMessage || mqttMessage instanceof MqttUnsubscribeMessage, "invalid message instance"); - //重新发送subscribe或unSubscribe消息 - QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(() -> { - if (!ackMessage.isCommit()) { - write(mqttMessage, consumer); - } - }, 1, TimeUnit.SECONDS); - default: - throw new UnsupportedOperationException(); - } - ackMessageCacheMap.put(mqttMessage.getVariableHeader().getPacketId(), ackMessage); + public final void write(MqttPubRecMessage mqttMessage, Runnable callback) { + ackMessageCacheMap.put(mqttMessage.getVariableHeader().getPacketId(), callback); write(mqttMessage, false); } - public final void notifyResponse(MqttPacketIdentifierMessage message) { - if (message.getFixedHeader().getMessageType() != MqttMessageType.PUBREL) { - inflightQueue.notify(message); - } else { - QosMessage qosMessage = ackMessageCacheMap.remove(message.getVariableHeader().getPacketId()); - if (qosMessage != null) { - qosMessage.setCommit(true); - qosMessage.getConsumer().accept(message); - } else { - LOGGER.info("message is null"); - } + public final void notifyPubComp(int packetId) { + Runnable consumer = ackMessageCacheMap.remove(packetId); + if (consumer != null) { + consumer.run(); } } + public final synchronized void write(MqttMessage mqttMessage, boolean autoFlush) { try { if (disconnect) { diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index af08078ad302f9ebe9654acbb760b1a02d248b49..992eac6edf13b580630d8f6149024099b7f18ec0 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -14,6 +14,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttVersion; +import org.smartboot.mqtt.common.message.MqttMessage; import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttVariableMessage; @@ -49,7 +50,6 @@ public class InflightQueue { private final AbstractSession session; private final ConcurrentLinkedQueue runnables = new ConcurrentLinkedQueue<>(); - public InflightQueue(AbstractSession session, int size) { ValidateUtils.isTrue(size > 0, "inflight must >0"); this.queue = new InflightMessage[size]; @@ -95,7 +95,7 @@ public class InflightQueue { /** * 超时重发 */ - void retry(InflightMessage inflightMessage) { + private void retry(InflightMessage inflightMessage) { if (inflightMessage.isCommit() || session.isDisconnect()) { return; } @@ -117,11 +117,13 @@ public class InflightQueue { return; } inflightMessage.setLatestTime(System.currentTimeMillis()); - LOGGER.info("message:{} time out,retry...", inflightMessage.getOriginalMessage().getFixedHeader()); + LOGGER.info("message:{} time out,retry...", inflightMessage.getExpectMessageType()); switch (inflightMessage.getExpectMessageType()) { case PUBACK: case PUBREC: - session.write(inflightMessage.getOriginalMessage()); + MqttMessage mqttMessage = inflightMessage.getOriginalMessage(); + mqttMessage.getFixedHeader().setDup(true); + session.write(mqttMessage); break; case PUBCOMP: ReasonProperties properties = null; @@ -131,6 +133,7 @@ public class InflightQueue { MqttVariableMessage message = inflightMessage.getOriginalMessage(); MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties); MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader); + pubRelMessage.getFixedHeader().setDup(true); session.write(pubRelMessage); break; default: @@ -148,19 +151,28 @@ public class InflightQueue { */ public void notify(MqttPacketIdentifierMessage message) { InflightMessage inflightMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length]; - ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == inflightMessage.getExpectMessageType(), "invalid message type"); - ValidateUtils.isTrue(message.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message packetId " + message.getVariableHeader().getPacketId() + " " + inflightMessage.getAssignedPacketId()); - inflightMessage.setResponseMessage(message); - inflightMessage.setLatestTime(System.currentTimeMillis()); switch (message.getFixedHeader().getMessageType()) { case SUBACK: case UNSUBACK: case PUBACK: case PUBCOMP: { + if (message.getFixedHeader().getMessageType() != inflightMessage.getExpectMessageType() || message.getVariableHeader().getPacketId() != inflightMessage.getAssignedPacketId()) { +// System.out.println("maybe dup ack,ignore:" + message.getFixedHeader().getMessageType()); + break; + } + inflightMessage.setResponseMessage(message); + inflightMessage.setLatestTime(System.currentTimeMillis()); commit(inflightMessage); break; } case PUBREC: + //说明此前出现过重复publish,切已经收到过REC,并发送过REL消息 + if (message.getFixedHeader().getMessageType() != inflightMessage.getExpectMessageType() || message.getVariableHeader().getPacketId() != inflightMessage.getAssignedPacketId()) { +// System.out.println("maybe dup pubRec,ignore"); + break; + } + inflightMessage.setResponseMessage(message); + inflightMessage.setLatestTime(System.currentTimeMillis()); inflightMessage.setExpectMessageType(MqttMessageType.PUBCOMP); //todo ReasonProperties properties = null; @@ -172,7 +184,7 @@ public class InflightQueue { session.write(pubRelMessage, false); break; default: - throw new RuntimeException(); + throw new RuntimeException(message.toString()); } } @@ -206,6 +218,7 @@ public class InflightQueue { InflightMessage monitorMessage = queue[takeIndex]; attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(monitorMessage)); } + while (count < queue.length) { Runnable runnable = runnables.poll(); if (runnable != null) { diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java index 681477635c35701009ffde1d182dbeb3c8cf9438..cf0c3a359645c0adc3743910b1430b87689f6272 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java @@ -36,10 +36,13 @@ public class MqttFixedHeader extends ToString { public static final MqttFixedHeader PUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.PUBACK, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader PUB_REC_HEADER = new MqttFixedHeader(MqttMessageType.PUBREC, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader PUB_REL_HEADER = new MqttFixedHeader(MqttMessageType.PUBREL, MqttQoS.AT_LEAST_ONCE); + public static final MqttFixedHeader PUB_REL_HEADER_DUP = new MqttFixedHeader(MqttMessageType.PUBREL, true, MqttQoS.AT_LEAST_ONCE, false); public static final MqttFixedHeader PUB_COMP_HEADER = new MqttFixedHeader(MqttMessageType.PUBCOMP, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader SUBSCRIBE_HEADER = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, MqttQoS.AT_LEAST_ONCE); + public static final MqttFixedHeader SUBSCRIBE_HEADER_DUP = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, true, MqttQoS.AT_LEAST_ONCE, false); public static final MqttFixedHeader SUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.SUBACK, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader UNSUBSCRIBE_HEADER = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, MqttQoS.AT_LEAST_ONCE); + public static final MqttFixedHeader UNSUBSCRIBE_HEADER_DUP = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, true, MqttQoS.AT_LEAST_ONCE, false); public static final MqttFixedHeader UNSUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.UNSUBACK, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader PING_REQ_HEADER = new MqttFixedHeader(MqttMessageType.PINGREQ, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader PING_RESP_HEADER = new MqttFixedHeader(MqttMessageType.PINGRESP, MqttQoS.AT_MOST_ONCE); @@ -55,7 +58,7 @@ public class MqttFixedHeader extends ToString { /** * 重发标志 */ - private final boolean dup; + private boolean dup; private final MqttQoS qosLevel; /** * 保留标志,是否存储消息 @@ -81,6 +84,10 @@ public class MqttFixedHeader extends ToString { return dup; } + public void setDup(boolean dup) { + this.dup = dup; + } + public MqttQoS getQosLevel() { return qosLevel; } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java index 7d11ade5f406849df82a70b0d982a324ec58cde0..4734a4dd06063f4b354e3e5f6eb2ef14ad5ccde6 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java @@ -41,13 +41,13 @@ final class MqttMessageFactory { case CONNACK: return MqttFixedHeader.CONN_ACK_HEADER; case SUBSCRIBE: - return MqttFixedHeader.SUBSCRIBE_HEADER; + return dup ? MqttFixedHeader.SUBSCRIBE_HEADER_DUP : MqttFixedHeader.SUBSCRIBE_HEADER; case SUBACK: return MqttFixedHeader.SUB_ACK_HEADER; case UNSUBACK: return MqttFixedHeader.UNSUB_ACK_HEADER; case UNSUBSCRIBE: - return MqttFixedHeader.UNSUBSCRIBE_HEADER; + return dup ? MqttFixedHeader.UNSUBSCRIBE_HEADER_DUP : MqttFixedHeader.UNSUBSCRIBE_HEADER; case PUBLISH: if (dup || retain) { return new MqttFixedHeader(messageType, dup, MqttQoS.valueOf(qosLevel), retain); @@ -67,7 +67,7 @@ final class MqttMessageFactory { case PUBREC: return MqttFixedHeader.PUB_REC_HEADER; case PUBREL: - return MqttFixedHeader.PUB_REL_HEADER; + return dup ? MqttFixedHeader.PUB_REL_HEADER_DUP : MqttFixedHeader.PUB_REL_HEADER; case PUBCOMP: return MqttFixedHeader.PUB_COMP_HEADER; case PINGREQ: diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java index 7f7518bac6dce9a30a3d336c1b06eb69191bdd14..e88fd486f603656268722eff1f769643bac71688 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java @@ -110,9 +110,6 @@ public final class MqttMessageBuilders { public MqttPublishMessage build() { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained); - if (qos != MqttQoS.AT_LEAST_ONCE && qos != MqttQoS.EXACTLY_ONCE) { - packetId = -packetId; - } MqttPublishVariableHeader mqttVariableHeader = new MqttPublishVariableHeader(packetId, topic, publishProperties); return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, payload); }