Ai
1 Star 1 Fork 3

jkey666/devops

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
test.py 1.57 KB
一键复制 编辑 原始数据 按行查看 历史
leffss 提交于 2020-04-09 11:00 +08:00 . optimization many
#!/usr/bin/env python
import os
from datetime import timedelta
if __name__ == '__main__':
# 使用django配置文件进行设置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'devops.settings')
# 让django初始化
import django
django.setup()
from redismultibeat import RedisMultiScheduler
from devops.celery import app
schduler = RedisMultiScheduler(app=app)
for task in schduler.iter_tasks():
print(task)
# print('remove----------------------', end='')
# print(schduler.remove('task_check_scheduler'))
# for task in schduler.iter_tasks():
# print(task)
#
# print(schduler.remove('task_check_scheduler'))
#
# print('add----------------------', end='')
# print(schduler.add(**{
# 'name': 'task_check_scheduler_cron_2',
# 'task': 'tasks.tasks.task_check_scheduler',
# 'schedule': timedelta(seconds=7200),
# "args": (None, 1, 3),
# }))
# for task in schduler.iter_tasks():
# print(task)
#
# print('modify----------------------', end='')
# print(schduler.modify(**{
# 'name': 'task_check_scheduler_cron_2',
# 'task': 'tasks.tasks.task_check_scheduler',
# 'schedule': timedelta(seconds=1600),
# "args": (None, 1, 3),
# }))
# for task in schduler.iter_tasks():
# print(task)
#
# print(schduler.task('task_check_scheduler'))
#
# print(schduler.remove_all())
#
# for task in schduler.iter_tasks():
# print(task)
#
print(schduler.remove_all())
from redis import Redis
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/jkey666/devops.git
git@gitee.com:jkey666/devops.git
jkey666
devops
devops
master

搜索帮助