# big-smart **Repository Path**: gitduck/big-smart ## Basic Information - **Project Name**: big-smart - **Description**: 基于redis的延时任务,参考有赞延时队列的设计 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 5 - **Created**: 2022-03-07 - **Last Updated**: 2022-03-07 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 基于Redis的延时任务队列时间 > 参考[有赞延迟队列设计](https://tech.youzan.com/queuing_delay/) 源码地址: [https://gitee.com/A_yes/big-smart](https://gitee.com/A_yes/big-smart) ## 主要组成部分 ### 1. DelayJobBucket 数据结构 redis的 zset(有序集合)用于存放任务的id 并按照执行时间排序 ### 2. JobPoll 数据结构 redis的hash,以任务的id作为key,存放job的元信息 ### 3. ReadyQueue 数据结构为redis的list,就绪队列,用于存放已经到期的任务,随时可以被取出去消费 ### 4. BucketTimer 用于轮训对应delayJobBucket中到期的任务,并将任务从delayJobBucket中移除并加入对应ReadyQueue,一个线程对应一个bucket ### 5. TopicTimer 用于轮训ReadyQueue,并启动新线程去执行任务,一个线程对应一个readyQueue ### 6. IWorker 用于实现对不同任务不同的处理方式。 ![](img/redis延时任务.png) ## Job数据结构 - id : 任务的唯一标识,要保证同一个topic下唯一 - topic : 主题,用来把任务分组 - delay:延迟时间(单位毫秒) - ttr:超时时间,保证任务肯定被消费一次,当任务被从ReadyQueue中取出时,用ttr重新计算下次执行时间,并加入bucket,当任务被消费成功会删除jobPoll中的元数据 - body:业务数据 - status:任务状态 - execTime:执行时间(绝对时间) - createTime:任务创建时间(绝对时间) ## 使用方法 内置里一个http处理worker,我们以它为例说明使用方法。 这个worker处理逻辑是,当任务到期,像任务描述里的http接口发送一个请求,并比对返回值和给定的规则,判断是否通过。 http任务业务数据结构: - url:到期需要调用的接口 - method:get/post请求方法 - headers:请求头 - body:请求参数 - passrule:成功规则 ### 1. 先添加任务 - post /o/add 参数示例: ```json { "delay": 3000, "ttr": 10000, "topic": "http", "id": "2d56dc02-4aed-486w1-acea-bc1516b3959css", "body":"{\"method\":\"get\",\"headers\":{\"a\":\"asdad\",\"b\":123123},\"passrule\":{\"code\":1},\"body\":{\"p1\":\"asd\",\"p2\":123123},\"url\":\"http://192.168.200.250:7070/t/a\"}" } ``` > 对于topic为http的任务,只需要添加任务就行,到执行时间,会自动调用指定接口,其他类型的任务,可以通过实现IWorker接口在服务器端完成任务处理,或者通过http接口轮训在客户端完成任务处理,处理后要回报处理结果。 ### 2. 客户端轮询就绪任务 - get /o/get 参数示例: ``` /o/get?topic=test ``` ### 3. 回报处理结果 - get /o/complete 参数示例: ``` /o/complete?topic=test&id=123 ``` ## job流程 1. 通过http添加job,job会被加入bucket,id作为值,执行时间作为score,job元信息会被加入jobPoll。 2. 判断是否有此对应topic的BucketTimer线程启动,若无则启动一个开轮询对应的bucket,任务到期就将任务从bucket中取出,修改状态,根据ttr重新计算下次执行时间后重新入bucket(实现了超时重试机制),移入readyQueue 3. 程序启动时会扫描redis中是否已有bucket,若有自动启动对应BucketTimer。程序启动时会根据配置文件,启动服务器端任务处理器。 4. 服务端任务处理器轮询对应topic的readyQueue,当有任务时启动新线程去处理任务。 5. 任务处理完成后会将元数据从jobPoll中删除