diff --git a/packages/sync-engine/src/database/postgres.ts b/packages/sync-engine/src/database/postgres.ts index 9381d5a6..98c01524 100644 --- a/packages/sync-engine/src/database/postgres.ts +++ b/packages/sync-engine/src/database/postgres.ts @@ -770,18 +770,20 @@ export class PostgresClient { intervalSeconds: number ): Promise<{ accountId: string; runStartedAt: Date } | null> { const result = await this.query( - `SELECT r."_account_id", r.started_at - FROM "${this.syncSchema}"."_sync_runs" r - WHERE r."_account_id" = $1 + `SELECT r.account_id, r.started_at + FROM "${this.syncSchema}"."sync_runs" r + WHERE r.account_id = $1 AND r.closed_at IS NOT NULL AND r.closed_at >= now() - make_interval(secs => $2) + AND r.status = 'complete' + ORDER BY r.closed_at DESC LIMIT 1`, [accountId, intervalSeconds] ) if (result.rows.length === 0) return null const row = result.rows[0] - return { accountId: row._account_id, runStartedAt: row.started_at } + return { accountId: row.account_id, runStartedAt: row.started_at } } /** diff --git a/packages/sync-engine/src/tests/integration/postgres-sync-observability.test.ts b/packages/sync-engine/src/tests/integration/postgres-sync-observability.test.ts index 1338c8a1..742dae93 100644 --- a/packages/sync-engine/src/tests/integration/postgres-sync-observability.test.ts +++ b/packages/sync-engine/src/tests/integration/postgres-sync-observability.test.ts @@ -127,6 +127,57 @@ describe('Observable Sync System Methods', () => { }) }) + describe('getCompletedRun', () => { + it('should ignore closed runs that have errored object runs', async () => { + const run = await postgresClient.getOrCreateSyncRun(testAccountId) + await postgresClient.createObjectRuns(run!.accountId, run!.runStartedAt, ['customer']) + await postgresClient.tryStartObjectSync(run!.accountId, run!.runStartedAt, 'customer') + await postgresClient.failObjectSync( + run!.accountId, + run!.runStartedAt, + 'customer', + 'Rate limited' + ) + + const result = await postgresClient.getCompletedRun(testAccountId, 24 * 60 * 60) + + expect(result).toBeNull() + }) + + it('should return the most recent successful completed run', async () => { + const olderRun = await postgresClient.getOrCreateSyncRun(testAccountId) + await postgresClient.createObjectRuns(olderRun!.accountId, olderRun!.runStartedAt, ['customer']) + await postgresClient.tryStartObjectSync( + olderRun!.accountId, + olderRun!.runStartedAt, + 'customer' + ) + await postgresClient.completeObjectSync( + olderRun!.accountId, + olderRun!.runStartedAt, + 'customer' + ) + + const newerStartedAt = new Date(olderRun!.runStartedAt.getTime() + 1_000) + await db.pool.query( + `INSERT INTO stripe._sync_runs ("_account_id", started_at, closed_at, max_concurrent, triggered_by) + VALUES ($1, $2, $2 + interval '1 second', 3, 'test')`, + [testAccountId, newerStartedAt] + ) + await db.pool.query( + `INSERT INTO stripe._sync_obj_runs ("_account_id", run_started_at, object, status, processed_count, started_at, completed_at) + VALUES ($1, $2, 'invoice', 'complete', 1, $2, $2 + interval '1 second')`, + [testAccountId, newerStartedAt] + ) + + const result = await postgresClient.getCompletedRun(testAccountId, 24 * 60 * 60) + + expect(result).not.toBeNull() + expect(result!.accountId).toBe(testAccountId) + expect(result!.runStartedAt.getTime()).toBe(newerStartedAt.getTime()) + }) + }) + describe('createObjectRuns', () => { it('should create object run entries for each object', async () => { const run = await postgresClient.getOrCreateSyncRun(testAccountId)