From c09a823106037f7779adf46add8617184764938c Mon Sep 17 00:00:00 2001 From: luxurong Date: Sat, 25 Dec 2021 11:10:29 +0800 Subject: [PATCH 1/8] version --- .../io/github/quickmsg/core/mqtt/AbstractReceiveContext.java | 2 +- .../java/io/github/quickmsg/core/mqtt/MqttReceiveContext.java | 4 +++- .../io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java index f6f38517..288e898e 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java @@ -98,7 +98,7 @@ public abstract class AbstractReceiveContext implements this.messageRegistry.startUp(abstractConfiguration.getEnvironmentMap()); this.metricManager = metricManager(abstractConfiguration.getMeterConfig()); Optional.ofNullable(abstractConfiguration.getSourceDefinitions()).ifPresent(sourceDefinitions -> sourceDefinitions.forEach(SourceManager::loadSource)); - this.timeAckManager = new TimeAckManager(20, TimeUnit.MILLISECONDS,512); + this.timeAckManager = new TimeAckManager(20, TimeUnit.MILLISECONDS,50); } private TrafficHandlerLoader trafficHandlerLoader() { diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiveContext.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiveContext.java index 9b3f0271..8a6a318c 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiveContext.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiveContext.java @@ -30,7 +30,9 @@ public class MqttReceiveContext extends AbstractReceiveContext log.error("on connect error",throwable)) + .onErrorContinue((throwable, o) -> { + log.error("on message error {}",o,throwable); + }) .subscribe(mqttMessage -> this.accept(mqttChannel, new SmqttMessage<>(mqttMessage,System.currentTimeMillis(),Boolean.FALSE))); } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java index 4007bacd..69b7cddb 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java @@ -45,7 +45,7 @@ public class DefaultProtocolAdaptor implements ProtocolAdaptor { @Override public void chooseProtocol(MqttChannel mqttChannel, SmqttMessage smqttMessage, ReceiveContext receiveContext) { MqttMessage mqttMessage = smqttMessage.getMessage(); - if (mqttMessage.decoderResult() != null && (mqttMessage.decoderResult().isSuccess())) { + if (mqttMessage!=null && mqttMessage.decoderResult() != null && (mqttMessage.decoderResult().isSuccess())) { log.info(" 【{}】【{}】 【{}】", Thread.currentThread().getName(), mqttMessage.fixedHeader().messageType(), @@ -55,6 +55,7 @@ public class DefaultProtocolAdaptor implements ProtocolAdaptor { .doParseProtocol(smqttMessage, mqttChannel) .contextWrite(context -> context.putNonNull(ReceiveContext.class, receiveContext)) .subscribeOn(scheduler) + .onErrorContinue(((throwable, o) -> {})) .subscribe(aVoid -> { }, error -> { log.error("channel {} chooseProtocol: {} error {}", mqttChannel, mqttMessage, error.getMessage()); -- Gitee From 191508d6fa57a914a9a9e5a73c943868e7f7d20f Mon Sep 17 00:00:00 2001 From: Easy <1091927336@qq.com> Date: Mon, 3 Jan 2022 14:18:54 +0800 Subject: [PATCH 2/8] =?UTF-8?q?=E4=BA=92=E8=B8=A2=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E4=B8=AD,=20=E6=8C=87=E5=AE=9A=E6=97=B6=E9=97=B4=E5=86=85?= =?UTF-8?q?=E4=B8=8D=E4=BA=92=E8=B8=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.yaml | 3 ++- .../github/quickmsg/common/channel/MqttChannel.java | 5 ++++- .../quickmsg/common/config/BootstrapConfig.java | 11 ++++++++--- .../java/io/github/quickmsg/core/Bootstrap.java | 1 + .../quickmsg/core/mqtt/MqttConfiguration.java | 4 +++- .../quickmsg/core/protocol/ConnectProtocol.java | 13 +++++++++++-- 6 files changed, 29 insertions(+), 8 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index f0562066..088e023b 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,7 +1,8 @@ smqtt: logLevel: INFO # 系统日志 tcp: # tcp配置 - connectModel: UNIQUE # UNIQUE 唯一 KICK 互踢 + connectModel: KICK # UNIQUE 唯一 KICK 互踢 + notKickSecond: 30 # KICK互踢模式生效, 单位秒, 指定时间内客户端不互踢, 避免客户端自动连接持续互踢 port: 1883 # mqtt端口号 username: smqtt # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口 password: smqtt # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口 diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java index 3ac5af9b..aace44e3 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java @@ -45,6 +45,8 @@ public class MqttChannel { private long authTime; + private long connectTime; + private boolean sessionPersistent; private Will will; @@ -273,7 +275,8 @@ public class MqttChannel { MqttMessage reply = getReplyMqttMessage(mqttMessage); Runnable runnable = () -> mqttChannel.write(Mono.just(reply)).subscribe(); - Runnable cleaner = () -> MessageUtils.safeRelease(reply);; + Runnable cleaner = () -> MessageUtils.safeRelease(reply); + ; Ack ack = new RetryAck(mqttChannel.generateId(reply.fixedHeader().messageType(), getMessageId(reply)), 5, 5, runnable, mqttChannel.getTimeAckManager(), cleaner); ack.start(); diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java index 7bb7ccfe..6776a4da 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java @@ -32,7 +32,7 @@ public class BootstrapConfig { smqttConfig.setLogLevel("INFO"); bootstrapConfig.setSmqttConfig(smqttConfig); smqttConfig.setClusterConfig(ClusterConfig.builder() - .enable(false).build()); + .enable(false).build()); smqttConfig.setHttpConfig(HttpConfig.builder() .enable(false).build()); smqttConfig.setWebsocketConfig(WebsocketConfig.builder() @@ -114,6 +114,12 @@ public class BootstrapConfig { @Builder.Default private ConnectModel connectModel = ConnectModel.UNIQUE; + + /** + * 不互踢时间 默认60s + */ + @Builder.Default + private Integer notKickSecond = 60; /** * 端口 */ @@ -177,7 +183,7 @@ public class BootstrapConfig { /** * 单个连接读写字节限制 */ - private String channelReadWriteSize; + private String channelReadWriteSize; /** @@ -437,7 +443,6 @@ public class BootstrapConfig { /** * 指标配置 - */ @Data @Builder diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java index 304178d8..2ab1843f 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java @@ -77,6 +77,7 @@ public class Bootstrap { } } Optional.ofNullable(tcpConfig.getConnectModel()).ifPresent(mqttConfiguration::setConnectModel); + Optional.ofNullable(tcpConfig.getNotKickSecond()).ifPresent(mqttConfiguration::setNotKickSecond); Optional.ofNullable(tcpConfig.getPort()).ifPresent(mqttConfiguration::setPort); Optional.ofNullable(tcpConfig.getLowWaterMark()).ifPresent(mqttConfiguration::setLowWaterMark); Optional.ofNullable(tcpConfig.getHighWaterMark()).ifPresent(mqttConfiguration::setHighWaterMark); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java index 0ac9299c..03bda94c 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java @@ -37,6 +37,8 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon private ConnectModel connectModel; + private Integer notKickSecond; + private PasswordAuthentication reactivePasswordAuth = (u, p, c) -> true; private Integer bossThreadSize = Runtime.getRuntime().availableProcessors(); @@ -57,7 +59,7 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon private BootstrapConfig.ClusterConfig clusterConfig; - private BootstrapConfig.MeterConfig meterConfig ; + private BootstrapConfig.MeterConfig meterConfig; private List ruleChainDefinitions; diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java index 6d17c31a..6e6bd47a 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java @@ -72,8 +72,16 @@ public class ConnectProtocol implements Protocol { false).then(mqttChannel.close()); } } else { - Optional.ofNullable( channelRegistry.get(clientIdentifier)) - .ifPresent(ch->ch.close().subscribe()); + MqttChannel existMqttChannel = channelRegistry.get(clientIdentifier); + if (existMqttChannel != null) { + if (System.currentTimeMillis() - existMqttChannel.getConnectTime() > (mqttReceiveContext.getConfiguration().getNotKickSecond() * 1000)) { + existMqttChannel.close().subscribe(); + } else { + return mqttChannel.write( + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), + false).then(mqttChannel.close()); + } + } } /*protocol version support*/ if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version() @@ -96,6 +104,7 @@ public class ConnectProtocol implements Protocol { .build()); } mqttChannel.setAuthTime(System.currentTimeMillis()); + mqttChannel.setConnectTime(System.currentTimeMillis()); mqttChannel.setKeepalive(mqttConnectVariableHeader.keepAliveTimeSeconds()); mqttChannel.setSessionPersistent(!mqttConnectVariableHeader.isCleanSession()); mqttChannel.setStatus(ChannelStatus.ONLINE); -- Gitee From c7587d18397418f7ea85e16020f3f472256bdf82 Mon Sep 17 00:00:00 2001 From: sunshihuan <13733918655@163.com> Date: Fri, 7 Jan 2022 19:17:50 +0800 Subject: [PATCH 3/8] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E8=A7=84=E5=88=99?= =?UTF-8?q?=E5=BC=95=E6=93=8Ejson=E8=BD=AC=E6=8D=A2=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/io/github/quickmsg/common/message/HeapMqttMessage.java | 1 + 1 file changed, 1 insertion(+) diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java index eb5d1a5c..19c340da 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java @@ -43,6 +43,7 @@ public class HeapMqttMessage { } private Object getJsonObject(String body) { + body=body.replaceAll("\r|\n|\t", ""); if (body.startsWith("{") && body.endsWith("}")) { return JacksonUtil.json2Bean(body, JsonMap.class); } else if (body.startsWith("[") && body.endsWith("]")) { -- Gitee From 8a375eba5c0d37ab75ea381d13e6088fcdd154ce Mon Sep 17 00:00:00 2001 From: Easy <1091927336@qq.com> Date: Sat, 8 Jan 2022 14:00:12 +0800 Subject: [PATCH 4/8] =?UTF-8?q?=E5=8F=8B=E9=93=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 187 +++++++++++++++++++++++++----------------------------- 1 file changed, 88 insertions(+), 99 deletions(-) diff --git a/README.md b/README.md index ab66c11d..63122d27 100644 --- a/README.md +++ b/README.md @@ -1,50 +1,55 @@ ## ![image](icon/logo.png) SMQTT开源的MQTT消息代理Broker -SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor3反应堆模型,支持单机部署,支持容器化部署,具备低延迟,高吞吐量,支持百万TCP连接,同时支持多种协议交互,是一款非常优秀的消息中间件! +SMQTT基于reactor-netty(spring-webflux底层依赖) +开发,底层采用Reactor3反应堆模型,支持单机部署,支持容器化部署,具备低延迟,高吞吐量,支持百万TCP连接,同时支持多种协议交互,是一款非常优秀的消息中间件! ## smqtt目前拥有的功能如下: -![架构图](icon/component.png) +![架构图](icon/component.png) 1. 消息质量等级实现(支持qos0,qos1,qos2) 2. topicFilter支持 - topic分级(test/test) - +支持(单层匹配) - - #支持(多层匹配) + - # 支持(多层匹配) 3. 会话消息 - 默认内存存储 - 支持持久化(redis/db) 4. 保留消息 - - 默认内存存储 - - 支持持久化(redis/db) + - 默认内存存储 + - 支持持久化(redis/db) 5. 遗嘱消息 - > 设备掉线时候触发 + + > 设备掉线时候触发 6. 客户端认证 - - 支持spi注入外部认证 + - 支持spi注入外部认证 7. tls加密 - - 支持tls加密(mqtt端口/http端口) + - 支持tls加密(mqtt端口/http端口) 8. websocket协议支持x - > 使用mqtt over websocket + + > 使用mqtt over websocket 9. http协议交互 - 支持http接口推送消息 - 支持spi扩展http接口 10. SPI接口扩展支持 - - 消息管理接口(会话消息/保留消息管理) - - 通道管理接口 (管理系统的客户端连接) - - 认证接口 (用于自定义外部认证) - - 拦截器 (用户自定义拦截消息) + - 消息管理接口(会话消息/保留消息管理) + - 通道管理接口 (管理系统的客户端连接) + - 认证接口 (用于自定义外部认证) + - 拦截器 (用户自定义拦截消息) 11. 集群支持(gossip协议实现) -12. 容器化支持 +12. 容器化支持 + > 默认镜像最新tag: 1ssqq1lxr/smqtt 13. 持久化支持(session 保留消息) 14. 规则引擎支持 15. 支持springboot starter启动 16. 管理后台 + > 请参考smqtt文档如何启动管理后台 17. grafana监控集成 - 支持influxdb - 支持prometheus - + ## 尝试一下 > 大家不要恶意链接,谢谢! @@ -60,6 +65,7 @@ SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor ### main方式启动 引入依赖 + ```markdown @@ -85,69 +91,55 @@ SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor ```markdown Bootstrap bootstrap = Bootstrap.builder() - .rootLevel(Level.DEBUG) - .tcpConfig( - BootstrapConfig - .TcpConfig - .builder() - .port(8888) - .username("smqtt") - .password("smqtt") - .build()) - .httpConfig( - BootstrapConfig - .HttpConfig - .builder() - .enable(true) - .accessLog(true) - .build()) - .clusterConfig( - BootstrapConfig. - ClusterConfig - .builder() - .enable(true) - .namespace("smqtt") - .node("node-1") - .port(7773) - .url("127.0.0.1:7771,127.0.0.1:7772"). - build()) - .build() - .startAwait(); +.rootLevel(Level.DEBUG) +.tcpConfig( +BootstrapConfig .TcpConfig .builder() +.port(8888) +.username("smqtt") +.password("smqtt") +.build()) +.httpConfig( +BootstrapConfig .HttpConfig .builder() +.enable(true) +.accessLog(true) +.build()) +.clusterConfig( +BootstrapConfig. ClusterConfig .builder() +.enable(true) +.namespace("smqtt") +.node("node-1") +.port(7773) +.url("127.0.0.1:7771,127.0.0.1:7772"). build()) +.build() +.startAwait(); ``` - 非阻塞式启动服务: ```markdown - Bootstrap bootstrap = Bootstrap.builder() - .rootLevel(Level.DEBUG) - .tcpConfig( - BootstrapConfig - .TcpConfig - .builder() - .port(8888) - .username("smqtt") - .password("smqtt") - .build()) - .httpConfig( - BootstrapConfig - .HttpConfig - .builder() - .enable(true) - .accessLog(true) - .build()) - .clusterConfig( - BootstrapConfig. - ClusterConfig - .builder() - .enable(true) - .namespace("smqtt") - .node("node-1") - .port(7773) - .url("127.0.0.1:7771,127.0.0.1:7772"). - build()) - .build() - .start().block(); +Bootstrap bootstrap = Bootstrap.builder() +.rootLevel(Level.DEBUG) +.tcpConfig( +BootstrapConfig .TcpConfig .builder() +.port(8888) +.username("smqtt") +.password("smqtt") +.build()) +.httpConfig( +BootstrapConfig .HttpConfig .builder() +.enable(true) +.accessLog(true) +.build()) +.clusterConfig( +BootstrapConfig. ClusterConfig .builder() +.enable(true) +.namespace("smqtt") +.node("node-1") +.port(7773) +.url("127.0.0.1:7771,127.0.0.1:7772"). build()) +.build() +.start().block(); ``` ### jar方式 @@ -168,11 +160,8 @@ SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor java -jar smqtt-bootstrap-1.0.1-SNAPSHOT.jar ``` - - ### docker 方式 - 拉取镜像 ``` @@ -189,18 +178,15 @@ docker run -it -p 1883:1883 1ssqq1lxr/smqtt 启动镜像使用自定义配置(同上准备配置文件config.yaml) - ``` # 启动服务 docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1ssqq1lxr/smqtt ``` - - ### springboot方式 1. 引入依赖 - + ```markdown io.github.quickmsg @@ -212,23 +198,25 @@ docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1 2. 启动类Application上添加注解 ` @EnableMqttServer` 3. 配置application.yml文件 - > properties也支持,但是需要自己转换,没有提供demo文件 + > properties也支持,但是需要自己转换,没有提供demo文件 [config.yaml](config/config.yaml) 4. 启动springboot服务服务即可 5. 如果引入的是spring-boot-starter-parent的管理包,如果启动报错,则需要添加以下依赖 + ```xml - - io.projectreactor - reactor-core - 3.4.9 - - - io.projectreactor.netty - reactor-netty - 1.0.10 - + + + io.projectreactor + reactor-core + 3.4.9 + + +io.projectreactor.netty +reactor-netty +1.0.10 + ``` ## 官网地址 @@ -239,43 +227,44 @@ docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1 [wiki地址](https://wiki.smqtt.cc/) - ## 管理后台 -![image](icon/admin.png) +![image](icon/admin.png) ## 监控页面 ### Mqtt监控 -![image](icon/application.png) +![image](icon/application.png) ### Jvm监控 + ![image](icon/jvm.png) ### Netty监控 -![image](icon/netty.png) - +![image](icon/netty.png) ## License [Apache License, Version 2.0](LICENSE) - ## 友情链接 -[一款非常好用的IOT平台:thinglinks](https://github.com/mqttsnet/thinglinks) +一款非常好用的IOT平台 thinglinks: + +- [Github](https://github.com/mqttsnet/thinglinks) +- [Gitee](https://gitee.com/mqttsnet/thinglinks) ## 相关技术文档 + - [reactor3](https://projectreactor.io/docs/core/release/reference/) - [reactor-netty](https://projectreactor.io/docs/netty/1.0.12/reference/index.html) ## 麻烦关注下公众号! -![image](icon/icon.jpg) +![image](icon/icon.jpg) - 添加微信号`Lemon877164954`,拉入smqtt官方交流群 - 加入qq群 `700152283` - -- Gitee From cee90a2863491ca5a3038f676bd75aef11e261f4 Mon Sep 17 00:00:00 2001 From: Easy <1091927336@qq.com> Date: Sun, 9 Jan 2022 02:11:20 +0800 Subject: [PATCH 5/8] =?UTF-8?q?=E8=A7=84=E5=88=99=E5=BC=95=E6=93=8E?= =?UTF-8?q?=E6=94=AF=E6=8C=81topic=E9=80=9A=E9=85=8D=E7=AC=A6=E5=88=A4?= =?UTF-8?q?=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/github/quickmsg/common/utils/TopicRegexUtils.java | 5 +++++ .../src/main/java/io/github/quickmsg/rule/RuleExecute.java | 5 ++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/TopicRegexUtils.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/TopicRegexUtils.java index 101d11e2..ff3f0ae9 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/TopicRegexUtils.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/TopicRegexUtils.java @@ -5,6 +5,8 @@ package io.github.quickmsg.common.utils; */ public class TopicRegexUtils { + public static final TopicRegexUtils instance = new TopicRegexUtils(); + public static String regexTopic(String topic) { if (topic.startsWith("$")) { topic = "\\" + topic; @@ -15,5 +17,8 @@ public class TopicRegexUtils { .replaceAll("#", "(.+)") + "$"; } + public String regularTopic(String topic) { + return TopicRegexUtils.regexTopic(topic); + } } diff --git a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/RuleExecute.java b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/RuleExecute.java index 982dffab..d3c8ec63 100644 --- a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/RuleExecute.java +++ b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/RuleExecute.java @@ -1,5 +1,6 @@ package io.github.quickmsg.rule; +import io.github.quickmsg.common.utils.TopicRegexUtils; import org.apache.commons.jexl3.*; import reactor.util.context.ContextView; @@ -19,7 +20,8 @@ public interface RuleExecute { * 执行 * * @param context 上下文容器 -\ */ + * \ + */ void execute(ContextView context); @@ -34,6 +36,7 @@ public interface RuleExecute { JexlExpression e = J_EXL_ENGINE.createExpression(script); MapContext context = new MapContext(); mapContextConsumer.accept(context); + context.set("TopicRegexUtils", TopicRegexUtils.instance); return e.evaluate(context); } -- Gitee From 9599e797c2ea9a4b5a7f2ba34a5eac95f2ab28a7 Mon Sep 17 00:00:00 2001 From: luxurong Date: Tue, 11 Jan 2022 14:42:09 +0800 Subject: [PATCH 6/8] ssl --- .../io/github/quickmsg/common/config/SslContext.java | 2 ++ .../github/quickmsg/core/ssl/AbstractSslHandler.java | 11 ++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/SslContext.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/SslContext.java index d5106ea2..af55fc6e 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/SslContext.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/SslContext.java @@ -1,5 +1,6 @@ package io.github.quickmsg.common.config; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @@ -8,6 +9,7 @@ import lombok.NoArgsConstructor; */ @Data @NoArgsConstructor +@Builder public class SslContext { private String crt; diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java b/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java index 28eee581..a571b99e 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java @@ -4,11 +4,11 @@ import io.github.quickmsg.common.config.Configuration; import io.github.quickmsg.common.config.SslContext; import io.github.quickmsg.core.mqtt.MqttConfiguration; import io.netty.channel.ChannelOption; -import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; import lombok.extern.slf4j.Slf4j; import reactor.netty.tcp.SslProvider; import reactor.netty.tcp.TcpServer; +import reactor.netty.tcp.TcpSslContextSpec; import java.io.File; import java.util.Map; @@ -25,7 +25,7 @@ public class AbstractSslHandler { File cert; File key; SslContext sslContext = configuration.getSslContext(); - if (sslContext != null) { + if (sslContext != null && sslContext.getCrt() != null && sslContext.getKey() != null) { cert = new File(sslContext.getCrt()); key = new File(sslContext.getKey()); @@ -33,9 +33,10 @@ public class AbstractSslHandler { SelfSignedCertificate ssc = new SelfSignedCertificate(); cert = ssc.certificate(); key = ssc.privateKey(); + log.error("SelfSignedCertificate cert {} key {}",cert.getAbsolutePath(),key.getAbsolutePath()); } - SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert, key); - sslContextSpec.sslContext(sslContextBuilder); + TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forServer(cert, key); + sslContextSpec.sslContext(tcpSslContextSpec); } } catch (Exception e) { @@ -48,7 +49,7 @@ public class AbstractSslHandler { public TcpServer initTcpServer(MqttConfiguration mqttConfiguration) { TcpServer server = TcpServer.create(); if (mqttConfiguration.getSsl()) { - server.secure(sslContextSpec -> this.secure(sslContextSpec, mqttConfiguration)); + server = server.secure(sslContextSpec -> this.secure(sslContextSpec, mqttConfiguration)); } if (mqttConfiguration.getOptions() != null) { for (Map.Entry entry : mqttConfiguration.getOptions().entrySet()) { -- Gitee From e1b13716c36d7bd67ed5b9af76d96c9a753c5fd7 Mon Sep 17 00:00:00 2001 From: luxurong Date: Tue, 11 Jan 2022 14:44:03 +0800 Subject: [PATCH 7/8] ssl --- .../java/io/github/quickmsg/core/ssl/AbstractSslHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java b/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java index a571b99e..d647de2a 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java @@ -33,7 +33,7 @@ public class AbstractSslHandler { SelfSignedCertificate ssc = new SelfSignedCertificate(); cert = ssc.certificate(); key = ssc.privateKey(); - log.error("SelfSignedCertificate cert {} key {}",cert.getAbsolutePath(),key.getAbsolutePath()); + log.info("SelfSignedCertificate cert {} key {}",cert.getAbsolutePath(),key.getAbsolutePath()); } TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forServer(cert, key); sslContextSpec.sslContext(tcpSslContextSpec); -- Gitee From fc344ffd0b22cc440f8a9dc425badb5629f01860 Mon Sep 17 00:00:00 2001 From: luxurong Date: Fri, 14 Jan 2022 22:05:15 +0800 Subject: [PATCH 8/8] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/github/quickmsg/common/ack/AbsAck.java | 2 +- .../core/mqtt/MqttReceiveContext.java | 1 + .../core/spi/DefaultProtocolAdaptor.java | 36 +++++++++---------- smqtt-core/src/main/resources/logback.xml | 20 +++++++++-- 4 files changed, 37 insertions(+), 22 deletions(-) diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java index 63c5fa5a..8ce52990 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java @@ -9,7 +9,7 @@ import java.util.function.Consumer; /** * @author luxurong */ -@Slf4j +@Slf4j(topic = "ack") public abstract class AbsAck implements Ack { private final int maxRetrySize; diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiveContext.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiveContext.java index 8a6a318c..b8045200 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiveContext.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiveContext.java @@ -33,6 +33,7 @@ public class MqttReceiveContext extends AbstractReceiveContext { log.error("on message error {}",o,throwable); }) + .filter(mqttMessage -> mqttMessage.decoderResult().isSuccess()) .subscribe(mqttMessage -> this.accept(mqttChannel, new SmqttMessage<>(mqttMessage,System.currentTimeMillis(),Boolean.FALSE))); } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java index 69b7cddb..ff606b41 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java @@ -45,25 +45,23 @@ public class DefaultProtocolAdaptor implements ProtocolAdaptor { @Override public void chooseProtocol(MqttChannel mqttChannel, SmqttMessage smqttMessage, ReceiveContext receiveContext) { MqttMessage mqttMessage = smqttMessage.getMessage(); - if (mqttMessage!=null && mqttMessage.decoderResult() != null && (mqttMessage.decoderResult().isSuccess())) { - log.info(" 【{}】【{}】 【{}】", - Thread.currentThread().getName(), - mqttMessage.fixedHeader().messageType(), - mqttChannel); - Optional.ofNullable(types.get(mqttMessage.fixedHeader().messageType())) - .ifPresent(protocol -> protocol - .doParseProtocol(smqttMessage, mqttChannel) - .contextWrite(context -> context.putNonNull(ReceiveContext.class, receiveContext)) - .subscribeOn(scheduler) - .onErrorContinue(((throwable, o) -> {})) - .subscribe(aVoid -> { - }, error -> { - log.error("channel {} chooseProtocol: {} error {}", mqttChannel, mqttMessage, error.getMessage()); - ReactorNetty.safeRelease(mqttMessage.payload()); - }, () -> ReactorNetty.safeRelease(mqttMessage.payload()))); - } else { - log.error("chooseProtocol {} error mqttMessage {} ", mqttChannel, mqttMessage); - } + log.info(" 【{}】【{}】 【{}】", + Thread.currentThread().getName(), + mqttMessage.fixedHeader().messageType(), + mqttChannel); + Optional.ofNullable(types.get(mqttMessage.fixedHeader().messageType())) + .ifPresent(protocol -> protocol + .doParseProtocol(smqttMessage, mqttChannel) + .contextWrite(context -> context.putNonNull(ReceiveContext.class, receiveContext)) + .subscribeOn(scheduler) + .onErrorContinue(((throwable, o) -> { + + })) + .subscribe(aVoid -> { + }, error -> { + log.error("channel {} chooseProtocol: {} error {}", mqttChannel, mqttMessage, error.getMessage()); + ReactorNetty.safeRelease(mqttMessage.payload()); + }, () -> ReactorNetty.safeRelease(mqttMessage.payload()))); } diff --git a/smqtt-core/src/main/resources/logback.xml b/smqtt-core/src/main/resources/logback.xml index fd8feafc..23ed1884 100644 --- a/smqtt-core/src/main/resources/logback.xml +++ b/smqtt-core/src/main/resources/logback.xml @@ -23,7 +23,7 @@ - + logs/smqtt.%d{yyyy-MM-dd}.log 30 @@ -33,6 +33,16 @@ + + + logs/ack.%d{yyyy-MM-dd}.log + 30 + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + @@ -47,8 +57,14 @@ - + + + + + + + \ No newline at end of file -- Gitee