diff --git a/.gitignore b/.gitignore index b10c1bb043..4dfe62f9ee 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,12 @@ target # Commit message scratch files .github/meta/ +# Local connections config (may contain credentials) +.altimate-code/ + +# Pre-built native binaries (platform-specific, not for source control) +packages/opencode/*.node + # Local dev files opencode-dev logs/ diff --git a/.opencode/skills/data-parity/SKILL.md b/.opencode/skills/data-parity/SKILL.md new file mode 100644 index 0000000000..39afa6b616 --- /dev/null +++ b/.opencode/skills/data-parity/SKILL.md @@ -0,0 +1,341 @@ +--- +name: data-parity +description: Validate that two tables or query results are identical — or diagnose exactly how they differ. Discover schema, identify keys, profile cheaply, then diff. Use for migration validation, ETL regression, and query refactor verification. +--- + +# Data Parity (Table Diff) + +## CRITICAL: Always Start With a Plan + +**Before doing anything else**, generate a numbered TODO list for the user: + +``` +Here's my plan: +1. [ ] List available warehouse connections +2. [ ] Inspect schema, discover primary key candidates, and detect auto-timestamp columns +3. [ ] Confirm primary keys with you +4. [ ] Confirm which auto-timestamp columns to exclude +5. [ ] Check row counts on both sides +6. [ ] Run column-level profile (cheap — no row scan) +7. [ ] Ask whether to proceed with row-level diff (may be expensive for large tables) +8. [ ] Run targeted row-level diff on diverging columns only +9. [ ] Report findings +``` + +Update each item to `[x]` as you complete it. This plan should be visible before any tool is called. + +--- + +## CRITICAL: Use `data_diff` Tool — Never Write Manual Diff SQL + +**NEVER** write SQL to diff tables manually (e.g., `EXCEPT`, `FULL OUTER JOIN`, `MINUS`). +**ALWAYS** use the `data_diff` tool for any comparison operation. 
+ +`sql_query` is only for: +- Schema inspection (`information_schema`, `SHOW COLUMNS`, `DESCRIBE`) +- Cardinality checks to identify keys +- Row count estimates + +Everything else — profile, row diff, value comparison — goes through `data_diff`. + +--- + +## Step 1: List Connections + +Use `warehouse_list` to show the user what connections are available and which warehouses map to source and target. + +--- + +## Step 2: Inspect Schema, Discover Primary Keys, and Detect Auto-Timestamp Columns + +Use `sql_query` to get columns, defaults, and identify key candidates: + +```sql +-- Postgres / Redshift / DuckDB +SELECT column_name, data_type, is_nullable, column_default +FROM information_schema.columns +WHERE table_schema = 'public' AND table_name = 'orders' +ORDER BY ordinal_position +``` + +```sql +-- Snowflake +SHOW COLUMNS IN TABLE orders +``` + +```sql +-- MySQL / MariaDB (also fetch EXTRA for ON UPDATE detection) +SELECT column_name, data_type, is_nullable, column_default, extra +FROM information_schema.columns +WHERE table_schema = 'mydb' AND table_name = 'orders' +ORDER BY ordinal_position +``` + +```sql +-- ClickHouse +DESCRIBE TABLE source_db.events +``` + +**Look for:** columns named `id`, `*_id`, `*_key`, `uuid`, or with `NOT NULL` + unique index. + +**Also look for auto-timestamp columns** — any column whose `column_default` contains a time-generating function: +- PostgreSQL/DuckDB/Redshift: `now()`, `CURRENT_TIMESTAMP`, `clock_timestamp()` +- MySQL/MariaDB: `CURRENT_TIMESTAMP` (in default or EXTRA) +- Snowflake: `CURRENT_TIMESTAMP()`, `SYSDATE()` +- SQL Server: `getdate()`, `sysdatetime()` +- Oracle: `SYSDATE`, `SYSTIMESTAMP` + +These columns auto-generate values on INSERT, so they inherently differ between source and target due to write timing — not because of actual data discrepancies. 
**Collect them for confirmation in Step 4.** + +If no obvious PK, run a cardinality check: + +```sql +SELECT + COUNT(*) AS total_rows, + COUNT(DISTINCT order_id) AS distinct_order_id, + COUNT(DISTINCT customer_id) AS distinct_customer_id +FROM orders +``` + +A valid key column: `distinct_count = total_rows`. + +For composite keys: +```sql +SELECT order_id, line_item_id, COUNT(*) AS cnt +FROM order_lines +GROUP BY order_id, line_item_id +HAVING COUNT(*) > 1 +LIMIT 5 +``` +If this returns 0 rows, `(order_id, line_item_id)` is a valid composite key. + +## Step 3: Confirm Keys With the User + +**Always confirm** the identified key columns before proceeding: + +> "I identified `order_id` as the primary key (150,000 distinct values = 150,000 rows, no NULLs). Does that look right, or should I use a different column?" + +Do not proceed to diff until the user confirms or corrects. + +--- + +## Step 4: Confirm Auto-Timestamp Column Exclusions + +If you detected any columns with auto-generating timestamp defaults in Step 2, **present them to the user and ask for confirmation** before excluding them. + +**Example prompt when auto-timestamp columns are found:** + +> "I found **3 columns** with auto-generating timestamp defaults that will inherently differ between source and target (due to when each row was written, not actual data differences): +> +> | Column | Default | Reason to exclude | +> |--------|---------|-------------------| +> | `created_at` | `DEFAULT now()` | Set on insert — reflects when this copy was written | +> | `updated_at` | `DEFAULT now()` | Set on insert — reflects when this copy was written | +> | `_loaded_at` | `DEFAULT CURRENT_TIMESTAMP` | ETL load timestamp | +> +> Should I **exclude** these from the comparison? Or do you want to include any of them (e.g., if you're verifying that `created_at` was preserved during migration)?" + +**If user confirms exclusion:** Omit those columns from `extra_columns` when calling `data_diff`. 
+ +**If user wants to include some:** Add them explicitly to `extra_columns`. + +**If no auto-timestamp columns were detected:** Skip this step and proceed to Step 5. + +> **Why ask?** In migration validation, `created_at` should often be *identical* between source and target (it was migrated, not regenerated). But in ETL replication, `created_at` is freshly generated on each side and *should* differ. Only the user knows which case applies. + +--- + +## Step 5: Check Row Counts + +```sql +SELECT COUNT(*) FROM orders -- run on both source and target +``` + +Use counts to: +- Detect load completeness issues before row-level diff +- Choose the algorithm and decide whether to ask about cost +- If counts differ significantly (>5%), flag it immediately + +--- + +## Step 6: Column-Level Profile (Always Run This First) + +Profile is cheap — it runs aggregates, not row scans. **Always run profile before row-level diff.** + +``` +data_diff( + source="orders", + target="orders", + key_columns=["order_id"], + source_warehouse="postgres_prod", + target_warehouse="snowflake_dw", + algorithm="profile" +) +``` + +Profile tells you: +- Row count on each side +- Which columns have null count differences → NULL handling bug +- Min/max divergence per column → value transformation bug +- Which columns match exactly → safe to skip in row-level diff + +**Example output:** +``` +Column Profile Comparison + + ✓ order_id: match + ✓ customer_id: match + ✗ amount: DIFFER ← source min=10.00, target min=10.01 — rounding? + ✗ status: DIFFER ← source nulls=0, target nulls=47 — NULL mapping bug? + ✓ created_at: match +``` + +--- + +## Step 7: Ask Before Running Row-Level Diff on Large Tables + +After profiling, check row count and **ask the user** before proceeding: + +**If table has < 100K rows:** proceed automatically. + +**If table has 100K–10M rows:** +> "The table has 1.2M rows. Row-level diff will scan all rows on both sides — this may take 30–60 seconds and consume warehouse compute. 
Do you want to proceed? You can also provide a `where_clause` to limit the scope (e.g., `created_at >= '2024-01-01'`)." + +**If table has > 10M rows:** +> "The table has 50M rows. Full row-level diff could be expensive. Options: +> 1. Diff a recent window only (e.g., last 30 days) +> 2. Partition by a date/key column — shows which partition has problems without scanning everything +> 3. Proceed with full diff (may take several minutes) +> Which would you prefer?" + +--- + +## Step 8: Run Targeted Row-Level Diff + +Use only the columns that the profile said differ. This is faster and produces cleaner output. + +``` +data_diff( + source="orders", + target="orders", + key_columns=["order_id"], + extra_columns=["amount", "status"], // only diverging columns from profile + source_warehouse="postgres_prod", + target_warehouse="snowflake_dw", + algorithm="hashdiff" +) +``` + +### For large tables — use partition_column + +Split the table into groups and diff each independently. Three modes: + +``` +// Date column — partition by month +data_diff(source="lineitem", target="lineitem", + key_columns=["l_orderkey", "l_linenumber"], + source_warehouse="pg_source", target_warehouse="pg_target", + partition_column="l_shipdate", partition_granularity="month", + algorithm="hashdiff") + +// Numeric column — partition by key ranges of 100K +data_diff(source="orders", target="orders", + key_columns=["o_orderkey"], + source_warehouse="pg_source", target_warehouse="pg_target", + partition_column="o_orderkey", partition_bucket_size=100000, + algorithm="hashdiff") + +// Categorical column — partition by distinct values (string, enum, boolean) +data_diff(source="orders", target="orders", + key_columns=["o_orderkey"], + source_warehouse="pg_source", target_warehouse="pg_target", + partition_column="o_orderstatus", + algorithm="hashdiff") +``` + +Output includes aggregate diff + per-partition breakdown showing which group has problems. 
+ +--- + +## Algorithm Selection + +| Algorithm | When to use | +|-----------|-------------| +| `profile` | **Always run first** — column stats (count, min, max, nulls). No row scan. | +| `joindiff` | Same database — single FULL OUTER JOIN. Fast, exact. | +| `hashdiff` | Cross-database or large tables — bisection with checksums. Scales to billions. | +| `cascade` | Auto-escalate: profile → hashdiff on diverging columns. | +| `auto` | JoinDiff if same warehouse, HashDiff if cross-database. | + +> **CRITICAL:** If `source_warehouse` ≠ `target_warehouse`, **never use `joindiff`** — it only sees one connection and always reports 0 differences. Use `hashdiff` or `auto`. + +--- + +## Output Interpretation + +### IDENTICAL +``` +✓ Tables are IDENTICAL + Rows checked: 1,000,000 +``` + +### DIFFER +``` +✗ Tables DIFFER + + Source rows: 150,000 + Target rows: 149,950 + Only in source: 50 → rows deleted in target (ETL missed deletes) + Only in target: 0 + Updated rows: 0 + Identical rows: 149,950 +``` + +| Pattern | Root cause | +|---------|-----------| +| `only_in_source > 0`, target = 0 | ETL dropped rows — check filters, incremental logic | +| `only_in_target > 0`, source = 0 | Target has extra rows — dedup issue or wrong join | +| `updated_rows > 0`, counts match | Silent value corruption — check type casts, rounding | +| Row counts differ significantly | Load completeness — check ETL watermarks | + +--- + +## CRITICAL: `extra_columns` Behavior + +The Rust engine **only compares columns listed in `extra_columns`**. If the list is empty, it compares key existence only — rows that match on key but differ in values will be silently reported as "identical". This is the most common source of false positives. + +**Auto-discovery (default for table names):** When `extra_columns` is omitted and the source is a plain table name, `data_diff` auto-discovers all non-key columns from the database catalog and excludes columns using two detection layers: + +1. 
**Name-pattern matching** — columns named like `updated_at`, `created_at`, `inserted_at`, `modified_at`, `publisher_last_updated_epoch_ms`, ETL metadata columns like `_fivetran_synced`, `_airbyte_extracted_at`, etc. +2. **Schema-level default detection** — columns with auto-generating timestamp defaults (`DEFAULT NOW()`, `DEFAULT CURRENT_TIMESTAMP`, `GETDATE()`, `SYSDATE()`, `SYSTIMESTAMP`, etc.), detected directly from the database catalog. This catches columns that don't follow naming conventions but still auto-generate values on INSERT. Works across PostgreSQL, MySQL, Snowflake, SQL Server, Oracle, ClickHouse, DuckDB, SQLite, and Redshift. + +The output lists which columns were auto-excluded and why. + +**SQL queries:** When source is a SQL query (not a table name), auto-discovery cannot work. You **must** provide `extra_columns` explicitly. If you don't, only key-level matching occurs. + +**When to override auto-exclusion:** If the user specifically wants to compare audit columns (e.g., verifying that `created_at` was preserved during migration), pass those columns explicitly in `extra_columns`. + +--- + +## Common Mistakes + +**Writing manual diff SQL instead of calling data_diff** +→ Never use EXCEPT, MINUS, or FULL OUTER JOIN to diff tables. Use `data_diff`. + +**Calling data_diff without confirming the key** +→ Confirm cardinality with the user first. A bad key gives meaningless results. + +**Using joindiff for cross-database tables** +→ JoinDiff can't see the remote table. Always returns 0 diffs. Use `hashdiff` or `auto`. + +**Skipping the profile step and jumping to full row diff** +→ Profile is free. It tells you which columns actually differ so you avoid scanning everything. + +**Running full diff on a billion-row table without asking** +→ Always ask the user before expensive operations. Offer filtering and partition options. + +**Omitting extra_columns when source is a SQL query** +→ Auto-discovery only works for table names. 
For SQL queries, always list the columns to compare explicitly. + +**Silently excluding auto-timestamp columns without asking the user** +→ Always present detected auto-timestamp columns (Step 4) and get explicit confirmation. In migration scenarios, `created_at` should be *identical* — excluding it silently hides real bugs. diff --git a/packages/drivers/src/bigquery.ts b/packages/drivers/src/bigquery.ts index f14e3b681d..abc7a8f05f 100644 --- a/packages/drivers/src/bigquery.ts +++ b/packages/drivers/src/bigquery.ts @@ -2,7 +2,7 @@ * BigQuery driver using the `@google-cloud/bigquery` package. */ -import type { ConnectionConfig, Connector, ConnectorResult, SchemaColumn } from "./types" +import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" export async function connect(config: ConnectionConfig): Promise { let BigQueryModule: any @@ -37,8 +37,8 @@ export async function connect(config: ConnectionConfig): Promise { client = new BigQuery(options) }, - async execute(sql: string, limit?: number, binds?: any[]): Promise { - const effectiveLimit = limit ?? 1000 + async execute(sql: string, limit?: number, binds?: any[], execOptions?: ExecuteOptions): Promise { + const effectiveLimit = execOptions?.noLimit ? 0 : (limit ?? 1000) const query = sql.replace(/;\s*$/, "") const isSelectLike = /^\s*(SELECT|WITH|VALUES)\b/i.test(sql) @@ -58,7 +58,7 @@ export async function connect(config: ConnectionConfig): Promise { const [rows] = await client.query(options) const columns = rows.length > 0 ? Object.keys(rows[0]) : [] - const truncated = rows.length > effectiveLimit + const truncated = effectiveLimit > 0 && rows.length > effectiveLimit const limitedRows = truncated ? 
rows.slice(0, effectiveLimit) : rows return { diff --git a/packages/drivers/src/clickhouse.ts b/packages/drivers/src/clickhouse.ts new file mode 100644 index 0000000000..ac0868043e --- /dev/null +++ b/packages/drivers/src/clickhouse.ts @@ -0,0 +1,135 @@ +/** + * ClickHouse driver using the HTTP API (no external package required). + * + * Uses ClickHouse's native HTTP interface on port 8123 with JSONCompactEachRow + * format for efficient row streaming. + */ + +import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" + +export async function connect(config: ConnectionConfig): Promise { + const host = (config.host as string) ?? "127.0.0.1" + const port = (config.port as number) ?? 8123 + const database = (config.database as string) ?? "default" + const user = (config.user as string) ?? "default" + const password = (config.password as string) ?? "" + + const baseUrl = `http://${host}:${port}/` + + async function httpQuery(sql: string): Promise<{ columns: string[]; rows: unknown[][] }> { + // Use JSONCompactEachRowWithNamesAndTypes to get column names + typed rows + const params = new URLSearchParams({ + database, + default_format: "JSONCompactEachRowWithNamesAndTypes", + output_format_json_quote_64bit_integers: "0", + }) + + const headers: Record = { + "Content-Type": "text/plain", + "X-ClickHouse-Database": database, + } + + if (user !== "default" || password) { + // Basic auth via headers + headers["X-ClickHouse-User"] = user + if (password) headers["X-ClickHouse-Key"] = password + } + + const response = await fetch(`${baseUrl}?${params}`, { + method: "POST", + headers, + body: sql, + }) + + if (!response.ok) { + const body = await response.text() + throw new Error(`ClickHouse HTTP ${response.status}: ${body.slice(0, 500)}`) + } + + const text = await response.text() + const lines = text.trim().split("\n").filter(Boolean) + + if (lines.length < 2) { + // No rows (DDL or empty result) + return { columns: [], rows: [] } 
+ } + + // First line: column names, Second line: column types, Rest: data rows + const columns: string[] = JSON.parse(lines[0]) + // lines[1] = types (ignored for now) + const rows: unknown[][] = lines.slice(2).map((line) => JSON.parse(line)) + + return { columns, rows } + } + + const connector: Connector = { + async connect() { + // Verify connection with a lightweight query + await httpQuery("SELECT 1") + }, + + async execute(sql: string, limit?: number, _binds?: any[], options?: ExecuteOptions): Promise { + const effectiveLimit = options?.noLimit ? 0 : (limit ?? 1000) + const isSelectLike = /^\s*(SELECT|WITH)\b/i.test(sql) + + let query = sql + if (isSelectLike && effectiveLimit && !/\bLIMIT\b/i.test(sql)) { + query = `${sql.replace(/;\s*$/, "")} LIMIT ${effectiveLimit + 1}` + } + + const { columns, rows } = await httpQuery(query) + + const truncated = effectiveLimit > 0 && isSelectLike && rows.length > effectiveLimit + const finalRows = truncated ? rows.slice(0, effectiveLimit) : rows + + return { + columns, + rows: finalRows as unknown[][], + row_count: finalRows.length, + truncated, + } + }, + + async listSchemas(): Promise { + const { rows } = await httpQuery("SHOW DATABASES") + return rows.map((r) => String(r[0])) + }, + + async listTables(schema: string): Promise> { + const { rows } = await httpQuery(` + SELECT name, engine + FROM system.tables + WHERE database = '${schema}' + ORDER BY name + `) + return rows.map((r) => ({ + name: String(r[0]), + type: String(r[1]).toLowerCase().includes("view") ? 
"view" : "table", + })) + }, + + async describeTable(schema: string, table: string): Promise { + const { rows } = await httpQuery(` + SELECT + column_name, + data_type, + is_nullable + FROM information_schema.columns + WHERE table_schema = '${schema}' + AND table_name = '${table}' + ORDER BY ordinal_position + `) + return rows.map((row) => ({ + name: String(row[0]), + data_type: String(row[1]), + nullable: row[2] === "YES" || row[2] === "1", + })) + }, + + async close() { + // HTTP client — no persistent connection to close + }, + } + + return connector +} diff --git a/packages/drivers/src/databricks.ts b/packages/drivers/src/databricks.ts index ccb3d5f8f7..83e75dcd7c 100644 --- a/packages/drivers/src/databricks.ts +++ b/packages/drivers/src/databricks.ts @@ -2,7 +2,7 @@ * Databricks driver using the `@databricks/sql` package. */ -import type { ConnectionConfig, Connector, ConnectorResult, SchemaColumn } from "./types" +import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" export async function connect(config: ConnectionConfig): Promise { let databricksModule: any @@ -44,8 +44,8 @@ export async function connect(config: ConnectionConfig): Promise { }) }, - async execute(sql: string, limit?: number, binds?: any[]): Promise { - const effectiveLimit = limit ?? 1000 + async execute(sql: string, limit?: number, binds?: any[], options?: ExecuteOptions): Promise { + const effectiveLimit = options?.noLimit ? 0 : (limit ?? 1000) let query = sql const isSelectLike = /^\s*(SELECT|WITH|VALUES)\b/i.test(sql) if ( @@ -65,7 +65,7 @@ export async function connect(config: ConnectionConfig): Promise { await operation.close() const columns = rows.length > 0 ? Object.keys(rows[0]) : [] - const truncated = rows.length > effectiveLimit + const truncated = effectiveLimit > 0 && rows.length > effectiveLimit const limitedRows = truncated ? 
rows.slice(0, effectiveLimit) : rows return { diff --git a/packages/drivers/src/duckdb.ts b/packages/drivers/src/duckdb.ts index 2e6ea14839..d0d2b89506 100644 --- a/packages/drivers/src/duckdb.ts +++ b/packages/drivers/src/duckdb.ts @@ -2,7 +2,7 @@ * DuckDB driver using the `duckdb` package. */ -import type { ConnectionConfig, Connector, ConnectorResult, SchemaColumn } from "./types" +import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" export async function connect(config: ConnectionConfig): Promise { let duckdb: any @@ -59,8 +59,8 @@ export async function connect(config: ConnectionConfig): Promise { connection = db.connect() }, - async execute(sql: string, limit?: number, binds?: any[]): Promise { - const effectiveLimit = limit ?? 1000 + async execute(sql: string, limit?: number, binds?: any[], options?: ExecuteOptions): Promise { + const effectiveLimit = options?.noLimit ? 0 : (limit ?? 1000) let finalSql = sql const isSelectLike = /^\s*(SELECT|WITH|VALUES)\b/i.test(sql) @@ -77,7 +77,7 @@ export async function connect(config: ConnectionConfig): Promise { : await query(finalSql) const columns = rows.length > 0 ? Object.keys(rows[0]) : [] - const truncated = rows.length > effectiveLimit + const truncated = effectiveLimit > 0 && rows.length > effectiveLimit const limitedRows = truncated ? 
rows.slice(0, effectiveLimit) : rows return { diff --git a/packages/drivers/src/index.ts b/packages/drivers/src/index.ts index 73a8d7c2c1..8102d6e275 100644 --- a/packages/drivers/src/index.ts +++ b/packages/drivers/src/index.ts @@ -16,3 +16,4 @@ export { connect as connectOracle } from "./oracle" export { connect as connectDuckdb } from "./duckdb" export { connect as connectSqlite } from "./sqlite" export { connect as connectMongodb } from "./mongodb" +export { connect as connectClickhouse } from "./clickhouse" diff --git a/packages/drivers/src/mongodb.ts b/packages/drivers/src/mongodb.ts index 5757cf89f8..e2691c2931 100644 --- a/packages/drivers/src/mongodb.ts +++ b/packages/drivers/src/mongodb.ts @@ -14,7 +14,7 @@ * { "database": "mydb", "collection": "users", "command": "countDocuments", "filter": {} } */ -import type { ConnectionConfig, Connector, ConnectorResult, SchemaColumn } from "./types" +import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" /** Supported MQL commands. */ type MqlCommand = @@ -241,7 +241,7 @@ export async function connect(config: ConnectionConfig): Promise { await client.connect() }, - async execute(query: string, limit?: number, _binds?: any[]): Promise { + async execute(query: string, limit?: number, _binds?: any[], options?: ExecuteOptions): Promise { let parsed: MqlQuery try { parsed = JSON.parse(query) as MqlQuery @@ -254,7 +254,7 @@ export async function connect(config: ConnectionConfig): Promise { } const db = resolveDb(parsed.database) - const effectiveLimit = limit ?? 1000 + const effectiveLimit = options?.noLimit ? 0 : (limit ?? 
1000) const cmd = parsed.command // Commands that don't need a collection @@ -304,11 +304,15 @@ export async function connect(config: ConnectionConfig): Promise { if (parsed.sort) cursor = cursor.sort(parsed.sort) if (parsed.skip) cursor = cursor.skip(parsed.skip) // Cap user-specified limit against effectiveLimit to prevent OOM - const queryLimit = parsed.limit ? Math.min(parsed.limit, effectiveLimit) : effectiveLimit - cursor = cursor.limit(queryLimit + 1) + const queryLimit = effectiveLimit > 0 + ? (parsed.limit ? Math.min(parsed.limit, effectiveLimit) : effectiveLimit) + : (parsed.limit ?? 0) + if (queryLimit > 0) { + cursor = cursor.limit(queryLimit + 1) + } const docs = await cursor.toArray() - const truncated = docs.length > queryLimit + const truncated = queryLimit > 0 && docs.length > queryLimit const limited = truncated ? docs.slice(0, queryLimit) : docs if (limited.length === 0) { @@ -336,7 +340,7 @@ export async function connect(config: ConnectionConfig): Promise { // Cap or append $limit to prevent OOM. Skip for $out/$merge write pipelines. const pipeline = [...parsed.pipeline] const hasWrite = pipeline.some((stage) => "$out" in stage || "$merge" in stage) - if (!hasWrite) { + if (!hasWrite && effectiveLimit > 0) { const limitIdx = pipeline.findIndex((stage) => "$limit" in stage) if (limitIdx >= 0) { // Cap user-specified $limit against effectiveLimit @@ -351,7 +355,7 @@ export async function connect(config: ConnectionConfig): Promise { const docs = await coll.aggregate(pipeline).toArray() - const truncated = docs.length > effectiveLimit + const truncated = effectiveLimit > 0 && docs.length > effectiveLimit const limited = truncated ? docs.slice(0, effectiveLimit) : docs if (limited.length === 0) { @@ -386,7 +390,7 @@ export async function connect(config: ConnectionConfig): Promise { throw new Error("distinct requires a 'field' string") } const values = await coll.distinct(parsed.field, parsed.filter ?? 
{}) - const truncated = values.length > effectiveLimit + const truncated = effectiveLimit > 0 && values.length > effectiveLimit const limited = truncated ? values.slice(0, effectiveLimit) : values return { columns: [parsed.field], diff --git a/packages/drivers/src/mysql.ts b/packages/drivers/src/mysql.ts index 28c4a8def9..3859f5e993 100644 --- a/packages/drivers/src/mysql.ts +++ b/packages/drivers/src/mysql.ts @@ -2,7 +2,7 @@ * MySQL driver using the `mysql2` package. */ -import type { ConnectionConfig, Connector, ConnectorResult, SchemaColumn } from "./types" +import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" export async function connect(config: ConnectionConfig): Promise { let mysql: any @@ -41,8 +41,8 @@ export async function connect(config: ConnectionConfig): Promise { pool = mysql.createPool(poolConfig) }, - async execute(sql: string, limit?: number, _binds?: any[]): Promise { - const effectiveLimit = limit ?? 1000 + async execute(sql: string, limit?: number, _binds?: any[], options?: ExecuteOptions): Promise { + const effectiveLimit = options?.noLimit ? 0 : (limit ?? 1000) let query = sql const isSelectLike = /^\s*(SELECT|WITH|VALUES)\b/i.test(sql) if ( @@ -56,7 +56,7 @@ export async function connect(config: ConnectionConfig): Promise { const [rows, fields] = await pool.query(query) const columns = fields?.map((f: any) => f.name) ?? [] const rowsArr = Array.isArray(rows) ? rows : [] - const truncated = rowsArr.length > effectiveLimit + const truncated = effectiveLimit > 0 && rowsArr.length > effectiveLimit const limitedRows = truncated ? rowsArr.slice(0, effectiveLimit) : rowsArr diff --git a/packages/drivers/src/oracle.ts b/packages/drivers/src/oracle.ts index e3bab24819..39e4b11c37 100644 --- a/packages/drivers/src/oracle.ts +++ b/packages/drivers/src/oracle.ts @@ -2,7 +2,7 @@ * Oracle driver using the `oracledb` package (thin mode, pure JS). 
*/ -import type { ConnectionConfig, Connector, ConnectorResult, SchemaColumn } from "./types" +import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" export async function connect(config: ConnectionConfig): Promise { let oracledb: any @@ -37,8 +37,8 @@ export async function connect(config: ConnectionConfig): Promise { }) }, - async execute(sql: string, limit?: number, _binds?: any[]): Promise { - const effectiveLimit = limit ?? 1000 + async execute(sql: string, limit?: number, _binds?: any[], options?: ExecuteOptions): Promise { + const effectiveLimit = options?.noLimit ? 0 : (limit ?? 1000) let query = sql const isSelectLike = /^\s*(SELECT|WITH)\b/i.test(sql) @@ -61,7 +61,7 @@ export async function connect(config: ConnectionConfig): Promise { const columns = result.metaData?.map((m: any) => m.name) ?? (rows.length > 0 ? Object.keys(rows[0]) : []) - const truncated = rows.length > effectiveLimit + const truncated = effectiveLimit > 0 && rows.length > effectiveLimit const limitedRows = truncated ? rows.slice(0, effectiveLimit) : rows diff --git a/packages/drivers/src/postgres.ts b/packages/drivers/src/postgres.ts index cf0a475756..9169d61aaf 100644 --- a/packages/drivers/src/postgres.ts +++ b/packages/drivers/src/postgres.ts @@ -2,7 +2,7 @@ * PostgreSQL driver using the `pg` package. 
*/ -import type { ConnectionConfig, Connector, ConnectorResult, SchemaColumn } from "./types" +import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" export async function connect(config: ConnectionConfig): Promise { let pg: any @@ -39,7 +39,7 @@ export async function connect(config: ConnectionConfig): Promise { pool = new Pool(poolConfig) }, - async execute(sql: string, limit?: number, _binds?: any[]): Promise { + async execute(sql: string, limit?: number, _binds?: any[], options?: ExecuteOptions): Promise { const client = await pool.connect() try { if (config.statement_timeout) { @@ -49,7 +49,7 @@ export async function connect(config: ConnectionConfig): Promise { } let query = sql - const effectiveLimit = limit ?? 1000 + const effectiveLimit = options?.noLimit ? 0 : (limit ?? 1000) const isSelectLike = /^\s*(SELECT|WITH|VALUES)\b/i.test(sql) // Add LIMIT only for SELECT-like queries and if not already present if ( @@ -62,7 +62,7 @@ export async function connect(config: ConnectionConfig): Promise { const result = await client.query(query) const columns = result.fields?.map((f: any) => f.name) ?? [] - const truncated = result.rows.length > effectiveLimit + const truncated = effectiveLimit > 0 && result.rows.length > effectiveLimit const rows = truncated ? result.rows.slice(0, effectiveLimit) : result.rows diff --git a/packages/drivers/src/redshift.ts b/packages/drivers/src/redshift.ts index 752791174e..4ef3b85013 100644 --- a/packages/drivers/src/redshift.ts +++ b/packages/drivers/src/redshift.ts @@ -3,7 +3,7 @@ * Uses svv_ system views for introspection. 
*/ -import type { ConnectionConfig, Connector, ConnectorResult, SchemaColumn } from "./types" +import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" export async function connect(config: ConnectionConfig): Promise { let pg: any @@ -40,10 +40,10 @@ export async function connect(config: ConnectionConfig): Promise { pool = new Pool(poolConfig) }, - async execute(sql: string, limit?: number, _binds?: any[]): Promise { + async execute(sql: string, limit?: number, _binds?: any[], options?: ExecuteOptions): Promise { const client = await pool.connect() try { - const effectiveLimit = limit ?? 1000 + const effectiveLimit = options?.noLimit ? 0 : (limit ?? 1000) let query = sql const isSelectLike = /^\s*(SELECT|WITH|VALUES)\b/i.test(sql) if ( @@ -56,7 +56,7 @@ export async function connect(config: ConnectionConfig): Promise { const result = await client.query(query) const columns = result.fields?.map((f: any) => f.name) ?? [] - const truncated = result.rows.length > effectiveLimit + const truncated = effectiveLimit > 0 && result.rows.length > effectiveLimit const rows = truncated ? result.rows.slice(0, effectiveLimit) : result.rows diff --git a/packages/drivers/src/snowflake.ts b/packages/drivers/src/snowflake.ts index 03cc1c84a7..6a37c6caaa 100644 --- a/packages/drivers/src/snowflake.ts +++ b/packages/drivers/src/snowflake.ts @@ -3,7 +3,7 @@ */ import * as fs from "fs" -import type { ConnectionConfig, Connector, ConnectorResult, SchemaColumn } from "./types" +import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" export async function connect(config: ConnectionConfig): Promise { let snowflake: any @@ -232,8 +232,8 @@ export async function connect(config: ConnectionConfig): Promise { }) }, - async execute(sql: string, limit?: number, binds?: any[]): Promise { - const effectiveLimit = limit ?? 
1000 + async execute(sql: string, limit?: number, binds?: any[], options?: ExecuteOptions): Promise { + const effectiveLimit = options?.noLimit ? 0 : (limit ?? 1000) let query = sql const isSelectLike = /^\s*(SELECT|WITH|VALUES|SHOW)\b/i.test(sql) if ( @@ -245,7 +245,7 @@ export async function connect(config: ConnectionConfig): Promise { } const result = await executeQuery(query, binds) - const truncated = result.rows.length > effectiveLimit + const truncated = effectiveLimit > 0 && result.rows.length > effectiveLimit const rows = truncated ? result.rows.slice(0, effectiveLimit) : result.rows diff --git a/packages/drivers/src/sqlite.ts b/packages/drivers/src/sqlite.ts index 46d1e74ec8..48ef8321cd 100644 --- a/packages/drivers/src/sqlite.ts +++ b/packages/drivers/src/sqlite.ts @@ -4,7 +4,7 @@ */ import { Database } from "bun:sqlite" -import type { ConnectionConfig, Connector, ConnectorResult, SchemaColumn } from "./types" +import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" export async function connect(config: ConnectionConfig): Promise { const dbPath = (config.path as string) ?? ":memory:" @@ -22,9 +22,9 @@ export async function connect(config: ConnectionConfig): Promise { } }, - async execute(sql: string, limit?: number, _binds?: any[]): Promise { + async execute(sql: string, limit?: number, _binds?: any[], options?: ExecuteOptions): Promise { if (!db) throw new Error("SQLite connection not open") - const effectiveLimit = limit ?? 1000 + const effectiveLimit = options?.noLimit ? 0 : (limit ?? 1000) // Determine if this is a SELECT-like statement const trimmed = sql.trim().toLowerCase() @@ -60,7 +60,7 @@ export async function connect(config: ConnectionConfig): Promise { const stmt = db.prepare(query) const rows = stmt.all() as any[] const columns = rows.length > 0 ? 
Object.keys(rows[0]) : [] - const truncated = rows.length > effectiveLimit + const truncated = effectiveLimit > 0 && rows.length > effectiveLimit const limitedRows = truncated ? rows.slice(0, effectiveLimit) : rows return { diff --git a/packages/drivers/src/sqlserver.ts b/packages/drivers/src/sqlserver.ts index b9aac91760..3ea1e390f3 100644 --- a/packages/drivers/src/sqlserver.ts +++ b/packages/drivers/src/sqlserver.ts @@ -2,7 +2,7 @@ * SQL Server driver using the `mssql` (tedious) package. */ -import type { ConnectionConfig, Connector, ConnectorResult, SchemaColumn } from "./types" +import type { ConnectionConfig, Connector, ConnectorResult, ExecuteOptions, SchemaColumn } from "./types" export async function connect(config: ConnectionConfig): Promise { let mssql: any @@ -42,8 +42,8 @@ export async function connect(config: ConnectionConfig): Promise { pool = await mssql.connect(mssqlConfig) }, - async execute(sql: string, limit?: number, _binds?: any[]): Promise { - const effectiveLimit = limit ?? 1000 + async execute(sql: string, limit?: number, _binds?: any[], options?: ExecuteOptions): Promise { + const effectiveLimit = options?.noLimit ? 0 : (limit ?? 1000) let query = sql const isSelectLike = /^\s*SELECT\b/i.test(sql) @@ -69,7 +69,7 @@ export async function connect(config: ConnectionConfig): Promise { : (result.recordset?.columns ? Object.keys(result.recordset.columns) : []) - const truncated = rows.length > effectiveLimit + const truncated = effectiveLimit > 0 && rows.length > effectiveLimit const limitedRows = truncated ? rows.slice(0, effectiveLimit) : rows return { diff --git a/packages/drivers/src/types.ts b/packages/drivers/src/types.ts index 31a7565134..3bc3760d6c 100644 --- a/packages/drivers/src/types.ts +++ b/packages/drivers/src/types.ts @@ -20,9 +20,15 @@ export interface SchemaColumn { nullable: boolean } +export interface ExecuteOptions { + /** Skip the default LIMIT injection and post-truncation. 
Use when the caller + * needs the complete, untruncated result set (e.g. data-diff pipelines). */ + noLimit?: boolean +} + export interface Connector { connect(): Promise - execute(sql: string, limit?: number, binds?: any[]): Promise + execute(sql: string, limit?: number, binds?: any[], options?: ExecuteOptions): Promise listSchemas(): Promise listTables(schema: string): Promise> describeTable(schema: string, table: string): Promise diff --git a/packages/opencode/src/altimate/native/connections/data-diff.ts b/packages/opencode/src/altimate/native/connections/data-diff.ts new file mode 100644 index 0000000000..0afc2c964a --- /dev/null +++ b/packages/opencode/src/altimate/native/connections/data-diff.ts @@ -0,0 +1,799 @@ +/** + * DataParity orchestrator — runs the cooperative Rust state machine against + * live database connections. + * + * The Rust engine (DataParitySession) never touches databases — it emits SQL + * for us to execute, we feed results back, and it decides the next step. + * This file is the bridge between that engine and altimate-code's drivers. + */ + +import type { DataDiffParams, DataDiffResult, PartitionDiffResult } from "../types" +import * as Registry from "./registry" + +// --------------------------------------------------------------------------- +// Query-source detection +// --------------------------------------------------------------------------- + +const SQL_KEYWORDS = /^\s*(SELECT|WITH|VALUES)\b/i + +/** + * Detect whether a string is an arbitrary SQL query (vs a plain table name). + * Plain table names may contain dots (schema.table, db.schema.table) but not spaces. + */ +function isQuery(input: string): boolean { + return SQL_KEYWORDS.test(input) +} + +/** + * If either source or target is an arbitrary query, wrap them in CTEs so the + * DataParity engine can treat them as tables named `__diff_source` / `__diff_target`. + * + * Returns `{ table1Name, table2Name, ctePrefix | null }`. 
+ * + * When a CTE prefix is returned, it must be prepended to every SQL task emitted + * by the engine before execution. + */ +export function resolveTableSources( + source: string, + target: string, +): { table1Name: string; table2Name: string; ctePrefix: string | null } { + const source_is_query = isQuery(source) + const target_is_query = isQuery(target) + + if (!source_is_query && !target_is_query) { + // Both are plain table names — pass through unchanged + return { table1Name: source, table2Name: target, ctePrefix: null } + } + + // At least one is a query — wrap both in CTEs + const srcExpr = source_is_query ? source : `SELECT * FROM ${source}` + const tgtExpr = target_is_query ? target : `SELECT * FROM ${target}` + + const ctePrefix = `WITH __diff_source AS (\n${srcExpr}\n), __diff_target AS (\n${tgtExpr}\n)` + return { + table1Name: "__diff_source", + table2Name: "__diff_target", + ctePrefix, + } +} + +/** + * Inject a CTE prefix into a SQL statement from the engine. + * + * The engine emits standalone SELECT statements. We need to prepend our CTE + * definitions so `__diff_source`/`__diff_target` resolve correctly. 
+ * + * Handles the case where the engine itself emits CTEs (starts with WITH …): + * WITH engine_cte AS (…) SELECT … FROM __diff_source + * becomes: + * WITH __diff_source AS (…), __diff_target AS (…), engine_cte AS (…) SELECT … + */ +export function injectCte(sql: string, ctePrefix: string): string { + const trimmed = sql.trimStart() + const withMatch = trimmed.match(/^WITH\s+/i) + + if (withMatch) { + // Engine also has CTEs — merge them: our CTEs first, then engine CTEs + const afterWith = trimmed.slice(withMatch[0].length) + // ctePrefix already starts with "WITH …" — strip "WITH " and append ", " + const ourDefs = ctePrefix.replace(/^WITH\s+/i, "") + return `WITH ${ourDefs},\n${afterWith}` + } + + // Plain SELECT — just prepend our CTE block + return `${ctePrefix}\n${trimmed}` +} + +// --------------------------------------------------------------------------- +// Executor +// --------------------------------------------------------------------------- + +type Rows = (string | null)[][] + +/** + * Execute a SQL statement against a named warehouse and return rows as string[][]. + */ +async function executeQuery(sql: string, warehouseName: string | undefined): Promise { + let connector + if (warehouseName) { + connector = await Registry.get(warehouseName) + } else { + const warehouses = Registry.list().warehouses + if (warehouses.length === 0) { + throw new Error("No default warehouse configured.") + } + connector = await Registry.get(warehouses[0].name) + } + + // Bypass the driver's default LIMIT — data-diff needs complete result sets. + const result = await connector.execute(sql, undefined, undefined, { noLimit: true }) + + // Normalise to string[][] — drivers return mixed types + return result.rows.map((row: unknown[]) => + row.map((v) => (v === null || v === undefined ? 
null : String(v))), + ) +} + +// --------------------------------------------------------------------------- +// Column auto-discovery and audit column exclusion +// --------------------------------------------------------------------------- + +/** + * Patterns that match audit/timestamp columns which should be excluded from + * value comparison by default. These columns typically differ between source + * and target due to ETL timing, sync metadata, or pipeline bookkeeping — + * not because of actual data discrepancies. + */ +const AUDIT_COLUMN_PATTERNS = [ + // Exact common names + /^(created|updated|modified|inserted|deleted|synced|published|ingested|loaded|extracted|refreshed)_(at|on|date|time|timestamp|ts|dt|epoch)$/i, + // Suffix patterns: *_at, *_on with temporal prefix + /_(created|updated|modified|inserted|deleted|synced|published|ingested|loaded|extracted|refreshed)$/i, + // ETL metadata columns + /^(etl|elt|dbt|pipeline|batch|sync|publish|ingest)_(created|updated|modified|loaded|run|timestamp|ts|time|at|epoch)/i, + /^(_sdc_|_airbyte_|_fivetran_|_stitch_|__hevo_)/i, + // Generic timestamp metadata + /^(last_updated|last_modified|date_updated|date_modified|date_created|row_updated|row_created)$/i, + /^(publisher_last_updated|publisher_updated)/i, + // Epoch variants + /(updated|modified|created|inserted|published|loaded|synced)_epoch/i, + /epoch_ms$/i, +] + +/** + * Check whether a column name matches known audit/timestamp patterns. + */ +function isAuditColumn(columnName: string): boolean { + return AUDIT_COLUMN_PATTERNS.some((pattern) => pattern.test(columnName)) +} + +// --------------------------------------------------------------------------- +// Auto-timestamp default detection (schema-level) +// --------------------------------------------------------------------------- + +/** + * Patterns that detect auto-generated timestamp/date defaults in column_default + * expressions. 
These functions produce the current time when a row is inserted + * (or updated), meaning the column value will inherently differ between source + * and target — not because of actual data discrepancies, but because of when + * each copy was written. + * + * Covers: PostgreSQL, MySQL/MariaDB, Snowflake, SQL Server, Oracle, + * ClickHouse, DuckDB, SQLite, Redshift, BigQuery, Databricks. + */ +const AUTO_TIMESTAMP_DEFAULT_PATTERNS = [ + // PostgreSQL, DuckDB, Redshift + /\bnow\s*\(\)/i, + /\bclock_timestamp\s*\(\)/i, + /\bstatement_timestamp\s*\(\)/i, + /\btransaction_timestamp\s*\(\)/i, + /\blocaltimestamp\b/i, + // Standard SQL — used by most dialects + /\bcurrent_timestamp\b/i, + // MySQL / MariaDB — "ON UPDATE CURRENT_TIMESTAMP" in the EXTRA column + /\bon\s+update\s+current_timestamp/i, + // Snowflake + /\bsysdate\s*\(\)/i, + // SQL Server + /\bgetdate\s*\(\)/i, + /\bsysdatetime\s*\(\)/i, + /\bsysutcdatetime\s*\(\)/i, + /\bsysdatetimeoffset\s*\(\)/i, + // Oracle + /\bSYSDATE\b/i, + /\bSYSTIMESTAMP\b/i, + // ClickHouse + /\btoday\s*\(\)/i, + // SQLite + /\bdatetime\s*\(\s*'now'/i, +] + +/** + * Check whether a column_default expression contains an auto-generating + * timestamp function. Also matches expressions that *contain* these functions + * (e.g. `(now() + '1 mon'::interval)`). + */ +function isAutoTimestampDefault(defaultExpr: string | null): boolean { + if (!defaultExpr) return false + return AUTO_TIMESTAMP_DEFAULT_PATTERNS.some((pattern) => pattern.test(defaultExpr)) +} + +// --------------------------------------------------------------------------- +// Column discovery (names + defaults) — dialect-aware +// --------------------------------------------------------------------------- + +interface ColumnInfo { + name: string + defaultExpr: string | null +} + +/** + * Build a query to discover column names and default expressions for a table. 
+ * Returns both pieces of information in a single round-trip so we can detect + * auto-timestamp defaults without an extra query. + */ +function buildColumnDiscoverySQL(tableName: string, dialect: string): string { + // Parse schema.table or db.schema.table + const parts = tableName.split(".") + let schemaFilter = "" + let tableFilter = "" + + if (parts.length === 3) { + schemaFilter = `table_schema = '${parts[1]}'` + tableFilter = `table_name = '${parts[2]}'` + } else if (parts.length === 2) { + schemaFilter = `table_schema = '${parts[0]}'` + tableFilter = `table_name = '${parts[1]}'` + } else { + tableFilter = `table_name = '${parts[0]}'` + } + + switch (dialect) { + case "clickhouse": + // Returns: name, type, default_type, default_expression, ... + return `DESCRIBE TABLE ${tableName}` + case "snowflake": + // Returns: table_name, schema_name, column_name, data_type, null?, default, ... + return `SHOW COLUMNS IN TABLE ${tableName}` + case "mysql": + case "mariadb": { + // MySQL puts "on update CURRENT_TIMESTAMP" in the EXTRA column, not column_default + const conditions = [tableFilter] + if (schemaFilter) conditions.push(schemaFilter) + return `SELECT column_name, column_default, extra FROM information_schema.columns WHERE ${conditions.join(" AND ")} ORDER BY ordinal_position` + } + case "oracle": { + // Oracle uses ALL_TAB_COLUMNS (no information_schema) + const oracleTable = parts[parts.length - 1] + const conditions = [`TABLE_NAME = '${oracleTable.toUpperCase()}'`] + if (parts.length >= 2) { + conditions.push(`OWNER = '${parts[parts.length - 2].toUpperCase()}'`) + } + return `SELECT COLUMN_NAME, DATA_DEFAULT FROM ALL_TAB_COLUMNS WHERE ${conditions.join(" AND ")} ORDER BY COLUMN_ID` + } + case "sqlite": { + // PRAGMA table_info returns: cid, name, type, notnull, dflt_value, pk + const table = parts[parts.length - 1] + return `PRAGMA table_info('${table}')` + } + default: { + // Postgres, Redshift, DuckDB, SQL Server, BigQuery, Databricks, etc. 
+ const conditions = [tableFilter] + if (schemaFilter) conditions.push(schemaFilter) + return `SELECT column_name, column_default FROM information_schema.columns WHERE ${conditions.join(" AND ")} ORDER BY ordinal_position` + } + } +} + +/** + * Parse column info (name + default expression) from the discovery query result, + * handling dialect-specific output formats. + */ +function parseColumnInfo(rows: (string | null)[][], dialect: string): ColumnInfo[] { + switch (dialect) { + case "clickhouse": + // DESCRIBE: name[0], type[1], default_type[2], default_expression[3], ... + return rows.map((r) => ({ + name: r[0] ?? "", + defaultExpr: r[3] ?? null, + })).filter((c) => c.name) + case "snowflake": + // SHOW COLUMNS: table_name[0], schema_name[1], column_name[2], data_type[3], null?[4], default[5], ... + return rows.map((r) => ({ + name: r[2] ?? "", + defaultExpr: r[5] ?? null, + })).filter((c) => c.name) + case "oracle": + // ALL_TAB_COLUMNS: COLUMN_NAME[0], DATA_DEFAULT[1] + return rows.map((r) => ({ + name: r[0] ?? "", + defaultExpr: r[1] ?? null, + })).filter((c) => c.name) + case "sqlite": + // PRAGMA table_info: cid[0], name[1], type[2], notnull[3], dflt_value[4], pk[5] + return rows.map((r) => ({ + name: r[1] ?? "", + defaultExpr: r[4] ?? null, + })).filter((c) => c.name) + case "mysql": + case "mariadb": + // column_name[0], column_default[1], extra[2] + // Merge default + extra — MySQL puts "on update CURRENT_TIMESTAMP" in extra + return rows.map((r) => ({ + name: r[0] ?? "", + defaultExpr: [r[1], r[2]].filter(Boolean).join(" ") || null, + })).filter((c) => c.name) + default: + // Postgres, Redshift, DuckDB, SQL Server, BigQuery: column_name[0], column_default[1] + return rows.map((r) => ({ + name: r[0] ?? "", + defaultExpr: r[1] ?? null, + })).filter((c) => c.name) + } +} + +/** + * Auto-discover non-key, non-audit columns for a table. 
+ * + * When the caller omits `extra_columns`, we query the source table's schema to + * find all columns, then exclude: + * 1. Key columns (already used for matching) + * 2. Audit/timestamp columns matched by name pattern (updated_at, created_at, etc.) + * 3. Columns with auto-generating timestamp defaults (DEFAULT NOW(), CURRENT_TIMESTAMP, + * GETDATE(), SYSDATE, etc.) — detected from the database catalog + * + * The schema-level default detection (layer 3) catches columns that don't follow + * naming conventions but still auto-generate values on INSERT — these inherently + * differ between source and target due to when each copy was written. + * + * Returns the list of columns to compare, or undefined if discovery fails + * (in which case the engine falls back to key-only comparison). + */ +async function discoverExtraColumns( + tableName: string, + keyColumns: string[], + dialect: string, + warehouseName: string | undefined, +): Promise<{ columns: string[]; excludedAudit: string[] } | undefined> { + // Only works for plain table names, not SQL queries + if (SQL_KEYWORDS.test(tableName)) return undefined + + try { + const sql = buildColumnDiscoverySQL(tableName, dialect) + const rows = await executeQuery(sql, warehouseName) + const columnInfos = parseColumnInfo(rows, dialect) + + if (columnInfos.length === 0) return undefined + + const keySet = new Set(keyColumns.map((k) => k.toLowerCase())) + const extraColumns: string[] = [] + const excludedAudit: string[] = [] + + for (const col of columnInfos) { + if (keySet.has(col.name.toLowerCase())) continue + if (isAuditColumn(col.name) || isAutoTimestampDefault(col.defaultExpr)) { + excludedAudit.push(col.name) + } else { + extraColumns.push(col.name) + } + } + + return { columns: extraColumns, excludedAudit } + } catch { + // Schema discovery failed — fall back to engine default (key-only) + return undefined + } +} + +// --------------------------------------------------------------------------- +// Main orchestrator 
+// --------------------------------------------------------------------------- + +const MAX_STEPS = 200 + +// --------------------------------------------------------------------------- +// Partition support +// --------------------------------------------------------------------------- + +/** + * Build a DATE_TRUNC expression appropriate for the warehouse dialect. + */ +function dateTruncExpr(granularity: string, column: string, dialect: string): string { + const g = granularity.toLowerCase() + switch (dialect) { + case "bigquery": + return `DATE_TRUNC(${column}, ${g.toUpperCase()})` + case "clickhouse": + return `toStartOf${g.charAt(0).toUpperCase() + g.slice(1)}(${column})` + case "mysql": + case "mariadb": { + const fmt = { day: "%Y-%m-%d", week: "%Y-%u", month: "%Y-%m-01", year: "%Y-01-01" }[g] ?? "%Y-%m-01" + return `DATE_FORMAT(${column}, '${fmt}')` + } + default: + // Postgres, Snowflake, Redshift, DuckDB, etc. + return `DATE_TRUNC('${g}', ${column})` + } +} + +/** + * Determine the partition mode based on which params are provided. + * - "date" → partition_granularity is set (or column looks like a date) + * - "numeric" → partition_bucket_size is set + * - "categorical" → neither — use DISTINCT values directly (string, enum, boolean) + */ +function partitionMode( + granularity: string | undefined, + bucketSize: number | undefined, +): "date" | "numeric" | "categorical" { + if (bucketSize != null) return "numeric" + if (granularity != null) return "date" + return "categorical" +} + +/** + * Build SQL to discover distinct partition values from the source table. + */ +function buildPartitionDiscoverySQL( + table: string, + partitionColumn: string, + granularity: string | undefined, + bucketSize: number | undefined, + dialect: string, + whereClause?: string, +): string { + const where = whereClause ? 
`WHERE ${whereClause}` : "" + const mode = partitionMode(granularity, bucketSize) + + let expr: string + if (mode === "numeric") { + expr = `FLOOR(${partitionColumn} / ${bucketSize}) * ${bucketSize}` + } else if (mode === "date") { + expr = dateTruncExpr(granularity!, partitionColumn, dialect) + } else { + // categorical — raw distinct values, no transformation + expr = partitionColumn + } + + return `SELECT DISTINCT ${expr} AS _p FROM ${table} ${where} ORDER BY _p` +} + +/** + * Build a WHERE clause that scopes to a single partition. + */ +function buildPartitionWhereClause( + partitionColumn: string, + partitionValue: string, + granularity: string | undefined, + bucketSize: number | undefined, + dialect: string, +): string { + const mode = partitionMode(granularity, bucketSize) + + if (mode === "numeric") { + const lo = Number(partitionValue) + const hi = lo + bucketSize! + return `${partitionColumn} >= ${lo} AND ${partitionColumn} < ${hi}` + } + + if (mode === "categorical") { + // Quote the value — works for strings, enums, booleans + const escaped = partitionValue.replace(/'/g, "''") + return `${partitionColumn} = '${escaped}'` + } + + // date mode + const expr = dateTruncExpr(granularity!, partitionColumn, dialect) + + // Cast the literal appropriately per dialect + switch (dialect) { + case "bigquery": + return `${expr} = '${partitionValue}'` + case "clickhouse": + return `${expr} = toDate('${partitionValue}')` + case "mysql": + case "mariadb": + return `${expr} = '${partitionValue}'` + default: + return `${expr} = '${partitionValue}'` + } +} + +/** + * Extract DiffStats from a successful outcome (if present). 
+ * + * Rust serializes ReladiffOutcome as: {mode: "diff", diff_rows: [...], stats: {...}} + * stats fields: rows_table1, rows_table2, exclusive_table1, exclusive_table2, updated, unchanged + */ +function extractStats(outcome: unknown): { + rows_source: number + rows_target: number + differences: number + status: "identical" | "differ" +} { + const o = outcome as any + if (!o) return { rows_source: 0, rows_target: 0, differences: 0, status: "identical" } + + if (o.mode === "diff") { + const s = o.stats ?? {} + const exclusive1 = Number(s.exclusive_table1 ?? 0) + const exclusive2 = Number(s.exclusive_table2 ?? 0) + const updated = Number(s.updated ?? 0) + const differences = exclusive1 + exclusive2 + updated + return { + rows_source: Number(s.rows_table1 ?? 0), + rows_target: Number(s.rows_table2 ?? 0), + differences, + status: differences > 0 ? "differ" : "identical", + } + } + + return { rows_source: 0, rows_target: 0, differences: 0, status: "identical" } +} + +/** + * Merge two diff outcomes into one aggregated outcome. + * + * Both outcomes use the Rust shape: {mode: "diff", diff_rows: [...], stats: {...}} + */ +function mergeOutcomes(accumulated: unknown, next: unknown): unknown { + if (!accumulated) return next + if (!next) return accumulated + + const a = accumulated as any + const n = next as any + + const aS = a.stats ?? {} + const nS = n.stats ?? 
{} + + const rows_table1 = (Number(aS.rows_table1) || 0) + (Number(nS.rows_table1) || 0) + const rows_table2 = (Number(aS.rows_table2) || 0) + (Number(nS.rows_table2) || 0) + const exclusive_table1 = (Number(aS.exclusive_table1) || 0) + (Number(nS.exclusive_table1) || 0) + const exclusive_table2 = (Number(aS.exclusive_table2) || 0) + (Number(nS.exclusive_table2) || 0) + const updated = (Number(aS.updated) || 0) + (Number(nS.updated) || 0) + const unchanged = (Number(aS.unchanged) || 0) + (Number(nS.unchanged) || 0) + + const totalRows = rows_table1 + rows_table2 + const totalDiff = exclusive_table1 + exclusive_table2 + updated + const diff_percent = totalRows > 0 ? (totalDiff / totalRows) * 100 : 0 + + return { + mode: "diff", + diff_rows: [...(a.diff_rows ?? []), ...(n.diff_rows ?? [])].slice(0, 100), + stats: { rows_table1, rows_table2, exclusive_table1, exclusive_table2, updated, unchanged, diff_percent }, + } +} + +/** + * Run a partitioned diff: discover partition values, diff each partition independently, + * then aggregate results. + */ +async function runPartitionedDiff(params: DataDiffParams): Promise { + const resolveDialect = (warehouse: string | undefined): string => { + if (warehouse) { + const cfg = Registry.getConfig(warehouse) + return cfg?.type ?? "generic" + } + const warehouses = Registry.list().warehouses + return warehouses[0]?.type ?? "generic" + } + + const sourceDialect = resolveDialect(params.source_warehouse) + const { table1Name } = resolveTableSources(params.source, params.target) + + // Discover partition values from source + const discoverySql = buildPartitionDiscoverySQL( + table1Name, + params.partition_column!, + params.partition_granularity, + params.partition_bucket_size, + sourceDialect, + params.where_clause, + ) + + let partitionValues: string[] + try { + const rows = await executeQuery(discoverySql, params.source_warehouse) + partitionValues = rows.map((r) => String(r[0] ?? 
"")).filter(Boolean) + } catch (e) { + return { success: false, error: `Partition discovery failed: ${e}`, steps: 0 } + } + + if (partitionValues.length === 0) { + return { success: true, steps: 1, outcome: { Match: { row_count: 0, algorithm: "partitioned" } }, partition_results: [] } + } + + // Diff each partition + const partitionResults: PartitionDiffResult[] = [] + let aggregatedOutcome: unknown = null + let totalSteps = 1 + + for (const pVal of partitionValues) { + const partWhere = buildPartitionWhereClause( + params.partition_column!, + pVal, + params.partition_granularity, + params.partition_bucket_size, + sourceDialect, + ) + const fullWhere = params.where_clause ? `(${params.where_clause}) AND (${partWhere})` : partWhere + + const result = await runDataDiff({ + ...params, + where_clause: fullWhere, + partition_column: undefined, // prevent recursion + }) + + totalSteps += result.steps + + if (!result.success) { + partitionResults.push({ partition: pVal, rows_source: 0, rows_target: 0, differences: 0, status: "error", error: result.error }) + continue + } + + const stats = extractStats(result.outcome) + partitionResults.push({ partition: pVal, ...stats }) + aggregatedOutcome = aggregatedOutcome == null ? result.outcome : mergeOutcomes(aggregatedOutcome, result.outcome) + } + + return { + success: true, + steps: totalSteps, + outcome: aggregatedOutcome ?? 
{ Match: { row_count: 0, algorithm: "partitioned" } }, + partition_results: partitionResults, + } +} + +export async function runDataDiff(params: DataDiffParams): Promise { + // Dispatch to partitioned diff if partition_column is set + if (params.partition_column) { + return runPartitionedDiff(params) + } + + // Dynamically import NAPI module (not available in test environments without the binary) + let DataParitySession: new (specJson: string) => { + start(): string + step(responsesJson: string): string + } + + try { + const core = await import("@altimateai/altimate-core") + DataParitySession = (core as any).DataParitySession + if (!DataParitySession) throw new Error("DataParitySession not exported from @altimateai/altimate-core") + } catch (e) { + return { + success: false, + error: `altimate-core NAPI module unavailable: ${e}`, + steps: 0, + } + } + + // Resolve sources (plain table names vs arbitrary queries) + const { table1Name, table2Name, ctePrefix } = resolveTableSources( + params.source, + params.target, + ) + + // Parse optional qualified names: "db.schema.table" → { database, schema, table } + const parseQualified = (name: string) => { + const parts = name.split(".") + if (parts.length === 3) return { database: parts[0], schema: parts[1], table: parts[2] } + if (parts.length === 2) return { schema: parts[0], table: parts[1] } + return { table: name } + } + + const table1Ref = parseQualified(table1Name) + const table2Ref = parseQualified(table2Name) + + // Resolve dialect from warehouse config + const resolveDialect = (warehouse: string | undefined): string => { + if (warehouse) { + const cfg = Registry.getConfig(warehouse) + return cfg?.type ?? "generic" + } + const warehouses = Registry.list().warehouses + return warehouses[0]?.type ?? "generic" + } + + const dialect1 = resolveDialect(params.source_warehouse) + const dialect2 = resolveDialect(params.target_warehouse ?? 
params.source_warehouse) + + // Auto-discover extra_columns when not explicitly provided. + // The Rust engine only compares columns listed in extra_columns — if the list is + // empty, it compares key existence only and reports all matched rows as "identical" + // even when non-key values differ. This auto-discovery prevents that silent bug. + let extraColumns = params.extra_columns + let excludedAuditColumns: string[] = [] + + if (!extraColumns || extraColumns.length === 0) { + const discovered = await discoverExtraColumns( + params.source, + params.key_columns, + dialect1, + params.source_warehouse, + ) + if (discovered) { + extraColumns = discovered.columns + excludedAuditColumns = discovered.excludedAudit + } + } + + // Build session spec + const spec = { + table1: table1Ref, + table2: table2Ref, + dialect1, + dialect2, + config: { + algorithm: params.algorithm ?? "auto", + key_columns: params.key_columns, + extra_columns: extraColumns ?? [], + ...(params.where_clause ? { where_clause: params.where_clause } : {}), + ...(params.numeric_tolerance != null ? { numeric_tolerance: params.numeric_tolerance } : {}), + ...(params.timestamp_tolerance_ms != null + ? { timestamp_tolerance_ms: params.timestamp_tolerance_ms } + : {}), + }, + } + + // Create session + let session: InstanceType + try { + session = new DataParitySession(JSON.stringify(spec)) + } catch (e) { + return { + success: false, + error: `Failed to create DataParitySession: ${e}`, + steps: 0, + } + } + + // Route SQL tasks to the correct warehouse + const warehouseFor = (tableSide: string): string | undefined => + tableSide === "Table2" ? (params.target_warehouse ?? 
params.source_warehouse) : params.source_warehouse + + // Cooperative loop + let actionJson = session.start() + let stepCount = 0 + + while (stepCount < MAX_STEPS) { + const action = JSON.parse(actionJson) as { + type: string + tasks?: Array<{ id: string; table_side: string; sql: string; expected_shape: string }> + outcome?: unknown + message?: string + } + + if (action.type === "Done") { + return { + success: true, + steps: stepCount, + outcome: action.outcome, + ...(excludedAuditColumns.length > 0 ? { excluded_audit_columns: excludedAuditColumns } : {}), + } + } + + if (action.type === "Error") { + return { + success: false, + error: action.message ?? "Unknown engine error", + steps: stepCount, + } + } + + if (action.type !== "ExecuteSql") { + return { + success: false, + error: `Unexpected action type: ${action.type}`, + steps: stepCount, + } + } + + stepCount++ + + // Execute all SQL tasks in parallel + const tasks = action.tasks ?? [] + const responses = await Promise.all( + tasks.map(async (task) => { + const warehouse = warehouseFor(task.table_side) + // Inject CTE definitions if we're in query-comparison mode + const sql = ctePrefix ? injectCte(task.sql, ctePrefix) : task.sql + try { + const rows = await executeQuery(sql, warehouse) + return { id: task.id, rows } + } catch (e) { + // Return error shape — engine will produce an Error action on next step + return { id: task.id, rows: [], error: String(e) } + } + }), + ) + + actionJson = session.step(JSON.stringify(responses)) + } + + return { + success: false, + error: `Exceeded maximum step limit (${MAX_STEPS}). 
The diff may require more iterations for this table size.`, + steps: stepCount, + } +} diff --git a/packages/opencode/src/altimate/native/connections/register.ts b/packages/opencode/src/altimate/native/connections/register.ts index 7267a142d0..b93e1e1570 100644 --- a/packages/opencode/src/altimate/native/connections/register.ts +++ b/packages/opencode/src/altimate/native/connections/register.ts @@ -10,6 +10,7 @@ import { register } from "../dispatcher" import * as Registry from "./registry" import { discoverContainers } from "./docker-discovery" import { parseDbtProfiles } from "./dbt-profiles" +import { runDataDiff } from "./data-diff" import type { SqlExecuteParams, SqlExecuteResult, @@ -29,6 +30,8 @@ import type { SchemaInspectResult, DbtProfilesParams, DbtProfilesResult, + DataDiffParams, + DataDiffResult, } from "../types" import type { ConnectionConfig } from "@altimateai/drivers" import { Telemetry } from "../../../telemetry" @@ -425,6 +428,11 @@ register("dbt.profiles", async (params: DbtProfilesParams): Promise => { + return runDataDiff(params) +}) + } // end registerAll // Auto-register on module load diff --git a/packages/opencode/src/altimate/native/connections/registry.ts b/packages/opencode/src/altimate/native/connections/registry.ts index 5aaafdd640..2fa7e24237 100644 --- a/packages/opencode/src/altimate/native/connections/registry.ts +++ b/packages/opencode/src/altimate/native/connections/registry.ts @@ -128,6 +128,7 @@ const DRIVER_MAP: Record = { sqlite: "@altimateai/drivers/sqlite", mongodb: "@altimateai/drivers/mongodb", mongo: "@altimateai/drivers/mongodb", + clickhouse: "@altimateai/drivers/clickhouse", } async function createConnector(name: string, config: ConnectionConfig): Promise { @@ -193,6 +194,9 @@ async function createConnector(name: string, config: ConnectionConfig): Promise< case "@altimateai/drivers/mongodb": mod = await import("@altimateai/drivers/mongodb") break + case "@altimateai/drivers/clickhouse": + mod = await 
import("@altimateai/drivers/clickhouse")
+        break
       default:
         throw new Error(`No static import available for driver: ${driverPath}`)
   }
diff --git a/packages/opencode/src/altimate/native/types.ts b/packages/opencode/src/altimate/native/types.ts
index 8d0f3978fc..8d789a244b 100644
--- a/packages/opencode/src/altimate/native/types.ts
+++ b/packages/opencode/src/altimate/native/types.ts
@@ -955,6 +955,75 @@ export interface LocalTestResult {
   error?: string
 }
 
+// --- Data Diff ---
+
+export interface DataDiffParams {
+  /** Source table name (e.g. "orders", "db.schema.orders") or full SQL query */
+  source: string
+  /** Target table name or SQL query */
+  target: string
+  /** Primary key columns that uniquely identify each row */
+  key_columns: string[]
+  /** Source warehouse connection name */
+  source_warehouse?: string
+  /** Target warehouse connection name (defaults to source_warehouse) */
+  target_warehouse?: string
+  /** Extra columns to compare beyond the key */
+  extra_columns?: string[]
+  /** Algorithm: "auto" | "joindiff" | "hashdiff" | "profile" | "cascade" */
+  algorithm?: string
+  /** Optional WHERE filter applied to both tables */
+  where_clause?: string
+  /** Absolute numeric tolerance */
+  numeric_tolerance?: number
+  /** Timestamp tolerance in milliseconds */
+  timestamp_tolerance_ms?: number
+  /**
+   * Column to partition on before diffing. The table is split into groups by
+   * this column and each group is diffed independently. Results are aggregated.
+   * Use for large tables where bisection alone is too slow or imprecise.
+   *
+   * Examples: "l_shipdate" (date column), "l_orderkey" (numeric column)
+   */
+  partition_column?: string
+  /**
+   * Granularity for date partition columns: "day" | "week" | "month" | "year".
+   * For numeric columns, ignored — use partition_bucket_size instead.
+   * Defaults to "month".
+   */
+  partition_granularity?: "day" | "week" | "month" | "year"
+  /**
+   * For numeric partition columns: size of each bucket.
+   * E.g. 100000 splits l_orderkey into [0, 100000), [100000, 200000), …
+   */
+  partition_bucket_size?: number
+}
+
+export interface PartitionDiffResult {
+  /** The partition value (date string or numeric bucket start) */
+  partition: string
+  /** Source row count in this partition */
+  rows_source: number
+  /** Target row count in this partition */
+  rows_target: number
+  /** Total differences found (exclusive + updated) */
+  differences: number
+  /** "identical" | "differ" | "error" */
+  status: "identical" | "differ" | "error"
+  error?: string
+}
+
+export interface DataDiffResult {
+  /** True when the diff engine completed with a "Done" action (no engine error or step-limit hit) */
+  success: boolean
+  /** Number of ExecuteSql round-trips performed by the diff engine */
+  steps: number
+  /** Engine outcome payload; shape depends on mode ("diff" | "profile" | "cascade") — see formatOutcome */
+  outcome?: unknown
+  /** Error message when success is false */
+  error?: string
+  /** Per-partition breakdown when partition_column is used */
+  partition_results?: PartitionDiffResult[]
+  /** Columns auto-excluded from comparison (audit/timestamp columns like updated_at, created_at) */
+  excluded_audit_columns?: string[]
+}
+
 // --- Method registry ---
 
 export const BridgeMethods = {
@@ -998,6 +1067,8 @@ export const BridgeMethods = {
   // --- local testing ---
   "local.schema_sync": {} as { params: LocalSchemaSyncParams; result: LocalSchemaSyncResult },
   "local.test": {} as { params: LocalTestParams; result: LocalTestResult },
+  // --- data diff ---
+  "data.diff": {} as { params: DataDiffParams; result: DataDiffResult },
   // --- altimate-core (existing) ---
   "altimate_core.validate": {} as { params: AltimateCoreValidateParams; result: AltimateCoreResult },
   "altimate_core.lint": {} as { params: AltimateCoreLintParams; result: AltimateCoreResult },
diff --git a/packages/opencode/src/altimate/tools/data-diff.ts b/packages/opencode/src/altimate/tools/data-diff.ts
new file mode 100644
index 0000000000..bf99487483
--- /dev/null
+++ b/packages/opencode/src/altimate/tools/data-diff.ts
@@ -0,0 +1,257 @@
+import z from "zod"
+import { Tool } from "../../tool/tool"
+import { Dispatcher } from "../native"
+
+export const DataDiffTool = Tool.define("data_diff", {
+  description: [
+    "Compare two database tables or query 
results row-by-row to find differences.", + "", + "Two use cases:", + "1. Migration validation — compare the same table across two databases:", + ' source="orders" source_warehouse="postgres_prod" target_warehouse="snowflake_dw"', + "2. Query optimization — compare results of two SQL queries on the same database:", + ' source="SELECT id, amount FROM orders WHERE ..." target="SELECT id, amount FROM orders_v2 WHERE ..."', + "", + "Algorithms:", + "- auto: JoinDiff if same dialect, HashDiff if cross-database (default)", + "- joindiff: FULL OUTER JOIN (fast, same-database only)", + "- hashdiff: Bisection with checksums (cross-database, any scale)", + "- profile: Column-level statistics comparison", + ].join("\n"), + parameters: z.object({ + source: z.string().describe( + "Source table name (e.g. 'orders', 'db.schema.orders') or a full SQL query starting with SELECT/WITH", + ), + target: z.string().describe( + "Target table name or SQL query to compare against source", + ), + key_columns: z + .array(z.string()) + .describe("Primary key columns that uniquely identify each row (e.g. ['id'] or ['order_id', 'line_item'])"), + source_warehouse: z.string().optional().describe("Source warehouse connection name"), + target_warehouse: z.string().optional().describe( + "Target warehouse connection name. Omit to use the same warehouse as source (query comparison mode)", + ), + extra_columns: z + .array(z.string()) + .optional() + .describe( + "Columns to compare beyond the key columns. " + + "IMPORTANT: If omitted AND source is a plain table name, columns are auto-discovered from the schema " + + "(excluding key columns, audit/timestamp columns matched by name like updated_at/created_at, " + + "and columns with auto-generating timestamp defaults like DEFAULT NOW()/CURRENT_TIMESTAMP/GETDATE()/SYSDATE). " + + "If omitted AND source is a SQL query, ONLY key columns are compared — value changes in non-key columns will NOT be detected. 
" + + "Always provide explicit extra_columns when comparing SQL queries to ensure value-level comparison." + ), + algorithm: z + .enum(["auto", "joindiff", "hashdiff", "profile", "cascade"]) + .optional() + .default("auto") + .describe("Comparison algorithm"), + where_clause: z.string().optional().describe("Optional WHERE filter applied to both tables"), + numeric_tolerance: z + .number() + .optional() + .describe("Absolute tolerance for numeric comparisons (e.g. 0.01 for cent-level tolerance)"), + timestamp_tolerance_ms: z + .number() + .optional() + .describe("Tolerance for timestamp comparisons in milliseconds"), + partition_column: z + .string() + .optional() + .describe( + "Column to partition on before diffing. Splits the table into groups and diffs each independently. " + + "Three modes depending on which other params you set:\n" + + " • Date column → set partition_granularity (day/week/month/year). E.g. partition_column='l_shipdate', partition_granularity='month'\n" + + " • Numeric column → set partition_bucket_size. E.g. partition_column='l_orderkey', partition_bucket_size=100000\n" + + " • Categorical → set neither. Works for string/enum/boolean columns like 'status', 'region', 'country'. Groups by distinct values.\n" + + "Results are aggregated with a per-partition breakdown showing which groups have differences.", + ), + partition_granularity: z + .enum(["day", "week", "month", "year"]) + .optional() + .describe("For date partition columns: truncation granularity. Omit for numeric or categorical columns."), + partition_bucket_size: z + .number() + .optional() + .describe("For numeric partition columns: size of each bucket. E.g. 100000 splits l_orderkey into ranges of 100K. 
Omit for date or categorical columns."), + }), + async execute(args, ctx) { + // Require read permission — data diff executes SELECT queries + await ctx.ask({ + permission: "sql_execute_read", + patterns: [args.source.slice(0, 120), args.target.slice(0, 120)], + always: ["*"], + metadata: {}, + }) + + try { + const result = await Dispatcher.call("data.diff", { + source: args.source, + target: args.target, + key_columns: args.key_columns, + source_warehouse: args.source_warehouse, + target_warehouse: args.target_warehouse, + extra_columns: args.extra_columns, + algorithm: args.algorithm, + where_clause: args.where_clause, + numeric_tolerance: args.numeric_tolerance, + timestamp_tolerance_ms: args.timestamp_tolerance_ms, + partition_column: args.partition_column, + partition_granularity: args.partition_granularity, + partition_bucket_size: args.partition_bucket_size, + }) + + if (!result.success) { + return { + title: "Data diff: ERROR", + metadata: { success: false, steps: result.steps }, + output: `Data diff failed: ${result.error}`, + } + } + + const outcome = result.outcome as any + let output = formatOutcome(outcome, args.source, args.target) + + if (result.partition_results?.length) { + output += formatPartitionResults(result.partition_results, args.partition_column!) + } + + // Report auto-excluded columns so the LLM and user know what was skipped + const excluded = (result as any).excluded_audit_columns as string[] | undefined + if (excluded && excluded.length > 0) { + output += `\n\n Note: ${excluded.length} column${excluded.length === 1 ? "" : "s"} auto-excluded from comparison (audit name patterns + auto-timestamp defaults like NOW()/CURRENT_TIMESTAMP): ${excluded.join(", ")}` + } + + return { + title: `Data diff: ${summarize(outcome)}`, + metadata: { success: true, steps: result.steps }, + output, + } + } catch (e) { + const msg = e instanceof Error ? 
e.message : String(e)
+      return {
+        title: "Data diff: ERROR",
+        metadata: { success: false, steps: 0, error: msg },
+        output: `Data diff failed: ${msg}`,
+      }
+    }
+  },
+})
+
+/**
+ * One-line summary of the engine outcome, used for the tool result title.
+ * Returns "IDENTICAL ✓" when a diff outcome has no exclusive or updated rows,
+ * otherwise a comma-separated tally of differences; falls back to "complete".
+ */
+function summarize(outcome: any): string {
+  if (!outcome) return "complete"
+
+  // Rust serializes ReladiffOutcome as {mode: "diff"|"profile"|..., stats: {...}, diff_rows: [...]}
+  if (outcome.mode === "diff") {
+    const s = outcome.stats ?? {}
+    const e1 = Number(s.exclusive_table1 ?? 0)
+    const e2 = Number(s.exclusive_table2 ?? 0)
+    const upd = Number(s.updated ?? 0)
+    if (e1 === 0 && e2 === 0 && upd === 0) return "IDENTICAL ✓"
+    const parts: string[] = []
+    if (e1 > 0) parts.push(`${e1} only in source`)
+    if (e2 > 0) parts.push(`${e2} only in target`)
+    if (upd > 0) parts.push(`${upd} updated`)
+    return parts.join(", ")
+  }
+  if (outcome.mode === "profile") return "profile complete"
+  if (outcome.mode === "cascade") return "cascade complete"
+
+  return "complete"
+}
+
+/**
+ * Render the engine outcome as human-readable multi-line text.
+ * "diff" mode prints row counts, exclusive/updated tallies and up to five
+ * sample rows; "profile" mode prints per-column verdicts with stats.
+ * Unknown modes fall back to pretty-printed JSON.
+ */
+function formatOutcome(outcome: any, source: string, target: string): string {
+  if (!outcome) return "Comparison complete."
+
+  const lines: string[] = []
+
+  // Rust serializes ReladiffOutcome as {mode: "diff", diff_rows: [...], stats: {...}}
+  // stats: rows_table1, rows_table2, exclusive_table1, exclusive_table2, updated, unchanged
+  if (outcome.mode === "diff") {
+    const s = outcome.stats ?? {}
+    const rows1 = Number(s.rows_table1 ?? 0)
+    const rows2 = Number(s.rows_table2 ?? 0)
+    const e1 = Number(s.exclusive_table1 ?? 0)
+    const e2 = Number(s.exclusive_table2 ?? 0)
+    const updated = Number(s.updated ?? 0)
+    const unchanged = Number(s.unchanged ?? 0)
+
+    if (e1 === 0 && e2 === 0 && updated === 0) {
+      lines.push(`✓ Tables are IDENTICAL`)
+      if (rows1 > 0) lines.push(`  Rows checked: ${rows1.toLocaleString()}`)
+      return lines.join("\n")
+    }
+
+    lines.push(`✗ Tables DIFFER`)
+    lines.push(``)
+    lines.push(`  Source: ${source}`)
+    lines.push(`  Target: ${target}`)
+    lines.push(``)
+
+    // Zero-valued counters are omitted to keep the report compact.
+    if (rows1 > 0) lines.push(`  Source rows: ${rows1.toLocaleString()}`)
+    if (rows2 > 0) lines.push(`  Target rows: ${rows2.toLocaleString()}`)
+    if (e1 > 0) lines.push(`  Only in source: ${e1.toLocaleString()}`)
+    if (e2 > 0) lines.push(`  Only in target: ${e2.toLocaleString()}`)
+    if (updated > 0) lines.push(`  Updated rows: ${updated.toLocaleString()}`)
+    if (unchanged > 0) lines.push(`  Identical rows: ${unchanged.toLocaleString()}`)
+
+    const diffRows = outcome.diff_rows ?? []
+    if (diffRows.length > 0) {
+      lines.push(``)
+      lines.push(`  Sample differences (first ${Math.min(diffRows.length, 5)}):`)
+      for (const d of diffRows.slice(0, 5)) {
+        // "-" marks a row present only in source; anything else is target-only.
+        const label = d.sign === "-" ? "source only" : "target only"
+        lines.push(`    [${label}] ${d.values?.join(" | ")}`)
+      }
+    }
+
+    return lines.join("\n")
+  }
+
+  if (outcome.mode === "profile") {
+    const cols = outcome.column_stats ?? outcome.columns ?? []
+    lines.push(`Column Profile Comparison`)
+    lines.push(``)
+    for (const col of cols) {
+      const verdict = col.verdict === "match" ? "✓" : col.verdict === "within_tolerance" ? "~" : "✗"
+      lines.push(`  ${verdict} ${col.column}: ${col.verdict}`)
+      if (col.source_stats && col.target_stats) {
+        lines.push(`     source: count=${col.source_stats.count} nulls=${col.source_stats.null_count} min=${col.source_stats.min} max=${col.source_stats.max}`)
+        lines.push(`     target: count=${col.target_stats.count} nulls=${col.target_stats.null_count} min=${col.target_stats.min} max=${col.target_stats.max}`)
+      }
+    }
+    return lines.join("\n")
+  }
+
+  return JSON.stringify(outcome, null, 2)
+}
+
+/**
+ * Append a per-partition breakdown (differing and errored partitions listed
+ * individually, identical partitions collapsed into a single count line).
+ */
+function formatPartitionResults(
+  partitions: Array<{ partition: string; rows_source: number; rows_target: number; differences: number; status: string; error?: string }>,
+  partitionColumn: string,
+): string {
+  const lines: string[] = ["", `Partition breakdown (by ${partitionColumn}):`]
+
+  const clean = partitions.filter((p) => p.status === "identical")
+  const dirty = partitions.filter((p) => p.status === "differ")
+  const errored = partitions.filter((p) => p.status === "error")
+
+  if (dirty.length === 0 && errored.length === 0) {
+    lines.push(`  ✓ All ${partitions.length} partitions identical`)
+    return lines.join("\n")
+  }
+
+  for (const p of dirty) {
+    lines.push(`  ✗ ${p.partition}  source=${p.rows_source.toLocaleString()}  target=${p.rows_target.toLocaleString()}  diff=${p.differences.toLocaleString()}`)
+  }
+  for (const p of errored) {
+    lines.push(`  ! ${p.partition}  ERROR: ${p.error}`)
+  }
+  if (clean.length > 0) {
+    lines.push(`  ✓ ${clean.length} partition${clean.length === 1 ? "" : "s"} identical`)
+  }
+
+  return lines.join("\n")
+}
diff --git a/packages/opencode/src/tool/registry.ts b/packages/opencode/src/tool/registry.ts
index 075291248f..e5fc1bf9c9 100644
--- a/packages/opencode/src/tool/registry.ts
+++ b/packages/opencode/src/tool/registry.ts
@@ -57,6 +57,7 @@ import { SqlFormatTool } from "../altimate/tools/sql-format"
 import { SqlFixTool } from "../altimate/tools/sql-fix"
 import { SqlAutocompleteTool } from "../altimate/tools/sql-autocomplete"
 import { SqlDiffTool } from "../altimate/tools/sql-diff"
+import { DataDiffTool } from "../altimate/tools/data-diff"
 import { FinopsQueryHistoryTool } from "../altimate/tools/finops-query-history"
 import { FinopsAnalyzeCreditsTool } from "../altimate/tools/finops-analyze-credits"
 import { FinopsExpensiveQueriesTool } from "../altimate/tools/finops-expensive-queries"
@@ -233,6 +234,7 @@ export namespace ToolRegistry {
     SqlFixTool,
     SqlAutocompleteTool,
     SqlDiffTool,
+    DataDiffTool,
     FinopsQueryHistoryTool,
     FinopsAnalyzeCreditsTool,
     FinopsExpensiveQueriesTool,