# message-notification
**Repository Path**: ACENG/message-notification
## Basic Information
- **Project Name**: message-notification
- **Description**: A Netty-based WebSocket messaging component
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 2
- **Forks**: 1
- **Created**: 2022-08-18
- **Last Updated**: 2025-08-04
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# message-notification
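A small message-push component, typically used for a "My Messages" feature. It relies on message queue middleware to produce messages and to listen for and consume them, and it uses the Netty framework to run a WebSocket server that pushes messages in real time. The component offers the following features:
- Switching between three message queues: RabbitMQ, RocketMQ, and Kafka
- WebSocket heartbeat detection
- Queries for message history and read records
- Custom configuration of message subscription topics
- Idempotent message handling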
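
As an illustration of the idempotent handling listed above, here is a minimal sketch that processes each `messageId` at most once. The class `IdempotentMessageHandler` and its in-memory set are assumptions made for this example, not the component's actual implementation, which may well deduplicate against its message log in the database instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical helper (not part of the project): handles each messageId at most once. */
final class IdempotentMessageHandler {
    // IDs that were already handled. This in-memory set is an assumption of the sketch;
    // a real deployment would need a bounded or persistent store.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();
    private final Consumer<String> delegate;

    IdempotentMessageHandler(Consumer<String> delegate) {
        this.delegate = delegate;
    }

    /** Returns true if the payload was processed, false if the ID was a duplicate. */
    boolean handle(String messageId, String payload) {
        // add() on a concurrent key set is atomic and returns false for an existing element,
        // so two concurrent deliveries of the same ID cannot both pass this check.
        if (!seen.add(messageId)) {
            return false;
        }
        delegate.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        IdempotentMessageHandler handler = new IdempotentMessageHandler(processed::add);
        System.out.println(handler.handle("123456111111", "hello")); // first delivery
        System.out.println(handler.handle("123456111111", "hello")); // redelivery of the same ID
        System.out.println(processed);
    }
}
```

Message queues commonly redeliver a message after a consumer timeout or restart, which is why the check keys on the message ID rather than on the payload.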
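## Contents
- [1. Development Guide](#1-development-guide)
- [2. Outline Design](#2概要设计)
- [3. Detailed Design](#3详细设计)
- [4. Front-end/Back-end API Documentation](#4前后端接口文档)
- [5. Appendix](#5附录)
# 1. Development Guide
Please refer to the demo module.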
## 1.1 Environment Requirements
- MySQL (or another relational database)
- A message queue (RabbitMQ | RocketMQ | Kafka)
- Maven 3
- JDK 1.8
## 1.2 Integration
### 1.2.1 Standalone Startup
The project can be started on its own; see [Startup](#14-启动方式).
### 1.2.2 Project Integration
To use Maven, add this dependency to your pom.xml:
```
/**
 * @author WECENG
 * @since 2021/2/1 17:57
 */
@SpringBootTest(classes = SpringBootServerApplication.class)
public class MessageProducerServiceImplTest {

    @Autowired
    private MessageProducerService messageProducerService;

    @Test
    public void sendMessage() {
        MessageDTO messageDTO = new MessageDTO();
        messageDTO.setMessageId("123456111111");
        messageDTO.setTopic("789");
        messageDTO.setBroadcast(false);
        // Map ...

/**
 * Channel initializer.
 *
 * @author WECENG
 * @since 2020/7/25 17:05
 */
@Component("nettyWebsocketChannelInitializer")
public class NettyWebsocketChannelInitializer extends ChannelInitializer // ...

/**
 * WebSocket handler.
 *
 * @author WECENG
 * @since 2020/7/25 17:16
 */
@Component("nettWebsocketHandler")
@ChannelHandler.Sharable
public class NettyWebsocketHandler extends SimpleChannelInboundHandler // ...

/**
 * Channel manager; maintains channel information.
 *
 * @author WECENG
 * @since 2020/7/25 17:56
 */
@Component("channelManager")
public class ChannelManager {

    /**
     * Channel group
     */
    private final ChannelGroup broadcastGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Map of channels
     */
    private final ConcurrentMap // ...

/**
 * The identifier is generated from various sources listed in the following:
 * ...
 * The global uniqueness of the generated identifier mostly depends on the MAC address and the current process ID,
 * which are auto-detected at the class-loading time in best-effort manner. If all attempts to acquire them fail,
 * a warning message is logged, and random values will be used instead. Alternatively, you can specify them manually
 * via system properties:
 * ...
 */

/**
 * Aspect around message reception.
 *
 * @author WECENG
 * @since 2020/12/28 10:48
 */
@Component
@Aspect
public class MessageReceiverAspect {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private MessageLogService messageLogService;

    @Autowired
    private ChannelManager channelManager;

    @Pointcut("execution(* org.message.server.receiver.DefaultMessageReceiver.receive(..))")
    public void pointcut() {
    }

    /**
     * Persists a message log entry before the receiver runs.
     */
    @Before("pointcut()")
    public void before(JoinPoint joinPoint) {
        MessageRequest messageDTO = (MessageRequest) joinPoint.getArgs()[0];
        MessageLogDO messageLogDO = new MessageLogDO();
        messageLogDO.setMessageId(messageDTO.getMessageId());
        messageLogDO.setPayload(messageDTO.getPayload());
        messageLogDO.setBroadcast(messageDTO.isBroadcast());
        messageLogDO.setTopic(messageDTO.getTopic());
        messageLogDO.setCreateTime(new Date());
        messageLogDO.setUpdateTime(new Date());
        messageLogService.doCreateMessageLog(messageLogDO);
    }

    /**
     * Pushes the message to the connected websocket clients after the receiver runs.
     */
    @After("pointcut()")
    public void after(JoinPoint joinPoint) {
        MessageRequest messageDTO = (MessageRequest) joinPoint.getArgs()[0];
        MessageLogResponse messageLogRespDTO = new MessageLogResponse();
        messageLogRespDTO.setMessageId(messageDTO.getMessageId());
        messageLogRespDTO.setPayload(messageDTO.getPayload());
        messageLogRespDTO.setBroadcast(messageDTO.isBroadcast());
        messageLogRespDTO.setTopic(messageDTO.getTopic());
        messageLogRespDTO.setStatus(MesLogStatusEnum.UNREAD);
        messageLogRespDTO.setCreateTime(new Date());
        String messageLogJsonStr = JSONObject.toJSONString(messageLogRespDTO, new TsieSerializeConfig());
        if (messageDTO.isBroadcast()) {
            // broadcast: write the frame to every channel in the broadcast group
            TextWebSocketFrame webSocketFrame = new TextWebSocketFrame(messageLogJsonStr);
            channelManager.broadcast(webSocketFrame);
        } else {
            // drop invalid messages: a non-broadcast message must carry a topic
            if (Objects.isNull(messageDTO.getTopic())) {
                logger.error("topic is required when the value of broadcast is false, abandon the message:{}",
                        messageDTO.getMessageId());
            } else {
                WebsocketChannel websocketChannel = channelManager.getWebsocketChannelList().stream()
                        .filter(item -> messageDTO.getTopic().equals(item.getTopic()))
                        .findAny()
                        .orElse(null);
                // the topic has online subscribers
                if (Objects.nonNull(websocketChannel)) {
                    websocketChannel.getChannelSet().forEach(item -> {
                        Channel channel = channelManager.findChannel(item);
                        if (Objects.nonNull(channel)) {
                            TextWebSocketFrame webSocketFrame = new TextWebSocketFrame(messageLogJsonStr);
                            channel.writeAndFlush(webSocketFrame);
                        }
                    });
                }
            }
        }
    }
}
```
In the after() method, the message's topic is used to look up all websocket channels matching that topic in the WebsocketChannel collection managed by channelManager, and the message is pushed to the front end through those websocket channels.
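
The topic lookup performed in after() can be illustrated without Netty. The sketch below is a simplified stand-in under stated assumptions: `TopicRouter`, its method names, and the use of a `Consumer<String>` in place of a Netty `Channel` are all hypothetical and do not mirror the project's `ChannelManager` API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical, Netty-free stand-in for the topic-based push described above. */
final class TopicRouter {
    // topic -> IDs of the connections subscribed to that topic
    private final Map<String, Set<String>> subscribers = new ConcurrentHashMap<>();
    // connection ID -> sink that writes a text frame to that connection
    private final Map<String, Consumer<String>> connections = new ConcurrentHashMap<>();

    void register(String connectionId, String topic, Consumer<String> sink) {
        connections.put(connectionId, sink);
        subscribers.computeIfAbsent(topic, t -> ConcurrentHashMap.newKeySet()).add(connectionId);
    }

    void unregister(String connectionId) {
        connections.remove(connectionId);
        subscribers.values().forEach(ids -> ids.remove(connectionId));
    }

    /** Pushes the JSON text and returns the number of connections it was written to. */
    int push(String topic, boolean broadcast, String json) {
        int delivered = 0;
        if (broadcast) {
            // broadcast: every registered connection receives the message
            for (Consumer<String> sink : connections.values()) {
                sink.accept(json);
                delivered++;
            }
            return delivered;
        }
        if (topic == null) {
            return 0; // a non-broadcast message without a topic is dropped
        }
        for (String id : subscribers.getOrDefault(topic, Collections.<String>emptySet())) {
            Consumer<String> sink = connections.get(id);
            if (sink != null) { // the connection may have closed in the meantime
                sink.accept(json);
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        TopicRouter router = new TopicRouter();
        router.register("conn-1", "789", frame -> System.out.println("conn-1 received " + frame));
        router.register("conn-2", "other", frame -> System.out.println("conn-2 received " + frame));
        int delivered = router.push("789", false, "{\"messageId\":\"123456111111\"}");
        System.out.println("delivered to " + delivered + " connection(s)");
    }
}
```

Keeping the subscription index separate from the connection map means a closed connection only needs to be removed in one place for pushes to skip it, which is the same null check the aspect applies to the result of `findChannel`.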