Skip to content

Commit c0e6fcc

Browse files
authored
fix: use apscheduler to fix the scheduled scheduling of collection tasks (#308)
1 parent 2efb1be commit c0e6fcc

File tree

11 files changed

+204
-29
lines changed

11 files changed

+204
-29
lines changed

frontend/src/pages/DataCollection/Create/CreateTask.tsx

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ export default function CollectionTaskCreate() {
5959
},
6060
});
6161
const [scheduleExpression, setScheduleExpression] = useState({
62-
type: "once",
62+
type: "daily",
6363
time: "00:00",
64-
cronExpression: "0 0 0 * * ?",
64+
cronExpression: "0 0 * * *",
6565
});
6666

6767
useEffect(() => {
@@ -166,6 +166,17 @@ export default function CollectionTaskCreate() {
166166
try {
167167
const values = await form.validateFields();
168168
const payload = { ...newTask, ...values };
169+
if (payload.syncMode === SyncMode.SCHEDULED) {
170+
if (!payload.scheduleExpression) {
171+
payload.scheduleExpression = scheduleExpression.cronExpression;
172+
}
173+
if (!payload.scheduleExpression) {
174+
message.error("请输入Cron表达式");
175+
return;
176+
}
177+
} else {
178+
delete payload.scheduleExpression;
179+
}
169180
if (selectedTemplate?.templateContent) {
170181
payload.config = {
171182
...(payload.config || {}),

frontend/src/pages/DataCollection/Create/SimpleCronScheduler.tsx

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,16 @@
1-
import React, { useState, useCallback } from "react";
1+
import React, { useState, useCallback, useEffect } from "react";
22
import {
3-
Card,
4-
Radio,
53
Select,
64
Space,
7-
Typography,
85
TimePicker,
96
Button,
10-
Input,
117
Form,
128
} from "antd";
13-
import type { RadioChangeEvent } from "antd";
149
import type { Dayjs } from "dayjs";
1510
import dayjs from "dayjs";
1611

17-
const { Text } = Typography;
18-
const { Option } = Select;
19-
2012
export interface SimpleCronConfig {
21-
type: "once" | "daily" | "weekly" | "monthly";
13+
type: "daily" | "weekly" | "monthly";
2214
time?: string; // HH:mm 格式
2315
weekDay?: number; // 0-6, 0 表示周日
2416
monthDay?: number; // 1-31
@@ -32,9 +24,9 @@ interface SimpleCronSchedulerProps {
3224
}
3325

3426
const defaultConfig: SimpleCronConfig = {
35-
type: "once",
27+
type: "daily",
3628
time: "00:00",
37-
cronExpression: "0 0 0 * * ?",
29+
cronExpression: "0 0 * * *",
3830
};
3931

4032
// 生成周几选项
@@ -71,26 +63,33 @@ const SimpleCronScheduler: React.FC<SimpleCronSchedulerProps> = ({
7163
}) => {
7264
const [config, setConfig] = useState<SimpleCronConfig>(value);
7365

66+
useEffect(() => {
67+
setConfig(value || defaultConfig);
68+
}, [value]);
69+
7470
// 更新配置并生成 cron 表达式
7571
const updateConfig = useCallback(
7672
(updates: Partial<SimpleCronConfig>) => {
7773
const newConfig = { ...config, ...updates };
7874
const [hour, minute] = (newConfig.time || "00:00").split(":");
75+
if (newConfig.type === "weekly" && (newConfig.weekDay === undefined || newConfig.weekDay === null)) {
76+
newConfig.weekDay = 1;
77+
}
78+
if (newConfig.type === "monthly" && (newConfig.monthDay === undefined || newConfig.monthDay === null)) {
79+
newConfig.monthDay = 1;
80+
}
7981

8082
// 根据不同类型生成 cron 表达式
8183
let cronExpression = "";
8284
switch (newConfig.type) {
83-
case "once":
84-
cronExpression = `0 ${minute} ${hour} * * ?`;
85-
break;
8685
case "daily":
87-
cronExpression = `0 ${minute} ${hour} * * ?`;
86+
cronExpression = `${minute} ${hour} * * *`;
8887
break;
8988
case "weekly":
90-
cronExpression = `0 ${minute} ${hour} ? * ${newConfig.weekDay}`;
89+
cronExpression = `${minute} ${hour} * * ${newConfig.weekDay}`;
9190
break;
9291
case "monthly":
93-
cronExpression = `0 ${minute} ${hour} ${newConfig.monthDay} * ?`;
92+
cronExpression = `${minute} ${hour} ${newConfig.monthDay} * *`;
9493
break;
9594
}
9695

@@ -106,9 +105,9 @@ const SimpleCronScheduler: React.FC<SimpleCronSchedulerProps> = ({
106105
const updates: Partial<SimpleCronConfig> = { type };
107106

108107
// 设置默认值
109-
if (type === "weekly" && !config.weekDay) {
108+
if (type === "weekly" && (config.weekDay === undefined || config.weekDay === null)) {
110109
updates.weekDay = 1; // 默认周一
111-
} else if (type === "monthly" && !config.monthDay) {
110+
} else if (type === "monthly" && (config.monthDay === undefined || config.monthDay === null)) {
112111
updates.monthDay = 1; // 默认每月1号
113112
}
114113

@@ -133,7 +132,6 @@ const SimpleCronScheduler: React.FC<SimpleCronSchedulerProps> = ({
133132
<div className="grid grid-cols-2 gap-4">
134133
<Form.Item label="执行周期" required>
135134
<Select value={config.type} onChange={handleTypeChange}>
136-
<Select.Option value="once">仅执行一次</Select.Option>
137135
<Select.Option value="daily">每天执行</Select.Option>
138136
<Select.Option value="weekly">每周执行</Select.Option>
139137
<Select.Option value="monthly">每月执行</Select.Option>

frontend/src/pages/DataManagement/dataset.const.tsx

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,9 +211,7 @@ export function mapDataset(dataset: AnyObject): Dataset {
211211
status: datasetStatusMap[dataset.status],
212212
statistics: [
213213
{ label: "文件数", value: dataset.fileCount || 0 },
214-
{ label: "已标注", value: Math.floor(dataset.fileCount / 10) * 10},
215214
{ label: "大小", value: formatBytes(dataset.totalSize || 0) },
216-
{ label: "关联归集任务", value: Math.floor(dataset.fileCount / 10)},
217215
],
218216
lastModified: dataset.updatedAt,
219217
};

runtime/datamate-python/app/main.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020
)
2121
from .module import router
2222
from .module.shared.schema import StandardResponse
23+
from .module.collection.scheduler import (
24+
start_collection_scheduler,
25+
shutdown_collection_scheduler,
26+
load_scheduled_collection_tasks,
27+
)
2328

2429
setup_logging()
2530
logger = get_logger(__name__)
@@ -56,9 +61,14 @@ def mask_db_url(url: str) -> Literal[b""] | str:
5661
# TODO Add actual connectivity check if needed
5762
logger.info(f"Label Studio: {settings.label_studio_base_url}")
5863

64+
# Collection scheduler
65+
start_collection_scheduler()
66+
await load_scheduled_collection_tasks()
67+
5968
yield
6069

6170
# @shutdown
71+
shutdown_collection_scheduler()
6272
logger.info("DataMate Python Backend shutting down ...\n\n")
6373

6474
# 创建FastAPI应用

runtime/datamate-python/app/module/collection/interface/collection.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
from app.db.session import get_db
1515
from app.module.collection.client.datax_client import DataxClient
1616
from app.module.collection.schema.collection import CollectionTaskBase, CollectionTaskCreate, converter_to_response, \
17-
convert_for_create
17+
convert_for_create, SyncMode
18+
from app.module.collection.scheduler import schedule_collection_task, remove_collection_task
1819
from app.module.collection.service.collection import CollectionTaskService
1920
from app.module.shared.schema import StandardResponse, PaginatedData
2021

@@ -61,6 +62,8 @@ async def create_task(
6162
task = await db.execute(select(CollectionTask).where(CollectionTask.id == task.id))
6263
task = task.scalar_one_or_none()
6364
await db.commit()
65+
if task and task.sync_mode == SyncMode.SCHEDULED.value and task.schedule_expression:
66+
schedule_collection_task(task.id, task.schedule_expression)
6467

6568
return StandardResponse(
6669
code=200,
@@ -70,6 +73,9 @@ async def create_task(
7073
except HTTPException:
7174
await db.rollback()
7275
raise
76+
except ValueError as e:
77+
await db.rollback()
78+
raise HTTPException(status_code=400, detail=str(e))
7379
except Exception as e:
7480
await db.rollback()
7581
logger.error(f"Failed to create collection task: {str(e)}", e)
@@ -153,6 +159,7 @@ async def delete_collection_tasks(
153159
TaskExecution.__table__.delete()
154160
.where(TaskExecution.task_id == task_id)
155161
)
162+
remove_collection_task(task_id)
156163

157164
target_path = f"/dataset/local/{task_id}"
158165
if os.path.exists(target_path):
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
from __future__ import annotations
2+
3+
from typing import Optional
4+
5+
from apscheduler.schedulers.asyncio import AsyncIOScheduler
6+
from apscheduler.triggers.cron import CronTrigger
7+
from sqlalchemy import select
8+
9+
from app.core.logging import get_logger
10+
from app.db.models.data_collection import CollectionTask
11+
from app.db.session import AsyncSessionLocal
12+
from app.module.collection.schema.collection import SyncMode
13+
14+
logger = get_logger(__name__)
15+
16+
_scheduler: Optional[AsyncIOScheduler] = None
17+
18+
19+
def start_collection_scheduler() -> AsyncIOScheduler:
20+
global _scheduler
21+
if _scheduler is None:
22+
_scheduler = AsyncIOScheduler()
23+
_scheduler.start()
24+
logger.info("Collection scheduler started")
25+
return _scheduler
26+
27+
28+
def shutdown_collection_scheduler() -> None:
29+
global _scheduler
30+
if _scheduler is not None:
31+
_scheduler.shutdown(wait=False)
32+
_scheduler = None
33+
logger.info("Collection scheduler stopped")
34+
35+
36+
def _get_scheduler() -> AsyncIOScheduler:
37+
if _scheduler is None:
38+
raise RuntimeError("Collection scheduler not initialized")
39+
return _scheduler
40+
41+
42+
def schedule_collection_task(task_id: str, schedule_expression: str, dataset_id: Optional[str] = None) -> None:
43+
scheduler = _get_scheduler()
44+
trigger = CronTrigger.from_crontab(schedule_expression)
45+
from app.module.collection.service.collection import CollectionTaskService
46+
47+
scheduler.add_job(
48+
CollectionTaskService.run_async,
49+
trigger=trigger,
50+
args=[task_id, dataset_id],
51+
id=f"collection:{task_id}",
52+
replace_existing=True,
53+
max_instances=1,
54+
coalesce=True,
55+
misfire_grace_time=60,
56+
)
57+
logger.info(f"Scheduled collection task {task_id} with cron {schedule_expression}")
58+
59+
60+
def remove_collection_task(task_id: str) -> None:
61+
if _scheduler is None:
62+
return
63+
job_id = f"collection:{task_id}"
64+
if _scheduler.get_job(job_id):
65+
_scheduler.remove_job(job_id)
66+
logger.info(f"Removed scheduled collection task {task_id}")
67+
68+
69+
def reschedule_collection_task(task_id: str, schedule_expression: str, dataset_id: Optional[str] = None) -> None:
70+
remove_collection_task(task_id)
71+
schedule_collection_task(task_id, schedule_expression, dataset_id)
72+
73+
74+
def validate_schedule_expression(schedule_expression: str) -> None:
75+
CronTrigger.from_crontab(schedule_expression)
76+
77+
78+
async def load_scheduled_collection_tasks() -> None:
79+
async with AsyncSessionLocal() as session:
80+
result = await session.execute(
81+
select(CollectionTask).where(
82+
CollectionTask.sync_mode == SyncMode.SCHEDULED.value,
83+
CollectionTask.schedule_expression.isnot(None),
84+
)
85+
)
86+
tasks = result.scalars().all()
87+
88+
for task in tasks:
89+
if not task.schedule_expression:
90+
continue
91+
try:
92+
schedule_collection_task(task.id, task.schedule_expression)
93+
except Exception as exc:
94+
logger.error(f"Failed to schedule task {task.id}: {exc}")

runtime/datamate-python/app/module/collection/schema/collection.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def converter_to_response(task: CollectionTask) -> CollectionTaskBase:
8484
)
8585

8686
def convert_for_create(task: CollectionTaskCreate, task_id: str) -> CollectionTask:
87+
schedule_expression = task.schedule_expression if task.sync_mode == SyncMode.SCHEDULED else None
8788
return CollectionTask(
8889
id=task_id,
8990
name=task.name,
@@ -92,7 +93,7 @@ def convert_for_create(task: CollectionTaskCreate, task_id: str) -> CollectionTa
9293
template_id=task.template_id,
9394
target_path=f"/dataset/local/{task_id}",
9495
config=json.dumps(task.config.dict()),
95-
schedule_expression=task.schedule_expression,
96+
schedule_expression=schedule_expression,
9697
status=TaskStatus.PENDING.name
9798
)
9899

runtime/datamate-python/app/module/collection/service/collection.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from app.db.session import AsyncSessionLocal
1313
from app.module.collection.client.datax_client import DataxClient
1414
from app.module.collection.schema.collection import SyncMode, create_execute_record
15+
from app.module.collection.scheduler import validate_schedule_expression
1516
from app.module.dataset.service.service import Service
1617
from app.module.shared.schema import TaskStatus, NodeType, EdgeType
1718
from app.module.shared.common.lineage import LineageService
@@ -51,6 +52,10 @@ async def create_task(self, task: CollectionTask, dataset: Dataset) -> Collectio
5152
task.status = TaskStatus.RUNNING.name
5253
await self.db.commit()
5354
asyncio.create_task(CollectionTaskService.run_async(task.id, dataset.id if dataset else None))
55+
elif task.sync_mode == SyncMode.SCHEDULED:
56+
if not task.schedule_expression:
57+
raise ValueError("schedule_expression is required for scheduled tasks")
58+
validate_schedule_expression(task.schedule_expression)
5459
return task
5560

5661
@staticmethod

runtime/datamate-python/deploy/docker-entrypoint.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ fi
77

88
# 启动应用
99
echo "=========================================="
10-
echo "Starting Label Studio Adapter..."
10+
echo "Starting DataMate Backend Service..."
1111
echo "Host: ${HOST:-0.0.0.0}"
1212
echo "Port: ${PORT:-18000}"
1313
echo "Debug: ${DEBUG:-false}"

0 commit comments

Comments
 (0)