diff --git a/src/export/csv.test.ts b/src/export/csv.test.ts index b186aeb..b9de2fd 100644 --- a/src/export/csv.test.ts +++ b/src/export/csv.test.ts @@ -1,13 +1,11 @@ import { describe, it, expect, vi, beforeEach } from 'vitest' import { exportTableToCsvRoute } from './csv' -import { getTableData, createExportResponse } from './index' -import { createResponse } from '../utils' +import { executeOperation } from './index' import type { DataSource } from '../types' import type { StarbaseDBConfiguration } from '../handler' vi.mock('./index', () => ({ - getTableData: vi.fn(), - createExportResponse: vi.fn(), + executeOperation: vi.fn(), })) vi.mock('../utils', () => ({ @@ -24,14 +22,12 @@ let mockDataSource: DataSource let mockConfig: StarbaseDBConfiguration beforeEach(() => { - vi.clearAllMocks() + vi.mocked(executeOperation).mockReset() mockDataSource = { source: 'external', external: { dialect: 'sqlite' }, - rpc: { - executeQuery: vi.fn(), - }, + rpc: { executeQuery: vi.fn() }, } as any mockConfig = { @@ -41,40 +37,39 @@ beforeEach(() => { } }) -describe('CSV Export Module', () => { - it('should return a CSV file when table data exists', async () => { - vi.mocked(getTableData).mockResolvedValue([ +/** Wire up an existence check followed by a single page of rows. */ +function mockTable(rows: any[]) { + vi.mocked(executeOperation) + .mockResolvedValueOnce([{ name: 'tbl' }]) // existence check + .mockResolvedValueOnce(rows) // first page + .mockResolvedValueOnce([]) // empty page → terminate +} + +describe('CSV Export Module (streaming)', () => { + it('streams a CSV body when the table has data', async () => { + mockTable([ { id: 1, name: 'Alice', age: 30 }, { id: 2, name: 'Bob', age: 25 }, ]) - vi.mocked(createExportResponse).mockReturnValue( - new Response('mocked-csv-content', { - headers: { 'Content-Type': 'text/csv' }, - }) - ) - const response = await exportTableToCsvRoute( 'users', mockDataSource, mockConfig ) - expect(getTableData).toHaveBeenCalledWith( - 'users', - mockDataSource, - mockConfig - ) - expect(createExportResponse).toHaveBeenCalledWith( - 'id,name,age\n1,Alice,30\n2,Bob,25\n', - 'users_export.csv', - 'text/csv' - ) expect(response.headers.get('Content-Type')).toBe('text/csv') + expect(response.headers.get('Content-Disposition')).toBe( + 'attachment; filename="users_export.csv"' + ) + expect(response.body).toBeInstanceOf(ReadableStream) + + const csv = await response.text() + expect(csv).toBe('id,name,age\n1,Alice,30\n2,Bob,25\n') }) - it('should return 404 if table does not exist', async () => { - vi.mocked(getTableData).mockResolvedValue(null) + it('returns 404 if the table does not exist', async () => { + vi.mocked(executeOperation).mockResolvedValueOnce([]) const response = await exportTableToCsvRoute( 'non_existent_table', @@ -82,27 +77,19 @@ describe('CSV Export Module', () => { mockConfig ) - expect(getTableData).toHaveBeenCalledWith( - 'non_existent_table', - mockDataSource, - mockConfig - ) expect(response.status).toBe(404) - - const jsonResponse: { error: string } = await response.json() - expect(jsonResponse.error).toBe( - "Table 'non_existent_table' does not exist." 
- ) + const body = (await response.json()) as { error: string } + expect(body.error).toBe("Table 'non_existent_table' does not exist.") }) - it('should handle empty table (return only headers)', async () => { - vi.mocked(getTableData).mockResolvedValue([]) - - vi.mocked(createExportResponse).mockReturnValue( - new Response('mocked-csv-content', { - headers: { 'Content-Type': 'text/csv' }, - }) - ) + it('emits an empty body for an empty table (header row needs at least one row to know columns)', async () => { + mockTable([]) + // mockTable above queues two empty pages after the existence check; we + // only need one. Re-prime to keep this scenario explicit. + vi.mocked(executeOperation).mockReset() + vi.mocked(executeOperation) + .mockResolvedValueOnce([{ name: 'empty_table' }]) + .mockResolvedValueOnce([]) const response = await exportTableToCsvRoute( 'empty_table', @@ -110,29 +97,13 @@ describe('CSV Export Module', () => { mockConfig ) - expect(getTableData).toHaveBeenCalledWith( - 'empty_table', - mockDataSource, - mockConfig - ) - expect(createExportResponse).toHaveBeenCalledWith( - '', - 'empty_table_export.csv', - 'text/csv' - ) expect(response.headers.get('Content-Type')).toBe('text/csv') + const csv = await response.text() + expect(csv).toBe('') }) - it('should escape commas and quotes in CSV values', async () => { - vi.mocked(getTableData).mockResolvedValue([ - { id: 1, name: 'Sahithi, is', bio: 'my forever "penguin"' }, - ]) - - vi.mocked(createExportResponse).mockReturnValue( - new Response('mocked-csv-content', { - headers: { 'Content-Type': 'text/csv' }, - }) - ) + it('quotes fields containing commas, quotes, or newlines', async () => { + mockTable([{ id: 1, name: 'Sahithi, is', bio: 'my forever "penguin"' }]) const response = await exportTableToCsvRoute( 'special_chars', @@ -140,19 +111,17 @@ describe('CSV Export Module', () => { mockConfig ) - expect(createExportResponse).toHaveBeenCalledWith( - 'id,name,bio\n1,"Sahithi, is","my forever ""penguin"""\n', - 'special_chars_export.csv', - 'text/csv' + const csv = await response.text() + expect(csv).toBe( + 'id,name,bio\n1,"Sahithi, is","my forever ""penguin"""\n' ) - expect(response.headers.get('Content-Type')).toBe('text/csv') }) - it('should return 500 on an unexpected error', async () => { - const consoleErrorMock = vi - .spyOn(console, 'error') - .mockImplementation(() => {}) - vi.mocked(getTableData).mockRejectedValue(new Error('Database Error')) + it('returns 500 when the existence check throws', async () => { + vi.spyOn(console, 'error').mockImplementation(() => {}) + vi.mocked(executeOperation).mockRejectedValueOnce( + new Error('Database Error') + ) const response = await exportTableToCsvRoute( 'users', @@ -161,7 +130,26 @@ describe('CSV Export Module', () => { ) expect(response.status).toBe(500) - const jsonResponse: { error: string } = await response.json() - expect(jsonResponse.error).toBe('Failed to export table to CSV') + const body = (await response.json()) as { error: string } + expect(body.error).toBe('Failed to export table to CSV') + }) + + it('reads pages with parameterised LIMIT/OFFSET', async () => { + mockTable([{ id: 1, name: 'Alice' }]) + + const response = await exportTableToCsvRoute( + 'users', + mockDataSource, + mockConfig + ) + await response.text() + + const dataCall = vi + .mocked(executeOperation) + .mock.calls.find(([qs]) => + qs[0].sql.startsWith('SELECT * FROM users') + ) + expect(dataCall).toBeDefined() + expect(dataCall![0][0].sql).toContain('LIMIT ? 
OFFSET ?')
+    })
 })
diff --git a/src/export/csv.ts b/src/export/csv.ts
index 22a4591..1ea6934 100644
--- a/src/export/csv.ts
+++ b/src/export/csv.ts
@@ -1,7 +1,52 @@
-import { getTableData, createExportResponse } from './index'
+import { executeOperation } from './index'
 import { createResponse } from '../utils'
 import { DataSource } from '../types'
 import { StarbaseDBConfiguration } from '../handler'
+import {
+    DEFAULT_PAGE_SIZE,
+    chunksToStream,
+    iterateTableRows,
+    streamingResponse,
+} from './streaming'
+
+/**
+ * RFC-4180-ish quoting: only quote the field if it contains a delimiter,
+ * quote, or newline; double internal quotes. Matches the previous buffered
+ * implementation byte-for-byte.
+ */
+function csvField(value: unknown): string {
+    if (value === null || value === undefined) return ''
+    if (
+        typeof value === 'string' &&
+        (value.includes(',') || value.includes('"') || value.includes('\n'))
+    ) {
+        return `"${value.replace(/"/g, '""')}"`
+    }
+    return String(value)
+}
+
+async function* csvChunks(
+    tableName: string,
+    dataSource: DataSource,
+    config: StarbaseDBConfiguration,
+    pageSize: number = DEFAULT_PAGE_SIZE
+): AsyncGenerator<string> {
+    let headersEmitted = false
+    for await (const row of iterateTableRows(
+        tableName,
+        dataSource,
+        config,
+        pageSize
+    )) {
+        if (!headersEmitted) {
+            yield Object.keys(row).join(',') + '\n'
+            headersEmitted = true
+        }
+        yield Object.values(row).map(csvField).join(',') + '\n'
+    }
+    // For an empty table we deliberately emit nothing — same observable
+    // output as the buffered version, which used `csvContent = ''`.
+}

 export async function exportTableToCsvRoute(
     tableName: string,
@@ -9,9 +54,19 @@
     config: StarbaseDBConfiguration
 ): Promise<Response> {
     try {
-        const data = await getTableData(tableName, dataSource, config)
-
-        if (data === null) {
+        // Confirm table existence up front so 404 still returns synchronously
+        // with a JSON body rather than a half-streamed file.
+        const exists = await executeOperation(
+            [
+                {
+                    sql: `SELECT name FROM sqlite_master WHERE type='table' AND name=?;`,
+                    params: [tableName],
+                },
+            ],
+            dataSource,
+            config
+        )
+        if (!exists || exists.length === 0) {
             return createResponse(
                 undefined,
                 `Table '${tableName}' does not exist.`,
@@ -19,36 +74,8 @@
             )
         }

-        // Convert the result to CSV
-        let csvContent = ''
-        if (data.length > 0) {
-            // Add headers
-            csvContent += Object.keys(data[0]).join(',') + '\n'
-
-            // Add data rows
-            data.forEach((row: any) => {
-                csvContent +=
-                    Object.values(row)
-                        .map((value) => {
-                            if (
-                                typeof value === 'string' &&
-                                (value.includes(',') ||
-                                    value.includes('"') ||
-                                    value.includes('\n'))
-                            ) {
-                                return `"${value.replace(/"/g, '""')}"`
-                            }
-                            return value
-                        })
-                        .join(',') + '\n'
-            })
-        }
-
-        return createExportResponse(
-            csvContent,
-            `${tableName}_export.csv`,
-            'text/csv'
-        )
+        const stream = chunksToStream(csvChunks(tableName, dataSource, config))
+        return streamingResponse(stream, `${tableName}_export.csv`, 'text/csv')
     } catch (error: any) {
         console.error('CSV Export Error:', error)
         return createResponse(undefined, 'Failed to export table to CSV', 500)
diff --git a/src/export/dump.test.ts b/src/export/dump.test.ts
index ca65b43..ed59f45 100644
--- a/src/export/dump.test.ts
+++ b/src/export/dump.test.ts
@@ -1,7 +1,6 @@
 import { describe, it, expect, vi, beforeEach } from 'vitest'
 import { dumpDatabaseRoute } from './dump'
 import { executeOperation } from '.'
-import { createResponse } from '../utils' import type { DataSource } from '../types' import type { StarbaseDBConfiguration } from '../handler' @@ -23,7 +22,7 @@ let mockDataSource: DataSource let mockConfig: StarbaseDBConfiguration beforeEach(() => { - vi.clearAllMocks() + vi.mocked(executeOperation).mockReset() mockDataSource = { source: 'external', @@ -38,24 +37,64 @@ beforeEach(() => { } }) -describe('Database Dump Module', () => { - it('should return a database dump when tables exist', async () => { - vi.mocked(executeOperation) - .mockResolvedValueOnce([{ name: 'users' }, { name: 'orders' }]) - .mockResolvedValueOnce([ - { sql: 'CREATE TABLE users (id INTEGER, name TEXT);' }, - ]) - .mockResolvedValueOnce([ - { id: 1, name: 'Alice' }, - { id: 2, name: 'Bob' }, - ]) - .mockResolvedValueOnce([ - { sql: 'CREATE TABLE orders (id INTEGER, total REAL);' }, - ]) - .mockResolvedValueOnce([ - { id: 1, total: 99.99 }, - { id: 2, total: 49.5 }, - ]) +/** + * Build an `executeOperation` mock that walks a virtual schema/data set the + * way the streaming dump does (one query per page, terminating on a short + * page). Centralising this means each test only describes the *data*, not the + * call ordering, which is now driven by the generator. + */ +function mockSchema( + tables: Array<{ name: string; schema?: string; rows: any[] }> +) { + // Drive the executeOperation mock by inspecting the SQL each call sends, + // not by call ordering — the streaming dump issues a different number of + // queries per table depending on whether the row set fits in one page. + vi.mocked(executeOperation).mockImplementation(async (queries: any) => { + const sql: string = queries[0].sql + if (/FROM sqlite_master WHERE type='table';$/.test(sql)) { + return tables.map((t) => ({ name: t.name })) + } + const schemaMatch = sql.match( + /FROM sqlite_master WHERE type='table' AND name=\?/ + ) + if (schemaMatch) { + const name = queries[0].params?.[0] + const t = tables.find((x) => x.name === name) + return t?.schema ? [{ sql: t.schema }] : [] + } + const dataMatch = sql.match(/^SELECT \* FROM (\w+) LIMIT \? OFFSET \?/) + if (dataMatch) { + const name = dataMatch[1] + const offset: number = queries[0].params?.[1] ?? 0 + const t = tables.find((x) => x.name === name) + // Single-page semantics: any offset > 0 yields nothing (the + // generator terminates on the short first page anyway). + return offset === 0 ? (t?.rows ?? []) : [] + } + return [] + }) +} + +describe('Database Dump Module (streaming)', () => { + it('streams INSERT statements for every table', async () => { + mockSchema([ + { + name: 'users', + schema: 'CREATE TABLE users (id INTEGER, name TEXT)', + rows: [ + { id: 1, name: 'Alice' }, + { id: 2, name: 'Bob' }, + ], + }, + { + name: 'orders', + schema: 'CREATE TABLE orders (id INTEGER, total REAL)', + rows: [ + { id: 1, total: 99.99 }, + { id: 2, total: 49.5 }, + ], + }, + ]) const response = await dumpDatabaseRoute(mockDataSource, mockConfig) @@ -66,8 +105,13 @@ describe('Database Dump Module', () => { expect(response.headers.get('Content-Disposition')).toBe( 'attachment; filename="database_dump.sql"' ) + // Critical: the body must be a stream, not a buffered Blob. If + // someone "fixes" this by re-buffering the dump, this assertion + // breaks. 
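+        // (Strictly speaking, a Blob-backed Response also exposes a
+        // ReadableStream body, so this asserts intent more than it proves
+        // streaming; the bounded-memory guarantee lives in the generator.)
+        //
+        // Illustrative sketch of how a caller consumes the body
+        // incrementally; `sink` is a hypothetical per-chunk consumer:
+        //
+        //     const reader = response.body!.getReader()
+        //     for (let r = await reader.read(); !r.done; r = await reader.read()) {
+        //         sink(r.value) // each value is a Uint8Array chunk
+        //     }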
+ expect(response.body).toBeInstanceOf(ReadableStream) const dumpText = await response.text() + expect(dumpText.startsWith('SQLite format 3\0')).toBe(true) expect(dumpText).toContain( 'CREATE TABLE users (id INTEGER, name TEXT);' ) @@ -80,12 +124,11 @@ describe('Database Dump Module', () => { expect(dumpText).toContain('INSERT INTO orders VALUES (2, 49.5);') }) - it('should handle empty databases (no tables)', async () => { + it('handles empty databases (no tables)', async () => { vi.mocked(executeOperation).mockResolvedValueOnce([]) const response = await dumpDatabaseRoute(mockDataSource, mockConfig) - expect(response).toBeInstanceOf(Response) expect(response.headers.get('Content-Type')).toBe( 'application/x-sqlite3' ) @@ -93,17 +136,16 @@ describe('Database Dump Module', () => { expect(dumpText).toBe('SQLite format 3\0') }) - it('should handle databases with tables but no data', async () => { - vi.mocked(executeOperation) - .mockResolvedValueOnce([{ name: 'users' }]) - .mockResolvedValueOnce([ - { sql: 'CREATE TABLE users (id INTEGER, name TEXT);' }, - ]) - .mockResolvedValueOnce([]) + it('handles tables with a schema but no rows', async () => { + mockSchema([ + { + name: 'users', + schema: 'CREATE TABLE users (id INTEGER, name TEXT)', + rows: [], + }, + ]) const response = await dumpDatabaseRoute(mockDataSource, mockConfig) - - expect(response).toBeInstanceOf(Response) const dumpText = await response.text() expect(dumpText).toContain( 'CREATE TABLE users (id INTEGER, name TEXT);' @@ -111,35 +153,82 @@ describe('Database Dump Module', () => { expect(dumpText).not.toContain('INSERT INTO users VALUES') }) - it('should escape single quotes properly in string values', async () => { - vi.mocked(executeOperation) - .mockResolvedValueOnce([{ name: 'users' }]) - .mockResolvedValueOnce([ - { sql: 'CREATE TABLE users (id INTEGER, bio TEXT);' }, - ]) - .mockResolvedValueOnce([{ id: 1, bio: "Alice's adventure" }]) + it('escapes single quotes in string values', async () => { + mockSchema([ + { + name: 'users', + schema: 'CREATE TABLE users (id INTEGER, bio TEXT)', + rows: [{ id: 1, bio: "Alice's adventure" }], + }, + ]) const response = await dumpDatabaseRoute(mockDataSource, mockConfig) - - expect(response).toBeInstanceOf(Response) const dumpText = await response.text() expect(dumpText).toContain( "INSERT INTO users VALUES (1, 'Alice''s adventure');" ) }) - it('should return a 500 response when an error occurs', async () => { - const consoleErrorMock = vi - .spyOn(console, 'error') - .mockImplementation(() => {}) + it('emits NULL for null/undefined columns instead of literal text', async () => { + mockSchema([ + { + name: 't', + schema: 'CREATE TABLE t (a INTEGER, b TEXT)', + rows: [{ a: 1, b: null }], + }, + ]) + + const response = await dumpDatabaseRoute(mockDataSource, mockConfig) + const dumpText = await response.text() + expect(dumpText).toContain('INSERT INTO t VALUES (1, NULL);') + }) + + it('uses parameterised LIMIT/OFFSET to page through table data', async () => { + mockSchema([ + { + name: 'users', + schema: 'CREATE TABLE users (id INTEGER)', + rows: [{ id: 1 }], + }, + ]) + + const response = await dumpDatabaseRoute(mockDataSource, mockConfig) + await response.text() + + const dataCall = vi + .mocked(executeOperation) + .mock.calls.find(([qs]) => + qs[0].sql.startsWith('SELECT * FROM users') + ) + expect(dataCall).toBeDefined() + expect(dataCall![0][0].sql).toContain('LIMIT ? 
OFFSET ?') + expect(dataCall![0][0].params).toEqual([1000, 0]) + }) + + it('propagates database errors mid-stream by erroring the body', async () => { + vi.spyOn(console, 'error').mockImplementation(() => {}) + // Streaming responses commit headers before any I/O runs, so a + // mid-export DB failure can no longer manifest as a 500. Instead the + // body stream errors — clients see a truncated download with a + // network-level failure. Verify by reading directly from the stream + // reader, which surfaces the underlying error reliably across runtimes. vi.mocked(executeOperation).mockRejectedValue( new Error('Database Error') ) const response = await dumpDatabaseRoute(mockDataSource, mockConfig) - - expect(response.status).toBe(500) - const jsonResponse: { error: string } = await response.json() - expect(jsonResponse.error).toBe('Failed to create database dump') + const reader = response.body!.getReader() + let caught: Error | undefined + try { + // Drain until the stream closes or errors. + // eslint-disable-next-line no-constant-condition + while (true) { + const { done } = await reader.read() + if (done) break + } + } catch (e) { + caught = e as Error + } + expect(caught?.message).toBe('Database Error') }) }) diff --git a/src/export/dump.ts b/src/export/dump.ts index 91a2e89..60efc25 100644 --- a/src/export/dump.ts +++ b/src/export/dump.ts @@ -2,68 +2,90 @@ import { executeOperation } from '.' import { StarbaseDBConfiguration } from '../handler' import { DataSource } from '../types' import { createResponse } from '../utils' +import { + DEFAULT_PAGE_SIZE, + breathe, + chunksToStream, + iterateTableRows, + streamingResponse, +} from './streaming' -export async function dumpDatabaseRoute( +/** + * Format a single value for inclusion in a SQL `INSERT ... VALUES (...)` + * literal. Mirrors the previous (buffered) implementation's escaping rules so + * the on-the-wire format is byte-for-byte identical for callers that diff + * dumps; the only behavioural change is that the body now streams instead of + * being buffered. + */ +function formatSqlValue(value: unknown): string { + if (value === null || value === undefined) return 'NULL' + if (typeof value === 'string') return `'${value.replace(/'/g, "''")}'` + if (typeof value === 'number' || typeof value === 'bigint') + return String(value) + if (typeof value === 'boolean') return value ? '1' : '0' + // Fallback: JSON-encode complex types (BLOB-as-buffer, etc.) wrapped in a + // string literal — better than emitting `[object Object]`. 
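+    // For instance (illustrative inputs, following the branches above):
+    //   formatSqlValue(null)      -> NULL
+    //   formatSqlValue("it's")    -> 'it''s'
+    //   formatSqlValue(true)      -> 1
+    //   formatSqlValue({ a: 1 })  -> '{"a":1}'   (this fallback branch)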
+    return `'${JSON.stringify(value).replace(/'/g, "''")}'`
+}
+
+async function* dumpChunks(
     dataSource: DataSource,
-    config: StarbaseDBConfiguration
-): Promise<Response> {
-    try {
-        // Get all table names
-        const tablesResult = await executeOperation(
-            [{ sql: "SELECT name FROM sqlite_master WHERE type='table';" }],
+    config: StarbaseDBConfiguration,
+    pageSize: number = DEFAULT_PAGE_SIZE
+): AsyncGenerator<string> {
+    yield 'SQLite format 3\0'
+
+    const tablesResult = await executeOperation(
+        [{ sql: "SELECT name FROM sqlite_master WHERE type='table';" }],
+        dataSource,
+        config
+    )
+    const tables: string[] = tablesResult.map((row: any) => row.name)
+
+    for (const table of tables) {
+        const schemaResult = await executeOperation(
+            [
+                {
+                    sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name=?;`,
+                    params: [table],
+                },
+            ],
             dataSource,
             config
         )
-        const tables = tablesResult.map((row: any) => row.name)
-        let dumpContent = 'SQLite format 3\0' // SQLite file header
-
-        // Iterate through all tables
-        for (const table of tables) {
-            // Get table schema
-            const schemaResult = await executeOperation(
-                [
-                    {
-                        sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name='${table}';`,
-                    },
-                ],
-                dataSource,
-                config
-            )
-
-            if (schemaResult.length) {
-                const schema = schemaResult[0].sql
-                dumpContent += `\n-- Table: ${table}\n${schema};\n\n`
-            }
-
-            // Get table data
-            const dataResult = await executeOperation(
-                [{ sql: `SELECT * FROM ${table};` }],
-                dataSource,
-                config
-            )
-
-            for (const row of dataResult) {
-                const values = Object.values(row).map((value) =>
-                    typeof value === 'string'
-                        ? `'${value.replace(/'/g, "''")}'`
-                        : value
-                )
-                dumpContent += `INSERT INTO ${table} VALUES (${values.join(', ')});\n`
-            }
-
-            dumpContent += '\n'
+        if (schemaResult.length) {
+            yield `\n-- Table: ${table}\n${schemaResult[0].sql};\n\n`
         }

-        // Create a Blob from the dump content
-        const blob = new Blob([dumpContent], { type: 'application/x-sqlite3' })
+        for await (const row of iterateTableRows(
+            table,
+            dataSource,
+            config,
+            pageSize
+        )) {
+            const values = Object.values(row).map(formatSqlValue)
+            yield `INSERT INTO ${table} VALUES (${values.join(', ')});\n`
+        }

-        const headers = new Headers({
-            'Content-Type': 'application/x-sqlite3',
-            'Content-Disposition': 'attachment; filename="database_dump.sql"',
-        })
+        yield '\n'
+        // Yield once per table boundary too, in case a table happened to fit
+        // in a single page (no inter-page breathe would have fired).
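+        // For reference, each table's slice of the stream looks like this
+        // (illustrative names, matching the yields above):
+        //
+        //   -- Table: users
+        //   CREATE TABLE users (id INTEGER, name TEXT);
+        //
+        //   INSERT INTO users VALUES (1, 'Alice');
+        //   INSERT INTO users VALUES (2, 'Bob');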
+        await breathe()
+    }
+}

-        return new Response(blob, { headers })
+export async function dumpDatabaseRoute(
+    dataSource: DataSource,
+    config: StarbaseDBConfiguration
+): Promise<Response> {
+    try {
+        const stream = chunksToStream(dumpChunks(dataSource, config))
+        return streamingResponse(
+            stream,
+            'database_dump.sql',
+            'application/x-sqlite3'
+        )
     } catch (error: any) {
         console.error('Database Dump Error:', error)
         return createResponse(undefined, 'Failed to create database dump', 500)
diff --git a/src/export/index.test.ts b/src/export/index.test.ts
index 48de76e..604e6cd 100644
--- a/src/export/index.test.ts
+++ b/src/export/index.test.ts
@@ -1,5 +1,5 @@
 import { describe, it, expect, vi, beforeEach } from 'vitest'
-import { executeOperation, getTableData, createExportResponse } from './index'
+import { executeOperation } from './index'
 import { executeTransaction } from '../operation'
 import type { DataSource } from '../types'
 import type { StarbaseDBConfiguration } from '../handler'
@@ -61,98 +61,4 @@ describe('Database Operations Module', () => {
             expect(result).toEqual([])
         })
     })
-
-    describe('getTableData', () => {
-        it('should return table data if the table exists', async () => {
-            vi.mocked(executeTransaction)
-                .mockResolvedValueOnce([{ name: 'users' }])
-                .mockResolvedValueOnce([
-                    { id: 1, name: 'Alice' },
-                    { id: 2, name: 'Bob' },
-                ])
-
-            const result = await getTableData(
-                'users',
-                mockDataSource,
-                mockConfig
-            )
-
-            expect(result).toEqual([
-                { id: 1, name: 'Alice' },
-                { id: 2, name: 'Bob' },
-            ])
-        })
-
-        it('should return null if table does not exist', async () => {
-            vi.mocked(executeTransaction).mockResolvedValueOnce([])
-
-            const result = await getTableData(
-                'missing_table',
-                mockDataSource,
-                mockConfig
-            )
-
-            expect(result).toBeNull()
-        })
-
-        it('should throw an error when there is a database issue', async () => {
-            const consoleErrorMock = vi
-                .spyOn(console, 'error')
-                .mockImplementation(() => {})
-            vi.mocked(executeTransaction).mockRejectedValue(
-                new Error('Database Error')
-            )
-
-            await expect(
-                getTableData('users', mockDataSource, mockConfig)
-            ).rejects.toThrow('Database Error')
-        })
-    })
-
-    describe('createExportResponse', () => {
-        it('should create a valid response for a CSV file', () => {
-            const response = createExportResponse(
-                'id,name\n1,Alice\n2,Bob',
-                'users.csv',
-                'text/csv'
-            )
-
-            expect(response.headers.get('Content-Type')).toBe('text/csv')
-            expect(response.headers.get('Content-Disposition')).toBe(
-                'attachment; filename="users.csv"'
-            )
-        })
-
-        it('should create a valid response for a JSON file', () => {
-            const jsonData = JSON.stringify([
-                { id: 1, name: 'Alice' },
-                { id: 2, name: 'Bob' },
-            ])
-            const response = createExportResponse(
-                jsonData,
-                'users.json',
-                'application/json'
-            )
-
-            expect(response.headers.get('Content-Type')).toBe(
-                'application/json'
-            )
-            expect(response.headers.get('Content-Disposition')).toBe(
-                'attachment; filename="users.json"'
-            )
-        })
-
-        it('should create a valid response for a text file', () => {
-            const response = createExportResponse(
-                'Simple Text',
-                'notes.txt',
-                'text/plain'
-            )
-
-            expect(response.headers.get('Content-Type')).toBe('text/plain')
-            expect(response.headers.get('Content-Disposition')).toBe(
-                'attachment; filename="notes.txt"'
-            )
-        })
-    })
 })
diff --git a/src/export/index.ts b/src/export/index.ts
index 9c40119..9e9ea44 100644
--- a/src/export/index.ts
+++ b/src/export/index.ts
@@ -13,58 +13,7 @@ export async function executeOperation(
         dataSource,
         config,
     })) as any[]
-
-    
// return results?.length > 0 ? results[0] : undefined return results.length > 0 && Array.isArray(results[0]) ? results[0] : results } - -export async function getTableData( - tableName: string, - dataSource: DataSource, - config: StarbaseDBConfiguration -): Promise { - try { - // Verify if the table exists - const tableExistsResult = await executeOperation( - [ - { - sql: `SELECT name FROM sqlite_master WHERE type='table' AND name=?;`, - params: [tableName], - }, - ], - dataSource, - config - ) - - if (!tableExistsResult || tableExistsResult.length === 0) { - return null - } - - // Get table data - const dataResult = await executeOperation( - [{ sql: `SELECT * FROM ${tableName};` }], - dataSource, - config - ) - return dataResult - } catch (error: any) { - console.error('Table Data Fetch Error:', error) - throw error - } -} - -export function createExportResponse( - data: any, - fileName: string, - contentType: string -): Response { - const blob = new Blob([data], { type: contentType }) - - const headers = new Headers({ - 'Content-Type': contentType, - 'Content-Disposition': `attachment; filename="${fileName}"`, - }) - - return new Response(blob, { headers }) -} diff --git a/src/export/json.test.ts b/src/export/json.test.ts index 3fe4a8c..74a870b 100644 --- a/src/export/json.test.ts +++ b/src/export/json.test.ts @@ -1,13 +1,11 @@ import { describe, it, expect, vi, beforeEach } from 'vitest' import { exportTableToJsonRoute } from './json' -import { getTableData, createExportResponse } from './index' -import { createResponse } from '../utils' +import { executeOperation } from './index' import type { DataSource } from '../types' import type { StarbaseDBConfiguration } from '../handler' vi.mock('./index', () => ({ - getTableData: vi.fn(), - createExportResponse: vi.fn(), + executeOperation: vi.fn(), })) vi.mock('../utils', () => ({ @@ -24,7 +22,7 @@ let mockDataSource: DataSource let mockConfig: StarbaseDBConfiguration beforeEach(() => { - vi.clearAllMocks() + vi.mocked(executeOperation).mockReset() mockDataSource = { source: 'external', @@ -39,9 +37,16 @@ beforeEach(() => { } }) -describe('JSON Export Module', () => { - it('should return a 404 response if table does not exist', async () => { - vi.mocked(getTableData).mockResolvedValue(null) +function mockTable(rows: any[]) { + vi.mocked(executeOperation) + .mockResolvedValueOnce([{ name: 'tbl' }]) + .mockResolvedValueOnce(rows) + .mockResolvedValueOnce([]) +} + +describe('JSON Export Module (streaming)', () => { + it('returns 404 if the table does not exist', async () => { + vi.mocked(executeOperation).mockResolvedValueOnce([]) const response = await exportTableToJsonRoute( 'missing_table', @@ -50,22 +55,16 @@ describe('JSON Export Module', () => { ) expect(response.status).toBe(404) - const jsonResponse = (await response.json()) as { error: string } - expect(jsonResponse.error).toBe("Table 'missing_table' does not exist.") + const body = (await response.json()) as { error: string } + expect(body.error).toBe("Table 'missing_table' does not exist.") }) - it('should return a JSON file when table data exists', async () => { - const mockData = [ + it('streams a JSON array body', async () => { + const rows = [ { id: 1, name: 'Alice' }, { id: 2, name: 'Bob' }, ] - vi.mocked(getTableData).mockResolvedValue(mockData) - - vi.mocked(createExportResponse).mockReturnValue( - new Response('mocked-json-content', { - headers: { 'Content-Type': 'application/json' }, - }) - ) + mockTable(rows) const response = await exportTableToJsonRoute( 'users', @@ -73,27 
+72,22 @@ describe('JSON Export Module', () => { mockConfig ) - expect(getTableData).toHaveBeenCalledWith( - 'users', - mockDataSource, - mockConfig - ) - expect(createExportResponse).toHaveBeenCalledWith( - JSON.stringify(mockData, null, 4), - 'users_export.json', - 'application/json' - ) expect(response.headers.get('Content-Type')).toBe('application/json') + expect(response.headers.get('Content-Disposition')).toBe( + 'attachment; filename="users_export.json"' + ) + expect(response.body).toBeInstanceOf(ReadableStream) + + const text = await response.text() + // Must round-trip through JSON.parse — the streaming encoder + // hand-assembles the array, so this is the contract that matters. + expect(JSON.parse(text)).toEqual(rows) }) - it('should return an empty JSON array when table has no data', async () => { - vi.mocked(getTableData).mockResolvedValue([]) - - vi.mocked(createExportResponse).mockReturnValue( - new Response('mocked-json-content', { - headers: { 'Content-Type': 'application/json' }, - }) - ) + it('returns "[]" for an empty table', async () => { + vi.mocked(executeOperation) + .mockResolvedValueOnce([{ name: 'empty_table' }]) + .mockResolvedValueOnce([]) const response = await exportTableToJsonRoute( 'empty_table', @@ -101,26 +95,17 @@ describe('JSON Export Module', () => { mockConfig ) - expect(createExportResponse).toHaveBeenCalledWith( - '[]', - 'empty_table_export.json', - 'application/json' - ) - expect(response.headers.get('Content-Type')).toBe('application/json') + const text = await response.text() + expect(text).toBe('[]') + expect(JSON.parse(text)).toEqual([]) }) - it('should escape special characters in JSON properly', async () => { - const specialCharsData = [ + it('escapes special characters via JSON.stringify per row', async () => { + const rows = [ { id: 1, name: 'Sahithi "The Best"' }, { id: 2, description: 'New\nLine' }, ] - vi.mocked(getTableData).mockResolvedValue(specialCharsData) - - vi.mocked(createExportResponse).mockReturnValue( - new Response('mocked-json-content', { - headers: { 'Content-Type': 'application/json' }, - }) - ) + mockTable(rows) const response = await exportTableToJsonRoute( 'special_chars', @@ -128,19 +113,15 @@ describe('JSON Export Module', () => { mockConfig ) - expect(createExportResponse).toHaveBeenCalledWith( - JSON.stringify(specialCharsData, null, 4), - 'special_chars_export.json', - 'application/json' - ) - expect(response.headers.get('Content-Type')).toBe('application/json') + const text = await response.text() + expect(JSON.parse(text)).toEqual(rows) }) - it('should return a 500 response when an error occurs', async () => { - const consoleErrorMock = vi - .spyOn(console, 'error') - .mockImplementation(() => {}) - vi.mocked(getTableData).mockRejectedValue(new Error('Database Error')) + it('returns 500 when the existence check throws', async () => { + vi.spyOn(console, 'error').mockImplementation(() => {}) + vi.mocked(executeOperation).mockRejectedValueOnce( + new Error('Database Error') + ) const response = await exportTableToJsonRoute( 'users', @@ -149,7 +130,7 @@ describe('JSON Export Module', () => { ) expect(response.status).toBe(500) - const jsonResponse = (await response.json()) as { error: string } - expect(jsonResponse.error).toBe('Failed to export table to JSON') + const body = (await response.json()) as { error: string } + expect(body.error).toBe('Failed to export table to JSON') }) }) diff --git a/src/export/json.ts b/src/export/json.ts index c0ab811..680c48a 100644 --- a/src/export/json.ts +++ b/src/export/json.ts 
@@ -1,7 +1,44 @@
-import { getTableData, createExportResponse } from './index'
+import { executeOperation } from './index'
 import { createResponse } from '../utils'
 import { DataSource } from '../types'
 import { StarbaseDBConfiguration } from '../handler'
+import {
+    DEFAULT_PAGE_SIZE,
+    chunksToStream,
+    iterateTableRows,
+    streamingResponse,
+} from './streaming'
+
+/**
+ * Stream a JSON array. We hand-assemble the `[`, `,` separators, and `]`
+ * rather than calling `JSON.stringify` on the full result set; only one row's
+ * worth of objects is ever in memory. Output is still a valid JSON document
+ * — the structural commas only appear between rows.
+ */
+async function* jsonChunks(
+    tableName: string,
+    dataSource: DataSource,
+    config: StarbaseDBConfiguration,
+    pageSize: number = DEFAULT_PAGE_SIZE
+): AsyncGenerator<string> {
+    yield '['
+    let first = true
+    for await (const row of iterateTableRows(
+        tableName,
+        dataSource,
+        config,
+        pageSize
+    )) {
+        const encoded = JSON.stringify(row)
+        if (first) {
+            yield `\n    ${encoded}`
+            first = false
+        } else {
+            yield `,\n    ${encoded}`
+        }
+    }
+    yield first ? ']' : '\n]'
+}

 export async function exportTableToJsonRoute(
     tableName: string,
@@ -9,9 +46,17 @@
     config: StarbaseDBConfiguration
 ): Promise<Response> {
     try {
-        const data = await getTableData(tableName, dataSource, config)
-
-        if (data === null) {
+        const exists = await executeOperation(
+            [
+                {
+                    sql: `SELECT name FROM sqlite_master WHERE type='table' AND name=?;`,
+                    params: [tableName],
+                },
+            ],
+            dataSource,
+            config
+        )
+        if (!exists || exists.length === 0) {
             return createResponse(
                 undefined,
                 `Table '${tableName}' does not exist.`,
@@ -19,11 +64,9 @@
             )
         }

-        // Convert the result to JSON
-        const jsonData = JSON.stringify(data, null, 4)
-
-        return createExportResponse(
-            jsonData,
+        const stream = chunksToStream(jsonChunks(tableName, dataSource, config))
+        return streamingResponse(
+            stream,
             `${tableName}_export.json`,
             'application/json'
         )
diff --git a/src/export/streaming.test.ts b/src/export/streaming.test.ts
new file mode 100644
index 0000000..8e804da
--- /dev/null
+++ b/src/export/streaming.test.ts
@@ -0,0 +1,135 @@
+import { describe, it, expect, vi, beforeEach } from 'vitest'
+import { breathe, chunksToStream, iterateTableRows } from './streaming'
+import { executeOperation } from './index'
+import type { DataSource } from '../types'
+import type { StarbaseDBConfiguration } from '../handler'
+
+vi.mock('./index', () => ({
+    executeOperation: vi.fn(),
+}))
+
+let mockDataSource: DataSource
+let mockConfig: StarbaseDBConfiguration
+
+beforeEach(() => {
+    vi.clearAllMocks()
+    mockDataSource = {
+        source: 'external',
+        external: { dialect: 'sqlite' },
+        rpc: { executeQuery: vi.fn() },
+    } as any
+    mockConfig = {
+        outerbaseApiKey: 'k',
+        role: 'admin',
+        features: { allowlist: true, rls: true, rest: true },
+    }
+})
+
+describe('iterateTableRows', () => {
+    it('issues a single query when the first page is short', async () => {
+        vi.mocked(executeOperation).mockResolvedValueOnce([
+            { id: 1 },
+            { id: 2 },
+        ])
+
+        const out: any[] = []
+        for await (const row of iterateTableRows(
+            'users',
+            mockDataSource,
+            mockConfig,
+            10
+        )) {
+            out.push(row)
+        }
+
+        expect(out).toEqual([{ id: 1 }, { id: 2 }])
+        expect(executeOperation).toHaveBeenCalledTimes(1)
+        // Page-size and offset are passed as bound params, not interpolated.
+        
+ expect(vi.mocked(executeOperation).mock.calls[0][0][0].params).toEqual([ + 10, 0, + ]) + }) + + it('pages until a short page terminates the loop', async () => { + // Two full pages of size 2, then a partial page of 1 row. + vi.mocked(executeOperation) + .mockResolvedValueOnce([{ id: 1 }, { id: 2 }]) + .mockResolvedValueOnce([{ id: 3 }, { id: 4 }]) + .mockResolvedValueOnce([{ id: 5 }]) + + const out: any[] = [] + for await (const row of iterateTableRows( + 'users', + mockDataSource, + mockConfig, + 2 + )) { + out.push(row) + } + + expect(out.map((r) => r.id)).toEqual([1, 2, 3, 4, 5]) + expect(executeOperation).toHaveBeenCalledTimes(3) + // Offsets advance by the configured page size. + const offsets = vi + .mocked(executeOperation) + .mock.calls.map(([qs]) => qs[0].params?.[1]) + expect(offsets).toEqual([0, 2, 4]) + }) + + it('terminates immediately on an empty page', async () => { + vi.mocked(executeOperation).mockResolvedValueOnce([]) + + const out: any[] = [] + for await (const row of iterateTableRows( + 'empty', + mockDataSource, + mockConfig + )) { + out.push(row) + } + + expect(out).toEqual([]) + expect(executeOperation).toHaveBeenCalledTimes(1) + }) +}) + +describe('breathe', () => { + it('prefers scheduler.wait when available', async () => { + const wait = vi.fn().mockResolvedValue(undefined) + ;(globalThis as any).scheduler = { wait } + try { + await breathe() + expect(wait).toHaveBeenCalledWith(0) + } finally { + delete (globalThis as any).scheduler + } + }) + + it('falls back to setTimeout(0) without scheduler', async () => { + // Just confirm it resolves; the absence of scheduler is the path. + await expect(breathe()).resolves.toBeUndefined() + }) +}) + +describe('chunksToStream', () => { + it('encodes generator output as a UTF-8 byte stream', async () => { + async function* gen() { + yield 'hello, ' + yield 'world' + } + + const stream = chunksToStream(gen()) + const text = await new Response(stream).text() + expect(text).toBe('hello, world') + }) + + it('propagates generator errors to the stream consumer', async () => { + async function* gen() { + yield 'partial' + throw new Error('boom') + } + + const stream = chunksToStream(gen()) + await expect(new Response(stream).text()).rejects.toThrow('boom') + }) +}) diff --git a/src/export/streaming.ts b/src/export/streaming.ts new file mode 100644 index 0000000..2cefc08 --- /dev/null +++ b/src/export/streaming.ts @@ -0,0 +1,113 @@ +import { DataSource } from '../types' +import { StarbaseDBConfiguration } from '../handler' +import { executeOperation } from '.' + +/** + * Default page size when reading rows from a table for export. SQLite + DO + * round-trips are cheap, but the JS-side cost of holding rows in memory is the + * scaling bottleneck — 1000 rows per page keeps peak memory well under the + * 128 MB DO isolate limit even for very wide rows while still amortising query + * overhead. + */ +export const DEFAULT_PAGE_SIZE = 1000 + +/** + * Yield back to the runtime between pages. Cloudflare Workers expose + * `scheduler.wait(0)` as the canonical "let the event loop breathe" hook; + * `setTimeout(0)` is the universal fallback (vitest, node, browsers). + * + * Without this, exporting a 10 GB DB monopolises the isolate long enough that + * Cloudflare evicts the DO mid-stream — see issue #59 for context. 
+ */ +export async function breathe(): Promise { + const sched = (globalThis as any).scheduler + if (sched && typeof sched.wait === 'function') { + await sched.wait(0) + return + } + await new Promise((resolve) => setTimeout(resolve, 0)) +} + +/** + * Async iterator over the rows of a single table. Pages with LIMIT/OFFSET so + * we never materialise more than `pageSize` rows in memory at once, and + * `breathe()`s between pages so the DO runtime stays responsive. + * + * We assume the caller has already validated the table name (handler does this + * via `hasTableName`); the dump endpoint reads names directly from + * `sqlite_master` which is trusted. We still avoid concatenating the page + * markers as identifiers — LIMIT/OFFSET take bound integers. + */ +export async function* iterateTableRows( + tableName: string, + dataSource: DataSource, + config: StarbaseDBConfiguration, + pageSize: number = DEFAULT_PAGE_SIZE +): AsyncGenerator { + let offset = 0 + while (true) { + const page = await executeOperation( + [ + { + sql: `SELECT * FROM ${tableName} LIMIT ? OFFSET ?;`, + params: [pageSize, offset], + }, + ], + dataSource, + config + ) + + if (!page || page.length === 0) return + + for (const row of page) yield row + + if (page.length < pageSize) return + offset += page.length + await breathe() + } +} + +/** + * Wrap an async generator of string chunks as a `ReadableStream` + * suitable for handing directly to `new Response(stream, ...)`. Errors thrown + * inside the generator propagate to the stream consumer (the HTTP client sees + * a truncated body, which is the correct signal mid-export). + */ +export function chunksToStream( + chunks: AsyncGenerator +): ReadableStream { + const encoder = new TextEncoder() + return new ReadableStream({ + async pull(controller) { + try { + const { value, done } = await chunks.next() + if (done) { + controller.close() + return + } + if (value) controller.enqueue(encoder.encode(value)) + } catch (err) { + controller.error(err) + } + }, + async cancel(reason) { + // Allow the generator to clean up if the client disconnects. + await chunks.return?.(undefined as any) + }, + }) +} + +export function streamingResponse( + stream: ReadableStream, + fileName: string, + contentType: string +): Response { + const headers = new Headers({ + 'Content-Type': contentType, + 'Content-Disposition': `attachment; filename="${fileName}"`, + // Hint to clients/proxies that we're streaming — discourages buffering. + 'Cache-Control': 'no-store', + 'Transfer-Encoding': 'chunked', + }) + return new Response(stream, { headers }) +}