diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index e13c6c1b60f78544fbf664409b639b6219da0a6e..eb8311cc6872e5b4de08ffe2495409c2ec72d655 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -13,7 +13,6 @@ import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventBusImpl; import org.smartboot.mqtt.common.eventbus.EventType; -import org.smartboot.mqtt.common.exception.MqttException; import org.smartboot.mqtt.common.message.MqttConnAckMessage; import org.smartboot.mqtt.common.message.MqttConnectMessage; import org.smartboot.mqtt.common.message.MqttDisconnectMessage; @@ -156,10 +155,7 @@ public class MqttClient extends AbstractSession { if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { setInflightQueue(new InflightQueue(this, 16)); connected = true; - Runnable runnable; - while ((runnable = registeredTasks.poll()) != null) { - runnable.run(); - } + consumeTask(); //重连情况下重新触发订阅逻辑 subscribes.forEach((k, v) -> { subscribe(k, v.getQoS(), v.getConsumer()); @@ -240,6 +236,13 @@ public class MqttClient extends AbstractSession { } } + private void consumeTask() { + Runnable runnable; + while ((runnable = registeredTasks.poll()) != null) { + runnable.run(); + } + } + /** * 释放本地内存 */ @@ -282,17 +285,20 @@ public class MqttClient extends AbstractSession { unsubscribeBuilder.properties(properties); } // wait ack message. - try { - getInflightQueue().put(unsubscribeBuilder, (message) -> { - ValidateUtils.isTrue(message instanceof MqttUnsubAckMessage, "uncorrected message type."); - for (String unsubscribedTopic : unsubscribedTopics) { - subscribes.remove(unsubscribedTopic); - wildcardsToken.removeIf(topicToken -> StringUtils.equals(unsubscribedTopic, topicToken.getTopicFilter())); - } - }); - } catch (InterruptedException e) { - throw new MqttException("unsubscribe topic exception", e); + boolean suc = getInflightQueue().offer(unsubscribeBuilder, (message) -> { + ValidateUtils.isTrue(message instanceof MqttUnsubAckMessage, "uncorrected message type."); + for (String unsubscribedTopic : unsubscribedTopics) { + subscribes.remove(unsubscribedTopic); + wildcardsToken.removeIf(topicToken -> StringUtils.equals(unsubscribedTopic, topicToken.getTopicFilter())); + } + consumeTask(); + }); + if (suc) { + flush(); + } else { + registeredTasks.offer(() -> unsubscribe0(topics)); } + } public MqttClient subscribe(String topic, MqttQoS qos, BiConsumer consumer) { @@ -328,29 +334,31 @@ public class MqttClient extends AbstractSession { subscribeBuilder.subscribeProperties(new SubscribeProperties()); } MqttSubscribeMessage subscribeMessage = subscribeBuilder.build(); - try { - getInflightQueue().put(subscribeBuilder, (message) -> { - List qosValues = ((MqttSubAckMessage) message).getPayload().grantedQoSLevels(); - ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response"); - int i = 0; - for (MqttTopicSubscription subscription : subscribeMessage.getPayload().getTopicSubscriptions()) { - MqttQoS minQos = MqttQoS.valueOf(Math.min(subscription.getQualityOfService().value(), qosValues.get(i++))); - clientConfigure.getTopicListener().subscribe(subscription.getTopicFilter(), subscription.getQualityOfService() == MqttQoS.FAILURE ? MqttQoS.FAILURE : minQos); - if (subscription.getQualityOfService() != MqttQoS.FAILURE) { - subscribes.put(subscription.getTopicFilter(), new Subscribe(subscription.getTopicFilter(), minQos, consumer)); - //缓存统配匹配的topic - TopicToken topicToken = new TopicToken(subscription.getTopicFilter()); - if (topicToken.isWildcards()) { - wildcardsToken.add(topicToken); - } - } else { - LOGGER.error("subscribe topic:{} fail", subscription.getTopicFilter()); + boolean suc = getInflightQueue().offer(subscribeBuilder, (message) -> { + List qosValues = ((MqttSubAckMessage) message).getPayload().grantedQoSLevels(); + ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response"); + int i = 0; + for (MqttTopicSubscription subscription : subscribeMessage.getPayload().getTopicSubscriptions()) { + MqttQoS minQos = MqttQoS.valueOf(Math.min(subscription.getQualityOfService().value(), qosValues.get(i++))); + clientConfigure.getTopicListener().subscribe(subscription.getTopicFilter(), subscription.getQualityOfService() == MqttQoS.FAILURE ? MqttQoS.FAILURE : minQos); + if (subscription.getQualityOfService() != MqttQoS.FAILURE) { + subscribes.put(subscription.getTopicFilter(), new Subscribe(subscription.getTopicFilter(), minQos, consumer)); + //缓存统配匹配的topic + TopicToken topicToken = new TopicToken(subscription.getTopicFilter()); + if (topicToken.isWildcards()) { + wildcardsToken.add(topicToken); } - subAckConsumer.accept(MqttClient.this, minQos); + } else { + LOGGER.error("subscribe topic:{} fail", subscription.getTopicFilter()); } - }); - } catch (InterruptedException e) { - throw new MqttException("subscribe topic exception", e); + subAckConsumer.accept(MqttClient.this, minQos); + consumeTask(); + } + }); + if (suc) { + flush(); + } else { + registeredTasks.offer(() -> subscribe0(topic, qos, consumer, subAckConsumer)); } } @@ -391,18 +399,27 @@ public class MqttClient extends AbstractSession { private void publish(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer) { InflightQueue inflightQueue = getInflightQueue(); - try { - inflightQueue.put(publishBuilder, (message) -> { - consumer.accept(message.getVariableHeader().getPacketId()); - //最早发送的消息若收到响应,则更新点位 - synchronized (MqttClient.this) { - MqttClient.this.notifyAll(); + boolean suc = inflightQueue.offer(publishBuilder, (message) -> { + consumer.accept(message.getVariableHeader().getPacketId()); + //最早发送的消息若收到响应,则更新点位 + synchronized (MqttClient.this) { + MqttClient.this.notifyAll(); + } + consumeTask(); + }); + if (suc) { + flush(); + } else { + try { + synchronized (this) { + wait(); } - }); - } catch (InterruptedException e) { - throw new MqttException("publish message exception", e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + publish(publishBuilder, consumer); } - flush(); + } public MqttClientConfigure getClientConfigure() { diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java index 798f3c324a8e6e0413b84deab6b13dbb7454fc0e..b89238bb82fd3f07a8f430b52621f1217aa236fa 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java @@ -34,8 +34,7 @@ public class MqttClientProcessor extends AbstractMessageProcessor { private final MqttClient mqttClient; private static final Map, MqttProcessor> processors = new HashMap<>(); - public MqttClientProcessor(MqttClient mqttClient) { - this.mqttClient = mqttClient; + static { processors.put(MqttConnAckMessage.class, new ConnAckProcessor()); processors.put(MqttPubAckMessage.class, new MqttAckProcessor()); processors.put(MqttPublishMessage.class, new PublishProcessor()); @@ -46,6 +45,10 @@ public class MqttClientProcessor extends AbstractMessageProcessor { processors.put(MqttPingRespMessage.class, new MqttPingRespProcessor()); } + public MqttClientProcessor(MqttClient mqttClient) { + this.mqttClient = mqttClient; + } + @Override public void process0(AioSession session, MqttMessage msg) { mqttClient.getEventBus().publish(EventType.RECEIVE_MESSAGE, EventObject.newEventObject(mqttClient, msg)); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 7a6f88098d36b4ad5dfa8bb16e2713cdb03cf4e9..6f71f75d2e7998e7f652f384e20a5d72f8d60645 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -37,7 +37,6 @@ public class InflightQueue { private final AtomicInteger packetId = new AtomicInteger(0); private final AbstractSession session; - private boolean lock; public InflightQueue(AbstractSession session, int size) { ValidateUtils.isTrue(size > 0, "inflight must >0"); @@ -45,15 +44,6 @@ public class InflightQueue { this.session = session; } - public synchronized void put(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) throws InterruptedException { - while (count == queue.length) { - lock = true; - session.flush(); - wait(); - } - offer(publishBuilder, consumer); - } - public boolean offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { InflightMessage inflightMessage; synchronized (this) { @@ -108,14 +98,14 @@ public class InflightQueue { LOGGER.debug("session is disconnect , pause qos monitor."); return; } - long delay = System.currentTimeMillis() - inflightMessage.getLatestTime(); + long delay = TimeUnit.SECONDS.toMillis(TIMEOUT) - System.currentTimeMillis() + inflightMessage.getLatestTime(); if (delay > 0) { LOGGER.info("the time is not up, try again in {} milliseconds ", delay); QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(this, delay, TimeUnit.MILLISECONDS); return; } inflightMessage.setLatestTime(System.currentTimeMillis()); - LOGGER.info("time out,retry..."); + LOGGER.info("message:{} time out,retry...", inflightMessage.getOriginalMessage()); switch (inflightMessage.getExpectMessageType()) { case PUBACK: case PUBREC: @@ -197,10 +187,6 @@ public class InflightQueue { } count--; } - if (lock) { - lock = false; - notifyAll(); - } if (count > 0) { //注册超时监听任务 Attachment attachment = session.session.getAttachment();