Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions runtime/datamate-python/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@
)
from .module import router
from .module.shared.schema import StandardResponse
from .module.collection.scheduler import (
start_collection_scheduler,
shutdown_collection_scheduler,
load_scheduled_collection_tasks,
)
from .module.collection.schedule import load_scheduled_collection_tasks, set_collection_scheduler
from .module.shared.schedule import Scheduler

setup_logging()
logger = get_logger(__name__)
Expand Down Expand Up @@ -62,13 +59,15 @@ def mask_db_url(url: str) -> Literal[b""] | str:
logger.info(f"Label Studio: {settings.label_studio_base_url}")

# Collection scheduler
start_collection_scheduler()
collection_scheduler = Scheduler(name="collection scheduler")
collection_scheduler.start()
set_collection_scheduler(collection_scheduler)
await load_scheduled_collection_tasks()

yield

# @shutdown
shutdown_collection_scheduler()
collection_scheduler.shutdown()
logger.info("DataMate Python Backend shutting down ...\n\n")

# 创建FastAPI应用
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from app.module.collection.client.datax_client import DataxClient
from app.module.collection.schema.collection import CollectionTaskBase, CollectionTaskCreate, converter_to_response, \
convert_for_create, SyncMode
from app.module.collection.scheduler import schedule_collection_task, remove_collection_task
from app.module.collection.schedule import schedule_collection_task, remove_collection_task
from app.module.collection.service.collection import CollectionTaskService
from app.module.shared.schema import StandardResponse, PaginatedData

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,39 @@

from typing import Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import select

from app.core.logging import get_logger
from app.db.models.data_collection import CollectionTask
from app.db.session import AsyncSessionLocal
from app.module.collection.schema.collection import SyncMode
from app.module.shared.schedule import Scheduler

logger = get_logger(__name__)

_scheduler: Optional[AsyncIOScheduler] = None
_scheduler: Optional[Scheduler] = None


def start_collection_scheduler() -> AsyncIOScheduler:
def set_collection_scheduler(scheduler: Scheduler) -> None:
global _scheduler
if _scheduler is None:
_scheduler = AsyncIOScheduler()
_scheduler.start()
logger.info("Collection scheduler started")
return _scheduler
_scheduler = scheduler


def shutdown_collection_scheduler() -> None:
global _scheduler
if _scheduler is not None:
_scheduler.shutdown(wait=False)
_scheduler = None
logger.info("Collection scheduler stopped")


def _get_scheduler() -> AsyncIOScheduler:
def _get_scheduler() -> Scheduler:
if _scheduler is None:
raise RuntimeError("Collection scheduler not initialized")
return _scheduler


def schedule_collection_task(task_id: str, schedule_expression: str, dataset_id: Optional[str] = None) -> None:
scheduler = _get_scheduler()
trigger = CronTrigger.from_crontab(schedule_expression)
from app.module.collection.service.collection import CollectionTaskService

