diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java index 80519e345b0d1105f5b338801dce603ab1053a61..5bf0c08817d127196289740afe91da248c0e064e 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java @@ -2,13 +2,9 @@ package org.smartboot.mqtt.common; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttQoS; -import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.mqtt.common.message.MqttPubAckMessage; -import org.smartboot.mqtt.common.message.MqttPubCompMessage; -import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttPubRelMessage; -import org.smartboot.mqtt.common.message.MqttPublishMessage; +import org.smartboot.mqtt.common.message.*; import org.smartboot.mqtt.common.util.ValidateUtils; import java.util.Objects; @@ -26,7 +22,7 @@ public abstract class QosPublisher { //至少一次 session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> { - ValidateUtils.isTrue(message instanceof MqttPubAckMessage, "invalid message type"); + ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBACK, "invalid message type"); future.complete(true); session.responseConsumers.remove(cacheKey); LOGGER.info("Qos1消息发送成功..."); @@ -44,13 +40,13 @@ public abstract class QosPublisher { CompletableFuture publishFuture = new CompletableFuture<>(); //只有一次 session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> { - ValidateUtils.isTrue(message instanceof MqttPubRecMessage, "invalid message type"); + ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBREC, "invalid message type"); ValidateUtils.isTrue(Objects.equals(message.getVariableHeader().getPacketId(), publishMessage.getVariableHeader().getPacketId()), "invalid packetId"); publishFuture.complete(true); MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(message.getVariableHeader().getPacketId()); CompletableFuture pubRelFuture = new CompletableFuture<>(); session.responseConsumers.put(cacheKey, new AckMessage(pubRelMessage, compMessage -> { - ValidateUtils.isTrue(compMessage instanceof MqttPubCompMessage, "invalid message type"); + ValidateUtils.isTrue(compMessage.getFixedHeader().getMessageType() == MqttMessageType.PUBCOMP, "invalid message type"); ValidateUtils.isTrue(Objects.equals(compMessage.getVariableHeader().getPacketId(), pubRelMessage.getVariableHeader().getPacketId()), "invalid packetId"); pubRelFuture.complete(true); LOGGER.info("Qos2消息发送成功..."); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java index 8084318396a1ef2bd045a7dd7e1302d9651aa129..aa9554bc8f3b9f14548f6bf995ac84c5e9e4673c 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttConnectReturnCode.java @@ -4,6 +4,7 @@ package org.smartboot.mqtt.common.enums; * */ public enum MqttConnectReturnCode { + //MQTT3 CONNECTION_ACCEPTED((byte) 0x00, "连接已被服务端接受"), CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION((byte) 0X01, "服务端不支持客户端请求的 MQTT 协议级别"), CONNECTION_REFUSED_IDENTIFIER_REJECTED((byte) 0x02, "客户端标识符是正确的 UTF-8 编码,但服务 端不允许使用"), @@ -11,7 +12,6 @@ public enum MqttConnectReturnCode { CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD((byte) 0x04, "用户名或密码的数据格式无效"), CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05, "客户端未被授权连接到此服务器"); - private final byte code; private final String desc; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java index cfd7a6c9d972b2c4473740396b03ea2181dd4728..94fcc5c5ef686136b95e6151b99037b48bfe4f56 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttProtocolEnum.java @@ -5,7 +5,7 @@ package org.smartboot.mqtt.common.enums; * @version V1.0 , 2022/3/23 */ public enum MqttProtocolEnum { - MQTT_3_1("MQIsdp"), MQTT_3_1_1("MQTT"); + MQTT_3_1("MQIsdp"), MQTT_3_1_1("MQTT"), MQTT_5("MQTT"); /** * 协议名 */ diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java new file mode 100644 index 0000000000000000000000000000000000000000..aecd578d8306062ba3266cfcea5521219d9ce5cf --- /dev/null +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttReasonCode.java @@ -0,0 +1,78 @@ +package org.smartboot.mqtt.common.enums; + +/** + * + */ +public enum MqttReasonCode { + //MQTT5 + SUCCESS((byte) 0x00, "成功"), + NORMAL_DISCONNECTION((byte) 0x00, "断开连接"), + GRANTED_QOS0((byte) 0x00, "最大允许qos为0"), + GRANTED_QOS1((byte) 0x01, "最大允许qos为1"), + GRANTED_QOS2((byte) 0x02, "最大允许qos为2"), + DISCONNECT_WITH_WILL_MESSAGE((byte) 0x04, "客户端需要断开连接后发送遗嘱消息"), + NO_MATCHING_SUBSCRIBERS((byte) 0X10, "无匹配的订阅者"), + N0_SUBSCRIPTION_EXISTED((byte) 0X11, ""), + CONTINUE_AUTHENTICATION((byte) 0X18, ""), + RE_AUTHENTICATE((byte) 0X19, ""), + UNSPECIFIED_ERROR((byte) 0x80, "未指明的错误"), + MALFORMED_PACKET((byte) 0x81, "数据未正确解析"), + PROTOCOL_ERROR((byte) 0x82, "协议版本错误"), + IMPLEMENTATION_SPECIFIC_ERROR((byte) 0x83, "接收者不接受"), + UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84, "服务端不支持此版本协议"), + CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85, "不允许的客户端id"), + BAD_USERNAME_OR_PASSWORD((byte) 0x86, "不接受的用户名或密码"), + NOT_AUTHORIZED((byte) 0x87, "未授权"), + SERVER_UNAVAILABLE_5((byte) 0x88, "服务端不可用"), + SERVER_BUSY((byte) 0x89, "服务端繁忙中"), + BANNED((byte) 0x8A, "客户端被禁用"), + SERVER_SHUTTING_DOWN((byte) 0x8B, ""), + BAD_AUTHENTICATION_METHOD((byte) 0x8C, "错误的认证方法"), + KEEP_ALIVE_TIMEOUT((byte) 0x8D, ""), + SESSION_TAKEN_OVER((byte) 0x8E, "相同客户端id上线导致被踢出下线"), + TOPIC_FILTER_INVALID((byte) 0x8F, "消息过滤非法"), + TOPIC_NAME_INVALID((byte) 0x90, "topic名非法"), + PACKET_IDENTIFIER_IN_USE((byte) 0x91, "packetId已被使用"), + PACKET_IDENTIFIER_NOT_FOUND((byte) 0x92, ""), + RECEIVE_MAXIMUM_EXCEEDED((byte) 0x93, ""), + TOPIC_ALIAS_INVALID((byte) 0x94, ""), + PACKET_TOO_LARGE((byte) 0x95, "包大小超限"), + MESSAGE_RATE_TOO_HIGH((byte) 0x96, ""), + QUOTA_EXCEEDED((byte) 0x97, "已超限"), + ADMINISTRATIVE_ACTION((byte) 0x98, ""), + PAYLOAD_FORMAT_INVALID((byte) 0x99, "数据格式非法"), + RETAIN_NOT_SUPPORTED((byte) 0x9A, "不支持保留消息"), + QOS_NOT_SUPPORTED((byte) 0x9B, "不支持此qos"), + USE_ANOTHER_SERVER((byte) 0x9C, "客户端需要暂时使用另一节点"), + SERVER_MOVED((byte) 0x9D, "客户端需要永久使用另一节点"), + SHARED_SUBSCRIPTION_NOT_SUPPORTED((byte) 0x9E, ""), + CONNECTION_RATE_EXCEEDED((byte) 0x9F, "连接速率超限"), + MAXIMUM_CONNECT_TIME((byte) 0xA0, ""), + SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED((byte) 0xA1, ""), + WILDCARD_SUBSCRIPTION_NOT_SUPPORTED((byte) 0xA2, ""); + + private final byte code; + private final String desc; + + MqttReasonCode(byte code, String desc) { + this.code = code; + this.desc = desc; + } + + public static MqttReasonCode valueOf(byte b) { + for (MqttReasonCode v : values()) { + if (b == v.code) { + return v; + } + } + throw new IllegalArgumentException("unknown reason code: " + (b & 0xFF)); + } + + public byte getCode() { + return code; + } + + public String getDesc() { + return desc; + } +} \ No newline at end of file diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java index 5aa5040fd73a786dd097c92b5788a4a0aff94d9b..58ee607c299686775274b08f7f5d4f364d543660 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttVersion.java @@ -1,7 +1,7 @@ package org.smartboot.mqtt.common.enums; public enum MqttVersion { - MQTT_3_1(MqttProtocolEnum.MQTT_3_1, (byte) 3), MQTT_3_1_1(MqttProtocolEnum.MQTT_3_1_1, (byte) 4); + MQTT_3_1(MqttProtocolEnum.MQTT_3_1, (byte) 3), MQTT_3_1_1(MqttProtocolEnum.MQTT_3_1_1, (byte) 4), MQTT_5(MqttProtocolEnum.MQTT_5, (byte) 5); private final MqttProtocolEnum protocol; private final byte level; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java index 0a8193f2a460a15afc9303136461868d11dad95b..cb5a0f95e517d9c21a58828bdc2cee20f58a63a3 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnAckMessage.java @@ -2,6 +2,7 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; import org.smartboot.mqtt.common.enums.MqttConnectReturnCode; +import org.smartboot.mqtt.common.protocol.DecodeUnit; import org.smartboot.socket.util.BufferUtils; import java.nio.ByteBuffer; @@ -22,7 +23,7 @@ public class MqttConnAckMessage extends MqttVariableMessage { + + public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader) { + super(mqttFixedHeader); + } + + public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader, int packetId) { + this(mqttFixedHeader, packetId, (byte)0, null); + } + + public MqttIdPropertyMessage(MqttFixedHeader mqttFixedHeader, int packetId, byte reasonCode, MqttProperties mqttProperties) { + super(mqttFixedHeader); + setVariableHeader(new MqttPubReplyVariableHeader(packetId, reasonCode, mqttProperties)); + } + + @Override + public final void decodeVariableHeader(DecodeUnit unit, ByteBuffer buffer) { + int packetId = buffer.getShort(); + MqttPubReplyVariableHeader header; + if (unit.mqttVersion == MqttVersion.MQTT_5){ + byte reasonCode = buffer.get(); + byte propertyLen = buffer.get(); + header = new MqttPubReplyVariableHeader(packetId, reasonCode, null); + }else { + header = new MqttPubReplyVariableHeader(packetId, (byte)0, null); + } + setVariableHeader(header); + } + + + @Override + public void writeTo(MqttWriter mqttWriter) throws IOException { + MqttPubReplyVariableHeader variableHeader = getVariableHeader(); + int variableHeaderBufferSize = 2; // variable part only has a message id + mqttWriter.writeByte(getFixedHeaderByte1(fixedHeader)); + writeVariableLengthInt(mqttWriter, variableHeaderBufferSize); + mqttWriter.writeShort((short) variableHeader.getPacketId()); + mqttWriter.writeByte(variableHeader.getReasonCode()); + } +} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java index 34fde45b05ac5e1f358b14d7bff9439da9585651..0b4b5a9e3207655ea2af8df4b4a16650d38be7e0 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java @@ -3,6 +3,8 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; import org.smartboot.mqtt.common.ToString; import org.smartboot.mqtt.common.exception.MqttProcessException; +import org.smartboot.mqtt.common.protocol.DecodeUnit; +import org.smartboot.mqtt.common.protocol.MqttProtocol; import org.smartboot.socket.util.BufferUtils; import org.smartboot.socket.util.DecoderException; @@ -141,7 +143,7 @@ public class MqttMessage extends ToString { * * @param buffer */ - public void decodeVariableHeader(ByteBuffer buffer) { + public void decodeVariableHeader(DecodeUnit unit, ByteBuffer buffer) { } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java index 64ea394a58f497f9eba454871a95fd083fcd7623..ed85e1b970c850277fde68b8ddf5de4fd8a9b223 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdVariableHeader.java @@ -10,6 +10,12 @@ public class MqttPacketIdVariableHeader extends MqttVariableHeader { */ private int packetId; + public MqttPacketIdVariableHeader() { + } + + public MqttPacketIdVariableHeader(int packetId) { + this.packetId = packetId; + } public void setPacketId(int packetId) { this.packetId = packetId; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java index ff4fd6a4c7655c8bd3099bef33ea6ed43508cbc7..ee86a2176b48c69cc040ab04f7cfa2aafe7cacd3 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPacketIdentifierMessage.java @@ -1,6 +1,7 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; +import org.smartboot.mqtt.common.protocol.DecodeUnit; import java.io.IOException; import java.nio.ByteBuffer; @@ -28,7 +29,7 @@ public class MqttPacketIdentifierMessage extends MqttVariableMessage property type + */ + public abstract static class MqttProperty { + final T value; + final int propertyId; + + protected MqttProperty(int propertyId, T value) { + this.propertyId = propertyId; + this.value = value; + } + + /** + * Get MQTT property value + * + * @return property value + */ + public T value() { + return value; + } + + /** + * Get MQTT property ID + * @return property ID + */ + public int propertyId() { + return propertyId; + } + + @Override + public int hashCode() { + return propertyId + 31 * value.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + MqttProperty that = (MqttProperty) obj; + return this.propertyId == that.propertyId && this.value.equals(that.value); + } + } + + public static final class IntegerProperty extends MqttProperty { + + public IntegerProperty(int propertyId, Integer value) { + super(propertyId, value); + } + + @Override + public String toString() { + return "IntegerProperty(" + propertyId + ", " + value + ")"; + } + } + + public static final class StringProperty extends MqttProperty { + + public StringProperty(int propertyId, String value) { + super(propertyId, value); + } + + @Override + public String toString() { + return "StringProperty(" + propertyId + ", " + value + ")"; + } + } + + public static final class StringPair { + public final String key; + public final String value; + + public StringPair(String key, String value) { + this.key = key; + this.value = value; + } + + @Override + public int hashCode() { + return key.hashCode() + 31 * value.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + StringPair that = (StringPair) obj; + + return that.key.equals(this.key) && that.value.equals(this.value); + } + } + + //User properties are the only properties that may be included multiple times and + //are the only properties where ordering is required. Therefore, they need a special handling + public static final class UserProperties extends MqttProperty> { + public UserProperties() { + super(MqttPropertyType.USER_PROPERTY.value, new ArrayList()); + } + + /** + * Create user properties from the collection of the String pair values + * + * @param values string pairs. Collection entries are copied, collection itself isn't shared + */ + public UserProperties(Collection values) { + this(); + this.value.addAll(values); + } + + private static UserProperties fromUserPropertyCollection(Collection properties) { + UserProperties userProperties = new UserProperties(); + for (UserProperty property: properties) { + userProperties.add(new StringPair(property.value.key, property.value.value)); + } + return userProperties; + } + + public void add(StringPair pair) { + this.value.add(pair); + } + + public void add(String key, String value) { + this.value.add(new StringPair(key, value)); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("UserProperties("); + boolean first = true; + for (StringPair pair: value) { + if (!first) { + builder.append(", "); + } + builder.append(pair.key + "->" + pair.value); + first = false; + } + builder.append(")"); + return builder.toString(); + } + } + + public static final class UserProperty extends MqttProperty { + public UserProperty(String key, String value) { + super(MqttPropertyType.USER_PROPERTY.value, new StringPair(key, value)); + } + + @Override + public String toString() { + return "UserProperty(" + value.key + ", " + value.value + ")"; + } + } + + public static final class BinaryProperty extends MqttProperty { + + public BinaryProperty(int propertyId, byte[] value) { + super(propertyId, value); + } + + @Override + public String toString() { + return "BinaryProperty(" + propertyId + ", " + value.length + " bytes)"; + } + } + + + private Map props; + private List userProperties; + private List subscriptionIds; + + public void add(MqttProperty property) { + Map props = this.props; + if (property.propertyId == MqttPropertyType.USER_PROPERTY.value) { + List userProperties = this.userProperties; + if (userProperties == null) { + userProperties = new ArrayList(1); + this.userProperties = userProperties; + } + if (property instanceof UserProperty) { + userProperties.add((UserProperty) property); + } else if (property instanceof UserProperties) { + for (StringPair pair: ((UserProperties) property).value) { + userProperties.add(new UserProperty(pair.key, pair.value)); + } + } else { + throw new IllegalArgumentException("User property must be of UserProperty or UserProperties type"); + } + } else if (property.propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { + List subscriptionIds = this.subscriptionIds; + if (subscriptionIds == null) { + subscriptionIds = new ArrayList(1); + this.subscriptionIds = subscriptionIds; + } + if (property instanceof IntegerProperty) { + subscriptionIds.add((IntegerProperty) property); + } else { + throw new IllegalArgumentException("Subscription ID must be an integer property"); + } + } else { + if (props == null) { + props = new HashMap<>(); + this.props = props; + } + props.put(property.propertyId, property); + } + } + + public Collection listAll() { + Map props = this.props; + if (props == null && subscriptionIds == null && userProperties == null) { + return Collections.emptyList(); + } + if (subscriptionIds == null && userProperties == null) { + return props.values(); + } + if (props == null && userProperties == null) { + return subscriptionIds; + } + List propValues = new ArrayList(props != null ? props.size() : 1); + if (props != null) { + propValues.addAll(props.values()); + } + if (subscriptionIds != null) { + propValues.addAll(subscriptionIds); + } + if (userProperties != null) { + propValues.add(UserProperties.fromUserPropertyCollection(userProperties)); + } + return propValues; + } + + public boolean isEmpty() { + Map props = this.props; + return props == null || props.isEmpty(); + } + + /** + * Get property by ID. If there are multiple properties of this type (can be with Subscription ID) + * then return the first one. + * + * @param propertyId ID of the property + * @return a property if it is set, null otherwise + */ + public MqttProperty getProperty(int propertyId) { + if (propertyId == MqttPropertyType.USER_PROPERTY.value) { + //special handling to keep compatibility with earlier versions + List userProperties = this.userProperties; + if (userProperties == null) { + return null; + } + return UserProperties.fromUserPropertyCollection(userProperties); + } + if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { + List subscriptionIds = this.subscriptionIds; + if (subscriptionIds == null || subscriptionIds.isEmpty()) { + return null; + } + return subscriptionIds.get(0); + } + Map props = this.props; + return props == null ? null : props.get(propertyId); + } + + /** + * Get properties by ID. + * Some properties (Subscription ID and User Properties) may occur multiple times, + * this method returns all their values in order. + * + * @param propertyId ID of the property + * @return all properties having specified ID + */ + public List getProperties(int propertyId) { + if (propertyId == MqttPropertyType.USER_PROPERTY.value) { + return userProperties == null ? Collections.emptyList() : userProperties; + } + if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) { + return subscriptionIds == null ? Collections.emptyList() : subscriptionIds; + } + Map props = this.props; + return (props == null || !props.containsKey(propertyId)) ? + Collections.emptyList() : + Collections.singletonList(props.get(propertyId)); + } +} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubCompMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubCompMessage.java index 670a9ed9f1e679a9ac16aa33653af5c0734f3e18..f1b8f364631cea60bf8b2811bf65dc01b3deffff 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubCompMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubCompMessage.java @@ -9,7 +9,7 @@ public class MqttPubCompMessage extends MqttPacketIdentifierMessage { super(mqttFixedHeader); } - public MqttPubCompMessage(int mqttPacketIdVariableHeader) { - super(MqttFixedHeader.PUB_COMP_HEADER, mqttPacketIdVariableHeader); + public MqttPubCompMessage(int packetId) { + super(MqttFixedHeader.PUB_COMP_HEADER, packetId); } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubReplyVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubReplyVariableHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..7b72dd97b2fb08e31d26070c6460c3e906f6bae3 --- /dev/null +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubReplyVariableHeader.java @@ -0,0 +1,24 @@ +package org.smartboot.mqtt.common.message; + +public class MqttPubReplyVariableHeader extends MqttPacketIdVariableHeader{ + + private final byte reasonCode; + private final MqttProperties properties; + + public MqttPubReplyVariableHeader(int packetId, byte reasonCode, MqttProperties properties) { + super(packetId); + if (packetId < 1 || packetId > 0xffff) { + throw new IllegalArgumentException("packetId: " + packetId + " (should be: 1 ~ 65535)"); + } + this.reasonCode = reasonCode; + this.properties = properties; + } + + public byte getReasonCode() { + return reasonCode; + } + + public MqttProperties getProperties() { + return properties; + } +} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java index d6280122bef726fbfdc0043ca9cec9148943591e..39e508d1580df8b689b1ec3550fe2c968071a54c 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java @@ -1,6 +1,7 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; +import org.smartboot.mqtt.common.protocol.DecodeUnit; import org.smartboot.mqtt.common.util.MqttUtil; import org.smartboot.socket.util.DecoderException; @@ -25,7 +26,7 @@ public class MqttPublishMessage extends MqttVariableMessage { if (payloadBuffer.remaining() < remainingLength) { break; } - unit.mqttMessage.decodeVariableHeader(payloadBuffer); + unit.mqttMessage.decodeVariableHeader(unit, payloadBuffer); unit.state = READ_PAYLOAD; @@ -181,10 +182,4 @@ public class MqttProtocol implements Protocol { READ_FIXED_HEADER, READ_VARIABLE_HEADER, READ_PAYLOAD, FINISH, } - class DecodeUnit { - DecoderState state; - MqttMessage mqttMessage; - - ByteBuffer disposableBuffer; - } }