From dc244f29daf9424cab34a81518b33368f034cc12 Mon Sep 17 00:00:00 2001 From: ayshadogo Date: Tue, 16 Jun 2026 14:54:41 +0100 Subject: [PATCH 1/3] Implement end-to-end integration tests for stream processing pipeline --- .../integration/pipeline.integration.test.ts | 61 +++++ .../integration/worker.integration.test.ts | 208 ++++++++++++++++++ xstreamroll-processing/package.json | 12 +- 3 files changed, 278 insertions(+), 3 deletions(-) create mode 100644 xstreamroll-processing/__tests__/integration/pipeline.integration.test.ts create mode 100644 xstreamroll-processing/__tests__/integration/worker.integration.test.ts diff --git a/xstreamroll-processing/__tests__/integration/pipeline.integration.test.ts b/xstreamroll-processing/__tests__/integration/pipeline.integration.test.ts new file mode 100644 index 0000000..445e2e0 --- /dev/null +++ b/xstreamroll-processing/__tests__/integration/pipeline.integration.test.ts @@ -0,0 +1,61 @@ +import nock from "nock" + +jest.setTimeout(20000) + +afterEach(() => { + nock.cleanAll() + jest.resetModules() +}) + +test("filtered events are not published (integration)", async () => { + // We'll mock the pipeline's EventFilter so the worker drops events + // with type === 'blocked'. Do the mock before importing the worker. + jest.doMock("../../src/pipeline", () => { + return { + EventFilter: class { + allow(event: any) { + return event.data?.type !== "blocked" + } + }, + } + }) + + process.env.API_URL = "http://mock-api" + process.env.POLL_INTERVAL_MS = "50" + + const now = new Date().toISOString() + const good = { streamId: "s", data: { type: "ok" }, timestamp: now } + const bad = { streamId: "s2", data: { type: "blocked" }, timestamp: now } + + let sent = false + nock("http://mock-api") + .get("/streams/pending") + .times(100) + .reply(() => { + if (!sent) { + sent = true + return [200, [good, bad]] + } + return [200, []] + }) + + const published: any[] = [] + const publishedPromise = new Promise((resolve) => { + nock("http://mock-api") + .post("/streams/processed") + .reply(200, function (_u, body) { + published.push(body) + resolve() + return "ok" + }) + }) + + const worker = require("../../src/worker") + await publishedPromise + await worker.shutdown("test") + + // Only the unblocked event should have been published + expect(published.length).toBeGreaterThanOrEqual(1) + expect(published.some((p) => p.streamId === good.streamId)).toBe(true) + expect(published.some((p) => p.streamId === bad.streamId)).toBe(false) +}) diff --git a/xstreamroll-processing/__tests__/integration/worker.integration.test.ts b/xstreamroll-processing/__tests__/integration/worker.integration.test.ts new file mode 100644 index 0000000..f7425ea --- /dev/null +++ b/xstreamroll-processing/__tests__/integration/worker.integration.test.ts @@ -0,0 +1,208 @@ +import nock from "nock" + +jest.setTimeout(20000) + +function awaitWithTimeout(p: Promise, ms: number, msg = "timeout") { + return Promise.race([ + p, + new Promise((_, rej) => setTimeout(() => rej(new Error(msg)), ms)), + ]) +} + +afterEach(() => { + nock.cleanAll() + jest.resetModules() +}) + +test("single event: polled -> session -> published", async () => { + // Arrange env and nock before importing the worker (worker starts on import) + process.env.API_URL = "http://mock-api" + process.env.POLL_INTERVAL_MS = "50" + + const now = new Date().toISOString() + const event = { streamId: "s1", data: { type: "t1", v: 1 }, timestamp: now } + + let sent = false + nock("http://mock-api") + .get("/streams/pending") + .times(100) + .reply(() => { + if (!sent) { + sent = true + return [200, [event]] + } + return [200, []] + }) + + let publishedBody: any = null + const publishedPromise = new Promise((resolve) => { + nock("http://mock-api") + .post("/streams/processed") + .reply(200, function (_uri, body) { + publishedBody = body + resolve() + return "ok" + }) + }) + + // Act: import worker (starts polling) + const worker = require("../../src/worker") + // wait for publish + await awaitWithTimeout(publishedPromise, 5000, "publish timeout") + + // shutdown and assert + await worker.shutdown("test") + expect(publishedBody).not.toBeNull() + expect(publishedBody.streamId).toBe("s1") +}) + +test("multiple events same stream -> routed to same session", async () => { + process.env.API_URL = "http://mock-api" + process.env.POLL_INTERVAL_MS = "50" + + const now = new Date().toISOString() + const e1 = { streamId: "same", data: { type: "t1", i: 1 }, timestamp: now } + const e2 = { streamId: "same", data: { type: "t1", i: 2 }, timestamp: now } + + let sentOnce = false + nock("http://mock-api") + .get("/streams/pending") + .times(100) + .reply(() => { + if (!sentOnce) { + sentOnce = true + return [200, [e1, e2]] + } + return [200, []] + }) + + const received: any[] = [] + const publishedPromise = new Promise((resolve) => { + nock("http://mock-api") + .post("/streams/processed") + .times(2) + .reply(200, function (_u, body) { + received.push(body) + if (received.length === 2) resolve() + return "ok" + }) + }) + + const worker = require("../../src/worker") + await awaitWithTimeout(publishedPromise, 5000, "publish timeout") + await worker.shutdown("test") + + expect(received.length).toBe(2) + expect(received[0].sessionId).toBe(received[1].sessionId) + expect(received[0].streamId).toBe("same") +}) + +test("capacity exceeded -> event dropped, not published", async () => { + process.env.API_URL = "http://mock-api" + process.env.POLL_INTERVAL_MS = "50" + process.env.MAX_CONCURRENT_SESSIONS = "1" + + const now = new Date().toISOString() + const a = { streamId: "a", data: { type: "t" }, timestamp: now } + const b = { streamId: "b", data: { type: "t" }, timestamp: now } + + let once = false + nock("http://mock-api") + .get("/streams/pending") + .times(100) + .reply(() => { + if (!once) { + once = true + return [200, [a, b]] + } + return [200, []] + }) + + const published: any[] = [] + const publishedPromise = new Promise((resolve) => { + nock("http://mock-api") + .post("/streams/processed") + .reply(200, function (_u, body) { + published.push(body) + // resolve after at most one publish (capacity should drop the other) + if (published.length >= 1) resolve() + return "ok" + }) + }) + + const worker = require("../../src/worker") + await awaitWithTimeout(publishedPromise, 5000, "publish timeout") + await worker.shutdown("test") + + expect(published.length).toBe(1) +}) + +test("graceful shutdown flushes pending publishes", async () => { + process.env.API_URL = "http://mock-api" + process.env.POLL_INTERVAL_MS = "50" + + const now = new Date().toISOString() + const event = { streamId: "slow", data: { type: "t" }, timestamp: now } + + let sent = false + nock("http://mock-api") + .get("/streams/pending") + .times(100) + .reply(() => { + if (!sent) { + sent = true + return [200, [event]] + } + return [200, []] + }) + + let published = false + nock("http://mock-api") + .post("/streams/processed") + .reply(200, async function () { + // simulate slow publish + await new Promise((r) => setTimeout(r, 200)) + published = true + return "ok" + }) + + const worker = require("../../src/worker") + + // give worker a moment to pick up the event + await new Promise((r) => setTimeout(r, 100)) + // request shutdown and wait for it to complete (should wait for publish) + await worker.shutdown("test") + expect(published).toBe(true) +}) + +test("api error then recovery -> worker retries next poll", async () => { + process.env.API_URL = "http://mock-api" + process.env.POLL_INTERVAL_MS = "50" + + const now = new Date().toISOString() + const event = { streamId: "r1", data: { type: "t" }, timestamp: now } + + let calls = 0 + nock("http://mock-api") + .get("/streams/pending") + .times(100) + .reply(() => { + calls++ + if (calls === 1) return [500, "fail"] + if (calls === 2) return [200, []] + return [200, [event]] + }) + + const publishedPromise = new Promise((resolve) => { + nock("http://mock-api") + .post("/streams/processed") + .reply(200, function () { + resolve() + return "ok" + }) + }) + + const worker = require("../../src/worker") + await awaitWithTimeout(publishedPromise, 5000, "publish timeout") + await worker.shutdown("test") +}) diff --git a/xstreamroll-processing/package.json b/xstreamroll-processing/package.json index b5df5ec..9628dc5 100644 --- a/xstreamroll-processing/package.json +++ b/xstreamroll-processing/package.json @@ -14,10 +14,15 @@ "jest": { "preset": "ts-jest", "testEnvironment": "node", - "testMatch": ["**/__tests__/**/*.test.ts"], + "testMatch": [ + "**/__tests__/**/*.test.ts" + ], "forceExit": true, "coverageDirectory": "coverage", - "coverageReporters": ["lcov", "text"] + "coverageReporters": [ + "lcov", + "text" + ] }, "dependencies": { "axios": "^1.6.0", @@ -29,6 +34,7 @@ "jest": "^29.7.0", "ts-jest": "^29.2.5", "ts-node": "^10.9.0", - "typescript": "^5.3.0" + "typescript": "^5.3.0", + "nock": "^13.3.0" } } From 5334dd985e0877479d98eb2cf039790c865b0b6b Mon Sep 17 00:00:00 2001 From: ayshadogo Date: Tue, 16 Jun 2026 15:04:46 +0100 Subject: [PATCH 2/3] Implement end-to-end integration tests for stream processing pipeline --- .../integration/pipeline.integration.test.ts | 9 ++++--- .../integration/worker.integration.test.ts | 27 ++++++++++--------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/xstreamroll-processing/__tests__/integration/pipeline.integration.test.ts b/xstreamroll-processing/__tests__/integration/pipeline.integration.test.ts index 445e2e0..5427860 100644 --- a/xstreamroll-processing/__tests__/integration/pipeline.integration.test.ts +++ b/xstreamroll-processing/__tests__/integration/pipeline.integration.test.ts @@ -1,4 +1,5 @@ import nock from "nock" +import type { StreamEvent, ProcessedStreamEvent } from "../../src/session" jest.setTimeout(20000) @@ -13,7 +14,7 @@ test("filtered events are not published (integration)", async () => { jest.doMock("../../src/pipeline", () => { return { EventFilter: class { - allow(event: any) { + allow(event: StreamEvent) { return event.data?.type !== "blocked" } }, @@ -39,7 +40,7 @@ test("filtered events are not published (integration)", async () => { return [200, []] }) - const published: any[] = [] + const published: ProcessedStreamEvent[] = [] const publishedPromise = new Promise((resolve) => { nock("http://mock-api") .post("/streams/processed") @@ -50,9 +51,9 @@ test("filtered events are not published (integration)", async () => { }) }) - const worker = require("../../src/worker") + const workerMod = await import("../../src/worker") await publishedPromise - await worker.shutdown("test") + await workerMod.shutdown("test") // Only the unblocked event should have been published expect(published.length).toBeGreaterThanOrEqual(1) diff --git a/xstreamroll-processing/__tests__/integration/worker.integration.test.ts b/xstreamroll-processing/__tests__/integration/worker.integration.test.ts index f7425ea..d329a3c 100644 --- a/xstreamroll-processing/__tests__/integration/worker.integration.test.ts +++ b/xstreamroll-processing/__tests__/integration/worker.integration.test.ts @@ -1,4 +1,5 @@ import nock from "nock" +import type { ProcessedStreamEvent } from "../../src/session" jest.setTimeout(20000) @@ -34,7 +35,7 @@ test("single event: polled -> session -> published", async () => { return [200, []] }) - let publishedBody: any = null + let publishedBody: ProcessedStreamEvent | null = null const publishedPromise = new Promise((resolve) => { nock("http://mock-api") .post("/streams/processed") @@ -46,12 +47,12 @@ test("single event: polled -> session -> published", async () => { }) // Act: import worker (starts polling) - const worker = require("../../src/worker") + const workerMod = await import("../../src/worker") // wait for publish await awaitWithTimeout(publishedPromise, 5000, "publish timeout") // shutdown and assert - await worker.shutdown("test") + await workerMod.shutdown("test") expect(publishedBody).not.toBeNull() expect(publishedBody.streamId).toBe("s1") }) @@ -76,7 +77,7 @@ test("multiple events same stream -> routed to same session", async () => { return [200, []] }) - const received: any[] = [] + const received: ProcessedStreamEvent[] = [] const publishedPromise = new Promise((resolve) => { nock("http://mock-api") .post("/streams/processed") @@ -88,9 +89,9 @@ test("multiple events same stream -> routed to same session", async () => { }) }) - const worker = require("../../src/worker") + const workerMod = await import("../../src/worker") await awaitWithTimeout(publishedPromise, 5000, "publish timeout") - await worker.shutdown("test") + await workerMod.shutdown("test") expect(received.length).toBe(2) expect(received[0].sessionId).toBe(received[1].sessionId) @@ -118,7 +119,7 @@ test("capacity exceeded -> event dropped, not published", async () => { return [200, []] }) - const published: any[] = [] + const published: ProcessedStreamEvent[] = [] const publishedPromise = new Promise((resolve) => { nock("http://mock-api") .post("/streams/processed") @@ -130,9 +131,9 @@ test("capacity exceeded -> event dropped, not published", async () => { }) }) - const worker = require("../../src/worker") + const workerMod = await import("../../src/worker") await awaitWithTimeout(publishedPromise, 5000, "publish timeout") - await worker.shutdown("test") + await workerMod.shutdown("test") expect(published.length).toBe(1) }) @@ -166,12 +167,12 @@ test("graceful shutdown flushes pending publishes", async () => { return "ok" }) - const worker = require("../../src/worker") + const workerMod = await import("../../src/worker") // give worker a moment to pick up the event await new Promise((r) => setTimeout(r, 100)) // request shutdown and wait for it to complete (should wait for publish) - await worker.shutdown("test") + await workerMod.shutdown("test") expect(published).toBe(true) }) @@ -202,7 +203,7 @@ test("api error then recovery -> worker retries next poll", async () => { }) }) - const worker = require("../../src/worker") + const workerMod = await import("../../src/worker") await awaitWithTimeout(publishedPromise, 5000, "publish timeout") - await worker.shutdown("test") + await workerMod.shutdown("test") }) From dd0eafa7e7a598315dfc5be9e78dbfcbbc53d421 Mon Sep 17 00:00:00 2001 From: ayshadogo Date: Tue, 16 Jun 2026 15:26:55 +0100 Subject: [PATCH 3/3] fixed CI workflow failure --- .../integration/pipeline.integration.test.ts | 27 ++++++++--- .../integration/worker.integration.test.ts | 45 ++++++++++++------- xstreamroll-processing/src/worker.ts | 23 +++++++--- 3 files changed, 70 insertions(+), 25 deletions(-) diff --git a/xstreamroll-processing/__tests__/integration/pipeline.integration.test.ts b/xstreamroll-processing/__tests__/integration/pipeline.integration.test.ts index 5427860..ed29a06 100644 --- a/xstreamroll-processing/__tests__/integration/pipeline.integration.test.ts +++ b/xstreamroll-processing/__tests__/integration/pipeline.integration.test.ts @@ -3,9 +3,26 @@ import type { StreamEvent, ProcessedStreamEvent } from "../../src/session" jest.setTimeout(20000) -afterEach(() => { +let workerMod: { shutdown(signal: string): Promise } | null = null +let exitSpy: jest.SpyInstance | null = null + +beforeEach(() => { + exitSpy = jest + .spyOn(process, "exit") + .mockImplementation((() => undefined) as never) +}) + +afterEach(async () => { + if (workerMod) { + await workerMod.shutdown("test") + workerMod = null + } + exitSpy?.mockRestore() + exitSpy = null nock.cleanAll() + jest.clearAllTimers() jest.resetModules() + await new Promise((r) => setTimeout(r, 100)) }) test("filtered events are not published (integration)", async () => { @@ -21,6 +38,7 @@ test("filtered events are not published (integration)", async () => { } }) + process.env.NODE_ENV = "test" process.env.API_URL = "http://mock-api" process.env.POLL_INTERVAL_MS = "50" @@ -44,16 +62,15 @@ test("filtered events are not published (integration)", async () => { const publishedPromise = new Promise((resolve) => { nock("http://mock-api") .post("/streams/processed") - .reply(200, function (_u, body) { - published.push(body) + .reply(200, function (_u, body: unknown) { + published.push(body as ProcessedStreamEvent) resolve() return "ok" }) }) - const workerMod = await import("../../src/worker") + workerMod = await import("../../src/worker") await publishedPromise - await workerMod.shutdown("test") // Only the unblocked event should have been published expect(published.length).toBeGreaterThanOrEqual(1) diff --git a/xstreamroll-processing/__tests__/integration/worker.integration.test.ts b/xstreamroll-processing/__tests__/integration/worker.integration.test.ts index d329a3c..2dd11eb 100644 --- a/xstreamroll-processing/__tests__/integration/worker.integration.test.ts +++ b/xstreamroll-processing/__tests__/integration/worker.integration.test.ts @@ -10,13 +10,31 @@ function awaitWithTimeout(p: Promise, ms: number, msg = "timeout") { ]) } -afterEach(() => { +let workerMod: { shutdown(signal: string): Promise } | null = null +let exitSpy: jest.SpyInstance | null = null + +beforeEach(() => { + exitSpy = jest + .spyOn(process, "exit") + .mockImplementation((() => undefined) as never) +}) + +afterEach(async () => { + if (workerMod) { + await workerMod.shutdown("test") + workerMod = null + } + exitSpy?.mockRestore() + exitSpy = null nock.cleanAll() + jest.clearAllTimers() jest.resetModules() + await new Promise((r) => setTimeout(r, 100)) }) test("single event: polled -> session -> published", async () => { // Arrange env and nock before importing the worker (worker starts on import) + process.env.NODE_ENV = "test" process.env.API_URL = "http://mock-api" process.env.POLL_INTERVAL_MS = "50" @@ -39,22 +57,21 @@ test("single event: polled -> session -> published", async () => { const publishedPromise = new Promise((resolve) => { nock("http://mock-api") .post("/streams/processed") - .reply(200, function (_uri, body) { - publishedBody = body + .reply(200, function (_uri, body: unknown) { + publishedBody = body as ProcessedStreamEvent resolve() return "ok" }) }) // Act: import worker (starts polling) - const workerMod = await import("../../src/worker") + workerMod = await import("../../src/worker") // wait for publish await awaitWithTimeout(publishedPromise, 5000, "publish timeout") - // shutdown and assert - await workerMod.shutdown("test") expect(publishedBody).not.toBeNull() - expect(publishedBody.streamId).toBe("s1") + const actual = publishedBody as unknown as ProcessedStreamEvent + expect(actual.streamId).toBe("s1") }) test("multiple events same stream -> routed to same session", async () => { @@ -82,16 +99,15 @@ test("multiple events same stream -> routed to same session", async () => { nock("http://mock-api") .post("/streams/processed") .times(2) - .reply(200, function (_u, body) { - received.push(body) + .reply(200, function (_u, body: unknown) { + received.push(body as ProcessedStreamEvent) if (received.length === 2) resolve() return "ok" }) }) - const workerMod = await import("../../src/worker") + workerMod = await import("../../src/worker") await awaitWithTimeout(publishedPromise, 5000, "publish timeout") - await workerMod.shutdown("test") expect(received.length).toBe(2) expect(received[0].sessionId).toBe(received[1].sessionId) @@ -123,17 +139,16 @@ test("capacity exceeded -> event dropped, not published", async () => { const publishedPromise = new Promise((resolve) => { nock("http://mock-api") .post("/streams/processed") - .reply(200, function (_u, body) { - published.push(body) + .reply(200, function (_u, body: unknown) { + published.push(body as ProcessedStreamEvent) // resolve after at most one publish (capacity should drop the other) if (published.length >= 1) resolve() return "ok" }) }) - const workerMod = await import("../../src/worker") + workerMod = await import("../../src/worker") await awaitWithTimeout(publishedPromise, 5000, "publish timeout") - await workerMod.shutdown("test") expect(published.length).toBe(1) }) diff --git a/xstreamroll-processing/src/worker.ts b/xstreamroll-processing/src/worker.ts index b094a6a..268f4b3 100644 --- a/xstreamroll-processing/src/worker.ts +++ b/xstreamroll-processing/src/worker.ts @@ -38,7 +38,9 @@ let shuttingDown = false async function pollOnce(): Promise { let events: StreamEvent[] = [] try { - const response = await axiosInstance.get(`${API_URL}/streams/pending`) + const response = await axiosInstance.get( + `${API_URL}/streams/pending`, + ) events = Array.isArray(response.data) ? response.data : [] } catch (err) { const message = err instanceof Error ? err.message : String(err) @@ -47,7 +49,11 @@ async function pollOnce(): Promise { } for (const event of events) { - if (!event || typeof event.streamId !== "string" || event.streamId.length === 0) { + if ( + !event || + typeof event.streamId !== "string" || + event.streamId.length === 0 + ) { console.warn(`[${WORKER_ID}] dropping malformed event`, event) continue } @@ -86,7 +92,9 @@ async function start(): Promise { void loop() } -const gracefulShutdown = new GracefulShutdown({ timeoutMs: 15_000 }) +const gracefulShutdown = new GracefulShutdown({ + timeoutMs: 15_000, +}) gracefulShutdown.register({ name: "stop poll loop", @@ -111,14 +119,19 @@ gracefulShutdown.register({ }, }) -gracefulShutdown.install() +if (env.NODE_ENV !== "test") { + gracefulShutdown.install() +} /** Exported for testing: triggers the graceful-shutdown sequence. */ export const shutdown = (signal: string): Promise => gracefulShutdown.requestShutdown(signal as ShutdownReason) function sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)) + return new Promise((resolve) => { + const timer = setTimeout(() => resolve(), ms) + if (typeof timer.unref === "function") timer.unref() + }) } void start()