# distributed-message **Repository Path**: cckevincyh/distributed-message ## Basic Information - **Project Name**: distributed-message - **Description**: No description available - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2020-08-28 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 业务场景 1. 消息推送实时性高,可以接受丢失。 2. 消息推送实时性低,不能丢失。 ## 消息推送实时性高,可以接受丢失 不需要用到transaction outbox的模式 ``` handle(){ [start transaction] service();//业务方法 [transaction commit] sendMessage();//发送message } ``` 好处: 1. 吞吐量高,不需要写本地消息表 存在问题: 1. **极端情况**下存在不一致的情况,比如message丢失(可容忍?)(无法处理) 2. message丢失之后,无法重发(没有储存本地消息表) (极端情况,基本很少)(无法处理) 3. 在发送message的时候,中间件挂了,会轮询发送直到中断,虽然message持久化了,但下次重新消费的时候会重新执行一次业务代码(重复操作),可以通过代码去处理幂等性,并且重新发送消息。(可处理) 4. 无法监控消息日志 5. 需要support来保证最终一致性 ## 消息推送实时性低,不能丢失 这种可以用写本地消息表的方式 ``` handle(){ //发送方处理 [start transaction] service();//业务方法 saveMessage();//保存消息到本地,状态为发送状态 [transaction commit] sendMessage();//发送message } schedule(){ //一段时间内trigger发送状态的message到消费端 } ``` ``` handle(){ //消费端 [start transaction] service();//业务方法 [transaction commit] ack();//向发送方发送ack的message,发送方接收到之后把消息状态设置为已完成 } ``` ![](img/本地消息表.png) 好处: 1. 确保消息不丢失,只要业务数据保存,本地消息数据也被持久化。如果生产方send message失败,可以通过schedule去扫未完成的message重新发送,确保了消息一定会被发出去 2. 消息日志可追溯 存在的问题: 1. send message的过程中如果有异常不应该抛出(可处理) 2. 如果发送方在发送message的过程中中间件挂了,发送法也是需要保证幂等性,但不需要特殊处理重新发送message,交给schedule去处理 (可处理) 3. 因为多进行了一次数据的持久化的IO操作,降低了吞吐量 (可容忍?) 4. 每个消息发送方都需要单独存储一个本地消息表(可容忍?) 5. 消费方需要做特殊的ACK处理,处理来自不同发送方的消息进行ACK处理。(一旦业务复杂,出现很多双向的消息,不好管理) (可容忍?) 6. 存在未完成消息的积压,加重schedule的压力(限制发送数据条数,消息重试最大值,最终的人为干预)(可处理) 7. 如果入库的消息体比较大,查询可能消耗的IO比较大,需要考虑拆分单独的一张消息内容表用于存放消息体内容,而经常更变的列应该单独拆分到另外一张表。 ## 独立的消息模块 根据上面提到的每个消息发送方都需要做一个单独的存储本地表,不是特别的通用。可以考虑抽取出一个独立的消息模块。 ![](img/独立消息模块-可靠消息一致性.jpg) ![](img/Snipaste_2020-09-02_16-40-12.png) 好处: 1. 所有的消息都汇总在了独立的消息模块中,更好的去monitor 2. 系统A和系统B只需要关心和独立消息的交互,不存在太多复杂的系统之间消息的交互 3. 每个系统中都不需要单独存在一张消息表 4. 消息可靠性高 存在的问题: 1. 所有的生产者和消费者都与独立消息模块进行交互,独立消息模块需要接受所有上游的消息,还有把消息转发到下游系统,独立消息系统负担过大,而且存在单点故障的问题,需要保证独立消息模块的高可用 2. 生产者系统和独立的消息系统存在一次RPC的调用,来保证同步存储预存消息(相当于本地消息表和业务数据存储在一个事务中),保证消息不会丢失(可考虑变成异步? 但是异步消息就代表存在消息丢失的可能性,达不到可靠性)。RPC调用降低了系统的吞吐量。 3. 复杂的业务场景下,业务处理和发送message需要做很多的考虑,当一个系统中可能需要发送多个消息出去的时候,事务的控制还有RPC调用失败的时候需要控制我们的事务。 4. 生产者和消费者可以抽取出来一套common的消息发送和消息接收的代码,但是当业务场景复杂或者使用场景复杂时候,很难覆盖(框架的弊端,如何使用的问题) 5. 最重要的一点就是需要保证生产者和消费者的幂等性 ## 本地消息事务 **[关键点]** 1. 在send message之前先持久化message payload,保证要发出去的message都被持久化(即能够重新获取并进行resend 处理) 2. message的持久化需要和业务数据的持久化在同一个transaction 3. 必须要先commit transaction 之后才进行send message的操作,防止在commit阶段的异常,但message仍旧发出,导致数据不一致的情况 4. 加入ACK确认机制,确保消费端已完成消费,**保证消息传递的可靠性** 5. 对于未进行ACK确认的message安排schedule计划去批量搜索并进行resend 操作,保证所有message都能够接收到ACK**(至少发一次原则)** 6. 消费端必须做好**幂等性**校验,保证不会因为生产端重复resend操作导致业务数据异常**(保证幂等性)**, 检测到有重复消费的情况即不会做任何业务操作,直接发送ACK确认的message给到生产方,保证生产方能够接收到ACK message**(ACK 至少发一次原则)** 7. 消费端的recovery是由生产方进行resend保证的,还有消费端的recovery还能依赖solace的retry来保证 经过以上步骤来确保最终一致性。 一些具体happy case和exception case的情况,如何保证最终一致性: 1. happy case,无异常发生,生产方执行业务逻辑并持久化message,然后把message 发送给消费端,消费端消费message并ACK 给生产方,生产方将message record的状态update 成EXECUTED ![](img/happycase.png) 2. exception case 1:生产端执行业务逻辑失败,记录下异常信息还有message payload,以便后续的恢复和support。消费端也没接收到生产方的message,两边数据是一致的。 ![](img/exceptioncase1.png) 3. 中间件消息传递中出现丢失(极少可能性),生产端已经处理了数据,但是消费端没有接收到message,两边数据不一致。但是消费端没有接收到ACK的message,所以schedule module会send message 通知生产端扫描未确认的message,然后重发给消费端,确保消息能够传递到消费端。一旦消费端接收到消息并消费成功,也把ACK message通知了生产端,只有生产端接收到ACK才能确保本次交付是成功的,来最终确保了数据的一致性。 ![](img/exception case 2.png) 4. 消费端处理失败,首先solace retry处理,做第一步恢复操作,仍然失败之后生产方检测到未确认的message就会进行重发啦,确保最终数据的一致性。 ![](img/exceptioncase3.png) 1. 直接发送message,与事务无关(很难保证一致性) 2. send message after commit transaction,与事务有关但不会记录message payload(难保证一致性,比1的情况稍微好一些) 3. send message after commit transaction 还有在发送前持久化message payload(较强一致性),无法处理中间件传递过程中的消息丢失问题(但极少发生) 4. send message after commit transaction 还有在发送前持久化message payload,然后生产方需要ACK确认message消费(强一致性)