# rabbitmq_accdient **Repository Path**: git.ganjian.net/rabbitmq_accdient ## Basic Information - **Project Name**: rabbitmq_accdient - **Description**: No description available - **Primary Language**: Java - **License**: GPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 8 - **Created**: 2021-09-10 - **Last Updated**: 2021-09-10 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ![](https://cos.huangxunhui.com/blog/handling_rabbitmq_queue_blocking/handling_rabbitmq_queue_blocking_header.jpg) ### 前言   那天我和同事一起吃完晚饭回公司加班,然后就群里就有人@我说xxx商户说收不到推送,一开始觉得没啥。我第一反应是不是极光没注册上,就让客服通知商户,重新登录下试试。这边打开极光推送的后台进行检查。后面反应收不到推送的越来越多,我就知道这事情不简单。 ### 事故经过   由于大量商户反应收不到推送,我第一反应是不是推送系统挂了,导致没有进行推送。于是让运维老哥检查推送系统各节点的情况,发现都正常。于是打开RabbitMQ的管控台看了一下,人都蒙了。已经有几万条消息处于`ready`状态,还有几百条`unacked`的消息。   我以为推送服务和MQ连接断开了,导致无法推送消息,于是让运维重启推送服务,将所有的推送服务重启完,发现`unacked`的消息全部变成`ready`,但是没过多久又有几百条`unacked`的消息了,这个就很明显了能消费,没有进行`ack`呀。   当时我以为是网络问题,导致mq无法接收到`ack`,让运维老哥检查了一下,发现网络没问题。现在看是真的是傻,网络有问题连接都连不上。由于确定的是无法`ack`造成的,立马将`ack模式`由原来的`manual` 改成`auto`紧急发布。将所有的节点升级好以后,发现推送正常了。   你以为这就结束了其实并没有,没过多久发现有一台MQ服务出现异常,由于生产采用了`镜像队列`,立即将这台有问题的MQ从集群中移除。直接进行重置,然后加入回集群。这事情算是告一段落了。此时已经接近24:00了。 ![](https://cos.huangxunhui.com/blog/factory_method/depressed.gif)   时间来到第二天上午10:00,运维那边又出现报警了,说推送系统有台机器,磁盘快被写满了,并且占用率很高。我的乖乖从昨晚到现在写了快40G的日志,一看报错信息瞬间就明白问题出在哪里了。麻溜的把`bug`修了紧急发布。 > 吐槽一波公司的ELK,压根就没有收集到这个报错信息,导致我没有及时发现。 ![](https://cos.huangxunhui.com/blog/handling_rabbitmq_queue_blocking/architecture.png) ### 事故重现-队列阻塞 #### MQ配置 ```yml spring: # 消息队列 rabbitmq: host: 10.0.0.53 username: guest password: guest virtual-host: local port: 5672 # 消息发送确认 publisher-confirm-type: correlated # 开启发送失败退回 publisher-returns: true listener: simple: # 消费端最小并发数 concurrency: 1 # 消费端最大并发数 max-concurrency: 5 # 一次请求中预处理的消息数量 prefetch: 2 # 手动应答 acknowledge-mode: manual ``` #### 问题代码 ```java @RabbitListener(queues = ORDER_QUEUE) public void receiveOrder(@Payload String encryptOrderDto, @Headers Map headers, Channel channel) throws Exception { // 解密和解析 String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto); OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class); try { // 模拟推送 pushMsg(orderDto); }catch (Exception e){ log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto)); }finally { // 消息签收 channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); } } ```   看起来好像没啥问题。由于和交易系统约定好,订单数据需要先转换`json`串,然后再使用`AES`进行加密,所以这边需要,先进行解密然后在进行解析。才能得到订单数据。   为了防止消息丢失,交易系统做了`失败重发`机制,防止消息丢失,不巧的是重发的时候没有对订单数据进行加密。这就导致推送系统,在解密的时候出异常,从而无法进行`ack`。 > 默默的吐槽一句:人在家中坐,锅从天上来。 #### 模拟推送 [推送代码](https://gitee.com/huangxunhui/rabbitmq_accdient/blob/master/src/main/java/com/hxh/rabbitmq/accdient/controller/SendController.java) **发送3条正常的消息** ```shell curl http://localhost:8080/sendMsg/3 ``` **发送1条错误的消息** ```shell curl http://localhost:8080/sendErrorMsg/1 ``` **再发送3条正常的消息** ```shell curl http://localhost:8080/sendMsg/3 ``` ![](https://cos.huangxunhui.com/blog/handling_rabbitmq_queue_blocking/console_log.png)   观察日志发下,虽然有报错,但是还能正常进行推送。但是RabbitMQ已经出现了一条`unacked`的消息。 ![](https://cos.huangxunhui.com/blog/handling_rabbitmq_queue_blocking/has_unacked_msg.png) **继续发送1条错误的消息** ```shell curl http://localhost:8080/sendErrorMsg/1 ``` **再发送3条正常的消息** ```shell curl http://localhost:8080/sendMsg/3 ```   这个时候你会发现控制台报错,当然错误信息是解密失败,但是正常的消息却没有被消费,这个时候其实队列已经阻塞了。 ![](https://cos.huangxunhui.com/blog/handling_rabbitmq_queue_blocking/queue_is_block.png) ![](https://cos.huangxunhui.com/blog/handling_rabbitmq_queue_blocking/has_many_unacked_msg.png)   从`RabbitMQ`管控台也可以看到,刚刚发送的的3条消息处于`ready`状态。这个时候就如果一直有消息进入,都会堆积在队里里面无法被消费。 **再发送3条正常的消息** ```shell curl http://localhost:8080/sendMsg/3 ``` ![](https://cos.huangxunhui.com/blog/handling_rabbitmq_queue_blocking/has_6_unacked_msg.png) #### 分析原因   上面说了是由于没有进行`ack`导致队里阻塞。那么问题来了,这是为什么呢?其实这是`RabbitMQ`的一种保护机制。防止当消息激增的时候,海量的消息进入`consumer`而引发`consumer`宕机。   RabbitMQ提供了一种QOS(服务质量保证)功能,即在非自动确认的消息的前提下,限制信道上的消费者所能保持的最大未确认的数量。可以通过设置`PrefetchCount`实现。   举例说明:可以理解为在`consumer`前面加了一个缓冲容器,容器能容纳最大的消息数量就是`PrefetchCount`。如果容器没有满`RabbitMQ`就会将消息投递到容器内,如果满了就不投递了。当`consumer`对消息进行`ack`以后就会将此消息移除,从而放入新的消息。 ```yml listener: simple: # 消费端最小并发数 concurrency: 1 # 消费端最大并发数 max-concurrency: 5 # 一次处理的消息数量 prefetch: 2 # 手动应答 acknowledge-mode: manual ``` > prefetch参数就是PrefetchCount   通过上面的配置发现`prefetch`我只配置了2,并且`concurrency`配置的只有1,所以当我发送了2条错误消息以后,由于解密失败这2条消息一直没有被`ack`。将缓冲区沾满了,这个时候`RabbitMQ`认为这个`consumer`已经没有消费能力了就不继续给它推送消息了,所以就造成了队列阻塞。 #### 判断队列是否有阻塞的风险。   当`ack`模式为`manual`,并且线上出现了`unacked`消息,这个时候不用慌。由于QOS是限制信道`channel`上的消费者所能保持的最大未确认的数量。所以允许出现`unacked`的数量可以通过`channelCount * prefetchCount * 节点数量` 得出。 > `channlCount`就是由`concurrency`,`max-concurrency`决定的。 - `min` = `concurrency * prefetch * 节点数量` - `max` = `max-concurrency * prefetch * 节点数量` 由此可以的出结论 - `unacked_msg_count` < `min` 队列不会阻塞。但需要及时处理`unacked`的消息。 - `unacked_msg_count` >= `min` 可能会出现堵塞。 - `unacked_msg_count` >= `max` 队列一定阻塞。 **这里需要好好理解一下。** #### 处理方法   其实处理的方法很简单,将解密和解析的方法放入`try catch`中就解决了这样不管解密正常与否,消息都会被签收。如果出错将会输出错误日志,让开发人员进行处理了。 > 对于这个就需要有日志监控系统,来及时告警了。 ```java @RabbitListener(queues = ORDER_QUEUE) public void receiveOrder(@Payload String encryptOrderDto, @Headers Map headers, Channel channel) throws Exception { try { // 解密和解析 String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto); OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class); // 模拟推送 pushMsg(orderDto); }catch (Exception e){ log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto); }finally { // 消息签收 channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); } } ``` #### 注意的点   `unacked`的消息在`consumer`切断连接后(重启),会自动回到队头。 ### 事故重现-磁盘占用飙升   一开始我不知道代码有问题,就是以为单纯的没有进行`ack`所以将`ack`模式改成`auto`自动,紧急升级了,这样不管正常与否,消息都会被签收,所以在当时确实是解决了问题。   其实现在回想起来是非常危险的操作的,将`ack`模式改成`auto`自动,这样会使QOS不生效。会出现大量消息涌入`consumer`从而造成`consumer`宕机,可以是因为当时在晚上,交易比较少,并且推送系统有多个节点,才没出现问题。 #### 问题代码 ```java @RabbitListener(queues = ORDER_QUEUE) public void receiveOrder(@Payload String encryptOrderDto, @Headers Map headers, Channel channel) throws Exception { // 解密和解析 String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto); OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class); try { // 模拟推送 pushMsg(orderDto); }catch (Exception e){ log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto); }finally { // 消息签收 channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); } } ``` **配置文件** ```yml listener: simple: # 消费端最小并发数 concurrency: 1 # 消费端最大并发数 max-concurrency: 5 # 一次处理的消息数量 prefetch: 2 # 手动应答 acknowledge-mode: auto ```   由于当时不知道交易系统的重发机制,重发时没有对订单数据加密的bug,所以还是会发出少量有误的消息。 **发送1条错误的消息** ```shell curl http://localhost:8080/sendErrorMsg/1 ``` ![](https://cos.huangxunhui.com/blog/handling_rabbitmq_queue_blocking/error_mq.png) ![](https://cos.huangxunhui.com/blog/handling_rabbitmq_queue_blocking/random_exception.gif) #### 原因   `RabbitMQ`消息监听程序异常时,`consumer`会向`rabbitmq server`发送`Basic.Reject`,表示消息拒绝接受,由于`Spring`默认`requeue-rejected`配置为`true`,消息会重新入队,然后`rabbitmq server`重新投递。就相当于死循环了,所以控制台在疯狂刷错误日志造成磁盘利用率飙升的原因。 ### 解决方法   将`default-requeue-rejected: false`即可。 ### 总结 - 个人建议,生产环境不建议使用自动ack,这样会QOS无法生效。 - 在使用手动ack的时候,需要非常注意消息签收。 - 其实在将有问题的MQ重置时,是将错误的消息给清除才没有问题了,相当于是消息丢失了。 ```java try { // 业务逻辑。 }catch (Exception e){ // 输出错误日志。 }finally { // 消息签收。 } ``` ### 参考资料 - [RabbitMQ消息监听异常问题探究](https://blog.csdn.net/u014513883/article/details/77907898) ### 代码地址 > https://gitee.com/huangxunhui/rabbitmq_accdient.git ### 结尾   如果有人告诉你遇到线上事故不要慌,除非是超级大佬久经沙场。否则就是瞎扯淡,你让他来试试,看看他会不会大脑一片空白,直冒汗。   如果觉得对你有帮助,可以多多评论,多多点赞哦,也可以到我的主页看看,说不定有你喜欢的文章,也可以随手点个关注哦,谢谢。 ![](https://cos.huangxunhui.com/blog/个人铭牌.png)