diff --git a/CLAUDE.md b/CLAUDE.md index 67140f9..4822baf 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -41,10 +41,12 @@ Bun + TypeScript + Ink TUI for Dagster collection-flow pipelines. TS port of the ## Available commands -`status`, `runs`, `run`, `logs`, `errors`, `tail`, `materialise`, `cancel`, `recheck`, `reload`, `stale`, `sensors`, `asset`, `graph`, `config`, `diff`, `inspect`, `sample`, `es-check`, `new-asset`, `start`, `debug`, `duckdb`. (23 commands.) +`status`, `runs`, `run`, `logs`, `errors`, `tail`, `materialise`, `launch`, `cancel`, `recheck`, `reload`, `stale`, `sensors`, `asset`, `graph`, `config`, `diff`, `inspect`, `sample`, `es-check`, `new-asset`, `start`, `debug`, `duckdb`. (24 commands.) TUI menu items: Runs, Assets, Jobs, Sensors, Elasticsearch (gated on `ELASTICSEARCH_*`/`ELASTICO_*` env vars), DuckDB (spawns `colflow duckdb` in a new terminal), Reload Dagster. +`launch` and `materialise` accept run config like `dg`: `--config <path>` (JSON or YAML, repeatable, shallow-merged left to right) and `--config-json <json>` (merged last, wins). Both resolve via `resolveRunConfig` in `src/commands/_runconfig.ts` and flow into the `runConfigData` field of `ExecutionParams` in `launchRun` / `launchAssetRun`. Without either flag the config is `{}`, so prior default-config behaviour is unchanged. Use it to set resource config, e.g. `colflow launch image_qc_pipeline --config-json '{"resources":{"colour_target_qc":{"config":{"use_vision":true}}}}'`. + ## GraphQL schema notes - `metadataEntries` is a union — use typed inline fragments (`IntMetadataEntry`, `TextMetadataEntry`, `PathMetadataEntry`, `JsonMetadataEntry`, `BoolMetadataEntry`, `FloatMetadataEntry`, `MarkdownMetadataEntry`, `UrlMetadataEntry`, `TableSchemaMetadataEntry`, `TableMetadataEntry`). 
diff --git a/README.md b/README.md index e292d89..6388f34 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,9 @@ colflow status # Latest run summary colflow runs --limit 10 # List recent runs colflow logs # Print run logs colflow materialise asset1 asset2 # Launch a run +colflow launch # Launch a job +colflow launch --config-json '{"resources":{"r":{"config":{"k":true}}}}' # with run config +colflow launch --config run.yaml # run config from a JSON/YAML file colflow inspect output/data.parquet # Inspect Parquet schema colflow es-check # Elasticsearch cluster + indices summary colflow duckdb # Mount parquets and open duckdb --ui @@ -100,7 +103,8 @@ When developing from source, replace `colflow` with `bun run dev` (e.g. `bun run | `logs ` | Print run logs (--step, --level) | | `errors ` | Failures for a run | | `tail ` | Stream run events (--interval) | -| `materialise ...` | Launch a run for assets | +| `materialise ...` | Launch a run for assets (--config, --config-json) | +| `launch ` | Launch a run for a job (--config, --config-json) | | `cancel ` | Cancel a run | | `recheck ...` | Re-run asset checks | | `reload` | Reload Dagster code location | @@ -125,6 +129,8 @@ When developing from source, replace `colflow` with `bun run dev` (e.g. `bun run | `--url ` | Dagster URL (env: DAGSTER_URL, default: http://localhost:3000) | | `--auth ` | Dagster Cloud token (env: DAGSTER_AUTH) | | `--json` | JSON output where supported | +| `--config ` | Run config file, JSON or YAML, for `launch` / `materialise`. Repeatable; files shallow-merge left to right. | +| `--config-json ` | Inline JSON run config for `launch` / `materialise`. Merged last, wins over `--config`. 
| ## Environment Variables diff --git a/bun.lockb b/bun.lockb index bdb1df2..c354b83 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/package.json b/package.json index 8acc00d..a3d3522 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ "ink": "^7.0.3", "meow": "^14.1.0", "react": "^19.2.6", + "yaml": "^2.9.0", "zod": "^4.4.3" } } diff --git a/src/cli.tsx b/src/cli.tsx index ff9cc3f..a1c0892 100644 --- a/src/cli.tsx +++ b/src/cli.tsx @@ -70,6 +70,8 @@ const cli = meow( --status Filter (runs) --step Filter (logs) --level Filter (logs) + --config Run config file, JSON or YAML (launch, materialise; repeatable, merged left to right) + --config-json Inline JSON run config (launch, materialise; merged last, wins over --config) --help Show this help `, { @@ -99,6 +101,8 @@ const cli = meow( run1: { type: 'string' }, run2: { type: 'string' }, detach: { type: 'boolean', default: false }, + config: { type: 'string', isMultiple: true }, + configJson: { type: 'string' }, }, }, ) @@ -153,10 +157,24 @@ async function main() { await runErrors({ url, auth, json, id: needArg('id') }) return case 'materialise': - await runMaterialise({ url, auth, json, assets: rest }) + await runMaterialise({ + url, + auth, + json, + assets: rest, + config: cli.flags.config, + configJson: cli.flags.configJson, + }) return case 'launch': - await runLaunch({ url, auth, json, job: cli.flags.job ?? rest[0] ?? '' }) + await runLaunch({ + url, + auth, + json, + job: cli.flags.job ?? rest[0] ?? 
'', + config: cli.flags.config, + configJson: cli.flags.configJson, + }) return case 'cancel': await runCancel({ url, auth, json, id: needArg('id') }) diff --git a/src/client/index.ts b/src/client/index.ts index 37cc4a7..f1489ea 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -910,7 +910,11 @@ export async function fetchSensors(client: GraphQLClient): Promise r.sensorState) } -export async function launchAssetRun(client: GraphQLClient, assetNames: string[]): Promise { +export async function launchAssetRun( + client: GraphQLClient, + assetNames: string[], + runConfigData: Record = {}, +): Promise { const repo = await getRepository(client) const data = await client.request<{ launchRun: { @@ -926,6 +930,7 @@ export async function launchAssetRun(client: GraphQLClient, assetNames: string[] repositoryLocationName: repo.location.name, }, stepKeys: assetNames, + runConfigData, }, }) const r = data.launchRun @@ -934,7 +939,11 @@ export async function launchAssetRun(client: GraphQLClient, assetNames: string[] return r.run.runId } -export async function launchRun(client: GraphQLClient, jobName: string): Promise { +export async function launchRun( + client: GraphQLClient, + jobName: string, + runConfigData: Record = {}, +): Promise { const repo = await getRepository(client) const data = await client.request<{ launchRun: { @@ -951,7 +960,7 @@ export async function launchRun(client: GraphQLClient, jobName: string): Promise repositoryName: repo.name, repositoryLocationName: repo.location.name, }, - runConfigData: {}, + runConfigData, }, }) const r = data.launchRun diff --git a/src/commands/_runconfig.ts b/src/commands/_runconfig.ts new file mode 100644 index 0000000..abb0ba9 --- /dev/null +++ b/src/commands/_runconfig.ts @@ -0,0 +1,74 @@ +import { readFileSync } from 'node:fs' +import { extname } from 'node:path' +import { parse as parseYaml } from 'yaml' + +export interface RunConfigFlags { + config?: string[] + configJson?: string +} + +/** + * Resolve Dagster run 
config from CLI flags, matching `dg launch` semantics. + * + * `--config ` may be passed multiple times; files are parsed (JSON or + * YAML by extension) and shallow-merged left to right, so later files win. + * `--config-json ` is parsed as JSON and merged last (highest + * precedence), for the common scripting case where a file is overkill. + * + * Returns `{}` when no config flags are present, preserving prior behaviour. + */ +export function resolveRunConfig(flags: RunConfigFlags): Record { + const paths = flags.config ?? [] + let merged: Record = {} + + for (const p of paths) { + let raw: string + try { + raw = readFileSync(p, 'utf8') + } catch (err) { + throw new Error(`--config: cannot read ${p}: ${(err as Error).message}`) + } + const parsed = parseConfigFile(p, raw) + merged = { ...merged, ...parsed } + } + + if (flags.configJson != null) { + let parsed: unknown + try { + parsed = JSON.parse(flags.configJson) + } catch (err) { + throw new Error(`--config-json: invalid JSON: ${(err as Error).message}`) + } + if (!isObject(parsed)) throw new Error('--config-json: must be a JSON object') + merged = { ...merged, ...parsed } + } + + return merged +} + +function parseConfigFile(path: string, raw: string): Record { + const ext = extname(path).toLowerCase() + let parsed: unknown + if (ext === '.json') { + try { + parsed = JSON.parse(raw) + } catch (err) { + throw new Error(`--config: invalid JSON in ${path}: ${(err as Error).message}`) + } + } else { + // .yaml / .yml / anything else: YAML is a JSON superset, so this also + // parses plain JSON files that lack a .json extension. 
+ try { + parsed = parseYaml(raw) + } catch (err) { + throw new Error(`--config: invalid YAML in ${path}: ${(err as Error).message}`) + } + } + if (!isObject(parsed)) + throw new Error(`--config: ${path} must contain a mapping at the top level`) + return parsed +} + +function isObject(v: unknown): v is Record { + return typeof v === 'object' && v !== null && !Array.isArray(v) +} diff --git a/src/commands/launch.ts b/src/commands/launch.ts index 7f25238..0c8d2d3 100644 --- a/src/commands/launch.ts +++ b/src/commands/launch.ts @@ -1,19 +1,23 @@ import { launchRun, makeClient } from '../client/index.ts' +import { resolveRunConfig } from './_runconfig.ts' interface Opts { url: string auth?: string json: boolean job: string + config?: string[] + configJson?: string } -export async function runLaunch({ url, auth, json, job }: Opts): Promise { +export async function runLaunch({ url, auth, json, job, config, configJson }: Opts): Promise { if (!job) { process.stderr.write('Provide a job name.\n') process.exit(2) } + const runConfig = resolveRunConfig({ config, configJson }) const client = makeClient({ url, auth }) - const runId = await launchRun(client, job) + const runId = await launchRun(client, job, runConfig) if (json) { process.stdout.write(`${JSON.stringify({ runId, job }, null, 2)}\n`) return diff --git a/src/commands/materialise.ts b/src/commands/materialise.ts index 50ffd8c..69f49a4 100644 --- a/src/commands/materialise.ts +++ b/src/commands/materialise.ts @@ -1,19 +1,30 @@ import { launchAssetRun, makeClient } from '../client/index.ts' +import { resolveRunConfig } from './_runconfig.ts' interface Opts { url: string auth?: string json: boolean assets: string[] + config?: string[] + configJson?: string } -export async function runMaterialise({ url, auth, json, assets }: Opts): Promise { +export async function runMaterialise({ + url, + auth, + json, + assets, + config, + configJson, +}: Opts): Promise { if (assets.length === 0) { process.stderr.write('Provide one or 
more asset names.\n') process.exit(2) } + const runConfig = resolveRunConfig({ config, configJson }) const client = makeClient({ url, auth }) - const runId = await launchAssetRun(client, assets) + const runId = await launchAssetRun(client, assets, runConfig) if (json) { process.stdout.write(`${JSON.stringify({ runId, assets }, null, 2)}\n`) return diff --git a/tests/runconfig.test.ts b/tests/runconfig.test.ts new file mode 100644 index 0000000..13abd53 --- /dev/null +++ b/tests/runconfig.test.ts @@ -0,0 +1,64 @@ +import { describe, expect, test } from 'bun:test' +import { mkdtempSync, writeFileSync } from 'node:fs' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import { resolveRunConfig } from '../src/commands/_runconfig.ts' + +const tmp = mkdtempSync(join(tmpdir(), 'colflow-runconfig-')) + +function write(name: string, body: string): string { + const p = join(tmp, name) + writeFileSync(p, body) + return p +} + +describe('resolveRunConfig', () => { + test('no flags returns empty object', () => { + expect(resolveRunConfig({})).toEqual({}) + }) + + test('inline JSON parsed', () => { + const cfg = resolveRunConfig({ + configJson: '{"resources":{"r":{"config":{"use_vision":true}}}}', + }) + expect(cfg).toEqual({ resources: { r: { config: { use_vision: true } } } }) + }) + + test('JSON file parsed by extension', () => { + const p = write('c.json', '{"ops":{"a":{}}}') + expect(resolveRunConfig({ config: [p] })).toEqual({ ops: { a: {} } }) + }) + + test('YAML file parsed', () => { + const p = write('c.yaml', 'resources:\n r:\n config:\n n: 3\n') + expect(resolveRunConfig({ config: [p] })).toEqual({ resources: { r: { config: { n: 3 } } } }) + }) + + test('multiple files shallow-merge left to right', () => { + const a = write('a.json', '{"x":1,"y":1}') + const b = write('b.json', '{"y":2,"z":3}') + expect(resolveRunConfig({ config: [a, b] })).toEqual({ x: 1, y: 2, z: 3 }) + }) + + test('inline JSON wins over file', () => { + const a = 
write('base.json', '{"x":1}') + expect(resolveRunConfig({ config: [a], configJson: '{"x":9}' })).toEqual({ x: 9 }) + }) + + test('missing file throws', () => { + expect(() => resolveRunConfig({ config: [join(tmp, 'nope.json')] })).toThrow(/cannot read/) + }) + + test('invalid inline JSON throws', () => { + expect(() => resolveRunConfig({ configJson: '{not json' })).toThrow(/invalid JSON/) + }) + + test('non-object inline JSON throws', () => { + expect(() => resolveRunConfig({ configJson: '[1,2]' })).toThrow(/must be a JSON object/) + }) + + test('non-mapping file throws', () => { + const p = write('arr.json', '[1,2,3]') + expect(() => resolveRunConfig({ config: [p] })).toThrow(/must contain a mapping/) + }) +})