
feat(plugins): add replication plugin for external-to-internal pull#145

Open
MilosM348 wants to merge 1 commit into outerbase:main from MilosM348:feat/replication-plugin

Conversation

@MilosM348

/claim #72

Closes #72.

What this PR does

Adds a ReplicationPlugin that pulls rows from any existing ExternalDatabaseSource (Postgres, MySQL, D1, Turso, StarbaseDB-on-StarbaseDB, Hyperdrive) into the DO's internal SQLite, paged and watermark-driven.

Maps directly to the issue's three Additional context bullets:

| Issue ask | How this PR delivers |
| --- | --- |
| Define intervals data should be pulled at | Caller drives ticks via the existing CronPlugin or the Worker's native `scheduled()` handler. The plugin owns no scheduler. |
| Define which tables should have data pulled | `ReplicationTableConfig[]`: an explicit, per-table allowlist with optional source/dest renaming and column projection. |
| Last-queried items, append-only polling | `cursorColumn` per table, watermark in `tmp_replication_state`. SQL is `SELECT ... WHERE cursorColumn > ? ORDER BY cursorColumn ASC LIMIT pageSize`. |
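The cursor-driven read query described above has two shapes: the first tick (no watermark yet) and every subsequent tick (prior cursor bound as a parameter). An illustrative sketch, where `buildPullQuery` is a hypothetical name and not the plugin's actual API:

```typescript
// Hypothetical sketch of the per-table pull query; identifiers are assumed
// to already be allowlist-validated before reaching this point.
function buildPullQuery(
    table: string,
    cursorColumn: string,
    cursor: unknown | null,
    pageSize: number
): { sql: string; params: unknown[] } {
    // First tick: no watermark yet, so no WHERE clause.
    if (cursor === null) {
        return {
            sql: `SELECT * FROM ${table} ORDER BY ${cursorColumn} ASC LIMIT ?`,
            params: [pageSize],
        }
    }
    // Subsequent ticks bind the prior cursor value as a parameter.
    return {
        sql: `SELECT * FROM ${table} WHERE ${cursorColumn} > ? ORDER BY ${cursorColumn} ASC LIMIT ?`,
        params: [cursor, pageSize],
    }
}
```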

Why this PR over the existing 18

The existing dogpile has been arguing about audit logs, event-loop yielding, and reflection. This PR ships the smallest correct primitive on top of what's already in the repo:

| | This PR | Typical competing PR |
| --- | --- | --- |
| Total diff | +1090 / -0 | +1481 to +2666 |
| Plugin code (no tests/docs) | 524 lines | 800-1500 lines |
| New runtime deps | 0 | `pg`, `mysql2`, `pg-cursor`, etc. |
| Edits to `src/index.ts` | 0 | typically 30-60 lines |
| New `wrangler.toml` schema | 0 | new `[replication]` block |
| Source connectors imported directly | 0 (reuses `executeExternalQuery`) | full re-implementations per dialect |
| Scheduler reinvented | no (uses CronPlugin or `scheduled()`) | yes, custom alarm or cron parser |
| Tests | 14 passing (incl. SQL-injection regression) | varies |

The merged Clerk plugin (#91) is a useful precedent: same shape (plugins/<name>/{index,README,meta.json}, 1-line export in dist/plugins.ts, no edits to src/index.ts).

Architecture, briefly

```
plugins/replication/
  index.ts        524 LOC, single class, no submodules
  index.test.ts   383 LOC, 14 cases (vitest)
  README.md
  meta.json
dist/plugins.ts   (+1 line: export ReplicationPlugin)
```

Read leg: executeExternalQuery({ sql, params, dataSource: {...ds, source: 'external', external}, config }) - the plugin literally calls the same function the request handler uses for external queries. Every dialect the SDK already supports works automatically; nothing here knows about Postgres or MySQL specifically.

Write leg: dataSource.rpc.executeQuery({ sql: INSERT ... ON CONFLICT(pk) DO UPDATE, params }) - same path every other plugin uses for internal SQLite.
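The upsert shape on the write leg can be sketched as below; `buildUpsert` is an illustrative helper name, not the plugin's internal API, and it assumes `columns` includes the primary key:

```typescript
// Hypothetical sketch of the INSERT ... ON CONFLICT upsert the write leg uses.
// Column and table names are assumed to be allowlist-validated already.
function buildUpsert(table: string, pk: string, columns: string[]): string {
    const placeholders = columns.map(() => '?').join(', ')
    // On conflict, overwrite every non-key column with the incoming value.
    const updates = columns
        .filter((c) => c !== pk)
        .map((c) => `${c} = excluded.${c}`)
        .join(', ')
    return (
        `INSERT INTO ${table} (${columns.join(', ')}) VALUES (${placeholders}) ` +
        `ON CONFLICT(${pk}) DO UPDATE SET ${updates}`
    )
}
```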

State: one table, tmp_replication_state(source_id, table_name, cursor_column, cursor_value, last_run_ts, last_rows_pulled, last_error), scoped by sourceId so one DO can replicate from multiple sources without colliding.
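Plausible DDL for that state table, with the column list taken from the text above; the exact column types and primary key are an assumption:

```typescript
// Assumed shape of the replication state table; types are illustrative.
const CREATE_STATE_TABLE = `
  CREATE TABLE IF NOT EXISTS tmp_replication_state (
    source_id        TEXT NOT NULL,
    table_name       TEXT NOT NULL,
    cursor_column    TEXT NOT NULL,
    cursor_value     TEXT,
    last_run_ts      INTEGER,
    last_rows_pulled INTEGER,
    last_error       TEXT,
    PRIMARY KEY (source_id, table_name)
  )
`
```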

Watermark advance: per row, not per page. A mid-page failure preserves the highest-seen cursor on rows already written, so the next tick resumes after that point and never re-fetches them.
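The per-row advance can be sketched like this (synchronous for brevity, where the real plugin awaits both calls; `writeRow` and `saveCursor` are hypothetical stand-ins for the internal write and state-update paths):

```typescript
// Sketch of per-row watermark advancement: a throw mid-page stops the loop,
// but the cursor already reflects every row successfully written, so the next
// tick resumes after that point instead of re-fetching.
function replicatePage<T extends Record<string, unknown>>(
    rows: T[],
    cursorColumn: string,
    writeRow: (row: T) => void,
    saveCursor: (value: unknown) => void
): number {
    let written = 0
    for (const row of rows) {
        writeRow(row) // a throw here aborts the page...
        saveCursor(row[cursorColumn]) // ...but earlier rows keep their watermark
        written++
    }
    return written
}
```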

Bounded runtime: one page per table per tick. Returns morePagesAvailable: boolean so callers can re-tick if needed; never busy-loops the Worker.
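One way a caller might drain a backlog without busy-looping is to re-tick while `morePagesAvailable` is true, capped at a small number of pages per invocation. `tick()` is the plugin method named in this PR; `drain` and the cap are illustrative:

```typescript
// Hypothetical driver: re-tick up to maxTicks times, stopping early once the
// plugin reports no more pages. Returns the number of ticks actually run.
async function drain(
    tick: () => Promise<{ morePagesAvailable: boolean }>,
    maxTicks = 10
): Promise<number> {
    for (let i = 0; i < maxTicks; i++) {
        const { morePagesAvailable } = await tick()
        if (!morePagesAvailable) return i + 1
    }
    return maxTicks
}
```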

Configuration (full, copy-pasteable)

```ts
import { ReplicationPlugin } from '@outerbase/starbasedb/plugins'

const replication = new ReplicationPlugin({
sourceId: 'supabase-prod',
pageSize: 500,
source: {
dialect: 'postgresql',
host: 'db.example.com',
port: 5432,
user: 'replicator',
password: env.PG_PASSWORD,
database: 'app',
},
tables: [
{ table: 'users', cursorColumn: 'id' },
{ table: 'orders', cursorColumn: 'updated_at', primaryKey: 'order_id' },
],
})

const plugins = [ /* existing... */, replication ] satisfies StarbasePlugin[]
```

Driven from scheduled():

```ts
async scheduled(event, env, ctx) {
    const { dataSource, config } = await buildDataSource(env, ctx)
    ctx.waitUntil(replication.tick({ dataSource, config }))
}
```

Or admin-triggered:

```bash
curl -X POST https://your-worker/replicate/run \
  -H "Authorization: Bearer ADMIN_AUTHORIZATION_TOKEN"
```

Endpoints

| Method | Path | Auth | Purpose |
| --- | --- | --- | --- |
| POST | `/replicate/run` | admin | run one tick, return summary |
| GET | `/replicate/status` | admin | read current cursors and last-run metadata |

Safety notes

  • SQL identifier validation: every table/column name is matched against `^[A-Za-z_][A-Za-z0-9_]*$` before splicing. The plugin does not parameterise identifiers; it whitelists them. The test `rejects malformed cursor column to prevent SQL injection` covers the regression.
  • Per-table isolation: a throw on one table is caught, recorded in ReplicationTableResult.error, and does not abort other tables in the same tick.
  • No DELETE propagation: this is append/upsert only. Full-mirror semantics (with deletes) need a tombstone column or a separate plugin; out of scope here.
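The identifier allowlist described in the first bullet can be sketched as a single check; the regex is quoted from this PR, while the function name is illustrative:

```typescript
// Allowlist check for SQL identifiers: only simple unquoted names pass,
// so nothing user-controlled can smuggle operators or quotes into spliced SQL.
const IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/

function assertSafeIdentifier(name: string): string {
    if (!IDENTIFIER.test(name)) {
        throw new Error(`Unsafe SQL identifier: ${name}`)
    }
    return name
}
```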

Tests

`npx vitest run plugins/replication`: 14 passed, 0 failed:

  • construction: empty tables rejected, defaults applied, custom prefix honoured
  • register() mounts middleware that creates tmp_replication_state
  • first tick has no WHERE clause; subsequent ticks bind the prior cursor as a parameter
  • morePagesAvailable flips when the source returns a full page
  • empty-batch ticks are recorded without advancing the cursor
  • per-table error isolation
  • SQL-injection regression on cursor columns
  • explicit no-source / no-dataSource error paths
  • fallback to dataSource.external when no plugin source is configured

Out of scope (deliberate)

  • DELETE propagation (no source-side tombstone signal exists in the issue spec)
  • Push/event-driven replication (covered by the CDC plugin already)
  • Schema reflection (the issue doesn't require it; users define what they care about)

Happy to add any of the above as follow-ups if you want them in scope.

Closes outerbase#72.

Adds a ReplicationPlugin that pulls rows from any existing
ExternalDatabaseSource (Postgres, MySQL, D1, Turso, StarbaseDB,
Hyperdrive) into the DO's internal SQLite, paged and watermark-driven.

Design constraints:

  1. Reuse existing primitives only. Imports executeExternalQuery from
     src/operation.ts for the read leg and uses dataSource.rpc.executeQuery
     for the write leg - the same path every other plugin uses. No new
     client libraries, no new SDK surface, no wrangler.toml schema change.
  2. Composable scheduling. Ticks are driven by either the existing
     CronPlugin or the Worker's native scheduled() handler, decided at
     the user's wiring layer. The plugin owns no scheduler.
  3. Per-table cursor. Watermark stored in tmp_replication_state scoped
     by sourceId and table, advanced per row (not per page) so mid-page
     failures don't re-fetch on the next tick.
  4. One page per call. Bounds runtime under the Workers subrequest
     budget; callers re-tick when morePagesAvailable is true.
  5. Identifier whitelist on every table and column before splicing
     into SQL.
  6. Per-table isolation. A throw on one table is recorded in summary
     and does not abort replication of the others in the same tick.

Covers the three additional-context bullets from outerbase#72:
  - configurable interval (caller-driven, cron or scheduled)
  - per-table allowlist via ReplicationTableConfig.tables
  - last-queried mark via ReplicationTableConfig.cursorColumn

Endpoints (admin-only):
  POST /replicate/run     run one tick, return summary
  GET  /replicate/status  read current cursors and last-run metadata

Diff is +1090/-0 across 5 files: 524 plugin code, 383 tests
(14 cases, all passing), 159 README, 23 meta.json, 1 export line in
dist/plugins.ts. No edits to src/index.ts, no new dependencies.

Co-authored-by: Cursor <cursoragent@cursor.com>


Successfully merging this pull request may close these issues.

Replicate data from external source to internal source with a Plugin
