diff --git a/packages/cz-cli/src/commands/datasource.ts b/packages/cz-cli/src/commands/datasource.ts index 04e08016c..8d8626ed3 100644 --- a/packages/cz-cli/src/commands/datasource.ts +++ b/packages/cz-cli/src/commands/datasource.ts @@ -139,6 +139,138 @@ export async function resolveDatasource(sc: StudioConfig, nameOrId: string): Pro } } +// --------------------------------------------------------------------------- +// CDC prerequisite check — shared helper used by check-cdc command and task commands +// --------------------------------------------------------------------------- +export interface CdcPrereqCheck { + name: string + required: string + actual: string + pass: boolean +} + +export interface CdcPrereqResult { + ok: boolean + message: string + checks: CdcPrereqCheck[] +} + +export async function checkCdcPrereqs( + sc: StudioConfig, + ds: { id: number; name: string; dsType: number }, + sourceArg: string, +): Promise { + const MYSQL_LIKE = new Set([5, 17, 18, 19, 39]) + const PG_LIKE = new Set([7, 22, 40, 46, 48]) + const SS_LIKE = new Set([8]) + const DM_LIKE = new Set([26]) + const dsType = ds.dsType + + if (MYSQL_LIKE.has(dsType)) { + const resp = await apiSampleData(sc, { + id: ds.id, nameSpace: "performance_schema", dataObjectName: "global_variables", dsType, + where: "VARIABLE_NAME IN ('log_bin','binlog_format','binlog_row_image')", + }).catch(() => null) + const data = resp?.data as { fieldNames?: string[]; rows?: unknown[][] } | undefined + const nameIdx = data?.fieldNames?.findIndex(f => f.toUpperCase() === "VARIABLE_NAME") ?? 0 + const valIdx = data?.fieldNames?.findIndex(f => f.toUpperCase() === "VARIABLE_VALUE") ?? 1 + const varMap: Record = {} + for (const row of data?.rows ?? []) { + varMap[String((row as unknown[])[nameIdx] ?? "").toLowerCase()] = String((row as unknown[])[valIdx] ?? "") + } + const checks: CdcPrereqCheck[] = [ + { name: "log_bin", required: "ON", actual: varMap["log_bin"] ?? "UNKNOWN" }, + { name: "binlog_format", required: "ROW", actual: varMap["binlog_format"] ?? "UNKNOWN" }, + { name: "binlog_row_image", required: "FULL", actual: varMap["binlog_row_image"] ?? "UNKNOWN" }, + ].map(c => ({ ...c, pass: c.actual.toUpperCase() === c.required })) + const ready = checks.every(c => c.pass) + const failed = checks.filter(c => !c.pass) + const fixHints = failed.map(c => + c.name === "log_bin" ? "Enable binary logging: add `log_bin=ON` to my.cnf and restart MySQL" : + c.name === "binlog_format" ? `SET GLOBAL binlog_format = 'ROW';` : + c.name === "binlog_row_image" ? `SET GLOBAL binlog_row_image = 'FULL';` : "" + ).filter(Boolean) + return { ok: ready, checks, message: ready ? "" : `MySQL CDC prerequisites not met for '${ds.name}':\n${fixHints.join("\n")}\n\nRun 'cz-cli datasource check-cdc ${sourceArg}' for details. Use --skip-check to bypass.` } + } + + if (PG_LIKE.has(dsType)) { + const checks: CdcPrereqCheck[] = [] + const walResp = await apiSampleData(sc, { + id: ds.id, nameSpace: "pg_catalog", dataObjectName: "pg_settings", dsType, + where: "name = 'wal_level'", + }).catch(() => null) + const walData = walResp?.data as { fieldNames?: string[]; rows?: unknown[][] } | undefined + const settingIdx = walData?.fieldNames?.findIndex(f => f.toLowerCase() === "setting") ?? 1 + const walLevel = walData?.rows?.[0] ? String((walData.rows[0] as unknown[])[settingIdx]) : "UNKNOWN" + checks.push({ name: "wal_level", required: "logical", actual: walLevel, pass: walLevel === "logical" }) + const slotResp = await listPgSlots(sc, [ds.id]).catch(() => null) + const slots = slotResp?.data?.find(s => s.datasourceId === ds.id)?.pipelineSlotMetaVos ?? [] + checks.push({ name: "replication_slot", required: ">= 1 slot", actual: slots.length > 0 ? slots.map(s => s.slotName).join(", ") : "none", pass: slots.length > 0 }) + const ready = checks.every(c => c.pass) + const failed = checks.filter(c => !c.pass) + const fixHints = failed.map(c => + c.name === "wal_level" + ? "Set wal_level=logical in postgresql.conf and restart PostgreSQL" + : "Create a replication slot: SELECT pg_create_logical_replication_slot('slot_name', 'pgoutput');" + ) + return { ok: ready, checks, message: ready ? "" : `PostgreSQL CDC prerequisites not met for '${ds.name}':\n${fixHints.join("\n")}\n\nRun 'cz-cli datasource check-cdc ${sourceArg}' for details. Use --skip-check to bypass.` } + } + + if (SS_LIKE.has(dsType)) { + const checks: CdcPrereqCheck[] = [] + // Check CDC enabled: query INFORMATION_SCHEMA.TABLES for cdc schema tables + // (sys.databases is not accessible via getSampleData API — uses schema prefix internally) + const cdcTablesResp = await apiSampleData(sc, { id: ds.id, nameSpace: "INFORMATION_SCHEMA", dataObjectName: "TABLES", dsType }).catch(() => null) + const cdcTablesData = cdcTablesResp?.data as { fieldNames?: string[]; rows?: unknown[][] } | undefined + const schemaIdx = cdcTablesData?.fieldNames?.findIndex(f => f.toUpperCase() === "TABLE_SCHEMA") ?? -1 + const cdcRows = (cdcTablesData?.rows ?? []).filter(row => String((row as unknown[])[schemaIdx] ?? "").toLowerCase() === "cdc") + const isCdcEnabled = cdcRows.length > 0 + checks.push({ name: "cdc_enabled", required: "1 (enabled)", actual: isCdcEnabled ? "1 (cdc schema found)" : "0 (no cdc schema)", pass: isCdcEnabled }) + // Check SQL Server Agent: try msdb.dbo.sysjobs (Agent jobs table) — accessible if Agent is configured + const agentResp = await apiSampleData(sc, { id: ds.id, nameSpace: "INFORMATION_SCHEMA", dataObjectName: "SCHEMATA", dsType }).catch(() => null) + const agentData = agentResp?.data as { fieldNames?: string[]; rows?: unknown[][] } | undefined + const schemaNameIdx = agentData?.fieldNames?.findIndex(f => f.toUpperCase() === "SCHEMA_NAME") ?? -1 + const hasCdcSchema = (agentData?.rows ?? []).some(row => String((row as unknown[])[schemaNameIdx] ?? "").toLowerCase() === "cdc") + // If CDC schema exists, SQL Server Agent must be running (CDC requires it) + const agentStatus = isCdcEnabled ? "Running (inferred from active CDC)" : "UNKNOWN (enable CDC first)" + checks.push({ name: "sql_server_agent", required: "Running", actual: agentStatus, pass: isCdcEnabled }) + const ready = checks.every(c => c.pass) + const failed = checks.filter(c => !c.pass) + const fixHints = failed.map(c => c.name === "cdc_enabled" + ? "Enable CDC on the database: EXEC sys.sp_cdc_enable_db\nThen enable CDC on each table: EXEC sys.sp_cdc_enable_table @source_schema='dbo', @source_name='', @role_name=NULL" + : "Start SQL Server Agent service (required for CDC capture jobs)") + return { ok: ready, checks, message: ready ? "" : `SQL Server CDC prerequisites not met for '${ds.name}':\n${fixHints.join("\n")}\n\nRun 'cz-cli datasource check-cdc ${sourceArg}' for details. Use --skip-check to bypass.` } + } + + if (DM_LIKE.has(dsType)) { + const checks: CdcPrereqCheck[] = [] + const archResp = await apiSampleData(sc, { id: ds.id, nameSpace: "SYS", dataObjectName: "V$DATABASE", dsType }).catch(() => null) + const archData = archResp?.data as { fieldNames?: string[]; rows?: unknown[][] } | undefined + const archIdx = archData?.fieldNames?.findIndex(f => f.toUpperCase() === "ARCH_MODE") ?? -1 + const archMode = archIdx >= 0 && archData?.rows?.[0] ? String((archData.rows[0] as unknown[])[archIdx]) : "UNKNOWN" + checks.push({ name: "arch_mode", required: "1 (archiving enabled)", actual: archMode, pass: archMode === "1" }) + const suppIdx = archData?.fieldNames?.findIndex(f => f.toUpperCase() === "SUPPLEMENTAL_LOG_DATA_MIN") ?? -1 + const suppLog = suppIdx >= 0 && archData?.rows?.[0] ? String((archData.rows[0] as unknown[])[suppIdx]) : "UNKNOWN" + checks.push({ name: "supplemental_log", required: "YES", actual: suppLog, pass: suppLog.toUpperCase() === "YES" }) + const ready = checks.every(c => c.pass) + const failed = checks.filter(c => !c.pass) + const fixHints = failed.map(c => c.name === "arch_mode" ? "Enable archive log mode in DM: ALTER DATABASE MOUNT; ALTER DATABASE ARCHIVELOG; ALTER DATABASE OPEN;" : "Enable supplemental logging: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;") + return { ok: ready, checks, message: ready ? "" : `DM CDC prerequisites not met for '${ds.name}':\n${fixHints.join("\n")}\n\nRun 'cz-cli datasource check-cdc ${sourceArg}' for details. Use --skip-check to bypass.` } + } + + // Oracle (dsType=25) is not supported for CDC multi-table sync + const ORACLE_LIKE = new Set([25]) + if (ORACLE_LIKE.has(dsType)) { + return { + ok: false, + checks: [{ name: "cdc_support", required: "supported", actual: "not supported", pass: false }], + message: `Oracle is not supported as a CDC source for multi-table real-time sync. Supported sources: MySQL, PostgreSQL, SQL Server, DM.`, + } + } + + return { ok: true, message: "", checks: [] } +} + // --------------------------------------------------------------------------- // Command registration // --------------------------------------------------------------------------- @@ -370,205 +502,17 @@ export function registerDatasourceCommand(cli: Argv): void { const sc = await getStudioContext(argv) const ds = await resolveDatasource(sc, argv.datasource as string) const dsType = ds.dsType ?? 0 - - const MYSQL_LIKE = new Set([5, 17, 18, 19, 39]) - const PG_LIKE = new Set([7, 22, 40, 46, 48]) - const SS_LIKE = new Set([8]) // SQL Server - const DM_LIKE = new Set([26]) // DM 达梦 - - // ── MySQL / TiDB / Aurora MySQL / PolarDB MySQL ──────────────── - if (MYSQL_LIKE.has(dsType)) { - const resp = await apiSampleData(sc, { - id: ds.id, nameSpace: "performance_schema", dataObjectName: "global_variables", dsType, - where: "VARIABLE_NAME IN ('log_bin','binlog_format','binlog_row_image')", - }) - const data = resp.data as { fieldNames?: string[]; rows?: unknown[][] } | undefined - const nameIdx = data?.fieldNames?.findIndex(f => f.toUpperCase() === "VARIABLE_NAME") ?? 0 - const valIdx = data?.fieldNames?.findIndex(f => f.toUpperCase() === "VARIABLE_VALUE") ?? 1 - const varMap: Record = {} - for (const row of data?.rows ?? []) { - const k = String((row as unknown[])[nameIdx] ?? "").toLowerCase() - varMap[k] = String((row as unknown[])[valIdx] ?? "") - } - const checks = [ - { name: "log_bin", required: "ON", actual: varMap["log_bin"] ?? "UNKNOWN" }, - { name: "binlog_format", required: "ROW", actual: varMap["binlog_format"] ?? "UNKNOWN" }, - { name: "binlog_row_image", required: "FULL", actual: varMap["binlog_row_image"] ?? "UNKNOWN" }, - ].map(c => ({ ...c, pass: c.actual.toUpperCase() === c.required })) - - const ready = checks.every(c => c.pass) - const failed = checks.filter(c => !c.pass) - const fixHints = failed.map(c => - c.name === "log_bin" ? "Enable binary logging: add `log_bin=ON` to my.cnf and restart MySQL" : - c.name === "binlog_format" ? `SET GLOBAL binlog_format = 'ROW';` : - c.name === "binlog_row_image" ? `SET GLOBAL binlog_row_image = 'FULL';` : "" - ).filter(Boolean) - - logOperation("datasource check-cdc", { ok: ready }) - success({ datasource: ds.name, ds_type: dsType, checks, ready }, { - format, - aiMessage: ready - ? `MySQL CDC prerequisites met for '${ds.name}'. Proceed to: cz-cli task create-realtime-sync --folder --source ${ds.name} --database --target ` - : `MySQL CDC prerequisites NOT met for '${ds.name}'. Fix:\n${fixHints.join("\n")}`, - }) - return - } - - // ── PostgreSQL / Aurora PG / PolarDB PG / Greenplum / Redshift ─ - if (PG_LIKE.has(dsType)) { - const checks: { name: string; required: string; actual: string; pass: boolean }[] = [] - - // wal_level check - const walResp = await apiSampleData(sc, { - id: ds.id, nameSpace: "pg_catalog", dataObjectName: "pg_settings", dsType, - where: "name = 'wal_level'", - }).catch(() => null) - const walData = walResp?.data as { fieldNames?: string[]; rows?: unknown[][] } | undefined - const settingIdx = walData?.fieldNames?.findIndex(f => f.toLowerCase() === "setting") ?? 1 - const walRow = walData?.rows?.[0] - const walLevel = walRow ? String((walRow as unknown[])[settingIdx]) : "UNKNOWN" - checks.push({ name: "wal_level", required: "logical", actual: walLevel, pass: walLevel === "logical" }) - - // slot check - const slotResp = await listPgSlots(sc, [ds.id]).catch(() => null) - const slots = slotResp?.data?.find(s => s.datasourceId === ds.id)?.pipelineSlotMetaVos ?? [] - checks.push({ - name: "replication_slot", - required: ">= 1 slot", - actual: slots.length > 0 ? slots.map(s => s.slotName).join(", ") : "none", - pass: slots.length > 0, - }) - - const ready = checks.every(c => c.pass) - const failed = checks.filter(c => !c.pass) - const fixHints = failed.map(c => - c.name === "wal_level" - ? "Set wal_level=logical in postgresql.conf and restart PostgreSQL" - : "Create a replication slot: SELECT pg_create_logical_replication_slot('slot_name', 'pgoutput');" - ) - - logOperation("datasource check-cdc", { ok: ready }) - success({ datasource: ds.name, ds_type: dsType, checks, ready }, { - format, - aiMessage: ready - ? `PostgreSQL CDC prerequisites met for '${ds.name}'. Proceed to: cz-cli task create-realtime-sync --folder --source ${ds.name} --database --target ` - : `PostgreSQL CDC prerequisites NOT met for '${ds.name}'. Fix:\n${fixHints.join("\n")}`, - }) - return - } - - // ── SQL Server ───────────────────────────────────────────────── - // Note: not tested (no environment available) — based on standard SQL Server CDC docs - if (SS_LIKE.has(dsType)) { - const checks: { name: string; required: string; actual: string; pass: boolean }[] = [] - - // Check if CDC is enabled at database level: sys.databases.is_cdc_enabled = 1 - const dbResp = await apiSampleData(sc, { - id: ds.id, nameSpace: "master", dataObjectName: "sys.databases", dsType, - where: "name = DB_NAME()", - }).catch(() => null) - const dbData = dbResp?.data as { fieldNames?: string[]; rows?: unknown[][] } | undefined - const cdcIdx = dbData?.fieldNames?.findIndex(f => f.toLowerCase() === "is_cdc_enabled") ?? -1 - const isCdcEnabled = cdcIdx >= 0 && dbData?.rows?.[0] - ? String((dbData.rows[0] as unknown[])[cdcIdx]) === "1" || String((dbData.rows[0] as unknown[])[cdcIdx]).toLowerCase() === "true" - : false - checks.push({ - name: "cdc_enabled", - required: "1 (enabled)", - actual: isCdcEnabled ? "1" : (cdcIdx >= 0 ? "0" : "UNKNOWN"), - pass: isCdcEnabled, - }) - - // Check if SQL Server Agent is running: sys.dm_server_services - const agentResp = await apiSampleData(sc, { - id: ds.id, nameSpace: "master", dataObjectName: "sys.dm_server_services", dsType, - where: "servicename LIKE 'SQL Server Agent%'", - }).catch(() => null) - const agentData = agentResp?.data as { fieldNames?: string[]; rows?: unknown[][] } | undefined - const statusIdx = agentData?.fieldNames?.findIndex(f => f.toLowerCase() === "status_desc") ?? -1 - const agentStatus = statusIdx >= 0 && agentData?.rows?.[0] - ? String((agentData.rows[0] as unknown[])[statusIdx]) - : "UNKNOWN" - checks.push({ - name: "sql_server_agent", - required: "Running", - actual: agentStatus, - pass: agentStatus.toLowerCase() === "running", - }) - - const ready = checks.every(c => c.pass) - const failed = checks.filter(c => !c.pass) - const fixHints = failed.map(c => - c.name === "cdc_enabled" - ? "Enable CDC on the database: EXEC sys.sp_cdc_enable_db" - : "Start SQL Server Agent service (required for CDC capture jobs)" - ) - - logOperation("datasource check-cdc", { ok: ready }) - success({ datasource: ds.name, ds_type: dsType, checks, ready }, { - format, - aiMessage: ready - ? `SQL Server CDC prerequisites met for '${ds.name}'.` - : `SQL Server CDC prerequisites NOT met for '${ds.name}'. Fix:\n${fixHints.join("\n")}`, - }) - return - } - - // ── DM 达梦 ──────────────────────────────────────────────────── - // Note: not tested (no environment available) — based on standard DM CDC docs - if (DM_LIKE.has(dsType)) { - const checks: { name: string; required: string; actual: string; pass: boolean }[] = [] - - // Check archive log mode: V$DATABASE ARCH_MODE = 1 - const archResp = await apiSampleData(sc, { - id: ds.id, nameSpace: "SYS", dataObjectName: "V$DATABASE", dsType, - }).catch(() => null) - const archData = archResp?.data as { fieldNames?: string[]; rows?: unknown[][] } | undefined - const archIdx = archData?.fieldNames?.findIndex(f => f.toUpperCase() === "ARCH_MODE") ?? -1 - const archMode = archIdx >= 0 && archData?.rows?.[0] - ? String((archData.rows[0] as unknown[])[archIdx]) - : "UNKNOWN" - checks.push({ - name: "arch_mode", - required: "1 (archiving enabled)", - actual: archMode, - pass: archMode === "1", - }) - - // Check supplemental log: V$DATABASE SUPPLEMENTAL_LOG_DATA_MIN - const suppIdx = archData?.fieldNames?.findIndex(f => f.toUpperCase() === "SUPPLEMENTAL_LOG_DATA_MIN") ?? -1 - const suppLog = suppIdx >= 0 && archData?.rows?.[0] - ? String((archData.rows[0] as unknown[])[suppIdx]) - : "UNKNOWN" - checks.push({ - name: "supplemental_log", - required: "YES", - actual: suppLog, - pass: suppLog.toUpperCase() === "YES", - }) - - const ready = checks.every(c => c.pass) - const failed = checks.filter(c => !c.pass) - const fixHints = failed.map(c => - c.name === "arch_mode" - ? "Enable archive log mode in DM: ALTER DATABASE MOUNT; ALTER DATABASE ARCHIVELOG; ALTER DATABASE OPEN;" - : "Enable supplemental logging: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;" - ) - - logOperation("datasource check-cdc", { ok: ready }) - success({ datasource: ds.name, ds_type: dsType, checks, ready }, { - format, - aiMessage: ready - ? `DM CDC prerequisites met for '${ds.name}'.` - : `DM CDC prerequisites NOT met for '${ds.name}'. Fix:\n${fixHints.join("\n")}`, - }) - return - } - - // ── Other types ──────────────────────────────────────────────── - success({ datasource: ds.name, ds_type: dsType, checks: [], ready: true }, { - format, - aiMessage: `Datasource type ${dsType} does not require CDC prerequisite checks.`, + const result = await checkCdcPrereqs(sc, { id: ds.id, name: ds.name, dsType }, ds.name) + logOperation("datasource check-cdc", { ok: result.ok }) + const aiMsg = result.ok + ? (dsType === 0 + ? `Datasource type ${dsType} does not require CDC prerequisite checks.` + : result.checks.length === 0 + ? `Datasource type ${dsType} does not require CDC prerequisite checks.` + : `${ds.name} CDC prerequisites met. Proceed to: cz-cli task create-realtime-sync --folder --source ${ds.name} --database --target `) + : result.message + success({ datasource: ds.name, ds_type: dsType, checks: result.checks, ready: result.ok }, { + format, aiMessage: aiMsg, }) } catch (err) { reportDatasourceError(err, format) diff --git a/packages/cz-cli/src/commands/task.ts b/packages/cz-cli/src/commands/task.ts index cb3f0d728..9954a5aab 100644 --- a/packages/cz-cli/src/commands/task.ts +++ b/packages/cz-cli/src/commands/task.ts @@ -34,8 +34,8 @@ import { studioUrl } from "./studio-url.js" import { normalizeTaskIdentity } from "../identity.js" import { t } from "../locale.js" import { resolveConnectionConfig } from "../connection/config.js" -import { resolveDatasource } from "./datasource.js" -import { convertAgentCron } from "../cron-adapter.js" +import { resolveDatasource, checkCdcPrereqs } from "./datasource.js" +import { convertAgentCron, cronNextRuns } from "../cron-adapter.js" function formatIsoStartOfDay(value: string | undefined | null): string { if (!value) return new Date().toISOString().slice(0, 10) + "T00:00:00.000Z" @@ -45,6 +45,21 @@ function formatIsoStartOfDay(value: string | undefined | null): string { return trimmed } +async function checkDuplicateTaskName( + sc: StudioConfig, + folderId: number, + name: string, + format: string | undefined, +): Promise { + const existing = await listTasks(sc, { projectId: sc.projectId, page: 1, pageSize: 50, folderId, fileName: name }) + const existingData = (existing.data && typeof existing.data === "object" ? existing.data : {}) as Record + const existingList = Array.isArray(existingData.list) ? existingData.list as Record[] : [] + const duplicate = existingList.find((t) => String(t.dataFileName ?? t.fileName ?? "") === name) + if (duplicate) { + error("DUPLICATE_TASK", `Task '${name}' already exists in this folder (task_id=${duplicate.id ?? duplicate.task_id}). Use a different name or delete the existing task first.`, { format, exitCode: 2 }) + } +} + const TASK_TYPE_MAP: Record = { @@ -121,102 +136,6 @@ async function pMap(items: T[], fn: (item: T) => Promise, concurrency: return results } -// CDC prerequisite check helper — returns { ok, message } without throwing -async function checkCdcPrereqs( - sc: StudioConfig, - ds: { id: number; name: string; dsType: number }, - sourceArg: string, -): Promise<{ ok: boolean; message: string }> { - const MYSQL_LIKE = new Set([5, 17, 18, 19, 39]) - const PG_LIKE = new Set([7, 22, 40, 46, 48]) - const SS_LIKE = new Set([8]) - const DM_LIKE = new Set([26]) - - const dsType = ds.dsType - - if (MYSQL_LIKE.has(dsType)) { - const r = await studioRequest(sc, "/ide-authority/v1/projectDataSources/getSampleData", { - id: ds.id, nameSpace: "performance_schema", dataObjectName: "global_variables", - options: { dsType, where: "VARIABLE_NAME IN ('log_bin','binlog_format','binlog_row_image')" }, - }).catch(() => null) - const data = (r as { data?: { fieldNames?: string[]; rows?: unknown[][] } } | null)?.data - const nameIdx = data?.fieldNames?.findIndex(f => f.toUpperCase() === "VARIABLE_NAME") ?? 0 - const valIdx = data?.fieldNames?.findIndex(f => f.toUpperCase() === "VARIABLE_VALUE") ?? 1 - const varMap: Record = {} - for (const row of data?.rows ?? []) { - varMap[String((row as unknown[])[nameIdx] ?? "").toLowerCase()] = String((row as unknown[])[valIdx] ?? "") - } - const failed: string[] = [] - if ((varMap["log_bin"] ?? "").toUpperCase() !== "ON") failed.push("log_bin must be ON (add log_bin=ON to my.cnf and restart)") - if ((varMap["binlog_format"] ?? "").toUpperCase() !== "ROW") failed.push("SET GLOBAL binlog_format = 'ROW'") - if ((varMap["binlog_row_image"] ?? "").toUpperCase() !== "FULL") failed.push("SET GLOBAL binlog_row_image = 'FULL'") - if (failed.length > 0) return { ok: false, message: `MySQL CDC prerequisites not met for '${ds.name}':\n${failed.join("\n")}\n\nRun 'cz-cli datasource check-cdc ${sourceArg}' for details. Use --skip-check to bypass.` } - return { ok: true, message: "" } - } - - if (PG_LIKE.has(dsType)) { - const walR = await studioRequest(sc, "/ide-authority/v1/projectDataSources/getSampleData", { - id: ds.id, nameSpace: "pg_catalog", dataObjectName: "pg_settings", - options: { dsType, where: "name = 'wal_level'" }, - }).catch(() => null) - const walData = (walR as { data?: { fieldNames?: string[]; rows?: unknown[][] } } | null)?.data - const si = walData?.fieldNames?.findIndex(f => f.toLowerCase() === "setting") ?? 1 - const walLevel = walData?.rows?.[0] ? String((walData.rows[0] as unknown[])[si]) : "" - const slotR = await listPgSlots(sc, [ds.id]).catch(() => null) - const slots = slotR?.data?.find(s => s.datasourceId === ds.id)?.pipelineSlotMetaVos ?? [] - const failed: string[] = [] - if (walLevel !== "logical") failed.push(`wal_level must be 'logical' (current: '${walLevel || "unknown"}') — set in postgresql.conf and restart`) - if (slots.length === 0) failed.push("No replication slot found — run: SELECT pg_create_logical_replication_slot('slot_name', 'pgoutput')") - if (failed.length > 0) return { ok: false, message: `PostgreSQL CDC prerequisites not met for '${ds.name}':\n${failed.join("\n")}\n\nRun 'cz-cli datasource check-cdc ${sourceArg}' for details. Use --skip-check to bypass.` } - return { ok: true, message: "" } - } - - // SQL Server — not tested, based on standard SQL Server CDC docs - if (SS_LIKE.has(dsType)) { - const dbR = await studioRequest(sc, "/ide-authority/v1/projectDataSources/getSampleData", { - id: ds.id, nameSpace: "master", dataObjectName: "sys.databases", - options: { dsType, where: "name = DB_NAME()" }, - }).catch(() => null) - const dbData = (dbR as { data?: { fieldNames?: string[]; rows?: unknown[][] } } | null)?.data - const cdcIdx = dbData?.fieldNames?.findIndex(f => f.toLowerCase() === "is_cdc_enabled") ?? -1 - const isCdcEnabled = cdcIdx >= 0 && dbData?.rows?.[0] - ? String((dbData.rows[0] as unknown[])[cdcIdx]) === "1" - : false - const agentR = await studioRequest(sc, "/ide-authority/v1/projectDataSources/getSampleData", { - id: ds.id, nameSpace: "master", dataObjectName: "sys.dm_server_services", - options: { dsType, where: "servicename LIKE 'SQL Server Agent%'" }, - }).catch(() => null) - const agentData = (agentR as { data?: { fieldNames?: string[]; rows?: unknown[][] } } | null)?.data - const stIdx = agentData?.fieldNames?.findIndex(f => f.toLowerCase() === "status_desc") ?? -1 - const agentStatus = stIdx >= 0 && agentData?.rows?.[0] ? String((agentData.rows[0] as unknown[])[stIdx]) : "UNKNOWN" - const failed: string[] = [] - if (!isCdcEnabled) failed.push("Enable CDC: EXEC sys.sp_cdc_enable_db") - if (agentStatus.toLowerCase() !== "running") failed.push(`SQL Server Agent must be Running (current: ${agentStatus}) — start the service`) - if (failed.length > 0) return { ok: false, message: `SQL Server CDC prerequisites not met for '${ds.name}':\n${failed.join("\n")}\n\nRun 'cz-cli datasource check-cdc ${sourceArg}' for details. Use --skip-check to bypass.` } - return { ok: true, message: "" } - } - - // DM 达梦 — not tested, based on standard DM CDC docs - if (DM_LIKE.has(dsType)) { - const dbR = await studioRequest(sc, "/ide-authority/v1/projectDataSources/getSampleData", { - id: ds.id, nameSpace: "SYS", dataObjectName: "V$DATABASE", - options: { dsType }, - }).catch(() => null) - const dbData = (dbR as { data?: { fieldNames?: string[]; rows?: unknown[][] } } | null)?.data - const archIdx = dbData?.fieldNames?.findIndex(f => f.toUpperCase() === "ARCH_MODE") ?? -1 - const archMode = archIdx >= 0 && dbData?.rows?.[0] ? String((dbData.rows[0] as unknown[])[archIdx]) : "UNKNOWN" - const suppIdx = dbData?.fieldNames?.findIndex(f => f.toUpperCase() === "SUPPLEMENTAL_LOG_DATA_MIN") ?? -1 - const suppLog = suppIdx >= 0 && dbData?.rows?.[0] ? String((dbData.rows[0] as unknown[])[suppIdx]) : "UNKNOWN" - const failed: string[] = [] - if (archMode !== "1") failed.push(`ARCH_MODE must be 1 (current: ${archMode}) — ALTER DATABASE MOUNT; ALTER DATABASE ARCHIVELOG; ALTER DATABASE OPEN;`) - if (suppLog.toUpperCase() !== "YES") failed.push(`Supplemental logging must be enabled (current: ${suppLog}) — ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;`) - if (failed.length > 0) return { ok: false, message: `DM CDC prerequisites not met for '${ds.name}':\n${failed.join("\n")}\n\nRun 'cz-cli datasource check-cdc ${sourceArg}' for details. Use --skip-check to bypass.` } - return { ok: true, message: "" } - } - - return { ok: true, message: "" } // other types: no check needed -} - async function autoResolveLakehouseDs(sc: StudioConfig): Promise<{ id: number; name: string } | null> { const resp = await studioRequest<{ list?: unknown[] }>(sc, "/ide-authority/v1/projectDataSources/list", { current: 1, pageSize: 50, status: 1, pageIndex: 1, dsType: 1, projectName: sc.workspaceName, @@ -386,8 +305,8 @@ export function registerTaskCommand(cli: Argv): void { "List tasks", (y) => y - .option("page", { type: "number", default: 1 }) - .option("page-size", { type: "number", default: 10 }) + .option("page", { type: "number", default: 1, describe: "Page number (1-based)" }) + .option("page-size", { type: "number", default: 10, describe: "Number of tasks per page" }) .option("limit", { type: "number", describe: "Alias of --page-size" }) .option("parent", { type: "number", describe: "Folder ID filter" }) .option("folder", { type: "number", describe: "Folder ID filter (alias of --parent)", hidden: true }) @@ -422,9 +341,9 @@ export function registerTaskCommand(cli: Argv): void { const total = data.total as number | undefined const totalPages = data.totalPages as number | undefined const aiMessage = - `当前仅展示第 ${argv.page} 页` + - (total != null ? `(${tasks.length} 条 / 共 ${total} 条,共 ${totalPages} 页)` : "") + - `。如需下一页,请执行: cz-cli task list --page ${(argv.page as number) + 1} --page-size ${pageSize}` + `Showing page ${argv.page}` + + (total != null ? ` (${tasks.length} of ${total} tasks, ${totalPages} pages total)` : "") + + `. For next page: cz-cli task list --page ${(argv.page as number) + 1} --page-size ${pageSize}` logOperation("task list", { ok: true }) success(tasks, { format, @@ -441,8 +360,8 @@ export function registerTaskCommand(cli: Argv): void { "List task folders", (y) => y - .option("page", { type: "number", default: 1 }) - .option("page-size", { type: "number", default: 10 }) + .option("page", { type: "number", default: 1, describe: "Page number (1-based)" }) + .option("page-size", { type: "number", default: 10, describe: "Number of folders per page" }) .option("parent", { type: "number", default: 0, describe: "Parent folder ID" }), async (argv) => { const format = argv.format @@ -459,9 +378,9 @@ export function registerTaskCommand(cli: Argv): void { const total = data.total as number | undefined const totalPages = data.totalPages as number | undefined const aiMessage = - `当前仅展示第 ${argv.page} 页` + - (total != null ? `(${items.length} 条 / 共 ${total} 条,共 ${totalPages} 页)` : "") + - `。可使用 --page 和 --page-size 翻页。` + `Showing page ${argv.page}` + + (total != null ? ` (${items.length} of ${total} folders, ${totalPages} pages total)` : "") + + `. Use --page and --page-size to paginate.` logOperation("task list-folders", { ok: true }) success(items, { format, @@ -531,9 +450,9 @@ export function registerTaskCommand(cli: Argv): void { type: "string", demandOption: true, describe: - 'Available options: SQL, PYTHON, SHELL, JDBC, FLOW, INTEGRATION, REALTIME, VIRTUAL, FULL_INCREMENTAL, MULTI_REALTIME, MULTI_DI"', + 'Available options: SQL, PYTHON, SHELL, JDBC, FLOW (composite task — DAG of SQL/Python/Shell nodes), INTEGRATION, REALTIME, VIRTUAL, FULL_INCREMENTAL, MULTI_REALTIME, MULTI_DI', }) - .option("folder", { type: "string", describe: "Folder ID or name (required; root directory not allowed)" }) + .option("folder", { type: "string", demandOption: true, describe: "Folder ID or name (run 'cz-cli task folder-tree' to list folders)" }) .option("description", { type: "string", describe: "Task description" }), async (argv) => { const format = argv.format @@ -549,19 +468,17 @@ export function registerTaskCommand(cli: Argv): void { if (folderId === 0) { process.stderr.write("Warning: creating task in root directory. Consider using a subfolder to keep the workspace organized.\n") } - // Check for duplicate name in the same folder - const existing = await listTasks(sc, { projectId: sc.projectId, page: 1, pageSize: 50, folderId, fileName: argv.name as string }) - const existingData = (existing.data && typeof existing.data === "object" ? existing.data : {}) as Record - const existingList = Array.isArray(existingData.list) ? existingData.list as Record[] : [] - const duplicate = existingList.find((t) => String(t.dataFileName ?? t.fileName ?? "") === argv.name) - if (duplicate) { - error("DUPLICATE_TASK", `Task '${argv.name}' already exists in this folder (task_id=${duplicate.id ?? duplicate.task_id}). Use a different name or delete the existing task first.`, { format, exitCode: 2 }); return + const nameRaw = argv.name as string + if (!/^[a-zA-Z0-9_\-一-龥]+$/.test(nameRaw)) { + error("INVALID_ARGUMENTS", `Task name '${nameRaw}' contains illegal characters. Only letters, numbers, underscores, hyphens, and Chinese characters are allowed.`, { format, exitCode: 2 }); return } + // Check for duplicate name in the same folder + await checkDuplicateTaskName(sc, folderId, nameRaw, format) const resp = await createTask(sc, { fileType: String(parseTaskType(argv.type as string)), createdBy: String(sc.userId), projectId: sc.projectId, - dataFileName: argv.name as string, + dataFileName: nameRaw, fileDescription: argv.description, dataFolderId: folderId, workspaceName: sc.workspaceName, @@ -578,7 +495,7 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "create-realtime-sync ", - "Create a multi-table realtime CDC sync task (MULTI_REALTIME) — one step: prereq check + create + configure", + "Create and configure a multi-table real-time CDC sync task in one step (checks CDC prerequisites, creates and configures the task)", (y) => y .positional("name", { type: "string", demandOption: true, describe: "Task name" }) @@ -611,6 +528,17 @@ export function registerTaskCommand(cli: Argv): void { error("INVALID_ARGUMENTS", `Cannot determine dsType for source datasource '${argv.source}'.`, { format, exitCode: 2 }); return } + // Validate source dsType is supported for CDC multi-table sync + const CDC_SUPPORTED_TYPES = new Set([5, 7, 8, 17, 18, 19, 21, 22, 25, 26, 39, 40, 46, 48]) + // Note: Oracle (25) is listed in DS_TYPE_MAP but NOT supported for CDC — checkCdcPrereqs will reject it + const CDC_SUPPORTED_MYSQL = "MySQL/TiDB/MariaDB (5,17,18,19,39)" + const CDC_SUPPORTED_PG = "PostgreSQL/Greenplum (7,22,40,46,48)" + const CDC_SUPPORTED_SS = "SQL Server (8)" + const CDC_SUPPORTED_DM = "DM (26)" + if (!CDC_SUPPORTED_TYPES.has(sourceDs.dsType)) { + error("UNSUPPORTED_DATASOURCE", `Datasource '${argv.source}' (dsType=${sourceDs.dsType}) is not supported for CDC multi-table sync. Supported: ${CDC_SUPPORTED_MYSQL}, ${CDC_SUPPORTED_PG}, ${CDC_SUPPORTED_SS}, ${CDC_SUPPORTED_DM}.`, { format, exitCode: 2 }); return + } + // Step 2: CDC prerequisite check (before creating anything) if (!(argv["skip-check"] as boolean)) { const check = await checkCdcPrereqs(sc, sourceDs as { id: number; name: string; dsType: number }, String(argv.source)) @@ -624,12 +552,7 @@ export function registerTaskCommand(cli: Argv): void { : await resolveFolderIdByName(sc, folderRaw, format) // Step 4: check duplicate name - const existing = await listTasks(sc, { projectId: sc.projectId, page: 1, pageSize: 50, folderId, fileName: argv.name as string }) - const existingData = (existing.data && typeof existing.data === "object" ? existing.data : {}) as Record - const existingList = Array.isArray(existingData.list) ? existingData.list as Record[] : [] - if (existingList.find(t => String(t.dataFileName ?? t.fileName ?? "") === argv.name)) { - error("DUPLICATE_TASK", `Task '${argv.name}' already exists in this folder.`, { format, exitCode: 2 }); return - } + await checkDuplicateTaskName(sc, folderId, argv.name as string, format) // Step 5: create task const created = await createTask(sc, { @@ -652,7 +575,6 @@ export function registerTaskCommand(cli: Argv): void { error("INVALID_ARGUMENTS", `CDC multi-table sync only supports Lakehouse as target. '${targetDs.name}' is not a Lakehouse datasource.`, { format, exitCode: 2 }); return } targetDsId = targetDs.id - targetDsType = 1 } else { const lhDs = await autoResolveLakehouseDs(sc) if (!lhDs) { @@ -705,7 +627,7 @@ export function registerTaskCommand(cli: Argv): void { instanceName: sc.workspaceName, etlVcCode: vcName, ...(resolvedVcId != null && { etlVcId: resolvedVcId }), - activeStartTime: new Date().toISOString().slice(0, 10) + "T00:00:00.000Z", + activeStartTime: formatIsoStartOfDay(undefined), activeEndTime: "2099-01-01T00:00:00.000Z", }).catch(() => null) } @@ -730,7 +652,7 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "create-batch-sync ", - "Create a multi-table offline batch sync task (MULTI_DI) — one step: prereq check + create + configure", + "Create and configure a multi-table offline batch sync task in one step (checks CDC prerequisites, creates and configures the task)", (y) => y .positional("name", { type: "string", demandOption: true, describe: "Task name" }) @@ -773,12 +695,7 @@ export function registerTaskCommand(cli: Argv): void { : await resolveFolderIdByName(sc, folderRaw, format) // Step 4: check duplicate name - const existing = await listTasks(sc, { projectId: sc.projectId, page: 1, pageSize: 50, folderId, fileName: argv.name as string }) - const existingData = (existing.data && typeof existing.data === "object" ? existing.data : {}) as Record - const existingList = Array.isArray(existingData.list) ? existingData.list as Record[] : [] - if (existingList.find(t => String(t.dataFileName ?? t.fileName ?? "") === argv.name)) { - error("DUPLICATE_TASK", `Task '${argv.name}' already exists in this folder.`, { format, exitCode: 2 }); return - } + await checkDuplicateTaskName(sc, folderId, argv.name as string, format) // Step 5: create task const created = await createTask(sc, { @@ -879,7 +796,7 @@ export function registerTaskCommand(cli: Argv): void { cronExpress: normalizeCron((argv.cron as string | undefined) ?? "0 0 2 * * ? *"), ...(vcName && { etlVcCode: vcName }), ...(resolvedVcId != null && { etlVcId: resolvedVcId }), - activeStartTime: new Date().toISOString().slice(0, 10) + "T00:00:00.000Z", + activeStartTime: formatIsoStartOfDay(undefined), activeEndTime: "2099-01-01T00:00:00.000Z", }).catch(() => null) @@ -968,12 +885,7 @@ export function registerTaskCommand(cli: Argv): void { const folderId = /^\d+$/.test(folderRaw) ? parseInt(folderRaw, 10) : await resolveFolderIdByName(sc, folderRaw, format) - const existing = await listTasks(sc, { projectId: sc.projectId, page: 1, pageSize: 50, folderId, fileName: argv.name as string }) - const existingData = (existing.data && typeof existing.data === "object" ? existing.data : {}) as Record - const existingList = Array.isArray(existingData.list) ? existingData.list as Record[] : [] - if (existingList.find(t => String(t.dataFileName ?? t.fileName ?? "") === argv.name)) { - error("DUPLICATE_TASK", `Task '${argv.name}' already exists in this folder.`, { format, exitCode: 2 }); return - } + await checkDuplicateTaskName(sc, folderId, argv.name as string, format) // Step 5: create task const created = await createTask(sc, { @@ -1083,7 +995,7 @@ export function registerTaskCommand(cli: Argv): void { cronExpress: normalizeCron((argv.cron as string | undefined) ?? "0 0 2 * * ? *"), ...(vcName && { etlVcCode: vcName }), ...(resolvedVcId != null && { etlVcId: resolvedVcId }), - activeStartTime: new Date().toISOString().slice(0, 10) + "T00:00:00.000Z", + activeStartTime: formatIsoStartOfDay(undefined), activeEndTime: "2099-01-01T00:00:00.000Z", }).catch(() => null) const ddlHint = !targetTableExists @@ -1149,13 +1061,7 @@ export function registerTaskCommand(cli: Argv): void { process.stderr.write("Warning: creating task in root directory. Consider using a subfolder to keep the workspace organized.\n") } // Check for duplicate name - const existing = await listTasks(sc, { projectId: sc.projectId, page: 1, pageSize: 50, folderId, fileName: argv.name as string }) - const existingData = (existing.data && typeof existing.data === "object" ? existing.data : {}) as Record - const existingList = Array.isArray(existingData.list) ? existingData.list as Record[] : [] - const duplicate = existingList.find((t) => String(t.dataFileName ?? t.fileName ?? "") === argv.name) - if (duplicate) { - error("DUPLICATE_TASK", `Task '${argv.name}' already exists in this folder (task_id=${duplicate.id ?? duplicate.task_id}). Use a different name or delete the existing task first.`, { format, exitCode: 2 }); return - } + await checkDuplicateTaskName(sc, folderId, argv.name as string, format) // Step 1: create const parsedFileType = parseTaskType(argv.type as string) @@ -1254,7 +1160,7 @@ export function registerTaskCommand(cli: Argv): void { selfDependsJob: 0, executeTimeout: 0, executeTimeoutUnit: "m", - activeStartTime: new Date().toISOString().slice(0, 10) + "T00:00:00.000Z", + activeStartTime: formatIsoStartOfDay(undefined), activeEndTime: "2099-01-01T00:00:00.000Z", dataFileInputListReqs: [], configProperties: JSON.stringify(oldConfigProps), @@ -1287,7 +1193,7 @@ export function registerTaskCommand(cli: Argv): void { "Create a task folder", (y) => y - .positional("name", { type: "string", demandOption: true }) + .positional("name", { type: "string", demandOption: true, describe: "Folder name" }) .option("parent", { type: "number", default: 0, describe: "Parent folder ID" }), async (argv) => { const format = argv.format @@ -1320,7 +1226,7 @@ export function registerTaskCommand(cli: Argv): void { .command( "status ", "Get combined draft + deployed status in one call", - (y) => y.positional("task", { type: "string", demandOption: true }), + (y) => y.positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }), async (argv) => { const format = argv.format try { @@ -1412,7 +1318,7 @@ export function registerTaskCommand(cli: Argv): void { .command( ["get-content ", "detail "], "Get task script content and configuration", - (y) => y.positional("task", { type: "string", demandOption: true }), + (y) => y.positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }), async (argv) => { const format = argv.format try { @@ -1474,7 +1380,7 @@ export function registerTaskCommand(cli: Argv): void { "Save script content for a SQL/Python/Shell task", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("content", { type: "string", describe: "Script content" }) .option("file", { alias: "f", type: "string", describe: "Read content from file" }) .option("params", { type: "string", describe: 'Runtime parameters as JSON object. Values starting with "$[" or matching system param names (bizdate, sys_biz_day, sys_plan_day, etc.) are treated as system/expression params automatically. e.g. \'{"city":"beijing","dt":"bizdate","yesterday":"$[yyyy-MM-dd,-1d]"}\'' }) @@ -1535,7 +1441,7 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "create-offline-sync ", - "Create a single-table offline batch sync task (INTEGRATION) and fetch source schema for Agent field mapping", + "Create a single-table offline batch sync task (INTEGRATION). Fetches source column metadata to help configure the field mapping", (y) => y .positional("name", { type: "string", demandOption: true, describe: "Task name" }) @@ -1557,12 +1463,7 @@ export function registerTaskCommand(cli: Argv): void { const folderId = /^\d+$/.test(folderRaw) ? parseInt(folderRaw, 10) : await resolveFolderIdByName(sc, folderRaw, format) - const existing = await listTasks(sc, { projectId: sc.projectId, page: 1, pageSize: 50, folderId, fileName: argv.name as string }) - const existingData = (existing.data && typeof existing.data === "object" ? existing.data : {}) as Record - const existingList = Array.isArray(existingData.list) ? existingData.list as Record[] : [] - if (existingList.find(t => String(t.dataFileName ?? t.fileName ?? "") === argv.name)) { - error("DUPLICATE_TASK", `Task '${argv.name}' already exists in this folder.`, { format, exitCode: 2 }); return - } + await checkDuplicateTaskName(sc, folderId, argv.name as string, format) // Step 1.5: resolve datasources BEFORE creating task (avoid orphan on failure) const sourceDs = await resolveDatasource(sc, String(argv.source)) @@ -1651,10 +1552,10 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "offline-sync-schema ", - "Fetch source table schema for INTEGRATION task — Agent uses output to generate field mapping for save-offline-sync", + "Fetch source table column metadata for an INTEGRATION task. Use the output to build the config JSON for save-offline-sync", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("source", { type: "string", demandOption: true, describe: "Source datasource name or ID" }) .option("source-db", { type: "string", demandOption: true, describe: "Source database/schema" }) .option("source-table", { type: "string", demandOption: true, describe: "Source table name" }) @@ -1734,7 +1635,7 @@ export function registerTaskCommand(cli: Argv): void { const primaryCols = sourceColumns.filter((c) => c.primary === true || c.isPrimary === true) // Good splitPk candidates: INT/BIGINT/SERIAL with wide range — exclude TINYINT/SMALLINT (too narrow for effective splitting) const splitPkTypes = new Set(['INT','INTEGER','BIGINT','MEDIUMINT','SERIAL','BIGSERIAL','INT4','INT8','INT2']) - const splitPkExclude = new Set(['TINYINT','SMALLINT','INT2']) + const splitPkExclude = new Set(['INT2']) // INT2 (smallint) is too small for reliable range splits const numericTypes = new Set(['INT','INTEGER','BIGINT','SMALLINT','TINYINT','MEDIUMINT','SERIAL','BIGSERIAL']) const numericCols = sourceColumns.filter((c) => numericTypes.has(String(c.type ?? "").toUpperCase().split("(")[0].trim())) const splitPkCols = sourceColumns.filter((c) => { @@ -1954,7 +1855,7 @@ export function registerTaskCommand(cli: Argv): void { } } if (SS_LIKE.has(dt ?? 0)) return { // SQLServer - note: `SQLServer: default schema is 'dbo'. where format: col >= '${BIZDATE}'. splitPk supported.`, + note: `SQLServer: default schema is 'dbo'. where format: col >= '${BIZDATE}'. splitPk supported. UNSUPPORTED types (remove from config): sql_variant, image, binary, varbinary, timestamp (rowversion). text/ntext map to STRING.`, params: { dsType: dt, operatorType: "source", table: argv["source-table"] as string, database: argv["source-db"] as string, ...(recommendedSplitPk != null ? { splitPk: recommendedSplitPk } : {}), ...(recommendedWhere != null ? { where: recommendedWhere } : {}), @@ -2063,7 +1964,8 @@ export function registerTaskCommand(cli: Argv): void { // --- Config JSON template --- lines.push(`\nConfig JSON structure (use source_params_template.params for source.params):`) - lines.push(`{"templateKey":1,"userParams":{},"sourceConnection":{"datasourceId":,"datasourceName":"","type":},"sinkConnection":{"datasourceId":,"datasourceName":"","type":1},"jobs":[{"source":{"dataObject":"
","namespace":"","params":,"columns":[...source columns as-is...]},"sink":{"dataObject":"
","namespace":"","params":{"dsType":1,"writeMode":"${recommendedWriteMode}","operatorType":"sink","table":"
","database":"","is_partition":false},"columns":[...same columns with Lakehouse types...]},"setting":{"parallelism":1,"errorLimit":{"maxCount":-1,"collectDirtyData":true,"record":-1}},"columnMapping":{"col":"col",...}}]}`) + lines.push(`{"templateKey":1,"userParams":{},"sourceConnection":{"datasourceId":,"datasourceName":"","type":},"sinkConnection":{"datasourceId":,"datasourceName":"","type":1},"jobs":[{"source":{"dataObject":"
","namespace":"","params":,"columns":[...source columns as-is...]},"sink":{"dataObject":"
","namespace":"","params":{"dsType":1,"writeMode":"${recommendedWriteMode}","operatorType":"sink","table":"
","database":"","is_partition":false},"columns":[...same columns with Lakehouse types...]},"setting":{"parallelism":1,"errorLimit":{"maxCount":-1,"collectDirtyData":true,"record":-1}},"columnMapping":{"":"",...}}]}`) + lines.push(`Note: columnMapping format is sink_column: source_column (e.g. if source has uppercase "ID" and sink has lowercase "id", write {"id":"ID"}).`) return lines.join("\n") })(), @@ -2075,13 +1977,13 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "save-offline-sync ", - "Save INTEGRATION task configuration with agent-generated field mapping", + "Save INTEGRATION task field mapping config. Pass the config JSON built from create-offline-sync schema output", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("config", { type: "string", demandOption: true, - describe: "Integration config JSON (output from agent after reviewing integration-schema). Must contain templateKey, sourceConnection, sinkConnection, jobs[].", + describe: "Field mapping config JSON. Must contain templateKey, sourceConnection, sinkConnection, jobs[]. Use 'task offline-sync-schema' to get column info for building this config.", }) .option("vc", { type: "string", describe: "VCluster for execution" }) .option("target-schema", { type: "string", default: "public", describe: "Target schema (used in adhocConfigs)" }) @@ -2102,6 +2004,30 @@ export function registerTaskCommand(cli: Argv): void { if (!integrationConfig.jobs || !integrationConfig.sourceConnection || !integrationConfig.sinkConnection) { error("INVALID_ARGUMENTS", "--config JSON must have templateKey, sourceConnection, sinkConnection, and jobs fields.", { format, exitCode: 2 }); return } + // SQL Server: reject unsupported FlinkX types before saving + const srcConnType = Number((integrationConfig.sourceConnection as Record)?.type ?? 0) + const SS_UNSUPPORTED = new Set(["sql_variant", "image", "binary", "varbinary", "timestamp"]) + if (srcConnType === 8) { + const jobs = Array.isArray(integrationConfig.jobs) ? integrationConfig.jobs as Record[] : [] + const badCols: string[] = [] + for (const job of jobs) { + const srcCols = (Array.isArray((job.source as Record)?.columns) + ? (job.source as Record).columns : []) as Record[] + for (const col of srcCols) { + if (SS_UNSUPPORTED.has(String(col.type ?? "").toLowerCase())) { + badCols.push(`${col.name} (${col.type})`) + } + } + } + if (badCols.length > 0) { + error("UNSUPPORTED_COLUMN_TYPE", + `SQL Server source contains FlinkX-unsupported column types: ${badCols.join(", ")}.\n` + + `Remove these columns from both source.columns and sink.columns in the config, then retry.\n` + + `Unsupported types: sql_variant, image, binary, varbinary, timestamp (rowversion).`, + { format, exitCode: 2 }) + return + } + } const vcCode = (argv.vc as string | undefined) ?? "DEFAULT" // Resolve VC name to numeric ID for scheduler let etlVcId: string | number | undefined @@ -2131,9 +2057,12 @@ export function registerTaskCommand(cli: Argv): void { adhocConfigs, }) } else { - await saveTaskContent(sc, { + // INTEGRATION tasks require collectType:1 (not 0 used by saveTaskContent) + await studioRequest(sc, "/ide-admin/v1/dataFileConfiguration/saveDataFileConfiguration", { dataFileId: fileId, dataFileContent: JSON.stringify(integrationConfig), + collectType: 1, + onlySaveContent: 1, projectId: sc.projectId, updateBy: String(sc.userId), instanceName: sc.instanceName, @@ -2190,6 +2119,29 @@ export function registerTaskCommand(cli: Argv): void { })() : "" createTableDdl = `CREATE TABLE IF NOT EXISTS ${sinkNs}.${sinkTable} (\n${colDefs}\n)${partitionClause}` + + // Check if target table exists — block save if missing so field mapping is consistent with actual schema + const lhDs = await autoResolveLakehouseDs(sc) + if (lhDs) { + const objResp = await studioRequest(sc, "/ide-authority/v1/projectDataSources/listDataObjects", + { id: lhDs.id, nameSpace: sinkNs }).catch(() => null) + const tableList = ((objResp as Record | null)?.data as Record | null)?.list + // listDataObjects returns list as string[] (table names) or object[] + const tables = Array.isArray(tableList) ? tableList as unknown[] : [] + const tableExists = tables.some((t) => + (typeof t === "string" ? t : String((t as Record).dataObjectName ?? (t as Record).name ?? "")).toLowerCase() === sinkTable.toLowerCase() + ) + if (!tableExists) { + error("TARGET_TABLE_NOT_EXISTS", + `Target table '${sinkNs}.${sinkTable}' does not exist. Create it first, then retry save-offline-sync.\n` + + `Run the following DDL:\n\n${createTableDdl}\n\n` + + ` Write to file and execute: cz-cli sql --file /tmp/create_table.sql --write`, + { format, exitCode: 2 }) + return + } + // Table exists — no DDL needed + createTableDdl = null + } } // Verify the content was actually saved by reading it back @@ -2266,6 +2218,17 @@ export function registerTaskCommand(cli: Argv): void { error("INVALID_ARGUMENTS", `Cannot determine dsType for source datasource '${argv.source}'. Specify a valid datasource.`, { format, exitCode: 2 }); return } + // Validate source dsType is supported for CDC multi-table sync + const CDC_SUPPORTED_TYPES = new Set([5, 7, 8, 17, 18, 19, 21, 22, 25, 26, 39, 40, 46, 48]) + // Note: Oracle (25) is listed in DS_TYPE_MAP but NOT supported for CDC — checkCdcPrereqs will reject it + const CDC_SUPPORTED_MYSQL = "MySQL/TiDB/MariaDB (5,17,18,19,39)" + const CDC_SUPPORTED_PG = "PostgreSQL/Greenplum (7,22,40,46,48)" + const CDC_SUPPORTED_SS = "SQL Server (8)" + const CDC_SUPPORTED_DM = "DM (26)" + if (!CDC_SUPPORTED_TYPES.has(sourceDs.dsType)) { + error("UNSUPPORTED_DATASOURCE", `Datasource '${argv.source}' (dsType=${sourceDs.dsType}) is not supported for CDC multi-table sync. Supported: ${CDC_SUPPORTED_MYSQL}, ${CDC_SUPPORTED_PG}, ${CDC_SUPPORTED_SS}, ${CDC_SUPPORTED_DM}.`, { format, exitCode: 2 }); return + } + // CDC prerequisite check (skip with --skip-check) if (!(argv["skip-check"] as boolean)) { const check = await checkCdcPrereqs(sc, sourceDs as { id: number; name: string; dsType: number }, String(argv.source)) @@ -2281,7 +2244,6 @@ export function registerTaskCommand(cli: Argv): void { error("INVALID_ARGUMENTS", `CDC multi-table sync only supports Lakehouse as target. '${targetDs.name}' is not a Lakehouse datasource.`, { format, exitCode: 2 }); return } targetDsId = targetDs.id - targetDsType = 1 } else { // Auto-find Lakehouse datasource matching current workspace const lhDs = await autoResolveLakehouseDs(sc) @@ -2343,7 +2305,7 @@ export function registerTaskCommand(cli: Argv): void { instanceName: sc.workspaceName, etlVcCode: vcName, ...(resolvedVcId != null && { etlVcId: resolvedVcId }), - activeStartTime: new Date().toISOString().slice(0, 10) + "T00:00:00.000Z", + activeStartTime: formatIsoStartOfDay(undefined), activeEndTime: "2099-01-01T00:00:00.000Z", }).catch(() => null) } @@ -2371,7 +2333,7 @@ export function registerTaskCommand(cli: Argv): void { "Save cron schedule configuration (preserves non-cron settings)", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("cron", { type: "string", demandOption: true, describe: "Cron expression — ClickZetta uses 7-field format: second minute hour day month weekday year (e.g. '0 30 9 * * ? *' = daily 09:30)" }) .option("vc", { type: "string", describe: "Virtual cluster code" }) .option("vc-id", { type: "string", describe: "Virtual cluster ID" }) @@ -2387,8 +2349,12 @@ export function registerTaskCommand(cli: Argv): void { return } // Fetch existing config to merge - const oldResp = await getTaskConfigDetail(sc, { projectId: sc.projectId, workspaceId: sc.workspaceId, dataFileId: fileId }) + const [oldResp, detailResp] = await Promise.all([ + getTaskConfigDetail(sc, { projectId: sc.projectId, workspaceId: sc.workspaceId, dataFileId: fileId }), + getTaskDetail(sc, fileId), + ]) const oldData = (oldResp.data && typeof oldResp.data === "object" ? oldResp.data : {}) as Record + const detailData = (detailResp.data && typeof detailResp.data === "object" ? detailResp.data : {}) as Record // Build configProperties with schedule times from UI param const oldConfigProps = (() => { const raw = oldData.configProperties @@ -2401,6 +2367,13 @@ export function registerTaskCommand(cli: Argv): void { if (cronResult.uiParam.scheduleStartTime) oldConfigProps["scheduleStartTime"] = cronResult.uiParam.scheduleStartTime if (cronResult.uiParam.scheduleEndTime) oldConfigProps["scheduleEndTime"] = cronResult.uiParam.scheduleEndTime + // DDL guard: tasks with DDL content should not be scheduled (build/manage separation) + const fileContent = (detailData.fileContent as string | undefined) ?? (oldData.fileContent as string | undefined) ?? "" + if (/\b(CREATE|DROP|ALTER|TRUNCATE)\s+(TABLE|VIEW|SCHEMA|DATABASE)\b/i.test(fileContent)) { + error("DDL_TASK_SCHEDULE_FORBIDDEN", "DDL tasks (CREATE/DROP/ALTER TABLE) should not be scheduled. Keep the task in DRAFT state and run manually. Use cz-cli task execute to run once.", { format, exitCode: 2 }) + return + } + const vcCodeFinal = (argv.vc as string | undefined) ?? (oldData.etlVcCode as string | undefined) ?? "DEFAULT" // Resolve VC name to numeric ID for scheduler (mirrors frontend vcluster/list lookup) let etlVcIdFinal: number | string | undefined = argv["vc-id"] != null ? Number(argv["vc-id"]) : (oldData.etlVcId as number | undefined) @@ -2441,17 +2414,17 @@ export function registerTaskCommand(cli: Argv): void { "Save non-cron task configuration (retry, deps, VC, timeout — preserves cron)", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("retry-count", { type: "number", describe: "Max retry attempts" }) .option("retry-interval", { type: "number", describe: "Retry interval value" }) - .option("retry-unit", { type: "string", describe: "Retry interval unit (m/s)" }) + .option("retry-unit", { type: "string", describe: "Retry interval unit: m=minutes, s=seconds" }) .option("rerun-property", { type: "number", describe: "Rerun policy: 1=ANY_TIME, 2=FAILED_ONLY, 3=NOT_RERUN" }) .option("self-depends", { type: "number", describe: "Self dependency: 0=no, 1=yes" }) .option("vc", { type: "string", describe: "Virtual cluster code" }) .option("vc-id", { type: "string", describe: "Virtual cluster ID" }) .option("schema", { type: "string", describe: "Schema name" }) .option("timeout", { type: "number", describe: "Execute timeout" }) - .option("timeout-unit", { type: "string", describe: "Timeout unit (m/s)" }) + .option("timeout-unit", { type: "string", describe: "Timeout unit: m=minutes, s=seconds" }) .option("deps", { type: "string", choices: ["keep", "replace", "clear"], describe: "Dependency action" }) .option("dep-tasks", { type: "string", describe: "Dependency tasks JSON array. Each item requires taskId (number) and taskName (string), e.g. '[{\"taskId\":123,\"taskName\":\"upstream_task\"}]'" }) .option("param", { type: "array", string: true, describe: "Set task param: key=value (can repeat)" }), @@ -2547,7 +2520,7 @@ export function registerTaskCommand(cli: Argv): void { "Show task dependencies (draft state)", (y) => y - .positional("task", { type: "string", demandOption: true }), + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }), async (argv) => { const format = argv.format try { @@ -2587,7 +2560,7 @@ export function registerTaskCommand(cli: Argv): void { "Publish/online a task", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .version(false) .option("yes", { alias: "y", type: "boolean", default: false, describe: "Skip confirmation" }), async (argv) => { @@ -2636,8 +2609,7 @@ export function registerTaskCommand(cli: Argv): void { error("NO_INTEGRATION_CONFIG", `INTEGRATION task has no field mapping configured. Run:\n` + ` cz-cli task offline-sync-schema ${fileId} --source --source-db --source-table
\n` + - ` # Agent maps types, then:\n` + - ` cz-cli task save-offline-sync ${fileId} --config ''\n` + + ` # then: cz-cli task save-offline-sync ${fileId} --config ''\n` + ` cz-cli task save-cron ${fileId} --cron '0 2 * * *' --vc `, { format, exitCode: 2 }); return } @@ -2668,7 +2640,7 @@ export function registerTaskCommand(cli: Argv): void { if (!ok) { success( { - message: "Cancelled by user. No online action was executed.", + message: "Requires -y to proceed. No action was executed.", action: "task.online", executed: false, }, @@ -2695,16 +2667,16 @@ export function registerTaskCommand(cli: Argv): void { "Start a CDC/streaming task (MULTI_REALTIME, REALTIME, STREAMING types)", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("startup-mode", { type: "number", default: 0, - describe: "0=无状态启动 (default), 1=从上次保存状态恢复, 4=自定义起始位置", + describe: "0=stateless start (default), 1=resume from last saved state, 4=custom start position (requires --config)", }) .option("engine-type", { type: "number", default: 5, - describe: "Engine type (default: 5)", + describe: "Flink engine type (5=default, do not change unless instructed)", }) .option("snapshot", { type: "boolean", @@ -2719,13 +2691,13 @@ export function registerTaskCommand(cli: Argv): void { .option("blacklist-strategy", { type: "number", default: 2, - describe: "Blacklist strategy (default: 2)", + describe: "Blacklist handling strategy (2=default, do not change unless instructed)", }) .option("config", { type: "string", describe: "JSON string for custom startup position (required when --startup-mode=4). " + - 'Array of {datasourceId, startupMode (2=指定时间|3=指定文件), startTimestamp|file+pos}. ' + + 'Array of {datasourceId, startupMode (2=by timestamp|3=by binlog file), startTimestamp|file+pos}. ' + 'Example: \'[{"datasourceId":123,"startupMode":2,"startTimestamp":"1718000000000"}]\'', }), async (argv) => { @@ -2782,7 +2754,7 @@ export function registerTaskCommand(cli: Argv): void { .command( "stop ", "Stop a CDC/streaming task (MULTI_REALTIME, REALTIME, STREAMING types)", - (y) => y.positional("task", { type: "string", demandOption: true }), + (y) => y.positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }), async (argv) => { const format = argv.format try { @@ -2812,7 +2784,7 @@ export function registerTaskCommand(cli: Argv): void { "Take a task offline (clears all run instances, irreversible)", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("with-downstream", { type: "boolean", default: false, describe: "Also offline downstream tasks" }) .option("yes", { alias: "y", type: "boolean", default: false, describe: "Skip confirmation" }), async (argv) => { @@ -2827,7 +2799,7 @@ export function registerTaskCommand(cli: Argv): void { if (!ok) { success( { - message: "Cancelled by user. No offline action was executed.", + message: "Requires -y to proceed. No action was executed.", action: "task.offline", executed: false, }, @@ -3049,11 +3021,11 @@ export function registerTaskCommand(cli: Argv): void { execution_status: STATUS_NAME[statusCode as number] ?? statusCode, } const aiMessage = - `临时执行完成(task_id=${fileId},run_id=${runInstanceId})。Notice: 这是一次临时执行,不影响调度计划。` + + `Ad-hoc execution complete (task_id=${fileId}, run_id=${runInstanceId}). This is a one-time run and does not affect the schedule.` + (argv["save-params"] && Object.keys(cliParams).length > 0 - ? `--param 值已通过 --save-params 写回任务 paramValueList,调度运行将使用这些参数值。` - : `--param 传入的参数值仅对本次执行有效,调度运行使用任务配置中保存的参数(通过 cz-cli task save-content --params 设置)。`) + - `如需将当前脚本提升为正式调度,请在用户确认后执行: cz-cli task online ${fileId} -y` + ? ` --param values have been saved back to the task via --save-params and will be used by scheduled runs.` + : ` --param values apply to this run only. Scheduled runs use the params saved in the task config (set via cz-cli task save-content --params).`) + + ` To promote the current script to the schedule, run: cz-cli task online ${fileId} -y` if (statusCode === 3 || failMsg) { error("EXECUTE_FAILED", String(failMsg ?? `Task execution ${runInstanceId} failed`), { format }) return @@ -3091,12 +3063,12 @@ export function registerTaskCommand(cli: Argv): void { } }, ) - .command("flow", "Flow task operations", (flowYargs) => + .command("flow", "Operations for composite Flow tasks (DAGs of SQL/Python/Shell nodes running in dependency order). Run 'cz-cli task flow --help' for the full workflow.", (flowYargs) => flowYargs .command( "dag ", - "Get flow DAG", - (y) => y.positional("task", { type: "string", demandOption: true }), + "Show the node dependency graph of a composite task (Flow)", + (y) => y.positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }), async (argv) => { const format = argv.format try { @@ -3112,10 +3084,10 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "create-node ", - "Add node to flow", + "Add a SQL/Python/Shell node to a composite task (Flow). Step 2 after 'task create --type FLOW'", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("name", { type: "string", demandOption: true }) .option("type", { type: "string", default: "sql", describe: "SQL/PYTHON/SHELL/SPARK" }) .option("description", { type: "string" }) @@ -3142,7 +3114,7 @@ export function registerTaskCommand(cli: Argv): void { }) logOperation("task flow create-node", { ok: true }) const createdNodeId = (resp.data as Record)?.id - success(resp.data, { format, aiMessage: `Node created (id=${createdNodeId}). Next steps:\n1. Save content: cz-cli task flow node-save ${argv.task} --node-id ${createdNodeId} --content ''\n2. Set VC/schema: cz-cli task flow node-save-config ${argv.task} --node-id ${createdNodeId} --vc \n3. Bind dependencies if needed: cz-cli task flow bind ${argv.task} --upstream --downstream \n4. Publish: cz-cli task flow submit ${argv.task}` }) + success(resp.data, { format, aiMessage: `Node created (id=${createdNodeId}). Next steps:\n1. Save content: cz-cli task flow node-save ${argv.task} --node-id ${createdNodeId} --content ''\n2. Set VC/schema: cz-cli task flow node-save-config ${argv.task} --node-id ${createdNodeId} --vc \n3. Bind dependencies if needed: cz-cli task flow bind ${argv.task} --upstream --downstream \n4. Publish: cz-cli task flow submit ${argv.task} (then verify in Studio UI — [Known issue: CLI submit may not fully publish — verify in Studio UI and click Submit manually if the flow status does not change to Published])` }) } catch (err) { reportTaskError(err, format) } @@ -3150,10 +3122,10 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "remove-node ", - "Remove node from flow", + "Remove a node from a composite task (Flow)", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("name", { type: "string", describe: "Node name" }) .option("node-id", { type: "number", describe: "Node ID" }), async (argv) => { @@ -3179,10 +3151,10 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "bind ", - "Create dependency between flow nodes", + "Set a dependency between two nodes in a composite task: --upstream --downstream means node_b waits for node_a to finish", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("upstream", { type: "string", demandOption: true, describe: "Upstream node name" }) .option("downstream", { type: "string", demandOption: true, describe: "Downstream node name" }), async (argv) => { @@ -3209,8 +3181,36 @@ export function registerTaskCommand(cli: Argv): void { dependencyNodeId: upstreamNodeId, dependencyProjectId: sc.projectId, }) + // Verify the edge actually persisted — bind API has async write behavior + let verified = false + for (let attempt = 0; attempt < 3; attempt++) { + if (attempt > 0) await new Promise((r) => setTimeout(r, 800)) + const verifyResp = await getFlowDag(sc, fileId) + const verifyNodes = (verifyResp.data ?? []) as Record[] + const verifyDownstream = verifyNodes.find((n) => Number(n.id) === downstreamNodeId) + const verifyDeps = (verifyDownstream?.dependencies ?? []) as Record[] + if (verifyDeps.some((d) => Number(d.dependencyNodeId) === upstreamNodeId)) { + verified = true; break + } + // Not persisted yet — retry bind + if (attempt < 2) { + await bindFlowNode(sc, { + currentFileId: fileId, + currentNodeId: downstreamNodeId, + currentProjectId: sc.projectId, + dependencyFileId: fileId, + dependencyNodeId: upstreamNodeId, + dependencyProjectId: sc.projectId, + }).catch(() => null) + } + } + logOperation("task flow bind", { ok: verified }) + if (!verified) { + success({ ...resp.data as object, warning: "Bind API returned success but edge not confirmed in DAG. Run 'cz-cli task flow dag' to check, and retry if missing." }, { format, aiMessage: `Dependency bind returned success but could not be verified in DAG. Retry: cz-cli task flow bind ${argv.task} --upstream ${argv.upstream} --downstream ${argv.downstream}` }) + return + } logOperation("task flow bind", { ok: true }) - success(resp.data, { format, aiMessage: `Dependency bound. Add more bindings as needed, then publish: 'cz-cli task flow submit ${argv.task}'.` }) + success(resp.data, { format, aiMessage: `Dependency bound. Add more bindings as needed, then publish: 'cz-cli task flow submit ${argv.task}'. [Known issue: CLI submit may not fully publish — verify in Studio UI and click Submit manually if the flow status does not change to Published].` }) } catch (err) { reportTaskError(err, format) } @@ -3218,10 +3218,10 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "unbind ", - "Remove dependency between flow nodes", + "Remove a dependency between two nodes in a composite task: pass --upstream and --downstream by name (IDs resolved automatically from DAG)", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("upstream", { type: "string", describe: "Upstream node name", demandOption: true }) .option("downstream", { type: "string", describe: "Downstream node name", demandOption: true }), async (argv) => { @@ -3246,7 +3246,7 @@ export function registerTaskCommand(cli: Argv): void { fileId, }) logOperation("task flow unbind", { ok: true }) - success(resp.data, { format, aiMessage: `Dependency removed. Verify DAG with 'cz-cli task flow dag ${argv.task}', then publish: 'cz-cli task flow submit ${argv.task}'.` }) + success(resp.data, { format, aiMessage: `Dependency removed. Verify DAG with 'cz-cli task flow dag ${argv.task}', then publish: 'cz-cli task flow submit ${argv.task}'. [Known issue: CLI submit may not fully publish — verify in Studio UI and click Submit manually if the flow status does not change to Published].` }) } catch (err) { reportTaskError(err, format) } @@ -3254,10 +3254,10 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "node-detail ", - "Get flow node detail", + "Get the content and config of a single node in a composite task", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("node-id", { type: "number", describe: "Node ID" }) .option("name", { type: "string", describe: "Node name (resolved via DAG)" }), async (argv) => { @@ -3280,14 +3280,14 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "node-save ", - "Save flow node script content", + "Write SQL/Python/Shell script to a node in a composite task. Identify the node by --node-id or --name", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("node-id", { type: "number", describe: "Node ID" }) .option("name", { type: "string", describe: "Node name (resolved via DAG)" }) - .option("content", { type: "string" }) - .option("file", { alias: "f", type: "string" }) + .option("content", { type: "string", describe: "Script content (SQL/Python/Shell). Mutually exclusive with --file" }) + .option("file", { alias: "f", type: "string", describe: "Read script content from a file. Mutually exclusive with --content" }) .option("param", { type: "array", string: true, describe: "Set param with manual default: key=value" }) .option("flow-param", { type: "array", string: true, describe: "Set param inherited from parent flow: key (no value needed)" }), async (argv) => { @@ -3336,7 +3336,7 @@ export function registerTaskCommand(cli: Argv): void { ...(allParams.length > 0 && { paramValueList: allParams }), }) logOperation("task flow node-save", { ok: true }) - success(resp.data, { format, aiMessage: `Node content saved. Next: set VC/schema with 'cz-cli task flow node-save-config ${argv.task} --name ${argv.name ?? argv["node-id"]} --vc ', then bind dependencies with 'flow bind' if needed, then publish with 'cz-cli task flow submit ${argv.task}'.` }) + success(resp.data, { format, aiMessage: `Node content saved. Next: set VC/schema with 'cz-cli task flow node-save-config ${argv.task} --name ${argv.name ?? argv["node-id"]} --vc ', then bind dependencies with 'flow bind' if needed, then publish with 'cz-cli task flow submit ${argv.task}'. [Known issue: CLI submit may not fully publish — verify in Studio UI and click Submit manually if the flow status does not change to Published].` }) } catch (err) { reportTaskError(err, format) } @@ -3344,15 +3344,15 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "node-save-config ", - "Save flow node schedule config", + "Configure VC, schema, retry, and cron for a node in a composite task. Required before flow submit", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("node-id", { type: "number", describe: "Node ID" }) .option("name", { type: "string", describe: "Node name (resolved via DAG)" }) - .option("cron", { type: "string" }) - .option("vc", { type: "string" }) - .option("schema", { type: "string" }), + .option("cron", { type: "string", describe: "Cron expression (7-field format: sec min hr dom mon dow year). e.g. '0 30 9 * * ? *' = daily 09:30" }) + .option("vc", { type: "string", describe: "Virtual cluster code for this node (run: cz-cli sql --sync 'SHOW VCLUSTERS' to list)" }) + .option("schema", { type: "string", describe: "Default schema name for this node" }), async (argv) => { const format = argv.format try { @@ -3374,7 +3374,7 @@ export function registerTaskCommand(cli: Argv): void { schemaName: argv.schema, }) logOperation("task flow node-save-config", { ok: true }) - success(resp.data, { format, aiMessage: `Node config saved. Repeat node-save/node-save-config for other nodes. Then bind dependencies: 'cz-cli task flow bind ${argv.task} --upstream --downstream '. When all nodes are ready, publish: 'cz-cli task flow submit ${argv.task}'.` }) + success(resp.data, { format, aiMessage: `Node config saved. Repeat node-save/node-save-config for other nodes. Then bind dependencies: 'cz-cli task flow bind ${argv.task} --upstream --downstream '. When all nodes are ready, publish: 'cz-cli task flow submit ${argv.task}'. [Known issue: CLI submit may not fully publish — verify in Studio UI and click Submit manually if the flow status does not change to Published].` }) } catch (err) { reportTaskError(err, format) } @@ -3382,10 +3382,10 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "submit ", - "Submit/publish flow (saves schedule config)", + "[Known issue] Save and attempt to publish the composite task (Flow). CLI submit may not fully publish — verify result in Studio UI and click Submit manually if the flow status does not change to Published", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("vc", { type: "string", describe: "Virtual cluster code" }) .option("cron", { type: "string", describe: "Cron expression" }) .option("retry-count", { type: "number", describe: "Retry count" }), @@ -3428,13 +3428,16 @@ export function registerTaskCommand(cli: Argv): void { schemaName: (oldData.schemaName as string | undefined) ?? "public", }), }) - // Poll for fileFlowStatus=100 && deployStatus=1 + // Poll for published state: deployStatus=1 (deployed) or deployStatus=3 (deployed with pending changes) + // fileFlowStatus=80=submitted, 100=deployed. Flow may settle at 80/deployStatus=3 which is still runnable. let published = false for (let i = 0; i < 10; i++) { await new Promise((r) => setTimeout(r, 1500)) const detail = await getTaskDetail(sc, fileId) const d = (detail.data ?? {}) as Record - if (Number(d.fileFlowStatus) === 100 && Number(d.deployStatus) === 1) { + const ffs = Number(d.fileFlowStatus) + const ds = Number(d.deployStatus) + if ((ffs === 100 && ds === 1) || (ffs === 80 && (ds === 1 || ds === 3))) { published = true; break } } @@ -3443,7 +3446,7 @@ export function registerTaskCommand(cli: Argv): void { task_id: fileId, published, studio_url: studioUrl(sc, fileId), - }, { format, aiMessage: published ? "Flow submitted successfully. Run with: 'cz-cli task flow run '." : "Flow saved but publish status not confirmed within timeout. Check Studio or retry 'cz-cli task flow submit '." }) + }, { format, aiMessage: published ? "Flow config saved. [Known issue: CLI submit may not fully publish — verify in Studio UI and click Submit manually if the flow status does not change to Published]. If published, run with: 'cz-cli task flow run '." : "Flow config saved but publish not confirmed. [Known issue: CLI submit may not fully publish — verify in Studio UI and click Submit manually if the flow status does not change to Published]. Then run with: 'cz-cli task flow run '." }) } catch (err) { reportTaskError(err, format) } @@ -3451,10 +3454,10 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "run ", - "Execute flow ad-hoc", + "Run a composite task immediately outside the schedule. Returns a schedule_instance_id — use flow instances to view node-level run status", (y) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("vc", { type: "string", describe: "Virtual cluster code (default: DEFAULT)" }) .option("param", { type: "array", string: true, describe: "Override param value: key=value (can repeat)" }), async (argv) => { @@ -3525,11 +3528,11 @@ export function registerTaskCommand(cli: Argv): void { ) .command( "instances ", - "List flow node instances", + "List node-level run records for a composite task instance. --flow-instance is the schedule_instance_id returned by flow run", (y) => y - .positional("task", { type: "string", demandOption: true }) - .option("instance", { type: "number", demandOption: true, describe: "Flow instance ID" }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) + .option("flow-instance", { type: "number", demandOption: true, describe: "Flow instance ID" }) .option("node-id", { type: "number", describe: "Flow node ID" }) .option("node-instance-id", { type: "number", describe: "Flow node instance ID" }), async (argv) => { @@ -3539,7 +3542,7 @@ export function registerTaskCommand(cli: Argv): void { const fileId = await resolveTaskId(sc, argv.task as string, format) const resp = await listFlowInstances(sc, { flowId: fileId, - flowInstanceId: argv.instance as number | undefined, + flowInstanceId: argv["flow-instance"] as number | undefined, flowNodeId: argv["node-id"] as number | undefined, flowNodeInstanceId: argv["node-instance-id"] as number | undefined, }) @@ -3572,11 +3575,21 @@ export function registerTaskCommand(cli: Argv): void { if (!argv.yes) { const ok = await confirm(`Delete folder ${folderId}? This is irreversible.`) if (!ok) { - success({ message: "Cancelled by user.", action: "task.delete-folder", executed: false }, { format }) + success({ message: "Requires -y to proceed. No action was executed.", action: "task.delete-folder", executed: false }, { format }) return } } - const resp = await deleteFolder(sc, { folderId, projectId: sc.projectId }) + const resp = await deleteFolder(sc, { folderId, projectId: sc.projectId }).catch(async (e: unknown) => { + const msg = e instanceof Error ? e.message : String(e) + if (msg.includes("存在文件")) { + const listResp = await listTasks(sc, { projectId: sc.projectId, page: 1, pageSize: 50, folderId }).catch(() => null) + const data = (listResp?.data && typeof listResp.data === "object" ? listResp.data : {}) as Record + const tasks = (Array.isArray(data.list) ? data.list : []) as Record[] + const names = tasks.map((t) => `${t.id ?? t.task_id}: ${t.dataFileName ?? t.fileName ?? t.task_name}`).join(", ") + handledError("FOLDER_NOT_EMPTY", `Folder ${folderId} still contains ${tasks.length} task(s): ${names || "(unable to list)"}. Delete or move them first.`, { format }) + } + throw e + }) logOperation("task delete-folder", { ok: true }) success(resp, { format }) } catch (err) { @@ -3599,7 +3612,7 @@ export function registerTaskCommand(cli: Argv): void { if (!argv.yes) { const ok = await confirm(`Delete task ${taskId}? This is irreversible.`) if (!ok) { - success({ message: "Cancelled by user.", action: "task.delete", executed: false }, { format }) + success({ message: "Requires -y to proceed. No action was executed.", action: "task.delete", executed: false }, { format }) return } } @@ -3625,7 +3638,13 @@ export function registerTaskCommand(cli: Argv): void { "/ide-admin/v1/scheduleTask/getDetail", { scheduleTaskId: fileId, projectId: sc.projectId }, { env: "prod" }, - ) + ).catch((e: unknown) => { + const msg = e instanceof Error ? e.message : String(e) + if (msg.includes("外部调用异常") || msg.includes("SYSTEM_EXCEPTION")) { + handledError("TASK_NOT_DEPLOYED", `Task is not deployed — no schedule info available. Run: cz-cli task deploy ${fileId} -y`, { format }) + } + throw e + }) logOperation("task schedule-info", { ok: true }) success(resp.data, { format, aiMessage: "This is the deployed (published) schedule state. For draft config use: cz-cli task content " }) } catch (err) { @@ -3676,7 +3695,9 @@ export function registerTaskCommand(cli: Argv): void { ...(argv.end ? { scheduleEndTime: argv.end as string } : {}), scheduleEnv: "prod", }) - const times = Array.isArray(resp.data) ? resp.data : [] + const apiTimes = Array.isArray(resp.data) ? resp.data as string[] : [] + // API may return empty — fallback to local calculation + const times = apiTimes.length > 0 ? apiTimes : cronNextRuns(cronExpress, argv.count as number) const limited = times.slice(0, argv.count as number) logOperation("task cron-preview", { ok: true }) success({ cron: cronExpress, next_runs: limited, count: limited.length }, { @@ -3689,11 +3710,11 @@ export function registerTaskCommand(cli: Argv): void { }, ) .command( - "search", + "search [name]", "Search tasks by name with resolved folder path", (y) => y - .option("name", { type: "string", describe: "Task name (fuzzy match)" }) + .positional("name", { type: "string", describe: "Task name (fuzzy match)" }) .option("type", { type: "string", describe: "Task type filter: SQL, PYTHON, SHELL, JDBC, etc." }) .option("status", { type: "string", @@ -3833,7 +3854,7 @@ export function registerTaskCommand(cli: Argv): void { extra: { total_matched: filtered.length, limit }, aiMessage: t("task_search_result", filtered.length, - filtered.length >= limit ? `(已截断,最多显示 ${limit} 条 / truncated at ${limit})` : "", + filtered.length >= limit ? ` (truncated at ${limit})` : "", ), }) } catch (err) { @@ -4020,7 +4041,7 @@ export function registerTaskCommand(cli: Argv): void { false as unknown as string, (y: Argv) => y - .positional("task", { type: "string", demandOption: true }) + .positional("task", { type: "string", demandOption: true, describe: "Task name or ID" }) .option("content", { type: "string" }) .option("file", { alias: "f", type: "string" }), async (argv: Record) => { diff --git a/packages/cz-cli/src/cron-adapter.ts b/packages/cz-cli/src/cron-adapter.ts index 6f960bc44..477405e95 100644 --- a/packages/cz-cli/src/cron-adapter.ts +++ b/packages/cz-cli/src/cron-adapter.ts @@ -168,6 +168,78 @@ function encodeFromUi(param: UiParam): string { return crontab.join(" ") } +/** Compute next N run times for a 7-field Quartz cron (sec min hr dom mon dow year). */ +export function cronNextRuns(expr: string, count = 5, from?: Date): string[] { + let fields: CronFields + try { fields = parseCron(expr) } catch { return [] } + + const DOW_NAMES: Record = { SUN:1,MON:2,TUE:3,WED:4,THU:5,FRI:6,SAT:7 } + const MON_NAMES: Record = { JAN:1,FEB:2,MAR:3,APR:4,MAY:5,JUN:6,JUL:7,AUG:8,SEP:9,OCT:10,NOV:11,DEC:12 } + + function normalize(field: string, nameMap: Record): string { + return field.replace(/[A-Z]+/g, (m) => String(nameMap[m] ?? m)) + } + + function expand(field: string, min: number, max: number, nameMap?: Record): Set { + const f = nameMap ? normalize(field.toUpperCase(), nameMap) : field + if (f === "*" || f === "?") { + const s = new Set() + for (let i = min; i <= max; i++) s.add(i) + return s + } + const s = new Set() + for (const part of f.split(",")) { + if (part.includes("/")) { + const [rangeStr, stepStr] = part.split("/") + const step = parseInt(stepStr, 10) + const [lo, hi] = rangeStr === "*" ? [min, max] : rangeStr.split("-").map(Number) + for (let v = lo ?? min; v <= (hi ?? max); v += step) s.add(v) + } else if (part.includes("-")) { + const [lo, hi] = part.split("-").map(Number) + for (let v = lo; v <= hi; v++) s.add(v) + } else { + const n = parseInt(part, 10) + if (!isNaN(n)) s.add(n) + } + } + return s + } + + const seconds = expand(fields.second, 0, 59) + const minutes = expand(fields.minute, 0, 59) + const hours = expand(fields.hour, 0, 23) + const months = expand(fields.month, 1, 12, MON_NAMES) + // day-of-week: 1=Sun…7=Sat in Quartz; JS getDay(): 0=Sun…6=Sat + const dowField = fields.week === "?" ? null : expand(fields.week, 1, 7, DOW_NAMES) + const domField = fields.day === "?" ? null : expand(fields.day, 1, 31) + + const results: string[] = [] + // Start from next second after `from` + const start = new Date(from ?? new Date()) + start.setMilliseconds(0) + start.setSeconds(start.getSeconds() + 1) + + let d = new Date(start) + const limit = new Date(start) + limit.setFullYear(limit.getFullYear() + 2) // search max 2 years ahead + + while (results.length < count && d < limit) { + if (!months.has(d.getMonth() + 1)) { d.setDate(1); d.setHours(0,0,0,0); d.setMonth(d.getMonth() + 1); continue } + if (domField && !domField.has(d.getDate())) { d.setDate(d.getDate() + 1); d.setHours(0,0,0,0); continue } + if (dowField) { + const jsDay = d.getDay() // 0=Sun + const quartzDay = jsDay === 0 ? 1 : jsDay + 1 // Sun=1, Mon=2…Sat=7 + if (!dowField.has(quartzDay)) { d.setDate(d.getDate() + 1); d.setHours(0,0,0,0); continue } + } + if (!hours.has(d.getHours())) { d.setHours(d.getHours() + 1); d.setMinutes(0,0,0); continue } + if (!minutes.has(d.getMinutes())) { d.setMinutes(d.getMinutes() + 1); d.setSeconds(0,0); continue } + if (!seconds.has(d.getSeconds())) { d.setSeconds(d.getSeconds() + 1); d.setMilliseconds(0); continue } + results.push(d.toISOString().replace("T", " ").replace(/\.\d+Z$/, " UTC")) + d.setSeconds(d.getSeconds() + 1) + } + return results +} + /** * Convert agent cron expression to Studio-compatible format. * Validates, decodes to UI params, re-encodes with constraints.