diff --git a/pom.xml b/pom.xml
index 5b80a9da33fe93040ea1c50ab5e61c6732074614..9e1b21e74bc6118b04fc0807ab148ec08dc1ee36 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,6 +8,7 @@
1.1.2
smqtt-common
+ smqtt-icc-common
smqtt-core
smqtt-bootstrap
smqtt-registry
diff --git a/smqtt-bootstrap/pom.xml b/smqtt-bootstrap/pom.xml
index 1f67f6d19a35971acb604dc67d441429e65b3469..dddb998cd53713e5caade0066e83ac9340ecdc64 100644
--- a/smqtt-bootstrap/pom.xml
+++ b/smqtt-bootstrap/pom.xml
@@ -57,6 +57,21 @@
io.github.quickmsg
1.1.2
+
+ org.springframework.boot
+ spring-boot
+ 2.2.1.RELEASE
+
+
+ org.springframework.boot
+ spring-boot-autoconfigure
+ 2.2.1.RELEASE
+
+
+ io.github.quickmsg
+ smqtt-spring-boot-starter
+ 1.1.2
+
diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java
index a41fc41d5b650df8ceebad70243c64562245edf9..a26dffdedd08dc9744eae506a30266df3f8d1068 100644
--- a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java
+++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java
@@ -53,6 +53,7 @@ public abstract class AbstractStarter {
.httpConfig(config.getSmqttConfig().getHttpConfig())
.websocketConfig(config.getSmqttConfig().getWebsocketConfig())
.clusterConfig(config.getSmqttConfig().getClusterConfig())
+ .persistenceType(config.getPersistenceType())
.redisConfig(config.getSmqttConfig().getRedisConfig())
.databaseConfig(config.getSmqttConfig().getDatabaseConfig())
.meterConfig(config.getSmqttConfig().getMeterConfig())
diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java
new file mode 100644
index 0000000000000000000000000000000000000000..4bb5e3e1c12b9e3b0650351aa778f6362768f22f
--- /dev/null
+++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java
@@ -0,0 +1,20 @@
+package io.github.quickmsg.springboot;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+import io.github.quickmsg.starter.EnableMqttServer;
+
+/**
+ * @Author: Jingwu.Zhou
+ * @Date: 2022/1/24
+ */
+@SpringBootApplication
+@ComponentScan(basePackages = {"io.github.quickmsg"})
+@EnableMqttServer
+public class SpringBootStarter {
+ public static void main(String[] args) {
+ SpringApplication.run(SpringBootStarter.class, args);
+ }
+}
diff --git a/smqtt-bootstrap/src/main/resources/META-INF/services/io.github.quickmsg.common.auth.PasswordAuthentication b/smqtt-bootstrap/src/main/resources/META-INF/services/io.github.quickmsg.common.auth.PasswordAuthentication
new file mode 100644
index 0000000000000000000000000000000000000000..2fa0b9157a1a96af1144da9bd6c46fbf0838cebd
--- /dev/null
+++ b/smqtt-bootstrap/src/main/resources/META-INF/services/io.github.quickmsg.common.auth.PasswordAuthentication
@@ -0,0 +1 @@
+io.github.quickmsg.common.auth.PasswordAuthenticationImpl
\ No newline at end of file
diff --git a/smqtt-bootstrap/src/main/resources/application.yml b/smqtt-bootstrap/src/main/resources/application.yml
new file mode 100644
index 0000000000000000000000000000000000000000..c45a6af9db2660e2b01e0182100ede2c98bb5be7
--- /dev/null
+++ b/smqtt-bootstrap/src/main/resources/application.yml
@@ -0,0 +1,141 @@
+ smqtt:
+ logLevel: INFO # 系统日志
+ tcp: # tcp配置
+ port: 1883 # mqtt端口号
+ username: smqtt # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口
+ password: smqtt # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口
+ wiretap: true # 二进制日志 前提是 smqtt.logLevel = DEBUG
+ bossThreadSize: 4 # boss线程
+ workThreadSize: 8 # work线程
+ lowWaterMark: 4000000 # 不建议配置 默认 32768
+ highWaterMark: 80000000 # 不建议配置 默认 65536
+ businessThreadSize: 16 # 业务线程数 默认=cpu核心数*10
+ businessQueueSize: 100000 #业务队列 默认=100000
+ # globalReadWriteSize: 10000000,100000000 全局读写大小限制
+ # channelReadWriteSize: 10000000,100000000 单个channel读写大小限制
+ ssl: # ssl配置
+ enable: false # 开关
+ key: /user/server.key # 指定ssl文件 默认系统生成
+ crt: /user/server.crt # 指定ssl文件 默认系统生成
+ whiteIp: 127.0.0.1
+
+ http: # http相关配置 端口固定60000
+ enable: true # 开关
+ accessLog: false # http访问日志
+ ssl: # ssl配置
+ enable: false
+ admin: # 后台管理配置
+ enable: true # 开关
+ username: smqtt # 访问用户名
+ password: smqtt # 访问密码
+ ws: # websocket配置
+ enable: false # 开关
+ port: 8999 # 端口
+ path: /mqtt # ws 的访问path mqtt.js请设置此选项
+ cluster: # 集群配置
+ enable: false # 集群开关
+ url: 127.0.0.1:7771,127.0.0.1:7772 # 启动节点
+ port: 7771 # 端口
+ node: node-1 # 集群节点名称 唯一
+ external:
+ host: localhost # 用于映射容器ip 请不要随意设置,如果不需要请移除此选项
+ port: 7777 # 用于映射容器端口 请不要随意设置,如果不需要请移除此选项
+ redis: # redis 请参考 https://doc.smqtt.cc/%E5%85%B6%E4%BB%96/1.store.html 【如果没有引入相关依赖请移除此配置】
+ mode: sentinel
+ database: 1
+ password: xuanwu-T3st*17
+ timeout: 3000
+ poolMinIdle: 8
+ poolConnTimeout: 3000
+ poolSize: 10
+ single:
+ address: 172.16.1.16:6392
+ cluster:
+ scanInterval: 1000
+ nodes: 172.16.1.16:6390,172.16.1.16:6391,172.16.1.16:6392
+ readMode: SLAVE
+ retryAttempts: 3
+ slaveConnectionPoolSize: 64
+ masterConnectionPoolSize: 64
+ retryInterval: 1500
+ sentinel:
+ master: mq_master
+ nodes: 172.16.1.16:26371,172.16.1.16:26372,172.16.1.16:26373
+ logLevel: INFO # 系统日志
+ tcp: # tcp配置
+ port: 1883 # mqtt端口号
+ username: smqtt # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口
+ password: smqtt # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口
+ wiretap: true # 二进制日志 前提是 smqtt.logLevel = DEBUG
+ bossThreadSize: 4 # boss线程
+ workThreadSize: 8 # work线程
+ lowWaterMark: 4000000 # 不建议配置 默认 32768
+ highWaterMark: 80000000 # 不建议配置 默认 65536
+ businessThreadSize: 16 # 业务线程数 默认=cpu核心数*10
+ businessQueueSize: 100000 #业务队列 默认=100000
+ # globalReadWriteSize: 10000000,100000000 全局读写大小限制
+ # channelReadWriteSize: 10000000,100000000 单个channel读写大小限制
+ ssl: # ssl配置
+ enable: false # 开关
+ key: /user/server.key # 指定ssl文件 默认系统生成
+ crt: /user/server.crt # 指定ssl文件 默认系统生成
+ http: # http相关配置 端口固定60000
+ enable: true # 开关
+ accessLog: false # http访问日志
+ ssl: # ssl配置
+ enable: false
+ admin: # 后台管理配置
+ enable: true # 开关
+ username: smqtt # 访问用户名
+ password: smqtt # 访问密码
+ ws: # websocket配置
+ enable: false # 开关
+ port: 8999 # 端口
+ path: /mqtt # ws 的访问path mqtt.js请设置此选项
+ cluster: # 集群配置
+ enable: false # 集群开关
+ url: 127.0.0.1:7771,127.0.0.1:7772 # 启动节点
+ port: 7771 # 端口
+ node: node-1 # 集群节点名称 唯一
+ external:
+ host: localhost # 用于映射容器ip 请不要随意设置,如果不需要请移除此选项
+ port: 7777 # 用于映射容器端口 请不要随意设置,如果不需要请移除此选项
+ meter:
+ meterType: PROMETHEUS # INFLUXDB , PROMETHEUS
+
+ persistenceType: 1 # 持久化方式,0:内存(默认),1:Redis,2:DB
+ db: # 参数值配置参考https://github.com/brettwooldridge/HikariCP/wiki/MySQL-Configuration
+ jdbcUrl: jdbc:mysql://172.16.1.77:3306/smqtt
+ username: root
+ password: jx@xw!@#$~|{}
+ dataSourceCachePrepStmts: false
+ dataSourcePrepStmtCacheSize: 250
+ dataSourcePrepStmtCacheSqlLimit: 2048
+ dataSourceUseServerPrepStmts: true
+ dataSourceUseLocalSessionState: true
+ dataSourceRewriteBatchedStatements: true
+ dataSourceCacheResultSetMetadata: true
+ dataSourceCacheServerConfiguration: true
+ dataSourceElideSetAutoCommits: true
+ dataSourceMaintainTimeStats: false
+ redis: # redis 请参考 https://doc.smqtt.cc/%E5%85%B6%E4%BB%96/1.store.html 【如果没有引入相关依赖请移除此配置】
+ mode: sentinel
+ database: 7
+ password: xuanwu-T3st*17
+ timeout: 3000
+ poolMinIdle: 8
+ poolConnTimeout: 3000
+ poolSize: 10
+ single:
+ address: 172.16.1.16:6392
+ cluster:
+ scanInterval: 1000
+ nodes: 172.16.1.16:6390,172.16.1.16:6391,172.16.1.16:6392
+ readMode: SLAVE
+ retryAttempts: 3
+ slaveConnectionPoolSize: 64
+ masterConnectionPoolSize: 64
+ retryInterval: 1500
+ sentinel:
+ master: mq_master
+ nodes: 172.16.1.16:26371,172.16.1.16:26372,172.16.1.16:26373
diff --git a/smqtt-bootstrap/src/main/resources/test.yaml b/smqtt-bootstrap/src/main/resources/test.yaml
deleted file mode 100644
index e4203080c7a10e0811258fd56cb1cf8b0f78465e..0000000000000000000000000000000000000000
--- a/smqtt-bootstrap/src/main/resources/test.yaml
+++ /dev/null
@@ -1,3 +0,0 @@
-smqtt:
- tcp:
- port: 7000
\ No newline at end of file
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/auth/PasswordAuthentication.java b/smqtt-common/src/main/java/io/github/quickmsg/common/auth/PasswordAuthentication.java
index fcd5fc31d55a86bbe6747690a442e9adc265b324..521140a727a0c6689fec946969b537f39e7d6faf 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/auth/PasswordAuthentication.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/auth/PasswordAuthentication.java
@@ -1,7 +1,11 @@
package io.github.quickmsg.common.auth;
import io.github.quickmsg.common.StartUp;
+import io.github.quickmsg.common.channel.MqttChannel;
+import io.github.quickmsg.common.message.MessageRegistry;
import io.github.quickmsg.common.spi.DynamicLoader;
+import io.netty.handler.codec.mqtt.MqttConnectPayload;
+import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
/**
* @author luxurong
@@ -18,6 +22,6 @@ public interface PasswordAuthentication extends StartUp {
* @param clientIdentifier 设备标志
* @return 布尔
*/
- boolean auth(String userName, byte[] passwordInBytes, String clientIdentifier);
+ boolean auth(MessageRegistry messageRegistry, MqttConnectVariableHeader mqttConnectVariableHeader, MqttConnectPayload mqttConnectPayload, MqttChannel mqttChannel, String whiteIp);
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java b/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java
index ad5a890440d307136f1e9d9571441c9df4f33736..bd7da1b0cf77f7b9c6426c927b41ca9b3459ae3f 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java
@@ -14,6 +14,9 @@ public class BootstrapKey {
/*redis前缀key*/
public static final String REDIS_SESSION_MESSAGE_PREFIX_KEY = "smqtt:session:message:";
+ /*redis前缀key 离线消息*/
+ public static final String REDIS_OFFLINE_MESSAGE_PREFIX_KEY = "smqtt:offline:message:";
+
/*模式*/
public static final String REDIS_MODE = "redis.mode";
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java
index 6776a4da13c2a270c997bd50798e6862b300247e..64faa9b7eae6262036c4df1e2a6bb03b8356c7db 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java
@@ -200,6 +200,11 @@ public class BootstrapConfig {
* PasswordAuthentication
*/
PasswordAuthentication authentication;
+
+ /**
+ * 白名单,如ipush-bridge服务IP,多个使用逗号分割
+ */
+ private String whiteIp;
}
@@ -279,6 +284,11 @@ public class BootstrapConfig {
private ClusterExternal external;
}
+ /**
+ * 持久化方式,0:内存(默认),1:Redis,2:DB
+ */
+ private Integer persistenceType;
+
@Data
@Builder
@NoArgsConstructor
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java
index ca994d2b793e07445cb35b8d9d21f7d34250ec6e..de77bc9affc551ac4748f0fe4fc369732b2daea0 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java
@@ -122,8 +122,12 @@ public interface Configuration {
*/
BootstrapConfig.MeterConfig getMeterConfig();
-
-
+ /**
+ * 获取持久化方式
+ *
+ * @return
+ */
+ Integer getPersistenceType();
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/enums/Event.java b/smqtt-common/src/main/java/io/github/quickmsg/common/enums/Event.java
index 94229d0f4f5829bc28bf8cbec48aad2ad5fe27dc..1bfb6593edaba4e78af2f5fbf996e7ea99efbe4f 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/enums/Event.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/enums/Event.java
@@ -64,6 +64,29 @@ public enum Event {
mqttChannel.getUsername(),
ChannelStatus.OFFLINE)).getBytes());
}
+ },
+ /**
+ * 心跳事件
+ */
+ HEARTBEAT {
+ private static final String HEARTBEAT_TOPIC = "$event/heartbeat";
+
+ @Override
+ public void sender(MqttChannel mqttChannel, Object body, ReceiveContext> receiveContext) {
+ MqttPublishMessage mqttPublishMessage =
+ MqttMessageBuilder.buildPub(false, MqttQoS.AT_MOST_ONCE, 0, HEARTBEAT_TOPIC, writeBody(mqttChannel, body));
+ write(receiveContext, mqttChannel, mqttPublishMessage);
+ }
+
+ @Override
+ public ByteBuf writeBody(MqttChannel mqttChannel, Object body) {
+ return PooledByteBufAllocator.DEFAULT
+ .directBuffer().writeBytes(JacksonUtil.bean2Json(new ChannelStatusMessage(
+ mqttChannel.getClientIdentifier(),
+ System.currentTimeMillis(),
+ mqttChannel.getUsername(),
+ ChannelStatus.ONLINE)).getBytes());
+ }
};
/**
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java
index cb6d17b619f128b6376236f54806ea4c049095b9..e34f7c8ab2697a988d3869a33468ae79f114b8da 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java
@@ -78,6 +78,7 @@ public class MessageProxy {
.topic(header.topicName())
.retain(fixedHeader.isRetain())
.qos(fixedHeader.qosLevel().value())
+ .properties(header.properties())
.build();
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java
index 19c340dac9389abc45642e94393119623571ad09..2ab22342784775c9b8d3cc01102251df3380543e 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java
@@ -1,6 +1,7 @@
package io.github.quickmsg.common.message;
import io.github.quickmsg.common.utils.JacksonUtil;
+import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.util.internal.StringUtil;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -31,6 +32,8 @@ public class HeapMqttMessage {
private byte[] message;
+ private MqttProperties properties;
+
public Map getKeyMap() {
Map keys = new HashMap<>(5);
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java
index c21887dba432d7aa5498b958707ec25a005e632f..5131a84aaeba17ec9ceffd2c9517bbaa4bfdabba 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java
@@ -4,6 +4,7 @@ import io.github.quickmsg.common.StartUp;
import io.github.quickmsg.common.spi.DynamicLoader;
import java.util.List;
+import java.util.function.Supplier;
/**
* @author luxurong
@@ -47,4 +48,28 @@ public interface MessageRegistry extends StartUp {
List getRetainMessage(String topic);
+ /**
+ * 持久化离线消息
+ *
+ * @param message
+ */
+ void saveOfflineMessage(OfflineMessage message);
+
+ /**
+ * 获取离线消息
+ *
+ * @param topic
+ * @return
+ */
+ List getOfflineMessage(String topic);
+
+
+ /**
+ * 从缓存获取对象信息
+ *
+ * @param topic
+ * @return
+ */
+ T getObjCache(Supplier cacheKeyFun, Class clazz);
+ List getOfflineMessage(String topic);
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java
index d6a16539cc6c0dfaa5f0e63141210ed658e9ea7f..5346a17693eb9bbb3f86e9ab851216c3a343735c 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java
@@ -4,6 +4,13 @@ import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.*;
import java.util.List;
+import java.util.Map;
+
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION;
/**
@@ -11,6 +18,32 @@ import java.util.List;
*/
public class MqttMessageBuilder {
+ private static MqttProperties genMqttProperties(Map userPropertiesMap) {
+ MqttProperties mqttProperties = null;
+ if (userPropertiesMap != null) {
+ mqttProperties = new MqttProperties();
+ MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties();
+ for (Map.Entry entry : userPropertiesMap.entrySet()) {
+ userProperties.add(entry.getKey(), entry.getValue());
+ }
+ mqttProperties.add(userProperties);
+ }
+ return mqttProperties;
+ }
+
+ public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, MqttProperties properties) {
+ MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0);
+ MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, properties);
+ MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message);
+ return mqttPublishMessage;
+ }
+
+ public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, Map userPropertiesMap) {
+ MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0);
+ MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, genMqttProperties(userPropertiesMap));
+ MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message);
+ return mqttPublishMessage;
+ }
public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0);
@@ -69,8 +102,33 @@ public class MqttMessageBuilder {
return new MqttUnsubAckMessage(mqttFixedHeader, variableHeader);
}
- public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode) {
- MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false);
+ public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode, int version) {
+ MqttProperties properties = MqttProperties.NO_PROPERTIES;
+ if (MqttVersion.MQTT_5.protocolLevel() == (byte) version) {
+ properties = new MqttProperties();
+ properties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.RETAIN_AVAILABLE.value(), 1));
+ // don't support shared subscription
+ properties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.SHARED_SUBSCRIPTION_AVAILABLE.value(), 0));
+ switch (connectReturnCode) {
+ case CONNECTION_REFUSED_IDENTIFIER_REJECTED:
+ connectReturnCode = CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID;
+ break;
+ case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
+ connectReturnCode = CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION;
+ break;
+ case CONNECTION_REFUSED_SERVER_UNAVAILABLE:
+ connectReturnCode = CONNECTION_REFUSED_SERVER_UNAVAILABLE_5;
+ break;
+ case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
+ connectReturnCode = CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD;
+ break;
+ case CONNECTION_REFUSED_NOT_AUTHORIZED:
+ connectReturnCode = CONNECTION_REFUSED_NOT_AUTHORIZED_5;
+ break;
+
+ }
+ }
+ MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false, properties);
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(
MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0X02);
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/OfflineMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/OfflineMessage.java
new file mode 100644
index 0000000000000000000000000000000000000000..9771c5657bf2410287f92d32279b32d1fcb3f9f1
--- /dev/null
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/OfflineMessage.java
@@ -0,0 +1,62 @@
+package io.github.quickmsg.common.message;
+
+import java.util.HashMap;
+import java.util.Optional;
+
+import io.github.quickmsg.common.channel.MqttChannel;
+import io.github.quickmsg.common.utils.JacksonUtil;
+import io.github.quickmsg.common.utils.MessageUtils;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.handler.codec.mqtt.MqttProperties;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * @Author: Jingwu.Zhou
+ * @Date: 2022/1/24
+ */
+@Data
+@Builder
+public class OfflineMessage {
+ private int qos;
+
+ private String topic;
+
+ private byte[] body;
+
+ private String userProperties;
+
+ public static OfflineMessage of(MqttPublishMessage mqttPublishMessage) {
+ MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader();
+ return OfflineMessage.builder()
+ .topic(publishVariableHeader.topicName())
+ .qos(mqttPublishMessage.fixedHeader().qosLevel().value())
+ .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload()))
+ .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader
+ .properties()
+ .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()))
+ .map(list -> {
+ HashMap propertiesMap = new HashMap<>(list.size());
+ list.forEach(property -> {
+ MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value();
+ propertiesMap.put(pair.key, pair.value);
+ });
+ return propertiesMap;
+ }).orElseGet(HashMap::new)))
+ .build();
+ }
+
+ public MqttPublishMessage toPublishMessage(MqttChannel mqttChannel) {
+ return MqttMessageBuilder.buildPub(
+ false,
+ MqttQoS.valueOf(this.qos),
+ qos > 0 ? mqttChannel.generateMessageId() : 0,
+ topic,
+ PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body),
+ JacksonUtil.json2Map(userProperties, String.class, String.class));
+ }
+
+}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java
index fbc623529cdcbc7e0f9fb110feb2f8899ebc30e4..2d67c21e50281155a26fec9f839a6bfa3c8929a7 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java
@@ -1,8 +1,13 @@
package io.github.quickmsg.common.message;
+import java.util.HashMap;
+import java.util.Optional;
+
import io.github.quickmsg.common.channel.MqttChannel;
+import io.github.quickmsg.common.utils.JacksonUtil;
import io.github.quickmsg.common.utils.MessageUtils;
import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
@@ -22,12 +27,25 @@ public class RetainMessage {
private byte[] body;
+ private String userProperties;
+
public static RetainMessage of(MqttPublishMessage mqttPublishMessage) {
MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader();
return RetainMessage.builder()
.topic(publishVariableHeader.topicName())
.qos(mqttPublishMessage.fixedHeader().qosLevel().value())
.body(MessageUtils.copyByteBuf(mqttPublishMessage.payload()))
+ .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader
+ .properties()
+ .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()))
+ .map(list -> {
+ HashMap propertiesMap = new HashMap<>(list.size());
+ list.forEach(property -> {
+ MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value();
+ propertiesMap.put(pair.key, pair.value);
+ });
+ return propertiesMap;
+ }).orElseGet(HashMap::new)))
.build();
}
@@ -37,7 +55,8 @@ public class RetainMessage {
MqttQoS.valueOf(this.qos),
qos > 0 ? mqttChannel.generateMessageId() : 0,
topic,
- PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body));
+ PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body),
+ JacksonUtil.json2Map(userProperties, String.class, String.class));
}
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java
index a3596af3aa8e4686cf1786d39598480c42efc2d8..f5aecea5ff339c029a868517f2e1d7fb8c243de8 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java
@@ -1,8 +1,13 @@
package io.github.quickmsg.common.message;
+import java.util.HashMap;
+import java.util.Optional;
+
import io.github.quickmsg.common.channel.MqttChannel;
+import io.github.quickmsg.common.utils.JacksonUtil;
import io.github.quickmsg.common.utils.MessageUtils;
import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
@@ -27,6 +32,8 @@ public class SessionMessage {
private boolean retain;
+ private String userProperties;
+
public static SessionMessage of(String clientIdentifier, MqttPublishMessage mqttPublishMessage) {
MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader();
return SessionMessage.builder()
@@ -35,6 +42,17 @@ public class SessionMessage {
.qos(mqttPublishMessage.fixedHeader().qosLevel().value())
.retain(mqttPublishMessage.fixedHeader().isRetain())
.body(MessageUtils.copyByteBuf(mqttPublishMessage.payload()))
+ .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader
+ .properties()
+ .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()))
+ .map(list -> {
+ HashMap propertiesMap = new HashMap<>(list.size());
+ list.forEach(property -> {
+ MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value();
+ propertiesMap.put(pair.key, pair.value);
+ });
+ return propertiesMap;
+ }).orElseGet(HashMap::new)))
.build();
}
@@ -44,7 +62,8 @@ public class SessionMessage {
MqttQoS.valueOf(this.qos),
qos > 0 ? mqttChannel.generateMessageId() : 0,
topic,
- PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body));
+ PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body),
+ JacksonUtil.json2Map(userProperties, String.class, String.class));
}
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java
index fa197573b91107d755751afe9c9cf6c6e2877aa1..1b949a1639f554c09cee0e025b6eee41690182fa 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java
@@ -67,7 +67,8 @@ public class MessageUtils {
MqttPublishVariableHeader mqttPublishVariableHeader = message.variableHeader();
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttFixedHeader newFixedHeader = new MqttFixedHeader(mqttFixedHeader.messageType(), false, mqttQoS, false, mqttFixedHeader.remainingLength());
- MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId);
+ // mqtt 5 support properties
+ MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId, mqttPublishVariableHeader.properties());
return new MqttPublishMessage(newFixedHeader, newHeader, message.payload().copy());
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/vo/IpushApplication.java b/smqtt-common/src/main/java/io/github/quickmsg/common/vo/IpushApplication.java
new file mode 100644
index 0000000000000000000000000000000000000000..460907fbf9dbb049441215f1ba1de631e7b133b5
--- /dev/null
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/vo/IpushApplication.java
@@ -0,0 +1,121 @@
+package io.github.quickmsg.common.vo;
+
+import java.io.Serializable;
+import java.util.Date;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+/**
+ * 推送平台的应用信息
+ * @author huangzhuojie
+ * @date 2021/05/07
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IpushApplication implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * 主键
+ */
+ private Long id;
+
+ /**
+ * 应用的appKey
+ */
+ private String appKey;
+
+ /**
+ * 应用的appSecret
+ */
+ private String appSecret;
+
+ /**
+ * 应用的应用名称
+ */
+ private String appName;
+
+ /**
+ * 创建时间
+ */
+ private Date createTime;
+
+ /**
+ * 记录更新时间
+ */
+ private Date updateTime;
+
+ /**
+ * 可发生量加密值
+ */
+ private long canSendCount;
+
+ /**
+ * 日活秘钥
+ */
+ private String licenseKey;
+
+ public Long getId() {
+ return id;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ public String getAppKey() {
+ return this.appKey;
+ }
+
+ public void setAppKey(String appKey) {
+ this.appKey = appKey;
+ }
+
+ public String getAppSecret() {
+ return this.appSecret;
+ }
+
+ public void setAppSecret(String appSecret) {
+ this.appSecret = appSecret;
+ }
+
+ public String getAppName() {
+ return this.appName;
+ }
+
+ public void setAppName(String appName) {
+ this.appName = appName;
+ }
+
+ public Date getCreateTime() {
+ return this.createTime;
+ }
+
+ public void setCreateTime(Date createTime) {
+ this.createTime = createTime;
+ }
+
+ public Date getUpdateTime() {
+ return this.updateTime;
+ }
+
+ public void setUpdateTime(Date updateTime) {
+ this.updateTime = updateTime;
+ }
+
+ public long getCanSendCount() {
+ return canSendCount;
+ }
+
+ public void setCanSendCount(long canSendCount) {
+ this.canSendCount = canSendCount;
+ }
+
+ public String getLicenseKey() {
+ return licenseKey;
+ }
+
+ public void setLicenseKey(String licenseKey) {
+ this.licenseKey = licenseKey;
+ }
+}
diff --git a/smqtt-core/pom.xml b/smqtt-core/pom.xml
index dbb26bfb59fcfad27bf26045f1afee2353a59415..c303255fb650e87603bd2e6294be1d2a8fe7b617 100644
--- a/smqtt-core/pom.xml
+++ b/smqtt-core/pom.xml
@@ -13,7 +13,7 @@
io.github.quickmsg
- smqtt-common
+ smqtt-icc-common
1.1.2
@@ -31,6 +31,16 @@
smqtt-metric-prometheus
1.1.2
+
+ io.github.quickmsg
+ smqtt-persistent-redis
+ 1.1.2
+
+
+ io.github.quickmsg
+ smqtt-persistent-db
+ 1.1.2
+
\ No newline at end of file
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java
index 2ab1843fde8d307151c4c7001508d22c63f2d7d7..914bb94ef05ac82e84617770be3efe883333af61 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java
@@ -43,6 +43,8 @@ public class Bootstrap {
private BootstrapConfig.ClusterConfig clusterConfig;
+ private Integer persistenceType;
+
private BootstrapConfig.RedisConfig redisConfig;
private BootstrapConfig.DatabaseConfig databaseConfig;
@@ -67,15 +69,16 @@ public class Bootstrap {
private MqttConfiguration initMqttConfiguration() {
MqttConfiguration mqttConfiguration = defaultConfiguration();
- if (tcpConfig.getAuthentication() != null) {
- mqttConfiguration.setReactivePasswordAuth(tcpConfig.getAuthentication());
- } else {
- if (tcpConfig.getUsername() != null || tcpConfig.getPassword() != null) {
- mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> user.equals(tcpConfig.getUsername()) && new String(pwd).equals(tcpConfig.getPassword()));
- } else {
- mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> true);
- }
- }
+// if (tcpConfig.getAuthentication() != null) {
+// mqttConfiguration.setReactivePasswordAuth(tcpConfig.getAuthentication());
+// } else {
+// if (tcpConfig.getUsername() != null || tcpConfig.getPassword() != null) {
+// mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> user.equals(tcpConfig.getUsername()) && new String(pwd).equals(tcpConfig.getPassword()));
+// } else {
+// mqttConfiguration.setReactivePasswordAuth((user, pwd, id) -> true);
+// }
+// }
+
Optional.ofNullable(tcpConfig.getConnectModel()).ifPresent(mqttConfiguration::setConnectModel);
Optional.ofNullable(tcpConfig.getNotKickSecond()).ifPresent(mqttConfiguration::setNotKickSecond);
Optional.ofNullable(tcpConfig.getPort()).ifPresent(mqttConfiguration::setPort);
@@ -92,8 +95,10 @@ public class Bootstrap {
Optional.ofNullable(tcpConfig.getSsl()).ifPresent(mqttConfiguration::setSslContext);
Optional.ofNullable(tcpConfig.getSsl()).ifPresent(mqttConfiguration::setSslContext);
Optional.ofNullable(tcpConfig.getMessageMaxSize()).ifPresent(mqttConfiguration::setMessageMaxSize);
+ Optional.ofNullable(tcpConfig.getWhiteIp()).ifPresent(mqttConfiguration::setWhiteIp);
Optional.ofNullable(clusterConfig).ifPresent(mqttConfiguration::setClusterConfig);
Optional.ofNullable(meterConfig).ifPresent(mqttConfiguration::setMeterConfig);
+ Optional.ofNullable(persistenceType).ifPresent(mqttConfiguration::setPersistenceType);
if (websocketConfig != null && websocketConfig.isEnable()) {
mqttConfiguration.setWebSocketPort(websocketConfig.getPort());
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java
index d751f33ba3eb3b42e7f01106fdb31dc4cc4e4ec3..62d5f3a39dbf0be92f69cd5092cd3c3c2211d79f 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java
@@ -56,7 +56,7 @@ public class ClusterReceiver {
MqttQoS.valueOf(heapMqttMessage.getQos()),
0,
heapMqttMessage.getTopic(),
- PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage())), System.currentTimeMillis(), Boolean.TRUE);
+ PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()), heapMqttMessage.getProperties()), System.currentTimeMillis(), Boolean.TRUE);
}
}
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java
index 12fc2b003a703d0bb4a7593d0ae6078fcd258284..8c9a6161e1a7c903a6626e090e5f86732b039540 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java
@@ -37,6 +37,8 @@ public class HttpConfiguration implements Configuration {
private BootstrapConfig.MeterConfig meterConfig;
+ private Integer persistenceType = 0;
+
@Override
public ConnectModel getConnectModel() {
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java
index 288e898e0ed8a0db2bbbafb88711cd2882db4864..2b78f347bcd47d769ee63d4eb2a3707d293710be 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java
@@ -30,6 +30,8 @@ import io.github.quickmsg.core.spi.DefaultTopicRegistry;
import io.github.quickmsg.dsl.RuleDslParser;
import io.github.quickmsg.metric.InfluxDbMetricFactory;
import io.github.quickmsg.metric.PrometheusMetricFactory;
+import io.github.quickmsg.persistent.registry.DbMessageRegistry;
+import io.github.quickmsg.persistent.registry.RedisMessageRegistry;
import io.github.quickmsg.rule.source.SourceManager;
import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
@@ -91,7 +93,7 @@ public abstract class AbstractReceiveContext implements
this.topicRegistry = topicRegistry();
this.loopResources = LoopResources.create("smqtt-cluster-io", configuration.getBossThreadSize(), configuration.getWorkThreadSize(), true);
this.trafficHandlerLoader = trafficHandlerLoader();
- this.messageRegistry = messageRegistry();
+ this.messageRegistry = messageRegistry(abstractConfiguration.getPersistenceType());
this.clusterRegistry = clusterRegistry();
this.passwordAuthentication = basicAuthentication();
this.channelRegistry.startUp(abstractConfiguration.getEnvironmentMap());
@@ -122,8 +124,15 @@ public abstract class AbstractReceiveContext implements
return Event::sender;
}
- private MessageRegistry messageRegistry() {
- return Optional.ofNullable(MessageRegistry.INSTANCE).orElseGet(DefaultMessageRegistry::new);
+ private MessageRegistry messageRegistry(int persistenceType) {
+ switch (persistenceType) {
+ case 1:
+ return new RedisMessageRegistry();
+ case 2:
+ return new DbMessageRegistry();
+ default:
+ return new DefaultMessageRegistry();
+ }
}
private PasswordAuthentication basicAuthentication() {
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java
index 03bda94cb65e1bc261e3fb8cc9c0e1deaf84bf53..8e87671a3ab90b9158826396e8c378adc6285f3c 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java
@@ -39,7 +39,7 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon
private Integer notKickSecond;
- private PasswordAuthentication reactivePasswordAuth = (u, p, c) -> true;
+ private PasswordAuthentication reactivePasswordAuth = (u, p, c, f, g) -> true;
private Integer bossThreadSize = Runtime.getRuntime().availableProcessors();
@@ -68,5 +68,9 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon
private Map