From 4bb0502be0df0c922daa2a5c6e760d6981c27ae5 Mon Sep 17 00:00:00 2001 From: uname <2986773479@qq.com> Date: Tue, 3 Feb 2026 09:36:34 +0800 Subject: [PATCH] refactor: refactor the scheduled running logic and extract public classes --- runtime/datamate-python/app/main.py | 13 ++- .../module/collection/interface/collection.py | 2 +- .../collection/{scheduler.py => schedule.py} | 41 +++------ .../module/collection/service/collection.py | 2 +- .../app/module/shared/schedule/__init__.py | 3 + .../app/module/shared/schedule/scheduler.py | 92 +++++++++++++++++++ 6 files changed, 116 insertions(+), 37 deletions(-) rename runtime/datamate-python/app/module/collection/{scheduler.py => schedule.py} (67%) create mode 100644 runtime/datamate-python/app/module/shared/schedule/__init__.py create mode 100644 runtime/datamate-python/app/module/shared/schedule/scheduler.py diff --git a/runtime/datamate-python/app/main.py b/runtime/datamate-python/app/main.py index fbc66fe8..5b08df56 100644 --- a/runtime/datamate-python/app/main.py +++ b/runtime/datamate-python/app/main.py @@ -20,11 +20,8 @@ ) from .module import router from .module.shared.schema import StandardResponse -from .module.collection.scheduler import ( - start_collection_scheduler, - shutdown_collection_scheduler, - load_scheduled_collection_tasks, -) +from .module.collection.schedule import load_scheduled_collection_tasks, set_collection_scheduler +from .module.shared.schedule import Scheduler setup_logging() logger = get_logger(__name__) @@ -62,13 +59,15 @@ def mask_db_url(url: str) -> Literal[b""] | str: logger.info(f"Label Studio: {settings.label_studio_base_url}") # Collection scheduler - start_collection_scheduler() + collection_scheduler = Scheduler(name="collection scheduler") + collection_scheduler.start() + set_collection_scheduler(collection_scheduler) await load_scheduled_collection_tasks() yield # @shutdown - shutdown_collection_scheduler() + collection_scheduler.shutdown() logger.info("DataMate Python Backend shutting down ...\n\n") # 创建FastAPI应用 diff --git a/runtime/datamate-python/app/module/collection/interface/collection.py b/runtime/datamate-python/app/module/collection/interface/collection.py index d447f9c4..42a83f38 100644 --- a/runtime/datamate-python/app/module/collection/interface/collection.py +++ b/runtime/datamate-python/app/module/collection/interface/collection.py @@ -15,7 +15,7 @@ from app.module.collection.client.datax_client import DataxClient from app.module.collection.schema.collection import CollectionTaskBase, CollectionTaskCreate, converter_to_response, \ convert_for_create, SyncMode -from app.module.collection.scheduler import schedule_collection_task, remove_collection_task +from app.module.collection.schedule import schedule_collection_task, remove_collection_task from app.module.collection.service.collection import CollectionTaskService from app.module.shared.schema import StandardResponse, PaginatedData diff --git a/runtime/datamate-python/app/module/collection/scheduler.py b/runtime/datamate-python/app/module/collection/schedule.py similarity index 67% rename from runtime/datamate-python/app/module/collection/scheduler.py rename to runtime/datamate-python/app/module/collection/schedule.py index 72e4b19c..4a2d1863 100644 --- a/runtime/datamate-python/app/module/collection/scheduler.py +++ b/runtime/datamate-python/app/module/collection/schedule.py @@ -2,38 +2,25 @@ from typing import Optional -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from apscheduler.triggers.cron import CronTrigger from sqlalchemy import select from app.core.logging import get_logger from app.db.models.data_collection import CollectionTask from app.db.session import AsyncSessionLocal from app.module.collection.schema.collection import SyncMode +from app.module.shared.schedule import Scheduler logger = get_logger(__name__) -_scheduler: Optional[AsyncIOScheduler] = None +_scheduler: Optional[Scheduler] = None -def start_collection_scheduler() -> AsyncIOScheduler: +def set_collection_scheduler(scheduler: Scheduler) -> None: global _scheduler - if _scheduler is None: - _scheduler = AsyncIOScheduler() - _scheduler.start() - logger.info("Collection scheduler started") - return _scheduler + _scheduler = scheduler -def shutdown_collection_scheduler() -> None: - global _scheduler - if _scheduler is not None: - _scheduler.shutdown(wait=False) - _scheduler = None - logger.info("Collection scheduler stopped") - - -def _get_scheduler() -> AsyncIOScheduler: +def _get_scheduler() -> Scheduler: if _scheduler is None: raise RuntimeError("Collection scheduler not initialized") return _scheduler @@ -41,14 +28,13 @@ def _get_scheduler() -> AsyncIOScheduler: def schedule_collection_task(task_id: str, schedule_expression: str, dataset_id: Optional[str] = None) -> None: scheduler = _get_scheduler() - trigger = CronTrigger.from_crontab(schedule_expression) from app.module.collection.service.collection import CollectionTaskService - scheduler.add_job( - CollectionTaskService.run_async, - trigger=trigger, + scheduler.add_cron_job( + job_id=f"collection:{task_id}", + cron_expression=schedule_expression, + func=CollectionTaskService.run_async, args=[task_id, dataset_id], - id=f"collection:{task_id}", replace_existing=True, max_instances=1, coalesce=True, @@ -58,11 +44,10 @@ def schedule_collection_task(task_id: str, schedule_expression: str, dataset_id: def remove_collection_task(task_id: str) -> None: - if _scheduler is None: - return + scheduler = _get_scheduler() job_id = f"collection:{task_id}" - if _scheduler.get_job(job_id): - _scheduler.remove_job(job_id) + if scheduler.has_job(job_id): + scheduler.remove_job(job_id) logger.info(f"Removed scheduled collection task {task_id}") @@ -72,7 +57,7 @@ def reschedule_collection_task(task_id: str, schedule_expression: str, dataset_i def validate_schedule_expression(schedule_expression: str) -> None: - CronTrigger.from_crontab(schedule_expression) + Scheduler.validate_cron_expression(schedule_expression) async def load_scheduled_collection_tasks() -> None: diff --git a/runtime/datamate-python/app/module/collection/service/collection.py b/runtime/datamate-python/app/module/collection/service/collection.py index 46ca07d6..f76275e2 100644 --- a/runtime/datamate-python/app/module/collection/service/collection.py +++ b/runtime/datamate-python/app/module/collection/service/collection.py @@ -12,7 +12,7 @@ from app.db.session import AsyncSessionLocal from app.module.collection.client.datax_client import DataxClient from app.module.collection.schema.collection import SyncMode, create_execute_record -from app.module.collection.scheduler import validate_schedule_expression +from app.module.collection.schedule import validate_schedule_expression from app.module.dataset.service.service import Service from app.module.shared.schema import TaskStatus, NodeType, EdgeType from app.module.shared.common.lineage import LineageService diff --git a/runtime/datamate-python/app/module/shared/schedule/__init__.py b/runtime/datamate-python/app/module/shared/schedule/__init__.py new file mode 100644 index 00000000..b6488a86 --- /dev/null +++ b/runtime/datamate-python/app/module/shared/schedule/__init__.py @@ -0,0 +1,3 @@ +from .scheduler import Scheduler + +__all__ = ["Scheduler"] diff --git a/runtime/datamate-python/app/module/shared/schedule/scheduler.py b/runtime/datamate-python/app/module/shared/schedule/scheduler.py new file mode 100644 index 00000000..1bf161b3 --- /dev/null +++ b/runtime/datamate-python/app/module/shared/schedule/scheduler.py @@ -0,0 +1,92 @@ +from __future__ import annotations + +from typing import Any, Callable, Optional + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger + +from app.core.logging import get_logger + +logger = get_logger(__name__) + + +class Scheduler: + def __init__(self, name: str = "scheduler") -> None: + self._name = name + self._scheduler: Optional[AsyncIOScheduler] = None + + def start(self) -> AsyncIOScheduler: + if self._scheduler is None: + self._scheduler = AsyncIOScheduler() + self._scheduler.start() + logger.info(f"{self._name} started") + return self._scheduler + + def shutdown(self) -> None: + if self._scheduler is not None: + self._scheduler.shutdown(wait=False) + self._scheduler = None + logger.info(f"{self._name} stopped") + + def add_cron_job( + self, + job_id: str, + cron_expression: str, + func: Callable[..., Any], + args: Optional[list[Any]] = None, + kwargs: Optional[dict[str, Any]] = None, + **job_kwargs: Any, + ) -> None: + scheduler = self._get_scheduler() + trigger = CronTrigger.from_crontab(cron_expression) + scheduler.add_job( + func, + trigger=trigger, + args=args or [], + kwargs=kwargs or {}, + id=job_id, + replace_existing=job_kwargs.pop("replace_existing", True), + max_instances=job_kwargs.pop("max_instances", 1), + coalesce=job_kwargs.pop("coalesce", True), + misfire_grace_time=job_kwargs.pop("misfire_grace_time", 60), + **job_kwargs, + ) + + def remove_job(self, job_id: str) -> None: + if self._scheduler is None: + return + if self._scheduler.get_job(job_id): + self._scheduler.remove_job(job_id) + + def has_job(self, job_id: str) -> bool: + if self._scheduler is None: + return False + return self._scheduler.get_job(job_id) is not None + + def reschedule_job( + self, + job_id: str, + cron_expression: str, + func: Callable[..., Any], + args: Optional[list[Any]] = None, + kwargs: Optional[dict[str, Any]] = None, + **job_kwargs: Any, + ) -> None: + self.remove_job(job_id) + self.add_cron_job( + job_id=job_id, + cron_expression=cron_expression, + func=func, + args=args, + kwargs=kwargs, + **job_kwargs, + ) + + @staticmethod + def validate_cron_expression(cron_expression: str) -> None: + CronTrigger.from_crontab(cron_expression) + + def _get_scheduler(self) -> AsyncIOScheduler: + if self._scheduler is None: + raise RuntimeError(f"{self._name} not initialized") + return self._scheduler