diff --git a/ballcat-common/ballcat-common-websocket/pom.xml b/ballcat-common/ballcat-common-websocket/pom.xml index 7a4ef783f251399b292b08d14a334b81888a5548..ae4b76135d40e9299fa68afedc3787e8acf7d45f 100644 --- a/ballcat-common/ballcat-common-websocket/pom.xml +++ b/ballcat-common/ballcat-common-websocket/pom.xml @@ -35,6 +35,11 @@ jakarta.annotation jakarta.annotation-api + + org.apache.rocketmq + rocketmq-spring-boot-starter + true + - \ No newline at end of file + diff --git a/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java new file mode 100644 index 0000000000000000000000000000000000000000..b30fa475f76c1db5efad2c9709176a3415607e3a --- /dev/null +++ b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java @@ -0,0 +1,76 @@ +package com.hccake.ballcat.common.websocket.distribute; + +import com.hccake.ballcat.common.util.JsonUtils; +import com.hccake.ballcat.common.websocket.exception.ErrorJsonMessageException; +import com.hccake.ballcat.common.websocket.session.WebSocketSessionStore; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Value; + +import java.nio.charset.StandardCharsets; + +/** + * @ClassName RocketmqMessageDistributor.java + * @Author liu_yx + * @Version 1.0.0 + * @Description MQ发送消息,接收到消息时进行推送, 广播模式 + * @CreateTime 2022年06月30日 14:10:10 + */ +@Slf4j +@RocketMQMessageListener( + consumerGroup = "${spring.application.name:default-ballcat-application}-${spring.profiles.active:dev}", + topic = "${spring.application.name:default-ballcat-application}-${spring.profiles.active:dev}", + selectorExpression = "${ballcat.websocket.mq.tag}", messageModel = MessageModel.BROADCASTING) +public class RocketmqMessageDistributor extends AbstractMessageDistributor implements RocketMQListener { + + @Value("${spring.application.name}") + private String appName; + + @Value("${ballcat.websocket.mq.tag}") + private String tag; + + private final RocketMQTemplate template; + + public RocketmqMessageDistributor(WebSocketSessionStore webSocketSessionStore, RocketMQTemplate template) { + super(webSocketSessionStore); + this.template = template; + } + + /** + * 消息分发 + * @param messageDO 发送的消息 + */ + @Override + public void distribute(MessageDO messageDO) { + log.info("the send message body is [{}]", messageDO); + String destination = this.appName + ":" + this.tag; + SendResult sendResult = this.template.sendAndReceive(destination, JsonUtils.toJson(messageDO), + SendResult.class); + if (log.isDebugEnabled()) { + log.debug("send message to `{}` finished. result:{}", destination, sendResult); + } + } + + /** + * 消息消费 + * @param message 接收的消息 + */ + @Override + public void onMessage(MessageExt message) { + String body = new String(message.getBody(), StandardCharsets.UTF_8); + MessageDO event = JsonUtils.toObj(body, MessageDO.class); + log.info("the content is [{}]", event); + try { + this.doSend(event); + } catch (Exception e) { + log.error("MQ消费信息处理异常: {}", e.getMessage(), e); + throw new ErrorJsonMessageException("MQ消费信息处理异常, " + e.getMessage()); + } + } + +} diff --git a/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/session/DefaultWebSocketSessionStore.java b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/session/DefaultWebSocketSessionStore.java index 5d2b20afba478add14b1fd8dd17b42963d646c96..8f80a951ed3bc6421c12d1a1e6ba88aef165f7a9 100644 --- a/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/session/DefaultWebSocketSessionStore.java +++ b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/session/DefaultWebSocketSessionStore.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.WebSocketSession; import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -81,7 +82,12 @@ public class DefaultWebSocketSessionStore implements WebSocketSessionStore { */ @Override public Collection getSessions(Object sessionKey) { - return sessionKeyToWsSessions.get(sessionKey).values(); + Map sessions = this.sessionKeyToWsSessions.get(sessionKey); + if (sessions != null) { + return this.sessionKeyToWsSessions.get(sessionKey).values(); + } + log.warn("根据指定的sessionKey: {} 获取对应的wsSessions为空!", sessionKey); + return Collections.emptyList(); } /** diff --git a/ballcat-dependencies/pom.xml b/ballcat-dependencies/pom.xml index 2c8f6268fc5a53bde9b336ec4ae0548228706cf2..611726e69d404a9d2a522d1f88db889d1a9ee038 100644 --- a/ballcat-dependencies/pom.xml +++ b/ballcat-dependencies/pom.xml @@ -63,6 +63,7 @@ 0.4.2 4.11.28.ALL 2.17.154 + 2.2.0 @@ -588,6 +589,11 @@ nimbus-jose-jwt 9.15.2 + + org.apache.rocketmq + rocketmq-spring-boot-starter + ${rocketmq.version} + diff --git a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/MessageDistributorTypeConstants.java b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/MessageDistributorTypeConstants.java index 7236f80cf14511b9413bf9ac8d614b6bdb3e6f95..0614af3d9e2b85e771e8244f1350dc09295dcef3 100644 --- a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/MessageDistributorTypeConstants.java +++ b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/MessageDistributorTypeConstants.java @@ -20,6 +20,11 @@ public final class MessageDistributorTypeConstants { */ public static final String REDIS = "redis"; + /** + * 基于 rocketmq 广播 + */ + public static final String ROCKETMQ = "rocketmq"; + /** * 自定义 */ diff --git a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/WebSocketAutoConfiguration.java b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/WebSocketAutoConfiguration.java index 8cfc33ee501f211cc6bdd9b2c70daa63ba44cd8b..a8fca9f22bb5d2978f34bc885d29d32563a86611 100644 --- a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/WebSocketAutoConfiguration.java +++ b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/WebSocketAutoConfiguration.java @@ -1,6 +1,7 @@ package com.hccake.ballcat.autoconfigure.websocket; import com.hccake.ballcat.autoconfigure.websocket.config.LocalMessageDistributorConfig; +import com.hccake.ballcat.autoconfigure.websocket.config.RocketMqMessageDistributorConfig; import com.hccake.ballcat.autoconfigure.websocket.config.RedisMessageDistributorConfig; import com.hccake.ballcat.autoconfigure.websocket.config.WebSocketHandlerConfig; import com.hccake.ballcat.common.websocket.handler.JsonMessageHandler; @@ -30,7 +31,7 @@ import java.util.List; * @author Yakir Hccake */ @AutoConfiguration -@Import({ WebSocketHandlerConfig.class, LocalMessageDistributorConfig.class, RedisMessageDistributorConfig.class }) +@Import({ WebSocketHandlerConfig.class, LocalMessageDistributorConfig.class, RedisMessageDistributorConfig.class, RocketMqMessageDistributorConfig.class}) @EnableWebSocket @RequiredArgsConstructor @EnableConfigurationProperties(WebSocketProperties.class) diff --git a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/config/RocketMqMessageDistributorConfig.java b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/config/RocketMqMessageDistributorConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..5042d2a2a187b03e29d546c8a342516f1e4df325 --- /dev/null +++ b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/config/RocketMqMessageDistributorConfig.java @@ -0,0 +1,36 @@ +package com.hccake.ballcat.autoconfigure.websocket.config; + +import com.hccake.ballcat.autoconfigure.websocket.MessageDistributorTypeConstants; +import com.hccake.ballcat.autoconfigure.websocket.WebSocketProperties; +import com.hccake.ballcat.common.websocket.distribute.MessageDistributor; +import com.hccake.ballcat.common.websocket.distribute.RocketmqMessageDistributor; +import com.hccake.ballcat.common.websocket.session.WebSocketSessionStore; +import lombok.RequiredArgsConstructor; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @ClassName RocketMqMessageDistributorConfig.java + * @Author liu_yx + * @Version 1.0.0 + * @Description MQ的消息分发器配置 + * @CreateTime 2022年06月30日 14:11:34 + */ +@ConditionalOnProperty(prefix = WebSocketProperties.PREFIX, name = "message-distributor", + havingValue = MessageDistributorTypeConstants.ROCKETMQ, matchIfMissing = true) +@Configuration(proxyBeanMethods = false) +@RequiredArgsConstructor +public class RocketMqMessageDistributorConfig { + + private final WebSocketSessionStore webSocketSessionStore; + + @Bean + @ConditionalOnMissingBean(MessageDistributor.class) + public RocketmqMessageDistributor messageDistributor(RocketMQTemplate template) { + return new RocketmqMessageDistributor(webSocketSessionStore, template); + } + +}