diff --git a/packages/sync-engine/src/database/migrations-embedded.ts b/packages/sync-engine/src/database/migrations-embedded.ts
index b238b72c..12287ccb 100644
--- a/packages/sync-engine/src/database/migrations-embedded.ts
+++ b/packages/sync-engine/src/database/migrations-embedded.ts
@@ -9,6 +9,6 @@ export type EmbeddedMigration = {
 export const embeddedMigrations: EmbeddedMigration[] = [
   {
     name: '0000_initial_migration.sql',
-    sql: '-- Internal sync metadata schema bootstrap for OpenAPI runtime.\n-- Schema-qualified objects use the explicit {{sync_schema}} placeholder.\n-- Uses idempotent DDL so it can be safely re-run.\n\nCREATE EXTENSION IF NOT EXISTS btree_gist;\n\nCREATE OR REPLACE FUNCTION set_updated_at() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n -- Support both legacy "updated_at" and newer "_updated_at" columns.\n -- jsonb_populate_record silently ignores keys that are not present on NEW.\n NEW := jsonb_populate_record(\n NEW,\n jsonb_build_object(\n \'updated_at\', now(),\n \'_updated_at\', now()\n )\n );\n RETURN NEW;\nEND;\n$$;\n\nCREATE OR REPLACE FUNCTION set_updated_at_metadata() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n NEW.updated_at = now();\n RETURN NEW;\nEND;\n$$;\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."accounts" (\n "_raw_data" jsonb NOT NULL,\n "id" text GENERATED ALWAYS AS ((_raw_data->>\'id\')::text) STORED,\n "api_key_hashes" text[] NOT NULL DEFAULT \'{}\',\n "first_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_last_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("id")\n);\nCREATE INDEX IF NOT EXISTS "idx_accounts_api_key_hashes"\n ON {{sync_schema}}."accounts" USING GIN ("api_key_hashes");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."accounts";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."accounts"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_managed_webhooks" (\n "id" text PRIMARY KEY,\n "object" text,\n "url" text NOT NULL,\n "enabled_events" jsonb NOT NULL,\n "description" text,\n "enabled" boolean,\n "livemode" boolean,\n "metadata" jsonb,\n "secret" text NOT NULL,\n "status" text,\n "api_version" text,\n "created" bigint,\n "last_synced_at" timestamptz,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n "account_id" text NOT NULL\n);\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "managed_webhooks_url_account_unique";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "managed_webhooks_url_account_unique" UNIQUE ("url", "account_id");\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "fk_managed_webhooks_account";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "fk_managed_webhooks_account"\n FOREIGN KEY ("account_id") REFERENCES {{sync_schema}}."accounts" (id);\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_status"\n ON {{sync_schema}}."_managed_webhooks" ("status");\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_enabled"\n ON {{sync_schema}}."_managed_webhooks" ("enabled");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_managed_webhooks";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_managed_webhooks"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_runs" (\n "_account_id" text NOT NULL,\n "started_at" timestamptz NOT NULL DEFAULT now(),\n "closed_at" timestamptz,\n "max_concurrent" integer NOT NULL DEFAULT 3,\n "triggered_by" text,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "started_at")\n);\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_runs_account";\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT "fk_sync_runs_account"\n FOREIGN KEY ("_account_id") REFERENCES {{sync_schema}}."accounts" (id);\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account_triggered_by;\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT one_active_run_per_account_triggered_by\n EXCLUDE (\n "_account_id" WITH =,\n COALESCE(triggered_by, \'default\') WITH =\n ) WHERE (closed_at IS NULL);\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_runs_account_status"\n ON {{sync_schema}}."_sync_runs" ("_account_id", "closed_at");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_obj_runs" (\n "_account_id" text NOT NULL,\n "run_started_at" timestamptz NOT NULL,\n "object" text NOT NULL,\n "status" text NOT NULL DEFAULT \'pending\'\n CHECK (status IN (\'pending\', \'running\', \'complete\', \'error\')),\n "started_at" timestamptz,\n "completed_at" timestamptz,\n "processed_count" integer NOT NULL DEFAULT 0,\n "cursor" text,\n "page_cursor" text,\n "created_gte" integer NOT NULL DEFAULT 0,\n "created_lte" integer NOT NULL DEFAULT 0,\n "priority" integer NOT NULL DEFAULT 0,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "run_started_at", "object", "created_gte", "created_lte")\n);\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "page_cursor" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_gte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_lte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "priority" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_obj_runs_parent";\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD CONSTRAINT "fk_sync_obj_runs_parent"\n FOREIGN KEY ("_account_id", "run_started_at")\n REFERENCES {{sync_schema}}."_sync_runs" ("_account_id", "started_at");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_obj_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_obj_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_status"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status");\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_priority"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status", "priority");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_rate_limits" (\n key TEXT PRIMARY KEY,\n count INTEGER NOT NULL DEFAULT 0,\n window_start TIMESTAMPTZ NOT NULL DEFAULT now()\n);\n\nCREATE OR REPLACE FUNCTION {{sync_schema}}.check_rate_limit(\n rate_key TEXT,\n max_requests INTEGER,\n window_seconds INTEGER\n)\nRETURNS VOID AS $$\nDECLARE\n now TIMESTAMPTZ := clock_timestamp();\n window_length INTERVAL := make_interval(secs => window_seconds);\n current_count INTEGER;\nBEGIN\n PERFORM pg_advisory_xact_lock(hashtext(rate_key));\n\n INSERT INTO {{sync_schema}}."_rate_limits" (key, count, window_start)\n VALUES (rate_key, 1, now)\n ON CONFLICT (key) DO UPDATE\n SET count = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN 1\n ELSE "_rate_limits".count + 1\n END,\n window_start = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN now\n ELSE "_rate_limits".window_start\n END;\n\n SELECT count INTO current_count FROM {{sync_schema}}."_rate_limits" WHERE key = rate_key;\n\n IF current_count > max_requests THEN\n RAISE EXCEPTION \'Rate limit exceeded for %\', rate_key;\n END IF;\nEND;\n$$ LANGUAGE plpgsql;\n\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_runs" AS\nSELECT\n r._account_id as account_id,\n r.started_at,\n r.closed_at,\n r.triggered_by,\n r.max_concurrent,\n COALESCE(SUM(o.processed_count), 0) as total_processed,\n COUNT(o.*) as total_objects,\n COUNT(*) FILTER (WHERE o.status = \'complete\') as complete_count,\n COUNT(*) FILTER (WHERE o.status = \'error\') as error_count,\n COUNT(*) FILTER (WHERE o.status = \'running\') as running_count,\n COUNT(*) FILTER (WHERE o.status = \'pending\') as pending_count,\n STRING_AGG(o.error_message, \'; \') FILTER (WHERE o.error_message IS NOT NULL) as error_message,\n CASE\n WHEN r.closed_at IS NULL AND COUNT(*) FILTER (WHERE o.status = \'running\') > 0 THEN \'running\'\n WHEN r.closed_at IS NULL AND (COUNT(o.*) = 0 OR COUNT(o.*) = COUNT(*) FILTER (WHERE o.status = \'pending\')) THEN \'pending\'\n WHEN r.closed_at IS NULL THEN \'running\'\n WHEN COUNT(*) FILTER (WHERE o.status = \'error\') > 0 THEN \'error\'\n ELSE \'complete\'\n END as status\nFROM {{sync_schema}}."_sync_runs" r\nLEFT JOIN {{sync_schema}}."_sync_obj_runs" o\n ON o._account_id = r._account_id\n AND o.run_started_at = r.started_at\nGROUP BY r._account_id, r.started_at, r.closed_at, r.triggered_by, r.max_concurrent;\n\nDROP FUNCTION IF EXISTS {{sync_schema}}."sync_obj_progress"(TEXT, TIMESTAMPTZ);\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_obj_progress" AS\nSELECT\n r."_account_id" AS account_id,\n r.run_started_at,\n r.object,\n ROUND(\n 100.0 * COUNT(*) FILTER (WHERE r.status = \'complete\') / NULLIF(COUNT(*), 0),\n 1\n ) AS pct_complete,\n COALESCE(SUM(r.processed_count), 0) AS processed\nFROM {{sync_schema}}."_sync_obj_runs" r\nWHERE r.run_started_at = (\n SELECT MAX(s.started_at)\n FROM {{sync_schema}}."_sync_runs" s\n WHERE s."_account_id" = r."_account_id"\n)\nGROUP BY r."_account_id", r.run_started_at, r.object;\n',
+    sql: '-- Internal sync metadata schema bootstrap for OpenAPI runtime.\n-- Schema-qualified objects use the explicit {{sync_schema}} placeholder.\n-- Uses idempotent DDL so it can be safely re-run.\n\nCREATE EXTENSION IF NOT EXISTS btree_gist;\n\nCREATE OR REPLACE FUNCTION set_updated_at() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n -- Support both legacy "updated_at" and newer "_updated_at" columns.\n -- jsonb_populate_record silently ignores keys that are not present on NEW.\n NEW := jsonb_populate_record(\n NEW,\n jsonb_build_object(\n \'updated_at\', now(),\n \'_updated_at\', now()\n )\n );\n RETURN NEW;\nEND;\n$$;\n\nCREATE OR REPLACE FUNCTION set_updated_at_metadata() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n NEW.updated_at = now();\n RETURN NEW;\nEND;\n$$;\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."accounts" (\n "_raw_data" jsonb NOT NULL,\n "id" text GENERATED ALWAYS AS ((_raw_data->>\'id\')::text) STORED,\n "api_key_hashes" text[] NOT NULL DEFAULT \'{}\',\n "first_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_last_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("id")\n);\nCREATE INDEX IF NOT EXISTS "idx_accounts_api_key_hashes"\n ON {{sync_schema}}."accounts" USING GIN ("api_key_hashes");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."accounts";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."accounts"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_managed_webhooks" (\n "id" text PRIMARY KEY,\n "object" text,\n "url" text NOT NULL,\n "enabled_events" jsonb NOT NULL,\n "description" text,\n "enabled" boolean,\n "livemode" boolean,\n "metadata" jsonb,\n "secret" text NOT NULL,\n "status" text,\n "api_version" text,\n "created" bigint,\n "last_synced_at" timestamptz,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n "account_id" text NOT NULL\n);\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "managed_webhooks_url_account_unique";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "managed_webhooks_url_account_unique" UNIQUE ("url", "account_id");\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "fk_managed_webhooks_account";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "fk_managed_webhooks_account"\n FOREIGN KEY ("account_id") REFERENCES {{sync_schema}}."accounts" (id);\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_status"\n ON {{sync_schema}}."_managed_webhooks" ("status");\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_enabled"\n ON {{sync_schema}}."_managed_webhooks" ("enabled");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_managed_webhooks";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_managed_webhooks"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_runs" (\n "_account_id" text NOT NULL,\n "started_at" timestamptz NOT NULL DEFAULT now(),\n "closed_at" timestamptz,\n "max_concurrent" integer NOT NULL DEFAULT 3,\n "triggered_by" text,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "started_at")\n);\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_runs_account";\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT "fk_sync_runs_account"\n FOREIGN KEY ("_account_id") REFERENCES {{sync_schema}}."accounts" (id);\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account_triggered_by;\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT one_active_run_per_account_triggered_by\n EXCLUDE (\n "_account_id" WITH =,\n COALESCE(triggered_by, \'default\') WITH =\n ) WHERE (closed_at IS NULL);\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_runs_account_status"\n ON {{sync_schema}}."_sync_runs" ("_account_id", "closed_at");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_obj_runs" (\n "_account_id" text NOT NULL,\n "run_started_at" timestamptz NOT NULL,\n "object" text NOT NULL,\n "status" text NOT NULL DEFAULT \'pending\'\n CHECK (status IN (\'pending\', \'running\', \'complete\', \'error\')),\n "started_at" timestamptz,\n "completed_at" timestamptz,\n "processed_count" integer NOT NULL DEFAULT 0,\n "cursor" text,\n "page_cursor" text,\n "created_gte" integer NOT NULL DEFAULT 0,\n "created_lte" integer NOT NULL DEFAULT 0,\n "priority" integer NOT NULL DEFAULT 0,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "run_started_at", "object", "created_gte", "created_lte")\n);\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "page_cursor" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_gte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_lte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "priority" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_obj_runs_parent";\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD CONSTRAINT "fk_sync_obj_runs_parent"\n FOREIGN KEY ("_account_id", "run_started_at")\n REFERENCES {{sync_schema}}."_sync_runs" ("_account_id", "started_at");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_obj_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_obj_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_status"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status");\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_priority"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status", "priority");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_rate_limits" (\n key TEXT PRIMARY KEY,\n count INTEGER NOT NULL DEFAULT 0,\n window_start TIMESTAMPTZ NOT NULL DEFAULT now()\n);\n\n-- Return type changed from VOID to INTEGER; CREATE OR REPLACE cannot alter it, so drop first.\nDROP FUNCTION IF EXISTS {{sync_schema}}.check_rate_limit(TEXT, INTEGER, INTEGER);\nCREATE OR REPLACE FUNCTION {{sync_schema}}.check_rate_limit(\n rate_key TEXT,\n max_requests INTEGER,\n window_seconds INTEGER\n)\nRETURNS INTEGER AS $$\nDECLARE\n now TIMESTAMPTZ := clock_timestamp();\n window_length INTERVAL := make_interval(secs => window_seconds);\n current_count INTEGER;\nBEGIN\n PERFORM pg_advisory_xact_lock(hashtext(rate_key));\n INSERT INTO {{sync_schema}}."_rate_limits" (key, count, window_start)\n VALUES (rate_key, 1, now)\n ON CONFLICT (key) DO UPDATE\n SET count = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN 1\n ELSE "_rate_limits".count + 1\n END,\n window_start = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN now\n ELSE "_rate_limits".window_start\n END;\n SELECT count INTO current_count FROM {{sync_schema}}."_rate_limits" WHERE key = rate_key;\n IF current_count > max_requests THEN\n RETURN 0;\n END IF;\n RETURN 1;\nEND;\n$$ LANGUAGE plpgsql;\n\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_runs" AS\nSELECT\n r._account_id as account_id,\n r.started_at,\n r.closed_at,\n r.triggered_by,\n r.max_concurrent,\n COALESCE(SUM(o.processed_count), 0) as total_processed,\n COUNT(o.*) as total_objects,\n COUNT(*) FILTER (WHERE o.status = \'complete\') as complete_count,\n COUNT(*) FILTER (WHERE o.status = \'error\') as error_count,\n COUNT(*) FILTER (WHERE o.status = \'running\') as running_count,\n COUNT(*) FILTER (WHERE o.status = \'pending\') as pending_count,\n STRING_AGG(o.error_message, \'; \') FILTER (WHERE o.error_message IS NOT NULL) as error_message,\n CASE\n WHEN r.closed_at IS NULL AND COUNT(*) FILTER (WHERE o.status = \'running\') > 0 THEN \'running\'\n WHEN r.closed_at IS NULL AND (COUNT(o.*) = 0 OR COUNT(o.*) = COUNT(*) FILTER (WHERE o.status = \'pending\')) THEN \'pending\'\n WHEN r.closed_at IS NULL THEN \'running\'\n WHEN COUNT(*) FILTER (WHERE o.status = \'error\') > 0 THEN \'error\'\n ELSE \'complete\'\n END as status\nFROM {{sync_schema}}."_sync_runs" r\nLEFT JOIN {{sync_schema}}."_sync_obj_runs" o\n ON o._account_id = r._account_id\n AND o.run_started_at = r.started_at\nGROUP BY r._account_id, r.started_at, r.closed_at, r.triggered_by, r.max_concurrent;\n\nDROP FUNCTION IF EXISTS {{sync_schema}}."sync_obj_progress"(TEXT, TIMESTAMPTZ);\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_obj_progress" AS\nSELECT\n r."_account_id" AS account_id,\n r.run_started_at,\n r.object,\n ROUND(\n 100.0 * COUNT(*) FILTER (WHERE r.status = \'complete\') / NULLIF(COUNT(*), 0),\n 1\n ) AS pct_complete,\n COALESCE(SUM(r.processed_count), 0) AS processed\nFROM {{sync_schema}}."_sync_obj_runs" r\nWHERE r.run_started_at = (\n SELECT MAX(s.started_at)\n FROM {{sync_schema}}."_sync_runs" s\n WHERE s."_account_id" = r."_account_id"\n)\nGROUP BY r."_account_id", r.run_started_at, r.object;\n',
   },
 ]
diff --git a/packages/sync-engine/src/database/migrations/0000_initial_migration.sql b/packages/sync-engine/src/database/migrations/0000_initial_migration.sql
index 57ab2d94..838e10ed 100644
--- a/packages/sync-engine/src/database/migrations/0000_initial_migration.sql
+++ b/packages/sync-engine/src/database/migrations/0000_initial_migration.sql
@@ -164,19 +164,20 @@ CREATE TABLE IF NOT EXISTS {{sync_schema}}."_rate_limits" (
  window_start TIMESTAMPTZ NOT NULL DEFAULT now()
 );
 
+-- Return type changed from VOID to INTEGER; CREATE OR REPLACE cannot alter it, so drop first.
+DROP FUNCTION IF EXISTS {{sync_schema}}.check_rate_limit(TEXT, INTEGER, INTEGER);
 CREATE OR REPLACE FUNCTION {{sync_schema}}.check_rate_limit(
  rate_key TEXT,
  max_requests INTEGER,
  window_seconds INTEGER
 )
-RETURNS VOID AS $$
+RETURNS INTEGER AS $$
 DECLARE
  now TIMESTAMPTZ := clock_timestamp();
  window_length INTERVAL := make_interval(secs => window_seconds);
  current_count INTEGER;
 BEGIN
  PERFORM pg_advisory_xact_lock(hashtext(rate_key));
-
  INSERT INTO {{sync_schema}}."_rate_limits" (key, count, window_start)
  VALUES (rate_key, 1, now)
  ON CONFLICT (key) DO UPDATE
@@ -190,12 +191,11 @@ BEGIN
  THEN now
  ELSE "_rate_limits".window_start
  END;
-
  SELECT count INTO current_count FROM {{sync_schema}}."_rate_limits" WHERE key = rate_key;
-
  IF current_count > max_requests THEN
- RAISE EXCEPTION 'Rate limit exceeded for %', rate_key;
+ RETURN 0;
  END IF;
+ RETURN 1;
 END;
 $$ LANGUAGE plpgsql;
 
diff --git a/packages/sync-engine/src/database/postgres.ts b/packages/sync-engine/src/database/postgres.ts
index 9381d5a6..fe6c7d7c 100644
--- a/packages/sync-engine/src/database/postgres.ts
+++ b/packages/sync-engine/src/database/postgres.ts
@@ -1542,23 +1542,27 @@ export class PostgresClient {
   }
 
+  /**
+   * Blocks until `check_rate_limit` admits another request for the `stripe` key.
+   *
+   * Each attempt calls the SQL function with a one-second window. A result of
+   * `1` means the call was admitted; any other value means the window is full,
+   * in which case the method sleeps 20-50 ms (re-drawn per attempt) and retries.
+   * Query errors are not caught here and propagate to the caller.
+   *
+   * @param maxRate - Maximum number of admitted calls per one-second window.
+   */
   async waitForRateLimit(maxRate: number): Promise<void> {
-    const sleepMs = Math.floor(Math.random() * 31) + 20
     while (true) {
-      try {
-        await this.query(`SELECT "${this.syncSchema}".check_rate_limit($1, $2, $3)`, [
-          'stripe',
-          maxRate,
-          1,
-        ])
+      const result = await this.query(`SELECT "${this.syncSchema}".check_rate_limit($1, $2, $3)`, [
+        'stripe',
+        maxRate,
+        1,
+      ])
+      if (result.rows[0].check_rate_limit === 1) {
         return
-      } catch (err) {
-        const msg = (err as Error)?.message ?? ''
-        if (msg.includes('Rate limit exceeded')) {
-          await new Promise((r) => setTimeout(r, sleepMs))
-          continue
-        }
-        throw err
       }
+      const sleepMs = Math.floor(Math.random() * 31) + 20
+      await new Promise((r) => setTimeout(r, sleepMs))
     }
   }
 }
diff --git a/packages/sync-engine/src/tests/integration/postgres-rate-limit.test.ts b/packages/sync-engine/src/tests/integration/postgres-rate-limit.test.ts
new file mode 100644
index 00000000..db0a76e8
--- /dev/null
+++ b/packages/sync-engine/src/tests/integration/postgres-rate-limit.test.ts
@@ -0,0 +1,41 @@
+import { describe, it, expect, beforeAll, afterAll } from 'vitest'
+import { PostgresClient } from '../../database/postgres'
+import { setupTestDatabase, type TestDatabase } from '../testSetup'
+
+describe('waitForRateLimit', () => {
+  let client: PostgresClient
+  let db: TestDatabase
+
+  beforeAll(async () => {
+    db = await setupTestDatabase()
+    client = new PostgresClient({
+      schema: 'stripe',
+      poolConfig: { connectionString: db.databaseUrl },
+    })
+  })
+
+  afterAll(async () => {
+    if (client) await client.pool.end()
+    if (db) await db.close()
+  })
+
+  // This test spends `duration` ms of wall-clock time in the loop, so it needs an
+  // explicit timeout above Vitest's 5 s default (unless project config raises it).
+  it('rate-limited loop vs unlimited loop for 5 seconds each', async () => {
+    const duration = 5_000
+    const maxRate = 50
+
+    await client.query('DELETE FROM stripe._rate_limits')
+    let rateLimitedCount = 0
+    const rlStart = Date.now()
+    while (Date.now() - rlStart < duration) {
+      await client.waitForRateLimit(maxRate)
+      rateLimitedCount++
+    }
+
+    const expected = maxRate * (duration / 1000)
+
+    expect(rateLimitedCount).toBeGreaterThanOrEqual(expected * 0.98)
+    expect(rateLimitedCount).toBeLessThanOrEqual(expected * 1.02)
+  }, 15_000)
+})