From 67cf6cc91831ea4eb9814716bb9affbb9a838d85 Mon Sep 17 00:00:00 2001 From: 17712858268 <419479707@qq.com> Date: Wed, 29 Mar 2023 20:12:54 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=85=B1=E4=BA=AB?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- smqttx-common/pom.xml | 5 ++ .../integrate/cluster/IntegrateCluster.java | 11 +++- .../integrate/topic/IntegrateTopics.java | 6 +- .../integrate/topic/TopicFilterType.java | 57 +++++++++++++++++++ .../common/message/mqtt/ClusterMessage.java | 2 +- .../core/protocol/PublishProtocol.java | 1 + smqttx-integrate/pom.xml | 10 ++++ .../interate/IgniteIntegrateCluster.java | 48 ++++++++++++++++ .../interate/IgniteIntegrateTopics.java | 52 +++++++++++++++-- .../sharedstrategy/RandomStrategy.java | 20 +++++++ .../sharedstrategy/SharedStrategy.java | 13 +++++ .../sharedstrategy/SharedStrategyFactory.java | 17 ++++++ .../sharedstrategy/SharedStrategyType.java | 23 ++++++++ 13 files changed, 254 insertions(+), 11 deletions(-) create mode 100644 smqttx-common/src/main/java/io/github/quickmsg/common/integrate/topic/TopicFilterType.java create mode 100644 smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/RandomStrategy.java create mode 100644 smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/SharedStrategy.java create mode 100644 smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/SharedStrategyFactory.java create mode 100644 smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/SharedStrategyType.java diff --git a/smqttx-common/pom.xml b/smqttx-common/pom.xml index dad780af..dc13b57a 100644 --- a/smqttx-common/pom.xml +++ b/smqttx-common/pom.xml @@ -18,6 +18,11 @@ + + org.apache.commons + commons-lang3 + 3.4 + cn.hutool hutool-core diff --git a/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/cluster/IntegrateCluster.java b/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/cluster/IntegrateCluster.java index 0f2bc8c9..ad178880 100644 --- a/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/cluster/IntegrateCluster.java +++ b/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/cluster/IntegrateCluster.java @@ -2,8 +2,7 @@ package io.github.quickmsg.common.integrate.cluster; import io.github.quickmsg.common.integrate.IntegrateGetter; import io.github.quickmsg.common.message.mqtt.ClusterMessage; -import io.github.quickmsg.common.message.mqtt.PublishMessage; -import reactor.core.publisher.Mono; +import org.apache.ignite.cluster.ClusterGroup; import java.util.Set; @@ -27,7 +26,7 @@ public interface IntegrateCluster extends IntegrateGetter { */ Set getOtherClusterNode(); - + ClusterGroup getClusterNodeByConsistentId(String consistentId); /** * acquire local node id * @@ -65,4 +64,10 @@ public interface IntegrateCluster extends IntegrateGetter { */ void sendCluster(String topic,ClusterMessage clusterMessage); + /** + * 集群消息 + * @param topic TOPIC + * @param clusterMessage {@link ClusterMessage} + */ + void sendSharedSubscribe(String topic,ClusterMessage clusterMessage); } diff --git a/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/topic/IntegrateTopics.java b/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/topic/IntegrateTopics.java index 1d770f6e..3c172a3f 100644 --- a/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/topic/IntegrateTopics.java +++ b/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/topic/IntegrateTopics.java @@ -1,9 +1,9 @@ package io.github.quickmsg.common.integrate.topic; import io.github.quickmsg.common.channel.MqttChannel; -import io.github.quickmsg.common.integrate.SubscribeTopic; import io.github.quickmsg.common.integrate.IntegrateGetter; -import io.github.quickmsg.common.message.mqtt.ClusterMessage; +import io.github.quickmsg.common.integrate.SubscribeTopic; +import org.apache.commons.lang3.tuple.ImmutableTriple; import java.util.List; import java.util.Map; @@ -67,7 +67,9 @@ public interface IntegrateTopics extends IntegrateGetter { Set getWildcardTopics(String topic); + Set> getQueueSubscribeTopics(String topic); + Set> getShareSubscribeTopics(String topic); /** * get all subscribers * @return Map diff --git a/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/topic/TopicFilterType.java b/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/topic/TopicFilterType.java new file mode 100644 index 00000000..626df23f --- /dev/null +++ b/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/topic/TopicFilterType.java @@ -0,0 +1,57 @@ +package io.github.quickmsg.common.integrate.topic; +/** + * @author wuzhong + */ +public enum TopicFilterType { + /** + * 默认 TopicFilter + */ + NONE , + /** + * $queue/ 为前缀的共享订阅 + */ + QUEUE , + + /** + * $share/{group-name}/ 为前缀的分组订阅 + */ + SHARE; + /** + * 共享订阅的 topic + */ + public static final String SHARE_QUEUE_PREFIX = "$queue/"; + public static final String SHARE_GROUP_PREFIX = "$share/"; + + /** + * 获取 topicFilter 类型 + * + * @param topicFilter topicFilter + * @return TopicFilterType + */ + public static TopicFilterType getType(String topicFilter) { + if (topicFilter.startsWith(TopicFilterType.SHARE_QUEUE_PREFIX)) { + return TopicFilterType.QUEUE; + } else if (topicFilter.startsWith(TopicFilterType.SHARE_GROUP_PREFIX)) { + return TopicFilterType.SHARE; + } else { + return TopicFilterType.NONE; + } + } + /** + * 读取共享订阅的分组名 + * + * @param topicFilter topicFilter + * @return 共享订阅分组名 + */ + public static String getShareGroupName(String topicFilter) { + int prefixLength = TopicFilterType.SHARE_GROUP_PREFIX.length(); + int topicFilterLength = topicFilter.length(); + for (int i = prefixLength; i < topicFilterLength; i++) { + char ch = topicFilter.charAt(i); + if ('/' == ch) { + return topicFilter.substring(prefixLength, i); + } + } + throw new IllegalArgumentException("Share subscription topicFilter: " + topicFilter + " not conform to the $share//xxx"); + } +} diff --git a/smqttx-common/src/main/java/io/github/quickmsg/common/message/mqtt/ClusterMessage.java b/smqttx-common/src/main/java/io/github/quickmsg/common/message/mqtt/ClusterMessage.java index a0e176d2..96ffde88 100644 --- a/smqttx-common/src/main/java/io/github/quickmsg/common/message/mqtt/ClusterMessage.java +++ b/smqttx-common/src/main/java/io/github/quickmsg/common/message/mqtt/ClusterMessage.java @@ -43,8 +43,8 @@ public class ClusterMessage{ public PublishMessage toPublishMessage() { PublishMessage publishMessage = new PublishMessage(); - publishMessage.setTopic(this.originTopic); publishMessage.setQos(this.qos); + publishMessage.setTopic(this.originTopic); publishMessage.setRetain(this.retain); publishMessage.setBody(this.body); return publishMessage; diff --git a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java index 3357a6c7..d803246c 100644 --- a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java +++ b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java @@ -65,6 +65,7 @@ public class PublishProtocol implements Protocol { logManager.printInfo(mqttChannel, LogEvent.PUBLISH, LogStatus.SUCCESS, JacksonUtil.bean2Json(message)); ClusterMessage clusterMessage = new ClusterMessage(message); integrateCluster.sendCluster(clusterMessage.getTopic(), clusterMessage); + integrateCluster.sendSharedSubscribe(clusterMessage.getTopic(), clusterMessage); Set wildcardTopics = topics.getWildcardTopics(clusterMessage.getTopic()); if (wildcardTopics != null && wildcardTopics.size() > 0) { wildcardTopics.forEach(tp -> { diff --git a/smqttx-integrate/pom.xml b/smqttx-integrate/pom.xml index e48d33d4..6b17d0c9 100644 --- a/smqttx-integrate/pom.xml +++ b/smqttx-integrate/pom.xml @@ -17,6 +17,16 @@ 2.11.0 + + org.apache.commons + commons-lang3 + 3.4 + + + commons-collections + commons-collections + 3.2.2 + org.apache.commons commons-jexl3 diff --git a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateCluster.java b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateCluster.java index 93b5e862..d343887f 100644 --- a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateCluster.java +++ b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateCluster.java @@ -4,10 +4,19 @@ import io.github.quickmsg.common.integrate.Integrate; import io.github.quickmsg.common.integrate.cluster.IntegrateCluster; import io.github.quickmsg.common.message.mqtt.ClusterMessage; import io.github.quickmsg.common.utils.ServerUtils; +import io.github.quickmsg.interate.sharedstrategy.SharedStrategy; +import io.github.quickmsg.interate.sharedstrategy.SharedStrategyFactory; +import io.github.quickmsg.interate.sharedstrategy.SharedStrategyType; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.ignite.IgniteMessaging; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.lang.IgniteBiPredicate; import java.io.Serializable; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -58,6 +67,16 @@ public class IgniteIntegrateCluster implements IntegrateCluster, Serializable { .collect(Collectors.toSet()); } + @Override + public ClusterGroup getClusterNodeByConsistentId(String consistentId) { + ClusterNode node =igniteCluster + .nodes() + .stream() + .filter(clusterNode -> StringUtils.equals(consistentId, clusterNode.consistentId().toString())) + .findFirst().get(); + return igniteCluster.forHost(node); + } + @Override public String getLocalNode() { return igniteIntegrate.getIgnite().cluster().localNode().addresses().stream().findFirst().orElse(ServerUtils.serverIp); @@ -89,6 +108,35 @@ public class IgniteIntegrateCluster implements IntegrateCluster, Serializable { message.send(topic, clusterMessage); } + @Override + public void sendSharedSubscribe(String topic, ClusterMessage clusterMessage) { + //如果有共享訂閱時,共享訂閱模式發送 + Set> queueSubscribeTopics = this.igniteIntegrate.getTopics().getQueueSubscribeTopics(topic); + if(CollectionUtils.isNotEmpty(queueSubscribeTopics)){ + sendSharedMessage(clusterMessage, queueSubscribeTopics, SharedStrategyType.random); + } + //如果有分組訂閱時,分組訂閱模式發送 + Set> shareSubscribeTopics = this.igniteIntegrate.getTopics().getShareSubscribeTopics(clusterMessage.getTopic()); + if(CollectionUtils.isNotEmpty(shareSubscribeTopics)){ + Map>> shareSubscribeMap = new ConcurrentHashMap<>(); + for (ImmutableTriple tr: shareSubscribeTopics) { + shareSubscribeMap.computeIfAbsent(tr.middle, s -> new HashSet<>()).add(tr); + } + for (Map.Entry>> entry: shareSubscribeMap.entrySet()) { + sendSharedMessage(clusterMessage, entry.getValue(), SharedStrategyType.random); + } + } + } + + private void sendSharedMessage(ClusterMessage clusterMessage, Set> subscribeTopics, SharedStrategyType strategyType) { + SharedStrategy sharedStrategy = SharedStrategyFactory.createSharedStrategy(strategyType); + ImmutableTriple tr = sharedStrategy.searchSharedSubscribe(subscribeTopics); + IgniteMessaging igniteMessaging = this.igniteIntegrate.getIgnite().message(this.igniteIntegrate.getCluster().getClusterNodeByConsistentId(tr.left)); + clusterMessage.setTopic(tr.right); + clusterMessage.setOriginTopic(tr.right); + igniteMessaging.send(tr.right, clusterMessage); + } + @Override public Integrate getIntegrate() { diff --git a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java index b1c16bd6..abae326f 100644 --- a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java +++ b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java @@ -5,20 +5,22 @@ import io.github.quickmsg.common.context.ContextHolder; import io.github.quickmsg.common.integrate.Integrate; import io.github.quickmsg.common.integrate.SubscribeTopic; import io.github.quickmsg.common.integrate.topic.IntegrateTopics; +import io.github.quickmsg.common.integrate.topic.TopicFilterType; import io.github.quickmsg.common.metric.CounterType; -import io.github.quickmsg.common.utils.IPUtils; import io.github.quickmsg.common.utils.TopicRegexUtils; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.ignite.IgniteSet; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.CollectionConfiguration; -import java.util.*; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.function.Function; import java.util.stream.Collectors; /** @@ -38,6 +40,8 @@ public class IgniteIntegrateTopics implements IntegrateTopics { protected static final String MORE_SYMBOL = "#"; + private final IgniteSet> queueSubscribeCache; + private final IgniteSet> shareSubscribeCache; public boolean checkFilter(String topicFilter) { return topicFilter.contains(ONE_SYMBOL); @@ -50,6 +54,14 @@ public class IgniteIntegrateTopics implements IntegrateTopics { .setAtomicityMode(CacheAtomicityMode.ATOMIC) .setCollocated(true)); this.topicSubscribers = new ConcurrentHashMap<>(); + this.queueSubscribeCache = integrate.getIgnite().set("queueSubscribe", + new CollectionConfiguration().setCacheMode(CacheMode.PARTITIONED) + .setAtomicityMode(CacheAtomicityMode.ATOMIC) + .setCollocated(true)); + this.shareSubscribeCache = integrate.getIgnite().set("shareSubscribe", + new CollectionConfiguration().setCacheMode(CacheMode.PARTITIONED) + .setAtomicityMode(CacheAtomicityMode.ATOMIC) + .setCollocated(true)); } @Override @@ -66,7 +78,12 @@ public class IgniteIntegrateTopics implements IntegrateTopics { ContextHolder.getReceiveContext().getMetricManager().getMetricRegistry().getMetricCounter(CounterType.SUBSCRIBE_EVENT).increment(); integrate.getCluster().listenTopic(topic); mqttChannel.getTopics().add(subscribeTopic); - if (isWildcard(topic)) { + TopicFilterType filterType = TopicFilterType.getType(topic); + if (TopicFilterType.QUEUE == filterType) { + queueSubscribeCache.add(new ImmutableTriple<>(integrate.getCluster().getLocalNodeConsistentId(), null, topic)); + } else if (TopicFilterType.SHARE == filterType) { + shareSubscribeCache.add(new ImmutableTriple<>(integrate.getCluster().getLocalNodeConsistentId(), TopicFilterType.getShareGroupName(topic), topic)); + } else if (isWildcard(topic)) { shareCache.add(String.format("%s%s", integrate.getCluster().getLocalNodeConsistentId(), topic)); } } @@ -95,7 +112,12 @@ public class IgniteIntegrateTopics implements IntegrateTopics { private void clearCache(String topic) { integrate.getCluster().stopListenTopic(topic); - if (isWildcard(topic)) { + TopicFilterType filterType = TopicFilterType.getType(topic); + if (TopicFilterType.QUEUE == filterType) { + queueSubscribeCache.remove(new ImmutableTriple<>(integrate.getCluster().getLocalNodeConsistentId(), null, topic)); + } else if (TopicFilterType.SHARE == filterType) { + shareSubscribeCache.remove(new ImmutableTriple<>(integrate.getCluster().getLocalNodeConsistentId(), TopicFilterType.getShareGroupName(topic), topic)); + } else if (isWildcard(topic)) { shareCache.remove(String.format("%s%s", integrate.getCluster().getLocalNodeConsistentId(), topic)); } } @@ -136,6 +158,26 @@ public class IgniteIntegrateTopics implements IntegrateTopics { }).filter(tp->topic.matches(TopicRegexUtils.regexTopic(tp))).collect(Collectors.toSet()); } + @Override + public Set> getQueueSubscribeTopics(String topic) { + char ch = topic.charAt(0); + if ('/' == ch) { + topic = topic.substring(1); + } + String shareQueue = String.format("%s%s", TopicFilterType.SHARE_QUEUE_PREFIX, topic); + return queueSubscribeCache.stream().filter(tr->shareQueue.matches(TopicRegexUtils.regexTopic(tr.right))).collect(Collectors.toSet()); + } + + @Override + public Set> getShareSubscribeTopics(String topic) { + char ch = topic.charAt(0); + if ('/' == ch) { + topic = topic.substring(1); + } + String shareGroup = String.format("%s%s", TopicFilterType.SHARE_GROUP_PREFIX, topic); + return shareSubscribeCache.stream().filter(tr->shareGroup.matches(TopicRegexUtils.regexTopic(tr.right.replaceFirst(tr.middle+"\\/", "")))).collect(Collectors.toSet()); + } + @Override public Integrate getIntegrate() { return this.integrate; diff --git a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/RandomStrategy.java b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/RandomStrategy.java new file mode 100644 index 00000000..28ca56a8 --- /dev/null +++ b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/RandomStrategy.java @@ -0,0 +1,20 @@ +package io.github.quickmsg.interate.sharedstrategy; + +import org.apache.commons.lang3.tuple.ImmutableTriple; + +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +/** + * @author wuzhong + */ +public class RandomStrategy implements SharedStrategy{ + + @Override + public ImmutableTriple searchSharedSubscribe(Set> shareSubscribes) { + ImmutableTriple[] subscribes = shareSubscribes.toArray(new ImmutableTriple[0]); + Random random = ThreadLocalRandom.current(); + return subscribes[random.nextInt(subscribes.length)]; + } +} diff --git a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/SharedStrategy.java b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/SharedStrategy.java new file mode 100644 index 00000000..a9e054a8 --- /dev/null +++ b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/SharedStrategy.java @@ -0,0 +1,13 @@ +package io.github.quickmsg.interate.sharedstrategy; + +import org.apache.commons.lang3.tuple.ImmutableTriple; + +import java.util.Set; + +/** + * @author wuzhong + */ +public interface SharedStrategy { + + ImmutableTriple searchSharedSubscribe(Set> shareSubscribes); +} diff --git a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/SharedStrategyFactory.java b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/SharedStrategyFactory.java new file mode 100644 index 00000000..888dfb26 --- /dev/null +++ b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/SharedStrategyFactory.java @@ -0,0 +1,17 @@ +package io.github.quickmsg.interate.sharedstrategy; + +/** + * @author wuzhong + */ +public class SharedStrategyFactory { + + private static RandomStrategy randomStrategy = new RandomStrategy(); + + public static SharedStrategy createSharedStrategy(SharedStrategyType type){ + if(SharedStrategyType.random == type){ + return randomStrategy; + }else{ + return randomStrategy; + } + } +} diff --git a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/SharedStrategyType.java b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/SharedStrategyType.java new file mode 100644 index 00000000..25c473e3 --- /dev/null +++ b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/sharedstrategy/SharedStrategyType.java @@ -0,0 +1,23 @@ +package io.github.quickmsg.interate.sharedstrategy; + +/** + * @author wuzhong + */ +public enum SharedStrategyType { + /** + * 在所有订阅者中随机选择 + */ + random, + /** + * 按照订阅顺序 + */ + round_robin, + /** + * 一直发往上次选取的订阅者 + */ + sticky, + /** + * 按照发布者 ClientID 的哈希值 + */ + hash; +} -- Gitee From 1634a2c5dbf6a94270656b05e8ef6600bcb987a3 Mon Sep 17 00:00:00 2001 From: 17712858268 <419479707@qq.com> Date: Wed, 29 Mar 2023 22:17:22 +0800 Subject: [PATCH 2/2] refactor --- .../integrate/topic/IntegrateTopics.java | 3 +- .../core/protocol/PublishProtocol.java | 7 +-- .../core/protocol/PublishRelProtocol.java | 7 +-- .../interate/IgniteIntegrateTopics.java | 45 ++++++++----------- 4 files changed, 29 insertions(+), 33 deletions(-) diff --git a/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/topic/IntegrateTopics.java b/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/topic/IntegrateTopics.java index 3c172a3f..65af2422 100644 --- a/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/topic/IntegrateTopics.java +++ b/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/topic/IntegrateTopics.java @@ -3,6 +3,7 @@ package io.github.quickmsg.common.integrate.topic; import io.github.quickmsg.common.channel.MqttChannel; import io.github.quickmsg.common.integrate.IntegrateGetter; import io.github.quickmsg.common.integrate.SubscribeTopic; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutableTriple; import java.util.List; @@ -65,7 +66,7 @@ public interface IntegrateTopics extends IntegrateGetter { = */ boolean isWildcard(String topic); - Set getWildcardTopics(String topic); + Set> getWildcardTopics(String topic); Set> getQueueSubscribeTopics(String topic); diff --git a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java index d803246c..6f43fd42 100644 --- a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java +++ b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java @@ -21,6 +21,7 @@ import io.github.quickmsg.common.utils.JacksonUtil; import io.github.quickmsg.common.utils.MqttMessageUtils; import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.ImmutablePair; import reactor.core.publisher.Mono; import reactor.util.context.ContextView; @@ -66,11 +67,11 @@ public class PublishProtocol implements Protocol { ClusterMessage clusterMessage = new ClusterMessage(message); integrateCluster.sendCluster(clusterMessage.getTopic(), clusterMessage); integrateCluster.sendSharedSubscribe(clusterMessage.getTopic(), clusterMessage); - Set wildcardTopics = topics.getWildcardTopics(clusterMessage.getTopic()); + Set> wildcardTopics = topics.getWildcardTopics(clusterMessage.getTopic()); if (wildcardTopics != null && wildcardTopics.size() > 0) { wildcardTopics.forEach(tp -> { - clusterMessage.setTopic(tp); - integrateCluster.sendCluster(tp, clusterMessage); + clusterMessage.setTopic(tp.right); + integrateCluster.sendCluster(tp.right, clusterMessage); }); } diff --git a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRelProtocol.java b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRelProtocol.java index 6584e565..a8017e61 100644 --- a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRelProtocol.java +++ b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRelProtocol.java @@ -15,6 +15,7 @@ import io.github.quickmsg.common.metric.CounterType; import io.github.quickmsg.common.protocol.Protocol; import io.github.quickmsg.common.utils.JacksonUtil; import io.github.quickmsg.common.utils.MqttMessageUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; import reactor.util.context.ContextView; import java.util.Set; @@ -39,11 +40,11 @@ public class PublishRelProtocol implements Protocol { integrateCluster.sendCluster(clusterMessage.getTopic(), clusterMessage); IntegrateTopics topics=receiveContext.getIntegrate().getTopics(); if(topics.isWildcard(clusterMessage.getTopic())){ - Set wildcardTopics = topics.getWildcardTopics(clusterMessage.getTopic()); + Set> wildcardTopics = topics.getWildcardTopics(clusterMessage.getTopic()); if (wildcardTopics != null && wildcardTopics.size() > 0) { wildcardTopics.forEach(tp -> { - clusterMessage.setTopic(tp); - integrateCluster.sendCluster(tp, clusterMessage); + clusterMessage.setTopic(tp.right); + integrateCluster.sendCluster(tp.right, clusterMessage); }); } } diff --git a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java index abae326f..5680580f 100644 --- a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java +++ b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java @@ -1,5 +1,19 @@ package io.github.quickmsg.interate; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.apache.ignite.IgniteSet; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CollectionConfiguration; + import io.github.quickmsg.common.channel.MqttChannel; import io.github.quickmsg.common.context.ContextHolder; import io.github.quickmsg.common.integrate.Integrate; @@ -10,18 +24,6 @@ import io.github.quickmsg.common.metric.CounterType; import io.github.quickmsg.common.utils.TopicRegexUtils; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.tuple.ImmutableTriple; -import org.apache.ignite.IgniteSet; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.configuration.CollectionConfiguration; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.stream.Collectors; /** * @author luxurong @@ -31,7 +33,7 @@ public class IgniteIntegrateTopics implements IntegrateTopics { private final IgniteIntegrate integrate; - private final IgniteSet shareCache; + private final IgniteSet> shareCache; @Getter private final Map> topicSubscribers; @@ -84,7 +86,7 @@ public class IgniteIntegrateTopics implements IntegrateTopics { } else if (TopicFilterType.SHARE == filterType) { shareSubscribeCache.add(new ImmutableTriple<>(integrate.getCluster().getLocalNodeConsistentId(), TopicFilterType.getShareGroupName(topic), topic)); } else if (isWildcard(topic)) { - shareCache.add(String.format("%s%s", integrate.getCluster().getLocalNodeConsistentId(), topic)); + shareCache.add(new ImmutablePair<>(integrate.getCluster().getLocalNodeConsistentId(), topic)); } } } @@ -118,7 +120,7 @@ public class IgniteIntegrateTopics implements IntegrateTopics { } else if (TopicFilterType.SHARE == filterType) { shareSubscribeCache.remove(new ImmutableTriple<>(integrate.getCluster().getLocalNodeConsistentId(), TopicFilterType.getShareGroupName(topic), topic)); } else if (isWildcard(topic)) { - shareCache.remove(String.format("%s%s", integrate.getCluster().getLocalNodeConsistentId(), topic)); + shareCache.remove(new ImmutablePair<>(integrate.getCluster().getLocalNodeConsistentId(), topic)); } } @@ -145,17 +147,8 @@ public class IgniteIntegrateTopics implements IntegrateTopics { } @Override - public Set getWildcardTopics(String topic) { - Set nodeIds = integrate.getCluster().getClusterNode(); - return shareCache.stream().map(tp -> { - for (String id : nodeIds) { - if(tp.contains(id)){ - tp = tp.replaceFirst(id, ""); - break; - } - } - return tp; - }).filter(tp->topic.matches(TopicRegexUtils.regexTopic(tp))).collect(Collectors.toSet()); + public Set> getWildcardTopics(String topic) { + return shareCache.stream().filter(tp->topic.matches(TopicRegexUtils.regexTopic(tp.right))).collect(Collectors.toSet()); } @Override -- Gitee