From 91d25cec7bc0a3f05a1c63ce6add9c4b7b5ec18e Mon Sep 17 00:00:00 2001 From: jbiskur Date: Wed, 13 May 2026 16:43:39 +0100 Subject: [PATCH] fix(data-pump): self-heal main fetch loop on error and validate restart input MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bugs A+B from data-pathways outage 2026-05-13: when the fetch loop throws (transient API failure, invalid timeBucket from a corrupted restart command, etc.), the pump used to die permanently — only pod restart could recover. The processor loop already self-heals (v0.19.1, fragment bcb6423f-c5e7-48d6-b56c-5723671b8197); this mirrors that pattern for the main fetch loop. Also: restart() now validates state.timeBucket against ^\d{14}$ at the call-site (synchronous throw) so a bad command surfaces the error immediately instead of silently killing the pump later in the loop. The restartTo consumption is also try/catch-wrapped as belt-and-braces. Refs: outage chain bddefdd2-c377-472a-bc50-cd75f708f822 Co-Authored-By: Claude Opus 4.7 (1M context) --- deno.json | 2 +- src/data-pump/data-pump.ts | 57 ++++- test/tests/data-pump-restart.test.ts | 329 ++++++++++++++++++++++++++- 3 files changed, 373 insertions(+), 15 deletions(-) diff --git a/deno.json b/deno.json index ff5acab..8ed4f14 100644 --- a/deno.json +++ b/deno.json @@ -2,7 +2,7 @@ "$schema": "https://deno.land/x/deno/cli/schemas/config-file.v1.json", "name": "@flowcore/data-pump", "description": "Flowcore Data Pump", - "version": "0.21.1", + "version": "0.21.2", "license": "MIT", "exports": "./src/mod.ts", "publish": { diff --git a/src/data-pump/data-pump.ts b/src/data-pump/data-pump.ts index 6ecdece..69f13ec 100644 --- a/src/data-pump/data-pump.ts +++ b/src/data-pump/data-pump.ts @@ -97,6 +97,7 @@ export class FlowcoreDataPump { private failedCount = 0 private pulledCount = 0 private processLoopRestartAttempts = 0 + private mainLoopRestartAttempts = 0 private constructor( public readonly dataSource: FlowcoreDataSource, @@ -239,12 +240,38 @@ export class FlowcoreDataPump { return this.loop() } - void this.loop() - .then(() => callback()) - .catch((error) => callback(error)) + this.startMainLoop(callback) + } + + private startMainLoop(callback?: (error?: Error) => void): void { + this.loop() + .then(() => { + this.mainLoopRestartAttempts = 0 + callback?.() + }) + .catch((error) => { + this.logger?.error("Error in fetch loop", { error }) + if (!this.running) { + callback?.(error) + return + } + this.mainLoopRestartAttempts++ + const delay = Math.min(1_000 * Math.pow(2, this.mainLoopRestartAttempts - 1), 30_000) + this.logger?.warn(`Restarting fetch loop in ${delay}ms (attempt ${this.mainLoopRestartAttempts})`) + setTimeout(() => { + if (!this.running) { + callback?.(error) + return + } + this.startMainLoop(callback) + }, delay) + }) } public restart(state: FlowcoreDataPumpState, stopAt?: Date | null): void { + if (typeof state.timeBucket !== "string" || !state.timeBucket.match(/^\d{14}$/)) { + throw new Error(`Invalid timebucket: ${state.timeBucket}`) + } this.restartTo = state if (stopAt !== undefined) { this.options.stopAt = stopAt ?? undefined @@ -255,6 +282,7 @@ export class FlowcoreDataPump { public stop(isRestart = false): void { this.running = false this.processLoopRestartAttempts = 0 + this.mainLoopRestartAttempts = 0 this.buffer = [] this.updateMetricsGauges() this.pulseEmitter?.stop() @@ -307,6 +335,7 @@ export class FlowcoreDataPump { this.logger?.debug(`fetched ${events.length} events`) this.pulledCount += events.length + this.mainLoopRestartAttempts = 0 this.buffer.push(...events.map((event) => ({ event, status: "open" as const, deliveryCount: 0 }))) this.nextCursor = nextCursor this.updateMetricsGauges() @@ -347,14 +376,20 @@ export class FlowcoreDataPump { } while (this.running) if (this.restartTo) { - await this.dataSource.getTimeBuckets(true) - this.restartTo.timeBucket = (await this.dataSource.getClosestTimeBucket(this.restartTo.timeBucket)) ?? - format(startOfHour(utc(new Date())), "yyyyMMddHH0000") - this.nextCursor = undefined - this.bufferState = this.restartTo - this.restartTo = undefined - this.running = true - return this.loop() + try { + await this.dataSource.getTimeBuckets(true) + this.restartTo.timeBucket = (await this.dataSource.getClosestTimeBucket(this.restartTo.timeBucket)) ?? + format(startOfHour(utc(new Date())), "yyyyMMddHH0000") + this.nextCursor = undefined + this.bufferState = this.restartTo + this.restartTo = undefined + this.running = true + return this.loop() + } catch (error) { + this.logger?.error("Failed to consume restartTo, dropping it", { error }) + this.restartTo = undefined + return + } } this.logger?.info("Data pump stopped") diff --git a/test/tests/data-pump-restart.test.ts b/test/tests/data-pump-restart.test.ts index f82b74b..77ca253 100644 --- a/test/tests/data-pump-restart.test.ts +++ b/test/tests/data-pump-restart.test.ts @@ -1,9 +1,10 @@ -import { assertEquals } from "@std/assert" +import { assert, assertEquals, assertThrows } from "@std/assert" import { afterEach, beforeEach, describe, it } from "@std/testing/bdd" import { FakeTime } from "@std/testing/time" -import type { FlowcoreEvent } from "@flowcore/sdk" +import type { EventListOutput, FlowcoreEvent } from "@flowcore/sdk" import { FlowcoreDataPump } from "../../src/data-pump/data-pump.ts" -import type { FlowcoreDataPumpStateManager } from "../../src/data-pump/types.ts" +import { FlowcoreDataSource } from "../../src/data-pump/data-source.ts" +import type { FlowcoreDataPumpState, FlowcoreDataPumpStateManager } from "../../src/data-pump/types.ts" // #region Test Helpers @@ -85,6 +86,328 @@ describe("processLoop restart", { sanitizeOps: false, sanitizeResources: false } }) }) +// #region Main-loop self-heal helpers + +interface FakeDataSourceOptions { + timeBuckets?: string[] + getEventsImpl?: () => Promise +} + +class FakeDataSource extends FlowcoreDataSource { + public getEventsCalls = 0 + private readonly timeBucketsValue: string[] + private getEventsImpl: () => Promise + + constructor(opts: FakeDataSourceOptions = {}) { + super({ + auth: { apiKey: FAKE_API_KEY }, + dataSource: { + tenant: "test", + dataCore: "test-dc", + flowType: "test.0", + eventTypes: ["test.created.0"], + }, + baseUrlOverride: "http://localhost:99999", + noTranslation: true, + }) + this.timeBucketsValue = opts.timeBuckets ?? ["20260331120000", "20260331130000"] + this.getEventsImpl = opts.getEventsImpl ?? (() => Promise.resolve({ events: [], nextCursor: undefined })) + } + + public override getTimeBuckets(_force = false): Promise { + return Promise.resolve(this.timeBucketsValue) + } + + public override getClosestTimeBucket(timeBucket: string, getBefore = false): Promise { + if (!timeBucket.match(/^\d{14}$/)) { + return Promise.reject(new Error(`Invalid timebucket: ${timeBucket}`)) + } + if (getBefore) { + return Promise.resolve( + this.timeBucketsValue.findLast((t) => Number.parseFloat(t) <= Number.parseFloat(timeBucket)) ?? + this.timeBucketsValue[this.timeBucketsValue.length - 1] ?? null, + ) + } + return Promise.resolve( + this.timeBucketsValue.find((t) => Number.parseFloat(t) >= Number.parseFloat(timeBucket)) ?? + this.timeBucketsValue[this.timeBucketsValue.length - 1] ?? null, + ) + } + + public override getNextTimeBucket(timeBucket: string): Promise { + const index = this.timeBucketsValue.indexOf(timeBucket) + if (index === -1 || index + 1 >= this.timeBucketsValue.length) { + return Promise.resolve(null) + } + return Promise.resolve(this.timeBucketsValue[index + 1]) + } + + public override getEvents(): Promise { + this.getEventsCalls++ + return this.getEventsImpl() + } + + public setGetEventsImpl(impl: () => Promise): void { + this.getEventsImpl = impl + } +} + +function createPumpWithFakeDataSource( + fakeDataSource: FakeDataSource, + logger: ReturnType["logger"], + stateManager?: FlowcoreDataPumpStateManager, +): FlowcoreDataPump { + return FlowcoreDataPump.create( + { + auth: { apiKey: FAKE_API_KEY }, + dataSource: { + tenant: "test", + dataCore: "test-dc", + flowType: "test.0", + eventTypes: ["test.created.0"], + }, + stateManager: stateManager ?? createMockStateManager(), + notifier: { type: "poller", intervalMs: 60_000 }, + logger, + baseUrlOverride: "http://localhost:99999", + noTranslation: true, + }, + fakeDataSource, + ) +} + +// #endregion + +describe("startMainLoop self-heal", { sanitizeOps: false, sanitizeResources: false }, () => { + let fakeTime: FakeTime + + beforeEach(() => { + fakeTime = new FakeTime() + }) + + afterEach(() => { + fakeTime.restore() + }) + + it("(a) loop throws once then self-restarts and resets attempts on next successful fetch", async () => { + const { logger, logs } = createMockLogger() + let throwsLeft = 1 + const fakeDataSource = new FakeDataSource({ + getEventsImpl: () => { + if (throwsLeft-- > 0) { + return Promise.reject(new Error("transient API failure")) + } + return Promise.resolve({ events: [], nextCursor: undefined }) + }, + }) + const pump = createPumpWithFakeDataSource(fakeDataSource, logger) + + let callbackError: Error | undefined + let callbackFired = false + await pump.start((err) => { + callbackFired = true + callbackError = err + }) + + // First call should have happened and thrown + await fakeTime.tickAsync(0) + // Backoff: attempt 1 → 1s + await fakeTime.tickAsync(1_000) + // Drain microtasks for the restarted loop to make a second call + await fakeTime.tickAsync(0) + + assert(fakeDataSource.getEventsCalls >= 2, `expected >=2 calls, got ${fakeDataSource.getEventsCalls}`) + assertEquals(pump.isRunning, true) + // mainLoopRestartAttempts reset to 0 after a successful pull + assertEquals((pump as unknown as { mainLoopRestartAttempts: number }).mainLoopRestartAttempts, 0) + // callback has NOT fired yet — pump is still running after self-heal + assertEquals(callbackFired, false) + + const restartLogs = logs.filter((l) => l.level === "warn" && l.message.includes("Restarting fetch loop")) + assertEquals(restartLogs.length, 1) + assert(restartLogs[0].message.includes("attempt 1")) + + pump.stop() + await fakeTime.tickAsync(60_000) + // After graceful stop, callback fires without error + assertEquals(callbackFired, true) + assertEquals(callbackError, undefined) + }) + + it("(b) seven successive failures exponential-backoff cap at 30s", async () => { + const { logger, logs } = createMockLogger() + const fakeDataSource = new FakeDataSource({ + getEventsImpl: () => Promise.reject(new Error("permanent failure")), + }) + const pump = createPumpWithFakeDataSource(fakeDataSource, logger) + + void pump.start(() => {}) + + // First call (attempt 1 scheduled after 1s) + await fakeTime.tickAsync(0) + assertEquals(fakeDataSource.getEventsCalls, 1) + + // 1s → 2nd call (attempt 1 scheduled, then attempt 2 after 2s) + await fakeTime.tickAsync(1_000) + await fakeTime.tickAsync(0) + assertEquals(fakeDataSource.getEventsCalls, 2) + + // 2s → 3rd call (4s next) + await fakeTime.tickAsync(2_000) + await fakeTime.tickAsync(0) + assertEquals(fakeDataSource.getEventsCalls, 3) + + // 4s → 4th call (8s next) + await fakeTime.tickAsync(4_000) + await fakeTime.tickAsync(0) + assertEquals(fakeDataSource.getEventsCalls, 4) + + // 8s → 5th call (16s next) + await fakeTime.tickAsync(8_000) + await fakeTime.tickAsync(0) + assertEquals(fakeDataSource.getEventsCalls, 5) + + // 16s → 6th call (30s capped next) + await fakeTime.tickAsync(16_000) + await fakeTime.tickAsync(0) + assertEquals(fakeDataSource.getEventsCalls, 6) + + // 30s → 7th call (30s capped next) + await fakeTime.tickAsync(30_000) + await fakeTime.tickAsync(0) + assertEquals(fakeDataSource.getEventsCalls, 7) + + // 30s → 8th call (cap held) + await fakeTime.tickAsync(30_000) + await fakeTime.tickAsync(0) + assertEquals(fakeDataSource.getEventsCalls, 8) + + // Verify exact backoff sequence was logged + const restartDelays = logs + .filter((l) => l.level === "warn" && l.message.includes("Restarting fetch loop in")) + .map((l) => { + const match = l.message.match(/Restarting fetch loop in (\d+)ms/) + return match ? Number.parseInt(match[1], 10) : -1 + }) + assertEquals(restartDelays.slice(0, 7), [1_000, 2_000, 4_000, 8_000, 16_000, 30_000, 30_000]) + + pump.stop() + await fakeTime.tickAsync(60_000) + }) + + it("(c) stop() during pending retry → no further retry fires", async () => { + const { logger, logs } = createMockLogger() + const fakeDataSource = new FakeDataSource({ + getEventsImpl: () => Promise.reject(new Error("transient failure")), + }) + const pump = createPumpWithFakeDataSource(fakeDataSource, logger) + + void pump.start(() => {}) + + // First call fires → throws → schedules 1s retry + await fakeTime.tickAsync(0) + assertEquals(fakeDataSource.getEventsCalls, 1) + + // Stop BEFORE the 1s retry fires + pump.stop() + + // Advance past the retry window + await fakeTime.tickAsync(60_000) + + // No second call should have happened + assertEquals(fakeDataSource.getEventsCalls, 1) + const restartLogs = logs.filter((l) => l.level === "warn" && l.message.includes("Restarting fetch loop")) + // The one warning got scheduled before stop() — but the actual restart never runs + assert(restartLogs.length <= 1) + }) + + it("(d) restart() with ISO timebucket throws synchronously", async () => { + const { logger } = createMockLogger() + const fakeDataSource = new FakeDataSource() + const pump = createPumpWithFakeDataSource(fakeDataSource, logger) + + void pump.start(() => {}) + await fakeTime.tickAsync(0) + + assertThrows( + () => pump.restart({ timeBucket: "2026-05-12T13:13" } as FlowcoreDataPumpState), + Error, + "Invalid timebucket: 2026-05-12T13:13", + ) + + // Pump still running — restart() bailed before mutating state + assertEquals(pump.isRunning, true) + + pump.stop() + await fakeTime.tickAsync(60_000) + }) + + it("(e) poisoned restartTo causes loop to log and exit cleanly", async () => { + const { logger, logs } = createMockLogger() + const fakeDataSource = new FakeDataSource({ + getEventsImpl: () => Promise.resolve({ events: [], nextCursor: undefined }), + }) + const pump = createPumpWithFakeDataSource(fakeDataSource, logger) + + let callbackFired = false + let callbackError: Error | undefined + void pump.start((err) => { + callbackFired = true + callbackError = err + }) + + // Bypass restart() validation by mutating private state directly + const internals = pump as unknown as { + restartTo: FlowcoreDataPumpState + running: boolean + abortController?: AbortController + } + internals.restartTo = { timeBucket: "bad" } + // Trigger loop exit so restartTo consumption block runs + internals.running = false + // Abort any in-flight waiter so loop iteration progresses + internals.abortController?.abort() + + // Let loop iterations + microtasks settle + await fakeTime.tickAsync(2_000) + + // Loop should have exited cleanly without throwing + assertEquals(callbackFired, true) + assertEquals(callbackError, undefined) + // restartTo dropped on error + assertEquals((pump as unknown as { restartTo?: FlowcoreDataPumpState }).restartTo, undefined) + // Loop logged the error + const errorLogs = logs.filter((l) => l.level === "error" && l.message.includes("Failed to consume restartTo")) + assertEquals(errorLogs.length, 1) + }) + + it("(f) happy-path restart still works (regression)", async () => { + const { logger } = createMockLogger() + const fakeDataSource = new FakeDataSource({ + timeBuckets: ["20260101000000", "20260102000000", "20260331120000"], + getEventsImpl: () => Promise.resolve({ events: [], nextCursor: undefined }), + }) + const pump = createPumpWithFakeDataSource(fakeDataSource, logger) + + void pump.start(() => {}) + await fakeTime.tickAsync(0) + assert(fakeDataSource.getEventsCalls >= 1) + + // Valid 14-digit timebucket — should not throw + pump.restart({ timeBucket: "20260101000000" }) + + // Loop processes the restart and continues pulling + await fakeTime.tickAsync(1_000) + await fakeTime.tickAsync(0) + + assert(pump.isRunning) + assert(fakeDataSource.getEventsCalls >= 2) + + pump.stop() + await fakeTime.tickAsync(60_000) + }) +}) + describe("backoff formula", () => { it("should produce correct exponential sequence capped at 30s", () => { const delays = []