diff --git a/pom.xml b/pom.xml
index 77b4fa0773ed71955829e6f290df8e892f025eb8..42658ff77e178794bc38f7ee9af8b8f10942df10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,12 +4,12 @@
org.smartboot.mqtt
smart-mqtt
smart-mqtt
- 0.11
+ 0.12
4.0.0
mqtt broker
- 0.11
+ 0.12
1.5.23
2.6
4.3
diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml
index e2279b50baa89822fef36c78f029a2ff495f0e45..8373da63e6eb8dce2dd5be2164edaaedffd9e8d2 100644
--- a/smart-mqtt-broker/pom.xml
+++ b/smart-mqtt-broker/pom.xml
@@ -5,7 +5,7 @@
org.smartboot.mqtt
smart-mqtt
- 0.11
+ 0.12
../pom.xml
4.0.0
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
index 8d68ca2ccf6702ed38bc16c01a76238d3077dbb2..19adc89d6634ea0da2880fd2ce60b92ca5830bbf 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
@@ -26,7 +26,7 @@ public class BrokerConfigure {
/**
* 当前smart-mqtt
*/
- public static final String VERSION = "v0.11";
+ public static final String VERSION = "v0.12";
static final Map SystemEnvironments = new HashMap<>();
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java
index 338327e78f408110d192ecc598c091e5d9eea265..c2652ba88ab6910e9db3b56429609b8792db7fa0 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java
@@ -1,7 +1,7 @@
package org.smartboot.mqtt.broker;
import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus;
-import org.smartboot.mqtt.broker.plugin.provider.Providers;
+import org.smartboot.mqtt.broker.provider.Providers;
import org.smartboot.mqtt.common.eventbus.EventBus;
import java.io.IOException;
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
index 4f891426a549e8a280ded19718d96a74008f9c75..0678fec4373cd84981c4173f8c0cd5b804eca8e2 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
@@ -6,16 +6,16 @@ import com.alibaba.fastjson2.JSONReader;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.smartboot.mqtt.broker.eventbus.ConnectAuthenticationSubscriber;
import org.smartboot.mqtt.broker.eventbus.ConnectIdleTimeMonitorSubscriber;
import org.smartboot.mqtt.broker.eventbus.KeepAliveMonitorSubscriber;
import org.smartboot.mqtt.broker.eventbus.ServerEventType;
import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus;
import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBusSubscriber;
import org.smartboot.mqtt.broker.eventbus.messagebus.consumer.RetainPersistenceConsumer;
-import org.smartboot.mqtt.broker.persistence.message.PersistenceMessage;
import org.smartboot.mqtt.broker.plugin.Plugin;
-import org.smartboot.mqtt.broker.plugin.provider.Providers;
+import org.smartboot.mqtt.broker.provider.Providers;
+import org.smartboot.mqtt.broker.provider.impl.ConfiguredConnectAuthenticationProviderImpl;
+import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage;
import org.smartboot.mqtt.common.AsyncTask;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.eventbus.EventBus;
@@ -93,6 +93,8 @@ public class BrokerContextImpl implements BrokerContext {
updateBrokerConfigure();
+ initProvider();
+
subscribeEventBus();
subscribeMessageBus();
@@ -103,7 +105,7 @@ public class BrokerContextImpl implements BrokerContext {
try {
pagePool = new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true);
server = new AioQuickServer(brokerConfigure.getHost(), brokerConfigure.getPort(), new MqttProtocol(brokerConfigure.getMaxPacketSize()), processor);
- server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(brokerConfigure.getThreadNum());
+ server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(Math.max(2, brokerConfigure.getThreadNum()));
server.start();
System.out.println(BrokerConfigure.BANNER + "\r\n :: smart-mqtt broker" + "::\t(" + BrokerConfigure.VERSION + ")");
System.out.println("❤️Gitee: https://gitee.com/smartboot/smart-mqtt");
@@ -125,6 +127,11 @@ public class BrokerContextImpl implements BrokerContext {
configJson = null;
}
+ private void initProvider() {
+ //连接鉴权处理器
+ providers.setConnectAuthenticationProvider(new ConfiguredConnectAuthenticationProviderImpl(this));
+ }
+
private void initPushThread() {
if (brokerConfigure.getTopicLimit() <= 0) {
brokerConfigure.setTopicLimit(10);
@@ -208,8 +215,7 @@ public class BrokerContextImpl implements BrokerContext {
});
//连接鉴权超时监控
eventBus.subscribe(ServerEventType.SESSION_CREATE, new ConnectIdleTimeMonitorSubscriber(this));
- //连接鉴权
- eventBus.subscribe(ServerEventType.CONNECT, new ConnectAuthenticationSubscriber(this));
+
//保持连接状态监听,长时间没有消息通信将断开连接
eventBus.subscribe(ServerEventType.CONNECT, new KeepAliveMonitorSubscriber(this));
@@ -257,7 +263,8 @@ public class BrokerContextImpl implements BrokerContext {
}
});
//打印消息日志
-// eventBus.subscribe(Arrays.asList(EventType.RECEIVE_MESSAGE, EventType.WRITE_MESSAGE), new MessageLoggerSubscriber());
+// eventBus.subscribe(Arrays.asList(EventType.RECEIVE_MESSAGE, EventType.WRITE_MESSAGE), new
+// MessageLoggerSubscriber());
}
private void notifyPush(BrokerTopic topic) {
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java
index 33f8fd42f37211205f1fae9482aa07d06f3873ac..cb983cf3f9181bf53dab0f24f81a557b21791eeb 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java
@@ -3,7 +3,7 @@ package org.smartboot.mqtt.broker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.broker.eventbus.ServerEventType;
-import org.smartboot.mqtt.broker.persistence.session.SessionState;
+import org.smartboot.mqtt.broker.provider.impl.session.SessionState;
import org.smartboot.mqtt.common.AbstractSession;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.MqttWriter;
@@ -120,8 +120,13 @@ public class MqttSession extends AbstractSession {
this.username = username;
}
- public void subscribe(String topicFilter, MqttQoS mqttQoS) {
- subscribe0(topicFilter, mqttQoS, true);
+ public MqttQoS subscribe(String topicFilter, MqttQoS mqttQoS) {
+ if (mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topicFilter, this)) {
+ subscribe0(topicFilter, mqttQoS, true);
+ return mqttQoS;
+ } else {
+ return MqttQoS.FAILURE;
+ }
}
private void subscribe0(String topicFilter, MqttQoS mqttQoS, boolean newSubscribe) {
@@ -138,7 +143,7 @@ public class MqttSession extends AbstractSession {
//通配符匹配存量Topic
for (BrokerTopic topic : mqttContext.getTopics()) {
- if (TopicTokenUtil.match(topic.getTopicToken(), topicToken)) {
+ if (TopicTokenUtil.match(topic.getTopicToken(), topicToken) && mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topic.getTopic(), this)) {
TopicSubscriber subscription = subscribeSuccess(mqttQoS, topicToken, topic);
if (newSubscribe) {
mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription);
@@ -163,7 +168,7 @@ public class MqttSession extends AbstractSession {
@Override
public void subscribe(EventType eventType, BrokerTopic object) {
- if (TopicTokenUtil.match(object.getTopicToken(), topicToken)) {
+ if (TopicTokenUtil.match(object.getTopicToken(), topicToken) && mqttContext.getProviders().getSubscribeProvider().subscribeTopic(object.getTopic(), MqttSession.this)) {
TopicSubscriber subscription = MqttSession.this.subscribeSuccess(mqttQoS, topicToken, object);
mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription);
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
index b5fc2d50ef2c629c61a92041172133de081d5fc1..4bfe887a155ab22fccb32dee26cdfa4cd750dbaa 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
@@ -2,8 +2,8 @@ package org.smartboot.mqtt.broker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.smartboot.mqtt.broker.persistence.message.PersistenceMessage;
-import org.smartboot.mqtt.broker.persistence.message.PersistenceProvider;
+import org.smartboot.mqtt.broker.provider.PersistenceProvider;
+import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.TopicToken;
import org.smartboot.mqtt.common.enums.MqttQoS;
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectAuthenticationSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectAuthenticationSubscriber.java
deleted file mode 100644
index 24f1b2d22e1b339b69c913b413473804863722f9..0000000000000000000000000000000000000000
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectAuthenticationSubscriber.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.smartboot.mqtt.broker.eventbus;
-
-import org.smartboot.mqtt.broker.AuthenticationService;
-import org.smartboot.mqtt.broker.BrokerContext;
-import org.smartboot.mqtt.broker.ConfiguredAuthenticationServiceImpl;
-import org.smartboot.mqtt.common.eventbus.EventBusSubscriber;
-import org.smartboot.mqtt.common.eventbus.EventType;
-import org.smartboot.mqtt.common.message.MqttConnectMessage;
-
-/**
- * @author 三刀(zhengjunweimail@163.com)
- * @version V1.0 , 2022/7/16
- */
-public class ConnectAuthenticationSubscriber implements EventBusSubscriber> {
- private final BrokerContext context;
- private final AuthenticationService authenticationService;
-
- public ConnectAuthenticationSubscriber(BrokerContext context) {
- this.context = context;
- // TODO Determine use which implements.
- this.authenticationService = new ConfiguredAuthenticationServiceImpl(context);
- }
-
- @Override
- public void subscribe(EventType> eventType, EventObject object) {
-// String validUserName = context.getBrokerConfigure().getUsername();
-// if (StringUtils.isBlank(validUserName)) {
-// object.getSession().setAuthorized(true);
-// return;
-// }
- String userName = object.getObject().getPayload().userName();
- byte[] passwordBytes = object.getObject().getPayload().passwordInBytes();
- String password = passwordBytes == null ? "" : new String(passwordBytes);
-
- boolean result = authenticationService.authentication(userName, password, object.getSession());
-
-// //身份验证
-// ValidateUtils.isTrue(StringUtils.equals(validUserName, userName)
-// && (StringUtils.isBlank(context.getBrokerConfigure().getPassword())
-// || StringUtils.equals(password, context.getBrokerConfigure().getPassword()))
-// , "login fail", object.getSession()::disconnect);
- object.getSession().setAuthorized(result);
- object.getSession().setUsername(userName);
- }
-}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java
index 6fd605b273410ffed873504c850f0fafc29e8d8f..1ebfdb53da3060b7b3ad225b2ac90bd942093cfa 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java
@@ -36,7 +36,7 @@ public class ConnectIdleTimeMonitorSubscriber implements EventBusSubscriber {
@Override
public void process(BrokerContext context, MqttSession session, MqttConnectMessage mqttConnectMessage) {
// LOGGER.info("receive connect message:{}", mqttConnectMessage);
+ String clientId = mqttConnectMessage.getPayload().clientIdentifier();
+ //服务端可以允许客户端提供一个零字节的客户端标识符 (ClientId) ,如果这样做了,服务端必须将这看作特
+ //殊情况并分配唯一的客户端标识符给那个客户端。然后它必须假设客户端提供了那个唯一的客户端标识符,正常处理这个 CONNECT 报文
+ if (clientId.length() == 0) {
+ clientId = MqttUtil.createClientId();
+ }
+ session.setClientId(clientId);
+
//有效性校验
//服务端必须按照 3.1 节的要求验证 CONNECT 报文,如果报文不符合规范,服务端不发送CONNACK 报文直接关闭网络连接
checkMessage(session, mqttConnectMessage);
- // 先进行认证
+ //连接鉴权
+ ValidateUtils.isTrue(context.getProviders().getConnectAuthenticationProvider().authentication(mqttConnectMessage, session), "Client authentication failed", () -> connFailAck(CONNECTION_REFUSED_NOT_AUTHORIZED, session));
+
+ session.setAuthorized(true);
+ session.setUsername(mqttConnectMessage.getPayload().userName());
+
+
context.getEventBus().publish(ServerEventType.CONNECT, EventObject.newEventObject(session, mqttConnectMessage));
- if (!session.isAuthorized()) {
- throw new IllegalStateException("Authorization failed");
- }
//清理会话
refreshSession(context, session, mqttConnectMessage);
@@ -84,11 +95,7 @@ public class ConnectProcessor implements MqttProcessor {
// 如果发现不支持的协议级别,服务端必须给发送一个返回码为 0x01(不支持的协议级别)的 CONNACK 报文响应
//CONNECT 报文,然后断开客户端的连接
final MqttVersion mqttVersion = MqttVersion.getByProtocolWithVersion(protocol, connectVariableHeader.getProtocolLevel());
- ValidateUtils.notNull(mqttVersion, "invalid version", () -> {
- MqttConnAckMessage badProto = connFailAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
- session.write(badProto);
- session.disconnect();
- });
+ ValidateUtils.notNull(mqttVersion, "invalid version", () -> connFailAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, session));
//服务端必须验证 CONNECT 控制报文的保留标志位(第 0 位)是否为 0,如果不为 0 必须断开客户端连接。
ValidateUtils.isTrue(connectVariableHeader.getReserved() == 0, "", session::disconnect);
@@ -98,32 +105,20 @@ public class ConnectProcessor implements MqttProcessor {
//“0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ”(大写字母,小写字母和数字)
boolean invalidClient = StringUtils.isNotBlank(clientId) && (mqttVersion == MqttVersion.MQTT_3_1 && clientId.length() > MqttCodecUtil.MAX_CLIENT_ID_LENGTH);
ValidateUtils.isTrue(!invalidClient, "", () -> {
- MqttConnAckMessage connAckMessage = connFailAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
- session.write(connAckMessage);
- session.disconnect();
+ connFailAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED, session);
LOGGER.error("The MQTT client ID cannot be empty. Username={}", payload.userName());
});
//如果客户端提供的 ClientId 为零字节且清理会话标志为 0,
// 服务端必须发送返回码为 0x02(表示标识符不合格)的 CONNACK 报文响应客户端的 CONNECT 报文,然后关闭网络连接
ValidateUtils.isTrue(connectVariableHeader.isCleanSession() || !StringUtils.isBlank(clientId), "", () -> {
- MqttConnAckMessage connAckMessage = connFailAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
- session.write(connAckMessage);
- session.disconnect();
+ connFailAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED, session);
LOGGER.error("The MQTT client ID cannot be empty. Username={}", payload.userName());
});
}
private void refreshSession(BrokerContext context, MqttSession session, MqttConnectMessage mqttConnectMessage) {
- MqttConnectPayload payload = mqttConnectMessage.getPayload();
session.setCleanSession(mqttConnectMessage.getVariableHeader().isCleanSession());
- String clientId = payload.clientIdentifier();
- //服务端可以允许客户端提供一个零字节的客户端标识符 (ClientId) ,如果这样做了,服务端必须将这看作特
- //殊情况并分配唯一的客户端标识符给那个客户端。然后它必须假设客户端提供了那个唯一的客户端标识符,正常处理这个 CONNECT 报文
- if (clientId.length() == 0) {
- clientId = MqttUtil.createClientId();
- }
-
- MqttSession mqttSession = context.getSession(clientId);
+ MqttSession mqttSession = context.getSession(session.getClientId());
if (mqttSession != null) {
if (session.isCleanSession()) {
//如果清理会话(CleanSession)标志被设置为 1,客户端和服务端必须丢弃之前的任何会话并开始一个新的会话。
@@ -135,7 +130,7 @@ public class ConnectProcessor implements MqttProcessor {
mqttSession.disconnect();
//如果清理会话(CleanSession)标志被设置为 0,服务端必须基于当前会话(使用客户端标识符识别)的状态恢复与客户端的通信。
SessionStateProvider sessionStateProvider = context.getProviders().getSessionStateProvider();
- SessionState sessionState = sessionStateProvider.get(clientId);
+ SessionState sessionState = sessionStateProvider.get(session.getClientId());
if (sessionState != null) {
session.getResponseConsumers().putAll(sessionState.getResponseConsumers());
sessionState.getSubscribers().forEach(session::subscribe);
@@ -150,7 +145,6 @@ public class ConnectProcessor implements MqttProcessor {
}
}
- session.setClientId(clientId);
context.addSession(session);
LOGGER.debug("add session for client:{}", session);
}
@@ -167,10 +161,12 @@ public class ConnectProcessor implements MqttProcessor {
session.setWillMessage(publishMessage);
}
- private MqttConnAckMessage connFailAck(MqttConnectReturnCode returnCode) {
+ private void connFailAck(MqttConnectReturnCode returnCode, MqttSession session) {
//如果服务端发送了一个包含非零返回码的 CONNACK 报文,它必须将当前会话标志设置为 0
ValidateUtils.isTrue(returnCode != CONNECTION_ACCEPTED, "");
- return connAck(returnCode, false);
+ MqttConnAckMessage badProto = connAck(returnCode, false);
+ session.write(badProto);
+ session.disconnect();
}
private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean sessionPresent) {
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java
index d40b2b56efb2acf7f3df58499cf73ec620019837..693d12a8d601c72903178f1bdaafee6fd26024f7 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java
@@ -25,8 +25,7 @@ public class SubscribeProcessor extends AuthorizedMqttProcessor true;
+
public SessionStateProvider getSessionStateProvider() {
return sessionStateProvider;
}
@@ -39,4 +41,19 @@ public class Providers {
this.persistenceProvider = persistenceProvider;
}
+ public ConnectAuthenticationProvider getConnectAuthenticationProvider() {
+ return connectAuthenticationProvider;
+ }
+
+ public void setConnectAuthenticationProvider(ConnectAuthenticationProvider connectAuthenticationProvider) {
+ this.connectAuthenticationProvider = connectAuthenticationProvider;
+ }
+
+ public SubscribeProvider getSubscribeProvider() {
+ return subscribeProvider;
+ }
+
+ public void setSubscribeProvider(SubscribeProvider subscribeProvider) {
+ this.subscribeProvider = subscribeProvider;
+ }
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionStateProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SessionStateProvider.java
similarity index 79%
rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionStateProvider.java
rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SessionStateProvider.java
index 84b20d9eba16b6538379c0a353f5466672ac4e4a..9d2d4239c30c6a745589fc1b952ad65f5a32f423 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionStateProvider.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SessionStateProvider.java
@@ -1,4 +1,6 @@
-package org.smartboot.mqtt.broker.persistence.session;
+package org.smartboot.mqtt.broker.provider;
+
+import org.smartboot.mqtt.broker.provider.impl.session.SessionState;
/**
* 会话状态Provider
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java
new file mode 100644
index 0000000000000000000000000000000000000000..b2327695b795c5224977823378e50415ec413a15
--- /dev/null
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java
@@ -0,0 +1,12 @@
+package org.smartboot.mqtt.broker.provider;
+
+import org.smartboot.mqtt.broker.MqttSession;
+
+/**
+ * Topic订阅
+ * @author 三刀(zhengjunweimail@163.com)
+ * @version V1.0 , 2022/12/28
+ */
+public interface SubscribeProvider {
+ boolean subscribeTopic(String topicFilter, MqttSession session);
+}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredAuthenticationServiceImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/ConfiguredConnectAuthenticationProviderImpl.java
similarity index 54%
rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredAuthenticationServiceImpl.java
rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/ConfiguredConnectAuthenticationProviderImpl.java
index 90a01ca01567f318382f1ce42658de98dcf63fc7..01d64cc8c567309b14da72b54268c09a5df970fb 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredAuthenticationServiceImpl.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/ConfiguredConnectAuthenticationProviderImpl.java
@@ -1,8 +1,14 @@
-package org.smartboot.mqtt.broker;
+package org.smartboot.mqtt.broker.provider.impl;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.smartboot.mqtt.broker.BrokerConfigure;
+import org.smartboot.mqtt.broker.BrokerContext;
+import org.smartboot.mqtt.broker.MqttSession;
+import org.smartboot.mqtt.broker.provider.ConnectAuthenticationProvider;
+import org.smartboot.mqtt.common.message.MqttConnectMessage;
+import org.smartboot.mqtt.common.util.MqttUtil;
import java.util.Objects;
@@ -11,21 +17,24 @@ import java.util.Objects;
* @date 2022-08-05 16:45:50
* @since 1.0.0
*/
-public class ConfiguredAuthenticationServiceImpl implements AuthenticationService {
+public class ConfiguredConnectAuthenticationProviderImpl implements ConnectAuthenticationProvider {
- private static final Logger LOGGER = LoggerFactory.getLogger(ConfiguredAuthenticationServiceImpl.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConfiguredConnectAuthenticationProviderImpl.class);
private final BrokerConfigure configure;
- public ConfiguredAuthenticationServiceImpl(BrokerContext context) {
+ public ConfiguredConnectAuthenticationProviderImpl(BrokerContext context) {
configure = context.getBrokerConfigure();
}
+
@Override
- public boolean authentication(String username, String password, MqttSession session) {
+ public boolean authentication(MqttConnectMessage connectMessage, MqttSession session) {
+ String username = connectMessage.getPayload().userName();
+ String password = connectMessage.getPayload().passwordInBytes() == null ? "" : new String(connectMessage.getPayload().passwordInBytes());
String configuredUsername = configure.getUsername();
String configuredPassword = configure.getPassword();
- String host = getHost(session);
+ String host = MqttUtil.getRemoteAddress(session);
if (StringUtils.isEmpty(configuredPassword) || StringUtils.isEmpty(configuredUsername)) {
@@ -36,18 +45,10 @@ public class ConfiguredAuthenticationServiceImpl implements AuthenticationServic
boolean auth = Objects.equals(configuredUsername, username) && Objects.equals(configuredPassword, password);
if (auth) {
LOGGER.info("auth success, ip:{} clientId: {}, username: {}", host, session.getClientId(), username);
- } else {
+ } else {
LOGGER.info("auth failed, ip:{} clientId: {}, username: {}", host, session.getClientId(), username);
}
return auth;
}
-
- private static String getHost(MqttSession session) {
- try {
- return session.getRemoteAddress().getHostName();
- } catch (Exception e) {
- return "";
- }
- }
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryMessageStoreQueue.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryMessageStoreQueue.java
similarity index 96%
rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryMessageStoreQueue.java
rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryMessageStoreQueue.java
index b2f4de4804d39d8b3256ec7e078bcbdcb986938e..eccb87bb424e57efdfe37b1e442829997ac26d9a 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryMessageStoreQueue.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryMessageStoreQueue.java
@@ -1,4 +1,4 @@
-package org.smartboot.mqtt.broker.persistence.message;
+package org.smartboot.mqtt.broker.provider.impl.message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryPersistenceProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryPersistenceProvider.java
similarity index 92%
rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryPersistenceProvider.java
rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryPersistenceProvider.java
index 349c30df2147efd69eb036510a4f335984b966c7..60ab977f237dd6c0f9d75673158c63df82b4252b 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryPersistenceProvider.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryPersistenceProvider.java
@@ -1,5 +1,6 @@
-package org.smartboot.mqtt.broker.persistence.message;
+package org.smartboot.mqtt.broker.provider.impl.message;
+import org.smartboot.mqtt.broker.provider.PersistenceProvider;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import java.util.concurrent.ConcurrentHashMap;
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/PersistenceMessage.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/PersistenceMessage.java
similarity index 95%
rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/PersistenceMessage.java
rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/PersistenceMessage.java
index 0a556a9ca166a2036e6e8111e8af28084ce8e592..9b0caf7759e7762a63e7b41aef899e75dbabb5f1 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/PersistenceMessage.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/PersistenceMessage.java
@@ -1,4 +1,4 @@
-package org.smartboot.mqtt.broker.persistence.message;
+package org.smartboot.mqtt.broker.provider.impl.message;
import org.smartboot.mqtt.common.ToString;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/MemorySessionStateProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/MemorySessionStateProvider.java
similarity index 84%
rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/MemorySessionStateProvider.java
rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/MemorySessionStateProvider.java
index 13b4d12f0e87602f724d1af29ae964a3d4392606..b72ea7be8d05b2e75fd68746ee48c8539020bc44 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/MemorySessionStateProvider.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/MemorySessionStateProvider.java
@@ -1,4 +1,6 @@
-package org.smartboot.mqtt.broker.persistence.session;
+package org.smartboot.mqtt.broker.provider.impl.session;
+
+import org.smartboot.mqtt.broker.provider.SessionStateProvider;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionState.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java
similarity index 91%
rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionState.java
rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java
index 4b1bdafaf21ded08bb42ac81079792a4b73bdc20..a9e6904cf63343c6fa9008077dd766c370c46943 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionState.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java
@@ -1,4 +1,4 @@
-package org.smartboot.mqtt.broker.persistence.session;
+package org.smartboot.mqtt.broker.provider.impl.session;
import org.smartboot.mqtt.common.AckMessage;
import org.smartboot.mqtt.common.enums.MqttQoS;
diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml
index fdc187fb8f41eb27d73dca2bd32d6d11c5d934d3..6ffee2e18eb4e0f80aecc497d899a1490f059876 100644
--- a/smart-mqtt-client/pom.xml
+++ b/smart-mqtt-client/pom.xml
@@ -5,7 +5,7 @@
smart-mqtt
org.smartboot.mqtt
- 0.11
+ 0.12
../pom.xml
4.0.0
diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml
index 22da02a637c278707285b8c3b759f5d33417051a..a65febd1eb026978a97f86390be8bef5bf7d5d9d 100644
--- a/smart-mqtt-common/pom.xml
+++ b/smart-mqtt-common/pom.xml
@@ -5,7 +5,7 @@
smart-mqtt
org.smartboot.mqtt
- 0.11
+ 0.12
../pom.xml
4.0.0
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java
index 80519e345b0d1105f5b338801dce603ab1053a61..5bf0c08817d127196289740afe91da248c0e064e 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java
@@ -2,13 +2,9 @@ package org.smartboot.mqtt.common;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.smartboot.mqtt.common.enums.MqttMessageType;
import org.smartboot.mqtt.common.enums.MqttQoS;
-import org.smartboot.mqtt.common.message.MqttMessage;
-import org.smartboot.mqtt.common.message.MqttPubAckMessage;
-import org.smartboot.mqtt.common.message.MqttPubCompMessage;
-import org.smartboot.mqtt.common.message.MqttPubRecMessage;
-import org.smartboot.mqtt.common.message.MqttPubRelMessage;
-import org.smartboot.mqtt.common.message.MqttPublishMessage;
+import org.smartboot.mqtt.common.message.*;
import org.smartboot.mqtt.common.util.ValidateUtils;
import java.util.Objects;
@@ -26,7 +22,7 @@ public abstract class QosPublisher {
//至少一次
session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> {
- ValidateUtils.isTrue(message instanceof MqttPubAckMessage, "invalid message type");
+ ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBACK, "invalid message type");
future.complete(true);
session.responseConsumers.remove(cacheKey);
LOGGER.info("Qos1消息发送成功...");
@@ -44,13 +40,13 @@ public abstract class QosPublisher {
CompletableFuture publishFuture = new CompletableFuture<>();
//只有一次
session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> {
- ValidateUtils.isTrue(message instanceof MqttPubRecMessage, "invalid message type");
+ ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBREC, "invalid message type");
ValidateUtils.isTrue(Objects.equals(message.getVariableHeader().getPacketId(), publishMessage.getVariableHeader().getPacketId()), "invalid packetId");
publishFuture.complete(true);
MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(message.getVariableHeader().getPacketId());
CompletableFuture pubRelFuture = new CompletableFuture<>();
session.responseConsumers.put(cacheKey, new AckMessage(pubRelMessage, compMessage -> {
- ValidateUtils.isTrue(compMessage instanceof MqttPubCompMessage, "invalid message type");
+ ValidateUtils.isTrue(compMessage.getFixedHeader().getMessageType() == MqttMessageType.PUBCOMP, "invalid message type");
ValidateUtils.isTrue(Objects.equals(compMessage.getVariableHeader().getPacketId(), pubRelMessage.getVariableHeader().getPacketId()), "invalid packetId");
pubRelFuture.complete(true);
LOGGER.info("Qos2消息发送成功...");
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java
index 8084318396a1ef2bd045a7dd7e1302d9651aa129..aa9554bc8f3b9f14548f6bf995ac84c5e9e4673c 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java
@@ -4,6 +4,7 @@ package org.smartboot.mqtt.common.enums;
*
*/
public enum MqttConnectReturnCode {
+ //MQTT3
CONNECTION_ACCEPTED((byte) 0x00, "连接已被服务端接受"),
CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION((byte) 0X01, "服务端不支持客户端请求的 MQTT 协议级别"),
CONNECTION_REFUSED_IDENTIFIER_REJECTED((byte) 0x02, "客户端标识符是正确的 UTF-8 编码,但服务 端不允许使用"),
@@ -11,7 +12,6 @@ public enum MqttConnectReturnCode {
CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD((byte) 0x04, "用户名或密码的数据格式无效"),
CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05, "客户端未被授权连接到此服务器");
-
private final byte code;
private final String desc;
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java
index cfd7a6c9d972b2c4473740396b03ea2181dd4728..94fcc5c5ef686136b95e6151b99037b48bfe4f56 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java
@@ -5,7 +5,7 @@ package org.smartboot.mqtt.common.enums;
* @version V1.0 , 2022/3/23
*/
public enum MqttProtocolEnum {
- MQTT_3_1("MQIsdp"), MQTT_3_1_1("MQTT");
+ MQTT_3_1("MQIsdp"), MQTT_3_1_1("MQTT"), MQTT_5("MQTT");
/**
* 协议名
*/
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java
new file mode 100644
index 0000000000000000000000000000000000000000..aecd578d8306062ba3266cfcea5521219d9ce5cf
--- /dev/null
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java
@@ -0,0 +1,78 @@
+package org.smartboot.mqtt.common.enums;
+
+/**
+ *
+ */
+public enum MqttReasonCode {
+ //MQTT5
+ SUCCESS((byte) 0x00, "成功"),
+ NORMAL_DISCONNECTION((byte) 0x00, "断开连接"),
+ GRANTED_QOS0((byte) 0x00, "最大允许qos为0"),
+ GRANTED_QOS1((byte) 0x01, "最大允许qos为1"),
+ GRANTED_QOS2((byte) 0x02, "最大允许qos为2"),
+ DISCONNECT_WITH_WILL_MESSAGE((byte) 0x04, "客户端需要断开连接后发送遗嘱消息"),
+ NO_MATCHING_SUBSCRIBERS((byte) 0X10, "无匹配的订阅者"),
+ N0_SUBSCRIPTION_EXISTED((byte) 0X11, ""),
+ CONTINUE_AUTHENTICATION((byte) 0X18, ""),
+ RE_AUTHENTICATE((byte) 0X19, ""),
+ UNSPECIFIED_ERROR((byte) 0x80, "未指明的错误"),
+ MALFORMED_PACKET((byte) 0x81, "数据未正确解析"),
+ PROTOCOL_ERROR((byte) 0x82, "协议版本错误"),
+ IMPLEMENTATION_SPECIFIC_ERROR((byte) 0x83, "接收者不接受"),
+ UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84, "服务端不支持此版本协议"),
+ CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85, "不允许的客户端id"),
+ BAD_USERNAME_OR_PASSWORD((byte) 0x86, "不接受的用户名或密码"),
+ NOT_AUTHORIZED((byte) 0x87, "未授权"),
+ SERVER_UNAVAILABLE_5((byte) 0x88, "服务端不可用"),
+ SERVER_BUSY((byte) 0x89, "服务端繁忙中"),
+ BANNED((byte) 0x8A, "客户端被禁用"),
+ SERVER_SHUTTING_DOWN((byte) 0x8B, ""),
+ BAD_AUTHENTICATION_METHOD((byte) 0x8C, "错误的认证方法"),
+ KEEP_ALIVE_TIMEOUT((byte) 0x8D, ""),
+ SESSION_TAKEN_OVER((byte) 0x8E, "相同客户端id上线导致被踢出下线"),
+ TOPIC_FILTER_INVALID((byte) 0x8F, "消息过滤非法"),
+ TOPIC_NAME_INVALID((byte) 0x90, "topic名非法"),
+ PACKET_IDENTIFIER_IN_USE((byte) 0x91, "packetId已被使用"),
+ PACKET_IDENTIFIER_NOT_FOUND((byte) 0x92, ""),
+ RECEIVE_MAXIMUM_EXCEEDED((byte) 0x93, ""),
+ TOPIC_ALIAS_INVALID((byte) 0x94, ""),
+ PACKET_TOO_LARGE((byte) 0x95, "包大小超限"),
+ MESSAGE_RATE_TOO_HIGH((byte) 0x96, ""),
+ QUOTA_EXCEEDED((byte) 0x97, "已超限"),
+ ADMINISTRATIVE_ACTION((byte) 0x98, ""),
+ PAYLOAD_FORMAT_INVALID((byte) 0x99, "数据格式非法"),
+ RETAIN_NOT_SUPPORTED((byte) 0x9A, "不支持保留消息"),
+ QOS_NOT_SUPPORTED((byte) 0x9B, "不支持此qos"),
+ USE_ANOTHER_SERVER((byte) 0x9C, "客户端需要暂时使用另一节点"),
+ SERVER_MOVED((byte) 0x9D, "客户端需要永久使用另一节点"),
+ SHARED_SUBSCRIPTION_NOT_SUPPORTED((byte) 0x9E, ""),
+ CONNECTION_RATE_EXCEEDED((byte) 0x9F, "连接速率超限"),
+ MAXIMUM_CONNECT_TIME((byte) 0xA0, ""),
+ SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED((byte) 0xA1, ""),
+ WILDCARD_SUBSCRIPTION_NOT_SUPPORTED((byte) 0xA2, "");
+
+ private final byte code;
+ private final String desc;
+
+ MqttReasonCode(byte code, String desc) {
+ this.code = code;
+ this.desc = desc;
+ }
+
+ public static MqttReasonCode valueOf(byte b) {
+ for (MqttReasonCode v : values()) {
+ if (b == v.code) {
+ return v;
+ }
+ }
+ throw new IllegalArgumentException("unknown reason code: " + (b & 0xFF));
+ }
+
+ public byte getCode() {
+ return code;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+}
\ No newline at end of file
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java
index 5aa5040fd73a786dd097c92b5788a4a0aff94d9b..58ee607c299686775274b08f7f5d4f364d543660 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java
@@ -1,7 +1,7 @@
package org.smartboot.mqtt.common.enums;
public enum MqttVersion {
- MQTT_3_1(MqttProtocolEnum.MQTT_3_1, (byte) 3), MQTT_3_1_1(MqttProtocolEnum.MQTT_3_1_1, (byte) 4);
+ MQTT_3_1(MqttProtocolEnum.MQTT_3_1, (byte) 3), MQTT_3_1_1(MqttProtocolEnum.MQTT_3_1_1, (byte) 4), MQTT_5(MqttProtocolEnum.MQTT_5, (byte) 5);
private final MqttProtocolEnum protocol;
private final byte level;
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java
index 0a8193f2a460a15afc9303136461868d11dad95b..cb5a0f95e517d9c21a58828bdc2cee20f58a63a3 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java
@@ -2,6 +2,7 @@ package org.smartboot.mqtt.common.message;
import org.smartboot.mqtt.common.MqttWriter;
import org.smartboot.mqtt.common.enums.MqttConnectReturnCode;
+import org.smartboot.mqtt.common.protocol.DecodeUnit;
import org.smartboot.socket.util.BufferUtils;
import java.nio.ByteBuffer;
@@ -22,7 +23,7 @@ public class MqttConnAckMessage extends MqttVariableMessage {
+
+ public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader) {
+ super(mqttFixedHeader);
+ }
+
+ public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader, int packetId) {
+ this(mqttFixedHeader, packetId, (byte)0, null);
+ }
+
+ public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader, int packetId, byte reasonCode, MqttProperties mqttProperties) {
+ super(mqttFixedHeader);
+ setVariableHeader(new MqttPubReplyVariableHeader(packetId, reasonCode, mqttProperties));
+ }
+
+ @Override
+ public final void decodeVariableHeader(DecodeUnit unit, ByteBuffer buffer) {
+ int packetId = buffer.getShort();
+ MqttPubReplyVariableHeader header;
+ if (unit.mqttVersion == MqttVersion.MQTT_5){
+ byte reasonCode = buffer.get();
+ byte propertyLen = buffer.get();
+ header = new MqttPubReplyVariableHeader(packetId, reasonCode, null);
+ }else {
+ header = new MqttPubReplyVariableHeader(packetId, (byte)0, null);
+ }
+ setVariableHeader(header);
+ }
+
+
+ @Override
+ public void writeTo(MqttWriter mqttWriter) throws IOException {
+ MqttPubReplyVariableHeader variableHeader = getVariableHeader();
+ int variableHeaderBufferSize = 2; // variable part only has a message id
+ mqttWriter.writeByte(getFixedHeaderByte1(fixedHeader));
+ writeVariableLengthInt(mqttWriter, variableHeaderBufferSize);
+ mqttWriter.writeShort((short) variableHeader.getPacketId());
+ mqttWriter.writeByte(variableHeader.getReasonCode());
+ }
+}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java
index 34fde45b05ac5e1f358b14d7bff9439da9585651..0b4b5a9e3207655ea2af8df4b4a16650d38be7e0 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java
@@ -3,6 +3,8 @@ package org.smartboot.mqtt.common.message;
import org.smartboot.mqtt.common.MqttWriter;
import org.smartboot.mqtt.common.ToString;
import org.smartboot.mqtt.common.exception.MqttProcessException;
+import org.smartboot.mqtt.common.protocol.DecodeUnit;
+import org.smartboot.mqtt.common.protocol.MqttProtocol;
import org.smartboot.socket.util.BufferUtils;
import org.smartboot.socket.util.DecoderException;
@@ -141,7 +143,7 @@ public class MqttMessage extends ToString {
*
* @param buffer
*/
- public void decodeVariableHeader(ByteBuffer buffer) {
+ public void decodeVariableHeader(DecodeUnit unit, ByteBuffer buffer) {
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java
index 64ea394a58f497f9eba454871a95fd083fcd7623..ed85e1b970c850277fde68b8ddf5de4fd8a9b223 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java
@@ -10,6 +10,12 @@ public class MqttPacketIdVariableHeader extends MqttVariableHeader {
*/
private int packetId;
+ public MqttPacketIdVariableHeader() {
+ }
+
+ public MqttPacketIdVariableHeader(int packetId) {
+ this.packetId = packetId;
+ }
public void setPacketId(int packetId) {
this.packetId = packetId;
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java
index ff4fd6a4c7655c8bd3099bef33ea6ed43508cbc7..ee86a2176b48c69cc040ab04f7cfa2aafe7cacd3 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java
@@ -1,6 +1,7 @@
package org.smartboot.mqtt.common.message;
import org.smartboot.mqtt.common.MqttWriter;
+import org.smartboot.mqtt.common.protocol.DecodeUnit;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -28,7 +29,7 @@ public class MqttPacketIdentifierMessage extends MqttVariableMessage property type
+ */
+ public abstract static class MqttProperty {
+ final T value;
+ final int propertyId;
+
+ protected MqttProperty(int propertyId, T value) {
+ this.propertyId = propertyId;
+ this.value = value;
+ }
+
+ /**
+ * Get MQTT property value
+ *
+ * @return property value
+ */
+ public T value() {
+ return value;
+ }
+
+ /**
+ * Get MQTT property ID
+ * @return property ID
+ */
+ public int propertyId() {
+ return propertyId;
+ }
+
+ @Override
+ public int hashCode() {
+ return propertyId + 31 * value.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ MqttProperty that = (MqttProperty) obj;
+ return this.propertyId == that.propertyId && this.value.equals(that.value);
+ }
+ }
+
+ public static final class IntegerProperty extends MqttProperty {
+
+ public IntegerProperty(int propertyId, Integer value) {
+ super(propertyId, value);
+ }
+
+ @Override
+ public String toString() {
+ return "IntegerProperty(" + propertyId + ", " + value + ")";
+ }
+ }
+
+ public static final class StringProperty extends MqttProperty {
+
+ public StringProperty(int propertyId, String value) {
+ super(propertyId, value);
+ }
+
+ @Override
+ public String toString() {
+ return "StringProperty(" + propertyId + ", " + value + ")";
+ }
+ }
+
+ public static final class StringPair {
+ public final String key;
+ public final String value;
+
+ public StringPair(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public int hashCode() {
+ return key.hashCode() + 31 * value.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ StringPair that = (StringPair) obj;
+
+ return that.key.equals(this.key) && that.value.equals(this.value);
+ }
+ }
+
+ //User properties are the only properties that may be included multiple times and
+ //are the only properties where ordering is required. Therefore, they need a special handling
+ public static final class UserProperties extends MqttProperty> {
+ public UserProperties() {
+ super(MqttPropertyType.USER_PROPERTY.value, new ArrayList());
+ }
+
+ /**
+ * Create user properties from the collection of the String pair values
+ *
+ * @param values string pairs. Collection entries are copied, collection itself isn't shared
+ */
+ public UserProperties(Collection values) {
+ this();
+ this.value.addAll(values);
+ }
+
+ private static UserProperties fromUserPropertyCollection(Collection properties) {
+ UserProperties userProperties = new UserProperties();
+ for (UserProperty property: properties) {
+ userProperties.add(new StringPair(property.value.key, property.value.value));
+ }
+ return userProperties;
+ }
+
+ public void add(StringPair pair) {
+ this.value.add(pair);
+ }
+
+ public void add(String key, String value) {
+ this.value.add(new StringPair(key, value));
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder("UserProperties(");
+ boolean first = true;
+ for (StringPair pair: value) {
+ if (!first) {
+ builder.append(", ");
+ }
+ builder.append(pair.key + "->" + pair.value);
+ first = false;
+ }
+ builder.append(")");
+ return builder.toString();
+ }
+ }
+
+ public static final class UserProperty extends MqttProperty {
+ public UserProperty(String key, String value) {
+ super(MqttPropertyType.USER_PROPERTY.value, new StringPair(key, value));
+ }
+
+ @Override
+ public String toString() {
+ return "UserProperty(" + value.key + ", " + value.value + ")";
+ }
+ }
+
+ public static final class BinaryProperty extends MqttProperty {
+
+ public BinaryProperty(int propertyId, byte[] value) {
+ super(propertyId, value);
+ }
+
+ @Override
+ public String toString() {
+ return "BinaryProperty(" + propertyId + ", " + value.length + " bytes)";
+ }
+ }
+
+
+ private Map props;
+ private List userProperties;
+ private List subscriptionIds;
+
+ public void add(MqttProperty property) {
+ Map props = this.props;
+ if (property.propertyId == MqttPropertyType.USER_PROPERTY.value) {
+ List userProperties = this.userProperties;
+ if (userProperties == null) {
+ userProperties = new ArrayList(1);
+ this.userProperties = userProperties;
+ }
+ if (property instanceof UserProperty) {
+ userProperties.add((UserProperty) property);
+ } else if (property instanceof UserProperties) {
+ for (StringPair pair: ((UserProperties) property).value) {
+ userProperties.add(new UserProperty(pair.key, pair.value));
+ }
+ } else {
+ throw new IllegalArgumentException("User property must be of UserProperty or UserProperties type");
+ }
+ } else if (property.propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) {
+ List subscriptionIds = this.subscriptionIds;
+ if (subscriptionIds == null) {
+ subscriptionIds = new ArrayList(1);
+ this.subscriptionIds = subscriptionIds;
+ }
+ if (property instanceof IntegerProperty) {
+ subscriptionIds.add((IntegerProperty) property);
+ } else {
+ throw new IllegalArgumentException("Subscription ID must be an integer property");
+ }
+ } else {
+ if (props == null) {
+ props = new HashMap<>();
+ this.props = props;
+ }
+ props.put(property.propertyId, property);
+ }
+ }
+
+ public Collection extends MqttProperty> listAll() {
+ Map props = this.props;
+ if (props == null && subscriptionIds == null && userProperties == null) {
+ return Collections.emptyList();
+ }
+ if (subscriptionIds == null && userProperties == null) {
+ return props.values();
+ }
+ if (props == null && userProperties == null) {
+ return subscriptionIds;
+ }
+ List propValues = new ArrayList(props != null ? props.size() : 1);
+ if (props != null) {
+ propValues.addAll(props.values());
+ }
+ if (subscriptionIds != null) {
+ propValues.addAll(subscriptionIds);
+ }
+ if (userProperties != null) {
+ propValues.add(UserProperties.fromUserPropertyCollection(userProperties));
+ }
+ return propValues;
+ }
+
+ public boolean isEmpty() {
+ Map props = this.props;
+ return props == null || props.isEmpty();
+ }
+
+ /**
+ * Get property by ID. If there are multiple properties of this type (can be with Subscription ID)
+ * then return the first one.
+ *
+ * @param propertyId ID of the property
+ * @return a property if it is set, null otherwise
+ */
+ public MqttProperty getProperty(int propertyId) {
+ if (propertyId == MqttPropertyType.USER_PROPERTY.value) {
+ //special handling to keep compatibility with earlier versions
+ List userProperties = this.userProperties;
+ if (userProperties == null) {
+ return null;
+ }
+ return UserProperties.fromUserPropertyCollection(userProperties);
+ }
+ if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) {
+ List subscriptionIds = this.subscriptionIds;
+ if (subscriptionIds == null || subscriptionIds.isEmpty()) {
+ return null;
+ }
+ return subscriptionIds.get(0);
+ }
+ Map props = this.props;
+ return props == null ? null : props.get(propertyId);
+ }
+
+ /**
+ * Get properties by ID.
+ * Some properties (Subscription ID and User Properties) may occur multiple times,
+ * this method returns all their values in order.
+ *
+ * @param propertyId ID of the property
+ * @return all properties having specified ID
+ */
+ public List extends MqttProperty> getProperties(int propertyId) {
+ if (propertyId == MqttPropertyType.USER_PROPERTY.value) {
+ return userProperties == null ? Collections.emptyList() : userProperties;
+ }
+ if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) {
+ return subscriptionIds == null ? Collections.emptyList() : subscriptionIds;
+ }
+ Map props = this.props;
+ return (props == null || !props.containsKey(propertyId)) ?
+ Collections.emptyList() :
+ Collections.singletonList(props.get(propertyId));
+ }
+}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubCompMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubCompMessage.java
index 670a9ed9f1e679a9ac16aa33653af5c0734f3e18..f1b8f364631cea60bf8b2811bf65dc01b3deffff 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubCompMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubCompMessage.java
@@ -9,7 +9,7 @@ public class MqttPubCompMessage extends MqttPacketIdentifierMessage {
super(mqttFixedHeader);
}
- public MqttPubCompMessage(int mqttPacketIdVariableHeader) {
- super(MqttFixedHeader.PUB_COMP_HEADER, mqttPacketIdVariableHeader);
+ public MqttPubCompMessage(int packetId) {
+ super(MqttFixedHeader.PUB_COMP_HEADER, packetId);
}
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubReplyVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubReplyVariableHeader.java
new file mode 100644
index 0000000000000000000000000000000000000000..7b72dd97b2fb08e31d26070c6460c3e906f6bae3
--- /dev/null
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubReplyVariableHeader.java
@@ -0,0 +1,24 @@
+package org.smartboot.mqtt.common.message;
+
+public class MqttPubReplyVariableHeader extends MqttPacketIdVariableHeader{
+
+ private final byte reasonCode;
+ private final MqttProperties properties;
+
+ public MqttPubReplyVariableHeader(int packetId, byte reasonCode, MqttProperties properties) {
+ super(packetId);
+ if (packetId < 1 || packetId > 0xffff) {
+ throw new IllegalArgumentException("packetId: " + packetId + " (should be: 1 ~ 65535)");
+ }
+ this.reasonCode = reasonCode;
+ this.properties = properties;
+ }
+
+ public byte getReasonCode() {
+ return reasonCode;
+ }
+
+ public MqttProperties getProperties() {
+ return properties;
+ }
+}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java
index d6280122bef726fbfdc0043ca9cec9148943591e..39e508d1580df8b689b1ec3550fe2c968071a54c 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java
@@ -1,6 +1,7 @@
package org.smartboot.mqtt.common.message;
import org.smartboot.mqtt.common.MqttWriter;
+import org.smartboot.mqtt.common.protocol.DecodeUnit;
import org.smartboot.mqtt.common.util.MqttUtil;
import org.smartboot.socket.util.DecoderException;
@@ -25,7 +26,7 @@ public class MqttPublishMessage extends MqttVariableMessage {
if (payloadBuffer.remaining() < remainingLength) {
break;
}
- unit.mqttMessage.decodeVariableHeader(payloadBuffer);
+ unit.mqttMessage.decodeVariableHeader(unit, payloadBuffer);
unit.state = READ_PAYLOAD;
@@ -181,10 +182,4 @@ public class MqttProtocol implements Protocol {
READ_FIXED_HEADER, READ_VARIABLE_HEADER, READ_PAYLOAD, FINISH,
}
- class DecodeUnit {
- DecoderState state;
- MqttMessage mqttMessage;
-
- ByteBuffer disposableBuffer;
- }
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java
index b472a137157f9759240e7a8586858ba294b03f62..40c5d51f0c0cbd6d9114b05e65fe0062bb926e2c 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java
@@ -1,5 +1,8 @@
package org.smartboot.mqtt.common.util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.smartboot.mqtt.common.AbstractSession;
import org.smartboot.mqtt.common.MqttMessageBuilders;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
@@ -11,6 +14,7 @@ import java.util.UUID;
* @version V1.0 , 2022/3/29
*/
public class MqttUtil {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MqttUtil.class);
/**
* Topic 通配符
*/
@@ -33,4 +37,12 @@ public class MqttUtil {
return UUID.randomUUID().toString().replace("-", "");
}
+ public static String getRemoteAddress(AbstractSession session) {
+ try {
+ return session.getRemoteAddress().toString();
+ } catch (Exception e) {
+ LOGGER.error("getRemoteAddress exception", e);
+ return "";
+ }
+ }
}