163 lines
5.5 KiB
Python
163 lines
5.5 KiB
Python
import atexit
|
|
import datetime
|
|
import json
|
|
import random
|
|
from enum import Enum
|
|
from apscheduler.jobstores.base import ConflictingIdError
|
|
from core.scheduler import Scheduler
|
|
from core.mongo import get_database as get_mongo
|
|
from application.settings import MONGO_DB_NAME, MONGO_DB_URL, REDIS_DB_URL, SUBSCRIBE, SCHEDULER_TASK, \
|
|
SCHEDULER_TASK_RECORD
|
|
from core.redis import get_database as get_redis
|
|
from core.logger import logger
|
|
from core.redis.redis_manage import RedisManage
|
|
|
|
|
|
class ScheduledTask:
|
|
|
|
class JobExecStrategy(Enum):
|
|
interval = "interval"
|
|
date = "date"
|
|
cron = "cron"
|
|
once = "once"
|
|
|
|
def __init__(self):
|
|
self.mongo = None
|
|
self.scheduler = None
|
|
self.rd = None
|
|
|
|
def add_job(self, exec_strategy: str, job_params: dict) -> None:
|
|
"""
|
|
添加定时任务
|
|
:param exec_strategy: 执行策略
|
|
:param job_params: 执行参数
|
|
:return:
|
|
"""
|
|
name = job_params.get("name", None)
|
|
error_info = None
|
|
try:
|
|
if exec_strategy == self.JobExecStrategy.interval.value:
|
|
self.scheduler.add_interval_job(**job_params)
|
|
elif exec_strategy == self.JobExecStrategy.cron.value:
|
|
self.scheduler.add_cron_job(**job_params)
|
|
elif exec_strategy == self.JobExecStrategy.date.value:
|
|
self.scheduler.add_date_job(**job_params)
|
|
elif exec_strategy == self.JobExecStrategy.once.value:
|
|
# 这种方式会自动执行事件监听器,用于保存执行任务完成后的日志
|
|
job_params["name"] = f"{name}-temp-{random.randint(1000, 9999)}"
|
|
self.scheduler.add_date_job(**job_params, expression=datetime.datetime.now())
|
|
else:
|
|
raise ValueError("无效的触发器")
|
|
except ConflictingIdError as e:
|
|
# 任务编号已存在,重复添加报错
|
|
error_info = "任务编号已存在"
|
|
except ValueError as e:
|
|
error_info = e.__str__()
|
|
|
|
if error_info:
|
|
logger.error(f"任务编号:{name},报错:{error_info}")
|
|
self.error_record(name, error_info)
|
|
|
|
def error_record(self, name: str, error_info: str) -> None:
|
|
"""
|
|
添加任务失败记录,并且将任务状态改为 False
|
|
:param name: 任务编号
|
|
:param error_info: 报错信息
|
|
:return:
|
|
"""
|
|
try:
|
|
self.mongo.put_data(SCHEDULER_TASK, name, {"is_active": False})
|
|
task = self.mongo.get_data(SCHEDULER_TASK, name)
|
|
# 执行你想要在任务执行前执行的代码
|
|
result = {
|
|
"job_id": name,
|
|
"job_class": task.get("job_class", None),
|
|
"name": task.get("name", None),
|
|
"group": task.get("group", None),
|
|
"exec_strategy": task.get("exec_strategy", None),
|
|
"expression": task.get("expression", None),
|
|
"start_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
"end_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
"process_time": 0,
|
|
"retval": "任务添加失败",
|
|
"exception": error_info,
|
|
"traceback": None
|
|
}
|
|
self.mongo.create_data(SCHEDULER_TASK_RECORD, result)
|
|
except ValueError as e:
|
|
logger.error(f"任务编号:{name}, 报错:{e}")
|
|
|
|
def run(self) -> None:
|
|
"""
|
|
启动监听订阅消息(阻塞)
|
|
:return:
|
|
"""
|
|
self.start_mongo()
|
|
self.start_scheduler()
|
|
self.start_redis()
|
|
|
|
assert isinstance(self.rd, RedisManage)
|
|
|
|
pubsub = self.rd.subscribe(SUBSCRIBE)
|
|
|
|
logger.info("已成功启动程序,等待接收消息...")
|
|
print("已成功启动程序,等待接收消息...")
|
|
|
|
# 处理接收到的消息
|
|
for message in pubsub.listen():
|
|
if message['type'] == 'message':
|
|
data = json.loads(message['data'].decode('utf-8'))
|
|
operation = data.get("operation")
|
|
task = data.get("task")
|
|
content = f"接收到任务:任务操作方式({operation}),任务详情:{task}"
|
|
logger.info(content)
|
|
print(content)
|
|
getattr(self, operation)(**task)
|
|
else:
|
|
print("意外", message)
|
|
|
|
def start_mongo(self) -> None:
|
|
"""
|
|
启动 mongo
|
|
:return:
|
|
"""
|
|
self.mongo = get_mongo()
|
|
self.mongo.connect_to_database(MONGO_DB_URL, MONGO_DB_NAME)
|
|
|
|
def start_scheduler(self) -> None:
|
|
"""
|
|
启动定时任务
|
|
:return:
|
|
"""
|
|
self.scheduler = Scheduler()
|
|
self.scheduler.start()
|
|
print("Scheduler 启动成功")
|
|
|
|
def start_redis(self) -> None:
|
|
"""
|
|
启动 redis
|
|
:return:
|
|
"""
|
|
self.rd = get_redis()
|
|
self.rd.connect_to_database(REDIS_DB_URL)
|
|
|
|
def close(self) -> None:
|
|
"""
|
|
# pycharm 执行停止,该函数无法正常被执行,怀疑是因为阻塞导致或 pycharm 的强制退出导致
|
|
# 报错导致得退出,会被执行
|
|
关闭程序
|
|
:return:
|
|
"""
|
|
self.mongo.close_database_connection()
|
|
if self.scheduler:
|
|
self.scheduler.shutdown()
|
|
if self.rd:
|
|
self.rd.close_database_connection()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main = ScheduledTask()
|
|
atexit.register(main.close)
|
|
main.run()
|
|
|