From 87ae62bfd87acd2c16cdd48d1ac455b8dc41320e Mon Sep 17 00:00:00 2001 From: Luke Watson-Davies Date: Mon, 1 Jun 2026 09:29:29 +0100 Subject: [PATCH] Add run config passthrough to launch and materialise Both commands now accept run config like dg launch does: --config JSON or YAML file, repeatable, shallow-merged left to right --config-json inline JSON, merged last (wins over --config) Config resolves via resolveRunConfig (src/commands/_runconfig.ts) and flows into the runConfigData field of ExecutionParams in launchRun / launchAssetRun, which previously hardcoded {}. With no flag the config is {}, so default-config behaviour is unchanged. Motivating case: setting resource config on image_qc_pipeline (use_vision) without editing the resource default. colflow launch previously launched every job with default config only. Adds the yaml dependency for YAML config files and unit tests for the resolver (merge order, file/inline precedence, error paths). --- CLAUDE.md | 4 +- README.md | 8 +++- bun.lockb | Bin 29783 -> 30135 bytes package.json | 1 + src/cli.tsx | 22 ++++++++++- src/client/index.ts | 15 ++++++-- src/commands/_runconfig.ts | 74 ++++++++++++++++++++++++++++++++++++ src/commands/launch.ts | 8 +++- src/commands/materialise.ts | 15 +++++++- tests/runconfig.test.ts | 64 +++++++++++++++++++++++++++++++ 10 files changed, 200 insertions(+), 11 deletions(-) create mode 100644 src/commands/_runconfig.ts create mode 100644 tests/runconfig.test.ts diff --git a/CLAUDE.md b/CLAUDE.md index 67140f9..4822baf 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -41,10 +41,12 @@ Bun + TypeScript + Ink TUI for Dagster collection-flow pipelines. TS port of the ## Available commands -`status`, `runs`, `run`, `logs`, `errors`, `tail`, `materialise`, `cancel`, `recheck`, `reload`, `stale`, `sensors`, `asset`, `graph`, `config`, `diff`, `inspect`, `sample`, `es-check`, `new-asset`, `start`, `debug`, `duckdb`. (23 commands.) +`status`, `runs`, `run`, `logs`, `errors`, `tail`, `materialise`, `launch`, `cancel`, `recheck`, `reload`, `stale`, `sensors`, `asset`, `graph`, `config`, `diff`, `inspect`, `sample`, `es-check`, `new-asset`, `start`, `debug`, `duckdb`. (24 commands.) TUI menu items: Runs, Assets, Jobs, Sensors, Elasticsearch (gated on `ELASTICSEARCH_*`/`ELASTICO_*` env vars), DuckDB (spawns `colflow duckdb` in a new terminal), Reload Dagster. +`launch` and `materialise` accept run config like `dg`: `--config ` (JSON or YAML, repeatable, shallow-merged left to right) and `--config-json ` (merged last, wins). Both resolve via `resolveRunConfig` in `src/commands/_runconfig.ts` and flow into the `runConfigData` field of `ExecutionParams` in `launchRun` / `launchAssetRun`. Without either flag the config is `{}`, so prior default-config behaviour is unchanged. Use it to set resource config, e.g. `colflow launch image_qc_pipeline --config-json '{"resources":{"colour_target_qc":{"config":{"use_vision":true}}}}'`. + ## GraphQL schema notes - `metadataEntries` is a union — use typed inline fragments (`IntMetadataEntry`, `TextMetadataEntry`, `PathMetadataEntry`, `JsonMetadataEntry`, `BoolMetadataEntry`, `FloatMetadataEntry`, `MarkdownMetadataEntry`, `UrlMetadataEntry`, `TableSchemaMetadataEntry`, `TableMetadataEntry`). diff --git a/README.md b/README.md index e292d89..6388f34 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,9 @@ colflow status # Latest run summary colflow runs --limit 10 # List recent runs colflow logs # Print run logs colflow materialise asset1 asset2 # Launch a run +colflow launch # Launch a job +colflow launch --config-json '{"resources":{"r":{"config":{"k":true}}}}' # with run config +colflow launch --config run.yaml # run config from a JSON/YAML file colflow inspect output/data.parquet # Inspect Parquet schema colflow es-check # Elasticsearch cluster + indices summary colflow duckdb # Mount parquets and open duckdb --ui @@ -100,7 +103,8 @@ When developing from source, replace `colflow` with `bun run dev` (e.g. `bun run | `logs ` | Print run logs (--step, --level) | | `errors ` | Failures for a run | | `tail ` | Stream run events (--interval) | -| `materialise ...` | Launch a run for assets | +| `materialise ...` | Launch a run for assets (--config, --config-json) | +| `launch ` | Launch a run for a job (--config, --config-json) | | `cancel ` | Cancel a run | | `recheck ...` | Re-run asset checks | | `reload` | Reload Dagster code location | @@ -125,6 +129,8 @@ When developing from source, replace `colflow` with `bun run dev` (e.g. `bun run | `--url ` | Dagster URL (env: DAGSTER_URL, default: http://localhost:3000) | | `--auth ` | Dagster Cloud token (env: DAGSTER_AUTH) | | `--json` | JSON output where supported | +| `--config ` | Run config file, JSON or YAML, for `launch` / `materialise`. Repeatable; files shallow-merge left to right. | +| `--config-json ` | Inline JSON run config for `launch` / `materialise`. Merged last, wins over `--config`. | ## Environment Variables diff --git a/bun.lockb b/bun.lockb index bdb1df27f7aee8d7400229365f3029b8c6b46ae0..c354b834a8588740c24eb1157ce8439b35c20033 100755 GIT binary patch delta 4923 zcmeHLeQ;FO6@T|-ce8JkO|p=LT{a9E))}~&E%Km1lpJxkb?>I48*nsdeRV(k0;PoLqPd7 zL6W9HGl2$-ez{CXI}!s*8>Y<^WSVGSvR^)+qtV8I{DF?zAqw?$$QV$5fSFVe6+uIa zUpWQFRtzK({fY^*D9X5ct$|)v0?Iweyh*FTnI{d@VhYHnM0&^+&@aYps;38%Ym_5+ zV=aYS7T0M@QczxCq~9e4WHOQ|IbeCkC`q%R5etFj&|)x3p9R#el6IWQy6EL*_J z!4Lv13EE(+P>v#Zl4fmdl3)Kfu*tN1t{~X~guI%$2kO`dKpVNDC-uk;!or zggQzl*3TR;iTcv~mJTpX->*Cnng>rIms=5YSbTz+qL`?4Ev8Zx!I78{V{s6G`KfZt+&brX=F+tuk1h` z_g5x|(XT%VR;tB54eNpYt|2evE#Z}9~0OQ`7ajQl@UC9_cO1bEGr#H0t36HA^_2)dTOE?K_}!1|IZ0M^4$X$g{4 zyAol7U}%XY>tNk!5^>Pd*Cm%@8f%Fqx8VmkzY5?~r9GpP%csZqH^^vrxquDS#1vU_ z#hC!-W724xH$_4Jw&zfCmeIl`SS?y&DU&m6l;#lj9WAkBI~aQ{vE*6}0OvOXoLF*@ z7BWF%$!n<3{zSp@y%{iy)&L&;--TB`7Q|I5`{p{G+KAio_O{10uTlYVD zXln7}T`LNY6}2zB;J#b=>BJu&Zr_u)QvX8D@aFm-zJ1L1>WlZdUj6)L^hik8Qm;WK zkZo*;PL4Ixonuv@r91g^g-sbFls6 zR56OKfE_F_(}MA;u+hHpAzD;uCVhb_?9@;YB4d%6o&w7zQ5d49!8!|7kwXJu;R)z< zkt!V2Rum%ZMD!Xgk1P}5AK0b|su)Knz-}ux)3}MMD4-1!;h)n?uY(nlqZs~yZ7WvA zL>dCy?t*_#RXAy@6aJOJKd=(=xZodHze^P!x(K$@4gX41QA&L!@XrJPz@DFS%iBk9n)>*0wKMjC|eP+t`siK;? zeDJT_OsBzW$W|T_GiU>zGwCHf1LUX(i6CvkvzCVNoJB>GLSi;;#Z#qoc-E1p@>V*d z*QN61pEsBGu5Q=5)jfj)Q#>*k4DOwBL3^R&(~QQ=xH954(@O(W&J6DLFHWI)y~k7) z`=kiZBg5QE*R|`>#O73h=RGbLT8i_Xu_`C)>6Ll8bl=<=ojAdzcAyPd0jvZ%fKFgJ zz}qQr!ybTlI2XX%bqH7lGy}Yy^VZMS>i}FDwCxf{JAcwh=`?j-aBczKiU4fd+D6Zv zng#HoGaa}I;M1}a;1h8vzy}eZq$hX{WZ_ckUR)UK)i$8FdtC3_DZoayNq#{~!iXsbb;thJVu~b~3 zw;FBs==Y=4E8X?{!IOcFzlUy_%gt6krqm`|dHlO`n4XyP*0wJaZj{HrOe;;HM_#Xf z;5te?t}>s?BW1|6w8@*5iwS}u?5%sz9vPnf_8=6zQPU+d9ci-KVai!#v&X;2 zpLx8$_~AWOjVNcYFm_w1ZGlbfpgZvFr~U<{Vh_EvV3qutO!u~Uu|28?P$ivjv5|Y> z=qR^WVoWfJde8XMo0{MLS7N%DHq{4vUMU(-tO)qc z^&>0(li|pE&#%}1y<-p5Fdp1MSWkPP=BHz!Qn}tlAB3#-Zx65Lk6e&MlNWih55Lot zNp~;Gw#R=Je7bYjq42(HD(cm@Vd%e9xTftTGnbSkN-JH{rDI6=S=(K-W%ode+$nYsr6@t zCOvqg#FkD^z;yg?gx~p?arK*HKDwb0q>HUK(N15s+DtonD@HoIgvKnM&6_6@g%`U= z=VPzn_4lJmzEk41lxs%bjFD0|4L-TJKxpSyiL->4w-XDR209&s=a!J5HQj5-xxAFF ZbkC%oWzW#2PW#~U?3azHi2Xp@_~?;umr**#*GP>L?BkM z#Y9IsIGGA~T8k*s^r6@~3dUlx%kY3p0QC;-g|j*>$nR z^Q+e_-j&a9QDj-NCz#Ry>C!L=|JoE~;MU&jCD*@|E-Cf;X?S&mZhal&H+B@5BVUBHD2;HHr(+5@w_Cv3iACtgUKWE-0XL@i zIe5y(Ga~ZMSB3SCfOGq0aMnjR>AVNVrz?tQAQhD~2qs|74*oWH3iwHI2l!ENp3b3& ze2~V~)O(`r!>*_iVSh3I$;0h)KAB-LE#5wIcjfP5oyq-+7GL|V;d3UFprYtn;h=5N z0Z~t*(E)XvN^e;F>XRxJ#Q3#$urTvsIf}+LpPCm%$I#e~w743Lu{37&sZT^vz2;Xx zj-q?OicEA`^J_biDa+ACkcjhXr-0Z>G`VAZS_+aRj99(NOrOO1wY_MplnQ9nUIAJF zWFnizXG=l0umXGu7pjRaHFu(QIi0XB)J9NaU5)+G;w*}?0?16aNBh(^3tfo!s|PGp zVfEWiL$(+)!%!yKv{E%YhF-Gz&7l}YDU$~GqsGIV*`Vz&Kvh6U5m}scdrYa8juo#) zt4R*p4O9=LvcuX_5m~g9{T+zi3-_!+X4D$UwgadH2=U4yXhV0u^42^H0kta%U=vo$X2}i_$LZddHtrrLh{O>5x2T_co zl}8dX(#t>|)`%3@N1bD(4O9GD2R5W1LJN(h_|y|t`eces#|Roi+molQ6tzD8|}0BMJ2VQ2E;dM z2*0EBHh#_Iwg<#A+9rQT?E&*6*mNssyM3W~0=4C`wrr}REa95CpT->l^Iw6lk@#xb zG%X+o>FBhD{cw^`FdwrgBp3eheoAGkUeqL}c%vsK73zyGwwuhO! zN-C5}v7$&$C#Glgvl!b}md|oFhy0f1t2pap!DM01detCquK_V$7{SN}Sw6!H{saS=DZy@jja2x5Cde86!i@OJOps~$wKIW6Gmg{Q znL3@CX{D-6U6|;u%pmQ|w9?EkcLbdU|Xap!HUl(@TlpmzdJS+VHXc}b}1j(Lnr5y#jNT)HN7l4ZA=wb%lJ|{?9 z3#@b&XePPm1}P7>!uRItB8$d>P65>w>S7k{DMWm85g$-4)i@DfA>woDB9G1kodar~ zr;7p_nuqwDh|i^qxzyl7eDe?=kduTP@d1V0x^U59po1>N=h20S+B}HQjrf3y$mT_S z9>nL>MG1`pjR55o>7tA_6(K$^;sdIn%woh>g!qbeod4rkhZlhAunzNSPZr`Uw$kg6 zRa1Tm;w!PzcS>}zfL;YU1yoh4i(0y?G$`t59KU`lFAItQ?ZNLNdK*Y@ScwMIb)6aGT;PQTPj1nMzg51NC{RwSxG(#g_nI`ARj2FActCID{e)?jU6i zh%W`czxYy^4f=2yeo%Hc!11hpZuWur2*rp#EZb~_@a|HH7|_ea5d_@APYczoz~O##?*vb z)J8ZXP?B5e!N%O-7aG4Uij9|KVac2C25S39Rk60nS>(jBY{k?gn1RDTYW(r$Hz^2- z%jtE(#oe^O$sryf+e(Lcn8upCX~t{IU3uRX^~84-X~pG4NJlB7*->J=+TQESyS02U zXDQ?*P8Zwa2Og$!;7xjL>1#VKM@2Ltc}-AHvt2zaXlJuWy&&ip&Gt0o-B|L_^E>MH z*FF?>ioHst51Jh~>n2)KW?*`Z2WZVYU%>RU`#(U(^6-d-RJFo^L*sCZ1KWHP>J2ow z!jWb?am}ayQP+Lw#Gy$aL-gzlujrwRD{e?LUex6u^~J|+qdn;5b$Xmg;bz(ubfm3A z;$me7P8KIOwfy;#C0Wd`D#pMbB|Kkp+;rpJJw0nY`3msVPP^To(vDHh`jraMchhA5{G278O8Psz&Y7KhCm#qzAN_5*eskeb(4i2Z#zb%ZQkKKuZv5d%pceMZj diff --git a/package.json b/package.json index 8acc00d..a3d3522 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ "ink": "^7.0.3", "meow": "^14.1.0", "react": "^19.2.6", + "yaml": "^2.9.0", "zod": "^4.4.3" } } diff --git a/src/cli.tsx b/src/cli.tsx index ff9cc3f..a1c0892 100644 --- a/src/cli.tsx +++ b/src/cli.tsx @@ -70,6 +70,8 @@ const cli = meow( --status Filter (runs) --step Filter (logs) --level Filter (logs) + --config Run config file, JSON or YAML (launch, materialise; repeatable, merged left to right) + --config-json Inline JSON run config (launch, materialise; merged last, wins over --config) --help Show this help `, { @@ -99,6 +101,8 @@ const cli = meow( run1: { type: 'string' }, run2: { type: 'string' }, detach: { type: 'boolean', default: false }, + config: { type: 'string', isMultiple: true }, + configJson: { type: 'string' }, }, }, ) @@ -153,10 +157,24 @@ async function main() { await runErrors({ url, auth, json, id: needArg('id') }) return case 'materialise': - await runMaterialise({ url, auth, json, assets: rest }) + await runMaterialise({ + url, + auth, + json, + assets: rest, + config: cli.flags.config, + configJson: cli.flags.configJson, + }) return case 'launch': - await runLaunch({ url, auth, json, job: cli.flags.job ?? rest[0] ?? '' }) + await runLaunch({ + url, + auth, + json, + job: cli.flags.job ?? rest[0] ?? '', + config: cli.flags.config, + configJson: cli.flags.configJson, + }) return case 'cancel': await runCancel({ url, auth, json, id: needArg('id') }) diff --git a/src/client/index.ts b/src/client/index.ts index 37cc4a7..f1489ea 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -910,7 +910,11 @@ export async function fetchSensors(client: GraphQLClient): Promise r.sensorState) } -export async function launchAssetRun(client: GraphQLClient, assetNames: string[]): Promise { +export async function launchAssetRun( + client: GraphQLClient, + assetNames: string[], + runConfigData: Record = {}, +): Promise { const repo = await getRepository(client) const data = await client.request<{ launchRun: { @@ -926,6 +930,7 @@ export async function launchAssetRun(client: GraphQLClient, assetNames: string[] repositoryLocationName: repo.location.name, }, stepKeys: assetNames, + runConfigData, }, }) const r = data.launchRun @@ -934,7 +939,11 @@ export async function launchAssetRun(client: GraphQLClient, assetNames: string[] return r.run.runId } -export async function launchRun(client: GraphQLClient, jobName: string): Promise { +export async function launchRun( + client: GraphQLClient, + jobName: string, + runConfigData: Record = {}, +): Promise { const repo = await getRepository(client) const data = await client.request<{ launchRun: { @@ -951,7 +960,7 @@ export async function launchRun(client: GraphQLClient, jobName: string): Promise repositoryName: repo.name, repositoryLocationName: repo.location.name, }, - runConfigData: {}, + runConfigData, }, }) const r = data.launchRun diff --git a/src/commands/_runconfig.ts b/src/commands/_runconfig.ts new file mode 100644 index 0000000..abb0ba9 --- /dev/null +++ b/src/commands/_runconfig.ts @@ -0,0 +1,74 @@ +import { readFileSync } from 'node:fs' +import { extname } from 'node:path' +import { parse as parseYaml } from 'yaml' + +export interface RunConfigFlags { + config?: string[] + configJson?: string +} + +/** + * Resolve Dagster run config from CLI flags, matching `dg launch` semantics. + * + * `--config ` may be passed multiple times; files are parsed (JSON or + * YAML by extension) and shallow-merged left to right, so later files win. + * `--config-json ` is parsed as JSON and merged last (highest + * precedence), for the common scripting case where a file is overkill. + * + * Returns `{}` when no config flags are present, preserving prior behaviour. + */ +export function resolveRunConfig(flags: RunConfigFlags): Record { + const paths = flags.config ?? [] + let merged: Record = {} + + for (const p of paths) { + let raw: string + try { + raw = readFileSync(p, 'utf8') + } catch (err) { + throw new Error(`--config: cannot read ${p}: ${(err as Error).message}`) + } + const parsed = parseConfigFile(p, raw) + merged = { ...merged, ...parsed } + } + + if (flags.configJson != null) { + let parsed: unknown + try { + parsed = JSON.parse(flags.configJson) + } catch (err) { + throw new Error(`--config-json: invalid JSON: ${(err as Error).message}`) + } + if (!isObject(parsed)) throw new Error('--config-json: must be a JSON object') + merged = { ...merged, ...parsed } + } + + return merged +} + +function parseConfigFile(path: string, raw: string): Record { + const ext = extname(path).toLowerCase() + let parsed: unknown + if (ext === '.json') { + try { + parsed = JSON.parse(raw) + } catch (err) { + throw new Error(`--config: invalid JSON in ${path}: ${(err as Error).message}`) + } + } else { + // .yaml / .yml / anything else: YAML is a JSON superset, so this also + // parses plain JSON files that lack a .json extension. + try { + parsed = parseYaml(raw) + } catch (err) { + throw new Error(`--config: invalid YAML in ${path}: ${(err as Error).message}`) + } + } + if (!isObject(parsed)) + throw new Error(`--config: ${path} must contain a mapping at the top level`) + return parsed +} + +function isObject(v: unknown): v is Record { + return typeof v === 'object' && v !== null && !Array.isArray(v) +} diff --git a/src/commands/launch.ts b/src/commands/launch.ts index 7f25238..0c8d2d3 100644 --- a/src/commands/launch.ts +++ b/src/commands/launch.ts @@ -1,19 +1,23 @@ import { launchRun, makeClient } from '../client/index.ts' +import { resolveRunConfig } from './_runconfig.ts' interface Opts { url: string auth?: string json: boolean job: string + config?: string[] + configJson?: string } -export async function runLaunch({ url, auth, json, job }: Opts): Promise { +export async function runLaunch({ url, auth, json, job, config, configJson }: Opts): Promise { if (!job) { process.stderr.write('Provide a job name.\n') process.exit(2) } + const runConfig = resolveRunConfig({ config, configJson }) const client = makeClient({ url, auth }) - const runId = await launchRun(client, job) + const runId = await launchRun(client, job, runConfig) if (json) { process.stdout.write(`${JSON.stringify({ runId, job }, null, 2)}\n`) return diff --git a/src/commands/materialise.ts b/src/commands/materialise.ts index 50ffd8c..69f49a4 100644 --- a/src/commands/materialise.ts +++ b/src/commands/materialise.ts @@ -1,19 +1,30 @@ import { launchAssetRun, makeClient } from '../client/index.ts' +import { resolveRunConfig } from './_runconfig.ts' interface Opts { url: string auth?: string json: boolean assets: string[] + config?: string[] + configJson?: string } -export async function runMaterialise({ url, auth, json, assets }: Opts): Promise { +export async function runMaterialise({ + url, + auth, + json, + assets, + config, + configJson, +}: Opts): Promise { if (assets.length === 0) { process.stderr.write('Provide one or more asset names.\n') process.exit(2) } + const runConfig = resolveRunConfig({ config, configJson }) const client = makeClient({ url, auth }) - const runId = await launchAssetRun(client, assets) + const runId = await launchAssetRun(client, assets, runConfig) if (json) { process.stdout.write(`${JSON.stringify({ runId, assets }, null, 2)}\n`) return diff --git a/tests/runconfig.test.ts b/tests/runconfig.test.ts new file mode 100644 index 0000000..13abd53 --- /dev/null +++ b/tests/runconfig.test.ts @@ -0,0 +1,64 @@ +import { describe, expect, test } from 'bun:test' +import { mkdtempSync, writeFileSync } from 'node:fs' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import { resolveRunConfig } from '../src/commands/_runconfig.ts' + +const tmp = mkdtempSync(join(tmpdir(), 'colflow-runconfig-')) + +function write(name: string, body: string): string { + const p = join(tmp, name) + writeFileSync(p, body) + return p +} + +describe('resolveRunConfig', () => { + test('no flags returns empty object', () => { + expect(resolveRunConfig({})).toEqual({}) + }) + + test('inline JSON parsed', () => { + const cfg = resolveRunConfig({ + configJson: '{"resources":{"r":{"config":{"use_vision":true}}}}', + }) + expect(cfg).toEqual({ resources: { r: { config: { use_vision: true } } } }) + }) + + test('JSON file parsed by extension', () => { + const p = write('c.json', '{"ops":{"a":{}}}') + expect(resolveRunConfig({ config: [p] })).toEqual({ ops: { a: {} } }) + }) + + test('YAML file parsed', () => { + const p = write('c.yaml', 'resources:\n r:\n config:\n n: 3\n') + expect(resolveRunConfig({ config: [p] })).toEqual({ resources: { r: { config: { n: 3 } } } }) + }) + + test('multiple files shallow-merge left to right', () => { + const a = write('a.json', '{"x":1,"y":1}') + const b = write('b.json', '{"y":2,"z":3}') + expect(resolveRunConfig({ config: [a, b] })).toEqual({ x: 1, y: 2, z: 3 }) + }) + + test('inline JSON wins over file', () => { + const a = write('base.json', '{"x":1}') + expect(resolveRunConfig({ config: [a], configJson: '{"x":9}' })).toEqual({ x: 9 }) + }) + + test('missing file throws', () => { + expect(() => resolveRunConfig({ config: [join(tmp, 'nope.json')] })).toThrow(/cannot read/) + }) + + test('invalid inline JSON throws', () => { + expect(() => resolveRunConfig({ configJson: '{not json' })).toThrow(/invalid JSON/) + }) + + test('non-object inline JSON throws', () => { + expect(() => resolveRunConfig({ configJson: '[1,2]' })).toThrow(/must be a JSON object/) + }) + + test('non-mapping file throws', () => { + const p = write('arr.json', '[1,2,3]') + expect(() => resolveRunConfig({ config: [p] })).toThrow(/must contain a mapping/) + }) +})