scheduler.add_job(
CollectionTaskService.run_async,
trigger=trigger,
scheduler.add_cron_job(
job_id=f"collection:{task_id}",
cron_expression=schedule_expression,
func=CollectionTaskService.run_async,
args=[task_id, dataset_id],
id=f"collection:{task_id}",
replace_existing=True,
max_instances=1,
coalesce=True,
Expand All @@ -58,11 +44,10 @@ def schedule_collection_task(task_id: str, schedule_expression: str, dataset_id:


def remove_collection_task(task_id: str) -> None:
if _scheduler is None:
return
scheduler = _get_scheduler()
job_id = f"collection:{task_id}"
if _scheduler.get_job(job_id):
_scheduler.remove_job(job_id)
if scheduler.has_job(job_id):
scheduler.remove_job(job_id)
logger.info(f"Removed scheduled collection task {task_id}")


Expand All @@ -72,7 +57,7 @@ def reschedule_collection_task(task_id: str, schedule_expression: str, dataset_i


def validate_schedule_expression(schedule_expression: str) -> None:
CronTrigger.from_crontab(schedule_expression)
Scheduler.validate_cron_expression(schedule_expression)


async def load_scheduled_collection_tasks() -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from app.db.session import AsyncSessionLocal
from app.module.collection.client.datax_client import DataxClient
from app.module.collection.schema.collection import SyncMode, create_execute_record
from app.module.collection.scheduler import validate_schedule_expression
from app.module.collection.schedule import validate_schedule_expression
from app.module.dataset.service.service import Service
from app.module.shared.schema import TaskStatus, NodeType, EdgeType
from app.module.shared.common.lineage import LineageService
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .scheduler import Scheduler

__all__ = ["Scheduler"]
92 changes: 92 additions & 0 deletions runtime/datamate-python/app/module/shared/schedule/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from __future__ import annotations

from typing import Any, Callable, Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from app.core.logging import get_logger

logger = get_logger(__name__)


class Scheduler:
    """Named wrapper around an APScheduler ``AsyncIOScheduler`` for cron jobs.

    The underlying scheduler is created lazily by :meth:`start` and dropped
    again by :meth:`shutdown`. Query/removal helpers (:meth:`has_job`,
    :meth:`remove_job`) are no-ops while the scheduler is not running, whereas
    methods that need to register work (:meth:`add_cron_job`,
    :meth:`reschedule_job`) raise ``RuntimeError``.
    """

    # Job options applied by add_cron_job unless the caller overrides them
    # through **job_kwargs.
    _DEFAULT_JOB_OPTIONS = {
        "replace_existing": True,
        "max_instances": 1,
        "coalesce": True,
        "misfire_grace_time": 60,
    }

    def __init__(self, name: str = "scheduler") -> None:
        """Create an unstarted scheduler.

        Args:
            name: Label used in log lines and error messages.
        """
        self._name = name
        self._scheduler: Optional[AsyncIOScheduler] = None

    def start(self) -> AsyncIOScheduler:
        """Create and start the underlying scheduler if it is not running.

        Calling this again while already started is a no-op.

        Returns:
            The underlying ``AsyncIOScheduler`` instance.
        """
        if self._scheduler is None:
            self._scheduler = AsyncIOScheduler()
            self._scheduler.start()
            logger.info(f"{self._name} started")
        return self._scheduler

    def shutdown(self) -> None:
        """Stop the underlying scheduler, if any, and forget it.

        The scheduler is shut down with ``wait=False``. Safe to call when the
        scheduler was never started.
        """
        if self._scheduler is not None:
            self._scheduler.shutdown(wait=False)
            self._scheduler = None
            logger.info(f"{self._name} stopped")

    def add_cron_job(
        self,
        job_id: str,
        cron_expression: str,
        func: Callable[..., Any],
        args: Optional[list[Any]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        **job_kwargs: Any,
    ) -> None:
        """Register ``func`` to run on a crontab schedule.

        Args:
            job_id: Identifier of the job inside the scheduler.
            cron_expression: Crontab expression handed to
                ``CronTrigger.from_crontab``.
            func: Callable to execute.
            args: Positional arguments for ``func`` (defaults to none).
            kwargs: Keyword arguments for ``func`` (defaults to none).
            **job_kwargs: Extra options forwarded to ``add_job``. When not
                given, ``replace_existing=True``, ``max_instances=1``,
                ``coalesce=True`` and ``misfire_grace_time=60`` are used.

        Raises:
            RuntimeError: If the scheduler has not been started.
            Exception: Whatever ``CronTrigger.from_crontab`` raises for a
                malformed expression (``ValueError`` per APScheduler docs).
        """
        scheduler = self._get_scheduler()
        trigger = CronTrigger.from_crontab(cron_expression)
        # Caller-supplied options win over the class-level defaults.
        options = {**self._DEFAULT_JOB_OPTIONS, **job_kwargs}
        scheduler.add_job(
            func,
            trigger=trigger,
            args=args or [],
            kwargs=kwargs or {},
            id=job_id,
            **options,
        )

    def remove_job(self, job_id: str) -> None:
        """Remove the job with ``job_id`` if it exists.

        Does nothing when the scheduler is not started or the job is unknown.
        """
        if self.has_job(job_id):
            self._scheduler.remove_job(job_id)

    def has_job(self, job_id: str) -> bool:
        """Return whether a job with ``job_id`` is currently registered.

        Always ``False`` while the scheduler is not started.
        """
        if self._scheduler is None:
            return False
        return self._scheduler.get_job(job_id) is not None

    def reschedule_job(
        self,
        job_id: str,
        cron_expression: str,
        func: Callable[..., Any],
        args: Optional[list[Any]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        **job_kwargs: Any,
    ) -> None:
        """Replace the job ``job_id`` with a new cron schedule.

        The scheduler state and the cron expression are checked *before* the
        existing job is removed, so a malformed expression leaves the current
        job in place instead of silently unscheduling it.

        Args:
            job_id: Identifier of the job to replace.
            cron_expression: New crontab expression.
            func: Callable to execute.
            args: Positional arguments for ``func`` (defaults to none).
            kwargs: Keyword arguments for ``func`` (defaults to none).
            **job_kwargs: Extra options forwarded to :meth:`add_cron_job`.

        Raises:
            RuntimeError: If the scheduler has not been started.
            Exception: Whatever ``CronTrigger.from_crontab`` raises for a
                malformed expression (``ValueError`` per APScheduler docs).
        """
        # Fail fast while the old job is still registered.
        self._get_scheduler()
        self.validate_cron_expression(cron_expression)

        self.remove_job(job_id)
        self.add_cron_job(
            job_id=job_id,
            cron_expression=cron_expression,
            func=func,
            args=args,
            kwargs=kwargs,
            **job_kwargs,
        )

    @staticmethod
    def validate_cron_expression(cron_expression: str) -> None:
        """Check that ``cron_expression`` can be parsed as a crontab string.

        Returns ``None`` on success; parsing errors from
        ``CronTrigger.from_crontab`` propagate to the caller.
        """
        CronTrigger.from_crontab(cron_expression)

    def _get_scheduler(self) -> AsyncIOScheduler:
        """Return the running scheduler.

        Raises:
            RuntimeError: If :meth:`start` has not been called (or
                :meth:`shutdown` already ran).
        """
        if self._scheduler is None:
            raise RuntimeError(f"{self._name} not initialized")
        return self._scheduler
Loading