代码拉取完成,页面将自动刷新
同步操作将从 zhongYeHai/test-platform-api 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
# -*- coding: utf-8 -*-
import datetime
import requests
from flask import request
from flask.views import MethodView
from flask_apscheduler import APScheduler
from apps.config.model_factory import Config
from config import _job_server_port, _main_server_host
from utils.view import restful
from utils.parse.parse_cron import parse_cron
from apps import create_app
job = create_app()
# 注册并启动定时任务
scheduler = APScheduler()
scheduler.init_app(job)
scheduler.start()
def request_run_task_api(task_id, task_type, skip_holiday=1):
""" 调执行任务接口 """
job.logger.info(f'{"*" * 20} 开始触发执行定时任务 {"*" * 20}')
if skip_holiday:
today = datetime.datetime.now().strftime("%m-%d")
with create_app().app_context():
holiday_list = Config.get_first(name="holiday_list").value
if today in holiday_list:
job.logger.info(f'skip_holiday跳过执行')
return
if isinstance(task_id, str) and task_id.startswith('cron'): # 系统定时任务
api_addr = '/system/job/run'
else: # 自动化测试定时任务
api_addr = f'/{task_type}-test/task/run'
request_url = f'{_main_server_host}/api{api_addr}'
request_data = {
"id": task_id,
"func_name": task_id,
"trigger_type": "cron"
}
re = requests.post(url=request_url, json=request_data)
job.logger.info(f'触发参数:\nurl: {_main_server_host}/api{api_addr}\n请求参数: {request_data}')
job.logger.info(f'{"*" * 20} 定时任务触发完毕 {"*" * 20}')
job.logger.info(f'{"*" * 20} 触发响应为:{re.json()} {"*" * 20}')
class JobStatus(MethodView):
""" 任务状态修改 """
def post(self):
""" 添加定时任务 """
task, task_type = request.json.get('task'), request.json.get('type')
with job.db.auto_commit():
scheduler.add_job(
func=request_run_task_api, # 异步执行任务
trigger="cron",
misfire_grace_time=60,
coalesce=False,
kwargs={"task_id": task["id"], "task_type": task_type, "skip_holiday": task.get('skip_holiday')},
id=f'{task_type}_{str(task["id"])}',
**parse_cron(task["cron"])
)
return restful.success(f'定时任务启动成功')
def delete(self):
""" 删除定时任务 """
task_type, task_id = request.json.get('type'), request.json.get('task_id')
with job.db.auto_commit():
scheduler.remove_job(f'{task_type}_{str(task_id)}') # 移除任务
return restful.success(f'任务禁用成功')
job.add_url_rule('/api/job/status', view_func=JobStatus.as_view('jobStatus'))
if __name__ == '__main__':
job.run(host='0.0.0.0', port=_job_server_port)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。