From 4811a17cde0c2faa39007b7b9d6c13d2349a45f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sun, 21 May 2023 14:19:41 +0800 Subject: [PATCH 01/22] =?UTF-8?q?feature-0.21=EF=BC=9A=E8=BF=81=E7=A7=BBme?= =?UTF-8?q?tric=E5=8A=9F=E8=83=BD=E8=87=B3=E4=BC=81=E4=B8=9A=E7=89=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugins/pom.xml | 2 +- plugins/redis-bridge-plugin/pom.xml | 2 +- pom.xml | 6 +- smart-mqtt-broker/pom.xml | 2 +- .../mqtt/broker/BrokerConfigure.java | 2 +- .../smartboot/mqtt/broker/BrokerContext.java | 7 -- .../mqtt/broker/BrokerContextImpl.java | 85 ------------------- .../broker/MqttBrokerMessageProcessor.java | 3 - .../controller/DashBoardController.java | 76 +---------------- smart-mqtt-client/pom.xml | 2 +- smart-mqtt-common/pom.xml | 2 +- 11 files changed, 11 insertions(+), 178 deletions(-) diff --git a/plugins/pom.xml b/plugins/pom.xml index 24bee133..bee09121 100644 --- a/plugins/pom.xml +++ b/plugins/pom.xml @@ -16,7 +16,7 @@ org.smartboot.mqtt smart-mqtt - 0.20 + 0.21 ../pom.xml pom diff --git a/plugins/redis-bridge-plugin/pom.xml b/plugins/redis-bridge-plugin/pom.xml index a89708bc..f4e16694 100644 --- a/plugins/redis-bridge-plugin/pom.xml +++ b/plugins/redis-bridge-plugin/pom.xml @@ -3,7 +3,7 @@ plugins org.smartboot.mqtt - 0.20 + 0.21 ../pom.xml 4.0.0 diff --git a/pom.xml b/pom.xml index cacb49b8..62de9293 100644 --- a/pom.xml +++ b/pom.xml @@ -4,12 +4,12 @@ org.smartboot.mqtt smart-mqtt smart-mqtt - 0.20 + 0.21 4.0.0 mqtt broker - 0.20 + 0.21 1.5.27 1.1.22 2.6 @@ -66,7 +66,7 @@ com.alibaba.fastjson2 fastjson2 - 2.0.20.graal + 2.0.21.graal junit diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml index e50e103d..aacc0788 100644 --- a/smart-mqtt-broker/pom.xml +++ b/smart-mqtt-broker/pom.xml @@ -5,7 +5,7 @@ org.smartboot.mqtt smart-mqtt - 0.20 + 0.21 ../pom.xml 4.0.0 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java index caa7f048..7136ca35 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java @@ -38,7 +38,7 @@ public class BrokerConfigure extends ToString { /** * 当前smart-mqtt */ - public static final String VERSION = "v0.20"; + public static final String VERSION = "v0.21"; static final Map SystemEnvironments = new HashMap<>(); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java index 28c8cc49..36440660 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java @@ -12,9 +12,7 @@ package org.smartboot.mqtt.broker; import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; import org.smartboot.mqtt.broker.provider.Providers; -import org.smartboot.mqtt.common.enums.MqttMetricEnum; import org.smartboot.mqtt.common.eventbus.EventBus; -import org.smartboot.mqtt.common.to.MetricItemTO; import java.io.IOException; import java.util.Collection; @@ -86,9 +84,4 @@ public interface BrokerContext { MqttBrokerMessageProcessor getMessageProcessor(); - /** - * 运行指标 - */ - MetricItemTO metric(MqttMetricEnum metricEnum); - } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 8b61bb16..f31d92df 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -28,28 +28,20 @@ import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; import org.smartboot.mqtt.common.AsyncTask; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.QosRetryPlugin; -import org.smartboot.mqtt.common.enums.MqttMetricEnum; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventBus; import org.smartboot.mqtt.common.eventbus.EventBusImpl; import org.smartboot.mqtt.common.eventbus.EventBusSubscriber; import org.smartboot.mqtt.common.eventbus.EventType; -import org.smartboot.mqtt.common.message.MqttConnAckMessage; -import org.smartboot.mqtt.common.message.MqttConnectMessage; -import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; import org.smartboot.mqtt.common.protocol.MqttProtocol; -import org.smartboot.mqtt.common.to.MetricItemTO; import org.smartboot.mqtt.common.util.MqttMessageBuilders; import org.smartboot.mqtt.common.util.MqttUtil; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.buffer.BufferPagePool; import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider; -import org.smartboot.socket.extension.plugins.AbstractPlugin; import org.smartboot.socket.transport.AioQuickServer; -import org.smartboot.socket.transport.AioSession; import org.yaml.snakeyaml.Yaml; import java.io.IOException; @@ -62,9 +54,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.ServiceLoader; import java.util.concurrent.ArrayBlockingQueue; @@ -118,10 +108,6 @@ public class BrokerContextImpl implements BrokerContext { private String configJson; private final static BrokerTopic SHUTDOWN_TOPIC = new BrokerTopic(""); - /** - * 统计指标 - */ - private final Map metricMap = new HashMap<>(); private AsynchronousChannelGroup asynchronousChannelGroup; @Override @@ -135,8 +121,6 @@ public class BrokerContextImpl implements BrokerContext { initPushThread(); - initMetric(); - loadAndInstallPlugins(); @@ -182,69 +166,6 @@ public class BrokerContextImpl implements BrokerContext { configJson = null; } - private void initMetric() { - for (MqttMetricEnum metricEnum : MqttMetricEnum.values()) { - metricMap.put(metricEnum, new MetricItemTO(metricEnum)); - } - - processor.addPlugin(new AbstractPlugin() { - @Override - public void afterRead(AioSession session, int readSize) { - if (readSize > 0) { - metricMap.get(MqttMetricEnum.BYTES_RECEIVED).getMetric().add(readSize); - } - } - - @Override - public void afterWrite(AioSession session, int writeSize) { - if (writeSize > 0) { - metricMap.get(MqttMetricEnum.BYTES_SENT).getMetric().add(writeSize); - } - } - }); - eventBus.subscribe(ServerEventType.CONNECT, (eventType, object) -> metricMap.get(MqttMetricEnum.CLIENT_CONNECT).getMetric().increment()); - eventBus.subscribe(ServerEventType.DISCONNECT, (eventType, object) -> metricMap.get(MqttMetricEnum.CLIENT_DISCONNECT).getMetric().increment()); - eventBus.subscribe(ServerEventType.SUBSCRIBE_ACCEPT, (eventType, object) -> metricMap.get(MqttMetricEnum.CLIENT_SUBSCRIBE).getMetric().increment()); - eventBus.subscribe(ServerEventType.UNSUBSCRIBE_ACCEPT, (eventType, object) -> metricMap.get(MqttMetricEnum.CLIENT_UNSUBSCRIBE).getMetric().increment()); - eventBus.subscribe(EventType.RECEIVE_MESSAGE, (eventType, object) -> { - metricMap.get(MqttMetricEnum.PACKETS_RECEIVED).getMetric().increment(); - if (object.getObject() instanceof MqttConnectMessage) { - metricMap.get(MqttMetricEnum.PACKETS_CONNECT_RECEIVED).getMetric().increment(); - } - }); - eventBus.subscribe(EventType.WRITE_MESSAGE, (eventType, object) -> { - metricMap.get(MqttMetricEnum.PACKETS_SENT).getMetric().increment(); - if (object.getObject() instanceof MqttConnAckMessage) { - metricMap.get(MqttMetricEnum.PACKETS_CONNACK_SENT).getMetric().increment(); - } else if (object.getObject() instanceof MqttPublishMessage) { - switch (object.getObject().getFixedHeader().getQosLevel()) { - case AT_MOST_ONCE: - metricMap.get(MqttMetricEnum.MESSAGE_QOS0_SENT).getMetric().increment(); - break; - case AT_LEAST_ONCE: - metricMap.get(MqttMetricEnum.MESSAGE_QOS1_SENT).getMetric().increment(); - break; - case EXACTLY_ONCE: - metricMap.get(MqttMetricEnum.MESSAGE_QOS2_SENT).getMetric().increment(); - break; - } - } - }); - messageBusSubscriber.consumer((brokerContext1, publishMessage) -> { - switch (publishMessage.getFixedHeader().getQosLevel()) { - case AT_MOST_ONCE: - metricMap.get(MqttMetricEnum.MESSAGE_QOS0_RECEIVED).getMetric().increment(); - break; - case AT_LEAST_ONCE: - metricMap.get(MqttMetricEnum.MESSAGE_QOS1_RECEIVED).getMetric().increment(); - break; - case EXACTLY_ONCE: - metricMap.get(MqttMetricEnum.MESSAGE_QOS2_RECEIVED).getMetric().increment(); - break; - } - - }); - } private final TopicSubscriber BREAK = new TopicSubscriber(null, null, null, 0, 0); @@ -480,7 +401,6 @@ public class BrokerContextImpl implements BrokerContext { ValidateUtils.isTrue(!MqttUtil.containsTopicWildcards(topicName), "invalid topicName: " + topicName); BrokerTopic newTopic = new BrokerTopic(topicName); eventBus.publish(ServerEventType.TOPIC_CREATE, newTopic); - metric(MqttMetricEnum.TOPIC_COUNT).getMetric().increment(); return newTopic; }); } @@ -572,11 +492,6 @@ public class BrokerContextImpl implements BrokerContext { return processor; } - @Override - public MetricItemTO metric(MqttMetricEnum metricEnum) { - return metricMap.get(metricEnum); - } - @Override public void destroy() { LOGGER.info("destroy broker..."); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index d781f8b1..d2191db4 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -22,7 +22,6 @@ import org.smartboot.mqtt.broker.processor.PublishProcessor; import org.smartboot.mqtt.broker.processor.SubscribeProcessor; import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor; import org.smartboot.mqtt.common.DefaultMqttWriter; -import org.smartboot.mqtt.common.enums.MqttMetricEnum; import org.smartboot.mqtt.common.eventbus.EventObject; import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.exception.MqttException; @@ -100,13 +99,11 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor overview() { - MetricTO metricTO = new MetricTO(); - Collection sessions = brokerContext.getSessions(); - Date date = new Date(); - //在线客户端数量 - MetricItemTO online = brokerContext.metric(MqttMetricEnum.CLIENT_ONLINE); - online.setTime(date); - metricTO.getMetric().put(MqttMetricEnum.CLIENT_ONLINE.getCode(), online); - - //主题数 - MetricItemTO topicCount = brokerContext.metric(MqttMetricEnum.TOPIC_COUNT); - topicCount.setTime(date); - metricTO.getMetric().put(MqttMetricEnum.TOPIC_COUNT.getCode(), topicCount); - - int subCount = 0; - for (MqttSession session : sessions) { - subCount += session.getSubscribers().size(); - } - MetricItemTO subscribeTopicCount = new MetricItemTO(); - subscribeTopicCount.setCode("subscribe_topic_count"); - subscribeTopicCount.setValue(subCount); - metricTO.getMetric().put(subscribeTopicCount.getCode(), subscribeTopicCount); - - return RestResult.ok(metricTO); + return RestResult.fail(OpenApi.MESSAGE_UPGRADE); } @RequestMapping(OpenApi.DASHBOARD_NODES) @@ -118,51 +90,7 @@ public class DashBoardController { */ @RequestMapping(OpenApi.DASHBOARD_METRICS) public RestResult metrics() { - MetricTO metricTO = new MetricTO(); - //连接 - List connectionGroup = new ArrayList<>(); - metricTO.getGroup().put("connection", connectionGroup); - connectionGroup.add(brokerContext.metric(MqttMetricEnum.CLIENT_CONNECT)); - connectionGroup.add(brokerContext.metric(MqttMetricEnum.CLIENT_DISCONNECT)); - connectionGroup.add(brokerContext.metric(MqttMetricEnum.CLIENT_SUBSCRIBE)); - connectionGroup.add(brokerContext.metric(MqttMetricEnum.CLIENT_UNSUBSCRIBE)); - - //会话 - List sessionGroup = new ArrayList<>(); - metricTO.getGroup().put("session", sessionGroup); - //认证与权限 - List accessGroup = new ArrayList<>(); - metricTO.getGroup().put("access", accessGroup); - - //流量收发 - List bytesGroup = new ArrayList<>(); - metricTO.getGroup().put("bytes", bytesGroup); - - //报文 - List packetGroup = new ArrayList<>(); - metricTO.getGroup().put("packet", packetGroup); - packetGroup.add(brokerContext.metric(MqttMetricEnum.BYTES_RECEIVED)); - packetGroup.add(brokerContext.metric(MqttMetricEnum.BYTES_SENT)); - packetGroup.add(brokerContext.metric(MqttMetricEnum.PACKETS_RECEIVED)); - packetGroup.add(brokerContext.metric(MqttMetricEnum.PACKETS_SENT)); - packetGroup.add(brokerContext.metric(MqttMetricEnum.PACKETS_CONNECT_RECEIVED)); - packetGroup.add(brokerContext.metric(MqttMetricEnum.PACKETS_CONNACK_SENT)); - - - //消息数量 - List messageGroup = new ArrayList<>(); - metricTO.getGroup().put("message", messageGroup); - messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS0_RECEIVED)); - messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS1_RECEIVED)); - messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS2_RECEIVED)); - messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS0_SENT)); - messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS1_SENT)); - messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS2_SENT)); - - //消息分发 - List deliveryGroup = new ArrayList<>(); - metricTO.getGroup().put("delivery", deliveryGroup); - return RestResult.ok(metricTO); + return RestResult.fail(OpenApi.MESSAGE_UPGRADE); } } diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml index b93a632b..7d2e80c4 100644 --- a/smart-mqtt-client/pom.xml +++ b/smart-mqtt-client/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.20 + 0.21 ../pom.xml 4.0.0 diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml index 806073be..2a2c23d2 100644 --- a/smart-mqtt-common/pom.xml +++ b/smart-mqtt-common/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.20 + 0.21 ../pom.xml 4.0.0 -- Gitee From fdaaddf494e60e4b5087c463e7be2c4cb10bd7cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sun, 21 May 2023 16:06:28 +0800 Subject: [PATCH 02/22] =?UTF-8?q?feature-0.21=EF=BC=9A=E8=BF=81=E7=A7=BBme?= =?UTF-8?q?tric=E5=8A=9F=E8=83=BD=E8=87=B3=E4=BC=81=E4=B8=9A=E7=89=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/smartboot/mqtt/common/to/MetricItemTO.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java index 9b031cfb..fa4c437f 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java @@ -36,6 +36,11 @@ public class MetricItemTO { @JSONField(serialize = false) private final LongAdder metric = new LongAdder(); + /** + * 上一次数据 + */ + private int latestValue; + /** * 采集周期,单位:秒,非正整数表示禁用周期统计 */ @@ -101,4 +106,12 @@ public class MetricItemTO { public void setTime(Date time) { this.time = time; } + + public int getLatestValue() { + return latestValue; + } + + public void setLatestValue(int latestValue) { + this.latestValue = latestValue; + } } -- Gitee From f5dcbf8a284e655e6f65a41a1eed850f2d9b3c69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sun, 21 May 2023 16:13:10 +0800 Subject: [PATCH 03/22] =?UTF-8?q?feature-0.21=EF=BC=9A=E8=BF=81=E7=A7=BBme?= =?UTF-8?q?tric=E5=8A=9F=E8=83=BD=E8=87=B3=E4=BC=81=E4=B8=9A=E7=89=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/DashBoardController.java | 5 +- .../mqtt/common/to/MetricItemTO.java | 117 ------------------ .../smartboot/mqtt/common/to/MetricTO.java | 40 ------ 3 files changed, 2 insertions(+), 160 deletions(-) delete mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java delete mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricTO.java diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/controller/DashBoardController.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/controller/DashBoardController.java index e413bf61..447b3bc9 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/controller/DashBoardController.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/controller/DashBoardController.java @@ -23,7 +23,6 @@ import org.smartboot.mqtt.broker.BrokerRuntime; import org.smartboot.mqtt.broker.openapi.OpenApi; import org.smartboot.mqtt.broker.openapi.enums.BrokerStatueEnum; import org.smartboot.mqtt.broker.openapi.to.BrokerNodeTO; -import org.smartboot.mqtt.common.to.MetricTO; import java.lang.management.ManagementFactory; import java.util.Arrays; @@ -46,7 +45,7 @@ public class DashBoardController { } @RequestMapping(OpenApi.DASHBOARD_OVERVIEW) - public RestResult overview() { + public RestResult overview() { return RestResult.fail(OpenApi.MESSAGE_UPGRADE); } @@ -89,7 +88,7 @@ public class DashBoardController { * @return */ @RequestMapping(OpenApi.DASHBOARD_METRICS) - public RestResult metrics() { + public RestResult metrics() { return RestResult.fail(OpenApi.MESSAGE_UPGRADE); } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java deleted file mode 100644 index fa4c437f..00000000 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] - * - * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 - * - * Enterprise users are required to use this project reasonably - * and legally in accordance with the AGPL-3.0 open source agreement - * without special permission from the smartboot organization. - */ - -package org.smartboot.mqtt.common.to; - -import com.alibaba.fastjson2.annotation.JSONField; -import org.smartboot.mqtt.common.enums.MqttMetricEnum; - -import java.util.Date; -import java.util.concurrent.atomic.LongAdder; - -/** - * @author 三刀(zhengjunweimail@163.com) - * @version V1.0 , 2023/1/26 - */ -public class MetricItemTO { - /** - * 指标编码 - */ - private String code; - /** - * 指标描述 - */ - private String desc; - - /** - * 指标数据 - */ - @JSONField(serialize = false) - private final LongAdder metric = new LongAdder(); - - /** - * 上一次数据 - */ - private int latestValue; - - /** - * 采集周期,单位:秒,非正整数表示禁用周期统计 - */ - private final int period; - /** - * 未启用周期采集改值为null - */ - @JSONField(format = "yyyy-MM-dd HH:mm:ss") - private Date time; - - - public MetricItemTO() { - this.period = 0; - } - - public MetricItemTO(MqttMetricEnum metricEnum) { - this(metricEnum, 0); - } - - public MetricItemTO(MqttMetricEnum metricEnum, int period) { - this.code = metricEnum.getCode(); - this.desc = metricEnum.getDesc(); - this.period = period; - } - - public String getCode() { - return code; - } - - public void setCode(String code) { - this.code = code; - } - - public String getDesc() { - return desc; - } - - public void setDesc(String desc) { - this.desc = desc; - } - - public int getValue() { - return metric.intValue(); - } - - public void setValue(int value) { - metric.reset(); - metric.add(value); - } - - public LongAdder getMetric() { - return metric; - } - - public int getPeriod() { - return period; - } - - public Date getTime() { - return time; - } - - public void setTime(Date time) { - this.time = time; - } - - public int getLatestValue() { - return latestValue; - } - - public void setLatestValue(int latestValue) { - this.latestValue = latestValue; - } -} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricTO.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricTO.java deleted file mode 100644 index 17b44d05..00000000 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricTO.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] - * - * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 - * - * Enterprise users are required to use this project reasonably - * and legally in accordance with the AGPL-3.0 open source agreement - * without special permission from the smartboot organization. - */ - -package org.smartboot.mqtt.common.to; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @author 三刀(zhengjunweimail@163.com) - * @version V1.0 , 2023/1/22 - */ -public class MetricTO { - /** - * 指标项 - */ - private final Map metric = new HashMap<>(); - - /** - * 指标分组 - */ - private final Map> group = new HashMap<>(); - - - public Map> getGroup() { - return group; - } - - public Map getMetric() { - return metric; - } -} -- Gitee From e0fa8cde57fa44c8b8aa51863091cba260ac9169 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sun, 21 May 2023 16:15:04 +0800 Subject: [PATCH 04/22] =?UTF-8?q?feature-0.21=EF=BC=9A=E8=BF=81=E7=A7=BBme?= =?UTF-8?q?tric=E5=8A=9F=E8=83=BD=E8=87=B3=E4=BC=81=E4=B8=9A=E7=89=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/common/enums/MetricTypeEnum.java | 26 ------- .../mqtt/common/enums/MqttMetricEnum.java | 75 ------------------- 2 files changed, 101 deletions(-) delete mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MetricTypeEnum.java delete mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttMetricEnum.java diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MetricTypeEnum.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MetricTypeEnum.java deleted file mode 100644 index 0553ab12..00000000 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MetricTypeEnum.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] - * - * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 - * - * Enterprise users are required to use this project reasonably - * and legally in accordance with the AGPL-3.0 open source agreement - * without special permission from the smartboot organization. - */ - -package org.smartboot.mqtt.common.enums; - -/** - * @author 三刀(zhengjunweimail@163.com) - * @version V1.0 , 2023/2/19 - */ -public enum MetricTypeEnum { - /** - * 基础指标:指表达业务实体原子量化属性的且不可再分的概念集合 - */ - BASIC, - /** - * 复合指标:指建立在基础指标之上,通过一定运算规则形成的计算指标集合 - */ - COMPOSITE -} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttMetricEnum.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttMetricEnum.java deleted file mode 100644 index b9609df0..00000000 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttMetricEnum.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] - * - * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 - * - * Enterprise users are required to use this project reasonably - * and legally in accordance with the AGPL-3.0 open source agreement - * without special permission from the smartboot organization. - */ - -package org.smartboot.mqtt.common.enums; - -/** - * @author 三刀(zhengjunweimail@163_com) - * @version V1_0 , 2023/1/26 - */ -public enum MqttMetricEnum { - CLIENT_ONLINE("client_online", "客户端在线数"), - CLIENT_CONNECT("client_connect", "客户端连接次数"), - CLIENT_DISCONNECT("client_disconnected", "客户端断开连接次数"), - CLIENT_SUBSCRIBE("client_subscribe", "订阅次数"), - CLIENT_UNSUBSCRIBE("client_unsubscribe", "取消订阅次数"), - BYTES_RECEIVED("bytes_received", "已接收字节数"), - BYTES_SENT("bytes_sent", "已发送字节数"), - - PACKETS_CONNECT_RECEIVED("packets_connect_received", "接收的 CONNECT 报文数量"), - PACKETS_CONNACK_SENT("packets_connack_sent", "发送的 CONNACK 报文数量"), - - PACKETS_PUBLISH_RECEIVED("packets_publish_received", "接收的 PUBLISH 报文数量"), - PACKETS_PUBLISH_SENT("packets_publish_sent", "发送的 PUBLISH 报文数量"), - - PACKETS_RECEIVED("packets_received", "接收的报文数量"), - PACKETS_SENT("packets_sent", "发送的报文数量"), - - - TOPIC_COUNT("topic_count", "Topic数量"), - - MESSAGE_QOS0_RECEIVED("messages_qos0_received", "接收来自客户端的 QoS 0 消息数量"), - MESSAGE_QOS1_RECEIVED("messages_qos1_received", "接收来自客户端的 QoS 1 消息数量"), - MESSAGE_QOS2_RECEIVED("messages_qos2_received", "接收来自客户端的 QoS 2 消息数量"), - MESSAGE_QOS0_SENT("messages_qos0_sent", "发送给客户端的 QoS 0 消息数量"), - MESSAGE_QOS1_SENT("messages_qos1_sent", "发送给客户端的 QoS 1 消息数量"), - MESSAGE_QOS2_SENT("messages_qos2_sent", "发送给客户端的 QoS 2 消息数量"), - - PERIOD_MESSAGE_RECEIVED("period_message_received", "周期内接收消息数", MetricTypeEnum.COMPOSITE), - - PERIOD_MESSAGE_SENT("period_message_sent", "周期内发送消息数", MetricTypeEnum.COMPOSITE); - - private final String code; - private final String desc; - - private final MetricTypeEnum type; - - MqttMetricEnum(String code, String desc, MetricTypeEnum type) { - this.code = code; - this.desc = desc; - this.type = type; - } - - MqttMetricEnum(String code, String desc) { - this(code, desc, MetricTypeEnum.BASIC); - } - - public String getCode() { - return code; - } - - public String getDesc() { - return desc; - } - - public MetricTypeEnum getType() { - return type; - } -} -- Gitee From 0ce05035c966571d33cfd82cf9e39b35627600ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 27 May 2023 10:43:24 +0800 Subject: [PATCH 05/22] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/broker/BrokerConfigure.java | 15 ++++++ .../smartboot/mqtt/broker/BrokerContext.java | 5 +- .../mqtt/broker/BrokerContextImpl.java | 49 +++++++++++++++++-- .../broker/MqttBrokerMessageProcessor.java | 39 +-------------- 4 files changed, 66 insertions(+), 42 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java index 7136ca35..408aa30e 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java @@ -11,8 +11,12 @@ package org.smartboot.mqtt.broker; import org.smartboot.mqtt.common.ToString; +import org.smartboot.mqtt.common.message.MqttMessage; +import org.smartboot.socket.extension.plugins.Plugin; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; /** @@ -100,6 +104,8 @@ public class BrokerConfigure extends ToString { */ private int maxInflight = 8; + private final List> plugins = new LinkedList<>(); + public int getPort() { return port; } @@ -188,6 +194,15 @@ public class BrokerConfigure extends ToString { this.name = name; } + public BrokerConfigure addPlugin(Plugin plugin) { + plugins.add(plugin); + return this; + } + + public List> getPlugins() { + return plugins; + } + @Override public String toString() { return "BrokerConfigure{" + diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java index 36440660..208cf789 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java @@ -11,11 +11,14 @@ package org.smartboot.mqtt.broker; import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; +import org.smartboot.mqtt.broker.processor.MqttProcessor; import org.smartboot.mqtt.broker.provider.Providers; import org.smartboot.mqtt.common.eventbus.EventBus; +import org.smartboot.mqtt.common.message.MqttMessage; import java.io.IOException; import java.util.Collection; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; /** @@ -82,6 +85,6 @@ public interface BrokerContext { */ T parseConfig(String path, Class clazz); - MqttBrokerMessageProcessor getMessageProcessor(); + Map, MqttProcessor> getMessageProcessors(); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index f31d92df..22bad722 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -13,6 +13,7 @@ package org.smartboot.mqtt.broker; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONPath; import com.alibaba.fastjson2.JSONReader; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,6 +24,15 @@ import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBusSubscriber; import org.smartboot.mqtt.broker.eventbus.messagebus.consumer.RetainPersistenceConsumer; import org.smartboot.mqtt.broker.plugin.Plugin; +import org.smartboot.mqtt.broker.processor.ConnectProcessor; +import org.smartboot.mqtt.broker.processor.DisConnectProcessor; +import org.smartboot.mqtt.broker.processor.MqttAckProcessor; +import org.smartboot.mqtt.broker.processor.MqttProcessor; +import org.smartboot.mqtt.broker.processor.PingReqProcessor; +import org.smartboot.mqtt.broker.processor.PubRelProcessor; +import org.smartboot.mqtt.broker.processor.PublishProcessor; +import org.smartboot.mqtt.broker.processor.SubscribeProcessor; +import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor; import org.smartboot.mqtt.broker.provider.Providers; import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; import org.smartboot.mqtt.common.AsyncTask; @@ -34,6 +44,17 @@ import org.smartboot.mqtt.common.eventbus.EventBus; import org.smartboot.mqtt.common.eventbus.EventBusImpl; import org.smartboot.mqtt.common.eventbus.EventBusSubscriber; import org.smartboot.mqtt.common.eventbus.EventType; +import org.smartboot.mqtt.common.message.MqttConnectMessage; +import org.smartboot.mqtt.common.message.MqttDisconnectMessage; +import org.smartboot.mqtt.common.message.MqttMessage; +import org.smartboot.mqtt.common.message.MqttPingReqMessage; +import org.smartboot.mqtt.common.message.MqttPubAckMessage; +import org.smartboot.mqtt.common.message.MqttPubCompMessage; +import org.smartboot.mqtt.common.message.MqttPubRecMessage; +import org.smartboot.mqtt.common.message.MqttPubRelMessage; +import org.smartboot.mqtt.common.message.MqttPublishMessage; +import org.smartboot.mqtt.common.message.MqttSubscribeMessage; +import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; import org.smartboot.mqtt.common.protocol.MqttProtocol; import org.smartboot.mqtt.common.util.MqttMessageBuilders; @@ -54,7 +75,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.ServiceLoader; import java.util.concurrent.ArrayBlockingQueue; @@ -110,6 +133,23 @@ public class BrokerContextImpl implements BrokerContext { private AsynchronousChannelGroup asynchronousChannelGroup; + private final Map, MqttProcessor> processors; + + { + Map, MqttProcessor> mqttProcessors = new HashMap<>(); + mqttProcessors.put(MqttPingReqMessage.class, new PingReqProcessor()); + mqttProcessors.put(MqttConnectMessage.class, new ConnectProcessor()); + mqttProcessors.put(MqttPublishMessage.class, new PublishProcessor()); + mqttProcessors.put(MqttSubscribeMessage.class, new SubscribeProcessor()); + mqttProcessors.put(MqttUnsubscribeMessage.class, new UnSubscribeProcessor()); + mqttProcessors.put(MqttPubAckMessage.class, new MqttAckProcessor<>()); + mqttProcessors.put(MqttPubRelMessage.class, new PubRelProcessor()); + mqttProcessors.put(MqttPubRecMessage.class, new MqttAckProcessor<>()); + mqttProcessors.put(MqttPubCompMessage.class, new MqttAckProcessor<>()); + mqttProcessors.put(MqttDisconnectMessage.class, new DisConnectProcessor()); + processors = MapUtils.unmodifiableMap(mqttProcessors); + } + @Override public void init() throws IOException { @@ -134,7 +174,8 @@ public class BrokerContextImpl implements BrokerContext { } }); pagePool = new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true); - processor.addPlugin(new QosRetryPlugin()); + brokerConfigure.addPlugin(new QosRetryPlugin()); + brokerConfigure.getPlugins().forEach(processor::addPlugin); server = new AioQuickServer(brokerConfigure.getHost(), brokerConfigure.getPort(), new MqttProtocol(brokerConfigure.getMaxPacketSize()), processor); server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(Math.max(2, brokerConfigure.getThreadNum())); server.start(asynchronousChannelGroup); @@ -487,11 +528,13 @@ public class BrokerContextImpl implements BrokerContext { } } + @Override - public MqttBrokerMessageProcessor getMessageProcessor() { - return processor; + public Map, MqttProcessor> getMessageProcessors() { + return processors; } + @Override public void destroy() { LOGGER.info("destroy broker..."); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index d2191db4..2c77386d 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -12,36 +12,17 @@ package org.smartboot.mqtt.broker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.smartboot.mqtt.broker.processor.ConnectProcessor; -import org.smartboot.mqtt.broker.processor.DisConnectProcessor; -import org.smartboot.mqtt.broker.processor.MqttAckProcessor; import org.smartboot.mqtt.broker.processor.MqttProcessor; -import org.smartboot.mqtt.broker.processor.PingReqProcessor; -import org.smartboot.mqtt.broker.processor.PubRelProcessor; -import org.smartboot.mqtt.broker.processor.PublishProcessor; -import org.smartboot.mqtt.broker.processor.SubscribeProcessor; -import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor; import org.smartboot.mqtt.common.DefaultMqttWriter; import org.smartboot.mqtt.common.eventbus.EventObject; import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.exception.MqttException; -import org.smartboot.mqtt.common.message.MqttConnectMessage; -import org.smartboot.mqtt.common.message.MqttDisconnectMessage; import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.mqtt.common.message.MqttPingReqMessage; -import org.smartboot.mqtt.common.message.MqttPubAckMessage; -import org.smartboot.mqtt.common.message.MqttPubCompMessage; -import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttPubRelMessage; -import org.smartboot.mqtt.common.message.MqttPublishMessage; -import org.smartboot.mqtt.common.message.MqttSubscribeMessage; -import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; import org.smartboot.socket.StateMachineEnum; import org.smartboot.socket.extension.processor.AbstractMessageProcessor; import org.smartboot.socket.transport.AioSession; import org.smartboot.socket.util.Attachment; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -59,21 +40,7 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor onlineSessions = new ConcurrentHashMap<>(); - private final Map, MqttProcessor> processorMap = new HashMap<>(); - { - processorMap.put(MqttPingReqMessage.class, new PingReqProcessor()); - processorMap.put(MqttConnectMessage.class, new ConnectProcessor()); - processorMap.put(MqttPublishMessage.class, new PublishProcessor()); - processorMap.put(MqttSubscribeMessage.class, new SubscribeProcessor()); - processorMap.put(MqttUnsubscribeMessage.class, new UnSubscribeProcessor()); - processorMap.put(MqttPubAckMessage.class, new MqttAckProcessor<>()); - processorMap.put(MqttPubRelMessage.class, new PubRelProcessor()); - processorMap.put(MqttPubRecMessage.class, new MqttAckProcessor<>()); - processorMap.put(MqttPubCompMessage.class, new MqttAckProcessor<>()); - processorMap.put(MqttDisconnectMessage.class, new DisConnectProcessor()); -// addPlugin(new RateLimiterPlugin<>(1024 * 512, 1024 * 512)); - } public MqttBrokerMessageProcessor(BrokerContext mqttContext) { this.mqttContext = mqttContext; @@ -81,7 +48,7 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor getOnlineSessions() { - return onlineSessions; - } - } -- Gitee From 59c7696027d1c09a2965960cf9a99e89d97f203e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 27 May 2023 11:58:20 +0800 Subject: [PATCH 06/22] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- smart-mqtt-client/pom.xml | 5 +++ .../org/smartboot/mqtt/client/MqttClient.java | 8 +++- .../org/smartboot/mqtt/client/Benchmark.java | 44 +++++++++++++++++-- 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml index 7d2e80c4..aa4dfbf9 100644 --- a/smart-mqtt-client/pom.xml +++ b/smart-mqtt-client/pom.xml @@ -27,5 +27,10 @@ junit test + + org.smartboot.mqtt + smart-mqtt-broker + test + \ No newline at end of file diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index be39b3cf..0e6710ab 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -343,7 +343,7 @@ public class MqttClient extends AbstractSession { subscribeBuilder.subscribeProperties(new SubscribeProperties()); } MqttSubscribeMessage subscribeMessage = subscribeBuilder.build(); - getInflightQueue().offer(subscribeBuilder, (message) -> { + InflightMessage inflightMessage = getInflightQueue().offer(subscribeBuilder, (message) -> { List qosValues = ((MqttSubAckMessage) message).getPayload().grantedQoSLevels(); ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response"); int i = 0; @@ -364,7 +364,11 @@ public class MqttClient extends AbstractSession { } consumeTask(); }); - flush(); + if (inflightMessage == null) { + registeredTasks.offer(() -> subscribe0(topic, qos, consumer, subAckConsumer)); + } else { + flush(); + } } public void notifyResponse(MqttConnAckMessage connAckMessage) { diff --git a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/Benchmark.java b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/Benchmark.java index 9b294308..bfa32625 100644 --- a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/Benchmark.java +++ b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/Benchmark.java @@ -1,8 +1,13 @@ package org.smartboot.mqtt.client; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.broker.BrokerContext; +import org.smartboot.mqtt.broker.BrokerContextImpl; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.util.MqttUtil; @@ -10,6 +15,7 @@ import java.io.IOException; import java.nio.channels.AsynchronousChannelGroup; import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -17,18 +23,24 @@ import java.util.concurrent.atomic.AtomicLong; * @version V1.0 , 2022/4/18 */ public class Benchmark { + private static final Logger LOGGER = LoggerFactory.getLogger(Benchmark.class); private final String host = "127.0.0.1"; private final int port = 1883; private AsynchronousChannelGroup channelGroup; + private BrokerContext context; + @Before public void init() throws IOException { channelGroup = AsynchronousChannelGroup.withFixedThreadPool(4, r -> new Thread(r)); + context = new BrokerContextImpl(); + context.init(); } @After public void destroy() { channelGroup.shutdown(); + context.destroy(); } /** @@ -81,12 +93,38 @@ public class Benchmark { System.out.println((System.currentTimeMillis() - s) + ":" + total.get()); } + @Test(timeout = 3000) + public void testSubscribe2() throws InterruptedException { + MqttClient client = new MqttClient(host, port, MqttUtil.createClientId()); + client.connect(channelGroup); + + int connectCount = 10000; + AtomicLong total = new AtomicLong(0); + CountDownLatch countDownLatch = new CountDownLatch(connectCount); + long s = System.currentTimeMillis(); + + while (connectCount-- > 0) { + long start = System.currentTimeMillis(); + String topic = "/topic/" + connectCount; + client.subscribe(topic, MqttQoS.AT_MOST_ONCE, (mqttClient, publishMessage) -> { + }, (mqttClient, mqttQoS) -> { +// LOGGER.info("subscribe:{}", "/topic/" + topic); + total.addAndGet(System.currentTimeMillis() - start); + countDownLatch.countDown(); + }); + + } + countDownLatch.await(); + client.disconnect(); + System.out.println((System.currentTimeMillis() - s) + ":" + total.get()); + } + /** * 订阅topic成功后断开连接 * * @throws InterruptedException */ - @Test + @Test(timeout = 5000) public void testPublish() throws InterruptedException { int connectCount = 100; int publishCount = Short.MAX_VALUE; @@ -117,7 +155,7 @@ public class Benchmark { client.publish(topic, MqttQoS.AT_MOST_ONCE, payload, false); } System.out.println("publish finish!"); - publishDownLatch.await(); - System.out.println("publish use time: " + (System.currentTimeMillis() - startPublish)); + Assert.assertTrue("wait result timeout", publishDownLatch.await(5, TimeUnit.SECONDS)); + System.out.println("publish use time: " + (System.currentTimeMillis() - startPublish) + " count:" + publishDownLatch.getCount()); } } -- Gitee From 8f195bee46ebe5798feca374a36b551c594e5efd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 27 May 2023 12:06:03 +0800 Subject: [PATCH 07/22] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../broker/MqttBrokerMessageProcessor.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index 2c77386d..fc065a74 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -21,11 +21,9 @@ import org.smartboot.mqtt.common.message.MqttMessage; import org.smartboot.socket.StateMachineEnum; import org.smartboot.socket.extension.processor.AbstractMessageProcessor; import org.smartboot.socket.transport.AioSession; +import org.smartboot.socket.util.AttachKey; import org.smartboot.socket.util.Attachment; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * @author 三刀 * @version V1.0 , 2018/4/24 @@ -36,10 +34,8 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor onlineSessions = new ConcurrentHashMap<>(); + + private final static AttachKey SESSION_KEY = AttachKey.valueOf("mqttSession"); public MqttBrokerMessageProcessor(BrokerContext mqttContext) { @@ -50,7 +46,8 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor Date: Sat, 27 May 2023 16:28:47 +0800 Subject: [PATCH 08/22] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- .../broker/MqttBrokerMessageProcessor.java | 3 +- .../smartboot/mqtt/common/InflightQueue.java | 3 +- .../mqtt/common/protocol/MqttProtocol.java | 5 +-- .../mqtt/common/util/MqttAttachKey.java | 34 +++++++++++++++++++ 5 files changed, 42 insertions(+), 5 deletions(-) create mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttAttachKey.java diff --git a/pom.xml b/pom.xml index 62de9293..6e52628a 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ 0.21 - 1.5.27 + 1.5.29 1.1.22 2.6 4.3 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index fc065a74..8bfae4a5 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -18,6 +18,7 @@ import org.smartboot.mqtt.common.eventbus.EventObject; import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.exception.MqttException; import org.smartboot.mqtt.common.message.MqttMessage; +import org.smartboot.mqtt.common.util.MqttAttachKey; import org.smartboot.socket.StateMachineEnum; import org.smartboot.socket.extension.processor.AbstractMessageProcessor; import org.smartboot.socket.transport.AioSession; @@ -35,7 +36,7 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor SESSION_KEY = AttachKey.valueOf("mqttSession"); + private final static AttachKey SESSION_KEY = AttachKey.valueOf(MqttAttachKey.MQTT_SESSION); public MqttBrokerMessageProcessor(BrokerContext mqttContext) { diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index efb4f189..0bb45b5c 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -22,6 +22,7 @@ import org.smartboot.mqtt.common.message.MqttVariableMessage; import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; +import org.smartboot.mqtt.common.util.MqttAttachKey; import org.smartboot.mqtt.common.util.MqttMessageBuilders; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.util.AttachKey; @@ -39,7 +40,7 @@ import java.util.function.Consumer; */ public class InflightQueue { private static final Logger LOGGER = LoggerFactory.getLogger(InflightQueue.class); - static final AttachKey RETRY_TASK_ATTACH_KEY = AttachKey.valueOf("retryTask"); + static final AttachKey RETRY_TASK_ATTACH_KEY = AttachKey.valueOf(MqttAttachKey.RETRY_TASK); private static final int TIMEOUT = 3; private final InflightMessage[] queue; private int takeIndex; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java index 47769341..07cffa57 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java @@ -16,6 +16,7 @@ import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttFixedHeader; import org.smartboot.mqtt.common.message.MqttMessage; +import org.smartboot.mqtt.common.util.MqttAttachKey; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.Protocol; import org.smartboot.socket.transport.AioSession; @@ -36,8 +37,8 @@ public class MqttProtocol implements Protocol { private static final Logger logger = LoggerFactory.getLogger(MqttProtocol.class); private final int maxBytesInMessage; - public static final AttachKey MQTT_VERSION_ATTACH_KEY = AttachKey.valueOf("mqtt_version"); - private static final AttachKey DECODE_UNIT_ATTACH_KEY = AttachKey.valueOf("decodeUnit"); + public static final AttachKey MQTT_VERSION_ATTACH_KEY = AttachKey.valueOf(MqttAttachKey.MQTT_VERSION); + private static final AttachKey DECODE_UNIT_ATTACH_KEY = AttachKey.valueOf(MqttAttachKey.DECODE_UNIT); public MqttProtocol(int maxBytesInMessage) { diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttAttachKey.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttAttachKey.java new file mode 100644 index 00000000..68142ccd --- /dev/null +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttAttachKey.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.common.util; + +import org.smartboot.socket.util.AttachKey; + +/** + * @author 三刀(zhengjunweimail@163.com) + * @version V1.0 , 5/27/23 + */ +public class MqttAttachKey { + public static final String MQTT_VERSION = "mqtt_version"; + + public static final String DECODE_UNIT = "decode_unit"; + public static final String RETRY_TASK = "retry_task"; + public static final String MQTT_SESSION = "mqtt_session"; + + static { + AttachKey.reset(); + AttachKey.valueOf(MQTT_SESSION); + AttachKey.valueOf(MQTT_VERSION); + AttachKey.valueOf(DECODE_UNIT); + AttachKey.valueOf(RETRY_TASK); + AttachKey.reset(); + } +} -- Gitee From 9101a58ea9dcfcc75bc9bf6527cc1d3e92e66fd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 27 May 2023 18:17:45 +0800 Subject: [PATCH 09/22] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/smartboot/mqtt/client/MqttClient.java | 19 ++++--------------- .../smartboot/mqtt/common/InflightQueue.java | 14 ++++++++++++-- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index 0e6710ab..4e4682d1 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -432,25 +432,14 @@ public class MqttClient extends AbstractSession { InflightQueue inflightQueue = getInflightQueue(); InflightMessage inflightMessage = inflightQueue.offer(publishBuilder, (message) -> { consumer.accept(message.getVariableHeader().getPacketId()); - //最早发送的消息若收到响应,则更新点位 - synchronized (MqttClient.this) { - MqttClient.this.notifyAll(); - } + consumeTask(); }); if (inflightMessage == null) { - try { - synchronized (this) { - wait(); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - publish(publishBuilder, consumer, autoFlush); - return; - } - if (autoFlush) { + registeredTasks.offer(() -> publish(publishBuilder, consumer, autoFlush)); + } else if (autoFlush) { flush(); } + } public MqttClientConfigure getClientConfigure() { diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 0bb45b5c..db593fc8 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -18,6 +18,7 @@ import org.smartboot.mqtt.common.message.MqttFixedHeader; import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; +import org.smartboot.mqtt.common.message.MqttSubscribeMessage; import org.smartboot.mqtt.common.message.MqttVariableMessage; import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; @@ -101,9 +102,9 @@ public class InflightQueue { if (inflightMessage.isCommit() || session.isDisconnect()) { return; } - QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new Runnable() { + QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new AsyncTask() { @Override - public void run() { + public void execute() { if (inflightMessage.isCommit()) { // System.out.println("message has commit,ignore retry monitor"); return; @@ -138,6 +139,11 @@ public class InflightQueue { MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(MqttFixedHeader.PUB_REL_HEADER_DUP, variableHeader); session.write(pubRelMessage); break; + case SUBACK: + MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) inflightMessage.getOriginalMessage(); + MqttSubscribeMessage dupSubscribeMessage = new MqttSubscribeMessage(MqttFixedHeader.SUBSCRIBE_HEADER_DUP, subscribeMessage.getVariableHeader(), subscribeMessage.getPayload()); + session.write(dupSubscribeMessage); + break; default: throw new UnsupportedOperationException("invalid message type: " + inflightMessage.getExpectMessageType()); } @@ -153,6 +159,10 @@ public class InflightQueue { */ public void notify(MqttPacketIdentifierMessage message) { InflightMessage inflightMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length]; + if (inflightMessage == null && message.getFixedHeader().isDup()) { + LOGGER.info("ignore duplicate message"); + return; + } switch (message.getFixedHeader().getMessageType()) { case SUBACK: case UNSUBACK: -- Gitee From 54ce3fa9bfdf2367a5586b9bf528e686666635fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sun, 28 May 2023 14:52:53 +0800 Subject: [PATCH 10/22] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartboot/mqtt/broker/MqttSession.java | 58 +++++++++---------- .../mqtt/broker/TopicFilterSubscriber.java | 8 +-- .../org/smartboot/mqtt/client/MqttClient.java | 2 +- 3 files changed, 28 insertions(+), 40 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index 713295a4..6f892f1f 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -66,6 +66,29 @@ public class MqttSession extends AbstractSession { this.session = session; this.mqttWriter = mqttWriter; mqttContext.getEventBus().publish(ServerEventType.SESSION_CREATE, this); + mqttContext.getEventBus().subscribe(ServerEventType.TOPIC_CREATE, new EventBusSubscriber() { + @Override + public boolean enable() { + return !disconnect; + } + + @Override + public void subscribe(EventType eventType, BrokerTopic object) { + if (!mqttContext.getProviders().getSubscribeProvider().subscribeTopic(object.getTopic(), MqttSession.this)) { + return; + } + for (TopicFilterSubscriber topicFilterSubscriber : subscribers.values()) { + if (subscribers.containsKey(object.getTopic())) { + break; + } + if (topicFilterSubscriber.getTopicFilterToken().isWildcards() && MqttUtil.match(object.getTopicToken(), topicFilterSubscriber.getTopicFilterToken())) { + TopicSubscriber subscription = subscribeSuccess(topicFilterSubscriber.getMqttQoS(), topicFilterSubscriber.getTopicFilterToken(), object); + mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); + } + } + + } + }); } public ConnectProperties getProperties() { @@ -166,31 +189,6 @@ public class MqttSession extends AbstractSession { } } } - - //通配符匹配增量Topic - if (!subscribers.containsKey(topicFilter)) { - subscribers.put(topicFilter, new TopicFilterSubscriber(topicToken, mqttQoS)); - } - if (newSubscribe) { - mqttContext.getEventBus().subscribe(ServerEventType.TOPIC_CREATE, new EventBusSubscriber() { - @Override - public boolean enable() { - boolean enable = !disconnect && subscribers.containsKey(topicFilter); - if (!enable) { - LOGGER.info("current event is disable,quit topic:{} monitor", topicFilter); - } - return enable; - } - - @Override - public void subscribe(EventType eventType, BrokerTopic object) { - if (MqttUtil.match(object.getTopicToken(), topicToken) && mqttContext.getProviders().getSubscribeProvider().subscribeTopic(object.getTopic(), MqttSession.this)) { - TopicSubscriber subscription = MqttSession.this.subscribeSuccess(mqttQoS, topicToken, object); - mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); - } - } - }); - } } /** @@ -224,13 +222,9 @@ public class MqttSession extends AbstractSession { } }); - TopicFilterSubscriber topicFilterSubscriber = subscribers.get(subscription.getTopicFilterToken().getTopicFilter()); - if (topicFilterSubscriber == null) { - topicFilterSubscriber = new TopicFilterSubscriber(subscription.getTopicFilterToken(), subscription.getMqttQoS(), subscription); - subscribers.put(subscription.getTopicFilterToken().getTopicFilter(), topicFilterSubscriber); - } else { - topicFilterSubscriber.getTopicSubscribers().put(subscription.getTopic().getTopic(), subscription); - } + TopicFilterSubscriber topicFilterSubscriber = subscribers.computeIfAbsent(subscription.getTopicFilterToken().getTopicFilter(), s -> new TopicFilterSubscriber(subscription.getTopicFilterToken(), subscription.getMqttQoS())); + + topicFilterSubscriber.getTopicSubscribers().put(subscription.getTopic().getTopic(), subscription); TopicSubscriber preTopicSubscriber = subscription.getTopic().getConsumeOffsets().put(this, subscription); if (preTopicSubscriber != null) { LOGGER.error("invalid state..."); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java index 75f1691f..05d8d1bb 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java @@ -30,17 +30,11 @@ class TopicFilterSubscriber { /** * 客户端订阅所匹配的Topic。通配符订阅时可能有多个 */ - private final Map topicSubscribers; + private final Map topicSubscribers = new HashMap<>(); public TopicFilterSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS) { this.topicFilterToken = topicFilterToken; this.mqttQoS = mqttQoS; - this.topicSubscribers = new HashMap<>(); - } - - public TopicFilterSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS, TopicSubscriber topicSubscriber) { - this(topicFilterToken, mqttQoS); - topicSubscribers.put(topicSubscriber.getTopic().getTopic(), topicSubscriber); } public TopicToken getTopicFilterToken() { diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index 4e4682d1..f5d23d1f 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -169,11 +169,11 @@ public class MqttClient extends AbstractSession { if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { setInflightQueue(new InflightQueue(this, 16)); connected = true; - consumeTask(); //重连情况下重新触发订阅逻辑 subscribes.forEach((k, v) -> { subscribe(k, v.getQoS(), v.getConsumer()); }); + consumeTask(); } //客户端设置清理会话(CleanSession)标志为 0 重连时,客户端和服务端必须使用原始的报文标识符重发 //任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 [MQTT-4.4.0-1]。这是唯一要求客户端或 -- Gitee From 6dfa34124d113161158acbbc9243228abb18b7c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Tue, 30 May 2023 21:41:04 +0800 Subject: [PATCH 11/22] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartboot/mqtt/broker/BrokerContext.java | 5 + .../mqtt/broker/BrokerContextImpl.java | 24 +++- .../smartboot/mqtt/broker/MqttSession.java | 118 ++++++------------ .../mqtt/broker/TopicFilterSubscriber.java | 4 +- .../mqtt/broker/topic/TopicPublishTree.java | 80 ++++++++++++ .../mqtt/broker/topic/TopicSubscribeTree.java | 78 ++++++++++++ 6 files changed, 225 insertions(+), 84 deletions(-) create mode 100644 smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java create mode 100644 smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java index 208cf789..1e683933 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java @@ -13,6 +13,8 @@ package org.smartboot.mqtt.broker; import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; import org.smartboot.mqtt.broker.processor.MqttProcessor; import org.smartboot.mqtt.broker.provider.Providers; +import org.smartboot.mqtt.broker.topic.TopicPublishTree; +import org.smartboot.mqtt.broker.topic.TopicSubscribeTree; import org.smartboot.mqtt.common.eventbus.EventBus; import org.smartboot.mqtt.common.message.MqttMessage; @@ -87,4 +89,7 @@ public interface BrokerContext { Map, MqttProcessor> getMessageProcessors(); + TopicPublishTree getPublishTopicTree(); + + TopicSubscribeTree getTopicSubscribeTree(); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 22bad722..7e7ab64d 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -35,6 +35,8 @@ import org.smartboot.mqtt.broker.processor.SubscribeProcessor; import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor; import org.smartboot.mqtt.broker.provider.Providers; import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; +import org.smartboot.mqtt.broker.topic.TopicPublishTree; +import org.smartboot.mqtt.broker.topic.TopicSubscribeTree; import org.smartboot.mqtt.common.AsyncTask; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.QosRetryPlugin; @@ -106,6 +108,9 @@ public class BrokerContextImpl implements BrokerContext { */ private final ConcurrentMap topicMap = new ConcurrentHashMap<>(); private BrokerConfigure brokerConfigure = new BrokerConfigure(); + private final TopicPublishTree topicPublishTree = new TopicPublishTree(); + + private final TopicSubscribeTree subscribeTopicTree = new TopicSubscribeTree(); /** * Keep-Alive监听线程 */ @@ -358,6 +363,13 @@ public class BrokerContextImpl implements BrokerContext { LOGGER.info("刷新订阅关系, {} 订阅了topic: {}", subscriber.getTopicFilterToken().getTopicFilter(), subscriber.getTopic().getTopic()); subscriber.getTopic().getQueue().offer(subscriber); }); + + eventBus.subscribe(ServerEventType.TOPIC_CREATE, (eventType, object) -> subscribeTopicTree.match(object.getTopicToken(), (session, topicFilterSubscriber) -> { + if (!providers.getSubscribeProvider().subscribeTopic(object.getTopic(), session)) { + return; + } + session.subscribeSuccess(topicFilterSubscriber.getMqttQoS(), topicFilterSubscriber.getTopicFilterToken(), object); + })); } private void notifyPush(BrokerTopic topic) { @@ -440,7 +452,7 @@ public class BrokerContextImpl implements BrokerContext { public BrokerTopic getOrCreateTopic(String topic) { return topicMap.computeIfAbsent(topic, topicName -> { ValidateUtils.isTrue(!MqttUtil.containsTopicWildcards(topicName), "invalid topicName: " + topicName); - BrokerTopic newTopic = new BrokerTopic(topicName); + BrokerTopic newTopic = topicPublishTree.addTopic(topic); eventBus.publish(ServerEventType.TOPIC_CREATE, newTopic); return newTopic; }); @@ -509,6 +521,16 @@ public class BrokerContextImpl implements BrokerContext { } } + @Override + public TopicPublishTree getPublishTopicTree() { + return topicPublishTree; + } + + @Override + public TopicSubscribeTree getTopicSubscribeTree() { + return subscribeTopicTree; + } + public void loadYamlConfig() throws IOException { String brokerConfig = System.getProperty(BrokerConfigure.SystemProperty.BrokerConfig); InputStream inputStream = null; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index 6f892f1f..1691b7cb 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -18,11 +18,9 @@ import org.smartboot.mqtt.common.AbstractSession; import org.smartboot.mqtt.common.MqttWriter; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; -import org.smartboot.mqtt.common.eventbus.EventBusSubscriber; import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.variable.properties.ConnectProperties; -import org.smartboot.mqtt.common.util.MqttUtil; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.transport.AioSession; @@ -45,6 +43,8 @@ public class MqttSession extends AbstractSession { */ private final Map subscribers = new ConcurrentHashMap<>(); + private final Map pub2sub = new ConcurrentHashMap<>(); + private final BrokerContext mqttContext; private String username; @@ -66,29 +66,6 @@ public class MqttSession extends AbstractSession { this.session = session; this.mqttWriter = mqttWriter; mqttContext.getEventBus().publish(ServerEventType.SESSION_CREATE, this); - mqttContext.getEventBus().subscribe(ServerEventType.TOPIC_CREATE, new EventBusSubscriber() { - @Override - public boolean enable() { - return !disconnect; - } - - @Override - public void subscribe(EventType eventType, BrokerTopic object) { - if (!mqttContext.getProviders().getSubscribeProvider().subscribeTopic(object.getTopic(), MqttSession.this)) { - return; - } - for (TopicFilterSubscriber topicFilterSubscriber : subscribers.values()) { - if (subscribers.containsKey(object.getTopic())) { - break; - } - if (topicFilterSubscriber.getTopicFilterToken().isWildcards() && MqttUtil.match(object.getTopicToken(), topicFilterSubscriber.getTopicFilterToken())) { - TopicSubscriber subscription = subscribeSuccess(topicFilterSubscriber.getMqttQoS(), topicFilterSubscriber.getTopicFilterToken(), object); - mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); - } - } - - } - }); } public ConnectProperties getProperties() { @@ -157,46 +134,39 @@ public class MqttSession extends AbstractSession { public MqttQoS subscribe(String topicFilter, MqttQoS mqttQoS) { if (mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topicFilter, this)) { - subscribe0(topicFilter, mqttQoS, true); + subscribe0(topicFilter, mqttQoS); return mqttQoS; } else { return MqttQoS.FAILURE; } } - private void subscribe0(String topicFilter, MqttQoS mqttQoS, boolean newSubscribe) { + private void subscribe0(String topicFilter, MqttQoS mqttQoS) { TopicToken topicToken = new TopicToken(topicFilter); - //精准匹配 - if (!topicToken.isWildcards()) { - BrokerTopic topic = mqttContext.getOrCreateTopic(topicToken.getTopicFilter());//可能会先触发TopicFilterSubscriber.subscribe - TopicSubscriber subscription = subscribeSuccess(mqttQoS, topicToken, topic); - if (newSubscribe) { - mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); - } else { - mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, subscription); - } - return; - } + TopicFilterSubscriber subscriber = new TopicFilterSubscriber(topicToken, mqttQoS); + TopicFilterSubscriber preSubscriber = subscribers.put(topicFilter, subscriber); + ValidateUtils.isTrue(preSubscriber == null, "duplicate topic filter"); + mqttContext.getTopicSubscribeTree().subscribeTopic(this, subscriber); + mqttContext.getPublishTopicTree().match(topicToken, topic -> subscribeSuccess(mqttQoS, topicToken, topic)); + } + + public void subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic topic) { + TopicToken subToken = pub2sub.get(topic); + if (subToken != null) { + if (!subToken.isWildcards()) { + return; + } else if (topicToken.isWildcards()) { + if (topicToken.getTopicFilter().length() > subToken.getTopicFilter().length()) { + TopicSubscriber preSubscription = subscribers.get(subToken.getTopicFilter()).getTopicSubscribers().remove(topic.getTopic()); + TopicSubscriber subscription = new TopicSubscriber(topic, MqttSession.this, mqttQoS, preSubscription.getNextConsumerOffset(), preSubscription.getRetainConsumerOffset()); + subscription.setTopicFilterToken(topicToken); + topic.getConsumeOffsets().put(MqttSession.this, subscription); - //通配符匹配存量Topic - for (BrokerTopic topic : mqttContext.getTopics()) { - if (MqttUtil.match(topic.getTopicToken(), topicToken) && mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topic.getTopic(), this)) { - TopicSubscriber subscription = subscribeSuccess(mqttQoS, topicToken, topic); - if (newSubscribe) { - mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); - } else { - mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, subscription); } + return; } } - } - - /** - * retain消息消费点位记录 - */ - private final Map retainOffsetCache = new HashMap<>(); - - private TopicSubscriber subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic topic) { + pub2sub.put(topic, topicToken); long latestOffset = mqttContext.getProviders().getPersistenceProvider().getLatestOffset(topic.getTopic()); // retain消费点位优先以缓存为准 Long retainOffset = retainOffsetCache.get(topic); @@ -205,39 +175,24 @@ public class MqttSession extends AbstractSession { retainOffset = oldestRetainOffset; } //以当前消息队列的最新点位为起始点位 - TopicSubscriber subscription = new TopicSubscriber(topic, this, mqttQoS, latestOffset + 1, retainOffset); + TopicSubscriber subscription = new TopicSubscriber(topic, MqttSession.this, mqttQoS, latestOffset + 1, retainOffset); subscription.setTopicFilterToken(topicToken); - // 如果服务端收到一个 SUBSCRIBE 报文, - //报文的主题过滤器与一个现存订阅的主题过滤器相同, - // 那么必须使用新的订阅彻底替换现存的订阅。 - // 新订阅的主题过滤器和之前订阅的相同,但是它的最大 QoS 值可以不同。 - ValidateUtils.isTrue(!disconnect, "session has closed,can not subscribe topic"); - - subscribers.values().forEach(topicFilterSubscriber -> { - TopicSubscriber oldOffset = topicFilterSubscriber.getTopicSubscribers().remove(topic.getTopic()); - if (oldOffset != null) { - TopicSubscriber consumerOffset = oldOffset.getTopic().getConsumeOffsets().remove(this); - consumerOffset.disable(); - LOGGER.info("remove topic:{} {},", topic, oldOffset == consumerOffset ? "success" : "fail"); - } - }); - - TopicFilterSubscriber topicFilterSubscriber = subscribers.computeIfAbsent(subscription.getTopicFilterToken().getTopicFilter(), s -> new TopicFilterSubscriber(subscription.getTopicFilterToken(), subscription.getMqttQoS())); + topic.getConsumeOffsets().put(MqttSession.this, subscription); + mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); - topicFilterSubscriber.getTopicSubscribers().put(subscription.getTopic().getTopic(), subscription); - TopicSubscriber preTopicSubscriber = subscription.getTopic().getConsumeOffsets().put(this, subscription); - if (preTopicSubscriber != null) { - LOGGER.error("invalid state..."); - } else { - LOGGER.debug("new subscribe topic:{} success by topicFilter:{}", subscription.getTopic().getTopic(), subscription.getTopicFilterToken().getTopicFilter()); - } - - return subscription; + subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic.getTopic(), subscription); } + /** + * retain消息消费点位记录 + */ + private final Map retainOffsetCache = new HashMap<>(); + public void resubscribe() { - subscribers.values().forEach(subscriber -> subscribe0(subscriber.getTopicFilterToken().getTopicFilter(), subscriber.getMqttQoS(), false)); + subscribers.values().stream().filter(subscriber -> subscriber.getTopicFilterToken().isWildcards()).forEach(subscriber -> { + mqttContext.getPublishTopicTree().match(subscriber.getTopicFilterToken(), topic -> subscribeSuccess(subscriber.getMqttQoS(), subscriber.getTopicFilterToken(), topic)); + }); } public void unsubscribe(String topicFilter) { @@ -255,6 +210,7 @@ public class MqttSession extends AbstractSession { LOGGER.error("remove subscriber:{} error!", removeSubscriber); } }); + mqttContext.getTopicSubscribeTree().unsubscribe(this, filterSubscriber); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java index 05d8d1bb..2a6cc9ff 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java @@ -22,7 +22,7 @@ import java.util.Map; * @author 三刀(zhengjunweimail@163.com) * @version V1.0 , 2022/7/13 */ -class TopicFilterSubscriber { +public class TopicFilterSubscriber { private final TopicToken topicFilterToken; private final MqttQoS mqttQoS; @@ -32,7 +32,7 @@ class TopicFilterSubscriber { */ private final Map topicSubscribers = new HashMap<>(); - public TopicFilterSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS) { + TopicFilterSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS) { this.topicFilterToken = topicFilterToken; this.mqttQoS = mqttQoS; } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java new file mode 100644 index 00000000..929824b9 --- /dev/null +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java @@ -0,0 +1,80 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.broker.topic; + +import org.smartboot.mqtt.broker.BrokerTopic; +import org.smartboot.mqtt.common.TopicToken; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +/** + * @author 三刀(zhengjunweimail@163.com) + * @version V1.0 , 5/28/23 + */ +public class TopicPublishTree { + private BrokerTopic brokerTopic; + private final ConcurrentHashMap subNode = new ConcurrentHashMap<>(); + + public synchronized BrokerTopic addTopic(String topic) { + BrokerTopic brokerTopic = new BrokerTopic(topic); + TopicToken topicToken = brokerTopic.getTopicToken(); + TopicPublishTree treeNode = this; + while (true) { + treeNode = treeNode.subNode.computeIfAbsent(topicToken.getNode(), n -> new TopicPublishTree()); + if (topicToken.getNextNode() == null) { + break; + } else { + topicToken = topicToken.getNextNode(); + } + } + treeNode.brokerTopic = new BrokerTopic(topic); + return treeNode.brokerTopic; + } + + public void match(TopicToken topicToken, Consumer consumer) { + match(this, topicToken, consumer); + } + + private void match(TopicPublishTree treeNode, TopicToken topicToken, Consumer consumer) { + //匹配结束 + if (topicToken == null) { + if (treeNode.brokerTopic != null) { + consumer.accept(treeNode.brokerTopic); + } + return; + } + //合法的#通配符必然存在于末端 + if ("#".equals(topicToken.getNode())) { + treeNode.subNode.values().forEach(node -> { + subscribeChildren(node, consumer); + }); + } else if ("+".equals(topicToken.getNode())) { + treeNode.subNode.values().forEach(node -> { + match(node, topicToken.getNextNode(), consumer); + }); + } else { + TopicPublishTree node = treeNode.subNode.get(topicToken.getNode()); + if (node != null) { + match(node, topicToken.getNextNode(), consumer); + } + } + } + + private void subscribeChildren(TopicPublishTree treeNode, Consumer consumer) { + BrokerTopic brokerTopic = treeNode.brokerTopic; + if (brokerTopic != null) { + consumer.accept(brokerTopic); + } + //递归订阅Topic + treeNode.subNode.values().forEach(subNode -> subscribeChildren(subNode, consumer)); + } +} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java new file mode 100644 index 00000000..a6e17e70 --- /dev/null +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.broker.topic; + +import org.smartboot.mqtt.broker.MqttSession; +import org.smartboot.mqtt.broker.TopicFilterSubscriber; +import org.smartboot.mqtt.common.TopicToken; +import org.smartboot.mqtt.common.util.ValidateUtils; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; + +/** + * @author 三刀(zhengjunweimail@163.com) + * @version V1.0 , 5/28/23 + */ +public class TopicSubscribeTree { + private final Map subscribers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap subNode = new ConcurrentHashMap<>(); + + public synchronized void subscribeTopic(MqttSession session, TopicFilterSubscriber subscriber) { + TopicSubscribeTree treeNode = this; + TopicToken token = subscriber.getTopicFilterToken(); + do { + treeNode = treeNode.subNode.computeIfAbsent(token.getNode(), n -> new TopicSubscribeTree()); + } while ((token = token.getNextNode()) != null); + treeNode.subscribers.put(session, subscriber); + } + + public void unsubscribe(MqttSession session, TopicFilterSubscriber subscriber) { + TopicSubscribeTree subscribeTree = this; + TopicToken topicToken = subscriber.getTopicFilterToken(); + while (true) { + subscribeTree = subscribeTree.subNode.get(topicToken.getNode()); + if (topicToken.getNextNode() == null) { + break; + } + topicToken = topicToken.getNextNode(); + } + subscribeTree.subscribers.remove(session); + } + + + public void match(TopicToken topicToken, BiConsumer consumer) { + //精确匹配 + TopicSubscribeTree subscribeTree = subNode.get(topicToken.getNode()); + if (subscribeTree != null) { + if (topicToken.getNextNode() == null) { + subscribers.forEach(consumer); + } else { + subscribeTree.match(topicToken.getNextNode(), consumer); + } + } + subscribeTree = subNode.get("#"); + if (subscribeTree != null) { + ValidateUtils.isTrue(subscribeTree.subNode.isEmpty(), "'#' node must be empty"); + subscribeTree.subscribers.forEach(consumer); + } + + subscribeTree = subNode.get("+"); + if (subscribeTree != null) { + if (topicToken.getNextNode() == null) { + subscribers.forEach(consumer); + } else { + subscribeTree.subNode.values().forEach(t -> match(topicToken.getNextNode(), consumer)); + } + } + } +} -- Gitee From bb9fcd4f3654a277d30367e21b20b3e62526418e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Tue, 30 May 2023 23:14:41 +0800 Subject: [PATCH 12/22] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../broker/MqttBrokerMessageProcessor.java | 6 ++-- .../smartboot/mqtt/broker/MqttSession.java | 29 ++++++++++++------- .../mqtt/broker/TopicFilterSubscriber.java | 6 +++- .../mqtt/broker/TopicSubscriber.java | 6 +++- 4 files changed, 31 insertions(+), 16 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index 8bfae4a5..ee6d2287 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -82,9 +82,9 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor sub.setMqttQoS(mqttQoS)); + return; + } TopicToken topicToken = new TopicToken(topicFilter); - TopicFilterSubscriber subscriber = new TopicFilterSubscriber(topicToken, mqttQoS); + if (!topicToken.isWildcards()) { + mqttContext.getOrCreateTopic(topicFilter); + } + subscriber = new TopicFilterSubscriber(topicToken, mqttQoS); TopicFilterSubscriber preSubscriber = subscribers.put(topicFilter, subscriber); ValidateUtils.isTrue(preSubscriber == null, "duplicate topic filter"); mqttContext.getTopicSubscribeTree().subscribeTopic(this, subscriber); @@ -153,18 +162,15 @@ public class MqttSession extends AbstractSession { public void subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic topic) { TopicToken subToken = pub2sub.get(topic); if (subToken != null) { - if (!subToken.isWildcards()) { - return; - } else if (topicToken.isWildcards()) { - if (topicToken.getTopicFilter().length() > subToken.getTopicFilter().length()) { - TopicSubscriber preSubscription = subscribers.get(subToken.getTopicFilter()).getTopicSubscribers().remove(topic.getTopic()); - TopicSubscriber subscription = new TopicSubscriber(topic, MqttSession.this, mqttQoS, preSubscription.getNextConsumerOffset(), preSubscription.getRetainConsumerOffset()); - subscription.setTopicFilterToken(topicToken); - topic.getConsumeOffsets().put(MqttSession.this, subscription); - + if (subToken.isWildcards()) { + if (!topicToken.isWildcards() || topicToken.getTopicFilter().length() > subToken.getTopicFilter().length()) { + TopicSubscriber preSubscription = subscribers.get(subToken.getTopicFilter()).getTopicSubscribers().get(topic.getTopic()); + preSubscription.setMqttQoS(mqttQoS); + preSubscription.setTopicFilterToken(topicToken); + pub2sub.put(topic, topicToken); } - return; } + return; } pub2sub.put(topic, topicToken); long latestOffset = mqttContext.getProviders().getPersistenceProvider().getLatestOffset(topic.getTopic()); @@ -202,6 +208,7 @@ public class MqttSession extends AbstractSession { } filterSubscriber.getTopicSubscribers().values().forEach(subscriber -> { TopicSubscriber removeSubscriber = subscriber.getTopic().getConsumeOffsets().remove(this); + pub2sub.remove(removeSubscriber.getTopic()); retainOffsetCache.put(subscriber.getTopic(), subscriber.getRetainConsumerOffset()); if (subscriber == removeSubscriber) { removeSubscriber.disable(); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java index 2a6cc9ff..1a28d076 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java @@ -25,7 +25,7 @@ import java.util.Map; public class TopicFilterSubscriber { private final TopicToken topicFilterToken; - private final MqttQoS mqttQoS; + private MqttQoS mqttQoS; /** * 客户端订阅所匹配的Topic。通配符订阅时可能有多个 @@ -45,6 +45,10 @@ public class TopicFilterSubscriber { return mqttQoS; } + public void setMqttQoS(MqttQoS mqttQoS) { + this.mqttQoS = mqttQoS; + } + public Map getTopicSubscribers() { return topicSubscribers; } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 5c1ed6c8..e62da8a5 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -41,7 +41,7 @@ public class TopicSubscriber { /** * 服务端向客户端发送应用消息所允许的最大 QoS 等级 */ - private final MqttQoS mqttQoS; + private MqttQoS mqttQoS; /** * 期望消费的点位 @@ -191,4 +191,8 @@ public class TopicSubscriber { public void disable() { this.enable = false; } + + public void setMqttQoS(MqttQoS mqttQoS) { + this.mqttQoS = mqttQoS; + } } -- Gitee From bee28421ebb10823b7d8c1f1f6f37e6496340e57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Wed, 31 May 2023 18:30:21 +0800 Subject: [PATCH 13/22] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartboot/mqtt/broker/MqttSession.java | 23 ++++++++++--------- .../mqtt/broker/TopicFilterSubscriber.java | 4 ++-- .../smartboot/mqtt/common/InflightQueue.java | 4 ++-- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index 82c97d99..e2b8d264 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -43,8 +43,6 @@ public class MqttSession extends AbstractSession { */ private final Map subscribers = new ConcurrentHashMap<>(); - private final Map pub2sub = new ConcurrentHashMap<>(); - private final BrokerContext mqttContext; private String username; @@ -160,19 +158,23 @@ public class MqttSession extends AbstractSession { } public void subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic topic) { - TopicToken subToken = pub2sub.get(topic); - if (subToken != null) { - if (subToken.isWildcards()) { - if (!topicToken.isWildcards() || topicToken.getTopicFilter().length() > subToken.getTopicFilter().length()) { - TopicSubscriber preSubscription = subscribers.get(subToken.getTopicFilter()).getTopicSubscribers().get(topic.getTopic()); + TopicSubscriber topicSubscriber = topic.getConsumeOffsets().get(this); + if (topicSubscriber != null) { + //此前的订阅关系 + TopicToken preToken = topicSubscriber.getTopicFilterToken(); + if (preToken.isWildcards()) { + if (!topicToken.isWildcards() || topicToken.getTopicFilter().length() > preToken.getTopicFilter().length()) { + //解除旧的订阅关系 + TopicSubscriber preSubscription = subscribers.get(preToken.getTopicFilter()).getTopicSubscribers().remove(topic); preSubscription.setMqttQoS(mqttQoS); preSubscription.setTopicFilterToken(topicToken); - pub2sub.put(topic, topicToken); + //绑定新的订阅关系 + subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, preSubscription); + mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, preSubscription); } } return; } - pub2sub.put(topic, topicToken); long latestOffset = mqttContext.getProviders().getPersistenceProvider().getLatestOffset(topic.getTopic()); // retain消费点位优先以缓存为准 Long retainOffset = retainOffsetCache.get(topic); @@ -186,7 +188,7 @@ public class MqttSession extends AbstractSession { topic.getConsumeOffsets().put(MqttSession.this, subscription); mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); - subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic.getTopic(), subscription); + subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, subscription); } /** @@ -208,7 +210,6 @@ public class MqttSession extends AbstractSession { } filterSubscriber.getTopicSubscribers().values().forEach(subscriber -> { TopicSubscriber removeSubscriber = subscriber.getTopic().getConsumeOffsets().remove(this); - pub2sub.remove(removeSubscriber.getTopic()); retainOffsetCache.put(subscriber.getTopic(), subscriber.getRetainConsumerOffset()); if (subscriber == removeSubscriber) { removeSubscriber.disable(); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java index 1a28d076..7047f2ef 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java @@ -30,7 +30,7 @@ public class TopicFilterSubscriber { /** * 客户端订阅所匹配的Topic。通配符订阅时可能有多个 */ - private final Map topicSubscribers = new HashMap<>(); + private final Map topicSubscribers = new HashMap<>(); TopicFilterSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS) { this.topicFilterToken = topicFilterToken; @@ -49,7 +49,7 @@ public class TopicFilterSubscriber { this.mqttQoS = mqttQoS; } - public Map getTopicSubscribers() { + public Map getTopicSubscribers() { return topicSubscribers; } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index db593fc8..b7fe6d37 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -159,7 +159,7 @@ public class InflightQueue { */ public void notify(MqttPacketIdentifierMessage message) { InflightMessage inflightMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length]; - if (inflightMessage == null && message.getFixedHeader().isDup()) { + if (inflightMessage == null) { LOGGER.info("ignore duplicate message"); return; } @@ -200,7 +200,7 @@ public class InflightQueue { } } - public synchronized void commit(InflightMessage inflightMessage) { + private synchronized void commit(InflightMessage inflightMessage) { MqttVariableMessage originalMessage = inflightMessage.getOriginalMessage(); ValidateUtils.isTrue(originalMessage.getFixedHeader().getQosLevel().value() == 0 || originalMessage.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message"); inflightMessage.setCommit(true); -- Gitee From 714aaf2c08933bb2744993ee68697882d958beae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Wed, 31 May 2023 19:09:09 +0800 Subject: [PATCH 14/22] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/org/smartboot/mqtt/common/InflightQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index b7fe6d37..75633a68 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -109,7 +109,7 @@ public class InflightQueue { // System.out.println("message has commit,ignore retry monitor"); return; } - if (session.isDisconnect()) { + if (session.session.isInvalid()) { LOGGER.debug("session is disconnect , pause qos monitor."); return; } -- Gitee From e3d108cd919b0b825a6ba144db10e367ea4f9b93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Wed, 31 May 2023 22:35:14 +0800 Subject: [PATCH 15/22] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker-compose.yml | 4 ++-- .../org/smartboot/mqtt/client/MqttClient.java | 19 +++++++++++++++---- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index dc762a83..1a561048 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -53,6 +53,6 @@ services: options: max-size: "100m" max-file: "1" -# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe - command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dcount=10 -Dpayload=128 org.smartboot.bench.mqtt.Publish + command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=1000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe +# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dcount=10 -Dpayload=128 org.smartboot.bench.mqtt.Publish version: '3.7' \ No newline at end of file diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index f5d23d1f..545425cf 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -432,14 +432,25 @@ public class MqttClient extends AbstractSession { InflightQueue inflightQueue = getInflightQueue(); InflightMessage inflightMessage = inflightQueue.offer(publishBuilder, (message) -> { consumer.accept(message.getVariableHeader().getPacketId()); - consumeTask(); + //最早发送的消息若收到响应,则更新点位 + synchronized (MqttClient.this) { + MqttClient.this.notifyAll(); + } }); if (inflightMessage == null) { - registeredTasks.offer(() -> publish(publishBuilder, consumer, autoFlush)); - } else if (autoFlush) { + try { + synchronized (this) { + wait(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + publish(publishBuilder, consumer, autoFlush); + return; + } + if (autoFlush) { flush(); } - } public MqttClientConfigure getClientConfigure() { -- Gitee From 4091ac896cf2b9957e34cacc448ea1dc0cc974fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Thu, 1 Jun 2023 19:04:55 +0800 Subject: [PATCH 16/22] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/smartboot/mqtt/client/MqttClient.java | 21 +++++++++-- .../mqtt/client/MqttClientProcessor.java | 35 ++++++++++++++----- .../client/processor/PublishProcessor.java | 14 +++++--- .../smartboot/mqtt/common/InflightQueue.java | 2 +- 4 files changed, 56 insertions(+), 16 deletions(-) diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index 545425cf..ce286e74 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -75,7 +75,12 @@ public class MqttClient extends AbstractSession { * 客户端配置项 */ private final MqttClientConfigure clientConfigure = new MqttClientConfigure(); - private final AbstractMessageProcessor messageProcessor = new MqttClientProcessor(this); + private static final AbstractMessageProcessor messageProcessor = new MqttClientProcessor(); + + static { + messageProcessor.addPlugin(new QosRetryPlugin()); + } + /** * 完成connect之前注册的事件 */ @@ -85,6 +90,8 @@ public class MqttClient extends AbstractSession { */ private final Map subscribes = new ConcurrentHashMap<>(); + private final Map mapping = new ConcurrentHashMap<>(); + private final List wildcardsToken = new LinkedList<>(); private AioQuickClient client; @@ -215,7 +222,7 @@ public class MqttClient extends AbstractSession { }, clientConfigure.getKeepAliveInterval(), TimeUnit.SECONDS); } // messageProcessor.addPlugin(new StreamMonitorPlugin<>()); - messageProcessor.addPlugin(new QosRetryPlugin()); + client = new AioQuickClient(clientConfigure.getHost(), clientConfigure.getPort(), new MqttProtocol(clientConfigure.getMaxPacketSize()), messageProcessor); try { if (bufferPagePool != null) { @@ -223,7 +230,9 @@ public class MqttClient extends AbstractSession { } client.setReadBufferSize(clientConfigure.getBufferSize()).setWriteBuffer(clientConfigure.getBufferSize(), 8).connectTimeout(clientConfigure.getConnectionTimeout()); session = client.start(asynchronousChannelGroup); - session.setAttachment(new Attachment()); + Attachment attachment = new Attachment(); + session.setAttachment(attachment); + attachment.put(MqttClientProcessor.SESSION_KEY, this); setMqttVersion(clientConfigure.getMqttVersion()); mqttWriter = new DefaultMqttWriter(session.writeBuffer()); @@ -305,6 +314,7 @@ public class MqttClient extends AbstractSession { subscribes.remove(unsubscribedTopic); wildcardsToken.removeIf(topicToken -> StringUtils.equals(unsubscribedTopic, topicToken.getTopicFilter())); } + mapping.clear(); consumeTask(); }); flush(); @@ -360,6 +370,7 @@ public class MqttClient extends AbstractSession { } else { LOGGER.error("subscribe topic:{} fail", subscription.getTopicFilter()); } + mapping.clear(); subAckConsumer.accept(MqttClient.this, minQos); } consumeTask(); @@ -461,6 +472,10 @@ public class MqttClient extends AbstractSession { return subscribes; } + public Map getMapping() { + return mapping; + } + public List getWildcardsToken() { return wildcardsToken; } diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java index b15876b6..ad328192 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java @@ -20,6 +20,7 @@ import org.smartboot.mqtt.client.processor.PubRelProcessor; import org.smartboot.mqtt.client.processor.PublishProcessor; import org.smartboot.mqtt.common.eventbus.EventObject; import org.smartboot.mqtt.common.eventbus.EventType; +import org.smartboot.mqtt.common.exception.MqttException; import org.smartboot.mqtt.common.message.MqttConnAckMessage; import org.smartboot.mqtt.common.message.MqttMessage; import org.smartboot.mqtt.common.message.MqttPingRespMessage; @@ -29,9 +30,12 @@ import org.smartboot.mqtt.common.message.MqttPubRecMessage; import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.MqttSubAckMessage; +import org.smartboot.mqtt.common.util.MqttAttachKey; import org.smartboot.socket.StateMachineEnum; import org.smartboot.socket.extension.processor.AbstractMessageProcessor; import org.smartboot.socket.transport.AioSession; +import org.smartboot.socket.util.AttachKey; +import org.smartboot.socket.util.Attachment; import java.util.HashMap; import java.util.Map; @@ -42,7 +46,7 @@ import java.util.Map; */ public class MqttClientProcessor extends AbstractMessageProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(MqttClientProcessor.class); - private final MqttClient mqttClient; + final static AttachKey SESSION_KEY = AttachKey.valueOf(MqttAttachKey.MQTT_SESSION); private static final Map, MqttProcessor> processors = new HashMap<>(); static { @@ -52,21 +56,20 @@ public class MqttClientProcessor extends AbstractMessageProcessor { processors.put(MqttPubRecMessage.class, new MqttAckProcessor()); processors.put(MqttPubCompMessage.class, new MqttAckProcessor()); processors.put(MqttPubRelMessage.class, new PubRelProcessor()); - processors.put(MqttSubAckMessage.class, new MqttAckProcessor()); + processors.put(MqttSubAckMessage.class, new MqttAckProcessor()); processors.put(MqttPingRespMessage.class, new MqttPingRespProcessor()); } - public MqttClientProcessor(MqttClient mqttClient) { - this.mqttClient = mqttClient; - } @Override public void process0(AioSession session, MqttMessage msg) { - mqttClient.getEventBus().publish(EventType.RECEIVE_MESSAGE, EventObject.newEventObject(mqttClient, msg)); + Attachment attachment = session.getAttachment(); + MqttClient client = attachment.get(SESSION_KEY); + client.getEventBus().publish(EventType.RECEIVE_MESSAGE, EventObject.newEventObject(client, msg)); MqttProcessor processor = processors.get(msg.getClass()); // LOGGER.info("receive msg:{}", msg); if (processor != null) { - processor.process(mqttClient, msg); + processor.process(client, msg); } else { LOGGER.error("unknown msg:{}", msg); } @@ -74,7 +77,23 @@ public class MqttClientProcessor extends AbstractMessageProcessor { @Override public void stateEvent0(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) { -// System.out.println(stateMachineEnum); + switch (stateMachineEnum) { + case DECODE_EXCEPTION: + LOGGER.error("decode exception", throwable); + break; + case SESSION_CLOSED: + Attachment attachment = session.getAttachment(); + attachment.get(SESSION_KEY).disconnect(); + break; + case PROCESS_EXCEPTION: + if (throwable instanceof MqttException) { + LOGGER.warn("process exception", throwable); + ((MqttException) throwable).getCallback().run(); + } + break; + default: + break; + } if (throwable != null) { throwable.printStackTrace(); } diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java index 3b053b07..d69fd428 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java @@ -61,12 +61,18 @@ public class PublishProcessor implements MqttProcessor { private void processPublishMessage(MqttPublishMessage mqttPublishMessage, MqttClient mqttClient) { MqttPublishVariableHeader header = mqttPublishMessage.getVariableHeader(); - Subscribe subscribe = mqttClient.getSubscribes().get(header.getTopicName()); - - //尝试通配符匹配 + Subscribe subscribe = mqttClient.getMapping().get(header.getTopicName()); if (subscribe == null) { - subscribe = matchWildcardsSubscribe(mqttClient, header.getTopicName()); + subscribe = mqttClient.getSubscribes().get(header.getTopicName()); + //尝试通配符匹配 + if (subscribe == null) { + subscribe = matchWildcardsSubscribe(mqttClient, header.getTopicName()); + } + if (subscribe != null) { + mqttClient.getMapping().put(header.getTopicName(), subscribe); + } } + // If unsubscribed, maybe null. if (subscribe != null && !subscribe.getUnsubscribed()) { subscribe.getConsumer().accept(mqttClient, mqttPublishMessage); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 75633a68..0b3b974c 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -42,7 +42,7 @@ import java.util.function.Consumer; public class InflightQueue { private static final Logger LOGGER = LoggerFactory.getLogger(InflightQueue.class); static final AttachKey RETRY_TASK_ATTACH_KEY = AttachKey.valueOf(MqttAttachKey.RETRY_TASK); - private static final int TIMEOUT = 3; + private static final int TIMEOUT = 30; private final InflightMessage[] queue; private int takeIndex; private int putIndex; -- Gitee From b3a5a165eb8ce982743efb8696686706dc56fa54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Thu, 1 Jun 2023 21:14:09 +0800 Subject: [PATCH 17/22] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java | 2 +- .../org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java index 929824b9..99949af9 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicPublishTree.java @@ -24,7 +24,7 @@ public class TopicPublishTree { private BrokerTopic brokerTopic; private final ConcurrentHashMap subNode = new ConcurrentHashMap<>(); - public synchronized BrokerTopic addTopic(String topic) { + public BrokerTopic addTopic(String topic) { BrokerTopic brokerTopic = new BrokerTopic(topic); TopicToken topicToken = brokerTopic.getTopicToken(); TopicPublishTree treeNode = this; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java index a6e17e70..1ab0df22 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java @@ -27,7 +27,7 @@ public class TopicSubscribeTree { private final Map subscribers = new ConcurrentHashMap<>(); private final ConcurrentHashMap subNode = new ConcurrentHashMap<>(); - public synchronized void subscribeTopic(MqttSession session, TopicFilterSubscriber subscriber) { + public void subscribeTopic(MqttSession session, TopicFilterSubscriber subscriber) { TopicSubscribeTree treeNode = this; TopicToken token = subscriber.getTopicFilterToken(); do { -- Gitee From c316e08641e2d408994fa7935201e402c16aa08c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Fri, 2 Jun 2023 08:56:16 +0800 Subject: [PATCH 18/22] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartboot/mqtt/broker/MqttBrokerMessageProcessor.java | 6 +++--- .../java/org/smartboot/mqtt/client/MqttClientProcessor.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index ee6d2287..8bfae4a5 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -82,9 +82,9 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor { default: break; } - if (throwable != null) { - throwable.printStackTrace(); - } +// if (throwable != null) { +// throwable.printStackTrace(); +// } } } -- Gitee From 657f14efd3dc3436f39d39803d9cd6f220c7fa61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Fri, 2 Jun 2023 22:40:51 +0800 Subject: [PATCH 19/22] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/broker/BrokerContextImpl.java | 18 ++- .../smartboot/mqtt/broker/MqttSession.java | 3 +- .../mqtt/broker/TopicSubscriber.java | 75 ++++++------ .../mqtt/broker/eventbus/ServerEventType.java | 2 + .../broker/processor/SubscribeProcessor.java | 2 +- .../org/smartboot/mqtt/client/MqttClient.java | 51 ++++---- .../mqtt/common/AbstractSession.java | 9 +- .../mqtt/common/InflightMessage.java | 15 +-- .../smartboot/mqtt/common/InflightQueue.java | 112 +++++++++++------- 9 files changed, 152 insertions(+), 135 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 7e7ab64d..42a9313e 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -49,6 +49,7 @@ import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.message.MqttConnectMessage; import org.smartboot.mqtt.common.message.MqttDisconnectMessage; import org.smartboot.mqtt.common.message.MqttMessage; +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPingReqMessage; import org.smartboot.mqtt.common.message.MqttPubAckMessage; import org.smartboot.mqtt.common.message.MqttPubCompMessage; @@ -57,6 +58,7 @@ import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.MqttSubscribeMessage; import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; +import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; import org.smartboot.mqtt.common.protocol.MqttProtocol; import org.smartboot.mqtt.common.util.MqttMessageBuilders; @@ -84,6 +86,7 @@ import java.util.Properties; import java.util.ServiceLoader; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; @@ -311,6 +314,8 @@ public class BrokerContextImpl implements BrokerContext { notifyPush(brokerTopic); }); + eventBus.subscribe(ServerEventType.NOTIFY_TOPIC_PUSH, (eventType, object) -> notifyPush(object)); + //一个新的订阅建立时,对每个匹配的主题名,如果存在最近保留的消息,它必须被发送给这个订阅者 eventBus.subscribe(ServerEventType.SUBSCRIBE_TOPIC, new EventBusSubscriber() { @Override @@ -324,12 +329,6 @@ public class BrokerContextImpl implements BrokerContext { BrokerTopic topic = subscriber.getTopic(); topic.getQueue().offer(subscriber); notifyPush(topic); -// -// int preVersion = subscriber.getTopic().getVersion().get(); -// subscriber.batchPublish(BrokerContextImpl.this); -// if (preVersion != subscriber.getTopic().getVersion().get()) { -// notifyPush(subscriber.getTopic()); -// } return; } //retain采用严格顺序publish模式 @@ -349,7 +348,8 @@ public class BrokerContextImpl implements BrokerContext { } InflightQueue inflightQueue = session.getInflightQueue(); // retain消息逐个推送 - inflightQueue.offer(publishBuilder, (mqtt) -> { + CompletableFuture> future = inflightQueue.put(publishBuilder); + future.whenComplete((mqttPacketIdentifierMessage, throwable) -> { LOGGER.info("publish retain to client:{} success ", session.getClientId()); subscriber.setRetainConsumerOffset(offset + 1); retainPushThreadPool.execute(task); @@ -359,10 +359,6 @@ public class BrokerContextImpl implements BrokerContext { }); } }); - eventBus.subscribe(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, (eventType, subscriber) -> { - LOGGER.info("刷新订阅关系, {} 订阅了topic: {}", subscriber.getTopicFilterToken().getTopicFilter(), subscriber.getTopic().getTopic()); - subscriber.getTopic().getQueue().offer(subscriber); - }); eventBus.subscribe(ServerEventType.TOPIC_CREATE, (eventType, object) -> subscribeTopicTree.match(object.getTopicToken(), (session, topicFilterSubscriber) -> { if (!providers.getSubscribeProvider().subscribeTopic(object.getTopic(), session)) { diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index e2b8d264..02a990fc 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -186,9 +186,8 @@ public class MqttSession extends AbstractSession { TopicSubscriber subscription = new TopicSubscriber(topic, MqttSession.this, mqttQoS, latestOffset + 1, retainOffset); subscription.setTopicFilterToken(topicToken); topic.getConsumeOffsets().put(MqttSession.this, subscription); - mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); - subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, subscription); + mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); } /** diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index e62da8a5..09378fd5 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -12,17 +12,20 @@ package org.smartboot.mqtt.broker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.broker.eventbus.ServerEventType; import org.smartboot.mqtt.broker.provider.PersistenceProvider; import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; -import org.smartboot.mqtt.common.InflightMessage; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventType; +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; +import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; import org.smartboot.mqtt.common.util.MqttMessageBuilders; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; /** @@ -76,25 +79,27 @@ public class TopicSubscriber { return; } semaphore.release(); - publish0(brokerContext, 0); + int i = 16; + while (publishAvailable(brokerContext)) { + if (i-- == 0) { + if (semaphore.tryAcquire()) { + topic.getQueue().offer(this); + topic.getVersion().incrementAndGet(); + } + break; + } + } mqttSession.flush(); } - private void publish0(BrokerContext brokerContext, final int depth) { + private boolean publishAvailable(BrokerContext brokerContext) { PersistenceProvider persistenceProvider = brokerContext.getProviders().getPersistenceProvider(); PersistenceMessage persistenceMessage = persistenceProvider.get(topic.getTopic(), nextConsumerOffset); if (persistenceMessage == null) { if (semaphore.tryAcquire()) { topic.getQueue().offer(this); } - return; - } - if (depth > 16) { - if (semaphore.tryAcquire()) { - topic.getQueue().offer(this); - topic.getVersion().incrementAndGet(); - } - return; + return false; } MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(persistenceMessage.getPayload()).qos(mqttQoS).topicName(persistenceMessage.getTopic()); @@ -109,38 +114,30 @@ public class TopicSubscriber { //Qos0直接发送 if (mqttQoS == MqttQoS.AT_MOST_ONCE) { mqttSession.write(publishBuilder.build(), false); - publish0(brokerContext, depth + 1); - return; - } - InflightMessage suc; - if (depth == 0) { - suc = inflightQueue.offer(publishBuilder, (mqtt) -> { - //最早发送的消息若收到响应,则更新点位 - commitNextConsumerOffset(offset + 1); - if (persistenceMessage.isRetained()) { - setRetainConsumerOffset(getRetainConsumerOffset() + 1); - } - commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); - publish0(brokerContext, 1); - }, () -> publish0(brokerContext, 0)); - } else { - suc = inflightQueue.offer(publishBuilder, (mqtt) -> { - //最早发送的消息若收到响应,则更新点位 - commitNextConsumerOffset(offset + 1); - if (persistenceMessage.isRetained()) { - setRetainConsumerOffset(getRetainConsumerOffset() + 1); - } - commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); - publish0(brokerContext, 1); - }); + return true; } - // 飞行队列已满 - if (suc != null) { - //递归处理下一个消息 - publish0(brokerContext, depth + 1); + CompletableFuture> future = inflightQueue.offer(publishBuilder, () -> { + if (semaphore.tryAcquire()) { + topic.getQueue().offer(this); + topic.getVersion().incrementAndGet(); + } + brokerContext.getEventBus().publish(ServerEventType.NOTIFY_TOPIC_PUSH, topic); + }); + if (future == null) { + return false; } + future.whenComplete((mqttPacketIdentifierMessage, throwable) -> { + //最早发送的消息若收到响应,则更新点位 + commitNextConsumerOffset(offset + 1); + if (persistenceMessage.isRetained()) { + setRetainConsumerOffset(getRetainConsumerOffset() + 1); + } + commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); + publishAvailable(brokerContext); + }); + return true; } public BrokerTopic getTopic() { diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ServerEventType.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ServerEventType.java index d4bf3ef3..4fcc38d3 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ServerEventType.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ServerEventType.java @@ -86,6 +86,8 @@ public class ServerEventType extends EventType { */ public static final ServerEventType> CONNACK = new ServerEventType<>("connect"); + public static final ServerEventType NOTIFY_TOPIC_PUSH = new ServerEventType<>("notify_topic_push"); + protected ServerEventType(String name) { super(name); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java index b6c9f38e..fa281f9c 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java @@ -60,6 +60,6 @@ public class SubscribeProcessor extends AuthorizedMqttProcessor { + CompletableFuture> future = getInflightQueue().put(unsubscribeBuilder); + future.whenComplete((message, throwable) -> { ValidateUtils.isTrue(message instanceof MqttUnsubAckMessage, "uncorrected message type."); for (String unsubscribedTopic : unsubscribedTopics) { subscribes.remove(unsubscribedTopic); @@ -353,7 +356,18 @@ public class MqttClient extends AbstractSession { subscribeBuilder.subscribeProperties(new SubscribeProperties()); } MqttSubscribeMessage subscribeMessage = subscribeBuilder.build(); - InflightMessage inflightMessage = getInflightQueue().offer(subscribeBuilder, (message) -> { + + CompletableFuture> future = getInflightQueue().offer(subscribeBuilder, new Runnable() { + @Override + public void run() { + + } + }); + if (future == null) { + registeredTasks.offer(() -> subscribe0(topic, qos, consumer, subAckConsumer)); + return; + } + future.whenComplete((message, throwable) -> { List qosValues = ((MqttSubAckMessage) message).getPayload().grantedQoSLevels(); ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response"); int i = 0; @@ -375,11 +389,8 @@ public class MqttClient extends AbstractSession { } consumeTask(); }); - if (inflightMessage == null) { - registeredTasks.offer(() -> subscribe0(topic, qos, consumer, subAckConsumer)); - } else { - flush(); - } + flush(); + } public void notifyResponse(MqttConnAckMessage connAckMessage) { @@ -440,25 +451,8 @@ public class MqttClient extends AbstractSession { consumer.accept(0); return; } - InflightQueue inflightQueue = getInflightQueue(); - InflightMessage inflightMessage = inflightQueue.offer(publishBuilder, (message) -> { - consumer.accept(message.getVariableHeader().getPacketId()); - //最早发送的消息若收到响应,则更新点位 - synchronized (MqttClient.this) { - MqttClient.this.notifyAll(); - } - }); - if (inflightMessage == null) { - try { - synchronized (this) { - wait(); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - publish(publishBuilder, consumer, autoFlush); - return; - } + CompletableFuture> future = inflightQueue.put(publishBuilder); + future.whenComplete((message, throwable) -> consumer.accept(message.getVariableHeader().getPacketId())); if (autoFlush) { flush(); } @@ -487,6 +481,9 @@ public class MqttClient extends AbstractSession { */ @Override public void disconnect() { + if (disconnect) { + return; + } //DISCONNECT 报文是客户端发给服务端的最后一个控制报文。表示客户端正常断开连接。 write(new MqttDisconnectMessage()); //关闭自动重连 diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java index 8c7d78a6..3099892d 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java @@ -25,8 +25,7 @@ import org.smartboot.socket.util.Attachment; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Hashtable; /** * @author 三刀(zhengjunweimail@163.com) @@ -54,8 +53,8 @@ public abstract class AbstractSession { private MqttVersion mqttVersion; - private InflightQueue inflightQueue; - private final Map ackMessageCacheMap = new ConcurrentHashMap<>(); + protected InflightQueue inflightQueue; + private final Hashtable ackMessageCacheMap = new Hashtable<>(); public AbstractSession(EventBus eventBus) { this.eventBus = eventBus; @@ -77,7 +76,7 @@ public abstract class AbstractSession { public final synchronized void write(MqttMessage mqttMessage, boolean autoFlush) { try { if (disconnect) { - this.disconnect(); +// this.disconnect(); ValidateUtils.isTrue(false, "已断开连接,无法发送消息"); } mqttMessage.setVersion(mqttVersion); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java index 168c10d7..4d10632f 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java @@ -18,7 +18,7 @@ import org.smartboot.mqtt.common.message.MqttSubscribeMessage; import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; -import java.util.function.Consumer; +import java.util.concurrent.CompletableFuture; /** * @author 三刀(zhengjunweimail@163.com) @@ -31,6 +31,7 @@ public class InflightMessage { private final MqttPacketIdentifierMessage originalMessage; private MqttPacketIdentifierMessage responseMessage; + private final CompletableFuture> future = new CompletableFuture<>(); /** * 飞行队列为其分配的packetId */ @@ -40,16 +41,13 @@ public class InflightMessage { private boolean commit; - private final Consumer> consumer; - private int retryCount; private long latestTime; - public InflightMessage(int packetId, MqttPacketIdentifierMessage originalMessage, Consumer> consumer) { + public InflightMessage(int packetId, MqttPacketIdentifierMessage originalMessage) { this.assignedPacketId = packetId; this.originalMessage = originalMessage; - this.consumer = consumer; if (originalMessage instanceof MqttSubscribeMessage) { this.expectMessageType = MqttMessageType.SUBACK; } else if (originalMessage instanceof MqttUnsubscribeMessage) { @@ -104,10 +102,6 @@ public class InflightMessage { this.latestTime = latestTime; } - public final Consumer> getConsumer() { - return consumer; - } - public int getAssignedPacketId() { return assignedPacketId; } @@ -120,4 +114,7 @@ public class InflightMessage { this.responseMessage = responseMessage; } + public CompletableFuture> getFuture() { + return future; + } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 0b3b974c..9c385d2e 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -14,6 +14,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttVersion; +import org.smartboot.mqtt.common.exception.MqttException; import org.smartboot.mqtt.common.message.MqttFixedHeader; import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPubRelMessage; @@ -30,10 +31,11 @@ import org.smartboot.socket.util.AttachKey; import org.smartboot.socket.util.Attachment; import org.smartboot.socket.util.QuickTimerTask; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * @author 三刀(zhengjunweimail@163.com) @@ -51,7 +53,13 @@ public class InflightQueue { private final AtomicInteger packetId = new AtomicInteger(0); private final AbstractSession session; - private final ConcurrentLinkedQueue runnables = new ConcurrentLinkedQueue<>(); + + private final ReentrantLock lock = new ReentrantLock(false); + + private final Condition notEmpty = lock.newCondition(); + + + private final Condition notFull = lock.newCondition(); public InflightQueue(AbstractSession session, int size) { ValidateUtils.isTrue(size > 0, "inflight must >0"); @@ -59,40 +67,64 @@ public class InflightQueue { this.session = session; } - public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { - return offer(publishBuilder, consumer, null); + public CompletableFuture> put(MqttMessageBuilders.MessageBuilder publishBuilder) { + final ReentrantLock lock = this.lock; + try { + lock.lockInterruptibly(); + while (count == queue.length) { + session.flush(); + notFull.await(); + } + return enqueue(publishBuilder); + } catch (Exception e) { + throw new MqttException("put message into inflight queue exception", e); + } finally { + lock.unlock(); + } } - public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer, Runnable runnable) { - InflightMessage inflightMessage; - synchronized (this) { + public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Runnable runnable) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { if (count == queue.length) { - if (runnable != null) { - runnables.offer(runnable); + int i = putIndex - 1; + if (i < 0) { + i = queue.length - 1; } + queue[i].getFuture().thenAccept(mqttPacketIdentifierMessage -> runnable.run()); return null; + } else { + return enqueue(publishBuilder); } - int id = packetId.incrementAndGet(); - // 16位无符号最大值65535 - if (id > 65535) { - id = id % queue.length + queue.length; - packetId.set(id); - } - MqttPacketIdentifierMessage mqttMessage = publishBuilder.packetId(id).build(); - inflightMessage = new InflightMessage(id, mqttMessage, consumer); - queue[putIndex++] = inflightMessage; - if (putIndex == queue.length) { - putIndex = 0; - } - count++; + } finally { + lock.unlock(); + } + } - //启动消息质量监测 - if (count == 1) { - retry(inflightMessage); - } + + public CompletableFuture> enqueue(MqttMessageBuilders.MessageBuilder publishBuilder) { + + int id = packetId.incrementAndGet(); + // 16位无符号最大值65535 + if (id > 65535) { + id = id % queue.length + queue.length; + packetId.set(id); + } + MqttPacketIdentifierMessage mqttMessage = publishBuilder.packetId(id).build(); + InflightMessage inflightMessage = new InflightMessage(id, mqttMessage); + queue[putIndex++] = inflightMessage; + if (putIndex == queue.length) { + putIndex = 0; + } + count++; + + //启动消息质量监测 + if (count == 1) { + retry(inflightMessage); } session.write(inflightMessage.getOriginalMessage(), count == queue.length); - return inflightMessage; + return inflightMessage.getFuture(); } /** @@ -174,7 +206,13 @@ public class InflightQueue { } inflightMessage.setResponseMessage(message); inflightMessage.setLatestTime(System.currentTimeMillis()); - commit(inflightMessage); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + commit(inflightMessage); + } finally { + lock.unlock(); + } break; } case PUBREC: @@ -200,7 +238,7 @@ public class InflightQueue { } } - private synchronized void commit(InflightMessage inflightMessage) { + private void commit(InflightMessage inflightMessage) { MqttVariableMessage originalMessage = inflightMessage.getOriginalMessage(); ValidateUtils.isTrue(originalMessage.getFixedHeader().getQosLevel().value() == 0 || originalMessage.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message"); inflightMessage.setCommit(true); @@ -210,13 +248,14 @@ public class InflightQueue { } queue[takeIndex++] = null; count--; + notFull.signal(); if (takeIndex == queue.length) { takeIndex = 0; } - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + inflightMessage.getFuture().complete(inflightMessage.getResponseMessage()); while (count > 0 && queue[takeIndex].isCommit()) { inflightMessage = queue[takeIndex]; - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + inflightMessage.getFuture().complete(inflightMessage.getResponseMessage()); queue[takeIndex++] = null; if (takeIndex == queue.length) { takeIndex = 0; @@ -230,14 +269,5 @@ public class InflightQueue { InflightMessage monitorMessage = queue[takeIndex]; attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(monitorMessage)); } - - while (count < queue.length) { - Runnable runnable = runnables.poll(); - if (runnable != null) { - runnable.run(); - } else { - break; - } - } } } \ No newline at end of file -- Gitee From c913804394fc303a25a58af5115214f1589d22a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Fri, 2 Jun 2023 23:54:38 +0800 Subject: [PATCH 20/22] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/mqtt/common/InflightQueue.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 9c385d2e..8c955bba 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -56,9 +56,6 @@ public class InflightQueue { private final ReentrantLock lock = new ReentrantLock(false); - private final Condition notEmpty = lock.newCondition(); - - private final Condition notFull = lock.newCondition(); public InflightQueue(AbstractSession session, int size) { @@ -72,7 +69,6 @@ public class InflightQueue { try { lock.lockInterruptibly(); while (count == queue.length) { - session.flush(); notFull.await(); } return enqueue(publishBuilder); @@ -248,7 +244,7 @@ public class InflightQueue { } queue[takeIndex++] = null; count--; - notFull.signal(); + if (takeIndex == queue.length) { takeIndex = 0; } @@ -262,7 +258,9 @@ public class InflightQueue { } count--; } - + if (count < queue.length) { + notFull.signal(); + } if (count > 0) { //注册超时监听任务 Attachment attachment = session.session.getAttachment(); -- Gitee From 36e309da0db0100cfe662b586704551c46980830 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 3 Jun 2023 08:15:09 +0800 Subject: [PATCH 21/22] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/mqtt/broker/BrokerContextImpl.java | 2 +- .../src/main/java/org/smartboot/mqtt/client/MqttClient.java | 5 ++++- .../main/java/org/smartboot/mqtt/common/InflightQueue.java | 6 ++++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 42a9313e..bdf8b6fa 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -348,7 +348,7 @@ public class BrokerContextImpl implements BrokerContext { } InflightQueue inflightQueue = session.getInflightQueue(); // retain消息逐个推送 - CompletableFuture> future = inflightQueue.put(publishBuilder); + CompletableFuture> future = inflightQueue.offer(publishBuilder); future.whenComplete((mqttPacketIdentifierMessage, throwable) -> { LOGGER.info("publish retain to client:{} success ", session.getClientId()); subscriber.setRetainConsumerOffset(offset + 1); diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index a2fb986a..c4160ddb 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -310,7 +310,10 @@ public class MqttClient extends AbstractSession { unsubscribeBuilder.properties(properties); } // wait ack message. - CompletableFuture> future = getInflightQueue().put(unsubscribeBuilder); + CompletableFuture> future = getInflightQueue().offer(unsubscribeBuilder, () -> registeredTasks.offer(() -> unsubscribe0(topics))); + if (future == null) { + return; + } future.whenComplete((message, throwable) -> { ValidateUtils.isTrue(message instanceof MqttUnsubAckMessage, "uncorrected message type."); for (String unsubscribedTopic : unsubscribedTopics) { diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 8c955bba..9859587e 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -79,6 +79,12 @@ public class InflightQueue { } } + public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder) { + return offer(publishBuilder, () -> { + + }); + } + public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Runnable runnable) { final ReentrantLock lock = this.lock; lock.lock(); -- Gitee From 48174362f9892f9eeb1efcb37eb2eca11afb1b95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 3 Jun 2023 08:52:47 +0800 Subject: [PATCH 22/22] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 1a561048..9367fa4e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -53,6 +53,6 @@ services: options: max-size: "100m" max-file: "1" - command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=1000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe -# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dcount=10 -Dpayload=128 org.smartboot.bench.mqtt.Publish + command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe +# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dcount=10 -Dpayload=128 org.smartboot.bench.mqtt.Publish version: '3.7' \ No newline at end of file -- Gitee