代码拉取完成,页面将自动刷新
#!/usr/bin/env python
import os
from datetime import timedelta
if __name__ == '__main__':
# 使用django配置文件进行设置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'devops.settings')
# 让django初始化
import django
django.setup()
from redismultibeat import RedisMultiScheduler
from devops.celery import app
schduler = RedisMultiScheduler(app=app)
for task in schduler.iter_tasks():
print(task)
# print('remove----------------------', end='')
# print(schduler.remove('task_check_scheduler'))
# for task in schduler.iter_tasks():
# print(task)
#
# print(schduler.remove('task_check_scheduler'))
#
# print('add----------------------', end='')
# print(schduler.add(**{
# 'name': 'task_check_scheduler_cron_2',
# 'task': 'tasks.tasks.task_check_scheduler',
# 'schedule': timedelta(seconds=7200),
# "args": (None, 1, 3),
# }))
# for task in schduler.iter_tasks():
# print(task)
#
# print('modify----------------------', end='')
# print(schduler.modify(**{
# 'name': 'task_check_scheduler_cron_2',
# 'task': 'tasks.tasks.task_check_scheduler',
# 'schedule': timedelta(seconds=1600),
# "args": (None, 1, 3),
# }))
# for task in schduler.iter_tasks():
# print(task)
#
# print(schduler.task('task_check_scheduler'))
#
# print(schduler.remove_all())
#
# for task in schduler.iter_tasks():
# print(task)
#
print(schduler.remove_all())
from redis import Redis
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。