From 213d5d8da4c509179f998c16398bf2fbec642371 Mon Sep 17 00:00:00 2001 From: suntw <781128> Date: Sun, 25 Dec 2022 22:37:05 +0800 Subject: [PATCH 1/2] =?UTF-8?q?update:=20mqtt5=20connect=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E4=BD=93=E7=9B=B8=E5=85=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/enums/MqttConnectReturnCode.java | 25 +- .../mqtt/common/enums/MqttProtocolEnum.java | 2 +- .../mqtt/common/enums/MqttVersion.java | 2 +- .../message/MqttConnAckVariableHeader.java | 11 + .../common/message/MqttConnectPayload.java | 13 +- .../message/MqttConnectVariableHeader.java | 20 + .../mqtt/common/message/MqttProperties.java | 374 ++++++++++++++++++ 7 files changed, 443 insertions(+), 4 deletions(-) create mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttProperties.java diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java index 80843183..63eb04fe 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java @@ -4,13 +4,36 @@ package org.smartboot.mqtt.common.enums; * */ public enum MqttConnectReturnCode { + //MQTT3 CONNECTION_ACCEPTED((byte) 0x00, "连接已被服务端接受"), CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION((byte) 0X01, "服务端不支持客户端请求的 MQTT 协议级别"), CONNECTION_REFUSED_IDENTIFIER_REJECTED((byte) 0x02, "客户端标识符是正确的 UTF-8 编码,但服务 端不允许使用"), CONNECTION_REFUSED_SERVER_UNAVAILABLE((byte) 0x03, "网络连接已建立,但 MQTT 服务不可用"), CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD((byte) 0x04, "用户名或密码的数据格式无效"), - CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05, "客户端未被授权连接到此服务器"); + CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05, "客户端未被授权连接到此服务器"), + //MQTT5 + CONNECTION_REFUSED_UNSPECIFIED_ERROR((byte) 0x80, "未识别的错误"), + CONNECTION_REFUSED_MALFORMED_PACKET((byte) 0x81, "数据未正确解析"), + CONNECTION_REFUSED_PROTOCOL_ERROR((byte) 0x82, "协议版本错误"), + CONNECTION_REFUSED_IMPLEMENTATION_SPECIFIC((byte) 0x83, "服务端拒绝了连接"), + CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84, "服务端不支持此版本协议"), + CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85, "不允许的客户端id"), + CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD((byte) 0x86, "不接受的用户名或密码"), + CONNECTION_REFUSED_NOT_AUTHORIZED_5((byte) 0x87, "未授权"), + CONNECTION_REFUSED_SERVER_UNAVAILABLE_5((byte) 0x88, "服务端不可用"), + CONNECTION_REFUSED_SERVER_BUSY((byte) 0x89, "服务端繁忙中"), + CONNECTION_REFUSED_BANNED((byte) 0x8A, "客户端被禁用"), + CONNECTION_REFUSED_BAD_AUTHENTICATION_METHOD((byte) 0x8C, "错误的认证方法"), + CONNECTION_REFUSED_TOPIC_NAME_INVALID((byte) 0x90, "topic不被允许"), + CONNECTION_REFUSED_PACKET_TOO_LARGE((byte) 0x95, "包大小超限"), + CONNECTION_REFUSED_QUOTA_EXCEEDED((byte) 0x97, "已超限"), + CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID((byte) 0x99, "数据格式非法"), + CONNECTION_REFUSED_RETAIN_NOT_SUPPORTED((byte) 0x9A, "不支持保留消息"), + CONNECTION_REFUSED_QOS_NOT_SUPPORTED((byte) 0x9B, "不支持此qos"), + CONNECTION_REFUSED_USE_ANOTHER_SERVER((byte) 0x9C, "客户端需要暂时使用另一节点"), + CONNECTION_REFUSED_SERVER_MOVED((byte) 0x9D, "客户端需要永久使用另一节点"), + CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED((byte) 0x9F, "连接速率超限"); private final byte code; private final String desc; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java index cfd7a6c9..94fcc5c5 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java @@ -5,7 +5,7 @@ package org.smartboot.mqtt.common.enums; * @version V1.0 , 2022/3/23 */ public enum MqttProtocolEnum { - MQTT_3_1("MQIsdp"), MQTT_3_1_1("MQTT"); + MQTT_3_1("MQIsdp"), MQTT_3_1_1("MQTT"), MQTT_5("MQTT"); /** * 协议名 */ diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java index 5aa5040f..58ee607c 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java @@ -1,7 +1,7 @@ package org.smartboot.mqtt.common.enums; public enum MqttVersion { - MQTT_3_1(MqttProtocolEnum.MQTT_3_1, (byte) 3), MQTT_3_1_1(MqttProtocolEnum.MQTT_3_1_1, (byte) 4); + MQTT_3_1(MqttProtocolEnum.MQTT_3_1, (byte) 3), MQTT_3_1_1(MqttProtocolEnum.MQTT_3_1_1, (byte) 4), MQTT_5(MqttProtocolEnum.MQTT_5, (byte) 5); private final MqttProtocolEnum protocol; private final byte level; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckVariableHeader.java index ef478d4b..bde5cd72 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckVariableHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckVariableHeader.java @@ -14,9 +14,16 @@ public class MqttConnAckVariableHeader extends MqttVariableHeader { */ private final boolean sessionPresent; + private final MqttProperties properties; + + public MqttConnAckVariableHeader(MqttConnectReturnCode connectReturnCode, boolean sessionPresent) { + this(connectReturnCode, sessionPresent, null); + } + public MqttConnAckVariableHeader(MqttConnectReturnCode connectReturnCode, boolean sessionPresent, MqttProperties properties) { this.connectReturnCode = connectReturnCode; this.sessionPresent = sessionPresent; + this.properties = properties; } public MqttConnectReturnCode connectReturnCode() { @@ -26,4 +33,8 @@ public class MqttConnAckVariableHeader extends MqttVariableHeader { public boolean isSessionPresent() { return sessionPresent; } + + public MqttProperties getProperties() { + return properties; + } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectPayload.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectPayload.java index 3e9b066c..23336141 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectPayload.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectPayload.java @@ -11,6 +11,10 @@ public final class MqttConnectPayload { * 客户端标识符 */ private final String clientIdentifier; + /** + * 遗嘱属性 + */ + private final MqttProperties willProperties; /** * 遗嘱主题 */ @@ -29,11 +33,16 @@ public final class MqttConnectPayload { private final byte[] password; public MqttConnectPayload(String clientIdentifier, String willTopic, byte[] willMessage, String userName, byte[] password) { + this(clientIdentifier, willTopic, willMessage, userName, password, null); + } + + public MqttConnectPayload(String clientIdentifier, String willTopic, byte[] willMessage, String userName, byte[] password, MqttProperties willProperties) { this.clientIdentifier = clientIdentifier; this.willTopic = willTopic; this.willMessage = willMessage; this.userName = userName; this.password = password; + this.willProperties = willProperties; } public String clientIdentifier() { @@ -56,5 +65,7 @@ public final class MqttConnectPayload { return password; } - + public MqttProperties getWillProperties() { + return willProperties; + } } \ No newline at end of file diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectVariableHeader.java index 02572cd9..5943349a 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectVariableHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectVariableHeader.java @@ -25,12 +25,22 @@ public final class MqttConnectVariableHeader extends MqttVariableHeader { private final boolean isCleanSession; private final int reserved; private final int keepAliveTimeSeconds; + private final MqttProperties properties; public MqttConnectVariableHeader( String name, byte protocolLevel, int connectFlag, int keepAliveTimeSeconds) { + this(name, protocolLevel, connectFlag, keepAliveTimeSeconds, null); + } + + public MqttConnectVariableHeader( + String name, + byte protocolLevel, + int connectFlag, + int keepAliveTimeSeconds, + MqttProperties properties) { this.protocolName = name; this.protocolLevel = protocolLevel; this.hasUserName = (connectFlag & 0x80) == 0x80; @@ -41,9 +51,14 @@ public final class MqttConnectVariableHeader extends MqttVariableHeader { this.isCleanSession = (connectFlag & 0x02) == 0x02; this.reserved = (connectFlag & 0x01); this.keepAliveTimeSeconds = keepAliveTimeSeconds; + this.properties = properties; } public MqttConnectVariableHeader(MqttVersion mqttVersion, boolean hasUserName, boolean hasPassword, WillMessage willMessage, boolean isCleanSession, int keepAliveTimeSeconds) { + this(mqttVersion, hasUserName, hasPassword, willMessage, isCleanSession, keepAliveTimeSeconds, null); + } + + public MqttConnectVariableHeader(MqttVersion mqttVersion, boolean hasUserName, boolean hasPassword, WillMessage willMessage, boolean isCleanSession, int keepAliveTimeSeconds, MqttProperties properties) { this.protocolName = mqttVersion.protocolName(); this.protocolLevel = mqttVersion.protocolLevel(); this.hasUserName = hasUserName; @@ -55,8 +70,10 @@ public final class MqttConnectVariableHeader extends MqttVariableHeader { //服务端必须验证 CONNECT 控制报文的保留标志位(第 0 位)是否为 0,如果不为 0 必须断开客户端连接 this.reserved = 0; this.keepAliveTimeSeconds = keepAliveTimeSeconds; + this.properties = properties; } + public String protocolName() { return protocolName; } @@ -97,4 +114,7 @@ public final class MqttConnectVariableHeader extends MqttVariableHeader { return reserved; } + public MqttProperties getProperties() { + return properties; + } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttProperties.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttProperties.java new file mode 100644 index 00000000..2539dae4 --- /dev/null +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttProperties.java @@ -0,0 +1,374 @@ +package org.smartboot.mqtt.common.message; + +import java.util.*; + +/** + * MQTT Properties container from netty + * */ +public final class MqttProperties { + + public enum MqttPropertyType { + // single byte properties + PAYLOAD_FORMAT_INDICATOR(0x01), + REQUEST_PROBLEM_INFORMATION(0x17), + REQUEST_RESPONSE_INFORMATION(0x19), + MAXIMUM_QOS(0x24), + RETAIN_AVAILABLE(0x25), + WILDCARD_SUBSCRIPTION_AVAILABLE(0x28), + SUBSCRIPTION_IDENTIFIER_AVAILABLE(0x29), + SHARED_SUBSCRIPTION_AVAILABLE(0x2A), + + // two bytes properties + SERVER_KEEP_ALIVE(0x13), + RECEIVE_MAXIMUM(0x21), + TOPIC_ALIAS_MAXIMUM(0x22), + TOPIC_ALIAS(0x23), + + // four bytes properties + PUBLICATION_EXPIRY_INTERVAL(0x02), + SESSION_EXPIRY_INTERVAL(0x11), + WILL_DELAY_INTERVAL(0x18), + MAXIMUM_PACKET_SIZE(0x27), + + // Variable Byte Integer + SUBSCRIPTION_IDENTIFIER(0x0B), + + // UTF-8 Encoded String properties + CONTENT_TYPE(0x03), + RESPONSE_TOPIC(0x08), + ASSIGNED_CLIENT_IDENTIFIER(0x12), + AUTHENTICATION_METHOD(0x15), + RESPONSE_INFORMATION(0x1A), + SERVER_REFERENCE(0x1C), + REASON_STRING(0x1F), + USER_PROPERTY(0x26), + + // Binary Data + CORRELATION_DATA(0x09), + AUTHENTICATION_DATA(0x16); + + private static final MqttPropertyType[] VALUES; + + static { + VALUES = new MqttPropertyType[43]; + for (MqttPropertyType v : values()) { + VALUES[v.value] = v; + } + } + + private final int value; + + MqttPropertyType(int value) { + this.value = value; + } + + public int value() { + return value; + } + + public static MqttPropertyType valueOf(int type) { + MqttPropertyType t = null; + try { + t = VALUES[type]; + } catch (ArrayIndexOutOfBoundsException ignored) { + // nop + } + if (t == null) { + throw new IllegalArgumentException("unknown property type: " + type); + } + return t; + } + } + + /** + * MQTT property base class + * + * @param property type + */ + public abstract static class MqttProperty { + final T value; + final int propertyId; + + protected MqttProperty(int propertyId, T value) { + this.propertyId = propertyId; + this.value = value; + } + + /** + * Get MQTT property value + * + * @return property value + */ + public T value() { + return value; + } + + /** + * Get MQTT property ID + * @return property ID + */ + public int propertyId() { + return propertyId; + } + + @Override + public int hashCode() { + return propertyId + 31 * value.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + MqttProperty that = (MqttProperty) obj; + return this.propertyId == that.propertyId && this.value.equals(that.value); + } + } + + public static final class IntegerProperty extends MqttProperty { + + public IntegerProperty(int propertyId, Integer value) { + super(propertyId, value); + } + + @Override + public String toString() { + return "IntegerProperty(" + propertyId + ", " + value + ")"; + } + } + + public static final class StringProperty extends MqttProperty { + + public StringProperty(int propertyId, String value) { + super(propertyId, value); + } + + @Override + public String toString() { + return "StringProperty(" + propertyId + ", " + value + ")"; + } + } + + public static final class StringPair { + public final String key; + public final String value; + + public StringPair(String key, String value) { + this.key = key; + this.value = value; + } + + @Override + public int hashCode() { + return key.hashCode() + 31 * value.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + StringPair that = (StringPair) obj; + + return that.key.equals(this.key) && that.value.equals(this.value); + } + } + + //User properties are the only properties that may be included multiple times and + //are the only properties where ordering is required. Therefore, they need a special handling + public static final class UserProperties extends MqttProperty> { + public UserProperties() { + super(MqttPropertyType.USER_PROPERTY.value, new ArrayList()); + } + + /** + * Create user properties from the collection of the String pair values + * + * @param values string pairs. Collection entries are copied, collection itself isn't shared + */ + public UserProperties(Collection values) { + this(); + this.value.addAll(values); + } + + private static UserProperties fromUserPropertyCollection(Collection properties) { + UserProperties userProperties = new UserProperties(); + for (UserProperty property: properties) { + userProperties.add(new StringPair(property.value.key, property.value.value)); + } + return userProperties; + } + + public void add(StringPair pair) { + this.value.add(pair); + } + + public void add(String key, String value) { + this.value.add(new StringPair(key, value)); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("UserProperties("); + boolean first = true; + for (StringPair pair: value) { + if (!first) { + builder.append(", "); + } + builder.append(pair.key + "->" + pair.value); + first = false; + } + builder.append(")"); + return builder.toString(); + } + } + + public static final class UserProperty extends MqttProperty { + public UserProperty(String key, String value) { + super(MqttPropertyType.USER_PROPERTY.value, new StringPair(key, value)); + } + + @Override + public String toString() { + return "UserProperty(" + value.key + ", " + value.value + ")"; + } + } + + public static final class BinaryProperty extends MqttProperty { + + public BinaryProperty(int propertyId, byte[] value) { + super(propertyId, value); + } + + @Override + public String toString() { + return "BinaryProperty(" + propertyId + ", " + value.length + " bytes)"; + } + } + + + private Map props; + private List userProperties; + private List subscriptionIds; + + public void add(MqttProperty property) { + Map props = this.props; + if (property.propertyId == MqttPropertyType.USER_PROPERTY.value) { + List userProperties = this.userProperties; + if (userProperties == null) { + userProperties = new ArrayList(1); + this.userProperties = userProperties; + } + if (property instanceof UserProperty) { + userProperties.add((UserProperty) property); + } else if (property instanceof UserProperties) { + for (StringPair pair: ((UserProperties) property).value) { + userProperties.add(new UserProperty(pair.key, pair.value)); + } + } else { + throw new IllegalArgumentException("User property must be of UserProperty or UserProperties type"); + } + } else if (property.propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { + List subscriptionIds = this.subscriptionIds; + if (subscriptionIds == null) { + subscriptionIds = new ArrayList(1); + this.subscriptionIds = subscriptionIds; + } + if (property instanceof IntegerProperty) { + subscriptionIds.add((IntegerProperty) property); + } else { + throw new IllegalArgumentException("Subscription ID must be an integer property"); + } + } else { + if (props == null) { + props = new HashMap<>(); + this.props = props; + } + props.put(property.propertyId, property); + } + } + + public Collection listAll() { + Map props = this.props; + if (props == null && subscriptionIds == null && userProperties == null) { + return Collections.emptyList(); + } + if (subscriptionIds == null && userProperties == null) { + return props.values(); + } + if (props == null && userProperties == null) { + return subscriptionIds; + } + List propValues = new ArrayList(props != null ? props.size() : 1); + if (props != null) { + propValues.addAll(props.values()); + } + if (subscriptionIds != null) { + propValues.addAll(subscriptionIds); + } + if (userProperties != null) { + propValues.add(UserProperties.fromUserPropertyCollection(userProperties)); + } + return propValues; + } + + public boolean isEmpty() { + Map props = this.props; + return props == null || props.isEmpty(); + } + + /** + * Get property by ID. If there are multiple properties of this type (can be with Subscription ID) + * then return the first one. + * + * @param propertyId ID of the property + * @return a property if it is set, null otherwise + */ + public MqttProperty getProperty(int propertyId) { + if (propertyId == MqttPropertyType.USER_PROPERTY.value) { + //special handling to keep compatibility with earlier versions + List userProperties = this.userProperties; + if (userProperties == null) { + return null; + } + return UserProperties.fromUserPropertyCollection(userProperties); + } + if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { + List subscriptionIds = this.subscriptionIds; + if (subscriptionIds == null || subscriptionIds.isEmpty()) { + return null; + } + return subscriptionIds.get(0); + } + Map props = this.props; + return props == null ? null : props.get(propertyId); + } + + /** + * Get properties by ID. + * Some properties (Subscription ID and User Properties) may occur multiple times, + * this method returns all their values in order. + * + * @param propertyId ID of the property + * @return all properties having specified ID + */ + public List getProperties(int propertyId) { + if (propertyId == MqttPropertyType.USER_PROPERTY.value) { + return userProperties == null ? Collections.emptyList() : userProperties; + } + if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { + return subscriptionIds == null ? Collections.emptyList() : subscriptionIds; + } + Map props = this.props; + return (props == null || !props.containsKey(propertyId)) ? + Collections.emptyList() : + Collections.singletonList(props.get(propertyId)); + } +} -- Gitee From 7d0736265c8bf9afdd99ac4447e996240c543d16 Mon Sep 17 00:00:00 2001 From: suntw <781128> Date: Mon, 2 Jan 2023 16:24:18 +0800 Subject: [PATCH 2/2] =?UTF-8?q?update:=20mqtt5=20conn=E3=80=81connAck?= =?UTF-8?q?=E3=80=81=E6=B6=88=E6=81=AF=E4=BD=93=E7=9B=B8=E5=85=B3=EF=BC=8C?= =?UTF-8?q?MqttIdPropertyMessage=E6=B6=88=E6=81=AF=E4=BD=93=E5=B0=81?= =?UTF-8?q?=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartboot/mqtt/common/QosPublisher.java | 14 ++-- .../common/enums/MqttConnectReturnCode.java | 25 +----- .../mqtt/common/enums/MqttReasonCode.java | 78 +++++++++++++++++++ .../common/message/MqttConnAckMessage.java | 3 +- .../common/message/MqttConnectMessage.java | 7 +- .../common/message/MqttIdPropertyMessage.java | 55 +++++++++++++ .../mqtt/common/message/MqttMessage.java | 4 +- .../message/MqttPacketIdVariableHeader.java | 6 ++ .../message/MqttPacketIdentifierMessage.java | 3 +- .../common/message/MqttPubCompMessage.java | 4 +- .../message/MqttPubReplyVariableHeader.java | 24 ++++++ .../common/message/MqttPublishMessage.java | 3 +- .../message/MqttPublishVariableHeader.java | 8 ++ .../mqtt/common/protocol/DecodeUnit.java | 13 ++++ .../mqtt/common/protocol/MqttProtocol.java | 9 +-- 15 files changed, 209 insertions(+), 47 deletions(-) create mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java create mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttIdPropertyMessage.java create mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubReplyVariableHeader.java create mode 100644 smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/DecodeUnit.java diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java index 80519e34..5bf0c088 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java @@ -2,13 +2,9 @@ package org.smartboot.mqtt.common; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttQoS; -import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.mqtt.common.message.MqttPubAckMessage; -import org.smartboot.mqtt.common.message.MqttPubCompMessage; -import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttPubRelMessage; -import org.smartboot.mqtt.common.message.MqttPublishMessage; +import org.smartboot.mqtt.common.message.*; import org.smartboot.mqtt.common.util.ValidateUtils; import java.util.Objects; @@ -26,7 +22,7 @@ public abstract class QosPublisher { //至少一次 session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> { - ValidateUtils.isTrue(message instanceof MqttPubAckMessage, "invalid message type"); + ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBACK, "invalid message type"); future.complete(true); session.responseConsumers.remove(cacheKey); LOGGER.info("Qos1消息发送成功..."); @@ -44,13 +40,13 @@ public abstract class QosPublisher { CompletableFuture publishFuture = new CompletableFuture<>(); //只有一次 session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> { - ValidateUtils.isTrue(message instanceof MqttPubRecMessage, "invalid message type"); + ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBREC, "invalid message type"); ValidateUtils.isTrue(Objects.equals(message.getVariableHeader().getPacketId(), publishMessage.getVariableHeader().getPacketId()), "invalid packetId"); publishFuture.complete(true); MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(message.getVariableHeader().getPacketId()); CompletableFuture pubRelFuture = new CompletableFuture<>(); session.responseConsumers.put(cacheKey, new AckMessage(pubRelMessage, compMessage -> { - ValidateUtils.isTrue(compMessage instanceof MqttPubCompMessage, "invalid message type"); + ValidateUtils.isTrue(compMessage.getFixedHeader().getMessageType() == MqttMessageType.PUBCOMP, "invalid message type"); ValidateUtils.isTrue(Objects.equals(compMessage.getVariableHeader().getPacketId(), pubRelMessage.getVariableHeader().getPacketId()), "invalid packetId"); pubRelFuture.complete(true); LOGGER.info("Qos2消息发送成功..."); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java index 63eb04fe..aa9554bc 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java @@ -10,30 +10,7 @@ public enum MqttConnectReturnCode { CONNECTION_REFUSED_IDENTIFIER_REJECTED((byte) 0x02, "客户端标识符是正确的 UTF-8 编码,但服务 端不允许使用"), CONNECTION_REFUSED_SERVER_UNAVAILABLE((byte) 0x03, "网络连接已建立,但 MQTT 服务不可用"), CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD((byte) 0x04, "用户名或密码的数据格式无效"), - CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05, "客户端未被授权连接到此服务器"), - - //MQTT5 - CONNECTION_REFUSED_UNSPECIFIED_ERROR((byte) 0x80, "未识别的错误"), - CONNECTION_REFUSED_MALFORMED_PACKET((byte) 0x81, "数据未正确解析"), - CONNECTION_REFUSED_PROTOCOL_ERROR((byte) 0x82, "协议版本错误"), - CONNECTION_REFUSED_IMPLEMENTATION_SPECIFIC((byte) 0x83, "服务端拒绝了连接"), - CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84, "服务端不支持此版本协议"), - CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85, "不允许的客户端id"), - CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD((byte) 0x86, "不接受的用户名或密码"), - CONNECTION_REFUSED_NOT_AUTHORIZED_5((byte) 0x87, "未授权"), - CONNECTION_REFUSED_SERVER_UNAVAILABLE_5((byte) 0x88, "服务端不可用"), - CONNECTION_REFUSED_SERVER_BUSY((byte) 0x89, "服务端繁忙中"), - CONNECTION_REFUSED_BANNED((byte) 0x8A, "客户端被禁用"), - CONNECTION_REFUSED_BAD_AUTHENTICATION_METHOD((byte) 0x8C, "错误的认证方法"), - CONNECTION_REFUSED_TOPIC_NAME_INVALID((byte) 0x90, "topic不被允许"), - CONNECTION_REFUSED_PACKET_TOO_LARGE((byte) 0x95, "包大小超限"), - CONNECTION_REFUSED_QUOTA_EXCEEDED((byte) 0x97, "已超限"), - CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID((byte) 0x99, "数据格式非法"), - CONNECTION_REFUSED_RETAIN_NOT_SUPPORTED((byte) 0x9A, "不支持保留消息"), - CONNECTION_REFUSED_QOS_NOT_SUPPORTED((byte) 0x9B, "不支持此qos"), - CONNECTION_REFUSED_USE_ANOTHER_SERVER((byte) 0x9C, "客户端需要暂时使用另一节点"), - CONNECTION_REFUSED_SERVER_MOVED((byte) 0x9D, "客户端需要永久使用另一节点"), - CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED((byte) 0x9F, "连接速率超限"); + CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05, "客户端未被授权连接到此服务器"); private final byte code; private final String desc; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java new file mode 100644 index 00000000..aecd578d --- /dev/null +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java @@ -0,0 +1,78 @@ +package org.smartboot.mqtt.common.enums; + +/** + * + */ +public enum MqttReasonCode { + //MQTT5 + SUCCESS((byte) 0x00, "成功"), + NORMAL_DISCONNECTION((byte) 0x00, "断开连接"), + GRANTED_QOS0((byte) 0x00, "最大允许qos为0"), + GRANTED_QOS1((byte) 0x01, "最大允许qos为1"), + GRANTED_QOS2((byte) 0x02, "最大允许qos为2"), + DISCONNECT_WITH_WILL_MESSAGE((byte) 0x04, "客户端需要断开连接后发送遗嘱消息"), + NO_MATCHING_SUBSCRIBERS((byte) 0X10, "无匹配的订阅者"), + N0_SUBSCRIPTION_EXISTED((byte) 0X11, ""), + CONTINUE_AUTHENTICATION((byte) 0X18, ""), + RE_AUTHENTICATE((byte) 0X19, ""), + UNSPECIFIED_ERROR((byte) 0x80, "未指明的错误"), + MALFORMED_PACKET((byte) 0x81, "数据未正确解析"), + PROTOCOL_ERROR((byte) 0x82, "协议版本错误"), + IMPLEMENTATION_SPECIFIC_ERROR((byte) 0x83, "接收者不接受"), + UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84, "服务端不支持此版本协议"), + CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85, "不允许的客户端id"), + BAD_USERNAME_OR_PASSWORD((byte) 0x86, "不接受的用户名或密码"), + NOT_AUTHORIZED((byte) 0x87, "未授权"), + SERVER_UNAVAILABLE_5((byte) 0x88, "服务端不可用"), + SERVER_BUSY((byte) 0x89, "服务端繁忙中"), + BANNED((byte) 0x8A, "客户端被禁用"), + SERVER_SHUTTING_DOWN((byte) 0x8B, ""), + BAD_AUTHENTICATION_METHOD((byte) 0x8C, "错误的认证方法"), + KEEP_ALIVE_TIMEOUT((byte) 0x8D, ""), + SESSION_TAKEN_OVER((byte) 0x8E, "相同客户端id上线导致被踢出下线"), + TOPIC_FILTER_INVALID((byte) 0x8F, "消息过滤非法"), + TOPIC_NAME_INVALID((byte) 0x90, "topic名非法"), + PACKET_IDENTIFIER_IN_USE((byte) 0x91, "packetId已被使用"), + PACKET_IDENTIFIER_NOT_FOUND((byte) 0x92, ""), + RECEIVE_MAXIMUM_EXCEEDED((byte) 0x93, ""), + TOPIC_ALIAS_INVALID((byte) 0x94, ""), + PACKET_TOO_LARGE((byte) 0x95, "包大小超限"), + MESSAGE_RATE_TOO_HIGH((byte) 0x96, ""), + QUOTA_EXCEEDED((byte) 0x97, "已超限"), + ADMINISTRATIVE_ACTION((byte) 0x98, ""), + PAYLOAD_FORMAT_INVALID((byte) 0x99, "数据格式非法"), + RETAIN_NOT_SUPPORTED((byte) 0x9A, "不支持保留消息"), + QOS_NOT_SUPPORTED((byte) 0x9B, "不支持此qos"), + USE_ANOTHER_SERVER((byte) 0x9C, "客户端需要暂时使用另一节点"), + SERVER_MOVED((byte) 0x9D, "客户端需要永久使用另一节点"), + SHARED_SUBSCRIPTION_NOT_SUPPORTED((byte) 0x9E, ""), + CONNECTION_RATE_EXCEEDED((byte) 0x9F, "连接速率超限"), + MAXIMUM_CONNECT_TIME((byte) 0xA0, ""), + SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED((byte) 0xA1, ""), + WILDCARD_SUBSCRIPTION_NOT_SUPPORTED((byte) 0xA2, ""); + + private final byte code; + private final String desc; + + MqttReasonCode(byte code, String desc) { + this.code = code; + this.desc = desc; + } + + public static MqttReasonCode valueOf(byte b) { + for (MqttReasonCode v : values()) { + if (b == v.code) { + return v; + } + } + throw new IllegalArgumentException("unknown reason code: " + (b & 0xFF)); + } + + public byte getCode() { + return code; + } + + public String getDesc() { + return desc; + } +} \ No newline at end of file diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java index 0a8193f2..cb5a0f95 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java @@ -2,6 +2,7 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; import org.smartboot.mqtt.common.enums.MqttConnectReturnCode; +import org.smartboot.mqtt.common.protocol.DecodeUnit; import org.smartboot.socket.util.BufferUtils; import java.nio.ByteBuffer; @@ -22,7 +23,7 @@ public class MqttConnAckMessage extends MqttVariableMessage { + + public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader) { + super(mqttFixedHeader); + } + + public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader, int packetId) { + this(mqttFixedHeader, packetId, (byte)0, null); + } + + public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader, int packetId, byte reasonCode, MqttProperties mqttProperties) { + super(mqttFixedHeader); + setVariableHeader(new MqttPubReplyVariableHeader(packetId, reasonCode, mqttProperties)); + } + + @Override + public final void decodeVariableHeader(DecodeUnit unit, ByteBuffer buffer) { + int packetId = buffer.getShort(); + MqttPubReplyVariableHeader header; + if (unit.mqttVersion == MqttVersion.MQTT_5){ + byte reasonCode = buffer.get(); + byte propertyLen = buffer.get(); + header = new MqttPubReplyVariableHeader(packetId, reasonCode, null); + }else { + header = new MqttPubReplyVariableHeader(packetId, (byte)0, null); + } + setVariableHeader(header); + } + + + @Override + public void writeTo(MqttWriter mqttWriter) throws IOException { + MqttPubReplyVariableHeader variableHeader = getVariableHeader(); + int variableHeaderBufferSize = 2; // variable part only has a message id + mqttWriter.writeByte(getFixedHeaderByte1(fixedHeader)); + writeVariableLengthInt(mqttWriter, variableHeaderBufferSize); + mqttWriter.writeShort((short) variableHeader.getPacketId()); + mqttWriter.writeByte(variableHeader.getReasonCode()); + } +} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java index 34fde45b..0b4b5a9e 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java @@ -3,6 +3,8 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; import org.smartboot.mqtt.common.ToString; import org.smartboot.mqtt.common.exception.MqttProcessException; +import org.smartboot.mqtt.common.protocol.DecodeUnit; +import org.smartboot.mqtt.common.protocol.MqttProtocol; import org.smartboot.socket.util.BufferUtils; import org.smartboot.socket.util.DecoderException; @@ -141,7 +143,7 @@ public class MqttMessage extends ToString { * * @param buffer */ - public void decodeVariableHeader(ByteBuffer buffer) { + public void decodeVariableHeader(DecodeUnit unit, ByteBuffer buffer) { } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java index 64ea394a..ed85e1b9 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java @@ -10,6 +10,12 @@ public class MqttPacketIdVariableHeader extends MqttVariableHeader { */ private int packetId; + public MqttPacketIdVariableHeader() { + } + + public MqttPacketIdVariableHeader(int packetId) { + this.packetId = packetId; + } public void setPacketId(int packetId) { this.packetId = packetId; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java index ff4fd6a4..ee86a217 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java @@ -1,6 +1,7 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; +import org.smartboot.mqtt.common.protocol.DecodeUnit; import java.io.IOException; import java.nio.ByteBuffer; @@ -28,7 +29,7 @@ public class MqttPacketIdentifierMessage extends MqttVariableMessage 0xffff) { + throw new IllegalArgumentException("packetId: " + packetId + " (should be: 1 ~ 65535)"); + } + this.reasonCode = reasonCode; + this.properties = properties; + } + + public byte getReasonCode() { + return reasonCode; + } + + public MqttProperties getProperties() { + return properties; + } +} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java index d6280122..39e508d1 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java @@ -1,6 +1,7 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; +import org.smartboot.mqtt.common.protocol.DecodeUnit; import org.smartboot.mqtt.common.util.MqttUtil; import org.smartboot.socket.util.DecoderException; @@ -25,7 +26,7 @@ public class MqttPublishMessage extends MqttVariableMessage { if (payloadBuffer.remaining() < remainingLength) { break; } - unit.mqttMessage.decodeVariableHeader(payloadBuffer); + unit.mqttMessage.decodeVariableHeader(unit, payloadBuffer); unit.state = READ_PAYLOAD; @@ -181,10 +182,4 @@ public class MqttProtocol implements Protocol { READ_FIXED_HEADER, READ_VARIABLE_HEADER, READ_PAYLOAD, FINISH, } - class DecodeUnit { - DecoderState state; - MqttMessage mqttMessage; - - ByteBuffer disposableBuffer; - } } -- Gitee