Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/sync-engine/src/database/migrations-embedded.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ export type EmbeddedMigration = {
export const embeddedMigrations: EmbeddedMigration[] = [
{
name: '0000_initial_migration.sql',
sql: '-- Internal sync metadata schema bootstrap for OpenAPI runtime.\n-- Schema-qualified objects use the explicit {{sync_schema}} placeholder.\n-- Uses idempotent DDL so it can be safely re-run.\n\nCREATE EXTENSION IF NOT EXISTS btree_gist;\n\nCREATE OR REPLACE FUNCTION set_updated_at() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n -- Support both legacy "updated_at" and newer "_updated_at" columns.\n -- jsonb_populate_record silently ignores keys that are not present on NEW.\n NEW := jsonb_populate_record(\n NEW,\n jsonb_build_object(\n \'updated_at\', now(),\n \'_updated_at\', now()\n )\n );\n RETURN NEW;\nEND;\n$$;\n\nCREATE OR REPLACE FUNCTION set_updated_at_metadata() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n NEW.updated_at = now();\n RETURN NEW;\nEND;\n$$;\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."accounts" (\n "_raw_data" jsonb NOT NULL,\n "id" text GENERATED ALWAYS AS ((_raw_data->>\'id\')::text) STORED,\n "api_key_hashes" text[] NOT NULL DEFAULT \'{}\',\n "first_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_last_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("id")\n);\nCREATE INDEX IF NOT EXISTS "idx_accounts_api_key_hashes"\n ON {{sync_schema}}."accounts" USING GIN ("api_key_hashes");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."accounts";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."accounts"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_managed_webhooks" (\n "id" text PRIMARY KEY,\n "object" text,\n "url" text NOT NULL,\n "enabled_events" jsonb NOT NULL,\n "description" text,\n "enabled" boolean,\n "livemode" boolean,\n "metadata" jsonb,\n "secret" text NOT NULL,\n "status" text,\n "api_version" text,\n "created" bigint,\n "last_synced_at" timestamptz,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n "account_id" text NOT NULL\n);\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "managed_webhooks_url_account_unique";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "managed_webhooks_url_account_unique" UNIQUE ("url", "account_id");\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "fk_managed_webhooks_account";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "fk_managed_webhooks_account"\n FOREIGN KEY ("account_id") REFERENCES {{sync_schema}}."accounts" (id);\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_status"\n ON {{sync_schema}}."_managed_webhooks" ("status");\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_enabled"\n ON {{sync_schema}}."_managed_webhooks" ("enabled");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_managed_webhooks";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_managed_webhooks"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_runs" (\n "_account_id" text NOT NULL,\n "started_at" timestamptz NOT NULL DEFAULT now(),\n "closed_at" timestamptz,\n "max_concurrent" integer NOT NULL DEFAULT 3,\n "triggered_by" text,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "started_at")\n);\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_runs_account";\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT "fk_sync_runs_account"\n FOREIGN KEY ("_account_id") REFERENCES {{sync_schema}}."accounts" (id);\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account_triggered_by;\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT one_active_run_per_account_triggered_by\n EXCLUDE (\n "_account_id" WITH =,\n COALESCE(triggered_by, \'default\') WITH =\n ) WHERE (closed_at IS NULL);\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_runs_account_status"\n ON {{sync_schema}}."_sync_runs" ("_account_id", "closed_at");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_obj_runs" (\n "_account_id" text NOT NULL,\n "run_started_at" timestamptz NOT NULL,\n "object" text NOT NULL,\n "status" text NOT NULL DEFAULT \'pending\'\n CHECK (status IN (\'pending\', \'running\', \'complete\', \'error\')),\n "started_at" timestamptz,\n "completed_at" timestamptz,\n "processed_count" integer NOT NULL DEFAULT 0,\n "cursor" text,\n "page_cursor" text,\n "created_gte" integer NOT NULL DEFAULT 0,\n "created_lte" integer NOT NULL DEFAULT 0,\n "priority" integer NOT NULL DEFAULT 0,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "run_started_at", "object", "created_gte", "created_lte")\n);\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "page_cursor" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_gte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_lte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "priority" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_obj_runs_parent";\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD CONSTRAINT "fk_sync_obj_runs_parent"\n FOREIGN KEY ("_account_id", "run_started_at")\n REFERENCES {{sync_schema}}."_sync_runs" ("_account_id", "started_at");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_obj_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_obj_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_status"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status");\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_priority"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status", "priority");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_rate_limits" (\n key TEXT PRIMARY KEY,\n count INTEGER NOT NULL DEFAULT 0,\n window_start TIMESTAMPTZ NOT NULL DEFAULT now()\n);\n\nCREATE OR REPLACE FUNCTION {{sync_schema}}.check_rate_limit(\n rate_key TEXT,\n max_requests INTEGER,\n window_seconds INTEGER\n)\nRETURNS VOID AS $$\nDECLARE\n now TIMESTAMPTZ := clock_timestamp();\n window_length INTERVAL := make_interval(secs => window_seconds);\n current_count INTEGER;\nBEGIN\n PERFORM pg_advisory_xact_lock(hashtext(rate_key));\n\n INSERT INTO {{sync_schema}}."_rate_limits" (key, count, window_start)\n VALUES (rate_key, 1, now)\n ON CONFLICT (key) DO UPDATE\n SET count = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN 1\n ELSE "_rate_limits".count + 1\n END,\n window_start = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN now\n ELSE "_rate_limits".window_start\n END;\n\n SELECT count INTO current_count FROM {{sync_schema}}."_rate_limits" WHERE key = rate_key;\n\n IF current_count > max_requests THEN\n RAISE EXCEPTION \'Rate limit exceeded for %\', rate_key;\n END IF;\nEND;\n$$ LANGUAGE plpgsql;\n\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_runs" AS\nSELECT\n r._account_id as account_id,\n r.started_at,\n r.closed_at,\n r.triggered_by,\n r.max_concurrent,\n COALESCE(SUM(o.processed_count), 0) as total_processed,\n COUNT(o.*) as total_objects,\n COUNT(*) FILTER (WHERE o.status = \'complete\') as complete_count,\n COUNT(*) FILTER (WHERE o.status = \'error\') as error_count,\n COUNT(*) FILTER (WHERE o.status = \'running\') as running_count,\n COUNT(*) FILTER (WHERE o.status = \'pending\') as pending_count,\n STRING_AGG(o.error_message, \'; \') FILTER (WHERE o.error_message IS NOT NULL) as error_message,\n CASE\n WHEN r.closed_at IS NULL AND COUNT(*) FILTER (WHERE o.status = \'running\') > 0 THEN \'running\'\n WHEN r.closed_at IS NULL AND (COUNT(o.*) = 0 OR COUNT(o.*) = COUNT(*) FILTER (WHERE o.status = \'pending\')) THEN \'pending\'\n WHEN r.closed_at IS NULL THEN \'running\'\n WHEN COUNT(*) FILTER (WHERE o.status = \'error\') > 0 THEN \'error\'\n ELSE \'complete\'\n END as status\nFROM {{sync_schema}}."_sync_runs" r\nLEFT JOIN {{sync_schema}}."_sync_obj_runs" o\n ON o._account_id = r._account_id\n AND o.run_started_at = r.started_at\nGROUP BY r._account_id, r.started_at, r.closed_at, r.triggered_by, r.max_concurrent;\n\nDROP FUNCTION IF EXISTS {{sync_schema}}."sync_obj_progress"(TEXT, TIMESTAMPTZ);\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_obj_progress" AS\nSELECT\n r."_account_id" AS account_id,\n r.run_started_at,\n r.object,\n ROUND(\n 100.0 * COUNT(*) FILTER (WHERE r.status = \'complete\') / NULLIF(COUNT(*), 0),\n 1\n ) AS pct_complete,\n COALESCE(SUM(r.processed_count), 0) AS processed\nFROM {{sync_schema}}."_sync_obj_runs" r\nWHERE r.run_started_at = (\n SELECT MAX(s.started_at)\n FROM {{sync_schema}}."_sync_runs" s\n WHERE s."_account_id" = r."_account_id"\n)\nGROUP BY r."_account_id", r.run_started_at, r.object;\n',
sql: '-- Internal sync metadata schema bootstrap for OpenAPI runtime.\n-- Schema-qualified objects use the explicit {{sync_schema}} placeholder.\n-- Uses idempotent DDL so it can be safely re-run.\n\nCREATE EXTENSION IF NOT EXISTS btree_gist;\n\nCREATE OR REPLACE FUNCTION set_updated_at() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n -- Support both legacy "updated_at" and newer "_updated_at" columns.\n -- jsonb_populate_record silently ignores keys that are not present on NEW.\n NEW := jsonb_populate_record(\n NEW,\n jsonb_build_object(\n \'updated_at\', now(),\n \'_updated_at\', now()\n )\n );\n RETURN NEW;\nEND;\n$$;\n\nCREATE OR REPLACE FUNCTION set_updated_at_metadata() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n NEW.updated_at = now();\n RETURN NEW;\nEND;\n$$;\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."accounts" (\n "_raw_data" jsonb NOT NULL,\n "id" text GENERATED ALWAYS AS ((_raw_data->>\'id\')::text) STORED,\n "api_key_hashes" text[] NOT NULL DEFAULT \'{}\',\n "first_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_last_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("id")\n);\nCREATE INDEX IF NOT EXISTS "idx_accounts_api_key_hashes"\n ON {{sync_schema}}."accounts" USING GIN ("api_key_hashes");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."accounts";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."accounts"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_managed_webhooks" (\n "id" text PRIMARY KEY,\n "object" text,\n "url" text NOT NULL,\n "enabled_events" jsonb NOT NULL,\n "description" text,\n "enabled" boolean,\n "livemode" boolean,\n "metadata" jsonb,\n "secret" text NOT NULL,\n "status" text,\n "api_version" text,\n "created" bigint,\n "last_synced_at" timestamptz,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n "account_id" text NOT NULL\n);\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "managed_webhooks_url_account_unique";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "managed_webhooks_url_account_unique" UNIQUE ("url", "account_id");\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "fk_managed_webhooks_account";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "fk_managed_webhooks_account"\n FOREIGN KEY ("account_id") REFERENCES {{sync_schema}}."accounts" (id);\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_status"\n ON {{sync_schema}}."_managed_webhooks" ("status");\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_enabled"\n ON {{sync_schema}}."_managed_webhooks" ("enabled");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_managed_webhooks";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_managed_webhooks"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_runs" (\n "_account_id" text NOT NULL,\n "started_at" timestamptz NOT NULL DEFAULT now(),\n "closed_at" timestamptz,\n "max_concurrent" integer NOT NULL DEFAULT 3,\n "triggered_by" text,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "started_at")\n);\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_runs_account";\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT "fk_sync_runs_account"\n FOREIGN KEY ("_account_id") REFERENCES {{sync_schema}}."accounts" (id);\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account_triggered_by;\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT one_active_run_per_account_triggered_by\n EXCLUDE (\n "_account_id" WITH =,\n COALESCE(triggered_by, \'default\') WITH =\n ) WHERE (closed_at IS NULL);\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_runs_account_status"\n ON {{sync_schema}}."_sync_runs" ("_account_id", "closed_at");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_obj_runs" (\n "_account_id" text NOT NULL,\n "run_started_at" timestamptz NOT NULL,\n "object" text NOT NULL,\n "status" text NOT NULL DEFAULT \'pending\'\n CHECK (status IN (\'pending\', \'running\', \'complete\', \'error\')),\n "started_at" timestamptz,\n "completed_at" timestamptz,\n "processed_count" integer NOT NULL DEFAULT 0,\n "cursor" text,\n "page_cursor" text,\n "created_gte" integer NOT NULL DEFAULT 0,\n "created_lte" integer NOT NULL DEFAULT 0,\n "priority" integer NOT NULL DEFAULT 0,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "run_started_at", "object", "created_gte", "created_lte")\n);\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "page_cursor" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_gte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_lte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "priority" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_obj_runs_parent";\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD CONSTRAINT "fk_sync_obj_runs_parent"\n FOREIGN KEY ("_account_id", "run_started_at")\n REFERENCES {{sync_schema}}."_sync_runs" ("_account_id", "started_at");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_obj_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_obj_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_status"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status");\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_priority"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status", "priority");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_rate_limits" (\n key TEXT PRIMARY KEY,\n count INTEGER NOT NULL DEFAULT 0,\n window_start TIMESTAMPTZ NOT NULL DEFAULT now()\n);\n\nCREATE OR REPLACE FUNCTION {{sync_schema}}.check_rate_limit(\n rate_key TEXT,\n max_requests INTEGER,\n window_seconds INTEGER\n)\nRETURNS INTEGER AS $$\nDECLARE\n now TIMESTAMPTZ := clock_timestamp();\n window_length INTERVAL := make_interval(secs => window_seconds);\n current_count INTEGER;\nBEGIN\n PERFORM pg_advisory_xact_lock(hashtext(rate_key));\n INSERT INTO {{sync_schema}}."_rate_limits" (key, count, window_start)\n VALUES (rate_key, 1, now)\n ON CONFLICT (key) DO UPDATE\n SET count = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN 1\n ELSE "_rate_limits".count + 1\n END,\n window_start = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN now\n ELSE "_rate_limits".window_start\n END;\n SELECT count INTO current_count FROM {{sync_schema}}."_rate_limits" WHERE key = rate_key;\n IF current_count > max_requests THEN\n RETURN 0;\n END IF;\n RETURN 1;\nEND;\n$$ LANGUAGE plpgsql;\n\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_runs" AS\nSELECT\n r._account_id as account_id,\n r.started_at,\n r.closed_at,\n r.triggered_by,\n r.max_concurrent,\n COALESCE(SUM(o.processed_count), 0) as total_processed,\n COUNT(o.*) as total_objects,\n COUNT(*) FILTER (WHERE o.status = \'complete\') as complete_count,\n COUNT(*) FILTER (WHERE o.status = \'error\') as error_count,\n COUNT(*) FILTER (WHERE o.status = \'running\') as running_count,\n COUNT(*) FILTER (WHERE o.status = \'pending\') as pending_count,\n STRING_AGG(o.error_message, \'; \') FILTER (WHERE o.error_message IS NOT NULL) as error_message,\n CASE\n WHEN r.closed_at IS NULL AND COUNT(*) FILTER (WHERE o.status = \'running\') > 0 THEN \'running\'\n WHEN r.closed_at IS NULL AND (COUNT(o.*) = 0 OR COUNT(o.*) = COUNT(*) FILTER (WHERE o.status = \'pending\')) THEN \'pending\'\n WHEN r.closed_at IS NULL THEN \'running\'\n WHEN COUNT(*) FILTER (WHERE o.status = \'error\') > 0 THEN \'error\'\n ELSE \'complete\'\n END as status\nFROM {{sync_schema}}."_sync_runs" r\nLEFT JOIN {{sync_schema}}."_sync_obj_runs" o\n ON o._account_id = r._account_id\n AND o.run_started_at = r.started_at\nGROUP BY r._account_id, r.started_at, r.closed_at, r.triggered_by, r.max_concurrent;\n\nDROP FUNCTION IF EXISTS {{sync_schema}}."sync_obj_progress"(TEXT, TIMESTAMPTZ);\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_obj_progress" AS\nSELECT\n r."_account_id" AS account_id,\n r.run_started_at,\n r.object,\n ROUND(\n 100.0 * COUNT(*) FILTER (WHERE r.status = \'complete\') / NULLIF(COUNT(*), 0),\n 1\n ) AS pct_complete,\n COALESCE(SUM(r.processed_count), 0) AS processed\nFROM {{sync_schema}}."_sync_obj_runs" r\nWHERE r.run_started_at = (\n SELECT MAX(s.started_at)\n FROM {{sync_schema}}."_sync_runs" s\n WHERE s."_account_id" = r."_account_id"\n)\nGROUP BY r."_account_id", r.run_started_at, r.object;\n',
},
]
Loading
Loading