diff --git a/pom.xml b/pom.xml index dd9ff4f03d4613468096461d6271b6ba7be8aa14..a9aca2c0cd934e1117a0398e9762332063c1ed5d 100644 --- a/pom.xml +++ b/pom.xml @@ -4,12 +4,12 @@ org.smartboot.mqtt smart-mqtt smart-mqtt - 0.16 + 0.17 4.0.0 mqtt broker - 0.16 + 0.17 1.5.25 1.1.22 2.6 diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml index 6be6788478bf8572d221d0159384caebfc66ac5a..aff412d15e96f3f07835fdf8df2bc66ebbdee8c3 100644 --- a/smart-mqtt-broker/pom.xml +++ b/smart-mqtt-broker/pom.xml @@ -5,7 +5,7 @@ org.smartboot.mqtt smart-mqtt - 0.16 + 0.17 ../pom.xml 4.0.0 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java index c06ab594e5b6b898d081a66381671326204679de..bcf1c70c67999106ceef54a1344b437a9d9ae415 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java @@ -28,7 +28,7 @@ public class BrokerConfigure extends ToString { /** * 当前smart-mqtt */ - public static final String VERSION = "v0.16"; + public static final String VERSION = "v0.17"; static final Map SystemEnvironments = new HashMap<>(); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 7a198172800ed495fce239b3edebf44254ebede9..deb6611524105da9535c8f4130474d4a2e20b4a8 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -18,7 +18,7 @@ import org.smartboot.mqtt.broker.provider.impl.ConfiguredConnectAuthenticationPr import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; import org.smartboot.mqtt.common.AsyncTask; import org.smartboot.mqtt.common.InflightQueue; -import org.smartboot.mqtt.common.MqttMessageBuilders; +import org.smartboot.mqtt.common.QosRetryPlugin; import org.smartboot.mqtt.common.enums.MqttMetricEnum; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventBus; @@ -32,6 +32,7 @@ import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; import org.smartboot.mqtt.common.protocol.MqttProtocol; import org.smartboot.mqtt.common.to.MetricItemTO; +import org.smartboot.mqtt.common.util.MqttMessageBuilders; import org.smartboot.mqtt.common.util.MqttUtil; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.buffer.BufferPagePool; @@ -141,6 +142,7 @@ public class BrokerContextImpl implements BrokerContext { } }); pagePool = new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true); + processor.addPlugin(new QosRetryPlugin()); server = new AioQuickServer(brokerConfigure.getHost(), brokerConfigure.getPort(), new MqttProtocol(brokerConfigure.getMaxPacketSize()), processor); server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(Math.max(2, brokerConfigure.getThreadNum())); server.start(asynchronousChannelGroup); @@ -293,7 +295,7 @@ public class BrokerContextImpl implements BrokerContext { try { subscriber.batchPublish(BrokerContextImpl.this); } catch (Exception e) { - LOGGER.error("batch publish exception:{}", e.getMessage()); + LOGGER.error("batch publish exception:{}", e.getMessage(), e); } } brokerTopic.getSemaphore().release(); @@ -370,11 +372,13 @@ public class BrokerContextImpl implements BrokerContext { publishBuilder.publishProperties(new PublishProperties()); } InflightQueue inflightQueue = session.getInflightQueue(); - inflightQueue.offer(publishBuilder, offset -> { + long offset = storedMessage.getOffset(); + // retain消息逐个推送 + inflightQueue.offer(publishBuilder, (mqtt) -> { LOGGER.info("publish retain to client:{} success ", session.getClientId()); subscriber.setRetainConsumerOffset(offset + 1); retainPushThreadPool.execute(task); - }, storedMessage.getOffset()); + }); session.flush(); } }); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index 048732d41541778f69ef5dc87240932f7538dbfe..6dfbcc104875eb71d21669fb227ad282a3038310 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -103,9 +103,9 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor getOnlineSessions() { diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index 4a4cb57968ad580b4d66023e9125c90c6bd8b2ea..dc3783e12eac7d87d8ea3f9d713b5424e0d2c148 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -209,6 +209,7 @@ public class MqttSession extends AbstractSession { TopicSubscriber oldOffset = topicFilterSubscriber.getTopicSubscribers().remove(topic.getTopic()); if (oldOffset != null) { TopicSubscriber consumerOffset = oldOffset.getTopic().getConsumeOffsets().remove(this); + consumerOffset.disable(); LOGGER.info("remove topic:{} {},", topic, oldOffset == consumerOffset ? "success" : "fail"); } }); @@ -244,6 +245,7 @@ public class MqttSession extends AbstractSession { TopicSubscriber removeSubscriber = subscriber.getTopic().getConsumeOffsets().remove(this); retainOffsetCache.put(subscriber.getTopic(), subscriber.getRetainConsumerOffset()); if (subscriber == removeSubscriber) { + removeSubscriber.disable(); LOGGER.debug("remove subscriber:{} success!", subscriber.getTopic().getTopic()); } else { LOGGER.error("remove subscriber:{} error!", removeSubscriber); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 2e2ce334a96e9801ab616021d4fd268f1ffe46ce..8bf03ebe47b7eedfe14c75d4a953501bd3fd667c 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -5,12 +5,12 @@ import org.slf4j.LoggerFactory; import org.smartboot.mqtt.broker.provider.PersistenceProvider; import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; import org.smartboot.mqtt.common.InflightQueue; -import org.smartboot.mqtt.common.MqttMessageBuilders; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; +import org.smartboot.mqtt.common.util.MqttMessageBuilders; import java.util.concurrent.Semaphore; @@ -50,6 +50,7 @@ public class TopicSubscriber { private TopicToken topicFilterToken; private final Semaphore semaphore = new Semaphore(0); + private boolean enable = true; public TopicSubscriber(BrokerTopic topic, MqttSession session, MqttQoS mqttQoS, long nextConsumerOffset, long retainConsumerOffset) { this.topic = topic; @@ -60,7 +61,7 @@ public class TopicSubscriber { } public void batchPublish(BrokerContext brokerContext) { - if (mqttSession.isDisconnect()) { + if (mqttSession.isDisconnect() || !enable) { return; } semaphore.release(); @@ -88,7 +89,8 @@ public class TopicSubscriber { } InflightQueue inflightQueue = mqttSession.getInflightQueue(); - boolean suc = inflightQueue.offer(publishBuilder, offset -> { + long offset = persistenceMessage.getOffset(); + boolean suc = inflightQueue.offer(publishBuilder, (mqtt) -> { if (mqttQoS == MqttQoS.AT_MOST_ONCE) { nextConsumerOffset = persistenceMessage.getOffset() + 1; } @@ -98,10 +100,8 @@ public class TopicSubscriber { setRetainConsumerOffset(getRetainConsumerOffset() + 1); } commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); -// if (inflightQueue.getCount() == 0) { publish0(brokerContext, 0); -// } - }, persistenceMessage.getOffset()); + }); // 飞行队列已满 if (!suc) { // LOGGER.info("queue is full..." + expectConsumerOffset); @@ -165,4 +165,8 @@ public class TopicSubscriber { public void setTopicFilterToken(TopicToken topicFilterToken) { this.topicFilterToken = topicFilterToken; } + + public void disable() { + this.enable = false; + } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java index 6f57c96e424411e70bb35daf5a5beef3918a202d..4d1ceb57d74827a58eb01e3ad2454c17514681d9 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java @@ -10,7 +10,6 @@ import org.smartboot.mqtt.broker.eventbus.ServerEventType; import org.smartboot.mqtt.broker.provider.SessionStateProvider; import org.smartboot.mqtt.broker.provider.impl.session.SessionState; import org.smartboot.mqtt.common.InflightQueue; -import org.smartboot.mqtt.common.MqttMessageBuilders; import org.smartboot.mqtt.common.enums.MqttConnectReturnCode; import org.smartboot.mqtt.common.enums.MqttProtocolEnum; import org.smartboot.mqtt.common.enums.MqttQoS; @@ -25,6 +24,7 @@ import org.smartboot.mqtt.common.message.variable.MqttConnAckVariableHeader; import org.smartboot.mqtt.common.message.variable.MqttConnectVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ConnectAckProperties; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; +import org.smartboot.mqtt.common.util.MqttMessageBuilders; import org.smartboot.mqtt.common.util.MqttUtil; import org.smartboot.mqtt.common.util.ValidateUtils; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java index d997d6ab8fe746df9e532a3e5bb42c85967c5f03..2d27cd859fc956e2532ea68b8ed018c91375de3c 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java @@ -13,9 +13,11 @@ import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPubAckMessage; import org.smartboot.mqtt.common.message.MqttPubCompMessage; import org.smartboot.mqtt.common.message.MqttPubRecMessage; +import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; +import org.smartboot.mqtt.common.util.ValidateUtils; /** * 发布Topic @@ -101,6 +103,7 @@ public class PublishProcessor extends AuthorizedMqttProcessor { + ValidateUtils.isTrue(message instanceof MqttPubRelMessage, "invalid message"); //发送pubRel消息。 //todo MqttPubQosVariableHeader qosVariableHeader; diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java index a9e6904cf63343c6fa9008077dd766c370c46943..a382b5f6f0d4f94a122f22b3af2c35136d644592 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java @@ -1,6 +1,6 @@ package org.smartboot.mqtt.broker.provider.impl.session; -import org.smartboot.mqtt.common.AckMessage; +import org.smartboot.mqtt.common.InflightMessage; import org.smartboot.mqtt.common.enums.MqttQoS; import java.util.HashMap; @@ -11,11 +11,11 @@ import java.util.Map; * @version V1.0 , 2022/4/15 */ public class SessionState { - protected final Map responseConsumers = new HashMap<>(); + protected final Map responseConsumers = new HashMap<>(); private final Map subscribers = new HashMap<>(); - public Map getResponseConsumers() { + public Map getResponseConsumers() { return responseConsumers; } diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml index 4b2a1983dd6271c2539fd921c4f9c0ef48354f77..3715956da47803f2fe8179334b7749c2b22053d3 100644 --- a/smart-mqtt-client/pom.xml +++ b/smart-mqtt-client/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.16 + 0.17 ../pom.xml 4.0.0 @@ -16,6 +16,12 @@ org.smartboot.mqtt smart-mqtt-common + + org.slf4j + slf4j-simple + 2.0.0 + test + junit junit diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index e42ba2a2d25309f4f6df2a79f66da0c78dc07138..e13c6c1b60f78544fbf664409b639b6219da0a6e 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -6,18 +6,18 @@ import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.AbstractSession; import org.smartboot.mqtt.common.DefaultMqttWriter; import org.smartboot.mqtt.common.InflightQueue; -import org.smartboot.mqtt.common.MqttMessageBuilders; +import org.smartboot.mqtt.common.QosRetryPlugin; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttConnectReturnCode; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventBusImpl; import org.smartboot.mqtt.common.eventbus.EventType; +import org.smartboot.mqtt.common.exception.MqttException; import org.smartboot.mqtt.common.message.MqttConnAckMessage; import org.smartboot.mqtt.common.message.MqttConnectMessage; import org.smartboot.mqtt.common.message.MqttDisconnectMessage; import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPingReqMessage; import org.smartboot.mqtt.common.message.MqttPingRespMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; @@ -25,16 +25,15 @@ import org.smartboot.mqtt.common.message.MqttSubAckMessage; import org.smartboot.mqtt.common.message.MqttSubscribeMessage; import org.smartboot.mqtt.common.message.MqttTopicSubscription; import org.smartboot.mqtt.common.message.MqttUnsubAckMessage; -import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; import org.smartboot.mqtt.common.message.payload.MqttConnectPayload; import org.smartboot.mqtt.common.message.payload.WillMessage; import org.smartboot.mqtt.common.message.variable.MqttConnectVariableHeader; -import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ConnectProperties; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; import org.smartboot.mqtt.common.message.variable.properties.SubscribeProperties; import org.smartboot.mqtt.common.protocol.MqttProtocol; +import org.smartboot.mqtt.common.util.MqttMessageBuilders; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.buffer.BufferPagePool; import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider; @@ -206,6 +205,7 @@ public class MqttClient extends AbstractSession { }, clientConfigure.getKeepAliveInterval(), TimeUnit.SECONDS); } // messageProcessor.addPlugin(new StreamMonitorPlugin<>()); + messageProcessor.addPlugin(new QosRetryPlugin()); client = new AioQuickClient(clientConfigure.getHost(), clientConfigure.getPort(), new MqttProtocol(clientConfigure.getMaxPacketSize()), messageProcessor); try { if (bufferPagePool != null) { @@ -260,7 +260,7 @@ public class MqttClient extends AbstractSession { return this; } - public void unsubscribe0(String[] topics) { + private void unsubscribe0(String[] topics) { Set unsubscribedTopics = new HashSet<>(topics.length); for (String unsubscribedTopic : topics) { if (subscribes.containsKey(unsubscribedTopic)) { @@ -273,27 +273,26 @@ public class MqttClient extends AbstractSession { return; } - MqttMessageBuilders.UnsubscribeBuilder unsubscribeBuilder = MqttMessageBuilders.unsubscribe().packetId(newPacketId()); + MqttMessageBuilders.UnsubscribeBuilder unsubscribeBuilder = MqttMessageBuilders.unsubscribe(); unsubscribedTopics.forEach(unsubscribeBuilder::addTopicFilter); //todo - ReasonProperties properties = null; if (getMqttVersion() == MqttVersion.MQTT_5) { - properties = new ReasonProperties(); + ReasonProperties properties = new ReasonProperties(); + unsubscribeBuilder.properties(properties); } - MqttUnsubscribeMessage unsubscribedMessage = unsubscribeBuilder.build(properties); // wait ack message. - responseConsumers.put(unsubscribedMessage.getVariableHeader().getPacketId(), new Consumer>() { - @Override - public void accept(MqttPacketIdentifierMessage message) { + try { + getInflightQueue().put(unsubscribeBuilder, (message) -> { ValidateUtils.isTrue(message instanceof MqttUnsubAckMessage, "uncorrected message type."); for (String unsubscribedTopic : unsubscribedTopics) { subscribes.remove(unsubscribedTopic); wildcardsToken.removeIf(topicToken -> StringUtils.equals(unsubscribedTopic, topicToken.getTopicFilter())); } - } - }); - write(unsubscribedMessage); + }); + } catch (InterruptedException e) { + throw new MqttException("unsubscribe topic exception", e); + } } public MqttClient subscribe(String topic, MqttQoS qos, BiConsumer consumer) { @@ -305,7 +304,7 @@ public class MqttClient extends AbstractSession { } public MqttClient subscribe(String[] topics, MqttQoS[] qos, BiConsumer consumer) { - subscribe0(topics, qos, consumer, (mqttClient, mqttQoS) -> { + subscribe(topics, qos, consumer, (mqttClient, mqttQoS) -> { }); return this; } @@ -320,7 +319,7 @@ public class MqttClient extends AbstractSession { } private void subscribe0(String[] topic, MqttQoS[] qos, BiConsumer consumer, BiConsumer subAckConsumer) { - MqttMessageBuilders.SubscribeBuilder subscribeBuilder = MqttMessageBuilders.subscribe().packetId(newPacketId()); + MqttMessageBuilders.SubscribeBuilder subscribeBuilder = MqttMessageBuilders.subscribe(); for (int i = 0; i < topic.length; i++) { subscribeBuilder.addSubscription(qos[i], topic[i]); } @@ -329,13 +328,10 @@ public class MqttClient extends AbstractSession { subscribeBuilder.subscribeProperties(new SubscribeProperties()); } MqttSubscribeMessage subscribeMessage = subscribeBuilder.build(); - - responseConsumers.put(subscribeMessage.getVariableHeader().getPacketId(), new Consumer>() { - @Override - public void accept(MqttPacketIdentifierMessage message) { + try { + getInflightQueue().put(subscribeBuilder, (message) -> { List qosValues = ((MqttSubAckMessage) message).getPayload().grantedQoSLevels(); ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response"); - ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response"); int i = 0; for (MqttTopicSubscription subscription : subscribeMessage.getPayload().getTopicSubscriptions()) { MqttQoS minQos = MqttQoS.valueOf(Math.min(subscription.getQualityOfService().value(), qosValues.get(i++))); @@ -352,9 +348,10 @@ public class MqttClient extends AbstractSession { } subAckConsumer.accept(MqttClient.this, minQos); } - } - }); - write(subscribeMessage); + }); + } catch (InterruptedException e) { + throw new MqttException("subscribe topic exception", e); + } } public void notifyResponse(MqttConnAckMessage connAckMessage) { @@ -394,26 +391,18 @@ public class MqttClient extends AbstractSession { private void publish(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer) { InflightQueue inflightQueue = getInflightQueue(); - boolean suc = inflightQueue.offer(publishBuilder, offset -> { - consumer.accept(publishBuilder.getPacketId()); - //最早发送的消息若收到响应,则更新点位 - if (offset != -1) { + try { + inflightQueue.put(publishBuilder, (message) -> { + consumer.accept(message.getVariableHeader().getPacketId()); + //最早发送的消息若收到响应,则更新点位 synchronized (MqttClient.this) { MqttClient.this.notifyAll(); } - } - }, 1); - flush(); - if (!suc) { - try { - synchronized (this) { - wait(); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - publish(publishBuilder, consumer); + }); + } catch (InterruptedException e) { + throw new MqttException("publish message exception", e); } + flush(); } public MqttClientConfigure getClientConfigure() { diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java index dd45017987b38a5549f01d4f0a36b4335a104c8f..cf3b1e87e8c14f0c920a2a534bda3d1d5fcb027c 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java @@ -10,11 +10,13 @@ import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPubAckMessage; import org.smartboot.mqtt.common.message.MqttPubCompMessage; import org.smartboot.mqtt.common.message.MqttPubRecMessage; +import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; import org.smartboot.mqtt.common.message.variable.MqttPublishVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; import org.smartboot.mqtt.common.util.TopicTokenUtil; +import org.smartboot.mqtt.common.util.ValidateUtils; /** * 发布Topic @@ -93,6 +95,7 @@ public class PublishProcessor implements MqttProcessor { MqttPubRecMessage pubRecMessage = new MqttPubRecMessage(variableHeader); session.write(pubRecMessage, message -> { + ValidateUtils.isTrue(message instanceof MqttPubRelMessage, "invalid message"); //todo ReasonProperties reasonProperties = null; if (mqttPublishMessage.getVersion() == MqttVersion.MQTT_5) { diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml index 6203e97b48a5242c242d94538b43ea303c464c88..6c75836c2044ee7e10cd7aebbc04999988185ae5 100644 --- a/smart-mqtt-common/pom.xml +++ b/smart-mqtt-common/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.16 + 0.17 ../pom.xml 4.0.0 diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java index 5ab9f5294b542b024eaabc17824c1376fe3ea890..e27c960bb472cca716f5f40038cb483708e4a592 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java @@ -1,7 +1,5 @@ package org.smartboot.mqtt.common; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventBus; @@ -9,18 +7,21 @@ import org.smartboot.mqtt.common.eventbus.EventObject; import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.message.MqttMessage; import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; -import org.smartboot.mqtt.common.message.MqttPubQosMessage; +import org.smartboot.mqtt.common.message.MqttPubRecMessage; +import org.smartboot.mqtt.common.message.MqttSubscribeMessage; +import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.protocol.MqttProtocol; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.transport.AioSession; import org.smartboot.socket.util.Attachment; +import org.smartboot.socket.util.QuickTimerTask; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** @@ -28,12 +29,6 @@ import java.util.function.Consumer; * @version V1.0 , 2022/4/12 */ public abstract class AbstractSession { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSession.class); - private static final int QOS0_PACKET_ID = 0; - /** - * 用于生成当前会话的报文标识符 - */ - private final AtomicInteger packetIdCreator = new AtomicInteger(1); private final EventBus eventBus; protected String clientId; protected AioSession session; @@ -55,22 +50,41 @@ public abstract class AbstractSession { private MqttVersion mqttVersion; private InflightQueue inflightQueue; - protected Map>> responseConsumers = new ConcurrentHashMap<>(); + private final Map ackMessageCacheMap = new ConcurrentHashMap<>(); public AbstractSession(EventBus eventBus) { this.eventBus = eventBus; } public final void write(MqttPacketIdentifierMessage mqttMessage, Consumer> consumer) { - responseConsumers.put(mqttMessage.getVariableHeader().getPacketId(), consumer); + QosMessage ackMessage = new QosMessage(mqttMessage, consumer); + switch (mqttMessage.getFixedHeader().getQosLevel()) { + case AT_MOST_ONCE: + ValidateUtils.isTrue(mqttMessage instanceof MqttPubRecMessage, "invalid message instance"); + //超时移除即可, + break; + case AT_LEAST_ONCE: + ValidateUtils.isTrue(mqttMessage instanceof MqttSubscribeMessage || mqttMessage instanceof MqttUnsubscribeMessage, "invalid message instance"); + //重新发送subscribe或unSubscribe消息 + QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(() -> { + if (!ackMessage.isCommit()) { + write(mqttMessage, consumer); + } + }, 1, TimeUnit.SECONDS); + default: + throw new UnsupportedOperationException(); + } + ackMessageCacheMap.put(mqttMessage.getVariableHeader().getPacketId(), ackMessage); write(mqttMessage, false); } public final void notifyResponse(MqttPacketIdentifierMessage message) { - if (message instanceof MqttPubQosMessage && message.getFixedHeader().getMessageType() != MqttMessageType.PUBREL) { - inflightQueue.notify((MqttPubQosMessage) message); + if (message.getFixedHeader().getMessageType() != MqttMessageType.PUBREL) { + inflightQueue.notify(message); } else { - responseConsumers.remove(message.getVariableHeader().getPacketId()).accept(message); + QosMessage qosMessage = ackMessageCacheMap.remove(message.getVariableHeader().getPacketId()); + qosMessage.setCommit(true); + qosMessage.getConsumer().accept(message); } } @@ -119,18 +133,6 @@ public abstract class AbstractSession { return clientId; } - /** - * 生成大于0的 packetId - */ - public int newPacketId() { - int packageId = packetIdCreator.getAndIncrement(); - if (packageId <= 0) { - packetIdCreator.set(0); - return newPacketId(); - } - return packageId; - } - public InetSocketAddress getRemoteAddress() throws IOException { return session.getRemoteAddress(); } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java deleted file mode 100644 index 9d480c6d9d37d930e8e6233c7e3f371d1ca80f3f..0000000000000000000000000000000000000000 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java +++ /dev/null @@ -1,75 +0,0 @@ -package org.smartboot.mqtt.common; - -import org.smartboot.mqtt.common.enums.MqttMessageType; -import org.smartboot.mqtt.common.enums.MqttQoS; -import org.smartboot.mqtt.common.message.MqttPublishMessage; - -import java.util.function.Consumer; - -/** - * @author 三刀(zhengjunweimail@163.com) - * @version V1.0 , 2022/4/14 - */ -public class AckMessage { - /** - * 原始消息 - */ - private final MqttPublishMessage originalMessage; - - private MqttMessageType expectMessageType; - /** - * 回调事件 - */ - private final Consumer consumer; - - private final long offset; - - private boolean commit; - - private final int packetId; - - public AckMessage(MqttPublishMessage originalMessage, int packetId, Consumer consumer, long offset) { - this.originalMessage = originalMessage; - this.consumer = consumer; - this.offset = offset; - this.packetId = packetId; - if (originalMessage.getFixedHeader().getQosLevel() == MqttQoS.AT_LEAST_ONCE) { - this.expectMessageType = MqttMessageType.PUBACK; - } else if (originalMessage.getFixedHeader().getQosLevel() == MqttQoS.EXACTLY_ONCE) { - this.expectMessageType = MqttMessageType.PUBREC; - } - } - - public MqttPublishMessage getOriginalMessage() { - return originalMessage; - } - - - public Consumer getConsumer() { - return consumer; - } - - public MqttMessageType getExpectMessageType() { - return expectMessageType; - } - - public void setExpectMessageType(MqttMessageType expectMessageType) { - this.expectMessageType = expectMessageType; - } - - public long getOffset() { - return offset; - } - - public boolean isCommit() { - return commit; - } - - public void setCommit(boolean commit) { - this.commit = commit; - } - - public int getPacketId() { - return packetId; - } -} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..2431fb30dc1b1b0c0dda2103edd4e54e705a5925 --- /dev/null +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java @@ -0,0 +1,113 @@ +package org.smartboot.mqtt.common; + +import org.smartboot.mqtt.common.enums.MqttMessageType; +import org.smartboot.mqtt.common.enums.MqttQoS; +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; +import org.smartboot.mqtt.common.message.MqttPublishMessage; +import org.smartboot.mqtt.common.message.MqttSubscribeMessage; +import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; +import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; + +import java.util.function.Consumer; + +/** + * @author 三刀(zhengjunweimail@163.com) + * @version V1.0 , 2022/4/14 + */ +public class InflightMessage { + /** + * 原始消息 + */ + private final MqttPacketIdentifierMessage originalMessage; + private MqttPacketIdentifierMessage responseMessage; + + /** + * 飞行队列为其分配的packetId + */ + private final int assignedPacketId; + + private MqttMessageType expectMessageType; + + private boolean commit; + + private final Consumer> consumer; + + private int retryCount; + + private long latestTime; + + public InflightMessage(int packetId, MqttPacketIdentifierMessage originalMessage, Consumer> consumer) { + this.assignedPacketId = packetId; + this.originalMessage = originalMessage; + this.consumer = consumer; + if (originalMessage instanceof MqttSubscribeMessage) { + this.expectMessageType = MqttMessageType.SUBACK; + } else if (originalMessage instanceof MqttUnsubscribeMessage) { + this.expectMessageType = MqttMessageType.UNSUBACK; + } else if (originalMessage instanceof MqttPublishMessage) { + if (originalMessage.getFixedHeader().getQosLevel() == MqttQoS.AT_LEAST_ONCE) { + this.expectMessageType = MqttMessageType.PUBACK; + } else if (originalMessage.getFixedHeader().getQosLevel() == MqttQoS.EXACTLY_ONCE) { + this.expectMessageType = MqttMessageType.PUBREC; + } + } else { + throw new UnsupportedOperationException(); + } + + this.latestTime = System.currentTimeMillis(); + } + + public MqttPacketIdentifierMessage getOriginalMessage() { + return originalMessage; + } + + + public MqttMessageType getExpectMessageType() { + return expectMessageType; + } + + public void setExpectMessageType(MqttMessageType expectMessageType) { + this.expectMessageType = expectMessageType; + } + + public boolean isCommit() { + return commit; + } + + public void setCommit(boolean commit) { + this.commit = commit; + } + + public int getRetryCount() { + return retryCount; + } + + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + + public long getLatestTime() { + return latestTime; + } + + public void setLatestTime(long latestTime) { + this.latestTime = latestTime; + } + + public final Consumer> getConsumer() { + return consumer; + } + + public int getAssignedPacketId() { + return assignedPacketId; + } + + public MqttPacketIdentifierMessage getResponseMessage() { + return responseMessage; + } + + public void setResponseMessage(MqttPacketIdentifierMessage responseMessage) { + this.responseMessage = responseMessage; + } + +} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index fa38e953040ce61fcfc95f6ee6ab083e8debd241..7a6f88098d36b4ad5dfa8bb16e2713cdb03cf4e9 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -5,13 +5,19 @@ import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; -import org.smartboot.mqtt.common.message.MqttPubQosMessage; +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPubRelMessage; -import org.smartboot.mqtt.common.message.MqttPublishMessage; +import org.smartboot.mqtt.common.message.MqttVariableMessage; +import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; +import org.smartboot.mqtt.common.util.MqttMessageBuilders; import org.smartboot.mqtt.common.util.ValidateUtils; +import org.smartboot.socket.util.AttachKey; +import org.smartboot.socket.util.Attachment; +import org.smartboot.socket.util.QuickTimerTask; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -21,7 +27,9 @@ import java.util.function.Consumer; */ public class InflightQueue { private static final Logger LOGGER = LoggerFactory.getLogger(InflightQueue.class); - private final AckMessage[] queue; + static final AttachKey RETRY_TASK_ATTACH_KEY = AttachKey.valueOf("retryTask"); + private static final int TIMEOUT = 3; + private final InflightMessage[] queue; private int takeIndex; private int putIndex; private int count; @@ -29,57 +37,129 @@ public class InflightQueue { private final AtomicInteger packetId = new AtomicInteger(0); private final AbstractSession session; + private boolean lock; public InflightQueue(AbstractSession session, int size) { ValidateUtils.isTrue(size > 0, "inflight must >0"); - this.queue = new AckMessage[size]; + this.queue = new InflightMessage[size]; this.session = session; } - public boolean offer(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer, long offset) { - int id = 0; - MqttPublishMessage mqttMessage; + public synchronized void put(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) throws InterruptedException { + while (count == queue.length) { + lock = true; + session.flush(); + wait(); + } + offer(publishBuilder, consumer); + } + + public boolean offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { + InflightMessage inflightMessage; synchronized (this) { if (count == queue.length) { return false; } - id = packetId.incrementAndGet(); + int id = packetId.incrementAndGet(); // 16位无符号最大值65535 if (id > 65535) { id = id % queue.length + queue.length; packetId.set(id); } - publishBuilder.packetId(id); - mqttMessage = publishBuilder.build(); - queue[putIndex++] = new AckMessage(mqttMessage, id, consumer, offset); + MqttPacketIdentifierMessage mqttMessage = publishBuilder.packetId(id).build(); + inflightMessage = new InflightMessage(id, mqttMessage, consumer); + queue[putIndex++] = inflightMessage; if (putIndex == queue.length) { putIndex = 0; } count++; + + //启动消息质量监测 + if (count == 1 && mqttMessage.getFixedHeader().getQosLevel().value() > 0) { + retry(inflightMessage); + } // System.out.println("publish..."); } - session.write(mqttMessage, false); - // QOS直接响应 - if (mqttMessage.getFixedHeader().getQosLevel() == MqttQoS.AT_MOST_ONCE) { - long offset1 = commit(id); -// ValidateUtils.isTrue(offset1 == -1 || offset1 == offset, "invalid offset"); - consumer.accept(offset); + session.write(inflightMessage.getOriginalMessage(), false); + // QOS0直接响应 + if (inflightMessage.getOriginalMessage().getFixedHeader().getQosLevel() == MqttQoS.AT_MOST_ONCE) { + inflightMessage.setResponseMessage(inflightMessage.getOriginalMessage()); + commit(inflightMessage); } return true; } - public void notify(MqttPubQosMessage message) { - AckMessage ackMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length]; - ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == ackMessage.getExpectMessageType(), "invalid message type"); + /** + * 超时重发 + */ + void retry(InflightMessage inflightMessage) { + if (inflightMessage.isCommit() || session.isDisconnect()) { + return; + } + QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new Runnable() { + @Override + public void run() { + if (inflightMessage.isCommit()) { +// System.out.println("message has commit,ignore retry monitor"); + return; + } + if (session.isDisconnect()) { + LOGGER.debug("session is disconnect , pause qos monitor."); + return; + } + long delay = System.currentTimeMillis() - inflightMessage.getLatestTime(); + if (delay > 0) { + LOGGER.info("the time is not up, try again in {} milliseconds ", delay); + QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, delay, TimeUnit.MILLISECONDS); + return; + } + inflightMessage.setLatestTime(System.currentTimeMillis()); + LOGGER.info("time out,retry..."); + switch (inflightMessage.getExpectMessageType()) { + case PUBACK: + case PUBREC: + session.write(inflightMessage.getOriginalMessage()); + break; + case PUBCOMP: + ReasonProperties properties = null; + if (inflightMessage.getOriginalMessage().getVersion() == MqttVersion.MQTT_5) { + properties = new ReasonProperties(); + } + MqttVariableMessage message = inflightMessage.getOriginalMessage(); + MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties); + MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader); + session.write(pubRelMessage); + break; + default: + throw new UnsupportedOperationException("invalid message type: " + inflightMessage.getExpectMessageType()); + } + inflightMessage.setRetryCount(inflightMessage.getRetryCount() + 1); + //不断重试直至完成 + QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, TIMEOUT, TimeUnit.SECONDS); + } + }, TimeUnit.SECONDS.toMillis(TIMEOUT) - (System.currentTimeMillis() - inflightMessage.getLatestTime()), TimeUnit.MILLISECONDS); + } + + /** + * 理论上该方法只会被读回调线程触发 + */ + public void notify(MqttPacketIdentifierMessage message) { + InflightMessage inflightMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length]; + ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == inflightMessage.getExpectMessageType(), "invalid message type"); + ValidateUtils.isTrue(message.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message packetId"); + inflightMessage.setResponseMessage(message); + inflightMessage.setLatestTime(System.currentTimeMillis()); switch (message.getFixedHeader().getMessageType()) { - case PUBACK: { - long offset = commit(message.getVariableHeader().getPacketId()); - ackMessage.getConsumer().accept(offset); + case SUBACK: + case UNSUBACK: + case PUBACK: + case PUBCOMP: { + commit(inflightMessage); break; } case PUBREC: - ackMessage.setExpectMessageType(MqttMessageType.PUBCOMP); + inflightMessage.setExpectMessageType(MqttMessageType.PUBCOMP); //todo ReasonProperties properties = null; if (message.getVersion() == MqttVersion.MQTT_5) { @@ -89,38 +169,44 @@ public class InflightQueue { MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader); session.write(pubRelMessage, false); break; - case PUBCOMP: - long offset = commit(message.getVariableHeader().getPacketId()); - ackMessage.getConsumer().accept(offset); - break; default: throw new RuntimeException(); } } - private synchronized long commit(int packetId) { - int commitIndex = (packetId - 1) % queue.length; - AckMessage ackMessage = queue[commitIndex]; - ValidateUtils.isTrue(ackMessage.getPacketId() == packetId, "invalid message"); - ackMessage.setCommit(true); + private synchronized void commit(InflightMessage inflightMessage) { + MqttVariableMessage originalMessage = inflightMessage.getOriginalMessage(); + ValidateUtils.isTrue(originalMessage.getFixedHeader().getQosLevel().value() == 0 || originalMessage.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message"); + inflightMessage.setCommit(true); - if (commitIndex != takeIndex) { - return -1; + if ((inflightMessage.getAssignedPacketId() - 1) % queue.length != takeIndex) { + return; } queue[takeIndex++] = null; count--; if (takeIndex == queue.length) { takeIndex = 0; } + inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); while (count > 0 && queue[takeIndex].isCommit()) { - ackMessage = queue[takeIndex]; + inflightMessage = queue[takeIndex]; + inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); queue[takeIndex++] = null; if (takeIndex == queue.length) { takeIndex = 0; } count--; } - return ackMessage.getOffset(); + if (lock) { + lock = false; + notifyAll(); + } + if (count > 0) { + //注册超时监听任务 + Attachment attachment = session.session.getAttachment(); + InflightMessage monitorMessage = queue[takeIndex]; + attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(monitorMessage)); + } } public int getCount() { diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..aaea1551b44c46ad61d3d8329c987c4df5ad7393 --- /dev/null +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosMessage.java @@ -0,0 +1,37 @@ +package org.smartboot.mqtt.common; + +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; +import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; + +import java.util.function.Consumer; + +/** + * @author 三刀(zhengjunweimail@163.com) + * @version V1.0 , 2023/3/27 + */ +public class QosMessage { + private final MqttPacketIdentifierMessage message; + private final Consumer> consumer; + private boolean commit; + + public QosMessage(MqttPacketIdentifierMessage message, Consumer> consumer) { + this.message = message; + this.consumer = consumer; + } + + public MqttPacketIdentifierMessage getMessage() { + return message; + } + + public Consumer> getConsumer() { + return consumer; + } + + public boolean isCommit() { + return commit; + } + + public void setCommit(boolean commit) { + this.commit = commit; + } +} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosRetryPlugin.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosRetryPlugin.java new file mode 100644 index 0000000000000000000000000000000000000000..445451bb479e22bd0080c7342521d03dde76d06d --- /dev/null +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosRetryPlugin.java @@ -0,0 +1,29 @@ +package org.smartboot.mqtt.common; + +import org.smartboot.mqtt.common.message.MqttMessage; +import org.smartboot.socket.extension.plugins.AbstractPlugin; +import org.smartboot.socket.transport.AioSession; +import org.smartboot.socket.util.Attachment; + +/** + * @author 三刀(zhengjunweimail@163.com) + * @version V1.0 , 2023/3/24 + */ +public class QosRetryPlugin extends AbstractPlugin { + + @Override + public void beforeRead(AioSession session) { + Attachment attachment = session.getAttachment(); + if (attachment == null) { + return; + } + Runnable runnable = attachment.get(InflightQueue.RETRY_TASK_ATTACH_KEY); + if (runnable != null) { + try { + runnable.run(); + } finally { + attachment.remove(InflightQueue.RETRY_TASK_ATTACH_KEY); + } + } + } +} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java index 2172884513861d052cde4604a4716dc5279f241f..4d6f8293c0555d44408d3f21bba3c402fc660f62 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java @@ -13,7 +13,7 @@ import java.nio.ByteBuffer; * @author 三刀 * @version V1.0 , 2018/4/22 */ -public class MqttPublishMessage extends MqttVariableMessage { +public class MqttPublishMessage extends MqttPacketIdentifierMessage { private static final MqttPublishPayload EMPTY_BYTES = new MqttPublishPayload(new byte[0]); private MqttPublishPayload payload; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java similarity index 85% rename from smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java rename to smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java index f70d9843199c96a15498a4b746262aacbfee3e38..14a055f91d0ef1e74ba012e24175ab9c72545965 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java @@ -1,14 +1,16 @@ -package org.smartboot.mqtt.common; +package org.smartboot.mqtt.common.util; import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.message.MqttFixedHeader; +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.MqttSubscribeMessage; import org.smartboot.mqtt.common.message.MqttTopicSubscription; import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; import org.smartboot.mqtt.common.message.payload.MqttSubscribePayload; import org.smartboot.mqtt.common.message.payload.MqttUnsubscribePayload; +import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; import org.smartboot.mqtt.common.message.variable.MqttPublishVariableHeader; import org.smartboot.mqtt.common.message.variable.MqttReasonVariableHeader; @@ -22,6 +24,12 @@ import java.util.List; public final class MqttMessageBuilders { + public interface MessageBuilder> { + MessageBuilder packetId(int packetId); + + T build(); + } + private MqttMessageBuilders() { } @@ -38,7 +46,7 @@ public final class MqttMessageBuilders { return new UnsubscribeBuilder(); } - public static final class PublishBuilder { + public static final class PublishBuilder implements MessageBuilder { private String topic; private boolean retained; private MqttQoS qos; @@ -69,8 +77,9 @@ public final class MqttMessageBuilders { return this; } - public void packetId(int packetId) { + public PublishBuilder packetId(int packetId) { this.packetId = packetId; + return this; } public PublishBuilder publishProperties(PublishProperties publishProperties) { @@ -92,7 +101,7 @@ public final class MqttMessageBuilders { } } - public static final class SubscribeBuilder { + public static final class SubscribeBuilder implements MessageBuilder { private List subscriptions; private int packetId; @@ -131,10 +140,11 @@ public final class MqttMessageBuilders { } } - public static final class UnsubscribeBuilder { + public static final class UnsubscribeBuilder implements MessageBuilder { private List topicFilters; private int packetId; + private ReasonProperties properties; UnsubscribeBuilder() { } @@ -152,7 +162,11 @@ public final class MqttMessageBuilders { return this; } - public MqttUnsubscribeMessage build(ReasonProperties properties) { + public void properties(ReasonProperties properties) { + this.properties = properties; + } + + public MqttUnsubscribeMessage build() { MqttUnsubscribePayload mqttSubscribePayload = new MqttUnsubscribePayload(topicFilters); MqttReasonVariableHeader variableHeader = new MqttPubQosVariableHeader(packetId, properties); return new MqttUnsubscribeMessage(MqttFixedHeader.UNSUBSCRIBE_HEADER, variableHeader, mqttSubscribePayload);