# ReliableMessageService

**Repository Path**: yingjie_rong/ByteRMS

## Basic Information

- **Project Name**: ReliableMessageService
- **Description**: A general-purpose framework for the reliable-message, eventual-consistency distributed transaction scheme
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 4
- **Forks**: 4
- **Created**: 2018-09-26
- **Last Updated**: 2024-10-01

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# ByteRMS

#### Project Introduction

A general-purpose framework for the reliable-message, eventual-consistency distributed transaction scheme.

#### Software Architecture

Architecture overview:

1. Built on spring-cloud 2

![Reliable-message eventual-consistency scheme](https://images.gitee.com/uploads/images/2018/0929/180826_0e97d69f_414803.png "可靠消息最终一致性方案.png")

#### Usage

2. Add the supports-springcloud module

```xml
<dependency>
    <artifactId>supports-springcloud</artifactId>
    <groupId>cn.gxufe.reliable.message</groupId>
    <version>1.0.0</version>
</dependency>
```

* Create the message table (required in both the upstream and the downstream service)

```sql
CREATE TABLE `reliable_message` (
  `key` varchar(100) NOT NULL,
  `header` varchar(100) NOT NULL,
  `data` varchar(1000) NOT NULL,
  `status` int(11) NOT NULL,
  `id` bigint(20) DEFAULT NULL,
  `other` varchar(1000) NOT NULL DEFAULT '{}' COMMENT '{}',
  PRIMARY KEY (`key`)
)
```

* API interface

```java
public interface RMSApi {

    /**
     * prepare
     * @param message the message to prepare
     * @return the result of the call
     */
    @RequestMapping(value = "/message/prepare", method = RequestMethod.PUT)
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.MANDATORY)
    ResultVo prepare(@RequestBody Message message);

    /**
     * finish
     * @param message the message to finish
     * @return the result of the call
     */
    @RequestMapping(value = "/message/finish", method = RequestMethod.POST)
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.MANDATORY)
    ResultVo finish(@RequestBody Message message);

    /**
     * cancel
     * @param message the message to cancel
     * @return the result of the call
     */
    @RequestMapping(value = "/message/cancel", method = RequestMethod.POST)
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.MANDATORY)
    ResultVo cancel(@RequestBody Message message);
}
```

2.1. As a producer (upstream service)

* Startup class: `@Import(RMSApiConfigForProducer.class)`
* Add the RMSClient (Feign client) interface

```java
@FeignClient(name = "reliableMessageService", configuration = FeignConfig.class)
@Service
public interface RMSClient extends RMSApi {
}
```

* Implement MessageCallBackInterface to handle the callbacks

```java
@Service
public class MessageCallBackInterfaceImpl implements MessageCallBackInterface, CompensableMessageAware {

    private CompensableMessage compensableMessage;

    private static final Logger logger = LoggerFactory.getLogger(MessageCallBackInterfaceImpl.class);

    /**
     * Called back after the consumer (downstream service) has executed successfully.
     * @return the result of the callback
     */
    @Override
    public ResultVo finish(Long messageId, String key, String header, String finishData) {
        logger.info("finish: messageId = {}, finishData = {}", messageId, finishData);
        Message message = compensableMessage.getMessage();
        // Read the data that compensableMessage stored during the prepare phase
        // (this data is never delivered downstream)
        System.out.println(message + ", name = " + compensableMessage.getValue("name"));
        return ResultVo.ok();
    }

    /**
     * Called back after the consumer (downstream service) has failed.
     * @return the result of the callback
     */
    @Override
    public ResultVo cancel(Long messageId, String key, String header, String cancelData) {
        System.out.println("cancel: messageId = " + messageId + ", cancelData = " + cancelData);
        return ResultVo.ok();
    }

    @Override
    public void setCompensableMessage(CompensableMessage compensableMessage) {
        this.compensableMessage = compensableMessage;
    }
}
```

* Test case (calling the /test endpoint starts a transaction)

```java
@RestController
@RequestMapping("/")
public class TestController implements CompensableMessageAware {

    private CompensableMessage compensableMessage;

    @Autowired
    private RMSClient reliableMessageClient;

    @RequestMapping("/test")
    @Transactional(rollbackFor = Throwable.class)
    public String test(@RequestParam("name") String name) {
        Message message = new Message();
        // The key must be globally unique and is generated manually
        message.setKey("test_key_" + UUID.randomUUID().toString().replace("-", ""));
        // The header is the message type; data is dispatched based on this field
        message.setHeader("header01");
        message.setData("{TestController}");
        // Data bound to the current transaction; it is not delivered to the downstream service
        compensableMessage.set("name", name);
        reliableMessageClient.prepare(message);
        return "ok";
    }

    @Override
    public void setCompensableMessage(CompensableMessage compensableMessage) {
        this.compensableMessage = compensableMessage;
    }
}
```

2.2. As a consumer (downstream service)

* Startup class configuration: `@Import(RMSApiConfigForConsumer.class)`
* Add the RMSClient (Feign client) interface

```java
@FeignClient(name = "reliableMessageService", configuration = FeignConfig.class)
@Service
public interface RMSClient extends RMSApi {
}
```

* Implement the message handling: MqMessageService

```java
@Service
public class HandleMessageServiceImpl implements MqMessageService {

    @Override
    public Message handle(Message message) {
        // Business logic (placeholder)
        doSomething();
        // Report whether the message was handled successfully or not.
        // On error, throw the exception (the message is redelivered).
        // On success, returning a finish status triggers the upstream finish callback;
        // a cancel status triggers the upstream cancel callback.
        message.setStatus(Message.STATUS_FINISH);
        // Data to pass back to the upstream callback
        message.setData("{HandleMessageServiceImpl}");
        return message;
    }
}
```

* Listen on the queue by implementing ByteRMSConsumer

```java
@Component
public class KafkaRMSConsumer implements ByteRMSConsumer {

    private final Gson gson = new Gson();

    @Autowired
    private ProcessMqMessage handleMqMessage;

    @Override
    @KafkaListener(topics = "rmsKafkaQueue", groupId = "rms_kafka_queue")
    public void listen(String data) {
        Message message = gson.fromJson(data, Message.class);
        // Just pass the message to handleMqMessage. Kafka or RabbitMQ both work,
        // as long as the choice matches the reliable message service.
        handleMqMessage.handle(message);
    }
}
```

2.3. A service can be a producer as well as a consumer. The roles are defined per transaction and are not absolute.

3. Reliable message service (admin)

* Create the message table

```sql
CREATE TABLE `message` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `key` varchar(400) NOT NULL,
  `header` varchar(40) DEFAULT NULL,
  `source_service` varchar(40) DEFAULT NULL,
  `target_service` varchar(40) DEFAULT NULL,
  `finish_data` varchar(1000) DEFAULT '{}',
  `data` varchar(1000) DEFAULT '{}',
  `prepare_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `confirm_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `send_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `finish_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `cancel_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `try_send` int(10) NOT NULL DEFAULT '0',
  `status` int(11) NOT NULL,
  `try_confirm` int(10) NOT NULL,
  `try_finish` int(10) NOT NULL DEFAULT '0',
  `is_done` int(2) NOT NULL DEFAULT '0',
  PRIMARY KEY (`id`,`key`)
) ENGINE=InnoDB AUTO_INCREMENT=63 DEFAULT CHARSET=utf8
```

* Configure MQ, the database, etc.
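
The producer example describes `header` as the message type on which data is dispatched. A downstream service that handles several message types could route on that field roughly as sketched below. This is only an illustration under stated assumptions: `SketchMessage` and `HeaderDispatcher` are hypothetical names that are not part of ByteRMS, `SketchMessage` stands in for the framework's `Message` with only the fields used here, and the numeric value of `STATUS_FINISH` is made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for the framework's Message; only the fields used here. */
class SketchMessage {
    static final int STATUS_FINISH = 1; // assumed value, for illustration only

    String key;
    String header;
    String data;
    int status;

    SketchMessage(String key, String header, String data) {
        this.key = key;
        this.header = header;
        this.data = data;
    }
}

/** Hypothetical helper: routes each message to the handler registered for its header. */
class HeaderDispatcher {
    private final Map<String, UnaryOperator<SketchMessage>> handlers = new HashMap<>();

    /** Registers the handler for one message type (header value). */
    void register(String header, UnaryOperator<SketchMessage> handler) {
        handlers.put(header, handler);
    }

    /** Looks up the handler by header and applies it; unknown headers are rejected. */
    SketchMessage dispatch(SketchMessage message) {
        UnaryOperator<SketchMessage> handler = handlers.get(message.header);
        if (handler == null) {
            // Fail loudly instead of silently acknowledging an unknown message type.
            throw new IllegalArgumentException("No handler for header: " + message.header);
        }
        return handler.apply(message);
    }
}
```

An `MqMessageService.handle` implementation could delegate to such a dispatcher. Going by the comments in the consumer example, an exception thrown for an unknown header would then lead to the message being redelivered rather than confirmed.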