diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 0082251cc9f8651a7e11eaf7f299e448d980f81a..9a7844981962693677d52af9b0a4fd5396812401 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -91,17 +91,17 @@ public class TopicSubscriber { if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) { publishBuilder.publishProperties(new PublishProperties()); } + + InflightQueue inflightQueue = mqttSession.getInflightQueue(); + long offset = persistenceMessage.getOffset(); + nextConsumerOffset = offset + 1; + brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); //Qos0直接发送 if (mqttQoS == MqttQoS.AT_MOST_ONCE) { mqttSession.write(publishBuilder.build(), false); publish0(brokerContext, depth + 1); return; } - InflightQueue inflightQueue = mqttSession.getInflightQueue(); - long offset = persistenceMessage.getOffset(); - nextConsumerOffset = offset + 1; - brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); - InflightMessage suc; if (depth == 0) { suc = inflightQueue.offer(publishBuilder, (mqtt) -> {