From 6618e3a563f656ef94d71e5b36c36c7df5223f59 Mon Sep 17 00:00:00 2001 From: wojiaoyishang Date: Fri, 27 Dec 2024 18:44:39 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=A1=E5=88=92=E4=BB=BB=E5=8A=A1=E7=9A=84?= =?UTF-8?q?=E5=B0=9D=E8=AF=95=E6=B7=BB=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- applications/__init__.py | 5 + applications/common/tasks/events.py | 53 ++++ applications/config.py | 5 +- applications/extensions/init_apscheduler.py | 29 ++ applications/view/system/__init__.py | 3 + applications/view/system/task.py | 318 ++++++++++++++++++++ jobs.sqlite | Bin 0 -> 16384 bytes tasks/task_methods.py | 1 + tasks/test.py | 2 + templates/system/task/add.html | 163 +++++----- templates/system/task/edit.html | 136 +++++++++ templates/system/task/main.html | 170 ++++++++--- 12 files changed, 764 insertions(+), 121 deletions(-) create mode 100644 applications/common/tasks/events.py create mode 100644 applications/extensions/init_apscheduler.py create mode 100644 applications/view/system/task.py create mode 100644 jobs.sqlite create mode 100644 tasks/task_methods.py create mode 100644 tasks/test.py create mode 100644 templates/system/task/edit.html diff --git a/applications/__init__.py b/applications/__init__.py index af5bb8e..fc4de7b 100644 --- a/applications/__init__.py +++ b/applications/__init__.py @@ -5,6 +5,8 @@ from applications.config import BaseConfig from applications.extensions import init_plugs from applications.view import init_bps +from applications.extensions.init_apscheduler import init_scheduler + def create_app(): app = Flask(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) @@ -14,6 +16,9 @@ def create_app(): # 注册flask组件 init_plugs(app) + # 初始化任务调度器 + init_scheduler(app) + # 注册蓝图 init_bps(app) diff --git a/applications/common/tasks/events.py b/applications/common/tasks/events.py new file mode 100644 index 0000000..e186473 --- /dev/null +++ b/applications/common/tasks/events.py @@ -0,0 +1,53 @@ +from apscheduler.events import ( + EVENT_JOB_ADDED, + EVENT_JOB_ERROR, + EVENT_JOB_EXECUTED, + EVENT_JOB_MISSED, + EVENT_JOB_REMOVED, + EVENT_JOB_SUBMITTED, +) + +from applications.extensions.init_apscheduler import scheduler + +def job_missed(event): + """Job missed event.""" + with scheduler.app.app_context(): + print(event) # noqa: T001 + + +def job_error(event): + """Job error event.""" + with scheduler.app.app_context(): + print(event) # noqa: T001 + + +def job_executed(event): + """Job executed event.""" + with scheduler.app.app_context(): + print(event) # noqa: T001 + + +def job_added(event): + """Job added event.""" + with scheduler.app.app_context(): + print(event) # noqa: T001 + + +def job_removed(event): + """Job removed event.""" + with scheduler.app.app_context(): + print(event) # noqa: T001 + + +def job_submitted(event): + """Job scheduled to run event.""" + with scheduler.app.app_context(): + print(event) # noqa: T001 + + +scheduler.add_listener(job_missed, EVENT_JOB_MISSED) +scheduler.add_listener(job_error, EVENT_JOB_ERROR) +scheduler.add_listener(job_executed, EVENT_JOB_EXECUTED) +scheduler.add_listener(job_added, EVENT_JOB_ADDED) +scheduler.add_listener(job_removed, EVENT_JOB_REMOVED) +scheduler.add_listener(job_submitted, EVENT_JOB_SUBMITTED) diff --git a/applications/config.py b/applications/config.py index e8e3bdf..3ed6634 100644 --- a/applications/config.py +++ b/applications/config.py @@ -1,9 +1,9 @@ import logging from datetime import timedelta - # from urllib.parse import quote_plus as urlquote +from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore class BaseConfig: SUPERADMIN = 'admin' @@ -47,6 +47,9 @@ class BaseConfig: # 数据库的配置信息 SQLALCHEMY_DATABASE_URI = 'sqlite:///../pear.db' # SQLALCHEMY_DATABASE_URI = f"mysql+pymysql://{MYSQL_USERNAME}:{urlquote(MYSQL_PASSWORD)}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}?charset=utf8mb4" + SCHEDULER_JOBSTORES = { + 'default': SQLAlchemyJobStore(url='sqlite:///./jobs.sqlite') # 使用 SQLite 数据库存储任务 + } # 默认日志等级 LOG_LEVEL = logging.WARN diff --git a/applications/extensions/init_apscheduler.py b/applications/extensions/init_apscheduler.py new file mode 100644 index 0000000..e025f0a --- /dev/null +++ b/applications/extensions/init_apscheduler.py @@ -0,0 +1,29 @@ +from flask import Flask +from applications.config import BaseConfig +from flask_apscheduler import APScheduler +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore +from apscheduler.executors.pool import ThreadPoolExecutor + + +# 配置 jobstores 和 executors +# jobstores = { +# 'default': SQLAlchemyJobStore(url='sqlite:///../jobs.sqlite') # 使用 SQLite 数据库存储任务 +# } + + +def init_scheduler(app: Flask): + """ + 初始化调度器并将其绑定到 Flask 应用中。 + """ + try: + jobstores = BaseConfig.SCHEDULER_JOBSTORES + executors = { + 'default': ThreadPoolExecutor(10) + } + scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors) + scheduler.start() + app.config['SCHEDULER'] = scheduler + app.logger.info("Scheduler initialized and started.") + except Exception as e: + app.logger.error(f"Scheduler initialization failed: {str(e)}") diff --git a/applications/view/system/__init__.py b/applications/view/system/__init__.py index 162c0fc..079e144 100644 --- a/applications/view/system/__init__.py +++ b/applications/view/system/__init__.py @@ -12,6 +12,7 @@ from applications.view.system.rights import bp as right_bp from applications.view.system.role import bp as role_bp from applications.view.system.user import bp as user_bp from applications.view.system.dept import bp as dept_bp +from applications.view.system.task import bp as task_bp # 创建sys system_bp = Blueprint('system', __name__, url_prefix='/system') @@ -30,5 +31,7 @@ def register_system_bps(app: Flask): system_bp.register_blueprint(passport_bp) system_bp.register_blueprint(right_bp) system_bp.register_blueprint(dept_bp) + system_bp.register_blueprint(task_bp) + app.register_blueprint(index_bp) app.register_blueprint(system_bp) diff --git a/applications/view/system/task.py b/applications/view/system/task.py new file mode 100644 index 0000000..ed12c9c --- /dev/null +++ b/applications/view/system/task.py @@ -0,0 +1,318 @@ +from flask import Blueprint, request, render_template +from flask_apscheduler.utils import job_to_dict +from applications.common.utils.rights import authorize +from applications.common.utils.http import table_api, fail_api, success_api +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.triggers.date import DateTrigger +from flask import current_app +from datetime import datetime +import os +from flask import jsonify +import importlib.util +import inspect + +try: + # 如果根目录下tasks目录不存在,则动态创建 + if not os.path.exists('tasks'): + os.makedirs('tasks') + import tasks.task_methods as tasks +except ImportError as e: + pass +bp = Blueprint('adminTask', __name__, url_prefix='/task') + + +def get_scheduler(): + return current_app.config.get('SCHEDULER') + + +# 获取tasks目录下的所有模块名称(排除__init__.py) +def get_module_names(directory): + module_names = [] + for filename in os.listdir(directory): + if filename.endswith('.py') and filename != '__init__.py' and filename != 'task_methods.py': + module_name = filename[:-3] # 去掉文件扩展名 + module_names.append(module_name) + return module_names + + +# 动态加载模块 +def load_modules_from_directory(directory): + modules = {} + for module_name in get_module_names(directory): + module_path = os.path.join(directory, f"{module_name}.py") + spec = importlib.util.spec_from_file_location(module_name, module_path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + modules[module_name] = module + return modules + + +# 获取模块中的所有内容,添加保存到新的task_methods模块中 +def get_module_contents(modules): + task_methods_path = './tasks/task_methods.py' + task_methods_spec = importlib.util.spec_from_file_location("task_methods", task_methods_path) # 获取模块名和路径 + task_methods = importlib.util.module_from_spec(task_methods_spec) # 动态加载模块 + task_methods_spec.loader.exec_module(task_methods) + return task_methods + + +# 获取模块中的所有函数, 返回字典 +# { +# 'module_name': [('function_name', function), ...], +# ... +# } +def get_module_methods(modules): + methods = {} + for module_name, module in modules.items(): + methods[module_name] = inspect.getmembers(module, inspect.isfunction) + return methods + + +# 执行指定模块中的函数 +def execute_task(task_name, a, b): + if task_name in globals(): + task_func = globals()[task_name] + task_func(a, b) + else: + print(f"任务 {task_name} 不存在") + + +# 获取任务列表 +def task_lists(): # 返回为列表['task1', 'task2'] + # 加载模块 + directory_path = './tasks' # 模块文件目录路径 + modules = load_modules_from_directory(directory_path) + methods = get_module_methods(modules) + task_lists = [] + task_methods_path = './tasks/task_methods.py' + + # 清空task_methods.py文件内容 + with open(task_methods_path, 'w', encoding='utf-8') as f: + pass + for module_name, method_list in methods.items(): + task_lists.append(module_name) + # 将模块名module_name中的函数添加到task_methods.py中 + with open(task_methods_path, 'a', encoding='utf-8') as f: + # 写入格式 from .{module_name} import {function_name} + for method in method_list: + f.write(f"from .{module_name} import {method[0]}\n") + # 判断task_lists中是否为空值 + + if len(task_lists) == 0: + task_lists.append('无') + + return task_lists # 返回为列表['task1', 'task2'] + + +@bp.get('/') +@authorize("system:task:main", log=True) +def main(): + return render_template('system/task/main.html') + + +@bp.route('/data', methods=['GET']) +@authorize("system:task:main", log=True) +def get_task(): + scheduler = get_scheduler() + jobs = scheduler.get_jobs() + jobs_list = [job_to_dict(job) for job in jobs] + return table_api(data=jobs_list, count=len(jobs_list)) + + +@bp.route('/query', methods=['POST', 'GET']) +@authorize("system:task:main", log=True) +def query(): + if request.method == 'GET': + selectname = request.args.get('selectname') + if request.method == 'POST': + data = request.get_json() + selectname = data.get('selectname') + # 查询参数构造 + scheduler = get_scheduler() + jobs = scheduler.get_jobs() + jobs_list = [job_to_dict(job) for job in jobs] + # 模糊查找jobs_list中id、name包含selectname的元素 + if selectname: + jobs_list = [job for job in jobs_list if + selectname.lower() in job['id'].lower() or selectname.lower() in job['name'].lower()] + # print(data) + # 返回api + return table_api(data=jobs_list, count=len(jobs_list)) + + +@bp.get('/add') +@authorize("system:task:add", log=True) +def add(): + return render_template('system/task/add.html', task_list=task_lists()) + + +@bp.post('/save') +@authorize("system:task:add", log=True) +def save(): + data = request.json + _id = data.get("id") # 任务id + name = data.get("name") # 任务名称 + type = data.get("type") # 任务类型 + functions = data.get("functions") # 任务函数 + datetime = data.get("datetime") # 触发时间 + interval = data.get("interval") # 间隔时间 + cron = data.get("cron") # cron表达式 + + # Validate function existence + if not hasattr(tasks, functions): + return fail_api(msg="任务函数不存在") + func = getattr(tasks, functions) + # Define job parameters + job_params = { + 'func': func, + 'id': _id, + 'name': name, + 'args': (_id, name), + 'replace_existing': True + } + + if type == 'date': + # Use DateTrigger for date type tasks + job_params.update({ + 'trigger': DateTrigger(run_date=datetime), + 'value': datetime + }) + + elif type == 'interval': + if not interval: + return fail_api(msg="缺少间隔时间参数") + # IntervalTrigger expects numerical values for the interval + try: + interval_seconds = int(interval) + except ValueError: + return fail_api(msg="间隔时间格式错误") + job_params.update({ + 'trigger': IntervalTrigger(seconds=interval_seconds) + }) + elif type == 'cron': + if not cron: + return fail_api(msg="缺少 Cron 表达式参数") + try: + # 这里可以添加对 cron 表达式格式的 validation + job_params.update({ + 'trigger': CronTrigger.from_crontab(cron) + }) + except ValueError as e: + return fail_api(msg=f"Cron 表达式错误: {str(e)}") + else: + return fail_api(msg="不支持的触发器类型") + + # Get scheduler instance + scheduler = get_scheduler() + # Add job to the scheduler + scheduler.add_job(**job_params) + return success_api() + + +@bp.put('/update') +@authorize("system:task:add", log=True) +def update(): + data = request.json + _id = data.get("id") + name = data.get("name") + type = data.get("type") + functions = data.get("functions") + datetime = data.get("datetime") + interval = data.get("interval") + cron = data.get("cron") + + # Validate function existence + if not hasattr(tasks, functions): + return fail_api(msg="任务函数不存在") + func = getattr(tasks, functions) + + # Define job parameters + job_params = { + 'func': func, + 'id': _id, + 'name': name, + 'args': (_id, name), + 'replace_existing': True + } + + if type == 'date': + # Use DateTrigger for date type tasks + job_params.update({ + 'trigger': DateTrigger(run_date=datetime), + 'value': datetime + }) + + elif type == 'interval': + if not interval: + return fail_api(msg="缺少间隔时间参数") + # IntervalTrigger expects numerical values for the interval + try: + interval_seconds = int(interval) + except ValueError: + return fail_api(msg="间隔时间格式错误") + job_params.update({ + 'trigger': IntervalTrigger(seconds=interval_seconds) + }) + elif type == 'cron': + if not cron: + return fail_api(msg="缺少 Cron 表达式参数") + # CronTrigger expects a cron expression + job_params.update({ + 'trigger': CronTrigger.from_crontab(cron) + }) + else: + return fail_api(msg="不支持的触发器类型") + + # Get scheduler instance + scheduler = get_scheduler() + # Add job to the scheduler + scheduler.add_job(**job_params) + return success_api() + + +@bp.get('/edit/') +@authorize("system:task:edit", log=True) +def edit(_id=""): + scheduler = get_scheduler() + job = scheduler.get_job(str(_id)) + if not job: + return fail_api(msg="任务不存在") + + job_dict = job_to_dict(job) + functions = job_dict['func'].split(':')[1] + job_dict['functions'] = functions + + return render_template('system/task/edit.html', task_list=task_lists(), job=job_dict) + + +@bp.put('/enable') +def enable(): + _id = request.json.get('id') + if _id: + scheduler = get_scheduler() + job = scheduler.get_job(str(_id)) + print(job) + scheduler.resume_job(str(_id)) + return success_api(msg="启动成功") + return fail_api(msg="数据错误") + + +@bp.put('/disable') +def dis_enable(): + _id = request.json.get('id') + if _id: + scheduler = get_scheduler() + job = scheduler.get_job(str(_id)) + print(job) + scheduler.pause_job(str(_id)) + return success_api(msg="暂停成功") + return fail_api(msg="数据错误") + + +@bp.delete('/remove/') +@authorize("system:task:remove", log=True) +def remove_job(_id): + scheduler = get_scheduler() + scheduler.remove_job(str(_id)) + return success_api(msg="删除成功") \ No newline at end of file diff --git a/jobs.sqlite b/jobs.sqlite new file mode 100644 index 0000000000000000000000000000000000000000..786ca0ac01af03fb4ed1ac7b4b709ede7bd2cc67 GIT binary patch literal 16384 zcmeI&PiqrF6aetqY})*(?UsVnKUntQp#d8Sp7f&I8fi#OG>u}tER*acnRdI0yOSCU z1*4^{u(ysMz)#@^5d`rn=tofW;G39+CKbKcx3Jl_Z+3QeewoYLWbI*r3ue|^{uUMH z4KhnKjodL!LdXK_QP_{S2m~k3wd01r92ZD>`}MT(jp#E0F(!?7aKH@(Pyhu`00mG0 z1yBG5Pyhu`;QtWl&FZoF(vsH8aL-{~-nD5vs5P0>ahY##w5oyav97TFj%N$L#l~Bs zE9I(|K{FkSIcc$easRvb5RAj zp#Tb?01BW03ZMWApa2S>01BKPfxRhh!iXJCW~`5UM1==!2tU0|efa+Q#l)L0gP(O@ zInYK9Rx^-$KgjLM#W1$b{D8MSnb*PycVwt#WEib?yqXNp3mR+&X~6<&>wuk>$7C3_j1_QteT{n!+HUy_Ceu$R zp#b-V?LZX{H&6#P;>qwxt|Vh$&t0CoIzO@VR5ma4`^A~({O(l@-5OG2N*q_R0|vG zf8)p5S>ZSn6hHwKKmim$0Te(16hHwKKmim$fuTTDHxij){r_{A#{dOT00mG01yBG5 bPyhu`00mG01yJA&2u$g*g!;#au9L(s?yg+8 literal 0 HcmV?d00001 diff --git a/tasks/task_methods.py b/tasks/task_methods.py new file mode 100644 index 0000000..2c720ba --- /dev/null +++ b/tasks/task_methods.py @@ -0,0 +1 @@ +from .test import task_print diff --git a/tasks/test.py b/tasks/test.py new file mode 100644 index 0000000..691ae7f --- /dev/null +++ b/tasks/test.py @@ -0,0 +1,2 @@ +def task_print(*args, **kwargs): + print("这是间隔任务") diff --git a/templates/system/task/add.html b/templates/system/task/add.html index 4a16604..a04a105 100644 --- a/templates/system/task/add.html +++ b/templates/system/task/add.html @@ -1,3 +1,4 @@ + @@ -7,56 +8,57 @@
-
-
- -
- -
+
+ +
+
-
- -
- -
+
+
+ +
+
-
- -
- -
+
+
+ +
+ +
+
+
+ +
+
-
- -
- -
+
+ @@ -64,7 +66,7 @@
- @@ -82,59 +84,62 @@ let form = layui.form let $ = layui.jquery var laydate = layui.laydate; - - laydate.render({ - elem: '#datetime' - , type: 'datetime' - }); - laydate.render({ - elem: '#time' - , type: 'time' - }); - + let MODULE_PATH = "{{ url_for('system.adminTask.main') }}" + // laydate.render({ + // elem: '#datetime', + // type: 'datetime' + // }); form.on('select(type)', function (data) { if (data.value === 'date') { - $('#timeItem').hide() - $('#datetimeItem').show() + $('#intervalItem').hide(); + $('#cronItem').hide(); + $('#datetimeItem').show(); + laydate.render({ + elem: '#datetime' + , type: 'datetime' + }); } else if (data.value === 'interval') { - console.log('interval') - $('#datetimeItem').hide() - $('#timeItem').show() - $('#timeItem').val('') + $('#datetimeItem').hide(); + $('#cronItem').hide(); + $('#intervalItem').show(); } else if (data.value === 'cron') { - console.log('cron') - $('#datetimeItem').hide() - $('#timeItem').show() - $('#timeItem').val('') - + $('#datetimeItem').hide(); + $('#intervalItem').hide(); + $('#cronItem').show(); } + }); - }) - + form.on('submit(task-save)', function (data) { + // Prepare data based on type + let requestData = { ...data.field }; + if (requestData.type === 'interval') { + requestData.interval = $('#interval').val(); + } else if (requestData.type === 'cron') { + requestData.cron = $('#cron').val(); + } - form.on('submit(role-save)', function (data) { $.ajax({ - url: '/admin/task/save', - data: JSON.stringify(data.field), + url: MODULE_PATH + 'save', + data: JSON.stringify(requestData), dataType: 'json', contentType: 'application/json', type: 'post', success: function (result) { if (result.success) { layer.msg(result.msg, {icon: 1, time: 1000}, function () { - parent.layer.close(parent.layer.getFrameIndex(window.name))//关闭当前页 - parent.layui.table.reload('role-table') - }) + parent.layer.close(parent.layer.getFrameIndex(window.name)) //关闭当前页 + parent.layui.table.reload('task-table') + }); } else { - layer.msg(result.msg, {icon: 2, time: 1000}) + layer.msg(result.msg, {icon: 2, time: 1000}); } } - }) - return false - }) - }) + }); + return false; + }); + }); \ No newline at end of file diff --git a/templates/system/task/edit.html b/templates/system/task/edit.html new file mode 100644 index 0000000..53d27fa --- /dev/null +++ b/templates/system/task/edit.html @@ -0,0 +1,136 @@ + + + + + 任务新增 + {% include 'system/common/header.html' %} + + +
+
+
+ +
+ +
+
+
+ +
+ +
+
+ +
+ +
+ +
+
+
+ +
+ +
+
+
+ +
+ +
+
+ + +
+
+
+
+
+
+ + +
+
+ +{% include 'system/common/footer.html' %} + \ No newline at end of file diff --git a/templates/system/task/main.html b/templates/system/task/main.html index 12a8293..37de8ca 100644 --- a/templates/system/task/main.html +++ b/templates/system/task/main.html @@ -1,18 +1,86 @@ + 任务管理 - {% include 'system/common/header.html' %} + {% include 'system/common/header.html' %} + - +
+
+
+
+ +
+ +
+ + +
+
+
+
-
+
+
+
+ +
+
+

