Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ Bun + TypeScript + Ink TUI for Dagster collection-flow pipelines. TS port of the

## Available commands

`status`, `runs`, `run`, `logs`, `errors`, `tail`, `materialise`, `cancel`, `recheck`, `reload`, `stale`, `sensors`, `asset`, `graph`, `config`, `diff`, `inspect`, `sample`, `es-check`, `new-asset`, `start`, `debug`, `duckdb`. (23 commands.)
`status`, `runs`, `run`, `logs`, `errors`, `tail`, `materialise`, `launch`, `cancel`, `recheck`, `reload`, `stale`, `sensors`, `asset`, `graph`, `config`, `diff`, `inspect`, `sample`, `es-check`, `new-asset`, `start`, `debug`, `duckdb`. (24 commands.)

TUI menu items: Runs, Assets, Jobs, Sensors, Elasticsearch (gated on `ELASTICSEARCH_*`/`ELASTICO_*` env vars), DuckDB (spawns `colflow duckdb` in a new terminal), Reload Dagster.

`launch` and `materialise` accept run config like `dg`: `--config <path>` (JSON or YAML, repeatable, shallow-merged left to right) and `--config-json <inline>` (merged last, wins). Both resolve via `resolveRunConfig` in `src/commands/_runconfig.ts` and flow into the `runConfigData` field of `ExecutionParams` in `launchRun` / `launchAssetRun`. Without either flag the config is `{}`, so prior default-config behaviour is unchanged. Use it to set resource config, e.g. `colflow launch image_qc_pipeline --config-json '{"resources":{"colour_target_qc":{"config":{"use_vision":true}}}}'`.

## GraphQL schema notes

