diff --git a/README.md b/README.md index ab66c11de6b93fff8b21b9c257d3bef7d8e40aec..63122d2749e31fa6bbeb4d95650f820a7681fc4f 100644 --- a/README.md +++ b/README.md @@ -1,50 +1,55 @@ ## ![image](icon/logo.png) SMQTT开源的MQTT消息代理Broker -SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor3反应堆模型,支持单机部署,支持容器化部署,具备低延迟,高吞吐量,支持百万TCP连接,同时支持多种协议交互,是一款非常优秀的消息中间件! +SMQTT基于reactor-netty(spring-webflux底层依赖) +开发,底层采用Reactor3反应堆模型,支持单机部署,支持容器化部署,具备低延迟,高吞吐量,支持百万TCP连接,同时支持多种协议交互,是一款非常优秀的消息中间件! ## smqtt目前拥有的功能如下: -![架构图](icon/component.png) +![架构图](icon/component.png) 1. 消息质量等级实现(支持qos0,qos1,qos2) 2. topicFilter支持 - topic分级(test/test) - +支持(单层匹配) - - #支持(多层匹配) + - # 支持(多层匹配) 3. 会话消息 - 默认内存存储 - 支持持久化(redis/db) 4. 保留消息 - - 默认内存存储 - - 支持持久化(redis/db) + - 默认内存存储 + - 支持持久化(redis/db) 5. 遗嘱消息 - > 设备掉线时候触发 + + > 设备掉线时候触发 6. 客户端认证 - - 支持spi注入外部认证 + - 支持spi注入外部认证 7. tls加密 - - 支持tls加密(mqtt端口/http端口) + - 支持tls加密(mqtt端口/http端口) 8. websocket协议支持x - > 使用mqtt over websocket + + > 使用mqtt over websocket 9. http协议交互 - 支持http接口推送消息 - 支持spi扩展http接口 10. SPI接口扩展支持 - - 消息管理接口(会话消息/保留消息管理) - - 通道管理接口 (管理系统的客户端连接) - - 认证接口 (用于自定义外部认证) - - 拦截器 (用户自定义拦截消息) + - 消息管理接口(会话消息/保留消息管理) + - 通道管理接口 (管理系统的客户端连接) + - 认证接口 (用于自定义外部认证) + - 拦截器 (用户自定义拦截消息) 11. 集群支持(gossip协议实现) -12. 容器化支持 +12. 容器化支持 + > 默认镜像最新tag: 1ssqq1lxr/smqtt 13. 持久化支持(session 保留消息) 14. 规则引擎支持 15. 支持springboot starter启动 16. 管理后台 + > 请参考smqtt文档如何启动管理后台 17. grafana监控集成 - 支持influxdb - 支持prometheus - + ## 尝试一下 > 大家不要恶意链接,谢谢! @@ -60,6 +65,7 @@ SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor ### main方式启动 引入依赖 + ```markdown @@ -85,69 +91,55 @@ SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor ```markdown Bootstrap bootstrap = Bootstrap.builder() - .rootLevel(Level.DEBUG) - .tcpConfig( - BootstrapConfig - .TcpConfig - .builder() - .port(8888) - .username("smqtt") - .password("smqtt") - .build()) - .httpConfig( - BootstrapConfig - .HttpConfig - .builder() - .enable(true) - .accessLog(true) - .build()) - .clusterConfig( - BootstrapConfig. - ClusterConfig - .builder() - .enable(true) - .namespace("smqtt") - .node("node-1") - .port(7773) - .url("127.0.0.1:7771,127.0.0.1:7772"). - build()) - .build() - .startAwait(); +.rootLevel(Level.DEBUG) +.tcpConfig( +BootstrapConfig .TcpConfig .builder() +.port(8888) +.username("smqtt") +.password("smqtt") +.build()) +.httpConfig( +BootstrapConfig .HttpConfig .builder() +.enable(true) +.accessLog(true) +.build()) +.clusterConfig( +BootstrapConfig. ClusterConfig .builder() +.enable(true) +.namespace("smqtt") +.node("node-1") +.port(7773) +.url("127.0.0.1:7771,127.0.0.1:7772"). build()) +.build() +.startAwait(); ``` - 非阻塞式启动服务: ```markdown - Bootstrap bootstrap = Bootstrap.builder() - .rootLevel(Level.DEBUG) - .tcpConfig( - BootstrapConfig - .TcpConfig - .builder() - .port(8888) - .username("smqtt") - .password("smqtt") - .build()) - .httpConfig( - BootstrapConfig - .HttpConfig - .builder() - .enable(true) - .accessLog(true) - .build()) - .clusterConfig( - BootstrapConfig. - ClusterConfig - .builder() - .enable(true) - .namespace("smqtt") - .node("node-1") - .port(7773) - .url("127.0.0.1:7771,127.0.0.1:7772"). - build()) - .build() - .start().block(); +Bootstrap bootstrap = Bootstrap.builder() +.rootLevel(Level.DEBUG) +.tcpConfig( +BootstrapConfig .TcpConfig .builder() +.port(8888) +.username("smqtt") +.password("smqtt") +.build()) +.httpConfig( +BootstrapConfig .HttpConfig .builder() +.enable(true) +.accessLog(true) +.build()) +.clusterConfig( +BootstrapConfig. ClusterConfig .builder() +.enable(true) +.namespace("smqtt") +.node("node-1") +.port(7773) +.url("127.0.0.1:7771,127.0.0.1:7772"). build()) +.build() +.start().block(); ``` ### jar方式 @@ -168,11 +160,8 @@ SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor java -jar smqtt-bootstrap-1.0.1-SNAPSHOT.jar ``` - - ### docker 方式 - 拉取镜像 ``` @@ -189,18 +178,15 @@ docker run -it -p 1883:1883 1ssqq1lxr/smqtt 启动镜像使用自定义配置(同上准备配置文件config.yaml) - ``` # 启动服务 docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1ssqq1lxr/smqtt ``` - - ### springboot方式 1. 引入依赖 - + ```markdown io.github.quickmsg @@ -212,23 +198,25 @@ docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1 2. 启动类Application上添加注解 ` @EnableMqttServer` 3. 配置application.yml文件 - > properties也支持,但是需要自己转换,没有提供demo文件 + > properties也支持,但是需要自己转换,没有提供demo文件 [config.yaml](config/config.yaml) 4. 启动springboot服务服务即可 5. 如果引入的是spring-boot-starter-parent的管理包,如果启动报错,则需要添加以下依赖 + ```xml - - io.projectreactor - reactor-core - 3.4.9 - - - io.projectreactor.netty - reactor-netty - 1.0.10 - + + + io.projectreactor + reactor-core + 3.4.9 + + +io.projectreactor.netty +reactor-netty +1.0.10 + ``` ## 官网地址 @@ -239,43 +227,44 @@ docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1 [wiki地址](https://wiki.smqtt.cc/) - ## 管理后台 -![image](icon/admin.png) +![image](icon/admin.png) ## 监控页面 ### Mqtt监控 -![image](icon/application.png) +![image](icon/application.png) ### Jvm监控 + ![image](icon/jvm.png) ### Netty监控 -![image](icon/netty.png) - +![image](icon/netty.png) ## License [Apache License, Version 2.0](LICENSE) - ## 友情链接 -[一款非常好用的IOT平台:thinglinks](https://github.com/mqttsnet/thinglinks) +一款非常好用的IOT平台 thinglinks: + +- [Github](https://github.com/mqttsnet/thinglinks) +- [Gitee](https://gitee.com/mqttsnet/thinglinks) ## 相关技术文档 + - [reactor3](https://projectreactor.io/docs/core/release/reference/) - [reactor-netty](https://projectreactor.io/docs/netty/1.0.12/reference/index.html) ## 麻烦关注下公众号! -![image](icon/icon.jpg) +![image](icon/icon.jpg) - 添加微信号`Lemon877164954`,拉入smqtt官方交流群 - 加入qq群 `700152283` - diff --git a/config/config.yaml b/config/config.yaml index f05620668661b6369c0f18f6a7b3dc34ff5477f1..088e023b1f415aec8aeb71df639d24f88d2f95f7 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,7 +1,8 @@ smqtt: logLevel: INFO # 系统日志 tcp: # tcp配置 - connectModel: UNIQUE # UNIQUE 唯一 KICK 互踢 + connectModel: KICK # UNIQUE 唯一 KICK 互踢 + notKickSecond: 30 # KICK互踢模式生效, 单位秒, 指定时间内客户端不互踢, 避免客户端自动连接持续互踢 port: 1883 # mqtt端口号 username: smqtt # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口 password: smqtt # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口 diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java index 63c5fa5a4898cb9656561a8da23e429c5c998061..8ce52990f410c33513ce3db9593b28ca7cf78e82 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java @@ -9,7 +9,7 @@ import java.util.function.Consumer; /** * @author luxurong */ -@Slf4j +@Slf4j(topic = "ack") public abstract class AbsAck implements Ack { private final int maxRetrySize; diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java index 3ac5af9b542de70f1e23f05abb59b3948af541ce..aace44e3c7fe2ff9abe4fc33590c326ee2bf477d 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java @@ -45,6 +45,8 @@ public class MqttChannel { private long authTime; + private long connectTime; + private boolean sessionPersistent; private Will will; @@ -273,7 +275,8 @@ public class MqttChannel { MqttMessage reply = getReplyMqttMessage(mqttMessage); Runnable runnable = () -> mqttChannel.write(Mono.just(reply)).subscribe(); - Runnable cleaner = () -> MessageUtils.safeRelease(reply);; + Runnable cleaner = () -> MessageUtils.safeRelease(reply); + ; Ack ack = new RetryAck(mqttChannel.generateId(reply.fixedHeader().messageType(), getMessageId(reply)), 5, 5, runnable, mqttChannel.getTimeAckManager(), cleaner); ack.start(); diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java index 7bb7ccfe187b9fc9f1363700c90fbfa0d43e8a79..6776a4da13c2a270c997bd50798e6862b300247e 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java @@ -32,7 +32,7 @@ public class BootstrapConfig { smqttConfig.setLogLevel("INFO"); bootstrapConfig.setSmqttConfig(smqttConfig); smqttConfig.setClusterConfig(ClusterConfig.builder() - .enable(false).build()); + .enable(false).build()); smqttConfig.setHttpConfig(HttpConfig.builder() .enable(false).build()); smqttConfig.setWebsocketConfig(WebsocketConfig.builder() @@ -114,6 +114,12 @@ public class BootstrapConfig { @Builder.Default private ConnectModel connectModel = ConnectModel.UNIQUE; + + /** + * 不互踢时间 默认60s + */ + @Builder.Default + private Integer notKickSecond = 60; /** * 端口 */ @@ -177,7 +183,7 @@ public class BootstrapConfig { /** * 单个连接读写字节限制 */ - private String channelReadWriteSize; + private String channelReadWriteSize; /** @@ -437,7 +443,6 @@ public class BootstrapConfig { /** * 指标配置 - */ @Data @Builder diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/SslContext.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/SslContext.java index d5106ea25537c4b63d60e37f8b839e1b0138c107..af55fc6e823d951fe3a0cc73e3c318c4097a1b3c 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/SslContext.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/SslContext.java @@ -1,5 +1,6 @@ package io.github.quickmsg.common.config; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @@ -8,6 +9,7 @@ import lombok.NoArgsConstructor; */ @Data @NoArgsConstructor +@Builder public class SslContext { private String crt; diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java index eb5d1a5ced4e7a48076f75e8d3581e5b512dbb33..19c340dac9389abc45642e94393119623571ad09 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java @@ -43,6 +43,7 @@ public class HeapMqttMessage { } private Object getJsonObject(String body) { + body=body.replaceAll("\r|\n|\t", ""); if (body.startsWith("{") && body.endsWith("}")) { return JacksonUtil.json2Bean(body, JsonMap.class); } else if (body.startsWith("[") && body.endsWith("]")) { diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/TopicRegexUtils.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/TopicRegexUtils.java index 101d11e2e34ac8ce331b580ce9625f7b4c52e1aa..ff3f0ae900ed080f806dbfe0f3f3ff79437ce96a 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/TopicRegexUtils.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/TopicRegexUtils.java @@ -5,6 +5,8 @@ package io.github.quickmsg.common.utils; */ public class TopicRegexUtils { + public static final TopicRegexUtils instance = new TopicRegexUtils(); + public static String regexTopic(String topic) { if (topic.startsWith("$")) { topic = "\\" + topic; @@ -15,5 +17,8 @@ public class TopicRegexUtils { .replaceAll("#", "(.+)") + "$"; } + public String regularTopic(String topic) { + return TopicRegexUtils.regexTopic(topic); + } } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java index 304178d8fe04715674288d46163d9c0fa60a7bbf..2ab1843fde8d307151c4c7001508d22c63f2d7d7 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java @@ -77,6 +77,7 @@ public class Bootstrap { } } Optional.ofNullable(tcpConfig.getConnectModel()).ifPresent(mqttConfiguration::setConnectModel); + Optional.ofNullable(tcpConfig.getNotKickSecond()).ifPresent(mqttConfiguration::setNotKickSecond); Optional.ofNullable(tcpConfig.getPort()).ifPresent(mqttConfiguration::setPort); Optional.ofNullable(tcpConfig.getLowWaterMark()).ifPresent(mqttConfiguration::setLowWaterMark); Optional.ofNullable(tcpConfig.getHighWaterMark()).ifPresent(mqttConfiguration::setHighWaterMark); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java index f6f385179dbf3b890703f2ac852c79bd8f44982b..288e898e0ed8a0db2bbbafb88711cd2882db4864 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java @@ -98,7 +98,7 @@ public abstract class AbstractReceiveContext implements this.messageRegistry.startUp(abstractConfiguration.getEnvironmentMap()); this.metricManager = metricManager(abstractConfiguration.getMeterConfig()); Optional.ofNullable(abstractConfiguration.getSourceDefinitions()).ifPresent(sourceDefinitions -> sourceDefinitions.forEach(SourceManager::loadSource)); - this.timeAckManager = new TimeAckManager(20, TimeUnit.MILLISECONDS,512); + this.timeAckManager = new TimeAckManager(20, TimeUnit.MILLISECONDS,50); } private TrafficHandlerLoader trafficHandlerLoader() { diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java index 0ac9299cd9fe96d57d3f9445be00c83fc317b7f3..03bda94cb65e1bc261e3fb8cc9c0e1deaf84bf53 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java @@ -37,6 +37,8 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon private ConnectModel connectModel; + private Integer notKickSecond; + private PasswordAuthentication reactivePasswordAuth = (u, p, c) -> true; private Integer bossThreadSize = Runtime.getRuntime().availableProcessors(); @@ -57,7 +59,7 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon private BootstrapConfig.ClusterConfig clusterConfig; - private BootstrapConfig.MeterConfig meterConfig ; + private BootstrapConfig.MeterConfig meterConfig; private List ruleChainDefinitions; diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiveContext.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiveContext.java index 9b3f02711960d5e8af32cb1fd591079f2fe75af2..b8045200744c3d5f38c4e218f2358efab6d265d7 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiveContext.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiveContext.java @@ -30,7 +30,10 @@ public class MqttReceiveContext extends AbstractReceiveContext log.error("on connect error",throwable)) + .onErrorContinue((throwable, o) -> { + log.error("on message error {}",o,throwable); + }) + .filter(mqttMessage -> mqttMessage.decoderResult().isSuccess()) .subscribe(mqttMessage -> this.accept(mqttChannel, new SmqttMessage<>(mqttMessage,System.currentTimeMillis(),Boolean.FALSE))); } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java index 6d17c31a495baa1cf0555bf66f4c09ef2faec291..6e6bd47a320e2d820938b5e5f7a4f306e5c15a18 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java @@ -72,8 +72,16 @@ public class ConnectProtocol implements Protocol { false).then(mqttChannel.close()); } } else { - Optional.ofNullable( channelRegistry.get(clientIdentifier)) - .ifPresent(ch->ch.close().subscribe()); + MqttChannel existMqttChannel = channelRegistry.get(clientIdentifier); + if (existMqttChannel != null) { + if (System.currentTimeMillis() - existMqttChannel.getConnectTime() > (mqttReceiveContext.getConfiguration().getNotKickSecond() * 1000)) { + existMqttChannel.close().subscribe(); + } else { + return mqttChannel.write( + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), + false).then(mqttChannel.close()); + } + } } /*protocol version support*/ if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version() @@ -96,6 +104,7 @@ public class ConnectProtocol implements Protocol { .build()); } mqttChannel.setAuthTime(System.currentTimeMillis()); + mqttChannel.setConnectTime(System.currentTimeMillis()); mqttChannel.setKeepalive(mqttConnectVariableHeader.keepAliveTimeSeconds()); mqttChannel.setSessionPersistent(!mqttConnectVariableHeader.isCleanSession()); mqttChannel.setStatus(ChannelStatus.ONLINE); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java index 4007bacd823fe05b123456d3c8e4c2680278319e..ff606b417ddeb1f08e526424f8f6abef728a229e 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java @@ -45,24 +45,23 @@ public class DefaultProtocolAdaptor implements ProtocolAdaptor { @Override public void chooseProtocol(MqttChannel mqttChannel, SmqttMessage smqttMessage, ReceiveContext receiveContext) { MqttMessage mqttMessage = smqttMessage.getMessage(); - if (mqttMessage.decoderResult() != null && (mqttMessage.decoderResult().isSuccess())) { - log.info(" 【{}】【{}】 【{}】", - Thread.currentThread().getName(), - mqttMessage.fixedHeader().messageType(), - mqttChannel); - Optional.ofNullable(types.get(mqttMessage.fixedHeader().messageType())) - .ifPresent(protocol -> protocol - .doParseProtocol(smqttMessage, mqttChannel) - .contextWrite(context -> context.putNonNull(ReceiveContext.class, receiveContext)) - .subscribeOn(scheduler) - .subscribe(aVoid -> { - }, error -> { - log.error("channel {} chooseProtocol: {} error {}", mqttChannel, mqttMessage, error.getMessage()); - ReactorNetty.safeRelease(mqttMessage.payload()); - }, () -> ReactorNetty.safeRelease(mqttMessage.payload()))); - } else { - log.error("chooseProtocol {} error mqttMessage {} ", mqttChannel, mqttMessage); - } + log.info(" 【{}】【{}】 【{}】", + Thread.currentThread().getName(), + mqttMessage.fixedHeader().messageType(), + mqttChannel); + Optional.ofNullable(types.get(mqttMessage.fixedHeader().messageType())) + .ifPresent(protocol -> protocol + .doParseProtocol(smqttMessage, mqttChannel) + .contextWrite(context -> context.putNonNull(ReceiveContext.class, receiveContext)) + .subscribeOn(scheduler) + .onErrorContinue(((throwable, o) -> { + + })) + .subscribe(aVoid -> { + }, error -> { + log.error("channel {} chooseProtocol: {} error {}", mqttChannel, mqttMessage, error.getMessage()); + ReactorNetty.safeRelease(mqttMessage.payload()); + }, () -> ReactorNetty.safeRelease(mqttMessage.payload()))); } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java b/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java index 28eee581cf92aea1a83be9f87827be3e04731eb5..d647de2acfdcf691822a5355c0998d74b0bc5203 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java @@ -4,11 +4,11 @@ import io.github.quickmsg.common.config.Configuration; import io.github.quickmsg.common.config.SslContext; import io.github.quickmsg.core.mqtt.MqttConfiguration; import io.netty.channel.ChannelOption; -import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; import lombok.extern.slf4j.Slf4j; import reactor.netty.tcp.SslProvider; import reactor.netty.tcp.TcpServer; +import reactor.netty.tcp.TcpSslContextSpec; import java.io.File; import java.util.Map; @@ -25,7 +25,7 @@ public class AbstractSslHandler { File cert; File key; SslContext sslContext = configuration.getSslContext(); - if (sslContext != null) { + if (sslContext != null && sslContext.getCrt() != null && sslContext.getKey() != null) { cert = new File(sslContext.getCrt()); key = new File(sslContext.getKey()); @@ -33,9 +33,10 @@ public class AbstractSslHandler { SelfSignedCertificate ssc = new SelfSignedCertificate(); cert = ssc.certificate(); key = ssc.privateKey(); + log.info("SelfSignedCertificate cert {} key {}",cert.getAbsolutePath(),key.getAbsolutePath()); } - SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert, key); - sslContextSpec.sslContext(sslContextBuilder); + TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forServer(cert, key); + sslContextSpec.sslContext(tcpSslContextSpec); } } catch (Exception e) { @@ -48,7 +49,7 @@ public class AbstractSslHandler { public TcpServer initTcpServer(MqttConfiguration mqttConfiguration) { TcpServer server = TcpServer.create(); if (mqttConfiguration.getSsl()) { - server.secure(sslContextSpec -> this.secure(sslContextSpec, mqttConfiguration)); + server = server.secure(sslContextSpec -> this.secure(sslContextSpec, mqttConfiguration)); } if (mqttConfiguration.getOptions() != null) { for (Map.Entry entry : mqttConfiguration.getOptions().entrySet()) { diff --git a/smqtt-core/src/main/resources/logback.xml b/smqtt-core/src/main/resources/logback.xml index fd8feafc91cc2823ab2e7b430b45f7cd2dce19b9..23ed1884e2eade6cbf8a502bcb237a92382d8444 100644 --- a/smqtt-core/src/main/resources/logback.xml +++ b/smqtt-core/src/main/resources/logback.xml @@ -23,7 +23,7 @@ - + logs/smqtt.%d{yyyy-MM-dd}.log 30 @@ -33,6 +33,16 @@ + + + logs/ack.%d{yyyy-MM-dd}.log + 30 + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + @@ -47,8 +57,14 @@ - + + + + + + + \ No newline at end of file diff --git a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/RuleExecute.java b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/RuleExecute.java index 982dffab13123d1198235476e5c8bb72b26c20aa..d3c8ec63674e4cc68673512002be83d2ab5dc620 100644 --- a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/RuleExecute.java +++ b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/RuleExecute.java @@ -1,5 +1,6 @@ package io.github.quickmsg.rule; +import io.github.quickmsg.common.utils.TopicRegexUtils; import org.apache.commons.jexl3.*; import reactor.util.context.ContextView; @@ -19,7 +20,8 @@ public interface RuleExecute { * 执行 * * @param context 上下文容器 -\ */ + * \ + */ void execute(ContextView context); @@ -34,6 +36,7 @@ public interface RuleExecute { JexlExpression e = J_EXL_ENGINE.createExpression(script); MapContext context = new MapContext(); mapContextConsumer.accept(context); + context.set("TopicRegexUtils", TopicRegexUtils.instance); return e.evaluate(context); }