Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions frontend/src/pages/DataCollection/Create/CreateTask.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ export default function CollectionTaskCreate() {
},
});
const [scheduleExpression, setScheduleExpression] = useState({
type: "once",
type: "daily",
time: "00:00",
cronExpression: "0 0 0 * * ?",
cronExpression: "0 0 * * *",
});

useEffect(() => {
Expand Down Expand Up @@ -166,6 +166,17 @@ export default function CollectionTaskCreate() {
try {
const values = await form.validateFields();
const payload = { ...newTask, ...values };
if (payload.syncMode === SyncMode.SCHEDULED) {
if (!payload.scheduleExpression) {
payload.scheduleExpression = scheduleExpression.cronExpression;
}
if (!payload.scheduleExpression) {
message.error("请输入Cron表达式");
return;
}
} else {
delete payload.scheduleExpression;
}
if (selectedTemplate?.templateContent) {
payload.config = {
...(payload.config || {}),
Expand Down
40 changes: 19 additions & 21 deletions frontend/src/pages/DataCollection/Create/SimpleCronScheduler.tsx
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
import React, { useState, useCallback } from "react";
import React, { useState, useCallback, useEffect } from "react";
import {
Card,
Radio,
Select,
Space,
Typography,
TimePicker,
Button,
Input,
Form,
} from "antd";
import type { RadioChangeEvent } from "antd";
import type { Dayjs } from "dayjs";
import dayjs from "dayjs";

const { Text } = Typography;
const { Option } = Select;

export interface SimpleCronConfig {
type: "once" | "daily" | "weekly" | "monthly";
type: "daily" | "weekly" | "monthly";
time?: string; // HH:mm 格式
weekDay?: number; // 0-6, 0 表示周日
monthDay?: number; // 1-31
Expand All @@ -32,9 +24,9 @@ interface SimpleCronSchedulerProps {
}

const defaultConfig: SimpleCronConfig = {
type: "once",
type: "daily",
time: "00:00",
cronExpression: "0 0 0 * * ?",
cronExpression: "0 0 * * *",
};

// 生成周几选项
Expand Down Expand Up @@ -71,26 +63,33 @@ const SimpleCronScheduler: React.FC<SimpleCronSchedulerProps> = ({
}) => {
const [config, setConfig] = useState<SimpleCronConfig>(value);

useEffect(() => {
setConfig(value || defaultConfig);
}, [value]);

// 更新配置并生成 cron 表达式
const updateConfig = useCallback(
(updates: Partial<SimpleCronConfig>) => {
const newConfig = { ...config, ...updates };
const [hour, minute] = (newConfig.time || "00:00").split(":");
if (newConfig.type === "weekly" && (newConfig.weekDay === undefined || newConfig.weekDay === null)) {
newConfig.weekDay = 1;
}
if (newConfig.type === "monthly" && (newConfig.monthDay === undefined || newConfig.monthDay === null)) {
newConfig.monthDay = 1;
}

// 根据不同类型生成 cron 表达式
let cronExpression = "";
switch (newConfig.type) {
case "once":
cronExpression = `0 ${minute} ${hour} * * ?`;
break;
case "daily":
cronExpression = `0 ${minute} ${hour} * * ?`;
cronExpression = `${minute} ${hour} * * *`;
break;
case "weekly":
cronExpression = `0 ${minute} ${hour} ? * ${newConfig.weekDay}`;
cronExpression = `${minute} ${hour} * * ${newConfig.weekDay}`;
break;
case "monthly":
cronExpression = `0 ${minute} ${hour} ${newConfig.monthDay} * ?`;
cronExpression = `${minute} ${hour} ${newConfig.monthDay} * *`;
break;
}

Expand All @@ -106,9 +105,9 @@ const SimpleCronScheduler: React.FC<SimpleCronSchedulerProps> = ({
const updates: Partial<SimpleCronConfig> = { type };

// 设置默认值
if (type === "weekly" && !config.weekDay) {
if (type === "weekly" && (config.weekDay === undefined || config.weekDay === null)) {
updates.weekDay = 1; // 默认周一
} else if (type === "monthly" && !config.monthDay) {
} else if (type === "monthly" && (config.monthDay === undefined || config.monthDay === null)) {
updates.monthDay = 1; // 默认每月1号
}

Expand All @@ -133,7 +132,6 @@ const SimpleCronScheduler: React.FC<SimpleCronSchedulerProps> = ({
<div className="grid grid-cols-2 gap-4">
<Form.Item label="执行周期" required>
<Select value={config.type} onChange={handleTypeChange}>
<Select.Option value="once">仅执行一次</Select.Option>
<Select.Option value="daily">每天执行</Select.Option>
<Select.Option value="weekly">每周执行</Select.Option>
<Select.Option value="monthly">每月执行</Select.Option>
Expand Down
2 changes: 0 additions & 2 deletions frontend/src/pages/DataManagement/dataset.const.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,7 @@ export function mapDataset(dataset: AnyObject): Dataset {
status: datasetStatusMap[dataset.status],
statistics: [
{ label: "文件数", value: dataset.fileCount || 0 },
{ label: "已标注", value: Math.floor(dataset.fileCount / 10) * 10},
{ label: "大小", value: formatBytes(dataset.totalSize || 0) },
{ label: "关联归集任务", value: Math.floor(dataset.fileCount / 10)},
],
lastModified: dataset.updatedAt,
};
Expand Down
10 changes: 10 additions & 0 deletions runtime/datamate-python/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
)
from .module import router
from .module.shared.schema import StandardResponse
from .module.collection.scheduler import (
start_collection_scheduler,
shutdown_collection_scheduler,
load_scheduled_collection_tasks,
)

setup_logging()
logger = get_logger(__name__)
Expand Down Expand Up @@ -56,9 +61,14 @@ def mask_db_url(url: str) -> Literal[b""] | str:
# TODO Add actual connectivity check if needed
logger.info(f"Label Studio: {settings.label_studio_base_url}")

# Collection scheduler
start_collection_scheduler()
await load_scheduled_collection_tasks()

yield

# @shutdown
shutdown_collection_scheduler()
logger.info("DataMate Python Backend shutting down ...\n\n")

# 创建FastAPI应用
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from app.db.session import get_db
from app.module.collection.client.datax_client import DataxClient
from app.module.collection.schema.collection import CollectionTaskBase, CollectionTaskCreate, converter_to_response, \
convert_for_create
convert_for_create, SyncMode
from app.module.collection.scheduler import schedule_collection_task, remove_collection_task
from app.module.collection.service.collection import CollectionTaskService
from app.module.shared.schema import StandardResponse, PaginatedData

Expand Down Expand Up @@ -61,6 +62,8 @@ async def create_task(
task = await db.execute(select(CollectionTask).where(CollectionTask.id == task.id))
task = task.scalar_one_or_none()
await db.commit()
if task and task.sync_mode == SyncMode.SCHEDULED.value and task.schedule_expression:
schedule_collection_task(task.id, task.schedule_expression)

return StandardResponse(
code=200,
Expand All @@ -70,6 +73,9 @@ async def create_task(
except HTTPException:
await db.rollback()
raise
except ValueError as e:
await db.rollback()
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
await db.rollback()
logger.error(f"Failed to create collection task: {str(e)}", e)
Expand Down Expand Up @@ -153,6 +159,7 @@ async def delete_collection_tasks(
TaskExecution.__table__.delete()
.where(TaskExecution.task_id == task_id)
)
remove_collection_task(task_id)

target_path = f"/dataset/local/{task_id}"
if os.path.exists(target_path):
Expand Down
94 changes: 94 additions & 0 deletions runtime/datamate-python/app/module/collection/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from __future__ import annotations

from typing import Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import select

from app.core.logging import get_logger
from app.db.models.data_collection import CollectionTask
from app.db.session import AsyncSessionLocal
from app.module.collection.schema.collection import SyncMode

logger = get_logger(__name__)

_scheduler: Optional[AsyncIOScheduler] = None


def start_collection_scheduler() -> AsyncIOScheduler:
    """Create and start the module-wide collection scheduler if none exists.

    Calling this again while a scheduler is already registered is a no-op
    that returns the existing instance.

    Returns:
        The module-level ``AsyncIOScheduler`` instance.
    """
    global _scheduler
    if _scheduler is not None:
        return _scheduler

    # Publish the instance before starting it, so the module-level reference
    # is set even if ``start()`` raises.
    _scheduler = created = AsyncIOScheduler()
    created.start()
    logger.info("Collection scheduler started")
    return created


def shutdown_collection_scheduler() -> None:
    """Stop the module-wide collection scheduler and forget the instance.

    Does nothing when no scheduler has been started. The shutdown is
    requested with ``wait=False``.
    """
    global _scheduler
    if _scheduler is None:
        return

    _scheduler.shutdown(wait=False)
    # Reset so a later start_collection_scheduler() builds a fresh instance.
    _scheduler = None
    logger.info("Collection scheduler stopped")


def _get_scheduler() -> AsyncIOScheduler:
    """Return the active scheduler.

    Raises:
        RuntimeError: If the module-level scheduler has not been created yet.
    """
    active = _scheduler
    if active is not None:
        return active
    raise RuntimeError("Collection scheduler not initialized")


def schedule_collection_task(task_id: str, schedule_expression: str, dataset_id: Optional[str] = None) -> None:
    """Register (or replace) the cron job that runs a collection task.

    Args:
        task_id: Identifier of the collection task; also used to derive the job id.
        schedule_expression: Crontab string handed to ``CronTrigger.from_crontab``.
        dataset_id: Optional dataset identifier forwarded to the job as its
            second positional argument.

    Raises:
        RuntimeError: If the collection scheduler has not been started.

    NOTE(review): the frontend documents weekDay as 0-6 with 0 = Sunday;
    confirm that the installed APScheduler version's ``from_crontab``
    interprets numeric day-of-week values the same way.
    """
    active_scheduler = _get_scheduler()
    # Parse the expression before touching the job store, so a parse failure
    # leaves existing jobs untouched.
    cron_trigger = CronTrigger.from_crontab(schedule_expression)
    # Imported here rather than at module level: the service module imports
    # this scheduler module, so a top-level import would be circular.
    from app.module.collection.service.collection import CollectionTaskService

    job_id = f"collection:{task_id}"
    job_args = [task_id, dataset_id]
    active_scheduler.add_job(
        CollectionTaskService.run_async,
        trigger=cron_trigger,
        args=job_args,
        id=job_id,
        replace_existing=True,
        max_instances=1,
        coalesce=True,
        misfire_grace_time=60,
    )
    logger.info(f"Scheduled collection task {task_id} with cron {schedule_expression}")


def remove_collection_task(task_id: str) -> None:
    """Remove the cron job for ``task_id`` if one is registered.

    Silently returns when the scheduler has not been started or when no job
    exists for the task.
    """
    active = _scheduler
    if active is None:
        return

    job_key = f"collection:{task_id}"
    if not active.get_job(job_key):
        return

    active.remove_job(job_key)
    logger.info(f"Removed scheduled collection task {task_id}")


def reschedule_collection_task(task_id: str, schedule_expression: str, dataset_id: Optional[str] = None) -> None:
    """Replace the cron schedule of a collection task.

    The previous implementation removed the existing job first and only then
    parsed the new expression, so an invalid expression raised after the old
    schedule had already been dropped. ``schedule_collection_task`` parses the
    expression before calling ``add_job`` and registers the job under the same
    id with ``replace_existing=True``, so delegating to it swaps the schedule
    and keeps the old job when the new expression is rejected.

    Args:
        task_id: Identifier of the collection task to reschedule.
        schedule_expression: New crontab string for the task.
        dataset_id: Optional dataset identifier forwarded to the job.

    Raises:
        RuntimeError: If the collection scheduler has not been started.
    """
    schedule_collection_task(task_id, schedule_expression, dataset_id)


def validate_schedule_expression(schedule_expression: str) -> None:
    """Check that ``schedule_expression`` can be parsed as a crontab string.

    The parsed trigger is discarded; any exception raised by
    ``CronTrigger.from_crontab`` propagates to the caller.
    """
    _parsed = CronTrigger.from_crontab(schedule_expression)
    del _parsed


async def load_scheduled_collection_tasks() -> None:
    """Re-register cron jobs for every persisted scheduled collection task.

    Selects tasks whose sync mode is SCHEDULED and whose schedule expression
    is not NULL, then schedules each one. A failure to schedule one task is
    logged and does not stop the remaining tasks from being scheduled.

    NOTE(review): no dataset id is passed to ``schedule_collection_task``
    here, so these jobs run with ``dataset_id=None`` — confirm that is intended.
    """
    query = select(CollectionTask).where(
        CollectionTask.sync_mode == SyncMode.SCHEDULED.value,
        CollectionTask.schedule_expression.isnot(None),
    )
    async with AsyncSessionLocal() as session:
        rows = (await session.execute(query)).scalars().all()

    for row in rows:
        expression = row.schedule_expression
        # The SQL filter only excludes NULL; skip empty strings as well.
        if not expression:
            continue
        try:
            schedule_collection_task(row.id, expression)
        except Exception as exc:
            logger.error(f"Failed to schedule task {row.id}: {exc}")
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def converter_to_response(task: CollectionTask) -> CollectionTaskBase:
)

def convert_for_create(task: CollectionTaskCreate, task_id: str) -> CollectionTask:
schedule_expression = task.schedule_expression if task.sync_mode == SyncMode.SCHEDULED else None
return CollectionTask(
id=task_id,
name=task.name,
Expand All @@ -92,7 +93,7 @@ def convert_for_create(task: CollectionTaskCreate, task_id: str) -> CollectionTa
template_id=task.template_id,
target_path=f"/dataset/local/{task_id}",
config=json.dumps(task.config.dict()),
schedule_expression=task.schedule_expression,
schedule_expression=schedule_expression,
status=TaskStatus.PENDING.name
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from app.db.session import AsyncSessionLocal
from app.module.collection.client.datax_client import DataxClient
from app.module.collection.schema.collection import SyncMode, create_execute_record
from app.module.collection.scheduler import validate_schedule_expression
from app.module.dataset.service.service import Service
from app.module.shared.schema import TaskStatus, NodeType, EdgeType
from app.module.shared.common.lineage import LineageService
Expand Down Expand Up @@ -51,6 +52,10 @@ async def create_task(self, task: CollectionTask, dataset: Dataset) -> Collectio
task.status = TaskStatus.RUNNING.name
await self.db.commit()
asyncio.create_task(CollectionTaskService.run_async(task.id, dataset.id if dataset else None))
elif task.sync_mode == SyncMode.SCHEDULED:
if not task.schedule_expression:
raise ValueError("schedule_expression is required for scheduled tasks")
validate_schedule_expression(task.schedule_expression)
return task

@staticmethod
Expand Down
2 changes: 1 addition & 1 deletion runtime/datamate-python/deploy/docker-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ fi

# 启动应用
echo "=========================================="
echo "Starting Label Studio Adapter..."
echo "Starting DataMate Backend Service..."
echo "Host: ${HOST:-0.0.0.0}"
echo "Port: ${PORT:-18000}"
echo "Debug: ${DEBUG:-false}"
Expand Down
Loading
Loading