# 分布式任务调度
**Repository Path**: deepeng/neotask
## Basic Information
- **Project Name**: 分布式任务调度
- **Description**: NeoTask | Python 轻量级异步任务调度库 · 零依赖 · 支持定时/优先级/重试 · SQLite/Redis 后端 · Celery 轻量替代方案。
- **Primary Language**: Unknown
- **License**: MIT
- **Default Branch**: main
- **Homepage**: https://task.pengline.cn/
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2026-05-07
- **Last Updated**: 2026-05-11
## Categories & Tags
**Categories**: Uncategorized
**Tags**: 分布式任务调度, 任务排队, 延时任务, 耗时任务, 排队机制
## README
# 分布式任务调度系统(NeoTask)
轻量级 Python 异步任务队列管理器,无需额外服务,开箱即用。
> NeoTask 是一个纯 Python 实现的异步任务队列调度系统,专为耗时任务(AI 生成、视频处理、数据爬取等)设计,支持定时任务、周期任务、延迟任务。无需部署 Redis、PostgreSQL 等外部服务,安装后即可在任意 Python 项目中直接使用。
中文 | [English](./README.md) | [文档](https://pengline.cn/2026/04/243d5a536d064df59c2ec8668362b8b5/) | [PyPI](https://pypi.org/project/neotask/) | [官网](https://task.pengline.cn)
[](LICENSE) [](https://www.python.org/) [](https://pypi.org/project/penshot/) [](https://pepy.tech/project/neotask) 
---
## 特性
- **零依赖部署** - 纯 Python 实现,无需 Redis/PostgreSQL
- **即时任务** - 支持优先级调度,高优先级优先执行
- **定时任务** - 支持延时执行、固定间隔、Cron 表达式
- **异步并发** - 基于 asyncio,多 Worker 并发处理
- **自动重试** - 失败任务自动重试,可配置次数
- **持久化** - 内存/SQLite/Redis 多种存储后端
- **分布式** - Redis 队列、分布式锁、多节点协调
- **高性能** - 预取机制、批量操作
- **高可用** - 看门狗、超时检测、任务回收、死信队列
- **任务编排** - DAG工作流引擎,支持复杂任务依赖管理和条件分支
- **事件回调** - 支持任务生命周期事件监听
------
## 应用场景
| 场景 | 说明 | 推荐配置 | 使用入口 |
| :--------------------- | :--------------------------- | :---------------------- | :------------- |
| **AI 文生图/视频生成** | 耗时任务排队,避免阻塞主流程 | `worker_concurrency=3` | TaskPool |
| **批量文件处理** | 转码、压缩、上传等批量操作 | `worker_concurrency=10` | TaskPool |
| **网页爬虫调度** | 分布式爬取,防止被封 | `storage_type="redis"` | TaskPool |
| **定时报表发送** | 每天9点发送日报 | `cron="0 9 * * *"` | TaskScheduler |
| **延迟通知** | 用户操作后5分钟发送提醒 | `delay_seconds=300` | TaskScheduler |
| **心跳检测** | 每30秒检测服务健康状态 | `interval_seconds=30` | TaskScheduler |
| **后台数据分析** | 夜间执行数据聚合任务 | `cron="0 2 * * *"` | TaskScheduler |
| **数据处理流水线** | ETL任务依赖编排 | DAG工作流 | WorkflowEngine |
| **条件分支处理** | 根据结果执行不同分支 | `condition` 表达式 | WorkflowEngine |
---
## 架构&演进
```mermaid
graph TB
subgraph User["用户应用层"]
APP[用户代码]
end
subgraph NeoTask["NeoTask 核心"]
TP[TaskPool
即时任务入口 v0.1]
TS[TaskScheduler
定时任务入口 v0.3]
WF[WorkflowEngine
工作流编排入口 v1.5]
subgraph Core["共享核心组件"]
LM[LifecycleManager
任务生命周期管理]
QS[QueueScheduler
优先级+延迟队列]
WP[WorkerPool
Worker池/并发控制]
FM[FutureManager
异步等待/结果回调]
end
subgraph Internal["内部组件"]
EB[EventBus
事件总线 v0.2]
MC[MetricsCollector
指标收集 v0.2]
LF[LockFactory
分布式锁 v0.4]
end
EX[TaskExecutor
用户业务逻辑]
end
subgraph Storage["存储层"]
MEM[MemoryStorage]
SQLITE[(SQLiteStorage)]
REDIS[(RedisStorage v0.4)]
end
APP -->|即时任务| TP
APP -->|定时任务| TS
APP -->|工作流| WF
TS -->|委托| TP
WF -->|委托| TP
TP --> LM
TP --> QS
TP --> WP
TP --> FM
LM --> MEM
LM --> SQLITE
LM --> REDIS
WP --> EX
WP --> EB
WP --> MC
WP --> LF
```
发展路线图
```mermaid
timeline
title NeoTask 架构演进路线图
section v0.1
基础任务池 : 本地内存队列
: 异步执行引擎
: 内存/SQLite存储
section v0.2
可观测性 : 事件总线
: 指标收集
: 健康检查
section v0.3
定时调度 : 延时队列/时间轮
: 周期任务
: Cron表达式
section v0.4
分布式基础 : Redis共享队列
: 分布式锁
section v0.5
性能优化 : 预取机制
: 批量操作
: 连接池优化
section v1.0
高可用保障 : 看门狗续期
: 超时检测
: 故障自动恢复
section v1.5
任务编排 : DAG工作流
: 条件分支
: 并行执行
section v2.0
企业级特性 : 独立Web UI
: 多租户隔离
: Prometheus集成
```
------
## 快速上手
详细使用方式 请参阅 [文档](https://pengline.cn/2026/04/118be805273f47408bc580c4bd1203d8/)
### 安装
```sh
# 基础安装
pip install neotask
# 带 Redis 分布式支持
pip install neotask[redis]
# 完整安装
pip install neotask[full]
```
### 即时任务(TaskPool)
```python
from neotask import TaskPool
async def process(data):
return {"result": "done", "data": data}
# 创建任务池
pool = TaskPool(executor=process)
# 提交任务
task_id = pool.submit({"id": 123})
# 等待结果
result = pool.wait_for_result(task_id)
pool.shutdown()
```
### 定时任务(TaskScheduler)
```python
from neotask import TaskScheduler
scheduler = TaskScheduler(executor=process)
# 延时 60 秒执行
scheduler.submit_delayed({"id": 123}, delay_seconds=60)
# 每 5 分钟执行一次
scheduler.submit_interval({"id": 123}, interval_seconds=300)
# 每天 9 点执行
scheduler.submit_cron({"id": 123}, "0 9 * * *")
scheduler.shutdown()
```
### 使用上下文管理器
```python
with TaskPool(executor=process) as pool:
task_id = pool.submit({"id": 123})
result = pool.wait_for_result(task_id)
```
### 使用事件回调
```python
from neotask import TaskPool
async def on_task_created(event):
print(f"任务创建: {event.task_id}")
async def on_task_completed(event):
print(f"任务完成: {event.task_id}, 结果: {event.data}")
async def on_task_failed(event):
print(f"任务失败: {event.task_id}, 错误: {event.data}")
pool = TaskPool(executor=my_executor)
pool.start()
# 注册事件回调
pool.on_created(on_task_created)
pool.on_completed(on_task_completed)
pool.on_failed(on_task_failed)
task_id = pool.submit({"test": "event"})
result = pool.wait_for_result(task_id)
```
## API 参考
| 方法 | 说明 |
| :------------------------------------------- | :---------------- |
| `pool.submit(data, priority=2, delay=0)` | 提交任务 |
| `pool.wait_for_result(task_id, timeout=300)` | 等待结果 |
| `pool.get_status(task_id)` | 获取状态 |
| `pool.cancel(task_id)` | 取消任务 |
| `scheduler.submit_delayed(data, delay)` | 延时任务 |
| `scheduler.submit_interval(data, interval)` | 周期任务 |
| `scheduler.submit_cron(data, cron)` | Cron 任务 |
| `engine.submit_workflow(definition)` | Submit workflow |
| `engine.wait_workflow(execution_id)` | Wait for workflow |
详细 API 请参阅 [文档](https://pengline.cn/2026/04/650ac5bb41c74e26bc4effcec88bf26c/)
## 配置示例
```python
from neotask import TaskPool, TaskPoolConfig
config = TaskPoolConfig(
worker_concurrency=10, # 并发 Worker 数
max_retries=3, # 重试次数
storage_type="sqlite", # 存储类型
)
pool = TaskPool(executor=process, config=config)
```
详细使用示例请参阅 [文档](https://pengline.cn/2026/04/fa51edd849b24f48b4d7fa8e27efef77/)
## 贡献指南
### 开发环境设置
```python
# 克隆仓库
git clone https://github.com/neopen/neotask.git
cd neotask
# 创建虚拟环境
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# 安装开发依赖
pip install -e ".[dev]"
# 运行测试
pytest tests/
# 查看测试覆盖率
pytest --cov=neotask tests/
# 运行特定模块测试
pytest tests/test_task_pool.py -v
pytest tests/test_task_scheduler.py -v
```
### 项目结构
```
neotask/
├── api/ # TaskPool, TaskScheduler, WorkflowEngine
├── core/ # 生命周期、队列、Worker
├── workflow/ # DAG引擎、条件分支、并行执行
├── executor/ # 任务执行器
├── scheduler/ # 定时任务
├── storage/ # 内存/SQLite/Redis
├── event/ # 事件总线
└── models/ # 数据模型
```
### 贡献流程
欢迎提交 Issue 和 Pull Request
1. Fork 项目
2. 创建特性分支 (`git checkout -b feature/amazing`)
3. 提交更改 (`git commit -m 'Add amazing feature'`)
4. 推送分支 (`git push origin feature/amazing`)
5. 提交 Pull Request
### 代码规范
- 遵循 [PEP 8](https://peps.python.org/pep-0008/) 代码风格
- 添加适当的 [类型注解](https://peps.python.org/pep-0484/)
- 编写单元测试覆盖新功能(覆盖率 ≥ 80%)
- 更新相关文档和示例代码
- 提交信息遵循 [Conventional Commits](https://www.conventionalcommits.org/)
### 测试要求
```sh
# 运行所有测试
pytest tests/
# 运行特定模块测试
pytest tests/test_task_pool.py -v
pytest tests/test_task_scheduler.py -v
pytest tests/test_workflow.py -v
# 运行手动测试
python examples/01_simple.py
python examples/05_webui.py
```
## 问题反馈
- **提交 Issue**:https://github.com/neopen/neotask/issues
- **功能建议**:使用 Enhancement 标签
- **Bug 报告**:使用 Bug 标签并提供复现步骤
- **安全漏洞**:请直接发送邮件至作者邮箱
------
## 许可证
MIT License © 2026 NeoPen
------
## 致谢
感谢所有贡献者和开源社区的支持。
------
## 联系方式
- 项目主页:https://github.com/neopen/neotask
- 作者:NeoPen
- 邮箱:helpenx@gmail.com
- 文档:https://pengline.cn/2026/04/243d5a536d064df59c2ec8668362b8b5