diff --git a/ballcat-common/ballcat-common-websocket/pom.xml b/ballcat-common/ballcat-common-websocket/pom.xml
index 7a4ef783f251399b292b08d14a334b81888a5548..ae4b76135d40e9299fa68afedc3787e8acf7d45f 100644
--- a/ballcat-common/ballcat-common-websocket/pom.xml
+++ b/ballcat-common/ballcat-common-websocket/pom.xml
@@ -35,6 +35,11 @@
jakarta.annotation
jakarta.annotation-api
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+ true
+
-
\ No newline at end of file
+
diff --git a/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java
new file mode 100644
index 0000000000000000000000000000000000000000..b30fa475f76c1db5efad2c9709176a3415607e3a
--- /dev/null
+++ b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java
@@ -0,0 +1,76 @@
+package com.hccake.ballcat.common.websocket.distribute;
+
+import com.hccake.ballcat.common.util.JsonUtils;
+import com.hccake.ballcat.common.websocket.exception.ErrorJsonMessageException;
+import com.hccake.ballcat.common.websocket.session.WebSocketSessionStore;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.beans.factory.annotation.Value;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * @ClassName RocketmqMessageDistributor.java
+ * @Author liu_yx
+ * @Version 1.0.0
+ * @Description MQ发送消息,接收到消息时进行推送, 广播模式
+ * @CreateTime 2022年06月30日 14:10:10
+ */
+@Slf4j
+@RocketMQMessageListener(
+ consumerGroup = "${spring.application.name:default-ballcat-application}-${spring.profiles.active:dev}",
+ topic = "${spring.application.name:default-ballcat-application}-${spring.profiles.active:dev}",
+ selectorExpression = "${ballcat.websocket.mq.tag}", messageModel = MessageModel.BROADCASTING)
+public class RocketmqMessageDistributor extends AbstractMessageDistributor implements RocketMQListener {
+
+ @Value("${spring.application.name}")
+ private String appName;
+
+ @Value("${ballcat.websocket.mq.tag}")
+ private String tag;
+
+ private final RocketMQTemplate template;
+
+ public RocketmqMessageDistributor(WebSocketSessionStore webSocketSessionStore, RocketMQTemplate template) {
+ super(webSocketSessionStore);
+ this.template = template;
+ }
+
+ /**
+ * 消息分发
+ * @param messageDO 发送的消息
+ */
+ @Override
+ public void distribute(MessageDO messageDO) {
+ log.info("the send message body is [{}]", messageDO);
+ String destination = this.appName + ":" + this.tag;
+ SendResult sendResult = this.template.sendAndReceive(destination, JsonUtils.toJson(messageDO),
+ SendResult.class);
+ if (log.isDebugEnabled()) {
+ log.debug("send message to `{}` finished. result:{}", destination, sendResult);
+ }
+ }
+
+ /**
+ * 消息消费
+ * @param message 接收的消息
+ */
+ @Override
+ public void onMessage(MessageExt message) {
+ String body = new String(message.getBody(), StandardCharsets.UTF_8);
+ MessageDO event = JsonUtils.toObj(body, MessageDO.class);
+ log.info("the content is [{}]", event);
+ try {
+ this.doSend(event);
+ } catch (Exception e) {
+ log.error("MQ消费信息处理异常: {}", e.getMessage(), e);
+ throw new ErrorJsonMessageException("MQ消费信息处理异常, " + e.getMessage());
+ }
+ }
+
+}
diff --git a/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/session/DefaultWebSocketSessionStore.java b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/session/DefaultWebSocketSessionStore.java
index 5d2b20afba478add14b1fd8dd17b42963d646c96..8f80a951ed3bc6421c12d1a1e6ba88aef165f7a9 100644
--- a/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/session/DefaultWebSocketSessionStore.java
+++ b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/session/DefaultWebSocketSessionStore.java
@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -81,7 +82,12 @@ public class DefaultWebSocketSessionStore implements WebSocketSessionStore {
*/
@Override
public Collection getSessions(Object sessionKey) {
- return sessionKeyToWsSessions.get(sessionKey).values();
+ Map sessions = this.sessionKeyToWsSessions.get(sessionKey);
+ if (sessions != null) {
+ return this.sessionKeyToWsSessions.get(sessionKey).values();
+ }
+ log.warn("根据指定的sessionKey: {} 获取对应的wsSessions为空!", sessionKey);
+ return Collections.emptyList();
}
/**
diff --git a/ballcat-dependencies/pom.xml b/ballcat-dependencies/pom.xml
index 2c8f6268fc5a53bde9b336ec4ae0548228706cf2..611726e69d404a9d2a522d1f88db889d1a9ee038 100644
--- a/ballcat-dependencies/pom.xml
+++ b/ballcat-dependencies/pom.xml
@@ -63,6 +63,7 @@
0.4.2
4.11.28.ALL
2.17.154
+ 2.2.0
@@ -588,6 +589,11 @@
nimbus-jose-jwt
9.15.2
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+ ${rocketmq.version}
+
diff --git a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/MessageDistributorTypeConstants.java b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/MessageDistributorTypeConstants.java
index 7236f80cf14511b9413bf9ac8d614b6bdb3e6f95..0614af3d9e2b85e771e8244f1350dc09295dcef3 100644
--- a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/MessageDistributorTypeConstants.java
+++ b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/MessageDistributorTypeConstants.java
@@ -20,6 +20,11 @@ public final class MessageDistributorTypeConstants {
*/
public static final String REDIS = "redis";
+ /**
+ * 基于 rocketmq 广播
+ */
+ public static final String ROCKETMQ = "rocketmq";
+
/**
* 自定义
*/
diff --git a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/WebSocketAutoConfiguration.java b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/WebSocketAutoConfiguration.java
index 8cfc33ee501f211cc6bdd9b2c70daa63ba44cd8b..a8fca9f22bb5d2978f34bc885d29d32563a86611 100644
--- a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/WebSocketAutoConfiguration.java
+++ b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/WebSocketAutoConfiguration.java
@@ -1,6 +1,7 @@
package com.hccake.ballcat.autoconfigure.websocket;
import com.hccake.ballcat.autoconfigure.websocket.config.LocalMessageDistributorConfig;
+import com.hccake.ballcat.autoconfigure.websocket.config.RocketMqMessageDistributorConfig;
import com.hccake.ballcat.autoconfigure.websocket.config.RedisMessageDistributorConfig;
import com.hccake.ballcat.autoconfigure.websocket.config.WebSocketHandlerConfig;
import com.hccake.ballcat.common.websocket.handler.JsonMessageHandler;
@@ -30,7 +31,7 @@ import java.util.List;
* @author Yakir Hccake
*/
@AutoConfiguration
-@Import({ WebSocketHandlerConfig.class, LocalMessageDistributorConfig.class, RedisMessageDistributorConfig.class })
+@Import({ WebSocketHandlerConfig.class, LocalMessageDistributorConfig.class, RedisMessageDistributorConfig.class, RocketMqMessageDistributorConfig.class})
@EnableWebSocket
@RequiredArgsConstructor
@EnableConfigurationProperties(WebSocketProperties.class)
diff --git a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/config/RocketMqMessageDistributorConfig.java b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/config/RocketMqMessageDistributorConfig.java
new file mode 100644
index 0000000000000000000000000000000000000000..5042d2a2a187b03e29d546c8a342516f1e4df325
--- /dev/null
+++ b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/config/RocketMqMessageDistributorConfig.java
@@ -0,0 +1,36 @@
+package com.hccake.ballcat.autoconfigure.websocket.config;
+
+import com.hccake.ballcat.autoconfigure.websocket.MessageDistributorTypeConstants;
+import com.hccake.ballcat.autoconfigure.websocket.WebSocketProperties;
+import com.hccake.ballcat.common.websocket.distribute.MessageDistributor;
+import com.hccake.ballcat.common.websocket.distribute.RocketmqMessageDistributor;
+import com.hccake.ballcat.common.websocket.session.WebSocketSessionStore;
+import lombok.RequiredArgsConstructor;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @ClassName RocketMqMessageDistributorConfig.java
+ * @Author liu_yx
+ * @Version 1.0.0
+ * @Description MQ的消息分发器配置
+ * @CreateTime 2022年06月30日 14:11:34
+ */
+@ConditionalOnProperty(prefix = WebSocketProperties.PREFIX, name = "message-distributor",
+ havingValue = MessageDistributorTypeConstants.ROCKETMQ, matchIfMissing = true)
+@Configuration(proxyBeanMethods = false)
+@RequiredArgsConstructor
+public class RocketMqMessageDistributorConfig {
+
+ private final WebSocketSessionStore webSocketSessionStore;
+
+ @Bean
+ @ConditionalOnMissingBean(MessageDistributor.class)
+ public RocketmqMessageDistributor messageDistributor(RocketMQTemplate template) {
+ return new RocketmqMessageDistributor(webSocketSessionStore, template);
+ }
+
+}