From 60220b43db7a7fa97ea619b3909e20d2995000c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Tue, 4 Apr 2023 20:53:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=80=E5=8F=91=200.17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/smartboot/mqtt/broker/TopicSubscriber.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 0082251c..9a784498 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -91,17 +91,17 @@ public class TopicSubscriber { if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) { publishBuilder.publishProperties(new PublishProperties()); } + + InflightQueue inflightQueue = mqttSession.getInflightQueue(); + long offset = persistenceMessage.getOffset(); + nextConsumerOffset = offset + 1; + brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); //Qos0直接发送 if (mqttQoS == MqttQoS.AT_MOST_ONCE) { mqttSession.write(publishBuilder.build(), false); publish0(brokerContext, depth + 1); return; } - InflightQueue inflightQueue = mqttSession.getInflightQueue(); - long offset = persistenceMessage.getOffset(); - nextConsumerOffset = offset + 1; - brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); - InflightMessage suc; if (depth == 0) { suc = inflightQueue.offer(publishBuilder, (mqtt) -> { -- Gitee