diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 8bf03ebe47b7eedfe14c75d4a953501bd3fd667c..0082251cc9f8651a7e11eaf7f299e448d980f81a 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -4,6 +4,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.broker.provider.PersistenceProvider; import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; +import org.smartboot.mqtt.common.InflightMessage; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; @@ -69,7 +70,7 @@ public class TopicSubscriber { mqttSession.flush(); } - private void publish0(BrokerContext brokerContext, int depth) { + private void publish0(BrokerContext brokerContext, final int depth) { PersistenceProvider persistenceProvider = brokerContext.getProviders().getPersistenceProvider(); PersistenceMessage persistenceMessage = persistenceProvider.get(topic.getTopic(), nextConsumerOffset); if (persistenceMessage == null) { @@ -79,7 +80,10 @@ public class TopicSubscriber { return; } if (depth > 16) { -// LOGGER.info("退出递归..."); + if (semaphore.tryAcquire()) { + topic.getQueue().offer(this); + topic.getVersion().incrementAndGet(); + } return; } @@ -87,38 +91,46 @@ public class TopicSubscriber { if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) { publishBuilder.publishProperties(new PublishProperties()); } - - InflightQueue inflightQueue = mqttSession.getInflightQueue(); - long offset = persistenceMessage.getOffset(); - boolean suc = inflightQueue.offer(publishBuilder, (mqtt) -> { - if (mqttQoS == MqttQoS.AT_MOST_ONCE) { - nextConsumerOffset = persistenceMessage.getOffset() + 1; - } - //最早发送的消息若收到响应,则更新点位 - commitNextConsumerOffset(offset + 1); - if (persistenceMessage.isRetained()) { - setRetainConsumerOffset(getRetainConsumerOffset() + 1); - } - commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); - publish0(brokerContext, 0); - }); - // 飞行队列已满 - if (!suc) { -// LOGGER.info("queue is full..." + expectConsumerOffset); + //Qos0直接发送 + if (mqttQoS == MqttQoS.AT_MOST_ONCE) { + mqttSession.write(publishBuilder.build(), false); + publish0(brokerContext, depth + 1); return; } - long start = System.currentTimeMillis(); - if (mqttQoS != MqttQoS.AT_MOST_ONCE) { - nextConsumerOffset = persistenceMessage.getOffset() + 1; + InflightQueue inflightQueue = mqttSession.getInflightQueue(); + long offset = persistenceMessage.getOffset(); + nextConsumerOffset = offset + 1; + brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); + + InflightMessage suc; + if (depth == 0) { + suc = inflightQueue.offer(publishBuilder, (mqtt) -> { + //最早发送的消息若收到响应,则更新点位 + commitNextConsumerOffset(offset + 1); + if (persistenceMessage.isRetained()) { + setRetainConsumerOffset(getRetainConsumerOffset() + 1); + } + commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); + publish0(brokerContext, 1); + }, () -> publish0(brokerContext, 0)); + } else { + suc = inflightQueue.offer(publishBuilder, (mqtt) -> { + //最早发送的消息若收到响应,则更新点位 + commitNextConsumerOffset(offset + 1); + if (persistenceMessage.isRetained()) { + setRetainConsumerOffset(getRetainConsumerOffset() + 1); + } + commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); + publish0(brokerContext, 1); + }); } - long cost = System.currentTimeMillis() - start; - if (cost > 100) { - System.out.println("publish busy ,cost: " + cost); + // 飞行队列已满 + if (suc != null) { + //递归处理下一个消息 + publish0(brokerContext, depth + 1); } - brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); - //递归处理下一个消息 - publish0(brokerContext, ++depth); + } public BrokerTopic getTopic() { diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index eb8311cc6872e5b4de08ffe2495409c2ec72d655..19918d9d8df91dfcffb8ec39a2ceacc06f1132a6 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -5,6 +5,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.AbstractSession; import org.smartboot.mqtt.common.DefaultMqttWriter; +import org.smartboot.mqtt.common.InflightMessage; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.QosRetryPlugin; import org.smartboot.mqtt.common.TopicToken; @@ -237,8 +238,8 @@ public class MqttClient extends AbstractSession { } private void consumeTask() { - Runnable runnable; - while ((runnable = registeredTasks.poll()) != null) { + Runnable runnable = registeredTasks.poll(); + if (runnable != null) { runnable.run(); } } @@ -285,7 +286,7 @@ public class MqttClient extends AbstractSession { unsubscribeBuilder.properties(properties); } // wait ack message. - boolean suc = getInflightQueue().offer(unsubscribeBuilder, (message) -> { + getInflightQueue().offer(unsubscribeBuilder, (message) -> { ValidateUtils.isTrue(message instanceof MqttUnsubAckMessage, "uncorrected message type."); for (String unsubscribedTopic : unsubscribedTopics) { subscribes.remove(unsubscribedTopic); @@ -293,12 +294,7 @@ public class MqttClient extends AbstractSession { } consumeTask(); }); - if (suc) { - flush(); - } else { - registeredTasks.offer(() -> unsubscribe0(topics)); - } - + flush(); } public MqttClient subscribe(String topic, MqttQoS qos, BiConsumer consumer) { @@ -334,7 +330,7 @@ public class MqttClient extends AbstractSession { subscribeBuilder.subscribeProperties(new SubscribeProperties()); } MqttSubscribeMessage subscribeMessage = subscribeBuilder.build(); - boolean suc = getInflightQueue().offer(subscribeBuilder, (message) -> { + getInflightQueue().offer(subscribeBuilder, (message) -> { List qosValues = ((MqttSubAckMessage) message).getPayload().grantedQoSLevels(); ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response"); int i = 0; @@ -352,14 +348,10 @@ public class MqttClient extends AbstractSession { LOGGER.error("subscribe topic:{} fail", subscription.getTopicFilter()); } subAckConsumer.accept(MqttClient.this, minQos); - consumeTask(); } + consumeTask(); }); - if (suc) { - flush(); - } else { - registeredTasks.offer(() -> subscribe0(topic, qos, consumer, subAckConsumer)); - } + flush(); } public void notifyResponse(MqttConnAckMessage connAckMessage) { @@ -398,17 +390,25 @@ public class MqttClient extends AbstractSession { } private void publish(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer) { + if (publishBuilder.qos() == MqttQoS.AT_MOST_ONCE) { + write(publishBuilder.build()); + consumer.accept(0); + return; + } InflightQueue inflightQueue = getInflightQueue(); - boolean suc = inflightQueue.offer(publishBuilder, (message) -> { + InflightMessage inflightMessage = inflightQueue.offer(publishBuilder, (message) -> { consumer.accept(message.getVariableHeader().getPacketId()); //最早发送的消息若收到响应,则更新点位 synchronized (MqttClient.this) { MqttClient.this.notifyAll(); } - consumeTask(); }); - if (suc) { + if (inflightMessage != null) { flush(); + if (publishBuilder.qos() == MqttQoS.AT_MOST_ONCE) { + inflightMessage.setResponseMessage(inflightMessage.getOriginalMessage()); + inflightQueue.commit(inflightMessage); + } } else { try { synchronized (this) { diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java index e27c960bb472cca716f5f40038cb483708e4a592..9089142433cc5bfb4847af040c9c5b5747983fe5 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java @@ -1,5 +1,7 @@ package org.smartboot.mqtt.common; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventBus; @@ -29,6 +31,7 @@ import java.util.function.Consumer; * @version V1.0 , 2022/4/12 */ public abstract class AbstractSession { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSession.class); private final EventBus eventBus; protected String clientId; protected AioSession session; @@ -83,8 +86,12 @@ public abstract class AbstractSession { inflightQueue.notify(message); } else { QosMessage qosMessage = ackMessageCacheMap.remove(message.getVariableHeader().getPacketId()); - qosMessage.setCommit(true); - qosMessage.getConsumer().accept(message); + if (qosMessage != null) { + qosMessage.setCommit(true); + qosMessage.getConsumer().accept(message); + } else { + LOGGER.info("message is null"); + } } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 6f71f75d2e7998e7f652f384e20a5d72f8d60645..3dd3cfb14ef3bee55f455897b18c084d7b86538d 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -3,7 +3,6 @@ package org.smartboot.mqtt.common; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.enums.MqttMessageType; -import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPubRelMessage; @@ -17,6 +16,7 @@ import org.smartboot.socket.util.AttachKey; import org.smartboot.socket.util.Attachment; import org.smartboot.socket.util.QuickTimerTask; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -37,6 +37,8 @@ public class InflightQueue { private final AtomicInteger packetId = new AtomicInteger(0); private final AbstractSession session; + private final ConcurrentLinkedQueue runnables = new ConcurrentLinkedQueue<>(); + public InflightQueue(AbstractSession session, int size) { ValidateUtils.isTrue(size > 0, "inflight must >0"); @@ -44,11 +46,18 @@ public class InflightQueue { this.session = session; } - public boolean offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { + public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { + return offer(publishBuilder, consumer, null); + } + + public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer, Runnable runnable) { InflightMessage inflightMessage; synchronized (this) { if (count == queue.length) { - return false; + if (runnable != null) { + runnables.offer(runnable); + } + return null; } int id = packetId.incrementAndGet(); // 16位无符号最大值65535 @@ -65,19 +74,12 @@ public class InflightQueue { count++; //启动消息质量监测 - if (count == 1 && mqttMessage.getFixedHeader().getQosLevel().value() > 0) { + if (count == 1) { retry(inflightMessage); } -// System.out.println("publish..."); - - } - session.write(inflightMessage.getOriginalMessage(), false); - // QOS0直接响应 - if (inflightMessage.getOriginalMessage().getFixedHeader().getQosLevel() == MqttQoS.AT_MOST_ONCE) { - inflightMessage.setResponseMessage(inflightMessage.getOriginalMessage()); - commit(inflightMessage); } - return true; + session.write(inflightMessage.getOriginalMessage(), count == queue.length); + return inflightMessage; } /** @@ -105,7 +107,7 @@ public class InflightQueue { return; } inflightMessage.setLatestTime(System.currentTimeMillis()); - LOGGER.info("message:{} time out,retry...", inflightMessage.getOriginalMessage()); + LOGGER.info("message:{} time out,retry...", inflightMessage.getOriginalMessage().getFixedHeader()); switch (inflightMessage.getExpectMessageType()) { case PUBACK: case PUBREC: @@ -137,7 +139,7 @@ public class InflightQueue { public void notify(MqttPacketIdentifierMessage message) { InflightMessage inflightMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length]; ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == inflightMessage.getExpectMessageType(), "invalid message type"); - ValidateUtils.isTrue(message.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message packetId"); + ValidateUtils.isTrue(message.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message packetId " + message.getVariableHeader().getPacketId() + " " + inflightMessage.getAssignedPacketId()); inflightMessage.setResponseMessage(message); inflightMessage.setLatestTime(System.currentTimeMillis()); switch (message.getFixedHeader().getMessageType()) { @@ -164,7 +166,7 @@ public class InflightQueue { } } - private synchronized void commit(InflightMessage inflightMessage) { + public synchronized void commit(InflightMessage inflightMessage) { MqttVariableMessage originalMessage = inflightMessage.getOriginalMessage(); ValidateUtils.isTrue(originalMessage.getFixedHeader().getQosLevel().value() == 0 || originalMessage.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message"); inflightMessage.setCommit(true); @@ -187,15 +189,20 @@ public class InflightQueue { } count--; } + if (count > 0) { //注册超时监听任务 Attachment attachment = session.session.getAttachment(); InflightMessage monitorMessage = queue[takeIndex]; attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(monitorMessage)); } - } - - public int getCount() { - return count; + while (count < queue.length) { + Runnable runnable = runnables.poll(); + if (runnable != null) { + runnable.run(); + } else { + break; + } + } } } \ No newline at end of file diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java index 4e152214bb2a5f8cd1635c8473e20d8ea11994ca..18f11280a86aed9619026f6323af67e1c6a27516 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java @@ -1,5 +1,6 @@ package org.smartboot.mqtt.common.message; +import org.smartboot.mqtt.common.ToString; import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttQoS; @@ -19,7 +20,7 @@ import org.smartboot.mqtt.common.enums.MqttQoS; * @author 三刀 * @version V1.0 , 2018/4/22 */ -public class MqttFixedHeader { +public class MqttFixedHeader extends ToString { public static final MqttFixedHeader CONNECT_HEADER = new MqttFixedHeader(MqttMessageType.CONNECT, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader CONN_ACK_HEADER = new MqttFixedHeader(MqttMessageType.CONNACK, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader PUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.PUBACK, MqttQoS.AT_MOST_ONCE); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java index 14a055f91d0ef1e74ba012e24175ab9c72545965..0410f2ad249baf33a82f90d676e1f97251d81b18 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java @@ -27,6 +27,8 @@ public final class MqttMessageBuilders { public interface MessageBuilder> { MessageBuilder packetId(int packetId); + MqttQoS qos(); + T build(); } @@ -82,6 +84,11 @@ public final class MqttMessageBuilders { return this; } + @Override + public MqttQoS qos() { + return qos; + } + public PublishBuilder publishProperties(PublishProperties publishProperties) { this.publishProperties = publishProperties; return this; @@ -94,7 +101,7 @@ public final class MqttMessageBuilders { public MqttPublishMessage build() { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained); if (qos != MqttQoS.AT_LEAST_ONCE && qos != MqttQoS.EXACTLY_ONCE) { - packetId = -1; + packetId = -packetId; } MqttPublishVariableHeader mqttVariableHeader = new MqttPublishVariableHeader(packetId, topic, publishProperties); return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, payload); @@ -126,6 +133,11 @@ public final class MqttMessageBuilders { return this; } + @Override + public MqttQoS qos() { + return MqttQoS.AT_LEAST_ONCE; + } + public SubscribeBuilder subscribeProperties(SubscribeProperties subscribeProperties) { this.subscribeProperties = subscribeProperties; return this; @@ -162,6 +174,11 @@ public final class MqttMessageBuilders { return this; } + @Override + public MqttQoS qos() { + return MqttQoS.AT_LEAST_ONCE; + } + public void properties(ReasonProperties properties) { this.properties = properties; }