# snail-job-python
**Repository Path**: opensnail/snail-job-python
## Basic Information
- **Project Name**: snail-job-python
- **Description**: 🔥🔥🔥 灵活,可靠和快速的分布式任务重试和分布式任务调度平台
- **Primary Language**: Python
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: https://snailjob.opensnail.com/
- **GVP Project**: No
## Statistics
- **Stars**: 16
- **Forks**: 4
- **Created**: 2024-07-10
- **Last Updated**: 2025-06-07
## Categories & Tags
**Categories**: Uncategorized
**Tags**: 分布式定时任务, Python, 脚本
## README
🔥🔥🔥 灵活,可靠和快速的分布式任务重试和分布式任务调度平台
> ✅️ 可重放,可管控、为提高分布式业务系统一致性的分布式任务重试平台
> ✅️ 支持秒级、可中断、可编排的高性能分布式任务调度平台
# 简介
> SnailJob 是一个灵活、可靠且高效的分布式任务重试和任务调度平台。其核心采用分区模式实现,具备高度可伸缩性和容错性的分布式系统。拥有完善的权限管理、强大的告警监控功能和友好的界面交互。欢迎大家接入并使用。
## snail-job-python
snail-job 项目的 python 客户端。[snail-job项目 java 后端](https://gitee.com/aizuda/snail-job)
Snail Job Python客户端主打的是“原汁原昧”, 无需嵌套在其他语言环境中; 具备与SnailJob的Java客户端Job模块一样的能力包括(集群、广播、静态分片、Map、MapReuce、DAG工作流、实时日志等功能),而 `xxl-job`、`PowerJob` 等其他的任务调度系统都是通过 Java 客户端使用 `Runtime` 执行 `Python` 脚本, 那么会有如下几个问题:
1. 需要运行在Java环境中,即耗内存和又显得笨重
2. 不方便编写复杂的 Python 脚本
3. Java 客户端通过 Python 命令执行脚本,需要系统全局安装脚本的第三方依赖
4. 代码可维护性和可调试比较差
Snail Job Python 客户端可以直接对接 SnailJob 服务器,实现定时任务调度,并上报日志。Python 客户端当前仍不支持`重试任务`,也没有支持计划。
## 开始使用
```shell
# 复制 `.env.example` 为 `.env`
cp .env.example .env # windows命令为 copy
# 创建虚拟环境
python -m venv venv
# 安装依赖
pip install -r requirements.txt
# 启动程序
python main.py
```
登录后台,能看到对应host-id 为 `py-xxxxxx` 的客户端
**注意: snail-job-python 支持 `pip` 包安装,包名为`snail-job-python`**
### 示例
#### 定时任务
```python
from snailjob import *
@job("testJobExecutor") # 1. testJobExecutor 为执行器名称
def test_job_executor(args: JobArgs) -> ExecuteResult:
SnailLog.REMOTE.info(f"job_params: {args.job_params}")
return ExecuteResult.success() # 2. 返回执行结果
if __name__ == "__main__":
ExecutorManager.register(test_job_executor) # 3. 注册执行器
client_main() # 4. 执行客户端主函数
```
新建定时任务, 执行器类型选择【Python】,执行器名称填入【testJobExecutor】
#### 动态分片
```python
from snailjob import *
testMyMapExecutor = MapExecutor("testMyMapExecutor") # 1. 定义 MapExecutor 变量
@testMyMapExecutor.map() # 2. 定义 ROOT_MAP 阶段任务
def testMyMapExecutor_rootMap(args: MapArgs):
assert args.task_name == ROOT_MAP
return mr_do_map(["1", "2", "3", "4"], "TWO_MAP")
@testMyMapExecutor.map("TWO_MAP") # 3. 定义 TWO_MAP 阶段任务
def testMyMapExecutor_twoMap(args: MapArgs):
return ExecuteResult.success(args.map_result)
if __name__ == "__main__":
ExecutorManager.register(testMyMapExecutor) # 4. 注册执行器
client_main()
```
#### MapReduce
```python
from snailjob import *
testMapReduceJobExecutor = MapReduceExecutor("testMapReduceJobExecutor") # 1. 定义 MapReduceExecutor 变量
@testMapReduceJobExecutor.map() # 2. 定义 ROOT_MAP 阶段任务
def testMapReduceJobExecutor_rootMap(args: MapArgs):
return mr_do_map(["1", "2", "3", "4", "5", "6"], "MONTH_MAP") # 3. 上报分片信息
@testMapReduceJobExecutor.map("MONTH_MAP") # 4. 定义 ROOT_MAP 阶段任务
def testMapReduceJobExecutor_monthMap(args: MapArgs):
return ExecuteResult.success(int(args.map_result) * 2)
@testMapReduceJobExecutor.reduce() # 5. 定义 reduce 阶段任务
def testMapReduceJobExecutor_reduce(args: ReduceArgs):
return ExecuteResult.success(sum([int(x) for x in args.map_result]))
@testMapReduceJobExecutor.merge() # 6. 定义 merge 阶段任务
def testMapReduceJobExecutor_merge(args: MergeReduceArgs):
return ExecuteResult.success(sum([int(x) for x in args.reduces]))
if __name__ == "__main__":
ExecutorManager.register(testMyMapExecutor) # 7. 注册执行器
client_main()
```
#### 响应停止事件
```python
@job("testJobExecutor")
def test_job_executor(args: JobArgs) -> ExecuteResult:
for i in range(40):
if ThreadPoolCache.event_is_set(args.task_batch_id): # 1. 判断当前任务批次是否被终止
SnailLog.REMOTE.info("任务已经被中断,立即返回")
return ExecuteResult.failure()
time.sleep(1)
return ExecuteResult.success()
```
#### 工作流、静态分片与普通定时任务类似,不做赘述
### gRPC
开发者工具
```shell
pip install grpcio-tools==1.66.2
cd snailjob/grpc/
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. *.proto
```
HACK, 需要手动修改自动生成的文件 `snailjob/grpc/snailjob_pb2_grpc.py`
```diff
- import snailjob_pb2 as snailjob__pb2
+ from . import snailjob_pb2 as snailjob__pb2
```