# delaytaskframework
**Repository Path**: linshidream/delaytaskframework
## Basic Information
- **Project Name**: delaytaskframework
- **Description**: 一个轻量级的分布式延迟任务调度框架,专门用于解决订单取消、定时通知、自动收货、回调重试等延迟场景。
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 2
- **Forks**: 0
- **Created**: 2025-05-16
- **Last Updated**: 2025-08-26
## Categories & Tags
**Categories**: Uncategorized
**Tags**: 中间件, 调度器, 线程池
## README
## 📖 README
# Delay Task Framework
一个轻量级、可扩展、支持多种调度模式的分布式延迟任务调度框架,支持任务存储、失败重试、死信处理、主题订阅、错峰调度、分布式路由调度等特性,适用于订单取消、定时通知、自动收货、回调重试等延迟场景。
## ✨ 核心特性
- ✅ 支持延迟任务存储(Redis已实现、MySQL暂未实现)
- ✅ 支持任务处理器动态注册(基于注解自动化加载)
- ✅ 支持任务失败重试(指数退避 + 时钟抖动 避免任务雪崩)
- ✅ 支持死信任务处理(清理、批量处理到最大重试次数的任务)
- ✅ 每个 topic + tag 独立时间轮询 & 共享线程池
- ✅ 支持调度器插件(内置 ScheduledExecutorScheduler、支持接入 XXL-JOB)
- ✅ 支持分布式调度能力 (Redis 注册中心实现,节点注册 + 心跳续期 + 健康实例获取)
- ✅ 支持 LAST、一致性哈希、分片等路由策略 (解决多节点并发调度问题)
## 💡 使用场景
- 订单未支付10分钟后取消
- 注册后1小时发送欢迎邮件
- 已完成订单7天自动收货
- 订单充值结果通知下游系统
## 📝 分布式调度
多节点实例部署环境,使用本地线程池调度器会存在多实例并发调度的情况。可以通过注册中心,根据路由策略来匹配具体的实例执行调度。
- LAST:最后一个注册实例调度(默认方式)
- CONSISTENT_HASH:(计算 topic + tag 的哈希值)一致性哈希分配
- SHARDING_BROADCAST:所有节点同时调度(广播)
## License
This project is licensed under the Apache License 2.0 - see the [LICENSE](./LICENSE) file for details.
## 🚀 快速使用
### 1. 引入 maven 依赖
需要注意:引入该 maven 依赖后,只需要注入 RedissonClient 对象,参考 RedissonConfig 配置文件。无需重复引入 Redisson 相关依赖。(存储对列任务依赖redis)
```xml
com.linshidream
delay-task-core
1.0.0-SNAPSHOT
```
### 2. 定义任务类
```java
public class OrderNotifyTask extends BaseDelayMqMessage {
private String orderId;
// getter/setter
}
```
### 3. 编写任务处理器
```java
@Slf4j
@Component
@DelayTaskListener(topic = "order", tag = "notify", pollingIntervalSeconds = 30, pollTasksMax = 100)
public class OrderNotifyHandler implements DelayTaskHandler {
// 该处理器订阅的主题=order,标签=notify,每30秒轮询一次,每次处理100条
@Override
public DelayTaskResult handleTask(OrderNotifyTask task) {
try {
String msg = notify(task.getOrderId());
return DelayTaskResult.success(msg, task);
} catch (Exception e) {
log.error("", e);
return DelayTaskResult.failureAndRetry(e.getMessage(), task);
}
}
public String notify(String orderId) {
log.info("模拟延迟任务执行,通知订单返回成功");
return "ok";
}
}
```
### 4. 提交任务
```java
public class TestUnit {
public static void main(String[] args) {
// 自动注入
DelayTaskEngine delayTaskEngine = new DelayTaskEngine();
OrderNotifyTask task = new OrderNotifyTask();
task.setKey(orderId);
task.setSource("测试订单");
// 5秒后触发
task.setExecuteTime(task.toMilli(LocalDateTime.now().plusSeconds(5)));
task.setOrderId(IdUtil.fastSimpleUUID());
delayTaskEngine.submit("order", "notify", task);
}
}
```
## 📁 项目结构图
```
delay-task-framework/
├── core/
│ ├── DelayTaskHandler.java // 延迟任务处理器接口
│ ├── DelayTaskHandlerMeta.java // 延迟任务处理器元数据
│ ├── DelayTaskResult.java // 任务执行结果封装
│ ├── DelayTaskListener.java // 标注监听延迟任务的注解
│ ├── DelayTaskListenerRegistrar.java // 自动扫描 @DelayTaskListener 注解并注册
│ ├── DelayTaskHandlerRegistry.java // 任务处理器注册中心
│ ├── DelayTaskEngine.java // 延迟任务引擎:任务提交&执行入口
│ ├── DelayTaskDispatcher.java // 任务调度分发器:分发任务到执行器
│ ├── DelayTaskRetryDispatcher.java // 重试任务调度器:支持退避机制
│ ├── DelayTaskScheduler.java // 统一调度器:封装线程池 & 调度逻辑
│ ├── ScheduledExecutorScheduler.java // 本地线程池调度器实现
│ ├── XxlJobDelayTaskScheduler.java // XXL-Job等外部调度器实现
│ ├── BackoffUtils.java // 退避算法工具类(指数退避 + Jitter)
│ ├── DelayTaskConfiguration.java // Spring Boot 自动配置类
│ ├── TaskStorage.java // 延迟任务存储接口
│ ├── RedisDeadTaskStorage.java // 延迟任务存储 Redis 实现
│ ├── DeadTaskStorage.java // 延迟死信任务存储接口
│ ├── RedisDeadTaskStorage.java // 延迟死信任务存储 Redis 实现
├── distributor/
│ ├── ... // 分布式调度和注册中心实现
```
------
## 🧩 各文件职责说明
| 类名 | 描述 |
| ---------------------------- | ------------------------------------------------------------ |
| `DelayTaskHandler` | 延迟任务处理接口,所有任务处理器需实现此接口。 |
| `DelayTaskHandlerMeta` | 延迟任务处理器元数据,包含处理器原始信息。 |
| `DelayTaskResult` | 任务处理结果封装,含是否成功、错误原因、是否重试等。 |
| `DelayTaskListener` | 标注在处理类上,用于注册为某个 tag/topic 的处理器。 |
| `DelayTaskListenerRegistrar` | 扫描并注册带有 `@DelayTaskListener` 注解的任务处理器。 |
| `DelayTaskHandlerRegistry` | 注册中心,按 topic+tag 注册处理器,并支持类型泛型绑定。 |
| `DelayTaskEngine` | 核心引擎:提供任务提交、任务调度的统一入口。 |
| `DelayTaskDispatcher` | 任务调度执行器,周期性从存储中拉取任务并交给执行线程池处理。 |
| `DelayTaskRetryDispatcher` | 处理失败任务的重试逻辑,使用独立线程池 & 支持退避策略。 |
| `DelayTaskScheduler` | 封装线程池和调度操作,支持错峰调度、动态 tag 轮询等。 |
| `ScheduledExecutorScheduler` | 本地线程池调度器实现。 |
| `XxlJobDelayTaskScheduler` | XXL-Job等外部调度器实现。 |
| `BackoffUtils` | 提供指数退避(delay * 2^n)、Jitter、最大重试次数等算法。 |
| `DelayTaskConfiguration` | Spring Boot 自动装配类,统一初始化调度组件。 |