From c8e5573daa8222fe3f78e3150bd0d465783b4c15 Mon Sep 17 00:00:00 2001 From: cea <396157168@qq.com> Date: Sun, 25 Dec 2022 22:37:05 +0800 Subject: [PATCH 01/12] =?UTF-8?q?update:=20mqtt5=20connect=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E4=BD=93=E7=9B=B8=E5=85=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/enums/MqttConnectReturnCode.java | 25 +- .../mqtt/common/enums/MqttProtocolEnum.java | 2 +- .../mqtt/common/enums/MqttVersion.java | 2 +- .../message/MqttConnAckVariableHeader.java | 11 + .../common/message/MqttConnectPayload.java | 13 +- .../message/MqttConnectVariableHeader.java | 20 + .../mqtt/common/message/MqttProperties.java | 374 ++++++++++++++++++ 7 files changed, 443 insertions(+), 4 deletions(-) create mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttProperties.java diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java index 80843183..63eb04fe 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java @@ -4,13 +4,36 @@ package org.smartboot.mqtt.common.enums; * */ public enum MqttConnectReturnCode { + //MQTT3 CONNECTION_ACCEPTED((byte) 0x00, "连接已被服务端接受"), CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION((byte) 0X01, "服务端不支持客户端请求的 MQTT 协议级别"), CONNECTION_REFUSED_IDENTIFIER_REJECTED((byte) 0x02, "客户端标识符是正确的 UTF-8 编码,但服务 端不允许使用"), CONNECTION_REFUSED_SERVER_UNAVAILABLE((byte) 0x03, "网络连接已建立,但 MQTT 服务不可用"), CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD((byte) 0x04, "用户名或密码的数据格式无效"), - CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05, "客户端未被授权连接到此服务器"); + CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05, "客户端未被授权连接到此服务器"), + //MQTT5 + CONNECTION_REFUSED_UNSPECIFIED_ERROR((byte) 0x80, "未识别的错误"), + CONNECTION_REFUSED_MALFORMED_PACKET((byte) 0x81, "数据未正确解析"), + CONNECTION_REFUSED_PROTOCOL_ERROR((byte) 0x82, "协议版本错误"), + CONNECTION_REFUSED_IMPLEMENTATION_SPECIFIC((byte) 0x83, "服务端拒绝了连接"), + CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84, "服务端不支持此版本协议"), + CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85, "不允许的客户端id"), + CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD((byte) 0x86, "不接受的用户名或密码"), + CONNECTION_REFUSED_NOT_AUTHORIZED_5((byte) 0x87, "未授权"), + CONNECTION_REFUSED_SERVER_UNAVAILABLE_5((byte) 0x88, "服务端不可用"), + CONNECTION_REFUSED_SERVER_BUSY((byte) 0x89, "服务端繁忙中"), + CONNECTION_REFUSED_BANNED((byte) 0x8A, "客户端被禁用"), + CONNECTION_REFUSED_BAD_AUTHENTICATION_METHOD((byte) 0x8C, "错误的认证方法"), + CONNECTION_REFUSED_TOPIC_NAME_INVALID((byte) 0x90, "topic不被允许"), + CONNECTION_REFUSED_PACKET_TOO_LARGE((byte) 0x95, "包大小超限"), + CONNECTION_REFUSED_QUOTA_EXCEEDED((byte) 0x97, "已超限"), + CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID((byte) 0x99, "数据格式非法"), + CONNECTION_REFUSED_RETAIN_NOT_SUPPORTED((byte) 0x9A, "不支持保留消息"), + CONNECTION_REFUSED_QOS_NOT_SUPPORTED((byte) 0x9B, "不支持此qos"), + CONNECTION_REFUSED_USE_ANOTHER_SERVER((byte) 0x9C, "客户端需要暂时使用另一节点"), + CONNECTION_REFUSED_SERVER_MOVED((byte) 0x9D, "客户端需要永久使用另一节点"), + CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED((byte) 0x9F, "连接速率超限"); private final byte code; private final String desc; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java index cfd7a6c9..94fcc5c5 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java @@ -5,7 +5,7 @@ package org.smartboot.mqtt.common.enums; * @version V1.0 , 2022/3/23 */ public enum MqttProtocolEnum { - MQTT_3_1("MQIsdp"), MQTT_3_1_1("MQTT"); + MQTT_3_1("MQIsdp"), MQTT_3_1_1("MQTT"), MQTT_5("MQTT"); /** * 协议名 */ diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java index 5aa5040f..58ee607c 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java @@ -1,7 +1,7 @@ package org.smartboot.mqtt.common.enums; public enum MqttVersion { - MQTT_3_1(MqttProtocolEnum.MQTT_3_1, (byte) 3), MQTT_3_1_1(MqttProtocolEnum.MQTT_3_1_1, (byte) 4); + MQTT_3_1(MqttProtocolEnum.MQTT_3_1, (byte) 3), MQTT_3_1_1(MqttProtocolEnum.MQTT_3_1_1, (byte) 4), MQTT_5(MqttProtocolEnum.MQTT_5, (byte) 5); private final MqttProtocolEnum protocol; private final byte level; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckVariableHeader.java index ef478d4b..bde5cd72 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckVariableHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckVariableHeader.java @@ -14,9 +14,16 @@ public class MqttConnAckVariableHeader extends MqttVariableHeader { */ private final boolean sessionPresent; + private final MqttProperties properties; + + public MqttConnAckVariableHeader(MqttConnectReturnCode connectReturnCode, boolean sessionPresent) { + this(connectReturnCode, sessionPresent, null); + } + public MqttConnAckVariableHeader(MqttConnectReturnCode connectReturnCode, boolean sessionPresent, MqttProperties properties) { this.connectReturnCode = connectReturnCode; this.sessionPresent = sessionPresent; + this.properties = properties; } public MqttConnectReturnCode connectReturnCode() { @@ -26,4 +33,8 @@ public class MqttConnAckVariableHeader extends MqttVariableHeader { public boolean isSessionPresent() { return sessionPresent; } + + public MqttProperties getProperties() { + return properties; + } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectPayload.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectPayload.java index 3e9b066c..23336141 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectPayload.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectPayload.java @@ -11,6 +11,10 @@ public final class MqttConnectPayload { * 客户端标识符 */ private final String clientIdentifier; + /** + * 遗嘱属性 + */ + private final MqttProperties willProperties; /** * 遗嘱主题 */ @@ -29,11 +33,16 @@ public final class MqttConnectPayload { private final byte[] password; public MqttConnectPayload(String clientIdentifier, String willTopic, byte[] willMessage, String userName, byte[] password) { + this(clientIdentifier, willTopic, willMessage, userName, password, null); + } + + public MqttConnectPayload(String clientIdentifier, String willTopic, byte[] willMessage, String userName, byte[] password, MqttProperties willProperties) { this.clientIdentifier = clientIdentifier; this.willTopic = willTopic; this.willMessage = willMessage; this.userName = userName; this.password = password; + this.willProperties = willProperties; } public String clientIdentifier() { @@ -56,5 +65,7 @@ public final class MqttConnectPayload { return password; } - + public MqttProperties getWillProperties() { + return willProperties; + } } \ No newline at end of file diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectVariableHeader.java index 02572cd9..5943349a 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectVariableHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectVariableHeader.java @@ -25,12 +25,22 @@ public final class MqttConnectVariableHeader extends MqttVariableHeader { private final boolean isCleanSession; private final int reserved; private final int keepAliveTimeSeconds; + private final MqttProperties properties; public MqttConnectVariableHeader( String name, byte protocolLevel, int connectFlag, int keepAliveTimeSeconds) { + this(name, protocolLevel, connectFlag, keepAliveTimeSeconds, null); + } + + public MqttConnectVariableHeader( + String name, + byte protocolLevel, + int connectFlag, + int keepAliveTimeSeconds, + MqttProperties properties) { this.protocolName = name; this.protocolLevel = protocolLevel; this.hasUserName = (connectFlag & 0x80) == 0x80; @@ -41,9 +51,14 @@ public final class MqttConnectVariableHeader extends MqttVariableHeader { this.isCleanSession = (connectFlag & 0x02) == 0x02; this.reserved = (connectFlag & 0x01); this.keepAliveTimeSeconds = keepAliveTimeSeconds; + this.properties = properties; } public MqttConnectVariableHeader(MqttVersion mqttVersion, boolean hasUserName, boolean hasPassword, WillMessage willMessage, boolean isCleanSession, int keepAliveTimeSeconds) { + this(mqttVersion, hasUserName, hasPassword, willMessage, isCleanSession, keepAliveTimeSeconds, null); + } + + public MqttConnectVariableHeader(MqttVersion mqttVersion, boolean hasUserName, boolean hasPassword, WillMessage willMessage, boolean isCleanSession, int keepAliveTimeSeconds, MqttProperties properties) { this.protocolName = mqttVersion.protocolName(); this.protocolLevel = mqttVersion.protocolLevel(); this.hasUserName = hasUserName; @@ -55,8 +70,10 @@ public final class MqttConnectVariableHeader extends MqttVariableHeader { //服务端必须验证 CONNECT 控制报文的保留标志位(第 0 位)是否为 0,如果不为 0 必须断开客户端连接 this.reserved = 0; this.keepAliveTimeSeconds = keepAliveTimeSeconds; + this.properties = properties; } + public String protocolName() { return protocolName; } @@ -97,4 +114,7 @@ public final class MqttConnectVariableHeader extends MqttVariableHeader { return reserved; } + public MqttProperties getProperties() { + return properties; + } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttProperties.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttProperties.java new file mode 100644 index 00000000..2539dae4 --- /dev/null +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttProperties.java @@ -0,0 +1,374 @@ +package org.smartboot.mqtt.common.message; + +import java.util.*; + +/** + * MQTT Properties container from netty + * */ +public final class MqttProperties { + + public enum MqttPropertyType { + // single byte properties + PAYLOAD_FORMAT_INDICATOR(0x01), + REQUEST_PROBLEM_INFORMATION(0x17), + REQUEST_RESPONSE_INFORMATION(0x19), + MAXIMUM_QOS(0x24), + RETAIN_AVAILABLE(0x25), + WILDCARD_SUBSCRIPTION_AVAILABLE(0x28), + SUBSCRIPTION_IDENTIFIER_AVAILABLE(0x29), + SHARED_SUBSCRIPTION_AVAILABLE(0x2A), + + // two bytes properties + SERVER_KEEP_ALIVE(0x13), + RECEIVE_MAXIMUM(0x21), + TOPIC_ALIAS_MAXIMUM(0x22), + TOPIC_ALIAS(0x23), + + // four bytes properties + PUBLICATION_EXPIRY_INTERVAL(0x02), + SESSION_EXPIRY_INTERVAL(0x11), + WILL_DELAY_INTERVAL(0x18), + MAXIMUM_PACKET_SIZE(0x27), + + // Variable Byte Integer + SUBSCRIPTION_IDENTIFIER(0x0B), + + // UTF-8 Encoded String properties + CONTENT_TYPE(0x03), + RESPONSE_TOPIC(0x08), + ASSIGNED_CLIENT_IDENTIFIER(0x12), + AUTHENTICATION_METHOD(0x15), + RESPONSE_INFORMATION(0x1A), + SERVER_REFERENCE(0x1C), + REASON_STRING(0x1F), + USER_PROPERTY(0x26), + + // Binary Data + CORRELATION_DATA(0x09), + AUTHENTICATION_DATA(0x16); + + private static final MqttPropertyType[] VALUES; + + static { + VALUES = new MqttPropertyType[43]; + for (MqttPropertyType v : values()) { + VALUES[v.value] = v; + } + } + + private final int value; + + MqttPropertyType(int value) { + this.value = value; + } + + public int value() { + return value; + } + + public static MqttPropertyType valueOf(int type) { + MqttPropertyType t = null; + try { + t = VALUES[type]; + } catch (ArrayIndexOutOfBoundsException ignored) { + // nop + } + if (t == null) { + throw new IllegalArgumentException("unknown property type: " + type); + } + return t; + } + } + + /** + * MQTT property base class + * + * @param property type + */ + public abstract static class MqttProperty { + final T value; + final int propertyId; + + protected MqttProperty(int propertyId, T value) { + this.propertyId = propertyId; + this.value = value; + } + + /** + * Get MQTT property value + * + * @return property value + */ + public T value() { + return value; + } + + /** + * Get MQTT property ID + * @return property ID + */ + public int propertyId() { + return propertyId; + } + + @Override + public int hashCode() { + return propertyId + 31 * value.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + MqttProperty that = (MqttProperty) obj; + return this.propertyId == that.propertyId && this.value.equals(that.value); + } + } + + public static final class IntegerProperty extends MqttProperty { + + public IntegerProperty(int propertyId, Integer value) { + super(propertyId, value); + } + + @Override + public String toString() { + return "IntegerProperty(" + propertyId + ", " + value + ")"; + } + } + + public static final class StringProperty extends MqttProperty { + + public StringProperty(int propertyId, String value) { + super(propertyId, value); + } + + @Override + public String toString() { + return "StringProperty(" + propertyId + ", " + value + ")"; + } + } + + public static final class StringPair { + public final String key; + public final String value; + + public StringPair(String key, String value) { + this.key = key; + this.value = value; + } + + @Override + public int hashCode() { + return key.hashCode() + 31 * value.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + StringPair that = (StringPair) obj; + + return that.key.equals(this.key) && that.value.equals(this.value); + } + } + + //User properties are the only properties that may be included multiple times and + //are the only properties where ordering is required. Therefore, they need a special handling + public static final class UserProperties extends MqttProperty> { + public UserProperties() { + super(MqttPropertyType.USER_PROPERTY.value, new ArrayList()); + } + + /** + * Create user properties from the collection of the String pair values + * + * @param values string pairs. Collection entries are copied, collection itself isn't shared + */ + public UserProperties(Collection values) { + this(); + this.value.addAll(values); + } + + private static UserProperties fromUserPropertyCollection(Collection properties) { + UserProperties userProperties = new UserProperties(); + for (UserProperty property: properties) { + userProperties.add(new StringPair(property.value.key, property.value.value)); + } + return userProperties; + } + + public void add(StringPair pair) { + this.value.add(pair); + } + + public void add(String key, String value) { + this.value.add(new StringPair(key, value)); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("UserProperties("); + boolean first = true; + for (StringPair pair: value) { + if (!first) { + builder.append(", "); + } + builder.append(pair.key + "->" + pair.value); + first = false; + } + builder.append(")"); + return builder.toString(); + } + } + + public static final class UserProperty extends MqttProperty { + public UserProperty(String key, String value) { + super(MqttPropertyType.USER_PROPERTY.value, new StringPair(key, value)); + } + + @Override + public String toString() { + return "UserProperty(" + value.key + ", " + value.value + ")"; + } + } + + public static final class BinaryProperty extends MqttProperty { + + public BinaryProperty(int propertyId, byte[] value) { + super(propertyId, value); + } + + @Override + public String toString() { + return "BinaryProperty(" + propertyId + ", " + value.length + " bytes)"; + } + } + + + private Map props; + private List userProperties; + private List subscriptionIds; + + public void add(MqttProperty property) { + Map props = this.props; + if (property.propertyId == MqttPropertyType.USER_PROPERTY.value) { + List userProperties = this.userProperties; + if (userProperties == null) { + userProperties = new ArrayList(1); + this.userProperties = userProperties; + } + if (property instanceof UserProperty) { + userProperties.add((UserProperty) property); + } else if (property instanceof UserProperties) { + for (StringPair pair: ((UserProperties) property).value) { + userProperties.add(new UserProperty(pair.key, pair.value)); + } + } else { + throw new IllegalArgumentException("User property must be of UserProperty or UserProperties type"); + } + } else if (property.propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { + List subscriptionIds = this.subscriptionIds; + if (subscriptionIds == null) { + subscriptionIds = new ArrayList(1); + this.subscriptionIds = subscriptionIds; + } + if (property instanceof IntegerProperty) { + subscriptionIds.add((IntegerProperty) property); + } else { + throw new IllegalArgumentException("Subscription ID must be an integer property"); + } + } else { + if (props == null) { + props = new HashMap<>(); + this.props = props; + } + props.put(property.propertyId, property); + } + } + + public Collection listAll() { + Map props = this.props; + if (props == null && subscriptionIds == null && userProperties == null) { + return Collections.emptyList(); + } + if (subscriptionIds == null && userProperties == null) { + return props.values(); + } + if (props == null && userProperties == null) { + return subscriptionIds; + } + List propValues = new ArrayList(props != null ? props.size() : 1); + if (props != null) { + propValues.addAll(props.values()); + } + if (subscriptionIds != null) { + propValues.addAll(subscriptionIds); + } + if (userProperties != null) { + propValues.add(UserProperties.fromUserPropertyCollection(userProperties)); + } + return propValues; + } + + public boolean isEmpty() { + Map props = this.props; + return props == null || props.isEmpty(); + } + + /** + * Get property by ID. If there are multiple properties of this type (can be with Subscription ID) + * then return the first one. + * + * @param propertyId ID of the property + * @return a property if it is set, null otherwise + */ + public MqttProperty getProperty(int propertyId) { + if (propertyId == MqttPropertyType.USER_PROPERTY.value) { + //special handling to keep compatibility with earlier versions + List userProperties = this.userProperties; + if (userProperties == null) { + return null; + } + return UserProperties.fromUserPropertyCollection(userProperties); + } + if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { + List subscriptionIds = this.subscriptionIds; + if (subscriptionIds == null || subscriptionIds.isEmpty()) { + return null; + } + return subscriptionIds.get(0); + } + Map props = this.props; + return props == null ? null : props.get(propertyId); + } + + /** + * Get properties by ID. + * Some properties (Subscription ID and User Properties) may occur multiple times, + * this method returns all their values in order. + * + * @param propertyId ID of the property + * @return all properties having specified ID + */ + public List getProperties(int propertyId) { + if (propertyId == MqttPropertyType.USER_PROPERTY.value) { + return userProperties == null ? Collections.emptyList() : userProperties; + } + if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { + return subscriptionIds == null ? Collections.emptyList() : subscriptionIds; + } + Map props = this.props; + return (props == null || !props.containsKey(propertyId)) ? + Collections.emptyList() : + Collections.singletonList(props.get(propertyId)); + } +} -- Gitee From 9b8c1de742ccb6c3dc927923e6ba07f7fc7af421 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Wed, 28 Dec 2022 19:39:11 +0800 Subject: [PATCH 02/12] =?UTF-8?q?=E5=BC=80=E6=94=BE=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E8=AE=A4=E8=AF=81=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/broker/BrokerContextImpl.java | 14 +++++- ...redConnectAuthenticationProviderImpl.java} | 31 +++++++------ .../mqtt/broker/TopicSubscriber.java | 2 +- .../ConnectAuthenticationSubscriber.java | 45 ------------------- .../message/MemoryPersistenceProvider.java | 1 + .../session/MemorySessionStateProvider.java | 2 + .../ConnectAuthenticationProvider.java} | 14 +++--- .../provider}/PersistenceProvider.java | 3 +- .../broker/plugin/provider/Providers.java | 11 ++++- .../provider}/SessionStateProvider.java | 4 +- .../broker/processor/ConnectProcessor.java | 2 +- 11 files changed, 57 insertions(+), 72 deletions(-) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{ConfiguredAuthenticationServiceImpl.java => ConfiguredConnectAuthenticationProviderImpl.java} (67%) delete mode 100644 smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectAuthenticationSubscriber.java rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{AuthenticationService.java => plugin/provider/ConnectAuthenticationProvider.java} (32%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{persistence/message => plugin/provider}/PersistenceProvider.java (85%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{persistence/session => plugin/provider}/SessionStateProvider.java (78%) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 4f891426..57d568c6 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -6,7 +6,6 @@ import com.alibaba.fastjson2.JSONReader; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.smartboot.mqtt.broker.eventbus.ConnectAuthenticationSubscriber; import org.smartboot.mqtt.broker.eventbus.ConnectIdleTimeMonitorSubscriber; import org.smartboot.mqtt.broker.eventbus.KeepAliveMonitorSubscriber; import org.smartboot.mqtt.broker.eventbus.ServerEventType; @@ -93,6 +92,8 @@ public class BrokerContextImpl implements BrokerContext { updateBrokerConfigure(); + initProvider(); + subscribeEventBus(); subscribeMessageBus(); @@ -125,6 +126,11 @@ public class BrokerContextImpl implements BrokerContext { configJson = null; } + private void initProvider() { + //连接鉴权处理器 + providers.setConnectAuthenticationProvider(new ConfiguredConnectAuthenticationProviderImpl(this)); + } + private void initPushThread() { if (brokerConfigure.getTopicLimit() <= 0) { brokerConfigure.setTopicLimit(10); @@ -209,7 +215,11 @@ public class BrokerContextImpl implements BrokerContext { //连接鉴权超时监控 eventBus.subscribe(ServerEventType.SESSION_CREATE, new ConnectIdleTimeMonitorSubscriber(this)); //连接鉴权 - eventBus.subscribe(ServerEventType.CONNECT, new ConnectAuthenticationSubscriber(this)); + eventBus.subscribe(ServerEventType.CONNECT, (eventType, object) -> { + boolean suc = providers.getConnectAuthenticationProvider().authentication(object.getObject(), object.getSession()); + object.getSession().setAuthorized(suc); + object.getSession().setUsername(object.getObject().getPayload().userName()); + }); //保持连接状态监听,长时间没有消息通信将断开连接 eventBus.subscribe(ServerEventType.CONNECT, new KeepAliveMonitorSubscriber(this)); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredAuthenticationServiceImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java similarity index 67% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredAuthenticationServiceImpl.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java index 90a01ca0..02728a8c 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredAuthenticationServiceImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java @@ -3,6 +3,8 @@ package org.smartboot.mqtt.broker; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.broker.plugin.provider.ConnectAuthenticationProvider; +import org.smartboot.mqtt.common.message.MqttConnectMessage; import java.util.Objects; @@ -11,18 +13,29 @@ import java.util.Objects; * @date 2022-08-05 16:45:50 * @since 1.0.0 */ -public class ConfiguredAuthenticationServiceImpl implements AuthenticationService { +public class ConfiguredConnectAuthenticationProviderImpl implements ConnectAuthenticationProvider { - private static final Logger LOGGER = LoggerFactory.getLogger(ConfiguredAuthenticationServiceImpl.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ConfiguredConnectAuthenticationProviderImpl.class); private final BrokerConfigure configure; - public ConfiguredAuthenticationServiceImpl(BrokerContext context) { + public ConfiguredConnectAuthenticationProviderImpl(BrokerContext context) { configure = context.getBrokerConfigure(); } + + private static String getHost(MqttSession session) { + try { + return session.getRemoteAddress().getHostName(); + } catch (Exception e) { + return ""; + } + } + @Override - public boolean authentication(String username, String password, MqttSession session) { + public boolean authentication(MqttConnectMessage connectMessage, MqttSession session) { + String username = connectMessage.getPayload().userName(); + String password = connectMessage.getPayload().passwordInBytes() == null ? "" : new String(connectMessage.getPayload().passwordInBytes()); String configuredUsername = configure.getUsername(); String configuredPassword = configure.getPassword(); String host = getHost(session); @@ -36,18 +49,10 @@ public class ConfiguredAuthenticationServiceImpl implements AuthenticationServic boolean auth = Objects.equals(configuredUsername, username) && Objects.equals(configuredPassword, password); if (auth) { LOGGER.info("auth success, ip:{} clientId: {}, username: {}", host, session.getClientId(), username); - } else { + } else { LOGGER.info("auth failed, ip:{} clientId: {}, username: {}", host, session.getClientId(), username); } return auth; } - - private static String getHost(MqttSession session) { - try { - return session.getRemoteAddress().getHostName(); - } catch (Exception e) { - return ""; - } - } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index b5fc2d50..a48bd173 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -3,7 +3,7 @@ package org.smartboot.mqtt.broker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.broker.persistence.message.PersistenceMessage; -import org.smartboot.mqtt.broker.persistence.message.PersistenceProvider; +import org.smartboot.mqtt.broker.plugin.provider.PersistenceProvider; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectAuthenticationSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectAuthenticationSubscriber.java deleted file mode 100644 index 24f1b2d2..00000000 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectAuthenticationSubscriber.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.smartboot.mqtt.broker.eventbus; - -import org.smartboot.mqtt.broker.AuthenticationService; -import org.smartboot.mqtt.broker.BrokerContext; -import org.smartboot.mqtt.broker.ConfiguredAuthenticationServiceImpl; -import org.smartboot.mqtt.common.eventbus.EventBusSubscriber; -import org.smartboot.mqtt.common.eventbus.EventType; -import org.smartboot.mqtt.common.message.MqttConnectMessage; - -/** - * @author 三刀(zhengjunweimail@163.com) - * @version V1.0 , 2022/7/16 - */ -public class ConnectAuthenticationSubscriber implements EventBusSubscriber> { - private final BrokerContext context; - private final AuthenticationService authenticationService; - - public ConnectAuthenticationSubscriber(BrokerContext context) { - this.context = context; - // TODO Determine use which implements. - this.authenticationService = new ConfiguredAuthenticationServiceImpl(context); - } - - @Override - public void subscribe(EventType> eventType, EventObject object) { -// String validUserName = context.getBrokerConfigure().getUsername(); -// if (StringUtils.isBlank(validUserName)) { -// object.getSession().setAuthorized(true); -// return; -// } - String userName = object.getObject().getPayload().userName(); - byte[] passwordBytes = object.getObject().getPayload().passwordInBytes(); - String password = passwordBytes == null ? "" : new String(passwordBytes); - - boolean result = authenticationService.authentication(userName, password, object.getSession()); - -// //身份验证 -// ValidateUtils.isTrue(StringUtils.equals(validUserName, userName) -// && (StringUtils.isBlank(context.getBrokerConfigure().getPassword()) -// || StringUtils.equals(password, context.getBrokerConfigure().getPassword())) -// , "login fail", object.getSession()::disconnect); - object.getSession().setAuthorized(result); - object.getSession().setUsername(userName); - } -} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryPersistenceProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryPersistenceProvider.java index 349c30df..ff61ff7f 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryPersistenceProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryPersistenceProvider.java @@ -1,5 +1,6 @@ package org.smartboot.mqtt.broker.persistence.message; +import org.smartboot.mqtt.broker.plugin.provider.PersistenceProvider; import org.smartboot.mqtt.common.message.MqttPublishMessage; import java.util.concurrent.ConcurrentHashMap; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/MemorySessionStateProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/MemorySessionStateProvider.java index 13b4d12f..ab50893e 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/MemorySessionStateProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/MemorySessionStateProvider.java @@ -1,5 +1,7 @@ package org.smartboot.mqtt.broker.persistence.session; +import org.smartboot.mqtt.broker.plugin.provider.SessionStateProvider; + import java.util.Map; import java.util.concurrent.ConcurrentHashMap; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/AuthenticationService.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/ConnectAuthenticationProvider.java similarity index 32% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/AuthenticationService.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/ConnectAuthenticationProvider.java index 541a4756..45c37d55 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/AuthenticationService.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/ConnectAuthenticationProvider.java @@ -1,19 +1,21 @@ -package org.smartboot.mqtt.broker; +package org.smartboot.mqtt.broker.plugin.provider; + +import org.smartboot.mqtt.broker.MqttSession; +import org.smartboot.mqtt.common.message.MqttConnectMessage; /** * @author qinluo * @date 2022-08-05 16:42:40 * @since 1.0.0 */ -public interface AuthenticationService { +public interface ConnectAuthenticationProvider { /** * 进行用户名密码授权认证 * - * @param username 用户名 - * @param password 密码 + * @param connectMessage connect消息 * @param session 当前连接绘画 - * @return 是否认证成功 + * @return 是否认证成功 */ - boolean authentication(String username, String password, MqttSession session); + boolean authentication(MqttConnectMessage connectMessage, MqttSession session); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/PersistenceProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/PersistenceProvider.java similarity index 85% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/PersistenceProvider.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/PersistenceProvider.java index 68b180a6..2c8701ff 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/PersistenceProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/PersistenceProvider.java @@ -1,5 +1,6 @@ -package org.smartboot.mqtt.broker.persistence.message; +package org.smartboot.mqtt.broker.plugin.provider; +import org.smartboot.mqtt.broker.persistence.message.PersistenceMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; /** diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/Providers.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/Providers.java index e0b08f9c..a53ad12c 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/Providers.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/Providers.java @@ -1,9 +1,7 @@ package org.smartboot.mqtt.broker.plugin.provider; import org.smartboot.mqtt.broker.persistence.message.MemoryPersistenceProvider; -import org.smartboot.mqtt.broker.persistence.message.PersistenceProvider; import org.smartboot.mqtt.broker.persistence.session.MemorySessionStateProvider; -import org.smartboot.mqtt.broker.persistence.session.SessionStateProvider; /** * @author 三刀(zhengjunweimail@163.com) @@ -15,6 +13,8 @@ public class Providers { private PersistenceProvider retainMessageProvider = new MemoryPersistenceProvider(); private PersistenceProvider persistenceProvider = new MemoryPersistenceProvider(); + private ConnectAuthenticationProvider connectAuthenticationProvider; + public SessionStateProvider getSessionStateProvider() { return sessionStateProvider; } @@ -39,4 +39,11 @@ public class Providers { this.persistenceProvider = persistenceProvider; } + public ConnectAuthenticationProvider getConnectAuthenticationProvider() { + return connectAuthenticationProvider; + } + + public void setConnectAuthenticationProvider(ConnectAuthenticationProvider connectAuthenticationProvider) { + this.connectAuthenticationProvider = connectAuthenticationProvider; + } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionStateProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SessionStateProvider.java similarity index 78% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionStateProvider.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SessionStateProvider.java index 84b20d9e..c7b1a3a8 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionStateProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SessionStateProvider.java @@ -1,4 +1,6 @@ -package org.smartboot.mqtt.broker.persistence.session; +package org.smartboot.mqtt.broker.plugin.provider; + +import org.smartboot.mqtt.broker.persistence.session.SessionState; /** * 会话状态Provider diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java index d26c6506..183e0c71 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java @@ -8,7 +8,7 @@ import org.smartboot.mqtt.broker.MqttSession; import org.smartboot.mqtt.broker.eventbus.EventObject; import org.smartboot.mqtt.broker.eventbus.ServerEventType; import org.smartboot.mqtt.broker.persistence.session.SessionState; -import org.smartboot.mqtt.broker.persistence.session.SessionStateProvider; +import org.smartboot.mqtt.broker.plugin.provider.SessionStateProvider; import org.smartboot.mqtt.common.MqttMessageBuilders; import org.smartboot.mqtt.common.enums.MqttConnectReturnCode; import org.smartboot.mqtt.common.enums.MqttProtocolEnum; -- Gitee From babe2115e58d0ab8d834fe38adbbeae941163c86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Wed, 28 Dec 2022 20:02:30 +0800 Subject: [PATCH 03/12] =?UTF-8?q?=E5=BC=80=E6=94=BE=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E8=AE=A4=E8=AF=81=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/broker/BrokerContextImpl.java | 7 +---- .../broker/processor/ConnectProcessor.java | 31 +++++++++---------- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 57d568c6..d7ab3575 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -214,12 +214,7 @@ public class BrokerContextImpl implements BrokerContext { }); //连接鉴权超时监控 eventBus.subscribe(ServerEventType.SESSION_CREATE, new ConnectIdleTimeMonitorSubscriber(this)); - //连接鉴权 - eventBus.subscribe(ServerEventType.CONNECT, (eventType, object) -> { - boolean suc = providers.getConnectAuthenticationProvider().authentication(object.getObject(), object.getSession()); - object.getSession().setAuthorized(suc); - object.getSession().setUsername(object.getObject().getPayload().userName()); - }); + //保持连接状态监听,长时间没有消息通信将断开连接 eventBus.subscribe(ServerEventType.CONNECT, new KeepAliveMonitorSubscriber(this)); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java index 183e0c71..358bd8b8 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java @@ -42,11 +42,14 @@ public class ConnectProcessor implements MqttProcessor { //服务端必须按照 3.1 节的要求验证 CONNECT 报文,如果报文不符合规范,服务端不发送CONNACK 报文直接关闭网络连接 checkMessage(session, mqttConnectMessage); - // 先进行认证 + //连接鉴权 + ValidateUtils.isTrue(context.getProviders().getConnectAuthenticationProvider().authentication(mqttConnectMessage, session), "Client authentication failed", () -> connFailAck(CONNECTION_REFUSED_NOT_AUTHORIZED, session)); + + session.setAuthorized(true); + session.setUsername(mqttConnectMessage.getPayload().userName()); + + context.getEventBus().publish(ServerEventType.CONNECT, EventObject.newEventObject(session, mqttConnectMessage)); - if (!session.isAuthorized()) { - throw new IllegalStateException("Authorization failed"); - } //清理会话 refreshSession(context, session, mqttConnectMessage); @@ -84,11 +87,7 @@ public class ConnectProcessor implements MqttProcessor { // 如果发现不支持的协议级别,服务端必须给发送一个返回码为 0x01(不支持的协议级别)的 CONNACK 报文响应 //CONNECT 报文,然后断开客户端的连接 final MqttVersion mqttVersion = MqttVersion.getByProtocolWithVersion(protocol, connectVariableHeader.getProtocolLevel()); - ValidateUtils.notNull(mqttVersion, "invalid version", () -> { - MqttConnAckMessage badProto = connFailAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION); - session.write(badProto); - session.disconnect(); - }); + ValidateUtils.notNull(mqttVersion, "invalid version", () -> connFailAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, session)); //服务端必须验证 CONNECT 控制报文的保留标志位(第 0 位)是否为 0,如果不为 0 必须断开客户端连接。 ValidateUtils.isTrue(connectVariableHeader.getReserved() == 0, "", session::disconnect); @@ -98,17 +97,13 @@ public class ConnectProcessor implements MqttProcessor { //“0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ”(大写字母,小写字母和数字) boolean invalidClient = StringUtils.isNotBlank(clientId) && (mqttVersion == MqttVersion.MQTT_3_1 && clientId.length() > MqttCodecUtil.MAX_CLIENT_ID_LENGTH); ValidateUtils.isTrue(!invalidClient, "", () -> { - MqttConnAckMessage connAckMessage = connFailAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED); - session.write(connAckMessage); - session.disconnect(); + connFailAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED, session); LOGGER.error("The MQTT client ID cannot be empty. Username={}", payload.userName()); }); //如果客户端提供的 ClientId 为零字节且清理会话标志为 0, // 服务端必须发送返回码为 0x02(表示标识符不合格)的 CONNACK 报文响应客户端的 CONNECT 报文,然后关闭网络连接 ValidateUtils.isTrue(connectVariableHeader.isCleanSession() || !StringUtils.isBlank(clientId), "", () -> { - MqttConnAckMessage connAckMessage = connFailAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED); - session.write(connAckMessage); - session.disconnect(); + connFailAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED, session); LOGGER.error("The MQTT client ID cannot be empty. Username={}", payload.userName()); }); } @@ -167,10 +162,12 @@ public class ConnectProcessor implements MqttProcessor { session.setWillMessage(publishMessage); } - private MqttConnAckMessage connFailAck(MqttConnectReturnCode returnCode) { + private void connFailAck(MqttConnectReturnCode returnCode, MqttSession session) { //如果服务端发送了一个包含非零返回码的 CONNACK 报文,它必须将当前会话标志设置为 0 ValidateUtils.isTrue(returnCode != CONNECTION_ACCEPTED, ""); - return connAck(returnCode, false); + MqttConnAckMessage badProto = connAck(returnCode, false); + session.write(badProto); + session.disconnect(); } private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean sessionPresent) { -- Gitee From 188134985513c7d690f8836e15b4fa52eb37e919 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Wed, 28 Dec 2022 20:14:53 +0800 Subject: [PATCH 04/12] =?UTF-8?q?=E8=AE=BE=E7=BD=AE=E6=9C=80=E5=B0=8F?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/mqtt/broker/BrokerContextImpl.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index d7ab3575..85f0f1a9 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -104,7 +104,7 @@ public class BrokerContextImpl implements BrokerContext { try { pagePool = new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true); server = new AioQuickServer(brokerConfigure.getHost(), brokerConfigure.getPort(), new MqttProtocol(brokerConfigure.getMaxPacketSize()), processor); - server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(brokerConfigure.getThreadNum()); + server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(Math.max(2, brokerConfigure.getThreadNum())); server.start(); System.out.println(BrokerConfigure.BANNER + "\r\n :: smart-mqtt broker" + "::\t(" + BrokerConfigure.VERSION + ")"); System.out.println("❤️Gitee: https://gitee.com/smartboot/smart-mqtt"); @@ -262,7 +262,8 @@ public class BrokerContextImpl implements BrokerContext { } }); //打印消息日志 -// eventBus.subscribe(Arrays.asList(EventType.RECEIVE_MESSAGE, EventType.WRITE_MESSAGE), new MessageLoggerSubscriber()); +// eventBus.subscribe(Arrays.asList(EventType.RECEIVE_MESSAGE, EventType.WRITE_MESSAGE), new +// MessageLoggerSubscriber()); } private void notifyPush(BrokerTopic topic) { -- Gitee From c0fbaccb238f1196786360f213a52d8c4f3fbdc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Wed, 28 Dec 2022 20:26:48 +0800 Subject: [PATCH 05/12] =?UTF-8?q?=E4=BC=98=E5=8C=96clientId=E8=B5=8B?= =?UTF-8?q?=E5=80=BC=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../broker/processor/ConnectProcessor.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java index 358bd8b8..7f731d4d 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java @@ -38,6 +38,14 @@ public class ConnectProcessor implements MqttProcessor { @Override public void process(BrokerContext context, MqttSession session, MqttConnectMessage mqttConnectMessage) { // LOGGER.info("receive connect message:{}", mqttConnectMessage); + String clientId = mqttConnectMessage.getPayload().clientIdentifier(); + //服务端可以允许客户端提供一个零字节的客户端标识符 (ClientId) ,如果这样做了,服务端必须将这看作特 + //殊情况并分配唯一的客户端标识符给那个客户端。然后它必须假设客户端提供了那个唯一的客户端标识符,正常处理这个 CONNECT 报文 + if (clientId.length() == 0) { + clientId = MqttUtil.createClientId(); + } + session.setClientId(clientId); + //有效性校验 //服务端必须按照 3.1 节的要求验证 CONNECT 报文,如果报文不符合规范,服务端不发送CONNACK 报文直接关闭网络连接 checkMessage(session, mqttConnectMessage); @@ -109,16 +117,8 @@ public class ConnectProcessor implements MqttProcessor { } private void refreshSession(BrokerContext context, MqttSession session, MqttConnectMessage mqttConnectMessage) { - MqttConnectPayload payload = mqttConnectMessage.getPayload(); session.setCleanSession(mqttConnectMessage.getVariableHeader().isCleanSession()); - String clientId = payload.clientIdentifier(); - //服务端可以允许客户端提供一个零字节的客户端标识符 (ClientId) ,如果这样做了,服务端必须将这看作特 - //殊情况并分配唯一的客户端标识符给那个客户端。然后它必须假设客户端提供了那个唯一的客户端标识符,正常处理这个 CONNECT 报文 - if (clientId.length() == 0) { - clientId = MqttUtil.createClientId(); - } - - MqttSession mqttSession = context.getSession(clientId); + MqttSession mqttSession = context.getSession(session.getClientId()); if (mqttSession != null) { if (session.isCleanSession()) { //如果清理会话(CleanSession)标志被设置为 1,客户端和服务端必须丢弃之前的任何会话并开始一个新的会话。 @@ -130,7 +130,7 @@ public class ConnectProcessor implements MqttProcessor { mqttSession.disconnect(); //如果清理会话(CleanSession)标志被设置为 0,服务端必须基于当前会话(使用客户端标识符识别)的状态恢复与客户端的通信。 SessionStateProvider sessionStateProvider = context.getProviders().getSessionStateProvider(); - SessionState sessionState = sessionStateProvider.get(clientId); + SessionState sessionState = sessionStateProvider.get(session.getClientId()); if (sessionState != null) { session.getResponseConsumers().putAll(sessionState.getResponseConsumers()); sessionState.getSubscribers().forEach(session::subscribe); @@ -145,7 +145,6 @@ public class ConnectProcessor implements MqttProcessor { } } - session.setClientId(clientId); context.addSession(session); LOGGER.debug("add session for client:{}", session); } -- Gitee From 563f429c62736a5a162ea070f4fa929dff8d68f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Wed, 28 Dec 2022 22:07:59 +0800 Subject: [PATCH 06/12] =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=87=AA=E5=AE=9A?= =?UTF-8?q?=E4=B9=89topic=E8=AE=A2=E9=98=85=E6=9D=83=E9=99=90=E6=8E=A7?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartboot/mqtt/broker/BrokerContextImpl.java | 2 +- .../org/smartboot/mqtt/broker/MqttSession.java | 15 ++++++++++----- .../smartboot/mqtt/broker/TopicSubscriber.java | 2 +- .../plugin/provider/PersistenceProvider.java | 2 +- .../mqtt/broker/plugin/provider/Providers.java | 14 ++++++++++++-- .../plugin/provider/SessionStateProvider.java | 2 +- .../broker/plugin/provider/SubscribeProvider.java | 11 +++++++++++ .../impl}/message/MemoryMessageStoreQueue.java | 2 +- .../impl}/message/MemoryPersistenceProvider.java | 2 +- .../impl}/message/PersistenceMessage.java | 2 +- .../impl}/session/MemorySessionStateProvider.java | 2 +- .../provider/impl}/session/SessionState.java | 2 +- .../mqtt/broker/processor/ConnectProcessor.java | 2 +- .../mqtt/broker/processor/SubscribeProcessor.java | 3 +-- 14 files changed, 44 insertions(+), 19 deletions(-) create mode 100644 smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SubscribeProvider.java rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{persistence => plugin/provider/impl}/message/MemoryMessageStoreQueue.java (95%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{persistence => plugin/provider/impl}/message/MemoryPersistenceProvider.java (95%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{persistence => plugin/provider/impl}/message/PersistenceMessage.java (94%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{persistence => plugin/provider/impl}/session/MemorySessionStateProvider.java (92%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{persistence => plugin/provider/impl}/session/SessionState.java (90%) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 85f0f1a9..cdc80fd0 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -12,9 +12,9 @@ import org.smartboot.mqtt.broker.eventbus.ServerEventType; import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBusSubscriber; import org.smartboot.mqtt.broker.eventbus.messagebus.consumer.RetainPersistenceConsumer; -import org.smartboot.mqtt.broker.persistence.message.PersistenceMessage; import org.smartboot.mqtt.broker.plugin.Plugin; import org.smartboot.mqtt.broker.plugin.provider.Providers; +import org.smartboot.mqtt.broker.plugin.provider.impl.message.PersistenceMessage; import org.smartboot.mqtt.common.AsyncTask; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.eventbus.EventBus; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index 33f8fd42..47e8516c 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -3,7 +3,7 @@ package org.smartboot.mqtt.broker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.broker.eventbus.ServerEventType; -import org.smartboot.mqtt.broker.persistence.session.SessionState; +import org.smartboot.mqtt.broker.plugin.provider.impl.session.SessionState; import org.smartboot.mqtt.common.AbstractSession; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.MqttWriter; @@ -120,8 +120,13 @@ public class MqttSession extends AbstractSession { this.username = username; } - public void subscribe(String topicFilter, MqttQoS mqttQoS) { - subscribe0(topicFilter, mqttQoS, true); + public MqttQoS subscribe(String topicFilter, MqttQoS mqttQoS) { + if (mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topicFilter, this)) { + subscribe0(topicFilter, mqttQoS, true); + return mqttQoS; + } else { + return MqttQoS.FAILURE; + } } private void subscribe0(String topicFilter, MqttQoS mqttQoS, boolean newSubscribe) { @@ -138,7 +143,7 @@ public class MqttSession extends AbstractSession { //通配符匹配存量Topic for (BrokerTopic topic : mqttContext.getTopics()) { - if (TopicTokenUtil.match(topic.getTopicToken(), topicToken)) { + if (TopicTokenUtil.match(topic.getTopicToken(), topicToken) && mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topic.getTopic(), this)) { TopicSubscriber subscription = subscribeSuccess(mqttQoS, topicToken, topic); if (newSubscribe) { mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); @@ -163,7 +168,7 @@ public class MqttSession extends AbstractSession { @Override public void subscribe(EventType eventType, BrokerTopic object) { - if (TopicTokenUtil.match(object.getTopicToken(), topicToken)) { + if (TopicTokenUtil.match(object.getTopicToken(), topicToken) && mqttContext.getProviders().getSubscribeProvider().subscribeTopic(object.getTopic(), MqttSession.this)) { TopicSubscriber subscription = MqttSession.this.subscribeSuccess(mqttQoS, topicToken, object); mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index a48bd173..bababa81 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -2,8 +2,8 @@ package org.smartboot.mqtt.broker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.smartboot.mqtt.broker.persistence.message.PersistenceMessage; import org.smartboot.mqtt.broker.plugin.provider.PersistenceProvider; +import org.smartboot.mqtt.broker.plugin.provider.impl.message.PersistenceMessage; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/PersistenceProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/PersistenceProvider.java index 2c8701ff..7b5874c5 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/PersistenceProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/PersistenceProvider.java @@ -1,6 +1,6 @@ package org.smartboot.mqtt.broker.plugin.provider; -import org.smartboot.mqtt.broker.persistence.message.PersistenceMessage; +import org.smartboot.mqtt.broker.plugin.provider.impl.message.PersistenceMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; /** diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/Providers.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/Providers.java index a53ad12c..483cc70c 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/Providers.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/Providers.java @@ -1,7 +1,7 @@ package org.smartboot.mqtt.broker.plugin.provider; -import org.smartboot.mqtt.broker.persistence.message.MemoryPersistenceProvider; -import org.smartboot.mqtt.broker.persistence.session.MemorySessionStateProvider; +import org.smartboot.mqtt.broker.plugin.provider.impl.message.MemoryPersistenceProvider; +import org.smartboot.mqtt.broker.plugin.provider.impl.session.MemorySessionStateProvider; /** * @author 三刀(zhengjunweimail@163.com) @@ -15,6 +15,8 @@ public class Providers { private ConnectAuthenticationProvider connectAuthenticationProvider; + private SubscribeProvider subscribeProvider = (topicFilter, session) -> true; + public SessionStateProvider getSessionStateProvider() { return sessionStateProvider; } @@ -46,4 +48,12 @@ public class Providers { public void setConnectAuthenticationProvider(ConnectAuthenticationProvider connectAuthenticationProvider) { this.connectAuthenticationProvider = connectAuthenticationProvider; } + + public SubscribeProvider getSubscribeProvider() { + return subscribeProvider; + } + + public void setSubscribeProvider(SubscribeProvider subscribeProvider) { + this.subscribeProvider = subscribeProvider; + } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SessionStateProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SessionStateProvider.java index c7b1a3a8..6cd5f1b0 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SessionStateProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SessionStateProvider.java @@ -1,6 +1,6 @@ package org.smartboot.mqtt.broker.plugin.provider; -import org.smartboot.mqtt.broker.persistence.session.SessionState; +import org.smartboot.mqtt.broker.plugin.provider.impl.session.SessionState; /** * 会话状态Provider diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SubscribeProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SubscribeProvider.java new file mode 100644 index 00000000..88db4d4c --- /dev/null +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SubscribeProvider.java @@ -0,0 +1,11 @@ +package org.smartboot.mqtt.broker.plugin.provider; + +import org.smartboot.mqtt.broker.MqttSession; + +/** + * @author 三刀(zhengjunweimail@163.com) + * @version V1.0 , 2022/12/28 + */ +public interface SubscribeProvider { + boolean subscribeTopic(String topicFilter, MqttSession session); +} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryMessageStoreQueue.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/MemoryMessageStoreQueue.java similarity index 95% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryMessageStoreQueue.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/MemoryMessageStoreQueue.java index b2f4de48..f4b1d053 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryMessageStoreQueue.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/MemoryMessageStoreQueue.java @@ -1,4 +1,4 @@ -package org.smartboot.mqtt.broker.persistence.message; +package org.smartboot.mqtt.broker.plugin.provider.impl.message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryPersistenceProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/MemoryPersistenceProvider.java similarity index 95% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryPersistenceProvider.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/MemoryPersistenceProvider.java index ff61ff7f..da5ce1dd 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/MemoryPersistenceProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/MemoryPersistenceProvider.java @@ -1,4 +1,4 @@ -package org.smartboot.mqtt.broker.persistence.message; +package org.smartboot.mqtt.broker.plugin.provider.impl.message; import org.smartboot.mqtt.broker.plugin.provider.PersistenceProvider; import org.smartboot.mqtt.common.message.MqttPublishMessage; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/PersistenceMessage.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/PersistenceMessage.java similarity index 94% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/PersistenceMessage.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/PersistenceMessage.java index 0a556a9c..d89d1d6a 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/message/PersistenceMessage.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/PersistenceMessage.java @@ -1,4 +1,4 @@ -package org.smartboot.mqtt.broker.persistence.message; +package org.smartboot.mqtt.broker.plugin.provider.impl.message; import org.smartboot.mqtt.common.ToString; import org.smartboot.mqtt.common.message.MqttPublishMessage; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/MemorySessionStateProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/session/MemorySessionStateProvider.java similarity index 92% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/MemorySessionStateProvider.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/session/MemorySessionStateProvider.java index ab50893e..cf7d6112 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/MemorySessionStateProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/session/MemorySessionStateProvider.java @@ -1,4 +1,4 @@ -package org.smartboot.mqtt.broker.persistence.session; +package org.smartboot.mqtt.broker.plugin.provider.impl.session; import org.smartboot.mqtt.broker.plugin.provider.SessionStateProvider; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionState.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/session/SessionState.java similarity index 90% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionState.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/session/SessionState.java index 4b1bdafa..cef8dcfc 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/persistence/session/SessionState.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/session/SessionState.java @@ -1,4 +1,4 @@ -package org.smartboot.mqtt.broker.persistence.session; +package org.smartboot.mqtt.broker.plugin.provider.impl.session; import org.smartboot.mqtt.common.AckMessage; import org.smartboot.mqtt.common.enums.MqttQoS; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java index 7f731d4d..73531c97 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java @@ -7,8 +7,8 @@ import org.smartboot.mqtt.broker.BrokerContext; import org.smartboot.mqtt.broker.MqttSession; import org.smartboot.mqtt.broker.eventbus.EventObject; import org.smartboot.mqtt.broker.eventbus.ServerEventType; -import org.smartboot.mqtt.broker.persistence.session.SessionState; import org.smartboot.mqtt.broker.plugin.provider.SessionStateProvider; +import org.smartboot.mqtt.broker.plugin.provider.impl.session.SessionState; import org.smartboot.mqtt.common.MqttMessageBuilders; import org.smartboot.mqtt.common.enums.MqttConnectReturnCode; import org.smartboot.mqtt.common.enums.MqttProtocolEnum; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java index d40b2b56..693d12a8 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java @@ -25,8 +25,7 @@ public class SubscribeProcessor extends AuthorizedMqttProcessor Date: Thu, 29 Dec 2022 19:51:13 +0800 Subject: [PATCH 07/12] =?UTF-8?q?=E8=B0=83=E6=95=B4=E5=8C=85=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/smartboot/mqtt/broker/BrokerContext.java | 2 +- .../java/org/smartboot/mqtt/broker/BrokerContextImpl.java | 4 ++-- .../broker/ConfiguredConnectAuthenticationProviderImpl.java | 5 +++-- .../main/java/org/smartboot/mqtt/broker/MqttSession.java | 2 +- .../java/org/smartboot/mqtt/broker/TopicSubscriber.java | 4 ++-- .../smartboot/mqtt/broker/processor/ConnectProcessor.java | 4 ++-- .../provider/ConnectAuthenticationProvider.java | 3 ++- .../broker/{plugin => }/provider/PersistenceProvider.java | 4 ++-- .../mqtt/broker/{plugin => }/provider/Providers.java | 6 +++--- .../broker/{plugin => }/provider/SessionStateProvider.java | 4 ++-- .../broker/{plugin => }/provider/SubscribeProvider.java | 3 ++- .../provider/impl/message/MemoryMessageStoreQueue.java | 2 +- .../provider/impl/message/MemoryPersistenceProvider.java | 4 ++-- .../provider/impl/message/PersistenceMessage.java | 2 +- .../provider/impl/session/MemorySessionStateProvider.java | 4 ++-- .../{plugin => }/provider/impl/session/SessionState.java | 2 +- 16 files changed, 29 insertions(+), 26 deletions(-) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{plugin => }/provider/ConnectAuthenticationProvider.java (89%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{plugin => }/provider/PersistenceProvider.java (85%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{plugin => }/provider/Providers.java (88%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{plugin => }/provider/SessionStateProvider.java (77%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{plugin => }/provider/SubscribeProvider.java (80%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{plugin => }/provider/impl/message/MemoryMessageStoreQueue.java (95%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{plugin => }/provider/impl/message/MemoryPersistenceProvider.java (91%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{plugin => }/provider/impl/message/PersistenceMessage.java (94%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{plugin => }/provider/impl/session/MemorySessionStateProvider.java (83%) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{plugin => }/provider/impl/session/SessionState.java (90%) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java index 338327e7..c2652ba8 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java @@ -1,7 +1,7 @@ package org.smartboot.mqtt.broker; import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; -import org.smartboot.mqtt.broker.plugin.provider.Providers; +import org.smartboot.mqtt.broker.provider.Providers; import org.smartboot.mqtt.common.eventbus.EventBus; import java.io.IOException; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index cdc80fd0..c32675e8 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -13,8 +13,8 @@ import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBusSubscriber; import org.smartboot.mqtt.broker.eventbus.messagebus.consumer.RetainPersistenceConsumer; import org.smartboot.mqtt.broker.plugin.Plugin; -import org.smartboot.mqtt.broker.plugin.provider.Providers; -import org.smartboot.mqtt.broker.plugin.provider.impl.message.PersistenceMessage; +import org.smartboot.mqtt.broker.provider.Providers; +import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; import org.smartboot.mqtt.common.AsyncTask; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.eventbus.EventBus; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java index 02728a8c..a188e44d 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java @@ -3,7 +3,7 @@ package org.smartboot.mqtt.broker; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.smartboot.mqtt.broker.plugin.provider.ConnectAuthenticationProvider; +import org.smartboot.mqtt.broker.provider.ConnectAuthenticationProvider; import org.smartboot.mqtt.common.message.MqttConnectMessage; import java.util.Objects; @@ -24,10 +24,11 @@ public class ConfiguredConnectAuthenticationProviderImpl implements ConnectAuthe } - private static String getHost(MqttSession session) { + private String getHost(MqttSession session) { try { return session.getRemoteAddress().getHostName(); } catch (Exception e) { + LOGGER.error("get remote address exception", e); return ""; } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index 47e8516c..cb983cf3 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -3,7 +3,7 @@ package org.smartboot.mqtt.broker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.broker.eventbus.ServerEventType; -import org.smartboot.mqtt.broker.plugin.provider.impl.session.SessionState; +import org.smartboot.mqtt.broker.provider.impl.session.SessionState; import org.smartboot.mqtt.common.AbstractSession; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.MqttWriter; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index bababa81..4bfe887a 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -2,8 +2,8 @@ package org.smartboot.mqtt.broker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.smartboot.mqtt.broker.plugin.provider.PersistenceProvider; -import org.smartboot.mqtt.broker.plugin.provider.impl.message.PersistenceMessage; +import org.smartboot.mqtt.broker.provider.PersistenceProvider; +import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java index 73531c97..3d2b7994 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java @@ -7,8 +7,8 @@ import org.smartboot.mqtt.broker.BrokerContext; import org.smartboot.mqtt.broker.MqttSession; import org.smartboot.mqtt.broker.eventbus.EventObject; import org.smartboot.mqtt.broker.eventbus.ServerEventType; -import org.smartboot.mqtt.broker.plugin.provider.SessionStateProvider; -import org.smartboot.mqtt.broker.plugin.provider.impl.session.SessionState; +import org.smartboot.mqtt.broker.provider.SessionStateProvider; +import org.smartboot.mqtt.broker.provider.impl.session.SessionState; import org.smartboot.mqtt.common.MqttMessageBuilders; import org.smartboot.mqtt.common.enums.MqttConnectReturnCode; import org.smartboot.mqtt.common.enums.MqttProtocolEnum; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/ConnectAuthenticationProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/ConnectAuthenticationProvider.java similarity index 89% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/ConnectAuthenticationProvider.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/ConnectAuthenticationProvider.java index 45c37d55..b74ab9a1 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/ConnectAuthenticationProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/ConnectAuthenticationProvider.java @@ -1,9 +1,10 @@ -package org.smartboot.mqtt.broker.plugin.provider; +package org.smartboot.mqtt.broker.provider; import org.smartboot.mqtt.broker.MqttSession; import org.smartboot.mqtt.common.message.MqttConnectMessage; /** + * 连接认证 * @author qinluo * @date 2022-08-05 16:42:40 * @since 1.0.0 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/PersistenceProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/PersistenceProvider.java similarity index 85% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/PersistenceProvider.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/PersistenceProvider.java index 7b5874c5..504136b5 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/PersistenceProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/PersistenceProvider.java @@ -1,6 +1,6 @@ -package org.smartboot.mqtt.broker.plugin.provider; +package org.smartboot.mqtt.broker.provider; -import org.smartboot.mqtt.broker.plugin.provider.impl.message.PersistenceMessage; +import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; /** diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/Providers.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java similarity index 88% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/Providers.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java index 483cc70c..6750bae5 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/Providers.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/Providers.java @@ -1,7 +1,7 @@ -package org.smartboot.mqtt.broker.plugin.provider; +package org.smartboot.mqtt.broker.provider; -import org.smartboot.mqtt.broker.plugin.provider.impl.message.MemoryPersistenceProvider; -import org.smartboot.mqtt.broker.plugin.provider.impl.session.MemorySessionStateProvider; +import org.smartboot.mqtt.broker.provider.impl.message.MemoryPersistenceProvider; +import org.smartboot.mqtt.broker.provider.impl.session.MemorySessionStateProvider; /** * @author 三刀(zhengjunweimail@163.com) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SessionStateProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SessionStateProvider.java similarity index 77% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SessionStateProvider.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SessionStateProvider.java index 6cd5f1b0..9d2d4239 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SessionStateProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SessionStateProvider.java @@ -1,6 +1,6 @@ -package org.smartboot.mqtt.broker.plugin.provider; +package org.smartboot.mqtt.broker.provider; -import org.smartboot.mqtt.broker.plugin.provider.impl.session.SessionState; +import org.smartboot.mqtt.broker.provider.impl.session.SessionState; /** * 会话状态Provider diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SubscribeProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java similarity index 80% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SubscribeProvider.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java index 88db4d4c..b2327695 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/SubscribeProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java @@ -1,8 +1,9 @@ -package org.smartboot.mqtt.broker.plugin.provider; +package org.smartboot.mqtt.broker.provider; import org.smartboot.mqtt.broker.MqttSession; /** + * Topic订阅 * @author 三刀(zhengjunweimail@163.com) * @version V1.0 , 2022/12/28 */ diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/MemoryMessageStoreQueue.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryMessageStoreQueue.java similarity index 95% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/MemoryMessageStoreQueue.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryMessageStoreQueue.java index f4b1d053..eccb87bb 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/MemoryMessageStoreQueue.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryMessageStoreQueue.java @@ -1,4 +1,4 @@ -package org.smartboot.mqtt.broker.plugin.provider.impl.message; +package org.smartboot.mqtt.broker.provider.impl.message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/MemoryPersistenceProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryPersistenceProvider.java similarity index 91% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/MemoryPersistenceProvider.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryPersistenceProvider.java index da5ce1dd..60ab977f 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/MemoryPersistenceProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/MemoryPersistenceProvider.java @@ -1,6 +1,6 @@ -package org.smartboot.mqtt.broker.plugin.provider.impl.message; +package org.smartboot.mqtt.broker.provider.impl.message; -import org.smartboot.mqtt.broker.plugin.provider.PersistenceProvider; +import org.smartboot.mqtt.broker.provider.PersistenceProvider; import org.smartboot.mqtt.common.message.MqttPublishMessage; import java.util.concurrent.ConcurrentHashMap; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/PersistenceMessage.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/PersistenceMessage.java similarity index 94% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/PersistenceMessage.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/PersistenceMessage.java index d89d1d6a..9b0caf77 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/message/PersistenceMessage.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/message/PersistenceMessage.java @@ -1,4 +1,4 @@ -package org.smartboot.mqtt.broker.plugin.provider.impl.message; +package org.smartboot.mqtt.broker.provider.impl.message; import org.smartboot.mqtt.common.ToString; import org.smartboot.mqtt.common.message.MqttPublishMessage; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/session/MemorySessionStateProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/MemorySessionStateProvider.java similarity index 83% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/session/MemorySessionStateProvider.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/MemorySessionStateProvider.java index cf7d6112..b72ea7be 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/session/MemorySessionStateProvider.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/MemorySessionStateProvider.java @@ -1,6 +1,6 @@ -package org.smartboot.mqtt.broker.plugin.provider.impl.session; +package org.smartboot.mqtt.broker.provider.impl.session; -import org.smartboot.mqtt.broker.plugin.provider.SessionStateProvider; +import org.smartboot.mqtt.broker.provider.SessionStateProvider; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/session/SessionState.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java similarity index 90% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/session/SessionState.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java index cef8dcfc..a9e6904c 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/plugin/provider/impl/session/SessionState.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java @@ -1,4 +1,4 @@ -package org.smartboot.mqtt.broker.plugin.provider.impl.session; +package org.smartboot.mqtt.broker.provider.impl.session; import org.smartboot.mqtt.common.AckMessage; import org.smartboot.mqtt.common.enums.MqttQoS; -- Gitee From 9b713b652266ce9d5d99a31e573df8dcacbef0cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Thu, 29 Dec 2022 21:00:02 +0800 Subject: [PATCH 08/12] =?UTF-8?q?=E6=B7=BB=E5=8A=A0InetSocketAddress.getHo?= =?UTF-8?q?stName=E7=9B=91=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../broker/ConfiguredConnectAuthenticationProviderImpl.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java index a188e44d..ce61ea79 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java @@ -25,11 +25,17 @@ public class ConfiguredConnectAuthenticationProviderImpl implements ConnectAuthe private String getHost(MqttSession session) { + long start = System.currentTimeMillis(); try { return session.getRemoteAddress().getHostName(); } catch (Exception e) { LOGGER.error("get remote address exception", e); return ""; + } finally { + long cost = System.currentTimeMillis() - start; + if (cost > 1000) { + LOGGER.warn("InetSocketAddress.getHostName cost: " + cost + "ms"); + } } } -- Gitee From 745829f50831b67a590e15f74989357e33de11d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Fri, 30 Dec 2022 22:24:08 +0800 Subject: [PATCH 09/12] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...guredConnectAuthenticationProviderImpl.java | 18 ++---------------- .../ConnectIdleTimeMonitorSubscriber.java | 2 +- .../smartboot/mqtt/common/util/MqttUtil.java | 12 ++++++++++++ 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java index ce61ea79..e2813c9d 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java @@ -5,6 +5,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.broker.provider.ConnectAuthenticationProvider; import org.smartboot.mqtt.common.message.MqttConnectMessage; +import org.smartboot.mqtt.common.util.MqttUtil; import java.util.Objects; @@ -24,28 +25,13 @@ public class ConfiguredConnectAuthenticationProviderImpl implements ConnectAuthe } - private String getHost(MqttSession session) { - long start = System.currentTimeMillis(); - try { - return session.getRemoteAddress().getHostName(); - } catch (Exception e) { - LOGGER.error("get remote address exception", e); - return ""; - } finally { - long cost = System.currentTimeMillis() - start; - if (cost > 1000) { - LOGGER.warn("InetSocketAddress.getHostName cost: " + cost + "ms"); - } - } - } - @Override public boolean authentication(MqttConnectMessage connectMessage, MqttSession session) { String username = connectMessage.getPayload().userName(); String password = connectMessage.getPayload().passwordInBytes() == null ? "" : new String(connectMessage.getPayload().passwordInBytes()); String configuredUsername = configure.getUsername(); String configuredPassword = configure.getPassword(); - String host = getHost(session); + String host = MqttUtil.getRemoteAddress(session); if (StringUtils.isEmpty(configuredPassword) || StringUtils.isEmpty(configuredUsername)) { diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java index 6fd605b2..1ebfdb53 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java @@ -36,7 +36,7 @@ public class ConnectIdleTimeMonitorSubscriber implements EventBusSubscriber Date: Sat, 31 Dec 2022 10:27:05 +0800 Subject: [PATCH 10/12] =?UTF-8?q?=E5=BC=80=E5=8F=91v0.12?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 4 ++-- smart-mqtt-broker/pom.xml | 2 +- .../main/java/org/smartboot/mqtt/broker/BrokerConfigure.java | 2 +- smart-mqtt-client/pom.xml | 2 +- smart-mqtt-common/pom.xml | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index 77b4fa07..42658ff7 100644 --- a/pom.xml +++ b/pom.xml @@ -4,12 +4,12 @@ org.smartboot.mqtt smart-mqtt smart-mqtt - 0.11 + 0.12 4.0.0 mqtt broker - 0.11 + 0.12 1.5.23 2.6 4.3 diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml index e2279b50..8373da63 100644 --- a/smart-mqtt-broker/pom.xml +++ b/smart-mqtt-broker/pom.xml @@ -5,7 +5,7 @@ org.smartboot.mqtt smart-mqtt - 0.11 + 0.12 ../pom.xml 4.0.0 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java index 8d68ca2c..19adc89d 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java @@ -26,7 +26,7 @@ public class BrokerConfigure { /** * 当前smart-mqtt */ - public static final String VERSION = "v0.11"; + public static final String VERSION = "v0.12"; static final Map SystemEnvironments = new HashMap<>(); diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml index fdc187fb..6ffee2e1 100644 --- a/smart-mqtt-client/pom.xml +++ b/smart-mqtt-client/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.11 + 0.12 ../pom.xml 4.0.0 diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml index 22da02a6..a65febd1 100644 --- a/smart-mqtt-common/pom.xml +++ b/smart-mqtt-common/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.11 + 0.12 ../pom.xml 4.0.0 -- Gitee From 1a5e4f951a26b45ac1938f8d92b51d809a354dcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 31 Dec 2022 12:01:16 +0800 Subject: [PATCH 11/12] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/mqtt/broker/BrokerContextImpl.java | 1 + .../impl}/ConfiguredConnectAuthenticationProviderImpl.java | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) rename smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/{ => provider/impl}/ConfiguredConnectAuthenticationProviderImpl.java (91%) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index c32675e8..0678fec4 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -14,6 +14,7 @@ import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBusSubscriber; import org.smartboot.mqtt.broker.eventbus.messagebus.consumer.RetainPersistenceConsumer; import org.smartboot.mqtt.broker.plugin.Plugin; import org.smartboot.mqtt.broker.provider.Providers; +import org.smartboot.mqtt.broker.provider.impl.ConfiguredConnectAuthenticationProviderImpl; import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; import org.smartboot.mqtt.common.AsyncTask; import org.smartboot.mqtt.common.InflightQueue; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/ConfiguredConnectAuthenticationProviderImpl.java similarity index 91% rename from smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java rename to smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/ConfiguredConnectAuthenticationProviderImpl.java index e2813c9d..01d64cc8 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/ConfiguredConnectAuthenticationProviderImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/ConfiguredConnectAuthenticationProviderImpl.java @@ -1,8 +1,11 @@ -package org.smartboot.mqtt.broker; +package org.smartboot.mqtt.broker.provider.impl; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.broker.BrokerConfigure; +import org.smartboot.mqtt.broker.BrokerContext; +import org.smartboot.mqtt.broker.MqttSession; import org.smartboot.mqtt.broker.provider.ConnectAuthenticationProvider; import org.smartboot.mqtt.common.message.MqttConnectMessage; import org.smartboot.mqtt.common.util.MqttUtil; -- Gitee From c6e8711e97341fa03f54fde4e7cdfa9ffffe9c3b Mon Sep 17 00:00:00 2001 From: cea <396157168@qq.com> Date: Mon, 2 Jan 2023 16:24:18 +0800 Subject: [PATCH 12/12] =?UTF-8?q?update:=20mqtt5=20conn=E3=80=81connAck?= =?UTF-8?q?=E3=80=81=E6=B6=88=E6=81=AF=E4=BD=93=E7=9B=B8=E5=85=B3=EF=BC=8C?= =?UTF-8?q?MqttIdPropertyMessage=E6=B6=88=E6=81=AF=E4=BD=93=E5=B0=81?= =?UTF-8?q?=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartboot/mqtt/common/QosPublisher.java | 14 ++-- .../common/enums/MqttConnectReturnCode.java | 25 +----- .../mqtt/common/enums/MqttReasonCode.java | 78 +++++++++++++++++++ .../common/message/MqttConnAckMessage.java | 3 +- .../common/message/MqttConnectMessage.java | 7 +- .../common/message/MqttIdPropertyMessage.java | 55 +++++++++++++ .../mqtt/common/message/MqttMessage.java | 4 +- .../message/MqttPacketIdVariableHeader.java | 6 ++ .../message/MqttPacketIdentifierMessage.java | 3 +- .../common/message/MqttPubCompMessage.java | 4 +- .../message/MqttPubReplyVariableHeader.java | 24 ++++++ .../common/message/MqttPublishMessage.java | 3 +- .../message/MqttPublishVariableHeader.java | 8 ++ .../mqtt/common/protocol/DecodeUnit.java | 13 ++++ .../mqtt/common/protocol/MqttProtocol.java | 9 +-- 15 files changed, 209 insertions(+), 47 deletions(-) create mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java create mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttIdPropertyMessage.java create mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubReplyVariableHeader.java create mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/DecodeUnit.java diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java index 80519e34..5bf0c088 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java @@ -2,13 +2,9 @@ package org.smartboot.mqtt.common; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttQoS; -import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.mqtt.common.message.MqttPubAckMessage; -import org.smartboot.mqtt.common.message.MqttPubCompMessage; -import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttPubRelMessage; -import org.smartboot.mqtt.common.message.MqttPublishMessage; +import org.smartboot.mqtt.common.message.*; import org.smartboot.mqtt.common.util.ValidateUtils; import java.util.Objects; @@ -26,7 +22,7 @@ public abstract class QosPublisher { //至少一次 session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> { - ValidateUtils.isTrue(message instanceof MqttPubAckMessage, "invalid message type"); + ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBACK, "invalid message type"); future.complete(true); session.responseConsumers.remove(cacheKey); LOGGER.info("Qos1消息发送成功..."); @@ -44,13 +40,13 @@ public abstract class QosPublisher { CompletableFuture publishFuture = new CompletableFuture<>(); //只有一次 session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> { - ValidateUtils.isTrue(message instanceof MqttPubRecMessage, "invalid message type"); + ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBREC, "invalid message type"); ValidateUtils.isTrue(Objects.equals(message.getVariableHeader().getPacketId(), publishMessage.getVariableHeader().getPacketId()), "invalid packetId"); publishFuture.complete(true); MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(message.getVariableHeader().getPacketId()); CompletableFuture pubRelFuture = new CompletableFuture<>(); session.responseConsumers.put(cacheKey, new AckMessage(pubRelMessage, compMessage -> { - ValidateUtils.isTrue(compMessage instanceof MqttPubCompMessage, "invalid message type"); + ValidateUtils.isTrue(compMessage.getFixedHeader().getMessageType() == MqttMessageType.PUBCOMP, "invalid message type"); ValidateUtils.isTrue(Objects.equals(compMessage.getVariableHeader().getPacketId(), pubRelMessage.getVariableHeader().getPacketId()), "invalid packetId"); pubRelFuture.complete(true); LOGGER.info("Qos2消息发送成功..."); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java index 63eb04fe..aa9554bc 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java @@ -10,30 +10,7 @@ public enum MqttConnectReturnCode { CONNECTION_REFUSED_IDENTIFIER_REJECTED((byte) 0x02, "客户端标识符是正确的 UTF-8 编码,但服务 端不允许使用"), CONNECTION_REFUSED_SERVER_UNAVAILABLE((byte) 0x03, "网络连接已建立,但 MQTT 服务不可用"), CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD((byte) 0x04, "用户名或密码的数据格式无效"), - CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05, "客户端未被授权连接到此服务器"), - - //MQTT5 - CONNECTION_REFUSED_UNSPECIFIED_ERROR((byte) 0x80, "未识别的错误"), - CONNECTION_REFUSED_MALFORMED_PACKET((byte) 0x81, "数据未正确解析"), - CONNECTION_REFUSED_PROTOCOL_ERROR((byte) 0x82, "协议版本错误"), - CONNECTION_REFUSED_IMPLEMENTATION_SPECIFIC((byte) 0x83, "服务端拒绝了连接"), - CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84, "服务端不支持此版本协议"), - CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85, "不允许的客户端id"), - CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD((byte) 0x86, "不接受的用户名或密码"), - CONNECTION_REFUSED_NOT_AUTHORIZED_5((byte) 0x87, "未授权"), - CONNECTION_REFUSED_SERVER_UNAVAILABLE_5((byte) 0x88, "服务端不可用"), - CONNECTION_REFUSED_SERVER_BUSY((byte) 0x89, "服务端繁忙中"), - CONNECTION_REFUSED_BANNED((byte) 0x8A, "客户端被禁用"), - CONNECTION_REFUSED_BAD_AUTHENTICATION_METHOD((byte) 0x8C, "错误的认证方法"), - CONNECTION_REFUSED_TOPIC_NAME_INVALID((byte) 0x90, "topic不被允许"), - CONNECTION_REFUSED_PACKET_TOO_LARGE((byte) 0x95, "包大小超限"), - CONNECTION_REFUSED_QUOTA_EXCEEDED((byte) 0x97, "已超限"), - CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID((byte) 0x99, "数据格式非法"), - CONNECTION_REFUSED_RETAIN_NOT_SUPPORTED((byte) 0x9A, "不支持保留消息"), - CONNECTION_REFUSED_QOS_NOT_SUPPORTED((byte) 0x9B, "不支持此qos"), - CONNECTION_REFUSED_USE_ANOTHER_SERVER((byte) 0x9C, "客户端需要暂时使用另一节点"), - CONNECTION_REFUSED_SERVER_MOVED((byte) 0x9D, "客户端需要永久使用另一节点"), - CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED((byte) 0x9F, "连接速率超限"); + CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05, "客户端未被授权连接到此服务器"); private final byte code; private final String desc; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java new file mode 100644 index 00000000..aecd578d --- /dev/null +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java @@ -0,0 +1,78 @@ +package org.smartboot.mqtt.common.enums; + +/** + * + */ +public enum MqttReasonCode { + //MQTT5 + SUCCESS((byte) 0x00, "成功"), + NORMAL_DISCONNECTION((byte) 0x00, "断开连接"), + GRANTED_QOS0((byte) 0x00, "最大允许qos为0"), + GRANTED_QOS1((byte) 0x01, "最大允许qos为1"), + GRANTED_QOS2((byte) 0x02, "最大允许qos为2"), + DISCONNECT_WITH_WILL_MESSAGE((byte) 0x04, "客户端需要断开连接后发送遗嘱消息"), + NO_MATCHING_SUBSCRIBERS((byte) 0X10, "无匹配的订阅者"), + N0_SUBSCRIPTION_EXISTED((byte) 0X11, ""), + CONTINUE_AUTHENTICATION((byte) 0X18, ""), + RE_AUTHENTICATE((byte) 0X19, ""), + UNSPECIFIED_ERROR((byte) 0x80, "未指明的错误"), + MALFORMED_PACKET((byte) 0x81, "数据未正确解析"), + PROTOCOL_ERROR((byte) 0x82, "协议版本错误"), + IMPLEMENTATION_SPECIFIC_ERROR((byte) 0x83, "接收者不接受"), + UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84, "服务端不支持此版本协议"), + CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85, "不允许的客户端id"), + BAD_USERNAME_OR_PASSWORD((byte) 0x86, "不接受的用户名或密码"), + NOT_AUTHORIZED((byte) 0x87, "未授权"), + SERVER_UNAVAILABLE_5((byte) 0x88, "服务端不可用"), + SERVER_BUSY((byte) 0x89, "服务端繁忙中"), + BANNED((byte) 0x8A, "客户端被禁用"), + SERVER_SHUTTING_DOWN((byte) 0x8B, ""), + BAD_AUTHENTICATION_METHOD((byte) 0x8C, "错误的认证方法"), + KEEP_ALIVE_TIMEOUT((byte) 0x8D, ""), + SESSION_TAKEN_OVER((byte) 0x8E, "相同客户端id上线导致被踢出下线"), + TOPIC_FILTER_INVALID((byte) 0x8F, "消息过滤非法"), + TOPIC_NAME_INVALID((byte) 0x90, "topic名非法"), + PACKET_IDENTIFIER_IN_USE((byte) 0x91, "packetId已被使用"), + PACKET_IDENTIFIER_NOT_FOUND((byte) 0x92, ""), + RECEIVE_MAXIMUM_EXCEEDED((byte) 0x93, ""), + TOPIC_ALIAS_INVALID((byte) 0x94, ""), + PACKET_TOO_LARGE((byte) 0x95, "包大小超限"), + MESSAGE_RATE_TOO_HIGH((byte) 0x96, ""), + QUOTA_EXCEEDED((byte) 0x97, "已超限"), + ADMINISTRATIVE_ACTION((byte) 0x98, ""), + PAYLOAD_FORMAT_INVALID((byte) 0x99, "数据格式非法"), + RETAIN_NOT_SUPPORTED((byte) 0x9A, "不支持保留消息"), + QOS_NOT_SUPPORTED((byte) 0x9B, "不支持此qos"), + USE_ANOTHER_SERVER((byte) 0x9C, "客户端需要暂时使用另一节点"), + SERVER_MOVED((byte) 0x9D, "客户端需要永久使用另一节点"), + SHARED_SUBSCRIPTION_NOT_SUPPORTED((byte) 0x9E, ""), + CONNECTION_RATE_EXCEEDED((byte) 0x9F, "连接速率超限"), + MAXIMUM_CONNECT_TIME((byte) 0xA0, ""), + SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED((byte) 0xA1, ""), + WILDCARD_SUBSCRIPTION_NOT_SUPPORTED((byte) 0xA2, ""); + + private final byte code; + private final String desc; + + MqttReasonCode(byte code, String desc) { + this.code = code; + this.desc = desc; + } + + public static MqttReasonCode valueOf(byte b) { + for (MqttReasonCode v : values()) { + if (b == v.code) { + return v; + } + } + throw new IllegalArgumentException("unknown reason code: " + (b & 0xFF)); + } + + public byte getCode() { + return code; + } + + public String getDesc() { + return desc; + } +} \ No newline at end of file diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java index 0a8193f2..cb5a0f95 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java @@ -2,6 +2,7 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; import org.smartboot.mqtt.common.enums.MqttConnectReturnCode; +import org.smartboot.mqtt.common.protocol.DecodeUnit; import org.smartboot.socket.util.BufferUtils; import java.nio.ByteBuffer; @@ -22,7 +23,7 @@ public class MqttConnAckMessage extends MqttVariableMessage { + + public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader) { + super(mqttFixedHeader); + } + + public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader, int packetId) { + this(mqttFixedHeader, packetId, (byte)0, null); + } + + public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader, int packetId, byte reasonCode, MqttProperties mqttProperties) { + super(mqttFixedHeader); + setVariableHeader(new MqttPubReplyVariableHeader(packetId, reasonCode, mqttProperties)); + } + + @Override + public final void decodeVariableHeader(DecodeUnit unit, ByteBuffer buffer) { + int packetId = buffer.getShort(); + MqttPubReplyVariableHeader header; + if (unit.mqttVersion == MqttVersion.MQTT_5){ + byte reasonCode = buffer.get(); + byte propertyLen = buffer.get(); + header = new MqttPubReplyVariableHeader(packetId, reasonCode, null); + }else { + header = new MqttPubReplyVariableHeader(packetId, (byte)0, null); + } + setVariableHeader(header); + } + + + @Override + public void writeTo(MqttWriter mqttWriter) throws IOException { + MqttPubReplyVariableHeader variableHeader = getVariableHeader(); + int variableHeaderBufferSize = 2; // variable part only has a message id + mqttWriter.writeByte(getFixedHeaderByte1(fixedHeader)); + writeVariableLengthInt(mqttWriter, variableHeaderBufferSize); + mqttWriter.writeShort((short) variableHeader.getPacketId()); + mqttWriter.writeByte(variableHeader.getReasonCode()); + } +} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java index 34fde45b..0b4b5a9e 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java @@ -3,6 +3,8 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; import org.smartboot.mqtt.common.ToString; import org.smartboot.mqtt.common.exception.MqttProcessException; +import org.smartboot.mqtt.common.protocol.DecodeUnit; +import org.smartboot.mqtt.common.protocol.MqttProtocol; import org.smartboot.socket.util.BufferUtils; import org.smartboot.socket.util.DecoderException; @@ -141,7 +143,7 @@ public class MqttMessage extends ToString { * * @param buffer */ - public void decodeVariableHeader(ByteBuffer buffer) { + public void decodeVariableHeader(DecodeUnit unit, ByteBuffer buffer) { } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java index 64ea394a..ed85e1b9 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java @@ -10,6 +10,12 @@ public class MqttPacketIdVariableHeader extends MqttVariableHeader { */ private int packetId; + public MqttPacketIdVariableHeader() { + } + + public MqttPacketIdVariableHeader(int packetId) { + this.packetId = packetId; + } public void setPacketId(int packetId) { this.packetId = packetId; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java index ff4fd6a4..ee86a217 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java @@ -1,6 +1,7 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; +import org.smartboot.mqtt.common.protocol.DecodeUnit; import java.io.IOException; import java.nio.ByteBuffer; @@ -28,7 +29,7 @@ public class MqttPacketIdentifierMessage extends MqttVariableMessage 0xffff) { + throw new IllegalArgumentException("packetId: " + packetId + " (should be: 1 ~ 65535)"); + } + this.reasonCode = reasonCode; + this.properties = properties; + } + + public byte getReasonCode() { + return reasonCode; + } + + public MqttProperties getProperties() { + return properties; + } +} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java index d6280122..39e508d1 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java @@ -1,6 +1,7 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; +import org.smartboot.mqtt.common.protocol.DecodeUnit; import org.smartboot.mqtt.common.util.MqttUtil; import org.smartboot.socket.util.DecoderException; @@ -25,7 +26,7 @@ public class MqttPublishMessage extends MqttVariableMessage { if (payloadBuffer.remaining() < remainingLength) { break; } - unit.mqttMessage.decodeVariableHeader(payloadBuffer); + unit.mqttMessage.decodeVariableHeader(unit, payloadBuffer); unit.state = READ_PAYLOAD; @@ -181,10 +182,4 @@ public class MqttProtocol implements Protocol { READ_FIXED_HEADER, READ_VARIABLE_HEADER, READ_PAYLOAD, FINISH, } - class DecodeUnit { - DecoderState state; - MqttMessage mqttMessage; - - ByteBuffer disposableBuffer; - } } -- Gitee