From b12ad86a03ff1c0ed62b18aa3bcbfa48f56c1539 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Thu, 30 Mar 2023 22:44:25 +0800 Subject: [PATCH 01/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/smartboot/mqtt/client/MqttClient.java | 18 +++++++----------- .../smartboot/mqtt/common/InflightQueue.java | 7 ++----- .../mqtt/common/message/MqttFixedHeader.java | 3 ++- 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index eb8311cc..3892f5f8 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -237,8 +237,8 @@ public class MqttClient extends AbstractSession { } private void consumeTask() { - Runnable runnable; - while ((runnable = registeredTasks.poll()) != null) { + Runnable runnable = registeredTasks.poll(); + if (runnable != null) { runnable.run(); } } @@ -293,12 +293,10 @@ public class MqttClient extends AbstractSession { } consumeTask(); }); - if (suc) { - flush(); - } else { + flush(); + if (!suc) { registeredTasks.offer(() -> unsubscribe0(topics)); } - } public MqttClient subscribe(String topic, MqttQoS qos, BiConsumer consumer) { @@ -352,12 +350,11 @@ public class MqttClient extends AbstractSession { LOGGER.error("subscribe topic:{} fail", subscription.getTopicFilter()); } subAckConsumer.accept(MqttClient.this, minQos); - consumeTask(); } + consumeTask(); }); - if (suc) { - flush(); - } else { + flush(); + if (!suc) { registeredTasks.offer(() -> subscribe0(topic, qos, consumer, subAckConsumer)); } } @@ -405,7 +402,6 @@ public class MqttClient extends AbstractSession { synchronized (MqttClient.this) { MqttClient.this.notifyAll(); } - consumeTask(); }); if (suc) { flush(); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 6f71f75d..86c59816 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -71,7 +71,7 @@ public class InflightQueue { // System.out.println("publish..."); } - session.write(inflightMessage.getOriginalMessage(), false); + session.write(inflightMessage.getOriginalMessage(), count == queue.length); // QOS0直接响应 if (inflightMessage.getOriginalMessage().getFixedHeader().getQosLevel() == MqttQoS.AT_MOST_ONCE) { inflightMessage.setResponseMessage(inflightMessage.getOriginalMessage()); @@ -105,7 +105,7 @@ public class InflightQueue { return; } inflightMessage.setLatestTime(System.currentTimeMillis()); - LOGGER.info("message:{} time out,retry...", inflightMessage.getOriginalMessage()); + LOGGER.info("message:{} time out,retry...", inflightMessage.getOriginalMessage().getFixedHeader()); switch (inflightMessage.getExpectMessageType()) { case PUBACK: case PUBREC: @@ -195,7 +195,4 @@ public class InflightQueue { } } - public int getCount() { - return count; - } } \ No newline at end of file diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java index 4e152214..18f11280 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java @@ -1,5 +1,6 @@ package org.smartboot.mqtt.common.message; +import org.smartboot.mqtt.common.ToString; import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttQoS; @@ -19,7 +20,7 @@ import org.smartboot.mqtt.common.enums.MqttQoS; * @author 三刀 * @version V1.0 , 2018/4/22 */ -public class MqttFixedHeader { +public class MqttFixedHeader extends ToString { public static final MqttFixedHeader CONNECT_HEADER = new MqttFixedHeader(MqttMessageType.CONNECT, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader CONN_ACK_HEADER = new MqttFixedHeader(MqttMessageType.CONNACK, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader PUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.PUBACK, MqttQoS.AT_MOST_ONCE); -- Gitee From 4d16013fd6401e4bd6f4dd7805278ed3866c50c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Fri, 31 Mar 2023 22:21:17 +0800 Subject: [PATCH 02/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/broker/processor/ConnectProcessor.java | 2 +- .../org/smartboot/mqtt/client/MqttClient.java | 2 +- .../smartboot/mqtt/common/AbstractSession.java | 11 +++++++++-- .../org/smartboot/mqtt/common/InflightQueue.java | 16 +++++++++++++--- 4 files changed, 24 insertions(+), 7 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java index 4d1ceb57..43b3ed03 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java @@ -80,7 +80,7 @@ public class ConnectProcessor implements MqttProcessor { } else { receiveMaximum = context.getBrokerConfigure().getMaxInflight(); } - session.setInflightQueue(new InflightQueue(session, receiveMaximum)); + session.setInflightQueue(new InflightQueue(session, receiveMaximum,true)); //如果服务端收到清理会话(CleanSession)标志为 1 的连接,除了将 CONNACK 报文中的返回码设置为 0 之外, // 还必须将 CONNACK 报文中的当前会话设置(Session Present)标志为 0。 diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index 3892f5f8..61d25cfd 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -153,7 +153,7 @@ public class MqttClient extends AbstractSession { //连接成功,注册订阅消息 if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { - setInflightQueue(new InflightQueue(this, 16)); + setInflightQueue(new InflightQueue(this, 16, false)); connected = true; consumeTask(); //重连情况下重新触发订阅逻辑 diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java index e27c960b..90891424 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java @@ -1,5 +1,7 @@ package org.smartboot.mqtt.common; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventBus; @@ -29,6 +31,7 @@ import java.util.function.Consumer; * @version V1.0 , 2022/4/12 */ public abstract class AbstractSession { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSession.class); private final EventBus eventBus; protected String clientId; protected AioSession session; @@ -83,8 +86,12 @@ public abstract class AbstractSession { inflightQueue.notify(message); } else { QosMessage qosMessage = ackMessageCacheMap.remove(message.getVariableHeader().getPacketId()); - qosMessage.setCommit(true); - qosMessage.getConsumer().accept(message); + if (qosMessage != null) { + qosMessage.setCommit(true); + qosMessage.getConsumer().accept(message); + } else { + LOGGER.info("message is null"); + } } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 86c59816..3a637487 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -38,10 +38,13 @@ public class InflightQueue { private final AbstractSession session; - public InflightQueue(AbstractSession session, int size) { + private final boolean skipCommit; + + public InflightQueue(AbstractSession session, int size, boolean skipCommit) { ValidateUtils.isTrue(size > 0, "inflight must >0"); this.queue = new InflightMessage[size]; this.session = session; + this.skipCommit = skipCommit; } public boolean offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { @@ -177,16 +180,23 @@ public class InflightQueue { if (takeIndex == queue.length) { takeIndex = 0; } - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + if (!skipCommit) { + inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + } while (count > 0 && queue[takeIndex].isCommit()) { inflightMessage = queue[takeIndex]; - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + if (!skipCommit) { + inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + } queue[takeIndex++] = null; if (takeIndex == queue.length) { takeIndex = 0; } count--; } + if (skipCommit) { + inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + } if (count > 0) { //注册超时监听任务 Attachment attachment = session.session.getAttachment(); -- Gitee From 2e4f237aaed25163d79a2f4b516ff777b852737e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Fri, 31 Mar 2023 22:57:30 +0800 Subject: [PATCH 03/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/org/smartboot/mqtt/common/InflightQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 3a637487..170ec43f 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -74,7 +74,7 @@ public class InflightQueue { // System.out.println("publish..."); } - session.write(inflightMessage.getOriginalMessage(), count == queue.length); + session.write(inflightMessage.getOriginalMessage(), false); // QOS0直接响应 if (inflightMessage.getOriginalMessage().getFixedHeader().getQosLevel() == MqttQoS.AT_MOST_ONCE) { inflightMessage.setResponseMessage(inflightMessage.getOriginalMessage()); -- Gitee From dad30bc286902118f1c734a857491858234dbe2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 1 Apr 2023 13:02:13 +0800 Subject: [PATCH 04/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/mqtt/broker/TopicSubscriber.java | 8 ++------ .../java/org/smartboot/mqtt/common/InflightQueue.java | 7 +++++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 8bf03ebe..3d249136 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -90,10 +90,8 @@ public class TopicSubscriber { InflightQueue inflightQueue = mqttSession.getInflightQueue(); long offset = persistenceMessage.getOffset(); + nextConsumerOffset = persistenceMessage.getOffset() + 1; boolean suc = inflightQueue.offer(publishBuilder, (mqtt) -> { - if (mqttQoS == MqttQoS.AT_MOST_ONCE) { - nextConsumerOffset = persistenceMessage.getOffset() + 1; - } //最早发送的消息若收到响应,则更新点位 commitNextConsumerOffset(offset + 1); if (persistenceMessage.isRetained()) { @@ -104,13 +102,11 @@ public class TopicSubscriber { }); // 飞行队列已满 if (!suc) { + nextConsumerOffset -= 1; // LOGGER.info("queue is full..." + expectConsumerOffset); return; } long start = System.currentTimeMillis(); - if (mqttQoS != MqttQoS.AT_MOST_ONCE) { - nextConsumerOffset = persistenceMessage.getOffset() + 1; - } long cost = System.currentTimeMillis() - start; if (cost > 100) { diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 170ec43f..9e86ecc1 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -74,11 +74,14 @@ public class InflightQueue { // System.out.println("publish..."); } - session.write(inflightMessage.getOriginalMessage(), false); + session.write(inflightMessage.getOriginalMessage(), count == queue.length); // QOS0直接响应 if (inflightMessage.getOriginalMessage().getFixedHeader().getQosLevel() == MqttQoS.AT_MOST_ONCE) { inflightMessage.setResponseMessage(inflightMessage.getOriginalMessage()); - commit(inflightMessage); + inflightMessage.setCommit(true); + if ((inflightMessage.getAssignedPacketId() - 1) % queue.length == takeIndex) { + commit(inflightMessage); + } } return true; } -- Gitee From fe9ef62ac0ef45670dae96d48a6aa9d9a570731c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 1 Apr 2023 13:04:07 +0800 Subject: [PATCH 05/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/smartboot/mqtt/broker/TopicSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 3d249136..22fd6214 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -90,7 +90,7 @@ public class TopicSubscriber { InflightQueue inflightQueue = mqttSession.getInflightQueue(); long offset = persistenceMessage.getOffset(); - nextConsumerOffset = persistenceMessage.getOffset() + 1; + nextConsumerOffset = offset + 1; boolean suc = inflightQueue.offer(publishBuilder, (mqtt) -> { //最早发送的消息若收到响应,则更新点位 commitNextConsumerOffset(offset + 1); -- Gitee From bb8877645582bd1e5ec3ea6436ada76c9d738e74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 1 Apr 2023 16:17:16 +0800 Subject: [PATCH 06/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/broker/TopicSubscriber.java | 16 ++++------------ .../org/smartboot/mqtt/client/MqttClient.java | 11 ++--------- .../smartboot/mqtt/common/InflightQueue.java | 19 +++++++++++++++++++ 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 22fd6214..7d6b8dd2 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -91,6 +91,7 @@ public class TopicSubscriber { InflightQueue inflightQueue = mqttSession.getInflightQueue(); long offset = persistenceMessage.getOffset(); nextConsumerOffset = offset + 1; + brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); boolean suc = inflightQueue.offer(publishBuilder, (mqtt) -> { //最早发送的消息若收到响应,则更新点位 commitNextConsumerOffset(offset + 1); @@ -101,20 +102,11 @@ public class TopicSubscriber { publish0(brokerContext, 0); }); // 飞行队列已满 - if (!suc) { - nextConsumerOffset -= 1; -// LOGGER.info("queue is full..." + expectConsumerOffset); - return; + if (suc) { + //递归处理下一个消息 + publish0(brokerContext, ++depth); } - long start = System.currentTimeMillis(); - long cost = System.currentTimeMillis() - start; - if (cost > 100) { - System.out.println("publish busy ,cost: " + cost); - } - brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); - //递归处理下一个消息 - publish0(brokerContext, ++depth); } public BrokerTopic getTopic() { diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index 61d25cfd..d33c35e5 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -285,7 +285,7 @@ public class MqttClient extends AbstractSession { unsubscribeBuilder.properties(properties); } // wait ack message. - boolean suc = getInflightQueue().offer(unsubscribeBuilder, (message) -> { + getInflightQueue().offer(unsubscribeBuilder, (message) -> { ValidateUtils.isTrue(message instanceof MqttUnsubAckMessage, "uncorrected message type."); for (String unsubscribedTopic : unsubscribedTopics) { subscribes.remove(unsubscribedTopic); @@ -294,9 +294,6 @@ public class MqttClient extends AbstractSession { consumeTask(); }); flush(); - if (!suc) { - registeredTasks.offer(() -> unsubscribe0(topics)); - } } public MqttClient subscribe(String topic, MqttQoS qos, BiConsumer consumer) { @@ -332,7 +329,7 @@ public class MqttClient extends AbstractSession { subscribeBuilder.subscribeProperties(new SubscribeProperties()); } MqttSubscribeMessage subscribeMessage = subscribeBuilder.build(); - boolean suc = getInflightQueue().offer(subscribeBuilder, (message) -> { + getInflightQueue().offer(subscribeBuilder, (message) -> { List qosValues = ((MqttSubAckMessage) message).getPayload().grantedQoSLevels(); ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response"); int i = 0; @@ -354,9 +351,6 @@ public class MqttClient extends AbstractSession { consumeTask(); }); flush(); - if (!suc) { - registeredTasks.offer(() -> subscribe0(topic, qos, consumer, subAckConsumer)); - } } public void notifyResponse(MqttConnAckMessage connAckMessage) { @@ -413,7 +407,6 @@ public class MqttClient extends AbstractSession { } catch (InterruptedException e) { throw new RuntimeException(e); } - publish(publishBuilder, consumer); } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 9e86ecc1..8a4519ea 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -17,6 +17,7 @@ import org.smartboot.socket.util.AttachKey; import org.smartboot.socket.util.Attachment; import org.smartboot.socket.util.QuickTimerTask; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -40,6 +41,8 @@ public class InflightQueue { private final boolean skipCommit; + private final ConcurrentLinkedQueue pendingQueue = new ConcurrentLinkedQueue(); + public InflightQueue(AbstractSession session, int size, boolean skipCommit) { ValidateUtils.isTrue(size > 0, "inflight must >0"); this.queue = new InflightMessage[size]; @@ -51,6 +54,7 @@ public class InflightQueue { InflightMessage inflightMessage; synchronized (this) { if (count == queue.length) { + pendingQueue.add(new PendingUnit(publishBuilder, consumer)); return false; } int id = packetId.incrementAndGet(); @@ -197,6 +201,12 @@ public class InflightQueue { } count--; } + PendingUnit pendingUnit; + while ((pendingUnit = pendingQueue.poll()) != null) { + if (!offer(pendingUnit.publishBuilder, pendingUnit.consumer)) { + break; + } + } if (skipCommit) { inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); } @@ -208,4 +218,13 @@ public class InflightQueue { } } + class PendingUnit { + MqttMessageBuilders.MessageBuilder publishBuilder; + Consumer> consumer; + + public PendingUnit(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { + this.publishBuilder = publishBuilder; + this.consumer = consumer; + } + } } \ No newline at end of file -- Gitee From 493186015219532fb1ed95b8aabd8299a1620ad5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 1 Apr 2023 17:12:20 +0800 Subject: [PATCH 07/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/smartboot/mqtt/broker/TopicSubscriber.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 7d6b8dd2..d811bf8d 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -69,7 +69,7 @@ public class TopicSubscriber { mqttSession.flush(); } - private void publish0(BrokerContext brokerContext, int depth) { + private void publish0(BrokerContext brokerContext, final int depth) { PersistenceProvider persistenceProvider = brokerContext.getProviders().getPersistenceProvider(); PersistenceMessage persistenceMessage = persistenceProvider.get(topic.getTopic(), nextConsumerOffset); if (persistenceMessage == null) { @@ -104,7 +104,7 @@ public class TopicSubscriber { // 飞行队列已满 if (suc) { //递归处理下一个消息 - publish0(brokerContext, ++depth); + publish0(brokerContext, depth + 1); } } -- Gitee From 18570ad9c1a643488853373496092197e7fb7e79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 1 Apr 2023 21:16:33 +0800 Subject: [PATCH 08/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/mqtt/common/InflightQueue.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 8a4519ea..989e1c3a 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -51,9 +51,13 @@ public class InflightQueue { } public boolean offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { + return extracted(publishBuilder, consumer, true); + } + + private boolean extracted(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer, boolean flag) { InflightMessage inflightMessage; synchronized (this) { - if (count == queue.length) { + if (count == queue.length || (flag && !pendingQueue.isEmpty())) { pendingQueue.add(new PendingUnit(publishBuilder, consumer)); return false; } @@ -147,7 +151,7 @@ public class InflightQueue { public void notify(MqttPacketIdentifierMessage message) { InflightMessage inflightMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length]; ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == inflightMessage.getExpectMessageType(), "invalid message type"); - ValidateUtils.isTrue(message.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message packetId"); + ValidateUtils.isTrue(message.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message packetId " + message.getVariableHeader().getPacketId() + " " + inflightMessage.getAssignedPacketId()); inflightMessage.setResponseMessage(message); inflightMessage.setLatestTime(System.currentTimeMillis()); switch (message.getFixedHeader().getMessageType()) { @@ -203,7 +207,7 @@ public class InflightQueue { } PendingUnit pendingUnit; while ((pendingUnit = pendingQueue.poll()) != null) { - if (!offer(pendingUnit.publishBuilder, pendingUnit.consumer)) { + if (!extracted(pendingUnit.publishBuilder, pendingUnit.consumer, false)) { break; } } -- Gitee From e06aad3e345a345cd987146e1ad2d19d5b77d742 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sun, 2 Apr 2023 14:21:24 +0800 Subject: [PATCH 09/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/broker/TopicSubscriber.java | 9 +++- .../org/smartboot/mqtt/client/MqttClient.java | 10 ++++- .../smartboot/mqtt/common/InflightQueue.java | 44 +++---------------- .../mqtt/common/util/MqttMessageBuilders.java | 19 +++++++- 4 files changed, 39 insertions(+), 43 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index d811bf8d..fc4bb1cb 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -4,6 +4,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.broker.provider.PersistenceProvider; import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; +import org.smartboot.mqtt.common.InflightMessage; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; @@ -92,7 +93,7 @@ public class TopicSubscriber { long offset = persistenceMessage.getOffset(); nextConsumerOffset = offset + 1; brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); - boolean suc = inflightQueue.offer(publishBuilder, (mqtt) -> { + InflightMessage suc = inflightQueue.offer(publishBuilder, (mqtt) -> { //最早发送的消息若收到响应,则更新点位 commitNextConsumerOffset(offset + 1); if (persistenceMessage.isRetained()) { @@ -102,9 +103,13 @@ public class TopicSubscriber { publish0(brokerContext, 0); }); // 飞行队列已满 - if (suc) { + if (suc != null) { //递归处理下一个消息 publish0(brokerContext, depth + 1); + if (mqttQoS == MqttQoS.AT_MOST_ONCE) { + suc.setResponseMessage(suc.getOriginalMessage()); + inflightQueue.commit(suc); + } } } diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index d33c35e5..07992a33 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -5,6 +5,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.AbstractSession; import org.smartboot.mqtt.common.DefaultMqttWriter; +import org.smartboot.mqtt.common.InflightMessage; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.QosRetryPlugin; import org.smartboot.mqtt.common.TopicToken; @@ -390,15 +391,19 @@ public class MqttClient extends AbstractSession { private void publish(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer) { InflightQueue inflightQueue = getInflightQueue(); - boolean suc = inflightQueue.offer(publishBuilder, (message) -> { + InflightMessage inflightMessage = inflightQueue.offer(publishBuilder, (message) -> { consumer.accept(message.getVariableHeader().getPacketId()); //最早发送的消息若收到响应,则更新点位 synchronized (MqttClient.this) { MqttClient.this.notifyAll(); } }); - if (suc) { + if (inflightMessage != null) { flush(); + if (publishBuilder.qos() == MqttQoS.AT_MOST_ONCE) { + inflightMessage.setResponseMessage(inflightMessage.getOriginalMessage()); + inflightQueue.commit(inflightMessage); + } } else { try { synchronized (this) { @@ -407,6 +412,7 @@ public class MqttClient extends AbstractSession { } catch (InterruptedException e) { throw new RuntimeException(e); } + publish(publishBuilder, consumer); } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 989e1c3a..3216f46e 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -3,7 +3,6 @@ package org.smartboot.mqtt.common; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.enums.MqttMessageType; -import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPubRelMessage; @@ -17,7 +16,6 @@ import org.smartboot.socket.util.AttachKey; import org.smartboot.socket.util.Attachment; import org.smartboot.socket.util.QuickTimerTask; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -41,8 +39,6 @@ public class InflightQueue { private final boolean skipCommit; - private final ConcurrentLinkedQueue pendingQueue = new ConcurrentLinkedQueue(); - public InflightQueue(AbstractSession session, int size, boolean skipCommit) { ValidateUtils.isTrue(size > 0, "inflight must >0"); this.queue = new InflightMessage[size]; @@ -50,16 +46,11 @@ public class InflightQueue { this.skipCommit = skipCommit; } - public boolean offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { - return extracted(publishBuilder, consumer, true); - } - - private boolean extracted(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer, boolean flag) { + public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { InflightMessage inflightMessage; synchronized (this) { - if (count == queue.length || (flag && !pendingQueue.isEmpty())) { - pendingQueue.add(new PendingUnit(publishBuilder, consumer)); - return false; + if (count == queue.length) { + return null; } int id = packetId.incrementAndGet(); // 16位无符号最大值65535 @@ -83,15 +74,7 @@ public class InflightQueue { } session.write(inflightMessage.getOriginalMessage(), count == queue.length); - // QOS0直接响应 - if (inflightMessage.getOriginalMessage().getFixedHeader().getQosLevel() == MqttQoS.AT_MOST_ONCE) { - inflightMessage.setResponseMessage(inflightMessage.getOriginalMessage()); - inflightMessage.setCommit(true); - if ((inflightMessage.getAssignedPacketId() - 1) % queue.length == takeIndex) { - commit(inflightMessage); - } - } - return true; + return inflightMessage; } /** @@ -178,7 +161,7 @@ public class InflightQueue { } } - private synchronized void commit(InflightMessage inflightMessage) { + public synchronized void commit(InflightMessage inflightMessage) { MqttVariableMessage originalMessage = inflightMessage.getOriginalMessage(); ValidateUtils.isTrue(originalMessage.getFixedHeader().getQosLevel().value() == 0 || originalMessage.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message"); inflightMessage.setCommit(true); @@ -205,12 +188,7 @@ public class InflightQueue { } count--; } - PendingUnit pendingUnit; - while ((pendingUnit = pendingQueue.poll()) != null) { - if (!extracted(pendingUnit.publishBuilder, pendingUnit.consumer, false)) { - break; - } - } + if (skipCommit) { inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); } @@ -221,14 +199,4 @@ public class InflightQueue { attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(monitorMessage)); } } - - class PendingUnit { - MqttMessageBuilders.MessageBuilder publishBuilder; - Consumer> consumer; - - public PendingUnit(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { - this.publishBuilder = publishBuilder; - this.consumer = consumer; - } - } } \ No newline at end of file diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java index 14a055f9..0410f2ad 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java @@ -27,6 +27,8 @@ public final class MqttMessageBuilders { public interface MessageBuilder> { MessageBuilder packetId(int packetId); + MqttQoS qos(); + T build(); } @@ -82,6 +84,11 @@ public final class MqttMessageBuilders { return this; } + @Override + public MqttQoS qos() { + return qos; + } + public PublishBuilder publishProperties(PublishProperties publishProperties) { this.publishProperties = publishProperties; return this; @@ -94,7 +101,7 @@ public final class MqttMessageBuilders { public MqttPublishMessage build() { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained); if (qos != MqttQoS.AT_LEAST_ONCE && qos != MqttQoS.EXACTLY_ONCE) { - packetId = -1; + packetId = -packetId; } MqttPublishVariableHeader mqttVariableHeader = new MqttPublishVariableHeader(packetId, topic, publishProperties); return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, payload); @@ -126,6 +133,11 @@ public final class MqttMessageBuilders { return this; } + @Override + public MqttQoS qos() { + return MqttQoS.AT_LEAST_ONCE; + } + public SubscribeBuilder subscribeProperties(SubscribeProperties subscribeProperties) { this.subscribeProperties = subscribeProperties; return this; @@ -162,6 +174,11 @@ public final class MqttMessageBuilders { return this; } + @Override + public MqttQoS qos() { + return MqttQoS.AT_LEAST_ONCE; + } + public void properties(ReasonProperties properties) { this.properties = properties; } -- Gitee From 1716d335eb966914ad6c6b4ba172ac67a93759e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 3 Apr 2023 20:44:15 +0800 Subject: [PATCH 10/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/broker/TopicSubscriber.java | 17 +++++++++----- .../broker/processor/ConnectProcessor.java | 2 +- .../org/smartboot/mqtt/client/MqttClient.java | 2 +- .../smartboot/mqtt/common/InflightQueue.java | 22 ++++++------------- 4 files changed, 20 insertions(+), 23 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index fc4bb1cb..a7bade6e 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -80,7 +80,10 @@ public class TopicSubscriber { return; } if (depth > 16) { -// LOGGER.info("退出递归..."); + if (semaphore.tryAcquire()) { + topic.getQueue().offer(this); + topic.getVersion().incrementAndGet(); + } return; } @@ -88,11 +91,17 @@ public class TopicSubscriber { if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) { publishBuilder.publishProperties(new PublishProperties()); } - + //Qos0直接发送 + if (mqttQoS == MqttQoS.AT_MOST_ONCE) { + mqttSession.write(publishBuilder.build(),false); + publish0(brokerContext, depth+1); + return; + } InflightQueue inflightQueue = mqttSession.getInflightQueue(); long offset = persistenceMessage.getOffset(); nextConsumerOffset = offset + 1; brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); + InflightMessage suc = inflightQueue.offer(publishBuilder, (mqtt) -> { //最早发送的消息若收到响应,则更新点位 commitNextConsumerOffset(offset + 1); @@ -106,10 +115,6 @@ public class TopicSubscriber { if (suc != null) { //递归处理下一个消息 publish0(brokerContext, depth + 1); - if (mqttQoS == MqttQoS.AT_MOST_ONCE) { - suc.setResponseMessage(suc.getOriginalMessage()); - inflightQueue.commit(suc); - } } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java index 43b3ed03..4d1ceb57 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java @@ -80,7 +80,7 @@ public class ConnectProcessor implements MqttProcessor { } else { receiveMaximum = context.getBrokerConfigure().getMaxInflight(); } - session.setInflightQueue(new InflightQueue(session, receiveMaximum,true)); + session.setInflightQueue(new InflightQueue(session, receiveMaximum)); //如果服务端收到清理会话(CleanSession)标志为 1 的连接,除了将 CONNACK 报文中的返回码设置为 0 之外, // 还必须将 CONNACK 报文中的当前会话设置(Session Present)标志为 0。 diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index 07992a33..bb7df152 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -154,7 +154,7 @@ public class MqttClient extends AbstractSession { //连接成功,注册订阅消息 if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { - setInflightQueue(new InflightQueue(this, 16, false)); + setInflightQueue(new InflightQueue(this, 16)); connected = true; consumeTask(); //重连情况下重新触发订阅逻辑 diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 3216f46e..0dce609b 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -37,13 +37,11 @@ public class InflightQueue { private final AbstractSession session; - private final boolean skipCommit; - public InflightQueue(AbstractSession session, int size, boolean skipCommit) { + public InflightQueue(AbstractSession session, int size) { ValidateUtils.isTrue(size > 0, "inflight must >0"); this.queue = new InflightMessage[size]; this.session = session; - this.skipCommit = skipCommit; } public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { @@ -67,11 +65,9 @@ public class InflightQueue { count++; //启动消息质量监测 - if (count == 1 && mqttMessage.getFixedHeader().getQosLevel().value() > 0) { + if (count == 1) { retry(inflightMessage); } -// System.out.println("publish..."); - } session.write(inflightMessage.getOriginalMessage(), count == queue.length); return inflightMessage; @@ -81,6 +77,9 @@ public class InflightQueue { * 超时重发 */ void retry(InflightMessage inflightMessage) { + if (true) { + return; + } if (inflightMessage.isCommit() || session.isDisconnect()) { return; } @@ -174,14 +173,10 @@ public class InflightQueue { if (takeIndex == queue.length) { takeIndex = 0; } - if (!skipCommit) { - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); - } + inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); while (count > 0 && queue[takeIndex].isCommit()) { inflightMessage = queue[takeIndex]; - if (!skipCommit) { - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); - } + inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); queue[takeIndex++] = null; if (takeIndex == queue.length) { takeIndex = 0; @@ -189,9 +184,6 @@ public class InflightQueue { count--; } - if (skipCommit) { - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); - } if (count > 0) { //注册超时监听任务 Attachment attachment = session.session.getAttachment(); -- Gitee From 900c24b88272ffb4873ee14d5a99d917725694f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 3 Apr 2023 21:00:18 +0800 Subject: [PATCH 11/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/broker/TopicSubscriber.java | 36 +++++++++++++------ .../smartboot/mqtt/common/InflightQueue.java | 18 ++++++++++ 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index a7bade6e..0082251c 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -93,8 +93,8 @@ public class TopicSubscriber { } //Qos0直接发送 if (mqttQoS == MqttQoS.AT_MOST_ONCE) { - mqttSession.write(publishBuilder.build(),false); - publish0(brokerContext, depth+1); + mqttSession.write(publishBuilder.build(), false); + publish0(brokerContext, depth + 1); return; } InflightQueue inflightQueue = mqttSession.getInflightQueue(); @@ -102,15 +102,29 @@ public class TopicSubscriber { nextConsumerOffset = offset + 1; brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); - InflightMessage suc = inflightQueue.offer(publishBuilder, (mqtt) -> { - //最早发送的消息若收到响应,则更新点位 - commitNextConsumerOffset(offset + 1); - if (persistenceMessage.isRetained()) { - setRetainConsumerOffset(getRetainConsumerOffset() + 1); - } - commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); - publish0(brokerContext, 0); - }); + InflightMessage suc; + if (depth == 0) { + suc = inflightQueue.offer(publishBuilder, (mqtt) -> { + //最早发送的消息若收到响应,则更新点位 + commitNextConsumerOffset(offset + 1); + if (persistenceMessage.isRetained()) { + setRetainConsumerOffset(getRetainConsumerOffset() + 1); + } + commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); + publish0(brokerContext, 1); + }, () -> publish0(brokerContext, 0)); + } else { + suc = inflightQueue.offer(publishBuilder, (mqtt) -> { + //最早发送的消息若收到响应,则更新点位 + commitNextConsumerOffset(offset + 1); + if (persistenceMessage.isRetained()) { + setRetainConsumerOffset(getRetainConsumerOffset() + 1); + } + commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); + publish0(brokerContext, 1); + }); + } + // 飞行队列已满 if (suc != null) { //递归处理下一个消息 diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 0dce609b..a92a5db5 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -16,6 +16,7 @@ import org.smartboot.socket.util.AttachKey; import org.smartboot.socket.util.Attachment; import org.smartboot.socket.util.QuickTimerTask; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -36,6 +37,7 @@ public class InflightQueue { private final AtomicInteger packetId = new AtomicInteger(0); private final AbstractSession session; + private ConcurrentLinkedQueue runnables = new ConcurrentLinkedQueue<>(); public InflightQueue(AbstractSession session, int size) { @@ -45,9 +47,16 @@ public class InflightQueue { } public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { + return offer(publishBuilder, consumer, null); + } + + public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer, Runnable runnable) { InflightMessage inflightMessage; synchronized (this) { if (count == queue.length) { + if (runnable != null) { + runnables.offer(runnable); + } return null; } int id = packetId.incrementAndGet(); @@ -190,5 +199,14 @@ public class InflightQueue { InflightMessage monitorMessage = queue[takeIndex]; attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(monitorMessage)); } + int i = queue.length - count; + while (i-- > 0) { + Runnable runnable = runnables.poll(); + if (runnable != null) { + runnable.run(); + } else { + break; + } + } } } \ No newline at end of file -- Gitee From 334f0d38f1c699cf92c1f3c2773319efb131ce90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 3 Apr 2023 21:57:51 +0800 Subject: [PATCH 12/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/org/smartboot/mqtt/common/InflightQueue.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index a92a5db5..e91766dd 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -86,9 +86,6 @@ public class InflightQueue { * 超时重发 */ void retry(InflightMessage inflightMessage) { - if (true) { - return; - } if (inflightMessage.isCommit() || session.isDisconnect()) { return; } -- Gitee From 1b4b4f4cca90776366cb5d62fa021f263b69f13c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 3 Apr 2023 22:30:20 +0800 Subject: [PATCH 13/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/org/smartboot/mqtt/common/InflightQueue.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index e91766dd..09c193de 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -196,8 +196,7 @@ public class InflightQueue { InflightMessage monitorMessage = queue[takeIndex]; attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(monitorMessage)); } - int i = queue.length - count; - while (i-- > 0) { + while (count < queue.length) { Runnable runnable = runnables.poll(); if (runnable != null) { runnable.run(); -- Gitee From 651351d03a31f35dc1f882e7e49e21ea356563bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Tue, 4 Apr 2023 20:35:07 +0800 Subject: [PATCH 14/14] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/org/smartboot/mqtt/client/MqttClient.java | 5 +++++ .../main/java/org/smartboot/mqtt/common/InflightQueue.java | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index bb7df152..19918d9d 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -390,6 +390,11 @@ public class MqttClient extends AbstractSession { } private void publish(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer) { + if (publishBuilder.qos() == MqttQoS.AT_MOST_ONCE) { + write(publishBuilder.build()); + consumer.accept(0); + return; + } InflightQueue inflightQueue = getInflightQueue(); InflightMessage inflightMessage = inflightQueue.offer(publishBuilder, (message) -> { consumer.accept(message.getVariableHeader().getPacketId()); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 09c193de..3dd3cfb1 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -37,7 +37,7 @@ public class InflightQueue { private final AtomicInteger packetId = new AtomicInteger(0); private final AbstractSession session; - private ConcurrentLinkedQueue runnables = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue runnables = new ConcurrentLinkedQueue<>(); public InflightQueue(AbstractSession session, int size) { -- Gitee