diff --git a/pom.xml b/pom.xml index 77b4fa0773ed71955829e6f290df8e892f025eb8..42658ff77e178794bc38f7ee9af8b8f10942df10 100644 --- a/pom.xml +++ b/pom.xml @@ -4,12 +4,12 @@ org.smartboot.mqtt smart-mqtt smart-mqtt - 0.11 + 0.12 4.0.0 mqtt broker - 0.11 + 0.12 1.5.23 2.6 4.3 diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml index e2279b50baa89822fef36c78f029a2ff495f0e45..8373da63e6eb8dce2dd5be2164edaaedffd9e8d2 100644 --- a/smart-mqtt-broker/pom.xml +++ b/smart-mqtt-broker/pom.xml @@ -5,7 +5,7 @@ org.smartboot.mqtt smart-mqtt - 0.11 + 0.12 ../pom.xml 4.0.0 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java index 8d68ca2ccf6702ed38bc16c01a76238d3077dbb2..19adc89d6634ea0da2880fd2ce60b92ca5830bbf 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java @@ -26,7 +26,7 @@ public class BrokerConfigure { /** * 当前smart-mqtt */ - public static final String VERSION = "v0.11"; + public static final String VERSION = "v0.12"; static final Map SystemEnvironments = new HashMap<>(); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java index 338327e78f408110d192ecc598c091e5d9eea265..c2652ba88ab6910e9db3b56429609b8792db7fa0 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java @@ -1,7 +1,7 @@ package org.smartboot.mqtt.broker; import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; -import org.smartboot.mqtt.broker.plugin.provider.Providers; +import org.smartboot.mqtt.broker.provider.Providers; import org.smartboot.mqtt.common.eventbus.EventBus; import java.io.IOException; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 4f891426a549e8a280ded19718d96a74008f9c75..0678fec4373cd84981c4173f8c0cd5b804eca8e2 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -6,16 +6,16 @@ import com.alibaba.fastjson2.JSONReader; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.smartboot.mqtt.broker.eventbus.ConnectAuthenticationSubscriber; import org.smartboot.mqtt.broker.eventbus.ConnectIdleTimeMonitorSubscriber; import org.smartboot.mqtt.broker.eventbus.KeepAliveMonitorSubscriber; import org.smartboot.mqtt.broker.eventbus.ServerEventType; import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBusSubscriber; import org.smartboot.mqtt.broker.eventbus.messagebus.consumer.RetainPersistenceConsumer; -import org.smartboot.mqtt.broker.persistence.message.PersistenceMessage; import org.smartboot.mqtt.broker.plugin.Plugin; -import org.smartboot.mqtt.broker.plugin.provider.Providers; +import org.smartboot.mqtt.broker.provider.Providers; +import org.smartboot.mqtt.broker.provider.impl.ConfiguredConnectAuthenticationProviderImpl; +import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; import org.smartboot.mqtt.common.AsyncTask; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.eventbus.EventBus; @@ -93,6 +93,8 @@ public class BrokerContextImpl implements BrokerContext { updateBrokerConfigure(); + initProvider(); + subscribeEventBus(); subscribeMessageBus(); @@ -103,7 +105,7 @@ public class BrokerContextImpl implements BrokerContext { try { pagePool = new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true); server = new AioQuickServer(brokerConfigure.getHost(), brokerConfigure.getPort(), new MqttProtocol(brokerConfigure.getMaxPacketSize()), processor); - server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(brokerConfigure.getThreadNum()); + server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(Math.max(2, brokerConfigure.getThreadNum())); server.start(); System.out.println(BrokerConfigure.BANNER + "\r\n :: smart-mqtt broker" + "::\t(" + BrokerConfigure.VERSION + ")"); System.out.println("❤️Gitee: https://gitee.com/smartboot/smart-mqtt"); @@ -125,6 +127,11 @@ public class BrokerContextImpl implements BrokerContext { configJson = null; } + private void initProvider() { + //连接鉴权处理器 + providers.setConnectAuthenticationProvider(new ConfiguredConnectAuthenticationProviderImpl(this)); + } + private void initPushThread() { if (brokerConfigure.getTopicLimit() <= 0) { brokerConfigure.setTopicLimit(10); @@ -208,8 +215,7 @@ public class BrokerContextImpl implements BrokerContext { }); //连接鉴权超时监控 eventBus.subscribe(ServerEventType.SESSION_CREATE, new ConnectIdleTimeMonitorSubscriber(this)); - //连接鉴权 - eventBus.subscribe(ServerEventType.CONNECT, new ConnectAuthenticationSubscriber(this)); + //保持连接状态监听,长时间没有消息通信将断开连接 eventBus.subscribe(ServerEventType.CONNECT, new KeepAliveMonitorSubscriber(this)); @@ -257,7 +263,8 @@ public class BrokerContextImpl implements BrokerContext { } }); //打印消息日志 -// eventBus.subscribe(Arrays.asList(EventType.RECEIVE_MESSAGE, EventType.WRITE_MESSAGE), new MessageLoggerSubscriber()); +// eventBus.subscribe(Arrays.asList(EventType.RECEIVE_MESSAGE, EventType.WRITE_MESSAGE), new +// MessageLoggerSubscriber()); } private void notifyPush(BrokerTopic topic) { diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index 33f8fd42f37211205f1fae9482aa07d06f3873ac..cb983cf3f9181bf53dab0f24f81a557b21791eeb 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -3,7 +3,7 @@ package org.smartboot.mqtt.broker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.broker.eventbus.ServerEventType; -import org.smartboot.mqtt.broker.persistence.session.SessionState; +import org.smartboot.mqtt.broker.provider.impl.session.SessionState; import org.smartboot.mqtt.common.AbstractSession; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.MqttWriter; @@ -120,8 +120,13 @@ public class MqttSession extends AbstractSession { this.username = username; } - public void subscribe(String topicFilter, MqttQoS mqttQoS) { - subscribe0(topicFilter, mqttQoS, true); + public MqttQoS subscribe(String topicFilter, MqttQoS mqttQoS) { + if (mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topicFilter, this)) { + subscribe0(topicFilter, mqttQoS, true); + return mqttQoS; + } else { + return MqttQoS.FAILURE; + } } private void subscribe0(String topicFilter, MqttQoS mqttQoS, boolean newSubscribe) { @@ -138,7 +143,7 @@ public class MqttSession extends AbstractSession { //通配符匹配存量Topic for (BrokerTopic topic : mqttContext.getTopics()) { - if (TopicTokenUtil.match(topic.getTopicToken(), topicToken)) { + if (TopicTokenUtil.match(topic.getTopicToken(), topicToken) && mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topic.getTopic(), this)) { TopicSubscriber subscription = subscribeSuccess(mqttQoS, topicToken, topic); if (newSubscribe) { mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); @@ -163,7 +168,7 @@ public class MqttSession extends AbstractSession { @Override public void subscribe(EventType eventType, BrokerTopic object) { - if (TopicTokenUtil.match(object.getTopicToken(), topicToken)) { + if (TopicTokenUtil.match(object.getTopicToken(), topicToken) && mqttContext.getProviders().getSubscribeProvider().subscribeTopic(object.getTopic(), MqttSession.this)) { TopicSubscriber subscription = MqttSession.this.subscribeSuccess(mqttQoS, topicToken, object); mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index b5fc2d50ef2c629c61a92041172133de081d5fc1..4bfe887a155ab22fccb32dee26cdfa4cd750dbaa 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -2,8 +2,8 @@ package org.smartboot.mqtt.broker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.smartboot.mqtt.broker.persistence.message.PersistenceMessage; -import org.smartboot.mqtt.broker.persistence.message.PersistenceProvider; +import org.smartboot.mqtt.broker.provider.PersistenceProvider; +import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectAuthenticationSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectAuthenticationSubscriber.java deleted file mode 100644 index 24f1b2d22e1b339b69c913b413473804863722f9..0000000000000000000000000000000000000000 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectAuthenticationSubscriber.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.smartboot.mqtt.broker.eventbus; - -import org.smartboot.mqtt.broker.AuthenticationService; -import org.smartboot.mqtt.broker.BrokerContext; -import org.smartboot.mqtt.broker.ConfiguredAuthenticationServiceImpl; -import org.smartboot.mqtt.common.eventbus.EventBusSubscriber; -import org.smartboot.mqtt.common.eventbus.EventType; -import org.smartboot.mqtt.common.message.MqttConnectMessage; - -/** - * @author 三刀(zhengjunweimail@163.com) - * @version V1.0 , 2022/7/16 - */ -public class ConnectAuthenticationSubscriber implements EventBusSubscriber> { - private final BrokerContext context; - private final AuthenticationService authenticationService; - - public ConnectAuthenticationSubscriber(BrokerContext context) { - this.context = context; - // TODO Determine use which implements. - this.authenticationService = new ConfiguredAuthenticationServiceImpl(context); - } - - @Override - public void subscribe(EventType> eventType, EventObject object) { -// String validUserName = context.getBrokerConfigure().getUsername(); -// if (StringUtils.isBlank(validUserName)) { -// object.getSession().setAuthorized(true); -// return; -// } - String userName = object.getObject().getPayload().userName(); - byte[] passwordBytes = object.getObject().getPayload().passwordInBytes(); - String password = passwordBytes == null ? "" : new String(passwordBytes); - - boolean result = authenticationService.authentication(userName, password, object.getSession()); - -// //身份验证 -// ValidateUtils.isTrue(StringUtils.equals(validUserName, userName) -// && (StringUtils.isBlank(context.getBrokerConfigure().getPassword()) -// || StringUtils.equals(password, context.getBrokerConfigure().getPassword())) -// , "login fail", object.getSession()::disconnect); - object.getSession().setAuthorized(result); - object.getSession().setUsername(userName); - } -} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java index 6fd605b273410ffed873504c850f0fafc29e8d8f..1ebfdb53da3060b7b3ad225b2ac90bd942093cfa 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java @@ -36,7 +36,7 @@ public class ConnectIdleTimeMonitorSubscriber implements EventBusSubscriber { @Override public void process(BrokerContext context, MqttSession session, MqttConnectMessage mqttConnectMessage) { // LOGGER.info("receive connect message:{}", mqttConnectMessage); + String clientId = mqttConnectMessage.getPayload().clientIdentifier(); + //服务端可以允许客户端提供一个零字节的客户端标识符 (ClientId) ,如果这样做了,服务端必须将这看作特 + //殊情况并分配唯一的客户端标识符给那个客户端。然后它必须假设客户端提供了那个唯一的客户端标识符,正常处理这个 CONNECT 报文 + if (clientId.length() == 0) { + clientId = MqttUtil.createClientId(); + } + session.setClientId(clientId); + //有效性校验 //服务端必须按照 3.1 节的要求验证 CONNECT 报文,如果报文不符合规范,服务端不发送CONNACK 报文直接关闭网络连接 checkMessage(session, mqttConnectMessage); - // 先进行认证 + //连接鉴权 + ValidateUtils.isTrue(context.getProviders().getConnectAuthenticationProvider().authentication(mqttConnectMessage, session), "Client authentication failed", () -> connFailAck(CONNECTION_REFUSED_NOT_AUTHORIZED, session)); + + session.setAuthorized(true); + session.setUsername(mqttConnectMessage.getPayload().userName()); + + context.getEventBus().publish(ServerEventType.CONNECT, EventObject.newEventObject(session, mqttConnectMessage)); - if (!session.isAuthorized()) { - throw new IllegalStateException("Authorization failed"); - } //清理会话 refreshSession(context, session, mqttConnectMessage); @@ -84,11 +95,7 @@ public class ConnectProcessor implements MqttProcessor { // 如果发现不支持的协议级别,服务端必须给发送一个返回码为 0x01(不支持的协议级别)的 CONNACK 报文响应 //CONNECT 报文,然后断开客户端的连接 final MqttVersion mqttVersion = MqttVersion.getByProtocolWithVersion(protocol, connectVariableHeader.getProtocolLevel()); - ValidateUtils.notNull(mqttVersion, "invalid version", () -> { - MqttConnAckMessage badProto = connFailAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION); - session.write(badProto); - session.disconnect(); - }); + ValidateUtils.notNull(mqttVersion, "invalid version", () -> connFailAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, session)); //服务端必须验证 CONNECT 控制报文的保留标志位(第 0 位)是否为 0,如果不为 0 必须断开客户端连接。 ValidateUtils.isTrue(connectVariableHeader.getReserved() == 0, "", session::disconnect); @@ -98,32 +105,20 @@ public class ConnectProcessor implements MqttProcessor { //“0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ”(大写字母,小写字母和数字) boolean invalidClient = StringUtils.isNotBlank(clientId) && (mqttVersion == MqttVersion.MQTT_3_1 && clientId.length() > MqttCodecUtil.MAX_CLIENT_ID_LENGTH); ValidateUtils.isTrue(!invalidClient, "", () -> { - MqttConnAckMessage connAckMessage = connFailAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED); - session.write(connAckMessage); - session.disconnect(); + connFailAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED, session); LOGGER.error("The MQTT client ID cannot be empty. Username={}", payload.userName()); }); //如果客户端提供的 ClientId 为零字节且清理会话标志为 0, // 服务端必须发送返回码为 0x02(表示标识符不合格)的 CONNACK 报文响应客户端的 CONNECT 报文,然后关闭网络连接 ValidateUtils.isTrue(connectVariableHeader.isCleanSession() || !StringUtils.isBlank(clientId), "", () -> { - MqttConnAckMessage connAckMessage = connFailAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED); - session.write(connAckMessage); - session.disconnect(); + connFailAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED, session); LOGGER.error("The MQTT client ID cannot be empty. Username={}", payload.userName()); }); } private void refreshSession(BrokerContext context, MqttSession session, MqttConnectMessage mqttConnectMessage) { - MqttConnectPayload payload = mqttConnectMessage.getPayload(); session.setCleanSession(mqttConnectMessage.getVariableHeader().isCleanSession()); - String clientId = payload.clientIdentifier(); - //服务端可以允许客户端提供一个零字节的客户端标识符 (ClientId) ,如果这样做了,服务端必须将这看作特 - //殊情况并分配唯一的客户端标识符给那个客户端。然后它必须假设客户端提供了那个唯一的客户端标识符,正常处理这个 CONNECT 报文 - if (clientId.length() == 0) { - clientId = MqttUtil.createClientId(); - } - - MqttSession mqttSession = context.getSession(clientId); + MqttSession mqttSession = context.getSession(session.getClientId()); if (mqttSession != null) { if (session.isCleanSession()) { //如果清理会话(CleanSession)标志被设置为 1,客户端和服务端必须丢弃之前的任何会话并开始一个新的会话。 @@ -135,7 +130,7 @@ public class ConnectProcessor implements MqttProcessor { mqttSession.disconnect(); //如果清理会话(CleanSession)标志被设置为 0,服务端必须基于当前会话(使用客户端标识符识别)的状态恢复与客户端的通信。 SessionStateProvider sessionStateProvider = context.getProviders().getSessionStateProvider(); - SessionState sessionState = sessionStateProvider.get(clientId); + SessionState sessionState = sessionStateProvider.get(session.getClientId()); if (sessionState != null) { session.getResponseConsumers().putAll(sessionState.getResponseConsumers()); sessionState.getSubscribers().forEach(session::subscribe); @@ -150,7 +145,6 @@ public class ConnectProcessor implements MqttProcessor { } } - session.setClientId(clientId); context.addSession(session); LOGGER.debug("add session for client:{}", session); } @@ -167,10 +161,12 @@ public class ConnectProcessor implements MqttProcessor { session.setWillMessage(publishMessage); } - private MqttConnAckMessage connFailAck(MqttConnectReturnCode returnCode) { + private void connFailAck(MqttConnectReturnCode returnCode, MqttSession session) { //如果服务端发送了一个包含非零返回码的 CONNACK 报文,它必须将当前会话标志设置为 0 ValidateUtils.isTrue(returnCode != CONNECTION_ACCEPTED, ""); - return connAck(returnCode, false); + MqttConnAckMessage badProto = connAck(returnCode, false); + session.write(badProto); + session.disconnect(); } private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean sessionPresent) { diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java index d40b2b56efb2acf7f3df58499cf73ec620019837..693d12a8d601c72903178f1bdaafee6fd26024f7 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java @@ -25,8 +25,7 @@ public class SubscribeProcessor extends AuthorizedMqttProcessor true; + public SessionStateProvider getSessionStateProvider() { return sessionStateProvider; } @@ -39,4 +41,19 @@ public class Providers { this.persistenceProvider = persistenceProvider; } + public ConnectAuthenticationProvider getConnectAuthenticationProvider() { + return connectAuthenticationProvider; + } + + public void setConnectAuthenticationProvider(ConnectAuthenticationProvider connectAuthenticationProvider) { + this.connectAuthenticationProvider = connectAuthenticationProvider; + } + + public SubscribeProvider getSubscribeProvider() { + return subscribeProvider; + } + + public void setSubscribeProvider(SubscribeProvider subscribeProvider) { + this.subscribeProvider = subscribeProvider; + } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionStateProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SessionStateProvider.java similarity index 79% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionStateProvider.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SessionStateProvider.java index 84b20d9eba16b6538379c0a353f5466672ac4e4a..9d2d4239c30c6a745589fc1b952ad65f5a32f423 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionStateProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SessionStateProvider.java @@ -1,4 +1,6 @@ -package org.smartboot.mqtt.broker.persistence.session; +package org.smartboot.mqtt.broker.provider; + +import org.smartboot.mqtt.broker.provider.impl.session.SessionState; /** * 会话状态Provider diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java new file mode 100644 index 0000000000000000000000000000000000000000..b2327695b795c5224977823378e50415ec413a15 --- /dev/null +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java @@ -0,0 +1,12 @@ +package org.smartboot.mqtt.broker.provider; + +import org.smartboot.mqtt.broker.MqttSession; + +/** + * Topic订阅 + * @author 三刀(zhengjunweimail@163.com) + * @version V1.0 , 2022/12/28 + */ +public interface SubscribeProvider { + boolean subscribeTopic(String topicFilter, MqttSession session); +} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredAuthenticationServiceImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/ConfiguredConnectAuthenticationProviderImpl.java similarity index 54% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredAuthenticationServiceImpl.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/ConfiguredConnectAuthenticationProviderImpl.java index 90a01ca01567f318382f1ce42658de98dcf63fc7..01d64cc8c567309b14da72b54268c09a5df970fb 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredAuthenticationServiceImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/ConfiguredConnectAuthenticationProviderImpl.java @@ -1,8 +1,14 @@ -package org.smartboot.mqtt.broker; +package org.smartboot.mqtt.broker.provider.impl; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.broker.BrokerConfigure; +import org.smartboot.mqtt.broker.BrokerContext; +import org.smartboot.mqtt.broker.MqttSession; +import org.smartboot.mqtt.broker.provider.ConnectAuthenticationProvider; +import org.smartboot.mqtt.common.message.MqttConnectMessage; +import org.smartboot.mqtt.common.util.MqttUtil; import java.util.Objects; @@ -11,21 +17,24 @@ import java.util.Objects; * @date 2022-08-05 16:45:50 * @since 1.0.0 */ -public class ConfiguredAuthenticationServiceImpl implements AuthenticationService { +public class ConfiguredConnectAuthenticationProviderImpl implements ConnectAuthenticationProvider { - private static final Logger LOGGER = LoggerFactory.getLogger(ConfiguredAuthenticationServiceImpl.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ConfiguredConnectAuthenticationProviderImpl.class); private final BrokerConfigure configure; - public ConfiguredAuthenticationServiceImpl(BrokerContext context) { + public ConfiguredConnectAuthenticationProviderImpl(BrokerContext context) { configure = context.getBrokerConfigure(); } + @Override - public boolean authentication(String username, String password, MqttSession session) { + public boolean authentication(MqttConnectMessage connectMessage, MqttSession session) { + String username = connectMessage.getPayload().userName(); + String password = connectMessage.getPayload().passwordInBytes() == null ? "" : new String(connectMessage.getPayload().passwordInBytes()); String configuredUsername = configure.getUsername(); String configuredPassword = configure.getPassword(); - String host = getHost(session); + String host = MqttUtil.getRemoteAddress(session); if (StringUtils.isEmpty(configuredPassword) || StringUtils.isEmpty(configuredUsername)) { @@ -36,18 +45,10 @@ public class ConfiguredAuthenticationServiceImpl implements AuthenticationServic boolean auth = Objects.equals(configuredUsername, username) && Objects.equals(configuredPassword, password); if (auth) { LOGGER.info("auth success, ip:{} clientId: {}, username: {}", host, session.getClientId(), username); - } else { + } else { LOGGER.info("auth failed, ip:{} clientId: {}, username: {}", host, session.getClientId(), username); } return auth; } - - private static String getHost(MqttSession session) { - try { - return session.getRemoteAddress().getHostName(); - } catch (Exception e) { - return ""; - } - } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryMessageStoreQueue.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryMessageStoreQueue.java similarity index 96% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryMessageStoreQueue.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryMessageStoreQueue.java index b2f4de4804d39d8b3256ec7e078bcbdcb986938e..eccb87bb424e57efdfe37b1e442829997ac26d9a 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryMessageStoreQueue.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryMessageStoreQueue.java @@ -1,4 +1,4 @@ -package org.smartboot.mqtt.broker.persistence.message; +package org.smartboot.mqtt.broker.provider.impl.message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryPersistenceProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryPersistenceProvider.java similarity index 92% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryPersistenceProvider.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryPersistenceProvider.java index 349c30df2147efd69eb036510a4f335984b966c7..60ab977f237dd6c0f9d75673158c63df82b4252b 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryPersistenceProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryPersistenceProvider.java @@ -1,5 +1,6 @@ -package org.smartboot.mqtt.broker.persistence.message; +package org.smartboot.mqtt.broker.provider.impl.message; +import org.smartboot.mqtt.broker.provider.PersistenceProvider; import org.smartboot.mqtt.common.message.MqttPublishMessage; import java.util.concurrent.ConcurrentHashMap; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/PersistenceMessage.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/PersistenceMessage.java similarity index 95% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/PersistenceMessage.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/PersistenceMessage.java index 0a556a9ca166a2036e6e8111e8af28084ce8e592..9b0caf7759e7762a63e7b41aef899e75dbabb5f1 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/PersistenceMessage.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/PersistenceMessage.java @@ -1,4 +1,4 @@ -package org.smartboot.mqtt.broker.persistence.message; +package org.smartboot.mqtt.broker.provider.impl.message; import org.smartboot.mqtt.common.ToString; import org.smartboot.mqtt.common.message.MqttPublishMessage; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/MemorySessionStateProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/MemorySessionStateProvider.java similarity index 84% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/MemorySessionStateProvider.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/MemorySessionStateProvider.java index 13b4d12f0e87602f724d1af29ae964a3d4392606..b72ea7be8d05b2e75fd68746ee48c8539020bc44 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/MemorySessionStateProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/MemorySessionStateProvider.java @@ -1,4 +1,6 @@ -package org.smartboot.mqtt.broker.persistence.session; +package org.smartboot.mqtt.broker.provider.impl.session; + +import org.smartboot.mqtt.broker.provider.SessionStateProvider; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionState.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java similarity index 91% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionState.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java index 4b1bdafaf21ded08bb42ac81079792a4b73bdc20..a9e6904cf63343c6fa9008077dd766c370c46943 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionState.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java @@ -1,4 +1,4 @@ -package org.smartboot.mqtt.broker.persistence.session; +package org.smartboot.mqtt.broker.provider.impl.session; import org.smartboot.mqtt.common.AckMessage; import org.smartboot.mqtt.common.enums.MqttQoS; diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml index fdc187fb8f41eb27d73dca2bd32d6d11c5d934d3..6ffee2e18eb4e0f80aecc497d899a1490f059876 100644 --- a/smart-mqtt-client/pom.xml +++ b/smart-mqtt-client/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.11 + 0.12 ../pom.xml 4.0.0 diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml index 22da02a637c278707285b8c3b759f5d33417051a..a65febd1eb026978a97f86390be8bef5bf7d5d9d 100644 --- a/smart-mqtt-common/pom.xml +++ b/smart-mqtt-common/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.11 + 0.12 ../pom.xml 4.0.0 diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java index 80519e345b0d1105f5b338801dce603ab1053a61..5bf0c08817d127196289740afe91da248c0e064e 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java @@ -2,13 +2,9 @@ package org.smartboot.mqtt.common; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttQoS; -import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.mqtt.common.message.MqttPubAckMessage; -import org.smartboot.mqtt.common.message.MqttPubCompMessage; -import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttPubRelMessage; -import org.smartboot.mqtt.common.message.MqttPublishMessage; +import org.smartboot.mqtt.common.message.*; import org.smartboot.mqtt.common.util.ValidateUtils; import java.util.Objects; @@ -26,7 +22,7 @@ public abstract class QosPublisher { //至少一次 session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> { - ValidateUtils.isTrue(message instanceof MqttPubAckMessage, "invalid message type"); + ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBACK, "invalid message type"); future.complete(true); session.responseConsumers.remove(cacheKey); LOGGER.info("Qos1消息发送成功..."); @@ -44,13 +40,13 @@ public abstract class QosPublisher { CompletableFuture publishFuture = new CompletableFuture<>(); //只有一次 session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> { - ValidateUtils.isTrue(message instanceof MqttPubRecMessage, "invalid message type"); + ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBREC, "invalid message type"); ValidateUtils.isTrue(Objects.equals(message.getVariableHeader().getPacketId(), publishMessage.getVariableHeader().getPacketId()), "invalid packetId"); publishFuture.complete(true); MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(message.getVariableHeader().getPacketId()); CompletableFuture pubRelFuture = new CompletableFuture<>(); session.responseConsumers.put(cacheKey, new AckMessage(pubRelMessage, compMessage -> { - ValidateUtils.isTrue(compMessage instanceof MqttPubCompMessage, "invalid message type"); + ValidateUtils.isTrue(compMessage.getFixedHeader().getMessageType() == MqttMessageType.PUBCOMP, "invalid message type"); ValidateUtils.isTrue(Objects.equals(compMessage.getVariableHeader().getPacketId(), pubRelMessage.getVariableHeader().getPacketId()), "invalid packetId"); pubRelFuture.complete(true); LOGGER.info("Qos2消息发送成功..."); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java index 8084318396a1ef2bd045a7dd7e1302d9651aa129..aa9554bc8f3b9f14548f6bf995ac84c5e9e4673c 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java @@ -4,6 +4,7 @@ package org.smartboot.mqtt.common.enums; * */ public enum MqttConnectReturnCode { + //MQTT3 CONNECTION_ACCEPTED((byte) 0x00, "连接已被服务端接受"), CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION((byte) 0X01, "服务端不支持客户端请求的 MQTT 协议级别"), CONNECTION_REFUSED_IDENTIFIER_REJECTED((byte) 0x02, "客户端标识符是正确的 UTF-8 编码,但服务 端不允许使用"), @@ -11,7 +12,6 @@ public enum MqttConnectReturnCode { CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD((byte) 0x04, "用户名或密码的数据格式无效"), CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05, "客户端未被授权连接到此服务器"); - private final byte code; private final String desc; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java index cfd7a6c9d972b2c4473740396b03ea2181dd4728..94fcc5c5ef686136b95e6151b99037b48bfe4f56 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java @@ -5,7 +5,7 @@ package org.smartboot.mqtt.common.enums; * @version V1.0 , 2022/3/23 */ public enum MqttProtocolEnum { - MQTT_3_1("MQIsdp"), MQTT_3_1_1("MQTT"); + MQTT_3_1("MQIsdp"), MQTT_3_1_1("MQTT"), MQTT_5("MQTT"); /** * 协议名 */ diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java new file mode 100644 index 0000000000000000000000000000000000000000..aecd578d8306062ba3266cfcea5521219d9ce5cf --- /dev/null +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java @@ -0,0 +1,78 @@ +package org.smartboot.mqtt.common.enums; + +/** + * + */ +public enum MqttReasonCode { + //MQTT5 + SUCCESS((byte) 0x00, "成功"), + NORMAL_DISCONNECTION((byte) 0x00, "断开连接"), + GRANTED_QOS0((byte) 0x00, "最大允许qos为0"), + GRANTED_QOS1((byte) 0x01, "最大允许qos为1"), + GRANTED_QOS2((byte) 0x02, "最大允许qos为2"), + DISCONNECT_WITH_WILL_MESSAGE((byte) 0x04, "客户端需要断开连接后发送遗嘱消息"), + NO_MATCHING_SUBSCRIBERS((byte) 0X10, "无匹配的订阅者"), + N0_SUBSCRIPTION_EXISTED((byte) 0X11, ""), + CONTINUE_AUTHENTICATION((byte) 0X18, ""), + RE_AUTHENTICATE((byte) 0X19, ""), + UNSPECIFIED_ERROR((byte) 0x80, "未指明的错误"), + MALFORMED_PACKET((byte) 0x81, "数据未正确解析"), + PROTOCOL_ERROR((byte) 0x82, "协议版本错误"), + IMPLEMENTATION_SPECIFIC_ERROR((byte) 0x83, "接收者不接受"), + UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84, "服务端不支持此版本协议"), + CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85, "不允许的客户端id"), + BAD_USERNAME_OR_PASSWORD((byte) 0x86, "不接受的用户名或密码"), + NOT_AUTHORIZED((byte) 0x87, "未授权"), + SERVER_UNAVAILABLE_5((byte) 0x88, "服务端不可用"), + SERVER_BUSY((byte) 0x89, "服务端繁忙中"), + BANNED((byte) 0x8A, "客户端被禁用"), + SERVER_SHUTTING_DOWN((byte) 0x8B, ""), + BAD_AUTHENTICATION_METHOD((byte) 0x8C, "错误的认证方法"), + KEEP_ALIVE_TIMEOUT((byte) 0x8D, ""), + SESSION_TAKEN_OVER((byte) 0x8E, "相同客户端id上线导致被踢出下线"), + TOPIC_FILTER_INVALID((byte) 0x8F, "消息过滤非法"), + TOPIC_NAME_INVALID((byte) 0x90, "topic名非法"), + PACKET_IDENTIFIER_IN_USE((byte) 0x91, "packetId已被使用"), + PACKET_IDENTIFIER_NOT_FOUND((byte) 0x92, ""), + RECEIVE_MAXIMUM_EXCEEDED((byte) 0x93, ""), + TOPIC_ALIAS_INVALID((byte) 0x94, ""), + PACKET_TOO_LARGE((byte) 0x95, "包大小超限"), + MESSAGE_RATE_TOO_HIGH((byte) 0x96, ""), + QUOTA_EXCEEDED((byte) 0x97, "已超限"), + ADMINISTRATIVE_ACTION((byte) 0x98, ""), + PAYLOAD_FORMAT_INVALID((byte) 0x99, "数据格式非法"), + RETAIN_NOT_SUPPORTED((byte) 0x9A, "不支持保留消息"), + QOS_NOT_SUPPORTED((byte) 0x9B, "不支持此qos"), + USE_ANOTHER_SERVER((byte) 0x9C, "客户端需要暂时使用另一节点"), + SERVER_MOVED((byte) 0x9D, "客户端需要永久使用另一节点"), + SHARED_SUBSCRIPTION_NOT_SUPPORTED((byte) 0x9E, ""), + CONNECTION_RATE_EXCEEDED((byte) 0x9F, "连接速率超限"), + MAXIMUM_CONNECT_TIME((byte) 0xA0, ""), + SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED((byte) 0xA1, ""), + WILDCARD_SUBSCRIPTION_NOT_SUPPORTED((byte) 0xA2, ""); + + private final byte code; + private final String desc; + + MqttReasonCode(byte code, String desc) { + this.code = code; + this.desc = desc; + } + + public static MqttReasonCode valueOf(byte b) { + for (MqttReasonCode v : values()) { + if (b == v.code) { + return v; + } + } + throw new IllegalArgumentException("unknown reason code: " + (b & 0xFF)); + } + + public byte getCode() { + return code; + } + + public String getDesc() { + return desc; + } +} \ No newline at end of file diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java index 5aa5040fd73a786dd097c92b5788a4a0aff94d9b..58ee607c299686775274b08f7f5d4f364d543660 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java @@ -1,7 +1,7 @@ package org.smartboot.mqtt.common.enums; public enum MqttVersion { - MQTT_3_1(MqttProtocolEnum.MQTT_3_1, (byte) 3), MQTT_3_1_1(MqttProtocolEnum.MQTT_3_1_1, (byte) 4); + MQTT_3_1(MqttProtocolEnum.MQTT_3_1, (byte) 3), MQTT_3_1_1(MqttProtocolEnum.MQTT_3_1_1, (byte) 4), MQTT_5(MqttProtocolEnum.MQTT_5, (byte) 5); private final MqttProtocolEnum protocol; private final byte level; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java index 0a8193f2a460a15afc9303136461868d11dad95b..cb5a0f95e517d9c21a58828bdc2cee20f58a63a3 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java @@ -2,6 +2,7 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; import org.smartboot.mqtt.common.enums.MqttConnectReturnCode; +import org.smartboot.mqtt.common.protocol.DecodeUnit; import org.smartboot.socket.util.BufferUtils; import java.nio.ByteBuffer; @@ -22,7 +23,7 @@ public class MqttConnAckMessage extends MqttVariableMessage { + + public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader) { + super(mqttFixedHeader); + } + + public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader, int packetId) { + this(mqttFixedHeader, packetId, (byte)0, null); + } + + public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader, int packetId, byte reasonCode, MqttProperties mqttProperties) { + super(mqttFixedHeader); + setVariableHeader(new MqttPubReplyVariableHeader(packetId, reasonCode, mqttProperties)); + } + + @Override + public final void decodeVariableHeader(DecodeUnit unit, ByteBuffer buffer) { + int packetId = buffer.getShort(); + MqttPubReplyVariableHeader header; + if (unit.mqttVersion == MqttVersion.MQTT_5){ + byte reasonCode = buffer.get(); + byte propertyLen = buffer.get(); + header = new MqttPubReplyVariableHeader(packetId, reasonCode, null); + }else { + header = new MqttPubReplyVariableHeader(packetId, (byte)0, null); + } + setVariableHeader(header); + } + + + @Override + public void writeTo(MqttWriter mqttWriter) throws IOException { + MqttPubReplyVariableHeader variableHeader = getVariableHeader(); + int variableHeaderBufferSize = 2; // variable part only has a message id + mqttWriter.writeByte(getFixedHeaderByte1(fixedHeader)); + writeVariableLengthInt(mqttWriter, variableHeaderBufferSize); + mqttWriter.writeShort((short) variableHeader.getPacketId()); + mqttWriter.writeByte(variableHeader.getReasonCode()); + } +} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java index 34fde45b05ac5e1f358b14d7bff9439da9585651..0b4b5a9e3207655ea2af8df4b4a16650d38be7e0 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java @@ -3,6 +3,8 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; import org.smartboot.mqtt.common.ToString; import org.smartboot.mqtt.common.exception.MqttProcessException; +import org.smartboot.mqtt.common.protocol.DecodeUnit; +import org.smartboot.mqtt.common.protocol.MqttProtocol; import org.smartboot.socket.util.BufferUtils; import org.smartboot.socket.util.DecoderException; @@ -141,7 +143,7 @@ public class MqttMessage extends ToString { * * @param buffer */ - public void decodeVariableHeader(ByteBuffer buffer) { + public void decodeVariableHeader(DecodeUnit unit, ByteBuffer buffer) { } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java index 64ea394a58f497f9eba454871a95fd083fcd7623..ed85e1b970c850277fde68b8ddf5de4fd8a9b223 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java @@ -10,6 +10,12 @@ public class MqttPacketIdVariableHeader extends MqttVariableHeader { */ private int packetId; + public MqttPacketIdVariableHeader() { + } + + public MqttPacketIdVariableHeader(int packetId) { + this.packetId = packetId; + } public void setPacketId(int packetId) { this.packetId = packetId; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java index ff4fd6a4c7655c8bd3099bef33ea6ed43508cbc7..ee86a2176b48c69cc040ab04f7cfa2aafe7cacd3 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java @@ -1,6 +1,7 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; +import org.smartboot.mqtt.common.protocol.DecodeUnit; import java.io.IOException; import java.nio.ByteBuffer; @@ -28,7 +29,7 @@ public class MqttPacketIdentifierMessage extends MqttVariableMessage property type + */ + public abstract static class MqttProperty { + final T value; + final int propertyId; + + protected MqttProperty(int propertyId, T value) { + this.propertyId = propertyId; + this.value = value; + } + + /** + * Get MQTT property value + * + * @return property value + */ + public T value() { + return value; + } + + /** + * Get MQTT property ID + * @return property ID + */ + public int propertyId() { + return propertyId; + } + + @Override + public int hashCode() { + return propertyId + 31 * value.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + MqttProperty that = (MqttProperty) obj; + return this.propertyId == that.propertyId && this.value.equals(that.value); + } + } + + public static final class IntegerProperty extends MqttProperty { + + public IntegerProperty(int propertyId, Integer value) { + super(propertyId, value); + } + + @Override + public String toString() { + return "IntegerProperty(" + propertyId + ", " + value + ")"; + } + } + + public static final class StringProperty extends MqttProperty { + + public StringProperty(int propertyId, String value) { + super(propertyId, value); + } + + @Override + public String toString() { + return "StringProperty(" + propertyId + ", " + value + ")"; + } + } + + public static final class StringPair { + public final String key; + public final String value; + + public StringPair(String key, String value) { + this.key = key; + this.value = value; + } + + @Override + public int hashCode() { + return key.hashCode() + 31 * value.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + StringPair that = (StringPair) obj; + + return that.key.equals(this.key) && that.value.equals(this.value); + } + } + + //User properties are the only properties that may be included multiple times and + //are the only properties where ordering is required. Therefore, they need a special handling + public static final class UserProperties extends MqttProperty> { + public UserProperties() { + super(MqttPropertyType.USER_PROPERTY.value, new ArrayList()); + } + + /** + * Create user properties from the collection of the String pair values + * + * @param values string pairs. Collection entries are copied, collection itself isn't shared + */ + public UserProperties(Collection values) { + this(); + this.value.addAll(values); + } + + private static UserProperties fromUserPropertyCollection(Collection properties) { + UserProperties userProperties = new UserProperties(); + for (UserProperty property: properties) { + userProperties.add(new StringPair(property.value.key, property.value.value)); + } + return userProperties; + } + + public void add(StringPair pair) { + this.value.add(pair); + } + + public void add(String key, String value) { + this.value.add(new StringPair(key, value)); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("UserProperties("); + boolean first = true; + for (StringPair pair: value) { + if (!first) { + builder.append(", "); + } + builder.append(pair.key + "->" + pair.value); + first = false; + } + builder.append(")"); + return builder.toString(); + } + } + + public static final class UserProperty extends MqttProperty { + public UserProperty(String key, String value) { + super(MqttPropertyType.USER_PROPERTY.value, new StringPair(key, value)); + } + + @Override + public String toString() { + return "UserProperty(" + value.key + ", " + value.value + ")"; + } + } + + public static final class BinaryProperty extends MqttProperty { + + public BinaryProperty(int propertyId, byte[] value) { + super(propertyId, value); + } + + @Override + public String toString() { + return "BinaryProperty(" + propertyId + ", " + value.length + " bytes)"; + } + } + + + private Map props; + private List userProperties; + private List subscriptionIds; + + public void add(MqttProperty property) { + Map props = this.props; + if (property.propertyId == MqttPropertyType.USER_PROPERTY.value) { + List userProperties = this.userProperties; + if (userProperties == null) { + userProperties = new ArrayList(1); + this.userProperties = userProperties; + } + if (property instanceof UserProperty) { + userProperties.add((UserProperty) property); + } else if (property instanceof UserProperties) { + for (StringPair pair: ((UserProperties) property).value) { + userProperties.add(new UserProperty(pair.key, pair.value)); + } + } else { + throw new IllegalArgumentException("User property must be of UserProperty or UserProperties type"); + } + } else if (property.propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { + List subscriptionIds = this.subscriptionIds; + if (subscriptionIds == null) { + subscriptionIds = new ArrayList(1); + this.subscriptionIds = subscriptionIds; + } + if (property instanceof IntegerProperty) { + subscriptionIds.add((IntegerProperty) property); + } else { + throw new IllegalArgumentException("Subscription ID must be an integer property"); + } + } else { + if (props == null) { + props = new HashMap<>(); + this.props = props; + } + props.put(property.propertyId, property); + } + } + + public Collection listAll() { + Map props = this.props; + if (props == null && subscriptionIds == null && userProperties == null) { + return Collections.emptyList(); + } + if (subscriptionIds == null && userProperties == null) { + return props.values(); + } + if (props == null && userProperties == null) { + return subscriptionIds; + } + List propValues = new ArrayList(props != null ? props.size() : 1); + if (props != null) { + propValues.addAll(props.values()); + } + if (subscriptionIds != null) { + propValues.addAll(subscriptionIds); + } + if (userProperties != null) { + propValues.add(UserProperties.fromUserPropertyCollection(userProperties)); + } + return propValues; + } + + public boolean isEmpty() { + Map props = this.props; + return props == null || props.isEmpty(); + } + + /** + * Get property by ID. If there are multiple properties of this type (can be with Subscription ID) + * then return the first one. + * + * @param propertyId ID of the property + * @return a property if it is set, null otherwise + */ + public MqttProperty getProperty(int propertyId) { + if (propertyId == MqttPropertyType.USER_PROPERTY.value) { + //special handling to keep compatibility with earlier versions + List userProperties = this.userProperties; + if (userProperties == null) { + return null; + } + return UserProperties.fromUserPropertyCollection(userProperties); + } + if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { + List subscriptionIds = this.subscriptionIds; + if (subscriptionIds == null || subscriptionIds.isEmpty()) { + return null; + } + return subscriptionIds.get(0); + } + Map props = this.props; + return props == null ? null : props.get(propertyId); + } + + /** + * Get properties by ID. + * Some properties (Subscription ID and User Properties) may occur multiple times, + * this method returns all their values in order. + * + * @param propertyId ID of the property + * @return all properties having specified ID + */ + public List getProperties(int propertyId) { + if (propertyId == MqttPropertyType.USER_PROPERTY.value) { + return userProperties == null ? Collections.emptyList() : userProperties; + } + if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { + return subscriptionIds == null ? Collections.emptyList() : subscriptionIds; + } + Map props = this.props; + return (props == null || !props.containsKey(propertyId)) ? + Collections.emptyList() : + Collections.singletonList(props.get(propertyId)); + } +} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubCompMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubCompMessage.java index 670a9ed9f1e679a9ac16aa33653af5c0734f3e18..f1b8f364631cea60bf8b2811bf65dc01b3deffff 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubCompMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubCompMessage.java @@ -9,7 +9,7 @@ public class MqttPubCompMessage extends MqttPacketIdentifierMessage { super(mqttFixedHeader); } - public MqttPubCompMessage(int mqttPacketIdVariableHeader) { - super(MqttFixedHeader.PUB_COMP_HEADER, mqttPacketIdVariableHeader); + public MqttPubCompMessage(int packetId) { + super(MqttFixedHeader.PUB_COMP_HEADER, packetId); } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubReplyVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubReplyVariableHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..7b72dd97b2fb08e31d26070c6460c3e906f6bae3 --- /dev/null +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubReplyVariableHeader.java @@ -0,0 +1,24 @@ +package org.smartboot.mqtt.common.message; + +public class MqttPubReplyVariableHeader extends MqttPacketIdVariableHeader{ + + private final byte reasonCode; + private final MqttProperties properties; + + public MqttPubReplyVariableHeader(int packetId, byte reasonCode, MqttProperties properties) { + super(packetId); + if (packetId < 1 || packetId > 0xffff) { + throw new IllegalArgumentException("packetId: " + packetId + " (should be: 1 ~ 65535)"); + } + this.reasonCode = reasonCode; + this.properties = properties; + } + + public byte getReasonCode() { + return reasonCode; + } + + public MqttProperties getProperties() { + return properties; + } +} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java index d6280122bef726fbfdc0043ca9cec9148943591e..39e508d1580df8b689b1ec3550fe2c968071a54c 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java @@ -1,6 +1,7 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; +import org.smartboot.mqtt.common.protocol.DecodeUnit; import org.smartboot.mqtt.common.util.MqttUtil; import org.smartboot.socket.util.DecoderException; @@ -25,7 +26,7 @@ public class MqttPublishMessage extends MqttVariableMessage { if (payloadBuffer.remaining() < remainingLength) { break; } - unit.mqttMessage.decodeVariableHeader(payloadBuffer); + unit.mqttMessage.decodeVariableHeader(unit, payloadBuffer); unit.state = READ_PAYLOAD; @@ -181,10 +182,4 @@ public class MqttProtocol implements Protocol { READ_FIXED_HEADER, READ_VARIABLE_HEADER, READ_PAYLOAD, FINISH, } - class DecodeUnit { - DecoderState state; - MqttMessage mqttMessage; - - ByteBuffer disposableBuffer; - } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java index b472a137157f9759240e7a8586858ba294b03f62..40c5d51f0c0cbd6d9114b05e65fe0062bb926e2c 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java @@ -1,5 +1,8 @@ package org.smartboot.mqtt.common.util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.common.AbstractSession; import org.smartboot.mqtt.common.MqttMessageBuilders; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.message.MqttPublishMessage; @@ -11,6 +14,7 @@ import java.util.UUID; * @version V1.0 , 2022/3/29 */ public class MqttUtil { + private static final Logger LOGGER = LoggerFactory.getLogger(MqttUtil.class); /** * Topic 通配符 */ @@ -33,4 +37,12 @@ public class MqttUtil { return UUID.randomUUID().toString().replace("-", ""); } + public static String getRemoteAddress(AbstractSession session) { + try { + return session.getRemoteAddress().toString(); + } catch (Exception e) { + LOGGER.error("getRemoteAddress exception", e); + return ""; + } + } }