# reliable-rabbit **Repository Path**: git.ganjian.net/reliable-rabbit ## Basic Information - **Project Name**: reliable-rabbit - **Description**: 实现RabbitMQ发送和消费可靠的组件。支持消息的可靠发送、幂等消费、顺序消费和可靠消费 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 3 - **Created**: 2021-09-10 - **Last Updated**: 2021-09-10 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # reliable-rabbit ### 背景 1、RabbitMQ本身有事务和发送确认机制,两种方式来保证消息发送的可靠性。但是这两者都没办法解决这个情况:业务数据修改成功,发送mq的时候服务异常或网络异常,mq没有发出去。 2、RabbitMQ对MQ的顺序保障做的并不好,得只有一个消费者,且不遇到消费失败的情况,才能保证消息的顺序。 上面这两种情况,RocketMQ都能解决,但是一个公司在使用了RabbitMQ之后,就不大可能更换成RocketMQ了。 所以本项目主要是对RabbitMQ做一些增强,一定程度上解决两个问题。 ### 简介 实现RabbitMQ发送和消费可靠的组件。支持消息的可靠发送、幂等消费、顺序消费和可靠消费。 ### 可靠发送 #### 方案 要让mq消息一定被发送出去,可以在本地业务库创建一张mq发送表,利用mysql的事务来实现,方案大致如下: 1. 开启事务 2. 修改本地业务数据 3. 事务提交之前,保存下要发送的mq消息,状态为“发送中” 4. 事务提交之后,调用RabbitTemplate.send方法发送mq消息 5. 利用rabbitMq的确认机制,添加消息发送回调监听:成功到达exchange,删除步骤1中保存的mq消息;到达exchange失败,发送失败次数+1,修改消息状态(到达最大重试次数,状态改为“失败”) 6. 进行重试,查询状态为“发送中”的mq消息,再重新调用RabbitTemplate.send方法发送mq消息 **消息发送失败后,就会一直重复步骤5、6,直到mq消息被成功发送到exchange或者到达最大重试次数** #### 流程图 ![可靠发送流程图](https://img-blog.csdnimg.cn/20210205100305307.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ppZG9uZ3hpYW5neGk=,size_16,color_FFFFFF,t_70#pic_center) #### 数据库表 ```sql CREATE TABLE `producer` ( `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '自增主键', `type` TINYINT(3) NOT NULL DEFAULT 0 COMMENT '消息类型,0=即时消息,1=顺序消息', `application` VARCHAR(50) NOT NULL COMMENT '发送消息的应用名称', `host` VARCHAR(255) DEFAULT NULL COMMENT 'rabbit主机地址', `port` INT(10) DEFAULT NULL COMMENT 'rabbit主机端口', `virtual_host` VARCHAR(255) NOT NULL COMMENT '虚拟主机', `exchange` VARCHAR(255) NULL COMMENT '交换器', `routing_key` VARCHAR(255) NULL COMMENT '路由key', `message_id` VARCHAR(50) NOT NULL COMMENT '消息id', `message` BLOB DEFAULT NULL COMMONT '整个Message的序列化内容', `send_status` TINYINT(3) NOT NULL DEFAULT 0 COMMENT '发送状态, 0=预提交,1=发送中,2=发送失败', `retry_times` SMALLINT(6) NOT NULL DEFAULT 0 COMMENT '重试次数', `max_retry_times` SMALLINT(6) NOT NULL COMMENT '最大重试次数', `next_retry_time` DATETIME DEFAULT NULL COMMENT '下一次重试时间', `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), UNIQUE KEY `idx_message_app` (`message_id`, `application`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COMMENT = 'rabbitMQ发送表'; CREATE TABLE `producer_sequence` ( `id` BIGINT(20) UNSIGNED NOT NULL COMMENT '消息id,来自producer表', `application` VARCHAR(50) NOT NULL COMMENT '发送消息的应用名称', `group_name` VARCHAR(50) NOT NULL COMMENT '消息分组,同分组内,消息序号按发送顺序递增', `message_id` VARCHAR(50) NOT NULL COMMENT '消息id', `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), UNIQUE KEY `idx_message_app` (`message_id`, `application`), KEY `idx_app_group` (`application`, `group_name`), KEY `idx_create_time` (`create_time`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COMMENT = 'rabbitMQ顺序发送记录表'; ``` ### 幂等消费 开启可靠发送,可能会导致mq的重复发送。像RabbitMQ集群里有节点负载高的时候,发送端可能会收不到confirm消息,然后将mq再次发送。所以消费端需要做幂等处理。 #### 方案 1. 接收到mq消息后,根据消息id查询是否已经消费过该消息。如果消费过该消息,转步骤5;如果没有消费过,转步骤2 2. 保存mq消费记录,保存成功,转步骤3,保存失败,转步骤5(多消费者的情况下,保证mq消息也只被消费一次!!!) 3. 执行正常的mq消费逻辑,消费成功,转步骤5,消费失败,转步骤4 4. 消费失败,删除mq消费记录,nack消息并requeue,结束(确保mq消息下次到达时,可以被消费) 5. 消费成功,ack确认mq消息,结束 #### 流程图 ![幂等消费流程](https://img-blog.csdnimg.cn/2021020510094882.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ppZG9uZ3hpYW5neGk=,size_16,color_FFFFFF,t_70#pic_center) #### 数据库表 ```sql CREATE TABLE `consume_record` ( `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '自增主键', `message_id` VARCHAR(50) NOT NULL COMMENT '消息id', `produce_application` VARCHAR(100) NOT NULL COMMENT '发送消息的应用名称', `consume_application` VARCHAR(100) NOT NULL COMMENT '消费消息的应用名称', `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), UNIQUE KEY `idx_message_app` (`message_id`, `consume_application`, `produce_application`), KEY `idx_create_time` (`create_time`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COMMENT = '消息消费记录表'; ``` ### 顺序消费 #### 方案 要实现顺序消息,需要依赖消费记录,必须知道自己消费到了哪里,才能判断接收到的mq能不能处理。关于消费记录的,需要依赖幂等消费,幂等消费会保存每一次消费记录。 发送顺序mq消息的时候,查询该消息在同个分组中的上一个消息的消息id,并设置到mq消息头中(如果是分组内的第一个消息,查询结果为空,不设置消息头)。 消费者接收到mq消息时,尝试从消息头中获取“同分组内上一个消息的消息id”。 如果获取不到“同分组内上一个消息的消息id”,则当普通mq消息处理,结束顺序消息检查。 如果获取到“同分组内上一个消息的消息id”,则查询消费记录,看是否消费过该消息id的消息。消费过,则允许进入mq消息处理逻辑,并插入消费记录(插入消费记录,是幂等操作);未消费过,则休眠指定时间,抛出ImmediateRequeueAmqpException,让消息回到requeue,待下次消费。 #### 流程图 ![顺序消费流程](https://img-blog.csdnimg.cn/20210205103714970.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ppZG9uZ3hpYW5neGk=,size_16,color_FFFFFF,t_70#pic_center) ### 使用说明 添加依赖 ```xml com.zidongxiangxi spring-boot-starter-reliable-rabbi 1.0.0-SNAPSHOT ``` 添加配置 ```yaml reliable-rabbit: application: 服务名 zookeeper-address: zk的服务器地址 producer: rely: enabled: true #默认true,可以不配置 table-name: producer #默认表名为producer,支持配置,例如可以加上自己库的前缀test_producer,记得跟sql语句中的建表语句一致 batch-size: 20 #默认20,对发送失败的mq,每次处理多少个重发任务 sequence: enabled: false #默认false,不开启顺序消息功能 table-name: producer_sequence #默认表名为producer_sequence,支持配置,例如可以加上自己库的前缀test_producer_sequence,记得跟sql语句中的建表语句一致 retention-period: 10 #默认表名为producer_sequence表中的数据的保留时长,单位:天。消息发送成功后,表中数据不会删除,所以需要额外清理 batch-size: 20 #默认20,每次清理任务最多清理多少数据 work-period: 300000 #执行清理任务的周期,单位毫秒,默认300000 consumer: idempotent: enabled: false #默认为false,不开启 transaction: false #默认为false,不采用事务。如果采用事务,往数据库插入消费记录的操作会和整个mq消费动作在一个事务中,可能会造成长事务 table-name: consume_record #消费记录的默认表名为consume_record auto-clear: false #默认false,不清理消费记录。如果怕堆积的数据量太大,可以打开 retention-period: 30 #默认30, 保留时长,单位:天,只有auto-clear为true才有效 clear-batch-size: 20 #默认20,每次清理任务最多清理多少数据 work-period: 300000 #执行清理任务的周期,单位毫秒,默认300000 sequence: enabled: false #默认false,不开启顺序消费 consume-fail-delay: #默认10000,单位毫秒。当前置消息没被消费时,consume-fail-delay毫秒后可以被强行消费 ``` #### 参与贡献 1. Fork 本仓库 2. 新建 Feat_xxx 分支 3. 提交代码 4. 新建 Pull Request