diff --git a/pom.xml b/pom.xml
index dd9ff4f03d4613468096461d6271b6ba7be8aa14..a9aca2c0cd934e1117a0398e9762332063c1ed5d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,12 +4,12 @@
org.smartboot.mqtt
smart-mqtt
smart-mqtt
- 0.16
+ 0.17
4.0.0
mqtt broker
- 0.16
+ 0.17
1.5.25
1.1.22
2.6
diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml
index 6be6788478bf8572d221d0159384caebfc66ac5a..aff412d15e96f3f07835fdf8df2bc66ebbdee8c3 100644
--- a/smart-mqtt-broker/pom.xml
+++ b/smart-mqtt-broker/pom.xml
@@ -5,7 +5,7 @@
org.smartboot.mqtt
smart-mqtt
- 0.16
+ 0.17
../pom.xml
4.0.0
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
index c06ab594e5b6b898d081a66381671326204679de..bcf1c70c67999106ceef54a1344b437a9d9ae415 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
@@ -28,7 +28,7 @@ public class BrokerConfigure extends ToString {
/**
* 当前smart-mqtt
*/
- public static final String VERSION = "v0.16";
+ public static final String VERSION = "v0.17";
static final Map SystemEnvironments = new HashMap<>();
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
index 7a198172800ed495fce239b3edebf44254ebede9..deb6611524105da9535c8f4130474d4a2e20b4a8 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
@@ -18,7 +18,7 @@ import org.smartboot.mqtt.broker.provider.impl.ConfiguredConnectAuthenticationPr
import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage;
import org.smartboot.mqtt.common.AsyncTask;
import org.smartboot.mqtt.common.InflightQueue;
-import org.smartboot.mqtt.common.MqttMessageBuilders;
+import org.smartboot.mqtt.common.QosRetryPlugin;
import org.smartboot.mqtt.common.enums.MqttMetricEnum;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventBus;
@@ -32,6 +32,7 @@ import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import org.smartboot.mqtt.common.protocol.MqttProtocol;
import org.smartboot.mqtt.common.to.MetricItemTO;
+import org.smartboot.mqtt.common.util.MqttMessageBuilders;
import org.smartboot.mqtt.common.util.MqttUtil;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.buffer.BufferPagePool;
@@ -141,6 +142,7 @@ public class BrokerContextImpl implements BrokerContext {
}
});
pagePool = new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true);
+ processor.addPlugin(new QosRetryPlugin());
server = new AioQuickServer(brokerConfigure.getHost(), brokerConfigure.getPort(), new MqttProtocol(brokerConfigure.getMaxPacketSize()), processor);
server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(Math.max(2, brokerConfigure.getThreadNum()));
server.start(asynchronousChannelGroup);
@@ -293,7 +295,7 @@ public class BrokerContextImpl implements BrokerContext {
try {
subscriber.batchPublish(BrokerContextImpl.this);
} catch (Exception e) {
- LOGGER.error("batch publish exception:{}", e.getMessage());
+ LOGGER.error("batch publish exception:{}", e.getMessage(), e);
}
}
brokerTopic.getSemaphore().release();
@@ -370,11 +372,13 @@ public class BrokerContextImpl implements BrokerContext {
publishBuilder.publishProperties(new PublishProperties());
}
InflightQueue inflightQueue = session.getInflightQueue();
- inflightQueue.offer(publishBuilder, offset -> {
+ long offset = storedMessage.getOffset();
+ // retain消息逐个推送
+ inflightQueue.offer(publishBuilder, (mqtt) -> {
LOGGER.info("publish retain to client:{} success ", session.getClientId());
subscriber.setRetainConsumerOffset(offset + 1);
retainPushThreadPool.execute(task);
- }, storedMessage.getOffset());
+ });
session.flush();
}
});
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
index 048732d41541778f69ef5dc87240932f7538dbfe..6dfbcc104875eb71d21669fb227ad282a3038310 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
@@ -103,9 +103,9 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor getOnlineSessions() {
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java
index 4a4cb57968ad580b4d66023e9125c90c6bd8b2ea..dc3783e12eac7d87d8ea3f9d713b5424e0d2c148 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java
@@ -209,6 +209,7 @@ public class MqttSession extends AbstractSession {
TopicSubscriber oldOffset = topicFilterSubscriber.getTopicSubscribers().remove(topic.getTopic());
if (oldOffset != null) {
TopicSubscriber consumerOffset = oldOffset.getTopic().getConsumeOffsets().remove(this);
+ consumerOffset.disable();
LOGGER.info("remove topic:{} {},", topic, oldOffset == consumerOffset ? "success" : "fail");
}
});
@@ -244,6 +245,7 @@ public class MqttSession extends AbstractSession {
TopicSubscriber removeSubscriber = subscriber.getTopic().getConsumeOffsets().remove(this);
retainOffsetCache.put(subscriber.getTopic(), subscriber.getRetainConsumerOffset());
if (subscriber == removeSubscriber) {
+ removeSubscriber.disable();
LOGGER.debug("remove subscriber:{} success!", subscriber.getTopic().getTopic());
} else {
LOGGER.error("remove subscriber:{} error!", removeSubscriber);
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
index 2e2ce334a96e9801ab616021d4fd268f1ffe46ce..8bf03ebe47b7eedfe14c75d4a953501bd3fd667c 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
@@ -5,12 +5,12 @@ import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.broker.provider.PersistenceProvider;
import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage;
import org.smartboot.mqtt.common.InflightQueue;
-import org.smartboot.mqtt.common.MqttMessageBuilders;
import org.smartboot.mqtt.common.TopicToken;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
+import org.smartboot.mqtt.common.util.MqttMessageBuilders;
import java.util.concurrent.Semaphore;
@@ -50,6 +50,7 @@ public class TopicSubscriber {
private TopicToken topicFilterToken;
private final Semaphore semaphore = new Semaphore(0);
+ private boolean enable = true;
public TopicSubscriber(BrokerTopic topic, MqttSession session, MqttQoS mqttQoS, long nextConsumerOffset, long retainConsumerOffset) {
this.topic = topic;
@@ -60,7 +61,7 @@ public class TopicSubscriber {
}
public void batchPublish(BrokerContext brokerContext) {
- if (mqttSession.isDisconnect()) {
+ if (mqttSession.isDisconnect() || !enable) {
return;
}
semaphore.release();
@@ -88,7 +89,8 @@ public class TopicSubscriber {
}
InflightQueue inflightQueue = mqttSession.getInflightQueue();
- boolean suc = inflightQueue.offer(publishBuilder, offset -> {
+ long offset = persistenceMessage.getOffset();
+ boolean suc = inflightQueue.offer(publishBuilder, (mqtt) -> {
if (mqttQoS == MqttQoS.AT_MOST_ONCE) {
nextConsumerOffset = persistenceMessage.getOffset() + 1;
}
@@ -98,10 +100,8 @@ public class TopicSubscriber {
setRetainConsumerOffset(getRetainConsumerOffset() + 1);
}
commitRetainConsumerTimestamp(persistenceMessage.getCreateTime());
-// if (inflightQueue.getCount() == 0) {
publish0(brokerContext, 0);
-// }
- }, persistenceMessage.getOffset());
+ });
// 飞行队列已满
if (!suc) {
// LOGGER.info("queue is full..." + expectConsumerOffset);
@@ -165,4 +165,8 @@ public class TopicSubscriber {
public void setTopicFilterToken(TopicToken topicFilterToken) {
this.topicFilterToken = topicFilterToken;
}
+
+ public void disable() {
+ this.enable = false;
+ }
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java
index 6f57c96e424411e70bb35daf5a5beef3918a202d..4d1ceb57d74827a58eb01e3ad2454c17514681d9 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java
@@ -10,7 +10,6 @@ import org.smartboot.mqtt.broker.eventbus.ServerEventType;
import org.smartboot.mqtt.broker.provider.SessionStateProvider;
import org.smartboot.mqtt.broker.provider.impl.session.SessionState;
import org.smartboot.mqtt.common.InflightQueue;
-import org.smartboot.mqtt.common.MqttMessageBuilders;
import org.smartboot.mqtt.common.enums.MqttConnectReturnCode;
import org.smartboot.mqtt.common.enums.MqttProtocolEnum;
import org.smartboot.mqtt.common.enums.MqttQoS;
@@ -25,6 +24,7 @@ import org.smartboot.mqtt.common.message.variable.MqttConnAckVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttConnectVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ConnectAckProperties;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
+import org.smartboot.mqtt.common.util.MqttMessageBuilders;
import org.smartboot.mqtt.common.util.MqttUtil;
import org.smartboot.mqtt.common.util.ValidateUtils;
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java
index d997d6ab8fe746df9e532a3e5bb42c85967c5f03..2d27cd859fc956e2532ea68b8ed018c91375de3c 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java
@@ -13,9 +13,11 @@ import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.message.MqttPubAckMessage;
import org.smartboot.mqtt.common.message.MqttPubCompMessage;
import org.smartboot.mqtt.common.message.MqttPubRecMessage;
+import org.smartboot.mqtt.common.message.MqttPubRelMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
+import org.smartboot.mqtt.common.util.ValidateUtils;
/**
* 发布Topic
@@ -101,6 +103,7 @@ public class PublishProcessor extends AuthorizedMqttProcessor {
+ ValidateUtils.isTrue(message instanceof MqttPubRelMessage, "invalid message");
//发送pubRel消息。
//todo
MqttPubQosVariableHeader qosVariableHeader;
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java
index a9e6904cf63343c6fa9008077dd766c370c46943..a382b5f6f0d4f94a122f22b3af2c35136d644592 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/impl/session/SessionState.java
@@ -1,6 +1,6 @@
package org.smartboot.mqtt.broker.provider.impl.session;
-import org.smartboot.mqtt.common.AckMessage;
+import org.smartboot.mqtt.common.InflightMessage;
import org.smartboot.mqtt.common.enums.MqttQoS;
import java.util.HashMap;
@@ -11,11 +11,11 @@ import java.util.Map;
* @version V1.0 , 2022/4/15
*/
public class SessionState {
- protected final Map responseConsumers = new HashMap<>();
+ protected final Map responseConsumers = new HashMap<>();
private final Map subscribers = new HashMap<>();
- public Map getResponseConsumers() {
+ public Map getResponseConsumers() {
return responseConsumers;
}
diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml
index 4b2a1983dd6271c2539fd921c4f9c0ef48354f77..3715956da47803f2fe8179334b7749c2b22053d3 100644
--- a/smart-mqtt-client/pom.xml
+++ b/smart-mqtt-client/pom.xml
@@ -5,7 +5,7 @@
smart-mqtt
org.smartboot.mqtt
- 0.16
+ 0.17
../pom.xml
4.0.0
@@ -16,6 +16,12 @@
org.smartboot.mqtt
smart-mqtt-common
+
+ org.slf4j
+ slf4j-simple
+ 2.0.0
+ test
+
junit
junit
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
index e42ba2a2d25309f4f6df2a79f66da0c78dc07138..e13c6c1b60f78544fbf664409b639b6219da0a6e 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
@@ -6,18 +6,18 @@ import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.AbstractSession;
import org.smartboot.mqtt.common.DefaultMqttWriter;
import org.smartboot.mqtt.common.InflightQueue;
-import org.smartboot.mqtt.common.MqttMessageBuilders;
+import org.smartboot.mqtt.common.QosRetryPlugin;
import org.smartboot.mqtt.common.TopicToken;
import org.smartboot.mqtt.common.enums.MqttConnectReturnCode;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventBusImpl;
import org.smartboot.mqtt.common.eventbus.EventType;
+import org.smartboot.mqtt.common.exception.MqttException;
import org.smartboot.mqtt.common.message.MqttConnAckMessage;
import org.smartboot.mqtt.common.message.MqttConnectMessage;
import org.smartboot.mqtt.common.message.MqttDisconnectMessage;
import org.smartboot.mqtt.common.message.MqttMessage;
-import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
import org.smartboot.mqtt.common.message.MqttPingReqMessage;
import org.smartboot.mqtt.common.message.MqttPingRespMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
@@ -25,16 +25,15 @@ import org.smartboot.mqtt.common.message.MqttSubAckMessage;
import org.smartboot.mqtt.common.message.MqttSubscribeMessage;
import org.smartboot.mqtt.common.message.MqttTopicSubscription;
import org.smartboot.mqtt.common.message.MqttUnsubAckMessage;
-import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage;
import org.smartboot.mqtt.common.message.payload.MqttConnectPayload;
import org.smartboot.mqtt.common.message.payload.WillMessage;
import org.smartboot.mqtt.common.message.variable.MqttConnectVariableHeader;
-import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ConnectProperties;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
import org.smartboot.mqtt.common.message.variable.properties.SubscribeProperties;
import org.smartboot.mqtt.common.protocol.MqttProtocol;
+import org.smartboot.mqtt.common.util.MqttMessageBuilders;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.buffer.BufferPagePool;
import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider;
@@ -206,6 +205,7 @@ public class MqttClient extends AbstractSession {
}, clientConfigure.getKeepAliveInterval(), TimeUnit.SECONDS);
}
// messageProcessor.addPlugin(new StreamMonitorPlugin<>());
+ messageProcessor.addPlugin(new QosRetryPlugin());
client = new AioQuickClient(clientConfigure.getHost(), clientConfigure.getPort(), new MqttProtocol(clientConfigure.getMaxPacketSize()), messageProcessor);
try {
if (bufferPagePool != null) {
@@ -260,7 +260,7 @@ public class MqttClient extends AbstractSession {
return this;
}
- public void unsubscribe0(String[] topics) {
+ private void unsubscribe0(String[] topics) {
Set unsubscribedTopics = new HashSet<>(topics.length);
for (String unsubscribedTopic : topics) {
if (subscribes.containsKey(unsubscribedTopic)) {
@@ -273,27 +273,26 @@ public class MqttClient extends AbstractSession {
return;
}
- MqttMessageBuilders.UnsubscribeBuilder unsubscribeBuilder = MqttMessageBuilders.unsubscribe().packetId(newPacketId());
+ MqttMessageBuilders.UnsubscribeBuilder unsubscribeBuilder = MqttMessageBuilders.unsubscribe();
unsubscribedTopics.forEach(unsubscribeBuilder::addTopicFilter);
//todo
- ReasonProperties properties = null;
if (getMqttVersion() == MqttVersion.MQTT_5) {
- properties = new ReasonProperties();
+ ReasonProperties properties = new ReasonProperties();
+ unsubscribeBuilder.properties(properties);
}
- MqttUnsubscribeMessage unsubscribedMessage = unsubscribeBuilder.build(properties);
// wait ack message.
- responseConsumers.put(unsubscribedMessage.getVariableHeader().getPacketId(), new Consumer>() {
- @Override
- public void accept(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message) {
+ try {
+ getInflightQueue().put(unsubscribeBuilder, (message) -> {
ValidateUtils.isTrue(message instanceof MqttUnsubAckMessage, "uncorrected message type.");
for (String unsubscribedTopic : unsubscribedTopics) {
subscribes.remove(unsubscribedTopic);
wildcardsToken.removeIf(topicToken -> StringUtils.equals(unsubscribedTopic, topicToken.getTopicFilter()));
}
- }
- });
- write(unsubscribedMessage);
+ });
+ } catch (InterruptedException e) {
+ throw new MqttException("unsubscribe topic exception", e);
+ }
}
public MqttClient subscribe(String topic, MqttQoS qos, BiConsumer consumer) {
@@ -305,7 +304,7 @@ public class MqttClient extends AbstractSession {
}
public MqttClient subscribe(String[] topics, MqttQoS[] qos, BiConsumer consumer) {
- subscribe0(topics, qos, consumer, (mqttClient, mqttQoS) -> {
+ subscribe(topics, qos, consumer, (mqttClient, mqttQoS) -> {
});
return this;
}
@@ -320,7 +319,7 @@ public class MqttClient extends AbstractSession {
}
private void subscribe0(String[] topic, MqttQoS[] qos, BiConsumer consumer, BiConsumer subAckConsumer) {
- MqttMessageBuilders.SubscribeBuilder subscribeBuilder = MqttMessageBuilders.subscribe().packetId(newPacketId());
+ MqttMessageBuilders.SubscribeBuilder subscribeBuilder = MqttMessageBuilders.subscribe();
for (int i = 0; i < topic.length; i++) {
subscribeBuilder.addSubscription(qos[i], topic[i]);
}
@@ -329,13 +328,10 @@ public class MqttClient extends AbstractSession {
subscribeBuilder.subscribeProperties(new SubscribeProperties());
}
MqttSubscribeMessage subscribeMessage = subscribeBuilder.build();
-
- responseConsumers.put(subscribeMessage.getVariableHeader().getPacketId(), new Consumer>() {
- @Override
- public void accept(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message) {
+ try {
+ getInflightQueue().put(subscribeBuilder, (message) -> {
List qosValues = ((MqttSubAckMessage) message).getPayload().grantedQoSLevels();
ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response");
- ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response");
int i = 0;
for (MqttTopicSubscription subscription : subscribeMessage.getPayload().getTopicSubscriptions()) {
MqttQoS minQos = MqttQoS.valueOf(Math.min(subscription.getQualityOfService().value(), qosValues.get(i++)));
@@ -352,9 +348,10 @@ public class MqttClient extends AbstractSession {
}
subAckConsumer.accept(MqttClient.this, minQos);
}
- }
- });
- write(subscribeMessage);
+ });
+ } catch (InterruptedException e) {
+ throw new MqttException("subscribe topic exception", e);
+ }
}
public void notifyResponse(MqttConnAckMessage connAckMessage) {
@@ -394,26 +391,18 @@ public class MqttClient extends AbstractSession {
private void publish(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer) {
InflightQueue inflightQueue = getInflightQueue();
- boolean suc = inflightQueue.offer(publishBuilder, offset -> {
- consumer.accept(publishBuilder.getPacketId());
- //最早发送的消息若收到响应,则更新点位
- if (offset != -1) {
+ try {
+ inflightQueue.put(publishBuilder, (message) -> {
+ consumer.accept(message.getVariableHeader().getPacketId());
+ //最早发送的消息若收到响应,则更新点位
synchronized (MqttClient.this) {
MqttClient.this.notifyAll();
}
- }
- }, 1);
- flush();
- if (!suc) {
- try {
- synchronized (this) {
- wait();
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- publish(publishBuilder, consumer);
+ });
+ } catch (InterruptedException e) {
+ throw new MqttException("publish message exception", e);
}
+ flush();
}
public MqttClientConfigure getClientConfigure() {
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java
index dd45017987b38a5549f01d4f0a36b4335a104c8f..cf3b1e87e8c14f0c920a2a534bda3d1d5fcb027c 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java
@@ -10,11 +10,13 @@ import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.message.MqttPubAckMessage;
import org.smartboot.mqtt.common.message.MqttPubCompMessage;
import org.smartboot.mqtt.common.message.MqttPubRecMessage;
+import org.smartboot.mqtt.common.message.MqttPubRelMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttPublishVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
import org.smartboot.mqtt.common.util.TopicTokenUtil;
+import org.smartboot.mqtt.common.util.ValidateUtils;
/**
* 发布Topic
@@ -93,6 +95,7 @@ public class PublishProcessor implements MqttProcessor {
MqttPubRecMessage pubRecMessage = new MqttPubRecMessage(variableHeader);
session.write(pubRecMessage, message -> {
+ ValidateUtils.isTrue(message instanceof MqttPubRelMessage, "invalid message");
//todo
ReasonProperties reasonProperties = null;
if (mqttPublishMessage.getVersion() == MqttVersion.MQTT_5) {
diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml
index 6203e97b48a5242c242d94538b43ea303c464c88..6c75836c2044ee7e10cd7aebbc04999988185ae5 100644
--- a/smart-mqtt-common/pom.xml
+++ b/smart-mqtt-common/pom.xml
@@ -5,7 +5,7 @@
smart-mqtt
org.smartboot.mqtt
- 0.16
+ 0.17
../pom.xml
4.0.0
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java
index 5ab9f5294b542b024eaabc17824c1376fe3ea890..e27c960bb472cca716f5f40038cb483708e4a592 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java
@@ -1,7 +1,5 @@
package org.smartboot.mqtt.common;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.enums.MqttMessageType;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventBus;
@@ -9,18 +7,21 @@ import org.smartboot.mqtt.common.eventbus.EventObject;
import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.mqtt.common.message.MqttMessage;
import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
-import org.smartboot.mqtt.common.message.MqttPubQosMessage;
+import org.smartboot.mqtt.common.message.MqttPubRecMessage;
+import org.smartboot.mqtt.common.message.MqttSubscribeMessage;
+import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage;
import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.protocol.MqttProtocol;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.transport.AioSession;
import org.smartboot.socket.util.Attachment;
+import org.smartboot.socket.util.QuickTimerTask;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
@@ -28,12 +29,6 @@ import java.util.function.Consumer;
* @version V1.0 , 2022/4/12
*/
public abstract class AbstractSession {
- private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSession.class);
- private static final int QOS0_PACKET_ID = 0;
- /**
- * 用于生成当前会话的报文标识符
- */
- private final AtomicInteger packetIdCreator = new AtomicInteger(1);
private final EventBus eventBus;
protected String clientId;
protected AioSession session;
@@ -55,22 +50,41 @@ public abstract class AbstractSession {
private MqttVersion mqttVersion;
private InflightQueue inflightQueue;
- protected Map>> responseConsumers = new ConcurrentHashMap<>();
+ private final Map ackMessageCacheMap = new ConcurrentHashMap<>();
public AbstractSession(EventBus eventBus) {
this.eventBus = eventBus;
}
public final void write(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> mqttMessage, Consumer> consumer) {
- responseConsumers.put(mqttMessage.getVariableHeader().getPacketId(), consumer);
+ QosMessage ackMessage = new QosMessage(mqttMessage, consumer);
+ switch (mqttMessage.getFixedHeader().getQosLevel()) {
+ case AT_MOST_ONCE:
+ ValidateUtils.isTrue(mqttMessage instanceof MqttPubRecMessage, "invalid message instance");
+ //超时移除即可,
+ break;
+ case AT_LEAST_ONCE:
+ ValidateUtils.isTrue(mqttMessage instanceof MqttSubscribeMessage || mqttMessage instanceof MqttUnsubscribeMessage, "invalid message instance");
+ //重新发送subscribe或unSubscribe消息
+ QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(() -> {
+ if (!ackMessage.isCommit()) {
+ write(mqttMessage, consumer);
+ }
+ }, 1, TimeUnit.SECONDS);
+ default:
+ throw new UnsupportedOperationException();
+ }
+ ackMessageCacheMap.put(mqttMessage.getVariableHeader().getPacketId(), ackMessage);
write(mqttMessage, false);
}
public final void notifyResponse(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message) {
- if (message instanceof MqttPubQosMessage && message.getFixedHeader().getMessageType() != MqttMessageType.PUBREL) {
- inflightQueue.notify((MqttPubQosMessage) message);
+ if (message.getFixedHeader().getMessageType() != MqttMessageType.PUBREL) {
+ inflightQueue.notify(message);
} else {
- responseConsumers.remove(message.getVariableHeader().getPacketId()).accept(message);
+ QosMessage qosMessage = ackMessageCacheMap.remove(message.getVariableHeader().getPacketId());
+ qosMessage.setCommit(true);
+ qosMessage.getConsumer().accept(message);
}
}
@@ -119,18 +133,6 @@ public abstract class AbstractSession {
return clientId;
}
- /**
- * 生成大于0的 packetId
- */
- public int newPacketId() {
- int packageId = packetIdCreator.getAndIncrement();
- if (packageId <= 0) {
- packetIdCreator.set(0);
- return newPacketId();
- }
- return packageId;
- }
-
public InetSocketAddress getRemoteAddress() throws IOException {
return session.getRemoteAddress();
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java
deleted file mode 100644
index 9d480c6d9d37d930e8e6233c7e3f371d1ca80f3f..0000000000000000000000000000000000000000
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package org.smartboot.mqtt.common;
-
-import org.smartboot.mqtt.common.enums.MqttMessageType;
-import org.smartboot.mqtt.common.enums.MqttQoS;
-import org.smartboot.mqtt.common.message.MqttPublishMessage;
-
-import java.util.function.Consumer;
-
-/**
- * @author 三刀(zhengjunweimail@163.com)
- * @version V1.0 , 2022/4/14
- */
-public class AckMessage {
- /**
- * 原始消息
- */
- private final MqttPublishMessage originalMessage;
-
- private MqttMessageType expectMessageType;
- /**
- * 回调事件
- */
- private final Consumer consumer;
-
- private final long offset;
-
- private boolean commit;
-
- private final int packetId;
-
- public AckMessage(MqttPublishMessage originalMessage, int packetId, Consumer consumer, long offset) {
- this.originalMessage = originalMessage;
- this.consumer = consumer;
- this.offset = offset;
- this.packetId = packetId;
- if (originalMessage.getFixedHeader().getQosLevel() == MqttQoS.AT_LEAST_ONCE) {
- this.expectMessageType = MqttMessageType.PUBACK;
- } else if (originalMessage.getFixedHeader().getQosLevel() == MqttQoS.EXACTLY_ONCE) {
- this.expectMessageType = MqttMessageType.PUBREC;
- }
- }
-
- public MqttPublishMessage getOriginalMessage() {
- return originalMessage;
- }
-
-
- public Consumer getConsumer() {
- return consumer;
- }
-
- public MqttMessageType getExpectMessageType() {
- return expectMessageType;
- }
-
- public void setExpectMessageType(MqttMessageType expectMessageType) {
- this.expectMessageType = expectMessageType;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public boolean isCommit() {
- return commit;
- }
-
- public void setCommit(boolean commit) {
- this.commit = commit;
- }
-
- public int getPacketId() {
- return packetId;
- }
-}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java
new file mode 100644
index 0000000000000000000000000000000000000000..2431fb30dc1b1b0c0dda2103edd4e54e705a5925
--- /dev/null
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java
@@ -0,0 +1,113 @@
+package org.smartboot.mqtt.common;
+
+import org.smartboot.mqtt.common.enums.MqttMessageType;
+import org.smartboot.mqtt.common.enums.MqttQoS;
+import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
+import org.smartboot.mqtt.common.message.MqttPublishMessage;
+import org.smartboot.mqtt.common.message.MqttSubscribeMessage;
+import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage;
+import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
+
+import java.util.function.Consumer;
+
+/**
+ * @author 三刀(zhengjunweimail@163.com)
+ * @version V1.0 , 2022/4/14
+ */
+public class InflightMessage {
+ /**
+ * 原始消息
+ */
+ private final MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> originalMessage;
+ private MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> responseMessage;
+
+ /**
+ * 飞行队列为其分配的packetId
+ */
+ private final int assignedPacketId;
+
+ private MqttMessageType expectMessageType;
+
+ private boolean commit;
+
+ private final Consumer> consumer;
+
+ private int retryCount;
+
+ private long latestTime;
+
+ public InflightMessage(int packetId, MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> originalMessage, Consumer> consumer) {
+ this.assignedPacketId = packetId;
+ this.originalMessage = originalMessage;
+ this.consumer = consumer;
+ if (originalMessage instanceof MqttSubscribeMessage) {
+ this.expectMessageType = MqttMessageType.SUBACK;
+ } else if (originalMessage instanceof MqttUnsubscribeMessage) {
+ this.expectMessageType = MqttMessageType.UNSUBACK;
+ } else if (originalMessage instanceof MqttPublishMessage) {
+ if (originalMessage.getFixedHeader().getQosLevel() == MqttQoS.AT_LEAST_ONCE) {
+ this.expectMessageType = MqttMessageType.PUBACK;
+ } else if (originalMessage.getFixedHeader().getQosLevel() == MqttQoS.EXACTLY_ONCE) {
+ this.expectMessageType = MqttMessageType.PUBREC;
+ }
+ } else {
+ throw new UnsupportedOperationException();
+ }
+
+ this.latestTime = System.currentTimeMillis();
+ }
+
+ public MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> getOriginalMessage() {
+ return originalMessage;
+ }
+
+
+ public MqttMessageType getExpectMessageType() {
+ return expectMessageType;
+ }
+
+ public void setExpectMessageType(MqttMessageType expectMessageType) {
+ this.expectMessageType = expectMessageType;
+ }
+
+ public boolean isCommit() {
+ return commit;
+ }
+
+ public void setCommit(boolean commit) {
+ this.commit = commit;
+ }
+
+ public int getRetryCount() {
+ return retryCount;
+ }
+
+ public void setRetryCount(int retryCount) {
+ this.retryCount = retryCount;
+ }
+
+ public long getLatestTime() {
+ return latestTime;
+ }
+
+ public void setLatestTime(long latestTime) {
+ this.latestTime = latestTime;
+ }
+
+ public final Consumer> getConsumer() {
+ return consumer;
+ }
+
+ public int getAssignedPacketId() {
+ return assignedPacketId;
+ }
+
+ public MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> getResponseMessage() {
+ return responseMessage;
+ }
+
+ public void setResponseMessage(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> responseMessage) {
+ this.responseMessage = responseMessage;
+ }
+
+}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
index fa38e953040ce61fcfc95f6ee6ab083e8debd241..7a6f88098d36b4ad5dfa8bb16e2713cdb03cf4e9 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
@@ -5,13 +5,19 @@ import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.enums.MqttMessageType;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
-import org.smartboot.mqtt.common.message.MqttPubQosMessage;
+import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
import org.smartboot.mqtt.common.message.MqttPubRelMessage;
-import org.smartboot.mqtt.common.message.MqttPublishMessage;
+import org.smartboot.mqtt.common.message.MqttVariableMessage;
+import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
+import org.smartboot.mqtt.common.util.MqttMessageBuilders;
import org.smartboot.mqtt.common.util.ValidateUtils;
+import org.smartboot.socket.util.AttachKey;
+import org.smartboot.socket.util.Attachment;
+import org.smartboot.socket.util.QuickTimerTask;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@@ -21,7 +27,9 @@ import java.util.function.Consumer;
*/
public class InflightQueue {
private static final Logger LOGGER = LoggerFactory.getLogger(InflightQueue.class);
- private final AckMessage[] queue;
+ static final AttachKey RETRY_TASK_ATTACH_KEY = AttachKey.valueOf("retryTask");
+ private static final int TIMEOUT = 3;
+ private final InflightMessage[] queue;
private int takeIndex;
private int putIndex;
private int count;
@@ -29,57 +37,129 @@ public class InflightQueue {
private final AtomicInteger packetId = new AtomicInteger(0);
private final AbstractSession session;
+ private boolean lock;
public InflightQueue(AbstractSession session, int size) {
ValidateUtils.isTrue(size > 0, "inflight must >0");
- this.queue = new AckMessage[size];
+ this.queue = new InflightMessage[size];
this.session = session;
}
- public boolean offer(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer, long offset) {
- int id = 0;
- MqttPublishMessage mqttMessage;
+ public synchronized void put(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) throws InterruptedException {
+ while (count == queue.length) {
+ lock = true;
+ session.flush();
+ wait();
+ }
+ offer(publishBuilder, consumer);
+ }
+
+ public boolean offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) {
+ InflightMessage inflightMessage;
synchronized (this) {
if (count == queue.length) {
return false;
}
- id = packetId.incrementAndGet();
+ int id = packetId.incrementAndGet();
// 16位无符号最大值65535
if (id > 65535) {
id = id % queue.length + queue.length;
packetId.set(id);
}
- publishBuilder.packetId(id);
- mqttMessage = publishBuilder.build();
- queue[putIndex++] = new AckMessage(mqttMessage, id, consumer, offset);
+ MqttPacketIdentifierMessage mqttMessage = publishBuilder.packetId(id).build();
+ inflightMessage = new InflightMessage(id, mqttMessage, consumer);
+ queue[putIndex++] = inflightMessage;
if (putIndex == queue.length) {
putIndex = 0;
}
count++;
+
+ //启动消息质量监测
+ if (count == 1 && mqttMessage.getFixedHeader().getQosLevel().value() > 0) {
+ retry(inflightMessage);
+ }
// System.out.println("publish...");
}
- session.write(mqttMessage, false);
- // QOS直接响应
- if (mqttMessage.getFixedHeader().getQosLevel() == MqttQoS.AT_MOST_ONCE) {
- long offset1 = commit(id);
-// ValidateUtils.isTrue(offset1 == -1 || offset1 == offset, "invalid offset");
- consumer.accept(offset);
+ session.write(inflightMessage.getOriginalMessage(), false);
+ // QOS0直接响应
+ if (inflightMessage.getOriginalMessage().getFixedHeader().getQosLevel() == MqttQoS.AT_MOST_ONCE) {
+ inflightMessage.setResponseMessage(inflightMessage.getOriginalMessage());
+ commit(inflightMessage);
}
return true;
}
- public void notify(MqttPubQosMessage message) {
- AckMessage ackMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length];
- ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == ackMessage.getExpectMessageType(), "invalid message type");
+ /**
+ * 超时重发
+ */
+ void retry(InflightMessage inflightMessage) {
+ if (inflightMessage.isCommit() || session.isDisconnect()) {
+ return;
+ }
+ QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new Runnable() {
+ @Override
+ public void run() {
+ if (inflightMessage.isCommit()) {
+// System.out.println("message has commit,ignore retry monitor");
+ return;
+ }
+ if (session.isDisconnect()) {
+ LOGGER.debug("session is disconnect , pause qos monitor.");
+ return;
+ }
+ long delay = System.currentTimeMillis() - inflightMessage.getLatestTime();
+ if (delay > 0) {
+ LOGGER.info("the time is not up, try again in {} milliseconds ", delay);
+ QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, delay, TimeUnit.MILLISECONDS);
+ return;
+ }
+ inflightMessage.setLatestTime(System.currentTimeMillis());
+ LOGGER.info("time out,retry...");
+ switch (inflightMessage.getExpectMessageType()) {
+ case PUBACK:
+ case PUBREC:
+ session.write(inflightMessage.getOriginalMessage());
+ break;
+ case PUBCOMP:
+ ReasonProperties properties = null;
+ if (inflightMessage.getOriginalMessage().getVersion() == MqttVersion.MQTT_5) {
+ properties = new ReasonProperties();
+ }
+ MqttVariableMessage extends MqttPacketIdVariableHeader> message = inflightMessage.getOriginalMessage();
+ MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties);
+ MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader);
+ session.write(pubRelMessage);
+ break;
+ default:
+ throw new UnsupportedOperationException("invalid message type: " + inflightMessage.getExpectMessageType());
+ }
+ inflightMessage.setRetryCount(inflightMessage.getRetryCount() + 1);
+ //不断重试直至完成
+ QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, TIMEOUT, TimeUnit.SECONDS);
+ }
+ }, TimeUnit.SECONDS.toMillis(TIMEOUT) - (System.currentTimeMillis() - inflightMessage.getLatestTime()), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * 理论上该方法只会被读回调线程触发
+ */
+ public void notify(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message) {
+ InflightMessage inflightMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length];
+ ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == inflightMessage.getExpectMessageType(), "invalid message type");
+ ValidateUtils.isTrue(message.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message packetId");
+ inflightMessage.setResponseMessage(message);
+ inflightMessage.setLatestTime(System.currentTimeMillis());
switch (message.getFixedHeader().getMessageType()) {
- case PUBACK: {
- long offset = commit(message.getVariableHeader().getPacketId());
- ackMessage.getConsumer().accept(offset);
+ case SUBACK:
+ case UNSUBACK:
+ case PUBACK:
+ case PUBCOMP: {
+ commit(inflightMessage);
break;
}
case PUBREC:
- ackMessage.setExpectMessageType(MqttMessageType.PUBCOMP);
+ inflightMessage.setExpectMessageType(MqttMessageType.PUBCOMP);
//todo
ReasonProperties properties = null;
if (message.getVersion() == MqttVersion.MQTT_5) {
@@ -89,38 +169,44 @@ public class InflightQueue {
MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader);
session.write(pubRelMessage, false);
break;
- case PUBCOMP:
- long offset = commit(message.getVariableHeader().getPacketId());
- ackMessage.getConsumer().accept(offset);
- break;
default:
throw new RuntimeException();
}
}
- private synchronized long commit(int packetId) {
- int commitIndex = (packetId - 1) % queue.length;
- AckMessage ackMessage = queue[commitIndex];
- ValidateUtils.isTrue(ackMessage.getPacketId() == packetId, "invalid message");
- ackMessage.setCommit(true);
+ private synchronized void commit(InflightMessage inflightMessage) {
+ MqttVariableMessage extends MqttPacketIdVariableHeader> originalMessage = inflightMessage.getOriginalMessage();
+ ValidateUtils.isTrue(originalMessage.getFixedHeader().getQosLevel().value() == 0 || originalMessage.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message");
+ inflightMessage.setCommit(true);
- if (commitIndex != takeIndex) {
- return -1;
+ if ((inflightMessage.getAssignedPacketId() - 1) % queue.length != takeIndex) {
+ return;
}
queue[takeIndex++] = null;
count--;
if (takeIndex == queue.length) {
takeIndex = 0;
}
+ inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage());
while (count > 0 && queue[takeIndex].isCommit()) {
- ackMessage = queue[takeIndex];
+ inflightMessage = queue[takeIndex];
+ inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage());
queue[takeIndex++] = null;
if (takeIndex == queue.length) {
takeIndex = 0;
}
count--;
}
- return ackMessage.getOffset();
+ if (lock) {
+ lock = false;
+ notifyAll();
+ }
+ if (count > 0) {
+ //注册超时监听任务
+ Attachment attachment = session.session.getAttachment();
+ InflightMessage monitorMessage = queue[takeIndex];
+ attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(monitorMessage));
+ }
}
public int getCount() {
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosMessage.java
new file mode 100644
index 0000000000000000000000000000000000000000..aaea1551b44c46ad61d3d8329c987c4df5ad7393
--- /dev/null
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosMessage.java
@@ -0,0 +1,37 @@
+package org.smartboot.mqtt.common;
+
+import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
+import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
+
+import java.util.function.Consumer;
+
+/**
+ * @author 三刀(zhengjunweimail@163.com)
+ * @version V1.0 , 2023/3/27
+ */
+public class QosMessage {
+ private final MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message;
+ private final Consumer> consumer;
+ private boolean commit;
+
+ public QosMessage(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message, Consumer> consumer) {
+ this.message = message;
+ this.consumer = consumer;
+ }
+
+ public MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> getMessage() {
+ return message;
+ }
+
+ public Consumer> getConsumer() {
+ return consumer;
+ }
+
+ public boolean isCommit() {
+ return commit;
+ }
+
+ public void setCommit(boolean commit) {
+ this.commit = commit;
+ }
+}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosRetryPlugin.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosRetryPlugin.java
new file mode 100644
index 0000000000000000000000000000000000000000..445451bb479e22bd0080c7342521d03dde76d06d
--- /dev/null
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosRetryPlugin.java
@@ -0,0 +1,29 @@
+package org.smartboot.mqtt.common;
+
+import org.smartboot.mqtt.common.message.MqttMessage;
+import org.smartboot.socket.extension.plugins.AbstractPlugin;
+import org.smartboot.socket.transport.AioSession;
+import org.smartboot.socket.util.Attachment;
+
+/**
+ * @author 三刀(zhengjunweimail@163.com)
+ * @version V1.0 , 2023/3/24
+ */
+public class QosRetryPlugin extends AbstractPlugin {
+
+ @Override
+ public void beforeRead(AioSession session) {
+ Attachment attachment = session.getAttachment();
+ if (attachment == null) {
+ return;
+ }
+ Runnable runnable = attachment.get(InflightQueue.RETRY_TASK_ATTACH_KEY);
+ if (runnable != null) {
+ try {
+ runnable.run();
+ } finally {
+ attachment.remove(InflightQueue.RETRY_TASK_ATTACH_KEY);
+ }
+ }
+ }
+}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java
index 2172884513861d052cde4604a4716dc5279f241f..4d6f8293c0555d44408d3f21bba3c402fc660f62 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java
@@ -13,7 +13,7 @@ import java.nio.ByteBuffer;
* @author 三刀
* @version V1.0 , 2018/4/22
*/
-public class MqttPublishMessage extends MqttVariableMessage {
+public class MqttPublishMessage extends MqttPacketIdentifierMessage {
private static final MqttPublishPayload EMPTY_BYTES = new MqttPublishPayload(new byte[0]);
private MqttPublishPayload payload;
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java
similarity index 85%
rename from smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java
rename to smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java
index f70d9843199c96a15498a4b746262aacbfee3e38..14a055f91d0ef1e74ba012e24175ab9c72545965 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java
@@ -1,14 +1,16 @@
-package org.smartboot.mqtt.common;
+package org.smartboot.mqtt.common.util;
import org.smartboot.mqtt.common.enums.MqttMessageType;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.message.MqttFixedHeader;
+import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.MqttSubscribeMessage;
import org.smartboot.mqtt.common.message.MqttTopicSubscription;
import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage;
import org.smartboot.mqtt.common.message.payload.MqttSubscribePayload;
import org.smartboot.mqtt.common.message.payload.MqttUnsubscribePayload;
+import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttPublishVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttReasonVariableHeader;
@@ -22,6 +24,12 @@ import java.util.List;
public final class MqttMessageBuilders {
+ public interface MessageBuilder> {
+ MessageBuilder packetId(int packetId);
+
+ T build();
+ }
+
private MqttMessageBuilders() {
}
@@ -38,7 +46,7 @@ public final class MqttMessageBuilders {
return new UnsubscribeBuilder();
}
- public static final class PublishBuilder {
+ public static final class PublishBuilder implements MessageBuilder {
private String topic;
private boolean retained;
private MqttQoS qos;
@@ -69,8 +77,9 @@ public final class MqttMessageBuilders {
return this;
}
- public void packetId(int packetId) {
+ public PublishBuilder packetId(int packetId) {
this.packetId = packetId;
+ return this;
}
public PublishBuilder publishProperties(PublishProperties publishProperties) {
@@ -92,7 +101,7 @@ public final class MqttMessageBuilders {
}
}
- public static final class SubscribeBuilder {
+ public static final class SubscribeBuilder implements MessageBuilder {
private List subscriptions;
private int packetId;
@@ -131,10 +140,11 @@ public final class MqttMessageBuilders {
}
}
- public static final class UnsubscribeBuilder {
+ public static final class UnsubscribeBuilder implements MessageBuilder {
private List topicFilters;
private int packetId;
+ private ReasonProperties properties;
UnsubscribeBuilder() {
}
@@ -152,7 +162,11 @@ public final class MqttMessageBuilders {
return this;
}
- public MqttUnsubscribeMessage build(ReasonProperties properties) {
+ public void properties(ReasonProperties properties) {
+ this.properties = properties;
+ }
+
+ public MqttUnsubscribeMessage build() {
MqttUnsubscribePayload mqttSubscribePayload = new MqttUnsubscribePayload(topicFilters);
MqttReasonVariableHeader variableHeader = new MqttPubQosVariableHeader(packetId, properties);
return new MqttUnsubscribeMessage(MqttFixedHeader.UNSUBSCRIBE_HEADER, variableHeader, mqttSubscribePayload);