Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"$schema": "https://deno.land/x/deno/cli/schemas/config-file.v1.json",
"name": "@flowcore/data-pump",
"description": "Flowcore Data Pump",
"version": "0.21.1",
"version": "0.21.2",
"license": "MIT",
"exports": "./src/mod.ts",
"publish": {
Expand Down
57 changes: 46 additions & 11 deletions src/data-pump/data-pump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export class FlowcoreDataPump {
private failedCount = 0
private pulledCount = 0
private processLoopRestartAttempts = 0
private mainLoopRestartAttempts = 0

private constructor(
public readonly dataSource: FlowcoreDataSource,
Expand Down Expand Up @@ -239,12 +240,38 @@ export class FlowcoreDataPump {
return this.loop()
}

void this.loop()
.then(() => callback())
.catch((error) => callback(error))
this.startMainLoop(callback)
}

private startMainLoop(callback?: (error?: Error) => void): void {
this.loop()
.then(() => {
this.mainLoopRestartAttempts = 0
callback?.()
})
.catch((error) => {
this.logger?.error("Error in fetch loop", { error })
if (!this.running) {
callback?.(error)
return
}
this.mainLoopRestartAttempts++
const delay = Math.min(1_000 * Math.pow(2, this.mainLoopRestartAttempts - 1), 30_000)
this.logger?.warn(`Restarting fetch loop in ${delay}ms (attempt ${this.mainLoopRestartAttempts})`)
setTimeout(() => {
if (!this.running) {
callback?.(error)
return
}
this.startMainLoop(callback)
}, delay)
})
}

public restart(state: FlowcoreDataPumpState, stopAt?: Date | null): void {
if (typeof state.timeBucket !== "string" || !state.timeBucket.match(/^\d{14}$/)) {
throw new Error(`Invalid timebucket: ${state.timeBucket}`)
}
this.restartTo = state
if (stopAt !== undefined) {
this.options.stopAt = stopAt ?? undefined
Expand All @@ -255,6 +282,7 @@ export class FlowcoreDataPump {
public stop(isRestart = false): void {
this.running = false
this.processLoopRestartAttempts = 0
this.mainLoopRestartAttempts = 0
this.buffer = []
this.updateMetricsGauges()
this.pulseEmitter?.stop()
Expand Down Expand Up @@ -307,6 +335,7 @@ export class FlowcoreDataPump {
this.logger?.debug(`fetched ${events.length} events`)

this.pulledCount += events.length
this.mainLoopRestartAttempts = 0
this.buffer.push(...events.map((event) => ({ event, status: "open" as const, deliveryCount: 0 })))
this.nextCursor = nextCursor
this.updateMetricsGauges()
Expand Down Expand Up @@ -347,14 +376,20 @@ export class FlowcoreDataPump {
} while (this.running)

if (this.restartTo) {
await this.dataSource.getTimeBuckets(true)
this.restartTo.timeBucket = (await this.dataSource.getClosestTimeBucket(this.restartTo.timeBucket)) ??
format(startOfHour(utc(new Date())), "yyyyMMddHH0000")
this.nextCursor = undefined
this.bufferState = this.restartTo
this.restartTo = undefined
this.running = true
return this.loop()
try {
await this.dataSource.getTimeBuckets(true)
this.restartTo.timeBucket = (await this.dataSource.getClosestTimeBucket(this.restartTo.timeBucket)) ??
format(startOfHour(utc(new Date())), "yyyyMMddHH0000")
this.nextCursor = undefined
this.bufferState = this.restartTo
this.restartTo = undefined
this.running = true
return this.loop()
} catch (error) {
this.logger?.error("Failed to consume restartTo, dropping it", { error })
this.restartTo = undefined
return
}
}

this.logger?.info("Data pump stopped")
Expand Down
Loading
Loading