Crontab 表达式格式

+
+  *  *  *  *  *
+  │ │ │ │ │
+  │ │ │ │ └─── 周 (0 - 7) (周日 0 或 7)
+  │ │ │ └───── 月 (1 - 12)
+  │ │ └─────── 日 (1 - 31)
+  │ └───────── 时 (0 - 23)
+  └─────────── 分 (0 - 59)
+
+

各字段的合法值

+
    +
  • 分钟(0-59): 表示每小时的第几分钟。
  • +
  • 小时(0-23): 表示每天的第几小时。
  • +
  • 日(1-31): 表示每月的第几天。
  • +
  • 月(1-12): 表示月份。
  • +
  • 周(0-7): 表示星期几,其中 0 和 7 都表示星期日。
  • +
+

特殊符号和用法

+
    +
  • 星号(*): 表示该字段可以是任何值。
  • +
  • 逗号(,): 用于指定多个值。例如,1,2,3 表示 1,2 和 3。
  • +
  • 连字符(-): 用于指定一个范围。例如,1-5 表示 1 到 5。
  • +
  • 斜杠(/): 用于表示步长。例如,*/5 表示每隔 5 个单位。
  • +
+

示例

+
    +
  • 每分钟执行一次: cron = "* * * * *"
  • +
  • 每天中午 12 点执行: cron = "0 12 * * *"
  • +
  • 每周一到周五的早上 9 点执行: cron = "0 9 * * 1-5"
  • +
  • 每月的第一天执行: cron = "0 0 1 * *"
  • +
- - - - + + {% include 'system/common/footer.html' %} -- Gitee