# redis-delay **Repository Path**: mmcLine/redis-delay ## Basic Information - **Project Name**: redis-delay - **Description**: No description available - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2021-08-07 - **Last Updated**: 2022-08-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # redis-delay #### 背景 我们先看看以下业务场景: 当订单一直处于未支付状态时,如何在半小时后自动取消? 如何定期检查处于退款状态的订单是否已经退款成功? #### 实现方案 1. 定时任务扫表 为了解决以上问题,最简单直接的办法就是定时去扫表。每个业务都要维护一个自己的扫表逻辑。 - 优点:简单 - 缺点:每分钟全局扫表,浪费资源,损耗数据库性能 2. 消息中间件 使用消息中间件可以实现延迟消息,像RocketMQ就自带这个功能 - 优点:成熟稳定,使用方便 - 缺点:针对那些项目里没有使用消息中间件的,需要额外部署,成本大。 3.基于Redis 根据Redis的zset、list的特性,我们可以利用Redis来实现一个延迟队列。 - 优点:性能高,容易实现 - 缺点:消息持久化依赖于redis,占用线程多 方案参考有赞延迟队列:https://tech.youzan.com/queuing_delay/ #### 软件架构 ![输入图片说明](https://images.gitee.com/uploads/images/2021/0807/192823_fbde4057_2217011.png "屏幕截图.png") 消息流程: - 用户对某个商品下单,系统创建订单成功,同时往延迟队列里put一个job。 - 延迟队列收到该job后,先往job pool中存入job信息,然后根据delay计算出绝对执行时间,放入delay bucket. - 如果不是延迟job,直接放入ready set中。 - DelayMoveToReadyTimer定时器循环取出delay中延迟时间达到的数据,放入ready set里 - ReadyQueueTimer定时器循环取出消息消费 #### 接入流程 在项目里的redis-delay-test测试项目中有完整的接入示例 大概流程如下: 1. 引入pom ``` com.mmc redis-delay-core 1.0-SNAPSHOT ``` 2. 注入两个bean ``` @Configuration public class DelayConfig { @Autowired private RedisTemplate redisTemplate; @Autowired(required = false) public void setRedisTemplate(RedisTemplate redisTemplate) { GenericJackson2JsonRedisSerializer jackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer(); StringRedisTemplate stringRedisTemplate=new StringRedisTemplate(redisTemplate.getConnectionFactory()); stringRedisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); stringRedisTemplate.setValueSerializer(jackson2JsonRedisSerializer); this.redisTemplate = stringRedisTemplate; } @Bean public Producer redisDelayService(){ DelayProducer delayProducer = new DelayProducer(redisTemplate); delayProducer.setJobHandle("ORDER", new JobHandle() { @Override public boolean handle(Job job) { System.out.println("收到消息:"+job); return true; } @Override public boolean errorHandle(Job job) { return false; } }); return delayProducer; } } ``` 3. 建议创建一个topic枚举类 ``` public enum TopicEnum { ORDER, LOG; } ``` 4. 使用api 目前提供2个api: - asyncPutMessage 异步放入消息 - syncPutMessage 同步放入消息 ``` JobParam param = JobParam.builder().topic(TopicEnum.ORDER.toString()).body("nihao").jobId("23111").build(); boolean b = producer.syncPutMessage(param); ``` #### 接入后台管理界面 1. 引入依赖 ``` com.mmc redis-delay-ui 1.0-SNAPSHOT ``` 2. config类修改如下: ``` @Configuration public class DelayUIConfig { @Autowired private RedisTemplate redisTemplate; @Autowired(required = false) public void setRedisTemplate(RedisTemplate redisTemplate) { GenericJackson2JsonRedisSerializer jackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer(); StringRedisTemplate stringRedisTemplate=new StringRedisTemplate(redisTemplate.getConnectionFactory()); stringRedisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); stringRedisTemplate.setValueSerializer(jackson2JsonRedisSerializer); this.redisTemplate = stringRedisTemplate; } @Bean public DelayFacade delayFacade(){ return new DelayFacadeImpl(redisTemplate); } @Bean public Producer redisDelayService(){ DelayProducer delayProducer = new DelayProducer(redisTemplate); delayProducer.setJobHandle("ORDER", new JobHandle() { @Override public boolean handle(Job job) { System.out.println("收到消息:"+job); return false; } @Override public boolean errorHandle(Job job) { return false; } }); return delayProducer; } } ``` 3. 配置页面路径或者将页面拷贝到你的项目 spring.mvc.view.prefix=/pages/ spring.mvc.view.suffix=.html 4. 访问后台管理界面 > http://localhost:8080/redisDelay/admin/index ![](https://img2020.cnblogs.com/blog/1178991/202109/1178991-20210902082644296-1056628151.png) #### 参与贡献 1. Fork 本仓库 2. 新建 Feat_xxx 分支 3. 提交代码 4. 新建 Pull Request #### TODO 1. 后台管理界面 - 查询异常数据(pool里面有,但是delay和ready里没了的数据) 2. 内置持久化,确保数据不丢失 #### 更新list 更新时间:2021-08-09 1. 支持JDK1.7,很不想支持,但是我们公司项目用的1.7。。。。 2. 支持超时熔断 3. 将timer里面的thread+while(true)改为用ScheduledExecutorService替换,避免topic数量多时 线程占用过多。因为之前那个方案是一个topic一个线程。 4. 支持多机器部署,增加简单的分布式锁,防止同一个数据重复消费。 更新时间:2021-08-16 5. 支持手动停止某个topic消费 6. 优雅停机 7. 过滤器实现 更新时间:2021-08-24 8. 超过重试次数进入死信队列 9. 死信队列内消息可批量手动重试 更新时间:2021-08-25 10. 去除多余依赖 11. 修改jobID为String类型,避免Long类型超过16为存入redis精度丢失 更新时间:2021-09-01 12. 修改api设计