- `metadataEntries` is a union — use typed inline fragments (`IntMetadataEntry`, `TextMetadataEntry`, `PathMetadataEntry`, `JsonMetadataEntry`, `BoolMetadataEntry`, `FloatMetadataEntry`, `MarkdownMetadataEntry`, `UrlMetadataEntry`, `TableSchemaMetadataEntry`, `TableMetadataEntry`).
Expand Down
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ colflow status # Latest run summary
colflow runs --limit 10 # List recent runs
colflow logs <id> # Print run logs
colflow materialise asset1 asset2 # Launch a run
colflow launch <job> # Launch a job
colflow launch <job> --config-json '{"resources":{"r":{"config":{"k":true}}}}' # with run config
colflow launch <job> --config run.yaml # run config from a JSON/YAML file
colflow inspect output/data.parquet # Inspect Parquet schema
colflow es-check # Elasticsearch cluster + indices summary
colflow duckdb # Mount parquets and open duckdb --ui
Expand All @@ -100,7 +103,8 @@ When developing from source, replace `colflow` with `bun run dev` (e.g. `bun run
| `logs <id>` | Print run logs (--step, --level) |
| `errors <id>` | Failures for a run |
| `tail <id>` | Stream run events (--interval) |
| `materialise <name>...` | Launch a run for assets |
| `materialise <name>...` | Launch a run for assets (--config, --config-json) |
| `launch <job>` | Launch a run for a job (--config, --config-json) |
| `cancel <id>` | Cancel a run |
| `recheck <a:check>...` | Re-run asset checks |
| `reload` | Reload Dagster code location |
Expand All @@ -125,6 +129,8 @@ When developing from source, replace `colflow` with `bun run dev` (e.g. `bun run
| `--url <url>` | Dagster URL (env: DAGSTER_URL, default: http://localhost:3000) |
| `--auth <token>` | Dagster Cloud token (env: DAGSTER_AUTH) |
| `--json` | JSON output where supported |
| `--config <path>` | Run config file, JSON or YAML, for `launch` / `materialise`. Repeatable; files shallow-merge left to right. |
| `--config-json <json>` | Inline JSON run config for `launch` / `materialise`. Merged last, wins over `--config`. |

## Environment Variables

Expand Down
Binary file modified bun.lockb
Binary file not shown.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"ink": "^7.0.3",
"meow": "^14.1.0",
"react": "^19.2.6",
"yaml": "^2.9.0",
"zod": "^4.4.3"
}
}
22 changes: 20 additions & 2 deletions src/cli.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ const cli = meow(
--status <s> Filter (runs)
--step <key> Filter (logs)
--level <l> Filter (logs)
--config <path> Run config file, JSON or YAML (launch, materialise; repeatable, merged left to right)
--config-json <json> Inline JSON run config (launch, materialise; merged last, wins over --config)
--help Show this help
`,
{
Expand Down Expand Up @@ -99,6 +101,8 @@ const cli = meow(
run1: { type: 'string' },
run2: { type: 'string' },
detach: { type: 'boolean', default: false },
config: { type: 'string', isMultiple: true },
configJson: { type: 'string' },
},
},
)
Expand Down Expand Up @@ -153,10 +157,24 @@ async function main() {
await runErrors({ url, auth, json, id: needArg('id') })
return
case 'materialise':
await runMaterialise({ url, auth, json, assets: rest })
await runMaterialise({
url,
auth,
json,
assets: rest,
config: cli.flags.config,
configJson: cli.flags.configJson,
})
return
case 'launch':
await runLaunch({ url, auth, json, job: cli.flags.job ?? rest[0] ?? '' })
await runLaunch({
url,
auth,
json,
job: cli.flags.job ?? rest[0] ?? '',
config: cli.flags.config,
configJson: cli.flags.configJson,
})
return
case 'cancel':
await runCancel({ url, auth, json, id: needArg('id') })
Expand Down
15 changes: 12 additions & 3 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,11 @@ export async function fetchSensors(client: GraphQLClient): Promise<SensorState[]
return (data.sensorsOrError.results ?? []).map((r) => r.sensorState)
}

export async function launchAssetRun(client: GraphQLClient, assetNames: string[]): Promise<string> {
export async function launchAssetRun(
client: GraphQLClient,
assetNames: string[],
runConfigData: Record<string, unknown> = {},
): Promise<string> {
const repo = await getRepository(client)
const data = await client.request<{
launchRun: {
Expand All @@ -926,6 +930,7 @@ export async function launchAssetRun(client: GraphQLClient, assetNames: string[]
repositoryLocationName: repo.location.name,
},
stepKeys: assetNames,
runConfigData,
},
})
const r = data.launchRun
Expand All @@ -934,7 +939,11 @@ export async function launchAssetRun(client: GraphQLClient, assetNames: string[]
return r.run.runId
}

export async function launchRun(client: GraphQLClient, jobName: string): Promise<string> {
export async function launchRun(
client: GraphQLClient,
jobName: string,
runConfigData: Record<string, unknown> = {},
): Promise<string> {
const repo = await getRepository(client)
const data = await client.request<{
launchRun: {
Expand All @@ -951,7 +960,7 @@ export async function launchRun(client: GraphQLClient, jobName: string): Promise
repositoryName: repo.name,
repositoryLocationName: repo.location.name,
},
runConfigData: {},
runConfigData,
},
})
const r = data.launchRun
Expand Down
74 changes: 74 additions & 0 deletions src/commands/_runconfig.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { readFileSync } from 'node:fs'
import { extname } from 'node:path'
import { parse as parseYaml } from 'yaml'

/** Run-config CLI flags consumed by `resolveRunConfig`; both fields are optional. */
export interface RunConfigFlags {
/** Paths of config files; each is read and parsed, then merged in array order. */
config?: string[]
/** Inline JSON text; parsed with `JSON.parse` and merged after all files. */
configJson?: string
}

/**
 * Build the run config object from the `--config` / `--config-json` flags.
 *
 * Each `--config` path is read synchronously and handed to `parseConfigFile`.
 * The resulting top-level mappings are folded together in argument order with
 * a shallow spread, so a key from a later file replaces the same key from an
 * earlier one. When `--config-json` is present it is parsed as JSON and
 * spread on last, which gives it the highest precedence.
 *
 * @param flags - Parsed CLI flags; either field may be absent.
 * @returns The merged config, or `{}` when neither flag was supplied.
 * @throws Error when a file cannot be read or parsed, or when the inline
 *   value is not valid JSON or is not a JSON object.
 */
export function resolveRunConfig(flags: RunConfigFlags): Record<string, unknown> {
  const fromFiles = (flags.config ?? []).reduce<Record<string, unknown>>((acc, file) => {
    let text: string
    try {
      text = readFileSync(file, 'utf8')
    } catch (err) {
      throw new Error(`--config: cannot read ${file}: ${(err as Error).message}`)
    }
    // Shallow merge: top-level keys from this file override earlier ones.
    return { ...acc, ...parseConfigFile(file, text) }
  }, {})

  // No inline override: the file-derived config is the final answer.
  if (flags.configJson == null) return fromFiles

  let inline: unknown
  try {
    inline = JSON.parse(flags.configJson)
  } catch (err) {
    throw new Error(`--config-json: invalid JSON: ${(err as Error).message}`)
  }
  if (!isObject(inline)) throw new Error('--config-json: must be a JSON object')

  return { ...fromFiles, ...inline }
}

/**
 * Decode the text of one `--config` file into a top-level mapping.
 *
 * A `.json` extension (case-insensitive) selects `JSON.parse`; every other
 * extension goes through the YAML parser.
 *
 * @param path - File path, used to pick the parser and to label errors.
 * @param raw - The file's contents.
 * @returns The parsed top-level mapping.
 * @throws Error when the text fails to parse or the parsed value is not a
 *   plain (non-array, non-null) object.
 */
function parseConfigFile(path: string, raw: string): Record<string, unknown> {
  const readJson = (): unknown => {
    try {
      return JSON.parse(raw)
    } catch (err) {
      throw new Error(`--config: invalid JSON in ${path}: ${(err as Error).message}`)
    }
  }

  // Used for .yaml / .yml and any other extension: YAML is a JSON superset,
  // so JSON content saved without a .json extension still parses here.
  const readYaml = (): unknown => {
    try {
      return parseYaml(raw)
    } catch (err) {
      throw new Error(`--config: invalid YAML in ${path}: ${(err as Error).message}`)
    }
  }

  const value = extname(path).toLowerCase() === '.json' ? readJson() : readYaml()
  if (isObject(value)) return value
  throw new Error(`--config: ${path} must contain a mapping at the top level`)
}

/**
 * Type guard for a plain mapping-like value: an object that is neither
 * `null` nor an array.
 */
function isObject(v: unknown): v is Record<string, unknown> {
  // `typeof null` is also 'object', and arrays are objects too, so rule both out first.
  if (v === null || Array.isArray(v)) return false
  return typeof v === 'object'
}
8 changes: 6 additions & 2 deletions src/commands/launch.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
import { launchRun, makeClient } from '../client/index.ts'
import { resolveRunConfig } from './_runconfig.ts'

/** Options for `runLaunch`. */
interface Opts {
/** Passed to `makeClient` together with `auth`. */
url: string
/** Optional; passed to `makeClient` together with `url`. */
auth?: string
/** When true, the result is written to stdout as pretty-printed JSON (`runId` and `job`). */
json: boolean
/** Job to launch; an empty value prints a usage message to stderr and exits with code 2. */
job: string
/** Run-config file paths, forwarded to `resolveRunConfig`. */
config?: string[]
/** Inline JSON run config, forwarded to `resolveRunConfig`. */
configJson?: string
}

export async function runLaunch({ url, auth, json, job }: Opts): Promise<void> {
export async function runLaunch({ url, auth, json, job, config, configJson }: Opts): Promise<void> {
if (!job) {
process.stderr.write('Provide a job name.\n')
process.exit(2)
}
const runConfig = resolveRunConfig({ config, configJson })
const client = makeClient({ url, auth })
const runId = await launchRun(client, job)
const runId = await launchRun(client, job, runConfig)
if (json) {
process.stdout.write(`${JSON.stringify({ runId, job }, null, 2)}\n`)
return
Expand Down
15 changes: 13 additions & 2 deletions src/commands/materialise.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
import { launchAssetRun, makeClient } from '../client/index.ts'
import { resolveRunConfig } from './_runconfig.ts'

/** Options for `runMaterialise`. */
interface Opts {
/** Passed to `makeClient` together with `auth`. */
url: string
/** Optional; passed to `makeClient` together with `url`. */
auth?: string
/** When true, the result is written to stdout as pretty-printed JSON (`runId` and `assets`). */
json: boolean
/** Asset names to launch a run for; an empty array prints a usage message to stderr and exits with code 2. */
assets: string[]
/** Run-config file paths, forwarded to `resolveRunConfig`. */
config?: string[]
/** Inline JSON run config, forwarded to `resolveRunConfig`. */
configJson?: string
}

export async function runMaterialise({ url, auth, json, assets }: Opts): Promise<void> {
export async function runMaterialise({
url,
auth,
json,
assets,
config,
configJson,
}: Opts): Promise<void> {
if (assets.length === 0) {
process.stderr.write('Provide one or more asset names.\n')
process.exit(2)
}
const runConfig = resolveRunConfig({ config, configJson })
const client = makeClient({ url, auth })
const runId = await launchAssetRun(client, assets)
const runId = await launchAssetRun(client, assets, runConfig)
if (json) {
process.stdout.write(`${JSON.stringify({ runId, assets }, null, 2)}\n`)
return
Expand Down
64 changes: 64 additions & 0 deletions tests/runconfig.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { describe, expect, test } from 'bun:test'
import { mkdtempSync, writeFileSync } from 'node:fs'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { resolveRunConfig } from '../src/commands/_runconfig.ts'

// Fresh scratch directory under the OS temp dir; every fixture file lands here.
const scratchPrefix = join(tmpdir(), 'colflow-runconfig-')
const tmp = mkdtempSync(scratchPrefix)

/**
 * Write a fixture file into the scratch directory.
 *
 * @param name - File name relative to the scratch directory.
 * @param body - Contents to write.
 * @returns The full path of the file that was written.
 */
function write(name: string, body: string): string {
  const target = join(tmp, name)
  writeFileSync(target, body)
  return target
}

// Behavioural coverage for resolveRunConfig: flag parsing, merge order and
// precedence, and the error messages raised for bad input.
describe('resolveRunConfig', () => {
  test('no flags returns empty object', () => {
    expect(resolveRunConfig({})).toEqual({})
  })

  test('inline JSON parsed', () => {
    const cfg = resolveRunConfig({
      configJson: '{"resources":{"r":{"config":{"use_vision":true}}}}',
    })
    expect(cfg).toEqual({ resources: { r: { config: { use_vision: true } } } })
  })

  test('JSON file parsed by extension', () => {
    const p = write('c.json', '{"ops":{"a":{}}}')
    expect(resolveRunConfig({ config: [p] })).toEqual({ ops: { a: {} } })
  })

  test('YAML file parsed', () => {
    // Nesting in YAML is carried by indentation, so each level is indented
    // two spaces deeper than its parent. Built from explicit lines so the
    // structure stays visible and survives whitespace reformatting.
    const yamlBody = ['resources:', '  r:', '    config:', '      n: 3', ''].join('\n')
    const p = write('c.yaml', yamlBody)
    expect(resolveRunConfig({ config: [p] })).toEqual({ resources: { r: { config: { n: 3 } } } })
  })

  test('multiple files shallow-merge left to right', () => {
    const a = write('a.json', '{"x":1,"y":1}')
    const b = write('b.json', '{"y":2,"z":3}')
    expect(resolveRunConfig({ config: [a, b] })).toEqual({ x: 1, y: 2, z: 3 })
  })

  test('inline JSON wins over file', () => {
    const a = write('base.json', '{"x":1}')
    expect(resolveRunConfig({ config: [a], configJson: '{"x":9}' })).toEqual({ x: 9 })
  })

  test('missing file throws', () => {
    // The file is never created, so the read itself must fail.
    expect(() => resolveRunConfig({ config: [join(tmp, 'nope.json')] })).toThrow(/cannot read/)
  })

  test('invalid inline JSON throws', () => {
    expect(() => resolveRunConfig({ configJson: '{not json' })).toThrow(/invalid JSON/)
  })

  test('non-object inline JSON throws', () => {
    expect(() => resolveRunConfig({ configJson: '[1,2]' })).toThrow(/must be a JSON object/)
  })

  test('non-mapping file throws', () => {
    const p = write('arr.json', '[1,2,3]')
    expect(() => resolveRunConfig({ config: [p] })).toThrow(/must contain a mapping/)
  })
})
Loading