From 419b5a184dead2fc17ee5a7066b31a7aa29f44ac Mon Sep 17 00:00:00 2001 From: Devil-Ryu <974529965@qq.com> Date: Tue, 7 Feb 2023 17:31:13 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9task=E8=A7=86=E5=9B=BE?= =?UTF-8?q?=EF=BC=8C=E4=BF=AE=E5=A4=8D=E4=BB=BB=E5=8A=A1=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E4=BF=9D=E5=AD=98=E9=97=AE=E9=A2=98=EF=BC=8C=E4=BF=AE=E6=94=B9?= =?UTF-8?q?get=5Fjobs=E5=87=BD=E6=95=B0=E5=8F=AA=E8=83=BD=E7=9C=8B?= =?UTF-8?q?=E5=88=B0=E7=89=B9=E5=AE=9A=E5=89=8D=E7=BC=80=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E9=97=AE=E9=A2=98=EF=BC=8C=E5=A2=9E=E5=8A=A0=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=EF=BC=8C=E6=89=A7=E8=A1=8C=E4=BB=BB=E5=8A=A1=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dvadmin_celery/tasks.py | 10 ++- dvadmin_celery/urls.py | 1 + dvadmin_celery/views/crontab_schedule.py | 15 +++- dvadmin_celery/views/task.py | 109 +++++++++++++++-------- 4 files changed, 97 insertions(+), 38 deletions(-) diff --git a/dvadmin_celery/tasks.py b/dvadmin_celery/tasks.py index ebecf4a..6fc9531 100644 --- a/dvadmin_celery/tasks.py +++ b/dvadmin_celery/tasks.py @@ -7,6 +7,7 @@ @Remark: """ from application.celery import app +import time @app.task def task__one(): @@ -19,10 +20,17 @@ def task__two(): @app.task -def task__three(): +def task__three(*args, **kargs): print(33333) @app.task def task__four(): print(44444) + +@app.task +def function_add(*args, **kwargs): + print(args) + time.sleep(4) + print(kwargs) + return {'args': args, 'kwargs': kwargs} diff --git a/dvadmin_celery/urls.py b/dvadmin_celery/urls.py index f3b4dff..238b585 100644 --- a/dvadmin_celery/urls.py +++ b/dvadmin_celery/urls.py @@ -20,6 +20,7 @@ urlpatterns = [ path('task/job_list/',CeleryTaskModelViewSet.as_view({'get':'job_list'})), path('task/update_status//',CeleryTaskModelViewSet.as_view({'post':'update_status'})), + path('task/run_task//',CeleryTaskModelViewSet.as_view({'post':'run_task'})), path('task_detail/',CeleryTaskDetailViewSet.as_view({'get':'list'})), ] urlpatterns += router.urls diff --git a/dvadmin_celery/views/crontab_schedule.py b/dvadmin_celery/views/crontab_schedule.py index 2bdb6d3..6663e6b 100644 --- a/dvadmin_celery/views/crontab_schedule.py +++ b/dvadmin_celery/views/crontab_schedule.py @@ -1,13 +1,26 @@ -from django_celery_beat.models import CrontabSchedule +from django_celery_beat.models import CrontabSchedule, cronexp +from rest_framework import serializers from dvadmin.utils.serializers import CustomModelSerializer from dvadmin.utils.viewset import CustomModelViewSet +CrontabSchedule.__str__ = lambda self : '{0} {1} {2} {3} {4} {5}'.format( + cronexp(self.minute), cronexp(self.hour), + cronexp(self.day_of_month), cronexp(self.month_of_year), + cronexp(self.day_of_week), str(self.timezone) + ) class CrontabScheduleSerializer(CustomModelSerializer): + + + crontab_str = serializers.SerializerMethodField() + class Meta: model = CrontabSchedule exclude = ('timezone',) + + def get_crontab_str(self, instance): + return instance.__str__() class CrontabScheduleModelViewSet(CustomModelViewSet): diff --git a/dvadmin_celery/views/task.py b/dvadmin_celery/views/task.py index e26f906..7603faf 100644 --- a/dvadmin_celery/views/task.py +++ b/dvadmin_celery/views/task.py @@ -11,10 +11,14 @@ import uuid from django_celery_beat.models import PeriodicTask, CrontabSchedule, cronexp from rest_framework.exceptions import APIException from rest_framework import serializers - +from django_restql.fields import DynamicSerializerMethodField from dvadmin.utils.serializers import CustomModelSerializer from dvadmin.utils.viewset import CustomModelViewSet from dvadmin.utils.json_response import SuccessResponse, ErrorResponse +from dvadmin_celery.views.crontab_schedule import CrontabScheduleSerializer +from dvadmin_celery import tasks +from kombu.utils.json import loads +from celery import current_app CrontabSchedule.__str__ = lambda self : '{0} {1} {2} {3} {4} {5}'.format( @@ -32,17 +36,18 @@ def get_job_list(): task_dict_list = [] for app in settings.INSTALLED_APPS: try: - exec(f""" -from {app} import tasks -for ele in [i for i in dir(tasks) if i.startswith('task__')]: - task_dict = dict() - task_dict['label'] = '{app}.tasks.' + ele - task_dict['value'] = '{app}.tasks.' + ele - task_list.append('{app}.tasks.' + ele) - task_dict_list.append(task_dict) - """) - except ImportError : + exec(f"""from {app} import tasks""") + except ImportError: pass + + current_app.loader.import_default_modules() + task_list = list(sorted(name for name in current_app.tasks.keys() if not name.startswith('celery.'))) + + for task in task_list: + task_dict = dict() + task_dict['label'] = task + task_dict['value'] = task + task_dict_list.append(task_dict) return {'task_list': task_list, 'task_dict_list': task_dict_list} @@ -53,9 +58,9 @@ def CronSlpit(cron): # "second":cron[0], "minute":cron[0], "hour":cron[1], - "day":cron[2], - "month":cron[3], - "week":cron[4] + "day_of_week":cron[2], + "day_of_month":cron[3], + "month_of_year":cron[4] } return result @@ -67,11 +72,18 @@ class CeleryCrontabScheduleSerializer(CustomModelSerializer): class PeriodicTasksSerializer(CustomModelSerializer): - crontab = serializers.StringRelatedField(read_only=True) + crontab_str = serializers.SerializerMethodField() class Meta: model = PeriodicTask fields = '__all__' + + def get_crontab_str(self, instance): + cron = instance.crontab + serializers = CrontabScheduleSerializer(cron) + # return [{"id": instance.crontab.id,"value":instance.crontab.__str__()}] + # return serializers.data + return instance.crontab.__str__() class CeleryTaskModelViewSet(CustomModelViewSet): @@ -81,7 +93,7 @@ class CeleryTaskModelViewSet(CustomModelViewSet): queryset = PeriodicTask.objects.exclude(name="celery.backend_cleanup") serializer_class = PeriodicTasksSerializer - filter_fields = ['name', 'task', 'enabled'] + filter_fields = ['name', 'task', 'enabled', 'crontab'] # permission_classes = [] # authentication_classes = [] @@ -102,38 +114,30 @@ class CeleryTaskModelViewSet(CustomModelViewSet): def create(self, request, *args, **kwargs): body_data = request.data.copy() - cron = body_data.get('crontab') - cron_lisr = CronSlpit(cron) - minute = cron_lisr["minute"] - hour = cron_lisr["hour"] - day = cron_lisr["day"] - month = cron_lisr["month"] - week = cron_lisr["week"] - cron_data = { - 'minute': minute, - 'hour': hour, - 'day_of_week': week, - 'day_of_month': day, - 'month_of_year': month - } + cron = body_data.get('crontab_str') + cron_data = CronSlpit(cron) + task = body_data.get('task') result = None task_list = get_job_list() task_list = task_list.get('task_list') if task in task_list: - # job_name = task.split('.')[-1] - # path_name = '.'.join(task.split('.')[:-1]) # 添加crontab serializer = CeleryCrontabScheduleSerializer(data=cron_data, request=request) serializer.is_valid(raise_exception=True) - self.perform_create(serializer) + print(cron_data) + schedule, created = CrontabSchedule.objects.get_or_create(**cron_data) # 手动创建crontab,避免重复创建 + # 添加任务 - body_data['crontab'] = serializer.data.get('id') + body_data['crontab'] = schedule.id body_data['enabled'] = False - serializer = self.get_serializer(data=body_data, request=request) - res = serializer.is_valid() + serializer = self.get_serializer(data=body_data, request=request, context={"crontab": serializer.data.get('id')}) + + + res = serializer.is_valid(raise_exception=True) + serializer.errors if not res: raise APIException({"msg":f"添加失败,已经有一个名为 {body_data['name']} 的任务了"}, code=4000) self.perform_create(serializer) @@ -141,6 +145,24 @@ class CeleryTaskModelViewSet(CustomModelViewSet): return SuccessResponse(msg="添加成功", data=result) else: return ErrorResponse(msg="添加失败,没有该任务", data=None) + + def update(self, request, *args, **kwargs): + instance = self.get_object() + body_data = request.data + instance.name = body_data.get('name') # 更新名称 + instance.task = body_data.get('task') # 更新任务 + instance.args = body_data.get('args') # 更新参数 + instance.kwargs = body_data.get('kwargs') # 更新参数 + + # 更新crontab + cron = body_data.get('crontab_str') + cron_data = CronSlpit(cron) + schedule, created = CrontabSchedule.objects.get_or_create(**cron_data) # 手动创建crontab,避免重复创建 + instance.crontab = schedule + + instance.description = body_data.get('description') # 更新描述 + instance.save() + return SuccessResponse(msg="修改成功", data=None) def destroy(self, request, *args, **kwargs): """删除定时任务""" @@ -155,3 +177,18 @@ class CeleryTaskModelViewSet(CustomModelViewSet): instance.enabled = body_data.get('enabled') instance.save() return SuccessResponse(msg="修改成功", data=None) + + def run_task(self, request, * args, ** kwargs): + # 加载环境 + from application import settings + for app in settings.INSTALLED_APPS: + try: + exec(f"""from {app} import tasks""") + except ImportError: + pass + print("load success") + current_app.loader.import_default_modules() + instance = self.get_object() + task = current_app.tasks.get(instance.task) + task_id = task.apply_async(args=loads(instance.args), kwargs=loads(instance.kwargs), queue=instance.queue, periodic_task_name=instance.name) + return SuccessResponse(msg="Task {}({}) 运行成功".format(instance.name, instance.task), data=None) -- Gitee