diff --git a/plugins/replication/README.md b/plugins/replication/README.md
new file mode 100644
index 0000000..18b4ebb
--- /dev/null
+++ b/plugins/replication/README.md
@@ -0,0 +1,93 @@
+# Replication plugin
+
+Pulls rows from an external Postgres or MySQL source into the StarbaseDB
+Durable Object SQLite, on a configurable per-table interval. Watermarks are
+persisted in `_starbase_replication_watermarks` so polling is append-only,
+and every tick is recorded in `_starbase_replication_log` for observability.
+
+## Why a pull plugin
+
+External data lives in a primary database somewhere far from the edge. This
+plugin lets a StarbaseDB instance act as a close-to-edge read replica that
+can be queried alongside (or instead of) the primary. It's pull-based so it
+works for any source the host can reach over TCP, no per-provider push
+infrastructure required.
+
+## Configuration
+
+Single env var: `REPLICATION_CONFIG_JSON`. Example:
+
+```json
+[
+    {
+        "source": "postgres",
+        "conn": "postgres://user:pass@host:5432/db",
+        "intervalSeconds": 300,
+        "tables": [
+            {
+                "name": "users",
+                "watermark": "updated_at",
+                "primaryKey": "id"
+            },
+            { "name": "events", "watermark": "id" }
+        ]
+    },
+    {
+        "source": "mysql",
+        "conn": "mysql://user:pass@host:3306/db",
+        "intervalSeconds": 60,
+        "tables": [
+            {
+                "name": "audit_log",
+                "watermark": "ts",
+                "target": "external_audit_log"
+            }
+        ]
+    }
+]
+```
+
+Field reference:
+
+| Field                 | Required | Notes                                                                      |
+| --------------------- | -------- | -------------------------------------------------------------------------- |
+| `source`              | yes      | `postgres`, `mysql`, or `mock` (tests only).                               |
+| `conn`                | yes\*    | Connection string. Required for built-in adapters.                         |
+| `intervalSeconds`     | yes      | Poll cadence for this source. Per-table cadence inherits.                  |
+| `pageSize`            | no       | Rows per round-trip (default 1000).                                        |
+| `tables[].name`       | yes      | Source table name. Postgres accepts `schema.table` (defaults to `public`). |
+| `tables[].watermark`  | yes      | Column compared with `>` to pull only new rows. Must be monotonically non-decreasing for that table. |
+| `tables[].primaryKey` | no       | Column or array of columns. Enables `INSERT OR REPLACE` for upserts. If omitted, append-only.        |
+| `tables[].target`     | no       | Override the SQLite table name. Defaults to `name`.                                                  |
+
+## Wiring
+
+1. Ensure `[triggers]` is enabled in `wrangler.toml` with at least the
+   smallest interval you want (e.g. `crons = ["* * * * *"]`).
+2. Set `REPLICATION_CONFIG_JSON` in `wrangler.toml`'s `[vars]` block (or as a
+   secret).
+3. Deploy. The plugin self-registers via `src/index.ts` and the
+   `scheduled()` handler.
+
+The plugin already participates in the `fetch()` plugin chain so admin
+operators can:
+
+- `POST /replication/run` — manually fire all due tables (admin token)
+- `GET /replication/status` — read current watermarks (admin token)
+
+That is the entire HTTP surface. There is no admin CRUD API, no web UI,
+and no mutable runtime config — everything else is a `SELECT` against the
+two replication tables, runnable through the normal `/query` endpoint.
+
+## Failure semantics
+
+If pulling one table errors, the watermark is **not** advanced for that
+table, the failure is recorded in `_starbase_replication_log`, and other
+tables in the same tick still run. The next tick re-attempts from the prior
+watermark.
+
+## Plugging in custom adapters
+
+Pass `adapterFactory` to the plugin constructor to ship your own adapter
+(e.g. SQL Server, ClickHouse, REST). It must implement
+`ReplicationAdapter` from `plugins/replication/types.ts`.
diff --git a/plugins/replication/adapters/mysql.ts b/plugins/replication/adapters/mysql.ts new file mode 100644 index 0000000..fe4806e --- /dev/null +++ b/plugins/replication/adapters/mysql.ts @@ -0,0 +1,110 @@ +import { createConnection, type Connection } from 'mysql2/promise' +import type { + ColumnDef, + PullPage, + ReplicationAdapter, + SqlScalar, +} from '../types' + +function mysqlTypeToSqlite(t: string): ColumnDef['sqliteType'] { + const tt = t.toLowerCase() + if ( + tt.includes('int') || + tt === 'bit' || + tt === 'bool' || + tt === 'boolean' + ) { + return 'INTEGER' + } + if ( + tt.includes('decimal') || + tt.includes('numeric') || + tt.includes('float') || + tt.includes('double') + ) { + return 'REAL' + } + if (tt.includes('blob') || tt.includes('binary')) return 'BLOB' + return 'TEXT' +} + +export class MysqlAdapter implements ReplicationAdapter { + private connPromise: Promise + + constructor(connectionString: string) { + this.connPromise = createConnection(connectionString) + } + + private async conn() { + return this.connPromise + } + + async describe(table: string): Promise { + const conn = await this.conn() + const [rows] = (await conn.query( + `SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY + FROM information_schema.COLUMNS + WHERE TABLE_NAME = ? 
+ AND TABLE_SCHEMA = DATABASE() + ORDER BY ORDINAL_POSITION`, + [table] + )) as [ + { + COLUMN_NAME: string + DATA_TYPE: string + COLUMN_KEY: string + }[], + unknown, + ] + + if (rows.length === 0) { + throw new Error( + `replication: table "${table}" not found in source database` + ) + } + + return rows.map((r) => ({ + name: r.COLUMN_NAME, + sqliteType: mysqlTypeToSqlite(r.DATA_TYPE), + primaryKey: r.COLUMN_KEY === 'PRI', + })) + } + + async *pull(opts: { + table: string + watermarkColumn: string + watermark: SqlScalar | null + pageSize: number + }): AsyncIterable { + const conn = await this.conn() + const tbl = `\`${opts.table.replace(/`/g, '``')}\`` + const wm = `\`${opts.watermarkColumn.replace(/`/g, '``')}\`` + + let cursor = opts.watermark + // eslint-disable-next-line no-constant-condition + while (true) { + const sql = + cursor === null + ? `SELECT * FROM ${tbl} ORDER BY ${wm} ASC LIMIT ${Math.floor(opts.pageSize)}` + : `SELECT * FROM ${tbl} WHERE ${wm} > ? ORDER BY ${wm} ASC LIMIT ${Math.floor(opts.pageSize)}` + + const [rows] = (await conn.query( + sql, + cursor === null ? [] : [cursor] + )) as [Record[], unknown] + + if (rows.length === 0) return + + const last = rows[rows.length - 1][opts.watermarkColumn] ?? null + yield { rows, nextWatermark: last } + cursor = last + + if (rows.length < opts.pageSize) return + } + } + + async close(): Promise { + const conn = await this.conn() + await conn.end() + } +} diff --git a/plugins/replication/adapters/postgres.ts b/plugins/replication/adapters/postgres.ts new file mode 100644 index 0000000..8c58bb4 --- /dev/null +++ b/plugins/replication/adapters/postgres.ts @@ -0,0 +1,133 @@ +import postgres from 'postgres' +import type { + ColumnDef, + PullPage, + ReplicationAdapter, + SqlScalar, +} from '../types' + +/** + * Map a Postgres type name (information_schema.columns.data_type) to a + * SQLite affinity. 
Anything we don't explicitly recognise becomes TEXT, + * which is the safe choice for SQLite (it stores all values as TEXT under + * the TEXT affinity rule). + */ +function pgTypeToSqlite(pgType: string): ColumnDef['sqliteType'] { + const t = pgType.toLowerCase() + if ( + t.includes('int') || + t === 'bigserial' || + t === 'serial' || + t === 'smallserial' + ) { + return 'INTEGER' + } + if ( + t.includes('numeric') || + t.includes('decimal') || + t.includes('real') || + t.includes('double') || + t === 'money' + ) { + return 'REAL' + } + if (t === 'boolean') return 'INTEGER' + if (t === 'bytea') return 'BLOB' + return 'TEXT' +} + +export class PostgresAdapter implements ReplicationAdapter { + private client: ReturnType + + constructor(connectionString: string) { + // Keep the pool small — replication runs on Cloudflare Workers where + // long-lived connections are an anti-pattern. fetch_types=false skips + // a startup query that doesn't work with PgBouncer/Hyperdrive style + // poolers. 
+        this.client = postgres(connectionString, {
+            max: 2,
+            fetch_types: false,
+        })
+    }
+
+    // NOTE(review): return/yield type arguments below were stripped in
+    // transit — restored from `ReplicationAdapter` in ../types.
+    async describe(table: string): Promise<ColumnDef[]> {
+        const parts = parseQualified(table)
+        const rows = await this.client<
+            { column_name: string; data_type: string }[]
+        >`
+            SELECT column_name, data_type
+            FROM information_schema.columns
+            WHERE table_schema = ${parts.schema}
+              AND table_name = ${parts.name}
+            ORDER BY ordinal_position
+        `
+
+        if (rows.length === 0) {
+            throw new Error(
+                `replication: table "${table}" not found in source (schema=${parts.schema})`
+            )
+        }
+
+        const pkRows = await this.client<{ column_name: string }[]>`
+            SELECT a.attname AS column_name
+            FROM pg_index i
+            JOIN pg_attribute a
+                ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
+            WHERE i.indrelid = ${`${parts.schema}.${parts.name}`}::regclass
+              AND i.indisprimary
+        `
+        const pkSet = new Set(pkRows.map((r) => r.column_name))
+
+        return rows.map((r) => ({
+            name: r.column_name,
+            sqliteType: pgTypeToSqlite(r.data_type),
+            primaryKey: pkSet.has(r.column_name),
+        }))
+    }
+
+    async *pull(opts: {
+        table: string
+        watermarkColumn: string
+        watermark: SqlScalar | null
+        pageSize: number
+    }): AsyncIterable<PullPage> {
+        const parts = parseQualified(opts.table)
+        // NOTE(review): schema/table come from operator config (not user
+        // input); quoting here is best-effort, not injection hardening.
+        const qualified = `"${parts.schema}"."${parts.name}"`
+        const wmCol = `"${opts.watermarkColumn.replace(/"/g, '""')}"`
+
+        let cursor = opts.watermark
+        // Loop pages until a short page is returned. Bounded by `pageSize`
+        // and the natural end of the table; the `> cursor` predicate makes it
+        // strictly progressing.
+        // eslint-disable-next-line no-constant-condition
+        while (true) {
+            const rows: Record<string, SqlScalar>[] =
+                cursor === null
+                    ?
await this.client.unsafe( + `SELECT * FROM ${qualified} ORDER BY ${wmCol} ASC LIMIT ${opts.pageSize}` + ) + : await this.client.unsafe( + `SELECT * FROM ${qualified} WHERE ${wmCol} > $1 ORDER BY ${wmCol} ASC LIMIT ${opts.pageSize}`, + [cursor as any] + ) + + if (rows.length === 0) return + + const last = rows[rows.length - 1][opts.watermarkColumn] ?? null + yield { rows, nextWatermark: last } + cursor = last + + if (rows.length < opts.pageSize) return + } + } + + async close(): Promise { + await this.client.end({ timeout: 5 }) + } +} + +function parseQualified(table: string): { schema: string; name: string } { + const idx = table.indexOf('.') + if (idx === -1) return { schema: 'public', name: table } + return { schema: table.slice(0, idx), name: table.slice(idx + 1) } +} diff --git a/plugins/replication/index.test.ts b/plugins/replication/index.test.ts new file mode 100644 index 0000000..3e889ff --- /dev/null +++ b/plugins/replication/index.test.ts @@ -0,0 +1,417 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest' +import { ReplicationPlugin } from './index' +import { buildCreateTable, buildInsert } from './sql' +import type { + ColumnDef, + PullPage, + ReplicationAdapter, + ReplicationConfig, + SqlScalar, +} from './types' +import type { DataSource } from '../../src/types' + +/** + * In-memory mock of the DO RPC surface. We don't actually run SQL — we + * record every call and answer SELECTs from per-table maps so the plugin's + * watermark/log behaviour is observable without standing up a real DO. + */ +class FakeRpc { + public calls: { sql: string; params: unknown[] }[] = [] + private watermarks = new Map() + + executeQuery = vi.fn(async (q: { sql: string; params?: unknown[] }) => { + const params = q.params ?? 
[] + this.calls.push({ sql: q.sql, params }) + + // Watermark UPSERT + if (/INSERT INTO _starbase_replication_watermarks/.test(q.sql)) { + const [source, table, , value] = params as [ + string, + string, + string, + string | null, + ] + this.watermarks.set(`${source}::${table}`, value ?? null) + return [] + } + + // Watermark SELECT + if ( + /SELECT last_value FROM _starbase_replication_watermarks/.test( + q.sql + ) + ) { + const [source, table] = params as [string, string] + const v = this.watermarks.get(`${source}::${table}`) + return v === undefined ? [] : [{ last_value: v }] + } + + // Status SELECT + if (/SELECT source,/.test(q.sql)) return [] + + return [] + }) +} + +function makeDataSource(): DataSource { + const rpc = new FakeRpc() + return { rpc, source: 'internal' } as unknown as DataSource +} + +class MockAdapter implements ReplicationAdapter { + public describeCalls = 0 + public closeCalls = 0 + constructor( + private columns: ColumnDef[], + private pages: PullPage[], + public errorOn?: 'describe' | 'pull' + ) {} + + async describe(_table: string): Promise { + this.describeCalls++ + if (this.errorOn === 'describe') throw new Error('describe blew up') + return this.columns + } + + async *pull(_opts: { + table: string + watermarkColumn: string + watermark: SqlScalar | null + pageSize: number + }): AsyncIterable { + if (this.errorOn === 'pull') throw new Error('pull blew up') + for (const p of this.pages) yield p + } + + async close(): Promise { + this.closeCalls++ + } +} + +function findRpc(ds: DataSource): FakeRpc { + return ds.rpc as unknown as FakeRpc +} + +describe('ReplicationPlugin — buildCreateTable', () => { + it('renders a CREATE TABLE with the reflected columns and PK', () => { + const sql = buildCreateTable('users', [ + { name: 'id', sqliteType: 'INTEGER', primaryKey: true }, + { name: 'email', sqliteType: 'TEXT' }, + { name: 'updated_at', sqliteType: 'TEXT' }, + ]) + expect(sql).toContain('CREATE TABLE IF NOT EXISTS "users"') + 
expect(sql).toContain('"id" INTEGER') + expect(sql).toContain('"email" TEXT') + expect(sql).toContain('PRIMARY KEY ("id")') + }) + + it('omits PRIMARY KEY clause when no column is flagged', () => { + const sql = buildCreateTable('events', [ + { name: 'id', sqliteType: 'INTEGER' }, + { name: 'payload', sqliteType: 'TEXT' }, + ]) + expect(sql).not.toContain('PRIMARY KEY') + }) + + it('throws on zero columns rather than emit invalid SQL', () => { + expect(() => buildCreateTable('x', [])).toThrow(/zero columns/) + }) +}) + +describe('ReplicationPlugin — buildInsert', () => { + it('uses INSERT OR REPLACE when a primary key is configured', () => { + const sql = buildInsert('users', ['id', 'email'], true) + expect(sql).toMatch(/^INSERT OR REPLACE/) + expect(sql).toContain('("id", "email")') + expect(sql).toContain('VALUES (?, ?)') + }) + + it('falls back to INSERT OR IGNORE for append-only tables', () => { + const sql = buildInsert('events', ['id', 'payload'], false) + expect(sql).toMatch(/^INSERT OR IGNORE/) + }) +}) + +describe('ReplicationPlugin — config parsing', () => { + it('rejects non-array REPLICATION_CONFIG_JSON', () => { + expect( + () => + new ReplicationPlugin({ + env: { REPLICATION_CONFIG_JSON: '{}' }, + }) + ).toThrow(/array of source configs/) + }) + + it('rejects malformed JSON', () => { + expect( + () => + new ReplicationPlugin({ + env: { REPLICATION_CONFIG_JSON: 'not json' }, + }) + ).toThrow(/not valid JSON/) + }) + + it('rejects sources with non-positive intervalSeconds', () => { + expect( + () => + new ReplicationPlugin({ + env: { + REPLICATION_CONFIG_JSON: JSON.stringify([ + { + source: 'mock', + intervalSeconds: 0, + tables: [{ name: 't', watermark: 'id' }], + }, + ]), + }, + }) + ).toThrow(/intervalSeconds/) + }) + + it('parses a valid config without throwing', () => { + const plugin = new ReplicationPlugin({ + config: [ + { + source: 'mock', + intervalSeconds: 60, + tables: [ + { + name: 'users', + watermark: 'updated_at', + primaryKey: 'id', + 
}, + ], + }, + ], + }) + expect(plugin).toBeInstanceOf(ReplicationPlugin) + }) + + it('treats missing config as a disabled (no-op) plugin', async () => { + const plugin = new ReplicationPlugin() + const ds = makeDataSource() + const summary = await plugin.runDue(ds) + expect(summary).toEqual([]) + }) +}) + +describe('ReplicationPlugin — runDue', () => { + let config: ReplicationConfig + let cols: ColumnDef[] + beforeEach(() => { + cols = [ + { name: 'id', sqliteType: 'INTEGER', primaryKey: true }, + { name: 'email', sqliteType: 'TEXT' }, + { name: 'updated_at', sqliteType: 'TEXT' }, + ] + config = [ + { + source: 'mock', + intervalSeconds: 60, + tables: [ + { + name: 'users', + watermark: 'updated_at', + primaryKey: 'id', + }, + ], + }, + ] + }) + + it('advances the watermark after a successful pull', async () => { + const adapter = new MockAdapter(cols, [ + { + rows: [ + { id: 1, email: 'a@x', updated_at: '2026-01-01T00:00:00Z' }, + { id: 2, email: 'b@x', updated_at: '2026-01-02T00:00:00Z' }, + ], + nextWatermark: '2026-01-02T00:00:00Z', + }, + ]) + + const plugin = new ReplicationPlugin({ + config, + adapterFactory: () => adapter, + }) + const ds = makeDataSource() + const summary = await plugin.runDue(ds) + + expect(summary).toEqual([ + { source: 'mock', table: 'users', rows: 2, ok: true }, + ]) + const rpc = findRpc(ds) + const wm = rpc.calls.find((c) => + /INSERT INTO _starbase_replication_watermarks/.test(c.sql) + ) + expect(wm?.params[3]).toBe('2026-01-02T00:00:00Z') + }) + + it('does not re-pull rows already covered by the watermark', async () => { + const adapter = new MockAdapter(cols, [ + { + rows: [ + { id: 1, email: 'a@x', updated_at: '2026-01-01T00:00:00Z' }, + ], + nextWatermark: '2026-01-01T00:00:00Z', + }, + ]) + const pullSpy = vi.spyOn(adapter, 'pull') + + const plugin = new ReplicationPlugin({ + config, + adapterFactory: () => adapter, + }) + const ds = makeDataSource() + + await plugin.runDue(ds, { now: 0 }) + // Second tick at +30s — interval 
is 60s so this should be a no-op. + const summary = await plugin.runDue(ds, { now: 30_000 }) + expect(summary).toEqual([]) + expect(pullSpy).toHaveBeenCalledTimes(1) + + // Third tick at +120s — interval has elapsed, should re-poll with the + // stored watermark passed in. + await plugin.runDue(ds, { now: 120_000 }) + expect(pullSpy).toHaveBeenCalledTimes(2) + expect(pullSpy.mock.calls[1][0].watermark).toBe('2026-01-01T00:00:00Z') + }) + + it('issues CREATE TABLE only on the first sync per table', async () => { + const adapter = new MockAdapter(cols, [ + { + rows: [{ id: 1, email: 'a@x', updated_at: '2026-01-01' }], + nextWatermark: '2026-01-01', + }, + ]) + const plugin = new ReplicationPlugin({ + config, + adapterFactory: () => adapter, + }) + const ds = makeDataSource() + await plugin.runDue(ds, { now: 0, force: true }) + await plugin.runDue(ds, { now: 1_000_000, force: true }) + + expect(adapter.describeCalls).toBe(1) + const rpc = findRpc(ds) + const creates = rpc.calls.filter((c) => + /CREATE TABLE IF NOT EXISTS "users"/.test(c.sql) + ) + expect(creates.length).toBe(1) + }) + + it('runs multiple tables independently — one failure does not block others', async () => { + const goodCols: ColumnDef[] = [ + { name: 'id', sqliteType: 'INTEGER', primaryKey: true }, + { name: 'val', sqliteType: 'TEXT' }, + ] + const goodAdapter = new MockAdapter(goodCols, [ + { rows: [{ id: 1, val: 'a' }], nextWatermark: 1 }, + ]) + const badAdapter = new MockAdapter(goodCols, [], 'pull') + + const multiConfig: ReplicationConfig = [ + { + source: 'mock', + intervalSeconds: 60, + tables: [{ name: 'good', watermark: 'id' }], + }, + { + source: 'postgres', + conn: 'irrelevant', + intervalSeconds: 60, + tables: [{ name: 'bad', watermark: 'id' }], + }, + ] + const plugin = new ReplicationPlugin({ + config: multiConfig, + adapterFactory: (s) => + s.source === 'mock' ? 
goodAdapter : badAdapter, + }) + const ds = makeDataSource() + const summary = await plugin.runDue(ds) + + const good = summary.find((s) => s.table === 'good') + const bad = summary.find((s) => s.table === 'bad') + expect(good?.ok).toBe(true) + expect(good?.rows).toBe(1) + expect(bad?.ok).toBe(false) + expect(bad?.error).toMatch(/pull blew up/) + + // Failure should be recorded in the audit log. + const rpc = findRpc(ds) + const errLog = rpc.calls.find( + (c) => + /INSERT INTO _starbase_replication_log/.test(c.sql) && + (c.params as unknown[])[2] === 'bad' + ) + expect(errLog).toBeDefined() + expect((errLog!.params as unknown[])[4]).toBe(0) // ok = false + }) + + it('does not advance the watermark when the adapter throws', async () => { + const adapter = new MockAdapter(cols, [], 'pull') + const plugin = new ReplicationPlugin({ + config, + adapterFactory: () => adapter, + }) + const ds = makeDataSource() + await plugin.runDue(ds) + const rpc = findRpc(ds) + const wmUpserts = rpc.calls.filter((c) => + /INSERT INTO _starbase_replication_watermarks/.test(c.sql) + ) + expect(wmUpserts).toHaveLength(0) + }) + + it('streams multi-page pulls and persists watermark per page', async () => { + const pageCols: ColumnDef[] = [ + { name: 'id', sqliteType: 'INTEGER', primaryKey: true }, + ] + const adapter = new MockAdapter(pageCols, [ + { rows: [{ id: 1 }, { id: 2 }], nextWatermark: 2 }, + { rows: [{ id: 3 }, { id: 4 }], nextWatermark: 4 }, + ]) + const pageConfig: ReplicationConfig = [ + { + source: 'mock', + intervalSeconds: 60, + tables: [{ name: 't', watermark: 'id', primaryKey: 'id' }], + }, + ] + const plugin = new ReplicationPlugin({ + config: pageConfig, + adapterFactory: () => adapter, + }) + const ds = makeDataSource() + await plugin.runDue(ds) + const rpc = findRpc(ds) + const wmUpserts = rpc.calls.filter((c) => + /INSERT INTO _starbase_replication_watermarks/.test(c.sql) + ) + // One upsert per page. 
+ expect(wmUpserts.length).toBe(2) + expect(wmUpserts[0].params[3]).toBe('2') + expect(wmUpserts[1].params[3]).toBe('4') + }) +}) + +describe('ReplicationPlugin — close', () => { + it('closes every adapter that has been instantiated', async () => { + const a = new MockAdapter([{ name: 'id', sqliteType: 'INTEGER' }], []) + const plugin = new ReplicationPlugin({ + config: [ + { + source: 'mock', + intervalSeconds: 60, + tables: [{ name: 't', watermark: 'id' }], + }, + ], + adapterFactory: () => a, + }) + const ds = makeDataSource() + await plugin.runDue(ds) + await plugin.close() + expect(a.closeCalls).toBe(1) + }) +}) diff --git a/plugins/replication/index.ts b/plugins/replication/index.ts new file mode 100644 index 0000000..e0caf98 --- /dev/null +++ b/plugins/replication/index.ts @@ -0,0 +1,457 @@ +import { StarbaseApp } from '../../src/handler' +import { StarbasePlugin } from '../../src/plugin' +import { DataSource, QueryResult } from '../../src/types' +import { createResponse } from '../../src/utils' + +import { MysqlAdapter } from './adapters/mysql' +import { PostgresAdapter } from './adapters/postgres' +import { + CREATE_LOG_TABLE, + CREATE_WATERMARK_TABLE, + INSERT_LOG, + SELECT_WATERMARK, + UPSERT_WATERMARK, + buildCreateTable, + buildInsert, +} from './sql' +import type { + ReplicationAdapter, + ReplicationConfig, + ReplicationSourceConfig, + SqlScalar, + TableConfig, +} from './types' + +export type { ReplicationAdapter, ReplicationConfig } from './types' + +export interface ReplicationPluginOptions { + /** + * Replication configuration. If omitted the plugin will read + * `REPLICATION_CONFIG_JSON` from the environment passed in via + * `env`. Both forms are supported so users can pick whatever fits + * their wrangler setup. + */ + config?: ReplicationConfig + /** Optional env-style record from which to pull `REPLICATION_CONFIG_JSON`. */ + env?: { REPLICATION_CONFIG_JSON?: string } + /** + * Adapter factory override. 
Useful for tests and for plugging in + * adapters this package doesn't ship (e.g. SQL Server, ClickHouse). + * If supplied it takes precedence over the built-in postgres/mysql + * adapters. + */ + adapterFactory?: (source: ReplicationSourceConfig) => ReplicationAdapter +} + +interface ScheduledTable { + source: ReplicationSourceConfig + table: TableConfig + nextRunAt: number + schemaInitialized: boolean +} + +/** + * Pulls rows from external relational databases into the StarbaseDB-managed + * SQLite Durable Object on a configurable per-table interval. The plugin is + * intentionally small: it does not expose an admin REST API, it does not + * manage push or bidirectional replication, and it does not introduce a new + * scheduling primitive — it leans on the existing DO alarm via the same + * pattern the CronPlugin uses. + */ +export class ReplicationPlugin extends StarbasePlugin { + public pathPrefix = '/replication' + + private dataSource?: DataSource + private adapters = new Map() + private schedule: ScheduledTable[] = [] + private readonly config: ReplicationConfig + private readonly adapterFactory?: ( + source: ReplicationSourceConfig + ) => ReplicationAdapter + private initialized = false + + constructor(opts: ReplicationPluginOptions = {}) { + super('starbasedb:replication', { requiresAuth: true }) + + this.adapterFactory = opts.adapterFactory + + if (opts.config) { + this.config = opts.config + } else { + const raw = opts.env?.REPLICATION_CONFIG_JSON + this.config = raw ? parseConfig(raw) : [] + } + + this.schedule = this.config.flatMap((source) => + source.tables.map((table) => ({ + source, + table, + nextRunAt: 0, + schemaInitialized: false, + })) + ) + } + + override async register(app: StarbaseApp) { + app.use(async (c, next) => { + this.dataSource = c?.get('dataSource') + await this.init() + await next() + }) + + // Manual trigger — admins can poke this to force a sync without + // waiting for the next interval. 
This is the only HTTP surface the + // plugin exposes; configuration lives entirely in env vars. + app.post(`${this.pathPrefix}/run`, async (c) => { + const config = c.get('config') + if (config?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized', 401) + } + + const ds = c.get('dataSource') + const summary = await this.runDue(ds, { force: true }) + return createResponse(summary, undefined, 200) + }) + + // Status endpoint so users can see what's been pulled. Single + // read-only handler rather than a full CRUD admin API. + app.get(`${this.pathPrefix}/status`, async (c) => { + const config = c.get('config') + if (config?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized', 401) + } + const ds = c.get('dataSource') + const watermarks = (await ds.rpc.executeQuery({ + sql: `SELECT source, "table", watermark_column, last_value, last_run_ts FROM _starbase_replication_watermarks`, + params: [], + })) as QueryResult[] + return createResponse({ watermarks }, undefined, 200) + }) + } + + private async init() { + if (this.initialized || !this.dataSource) return + await this.dataSource.rpc.executeQuery({ + sql: CREATE_WATERMARK_TABLE, + params: [], + }) + await this.dataSource.rpc.executeQuery({ + sql: CREATE_LOG_TABLE, + params: [], + }) + this.initialized = true + } + + /** + * Public entry point used by the scheduled handler in `src/index.ts`. The + * caller supplies the live DataSource (it isn't available at construction + * time) and the plugin runs every table whose interval has elapsed. + * + * Returns a summary so the scheduled() handler can be observed in the + * Cloudflare logs. + */ + public async runDue( + dataSource: DataSource, + opts: { force?: boolean; now?: number } = {} + ): Promise< + { + source: string + table: string + rows: number + ok: boolean + error?: string + }[] + > { + this.dataSource = dataSource + await this.init() + + const now = opts.now ?? 
Date.now() + const summary: { + source: string + table: string + rows: number + ok: boolean + error?: string + }[] = [] + + for (const slot of this.schedule) { + if (!opts.force && slot.nextRunAt > now) continue + try { + const rows = await this.runOne(slot, now) + summary.push({ + source: slot.source.source, + table: slot.table.name, + rows, + ok: true, + }) + slot.nextRunAt = now + slot.source.intervalSeconds * 1000 + await this.log(slot, now, rows, true) + } catch (e) { + const msg = e instanceof Error ? e.message : String(e) + summary.push({ + source: slot.source.source, + table: slot.table.name, + rows: 0, + ok: false, + error: msg, + }) + // Back off this table by its interval so a broken source + // doesn't busy-loop the worker, but still let other tables + // continue this tick. + slot.nextRunAt = now + slot.source.intervalSeconds * 1000 + await this.log(slot, now, 0, false, msg) + } + } + + return summary + } + + private async runOne(slot: ScheduledTable, now: number): Promise { + if (!this.dataSource) throw new Error('replication: no dataSource') + + const ds = this.dataSource + const adapter = this.getAdapter(slot.source) + const targetTable = slot.table.target ?? slot.table.name + + // Materialise the SQLite table if it's the first sync. + if (!slot.schemaInitialized) { + const cols = await adapter.describe(slot.table.name) + // Adapter-reflected primary key is overridden by config if + // supplied so users can replicate non-PK tables with a custom + // dedup column. + const pkOverride = normalisePk(slot.table.primaryKey) + const finalCols = pkOverride + ? cols.map((c) => ({ + ...c, + primaryKey: pkOverride.includes(c.name), + })) + : cols + await ds.rpc.executeQuery({ + sql: buildCreateTable(targetTable, finalCols), + params: [], + }) + slot.schemaInitialized = true + } + + // Read prior watermark. 
+ const wmRows = (await ds.rpc.executeQuery({ + sql: SELECT_WATERMARK, + params: [slot.source.source, slot.table.name], + })) as unknown as { last_value: string | null }[] + + let watermark: SqlScalar | null = + wmRows.length > 0 ? wmRows[0].last_value : null + + const pageSize = slot.source.pageSize ?? 1000 + let pulled = 0 + + for await (const page of adapter.pull({ + table: slot.table.name, + watermarkColumn: slot.table.watermark, + watermark, + pageSize, + })) { + if (page.rows.length === 0) continue + + const colNames = Object.keys(page.rows[0]) + const insertSql = buildInsert( + targetTable, + colNames, + Boolean(slot.table.primaryKey) + ) + + for (const row of page.rows) { + const params = colNames.map((c) => normaliseScalar(row[c])) + await ds.rpc.executeQuery({ sql: insertSql, params }) + } + + pulled += page.rows.length + watermark = page.nextWatermark + + // Persist progress after every page so a mid-run failure on the + // next page doesn't redo work. + await ds.rpc.executeQuery({ + sql: UPSERT_WATERMARK, + params: [ + slot.source.source, + slot.table.name, + slot.table.watermark, + serialiseWatermark(watermark), + now, + ], + }) + } + + if (pulled === 0) { + // Still record the run so users can see liveness. + await ds.rpc.executeQuery({ + sql: UPSERT_WATERMARK, + params: [ + slot.source.source, + slot.table.name, + slot.table.watermark, + serialiseWatermark(watermark), + now, + ], + }) + } + + return pulled + } + + private async log( + slot: ScheduledTable, + now: number, + rows: number, + ok: boolean, + error?: string + ) { + if (!this.dataSource) return + await this.dataSource.rpc.executeQuery({ + sql: INSERT_LOG, + params: [ + now, + slot.source.source, + slot.table.name, + rows, + ok ? 1 : 0, + error ?? 
null, + ], + }) + } + + private getAdapter(source: ReplicationSourceConfig): ReplicationAdapter { + const idx = this.config.indexOf(source) + const cached = this.adapters.get(idx) + if (cached) return cached + + const built = this.adapterFactory + ? this.adapterFactory(source) + : defaultAdapterFactory(source) + this.adapters.set(idx, built) + return built + } + + /** Release pooled adapter connections. Call on Worker shutdown. */ + public async close() { + for (const a of this.adapters.values()) { + try { + await a.close() + } catch (e) { + console.error('replication: adapter close failed', e) + } + } + this.adapters.clear() + } +} + +function defaultAdapterFactory( + source: ReplicationSourceConfig +): ReplicationAdapter { + if (source.source === 'postgres') { + if (!source.conn) + throw new Error('replication: postgres source missing `conn`') + return new PostgresAdapter(source.conn) + } + if (source.source === 'mysql') { + if (!source.conn) + throw new Error('replication: mysql source missing `conn`') + return new MysqlAdapter(source.conn) + } + throw new Error( + `replication: no built-in adapter for source type "${source.source}". Provide adapterFactory.` + ) +} + +function parseConfig(raw: string): ReplicationConfig { + let parsed: unknown + try { + parsed = JSON.parse(raw) + } catch (e) { + throw new Error( + `replication: REPLICATION_CONFIG_JSON is not valid JSON: ${ + e instanceof Error ? 
e.message : e + }` + ) + } + if (!Array.isArray(parsed)) { + throw new Error( + 'replication: REPLICATION_CONFIG_JSON must be a JSON array of source configs' + ) + } + return parsed.map((src, i) => validateSource(src, i)) +} + +function validateSource(src: unknown, i: number): ReplicationSourceConfig { + if (!src || typeof src !== 'object') + throw new Error(`replication: config[${i}] is not an object`) + const s = src as Record + if (typeof s.source !== 'string') + throw new Error(`replication: config[${i}].source must be a string`) + if (typeof s.intervalSeconds !== 'number' || s.intervalSeconds <= 0) + throw new Error( + `replication: config[${i}].intervalSeconds must be a positive number` + ) + if (!Array.isArray(s.tables) || s.tables.length === 0) + throw new Error( + `replication: config[${i}].tables must be a non-empty array` + ) + return { + source: s.source as ReplicationSourceConfig['source'], + conn: typeof s.conn === 'string' ? s.conn : undefined, + intervalSeconds: s.intervalSeconds, + pageSize: typeof s.pageSize === 'number' ? s.pageSize : undefined, + tables: (s.tables as unknown[]).map((t, j) => validateTable(t, i, j)), + } +} + +function validateTable(t: unknown, i: number, j: number): TableConfig { + if (!t || typeof t !== 'object') + throw new Error( + `replication: config[${i}].tables[${j}] is not an object` + ) + const r = t as Record + if (typeof r.name !== 'string') + throw new Error( + `replication: config[${i}].tables[${j}].name must be a string` + ) + if (typeof r.watermark !== 'string') + throw new Error( + `replication: config[${i}].tables[${j}].watermark must be a string` + ) + return { + name: r.name, + watermark: r.watermark, + primaryKey: + typeof r.primaryKey === 'string' + ? r.primaryKey + : Array.isArray(r.primaryKey) + ? (r.primaryKey as string[]) + : undefined, + target: typeof r.target === 'string' ? 
r.target : undefined, + } +} + +function normalisePk(pk: TableConfig['primaryKey']): string[] | null { + if (!pk) return null + return Array.isArray(pk) ? pk : [pk] +} + +/** Convert adapter-side scalars into something the DO's executeQuery accepts. */ +function normaliseScalar(v: SqlScalar | undefined): SqlScalar { + if (v === undefined) return null + if (v instanceof Date) return v.toISOString() + if (typeof v === 'boolean') return v ? 1 : 0 + if (typeof v === 'bigint') { + // SQLite INTEGER is 8 bytes so safe within bigint range, but the RPC + // bridge is happier with strings or numbers. + return Number.isSafeInteger(Number(v)) ? Number(v) : v.toString() + } + return v +} + +function serialiseWatermark(v: SqlScalar | null): string | null { + if (v === null || v === undefined) return null + if (v instanceof Date) return v.toISOString() + return String(v) +} diff --git a/plugins/replication/meta.json b/plugins/replication/meta.json new file mode 100644 index 0000000..c45d563 --- /dev/null +++ b/plugins/replication/meta.json @@ -0,0 +1,18 @@ +{ + "version": "1.0.0", + "resources": { + "tables": { + "_starbase_replication_watermarks": "Per-table cursor used by append-only pulls.", + "_starbase_replication_log": "Audit trail of replication ticks (rows pulled, ok, error)." + }, + "secrets": {}, + "variables": { + "REPLICATION_CONFIG_JSON": "JSON-encoded array of source configs. See plugins/replication/README.md." + } + }, + "dependencies": { + "tables": {}, + "secrets": {}, + "variables": {} + } +} diff --git a/plugins/replication/sql.ts b/plugins/replication/sql.ts new file mode 100644 index 0000000..791cfc2 --- /dev/null +++ b/plugins/replication/sql.ts @@ -0,0 +1,92 @@ +/** + * SQL builders for the replication plugin. Centralised so the schema is + * defined in one place and tests can assert against the exact statements. 
+ */ + +import type { ColumnDef } from './types' + +export const WATERMARK_TABLE = '_starbase_replication_watermarks' +export const LOG_TABLE = '_starbase_replication_log' + +export const CREATE_WATERMARK_TABLE = ` + CREATE TABLE IF NOT EXISTS ${WATERMARK_TABLE} ( + source TEXT NOT NULL, + "table" TEXT NOT NULL, + watermark_column TEXT NOT NULL, + last_value TEXT, + last_run_ts INTEGER, + PRIMARY KEY (source, "table") + ) +` + +export const CREATE_LOG_TABLE = ` + CREATE TABLE IF NOT EXISTS ${LOG_TABLE} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts INTEGER NOT NULL, + source TEXT NOT NULL, + "table" TEXT NOT NULL, + rows_pulled INTEGER NOT NULL, + ok INTEGER NOT NULL, + error TEXT + ) +` + +/** + * Render a CREATE TABLE IF NOT EXISTS statement that mirrors the external + * schema returned by an adapter. + */ +export function buildCreateTable( + targetTable: string, + columns: ColumnDef[] +): string { + if (columns.length === 0) { + throw new Error( + `replication: cannot create table "${targetTable}" with zero columns` + ) + } + + const cols = columns + .map((c) => ` "${c.name}" ${c.sqliteType}`) + .join(',\n') + + const pks = columns.filter((c) => c.primaryKey).map((c) => `"${c.name}"`) + + const pkClause = pks.length ? `,\n PRIMARY KEY (${pks.join(', ')})` : '' + + return `CREATE TABLE IF NOT EXISTS "${targetTable}" (\n${cols}${pkClause}\n)` +} + +/** + * Render a parameterised INSERT for a row. When `primaryKey` is supplied the + * statement uses `INSERT OR REPLACE` so re-pulled rows update in place; when + * omitted it falls back to `INSERT OR IGNORE` to keep append-only loads + * idempotent against retries. + */ +export function buildInsert( + targetTable: string, + columns: string[], + hasPrimaryKey: boolean +): string { + const verb = hasPrimaryKey ? 
'INSERT OR REPLACE' : 'INSERT OR IGNORE' + const colList = columns.map((c) => `"${c}"`).join(', ') + const placeholders = columns.map(() => '?').join(', ') + return `${verb} INTO "${targetTable}" (${colList}) VALUES (${placeholders})` +} + +export const UPSERT_WATERMARK = ` + INSERT INTO ${WATERMARK_TABLE} (source, "table", watermark_column, last_value, last_run_ts) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(source, "table") DO UPDATE SET + watermark_column = excluded.watermark_column, + last_value = excluded.last_value, + last_run_ts = excluded.last_run_ts +` + +export const SELECT_WATERMARK = ` + SELECT last_value FROM ${WATERMARK_TABLE} WHERE source = ? AND "table" = ? +` + +export const INSERT_LOG = ` + INSERT INTO ${LOG_TABLE} (ts, source, "table", rows_pulled, ok, error) + VALUES (?, ?, ?, ?, ?, ?) +` diff --git a/plugins/replication/types.ts b/plugins/replication/types.ts new file mode 100644 index 0000000..68c3858 --- /dev/null +++ b/plugins/replication/types.ts @@ -0,0 +1,93 @@ +/** + * Public types for the StarbaseDB replication plugin. + * + * The plugin pulls rows from an external relational source (Postgres, MySQL, + * or any user-provided adapter) into the internal Durable Object SQLite, on a + * configurable per-table interval, advancing a per-table watermark on every + * successful run. + */ + +export type SqlScalar = string | number | boolean | null | bigint | Date + +export interface ColumnDef { + /** Column name as it should appear in SQLite. */ + name: string + /** + * SQLite affinity for the column ("TEXT", "INTEGER", "REAL", "BLOB", + * "NUMERIC"). Adapters are expected to translate their dialect's native + * types to one of these. + */ + sqliteType: 'TEXT' | 'INTEGER' | 'REAL' | 'BLOB' | 'NUMERIC' + /** Whether the column is part of the primary key. */ + primaryKey?: boolean +} + +export interface PullPage { + rows: Record[] + /** + * The watermark value that should be persisted after this page is written. 
+ * Adapters MUST return a watermark that is monotonically non-decreasing + * for a given (table, watermarkColumn). + */ + nextWatermark: SqlScalar | null +} + +export interface ReplicationAdapter { + /** + * Reflect the external table's schema. Called once per table, on the + * first sync, to provision a matching SQLite table in the DO. + */ + describe(table: string): Promise + + /** + * Pull rows whose `watermarkColumn` is strictly greater than `watermark` + * (or all rows if `watermark` is null), ordered ascending by + * `watermarkColumn`. Implementations may yield multiple pages; the plugin + * advances the watermark after every page. + * + * Implementations should respect `pageSize` as an upper bound on rows per + * page so a single sync run cannot OOM the worker. + */ + pull(opts: { + table: string + watermarkColumn: string + watermark: SqlScalar | null + pageSize: number + }): AsyncIterable + + /** Release any pooled resources. Called once when the plugin shuts down. */ + close(): Promise +} + +export interface TableConfig { + /** Source-side table name. */ + name: string + /** Column the plugin compares to advance the watermark. */ + watermark: string + /** + * Optional primary key column(s) used to upsert rows in SQLite. If + * omitted, rows are appended (suitable for append-only logs keyed on the + * watermark column). + */ + primaryKey?: string | string[] + /** Optional override for the SQLite table name. Defaults to `name`. */ + target?: string +} + +export interface ReplicationSourceConfig { + /** Adapter type. `postgres`, `mysql`, or `mock` (for tests). */ + source: 'postgres' | 'mysql' | 'mock' + /** Connection string consumed by the adapter. */ + conn?: string + /** + * How often this source should be pulled, in seconds. Each table inherits + * this interval unless overridden. + */ + intervalSeconds: number + /** Tables to replicate from this source. */ + tables: TableConfig[] + /** Override the page size for this source (default 1000). 
*/ + pageSize?: number +} + +export type ReplicationConfig = ReplicationSourceConfig[] diff --git a/src/index.ts b/src/index.ts index 4d08932..4c33c89 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,6 +12,7 @@ import { QueryLogPlugin } from '../plugins/query-log' import { StatsPlugin } from '../plugins/stats' import { CronPlugin } from '../plugins/cron' import { InterfacePlugin } from '../plugins/interface' +import { ReplicationPlugin } from '../plugins/replication' export { StarbaseDBDurableObject } from './do' @@ -56,9 +57,25 @@ export interface Env { HYPERDRIVE: Hyperdrive + // External-to-internal replication. JSON-encoded ReplicationConfig. + // See plugins/replication/README.md for the schema. Empty/unset disables. + REPLICATION_CONFIG_JSON?: string + // ## DO NOT REMOVE: TEMPLATE INTERFACE ## } +// Module-level singleton so the scheduled() handler and fetch() handler share +// the same plugin instance (and cached adapter pool). +let sharedReplicationPlugin: ReplicationPlugin | undefined +function getReplicationPlugin(env: Env): ReplicationPlugin { + if (!sharedReplicationPlugin) { + sharedReplicationPlugin = new ReplicationPlugin({ + env: { REPLICATION_CONFIG_JSON: env.REPLICATION_CONFIG_JSON }, + }) + } + return sharedReplicationPlugin +} + export default { /** * This is the standard fetch handler for a Cloudflare Worker @@ -210,6 +227,7 @@ export default { }, ctx) const interfacePlugin = new InterfacePlugin() + const replicationPlugin = getReplicationPlugin(env) const plugins = [ webSocketPlugin, @@ -226,6 +244,7 @@ export default { cronPlugin, new StatsPlugin(), interfacePlugin, + replicationPlugin, ] satisfies StarbasePlugin[] const starbase = new StarbaseDB({ @@ -330,4 +349,43 @@ export default { ) } }, + + /** + * Cloudflare Cron Trigger entry point. The plugin owns its own + * per-table scheduling on top of this — the trigger only needs to fire + * often enough to be the smallest interval the user wants. 
A single + * `* * * * *` trigger is sufficient for any user-configured interval + * down to one minute. + * + * If `REPLICATION_CONFIG_JSON` is unset this is a no-op. + */ + async scheduled(_event, env, ctx): Promise { + if (!env.REPLICATION_CONFIG_JSON) return + + const region = env.REGION ?? RegionLocationHint.AUTO + const id = env.DATABASE_DURABLE_OBJECT.idFromName(DURABLE_OBJECT_ID) + const stub = + region !== RegionLocationHint.AUTO + ? env.DATABASE_DURABLE_OBJECT.get(id, { + locationHint: region as DurableObjectLocationHint, + }) + : env.DATABASE_DURABLE_OBJECT.get(id) + const rpc = await stub.init() + + const dataSource: DataSource = { + rpc, + source: 'internal', + executionContext: ctx, + } + + const plugin = getReplicationPlugin(env) + try { + const summary = await plugin.runDue(dataSource) + if (summary.length > 0) { + console.log('replication: tick complete', summary) + } + } catch (e) { + console.error('replication: scheduled tick failed', e) + } + }, } satisfies ExportedHandler diff --git a/wrangler.toml b/wrangler.toml index 395c4ac..59e2b7a 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -78,3 +78,13 @@ AUTH_JWKS_ENDPOINT = "" # [[hyperdrive]] # binding = "HYPERDRIVE" # id = "" + +# Replication plugin: pulls rows from an external Postgres or MySQL into the +# DO SQLite on a per-table interval. Uncomment the trigger and set the +# REPLICATION_CONFIG_JSON var below to enable. The trigger only needs to fire +# at the smallest interval you want; the plugin handles per-table cadence. +# [triggers] +# crons = ["* * * * *"] + +# Example REPLICATION_CONFIG_JSON (single line, escape as needed): +# REPLICATION_CONFIG_JSON = '[{"source":"postgres","conn":"postgres://user:pass@host/db","intervalSeconds":300,"tables":[{"name":"users","watermark":"updated_at","primaryKey":"id"},{"name":"events","watermark":"id"}]}]'