Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import nock from "nock"
import type { StreamEvent, ProcessedStreamEvent } from "../../src/session"

jest.setTimeout(20000)

let workerMod: { shutdown(signal: string): Promise<void> } | null = null
let exitSpy: jest.SpyInstance | null = null

beforeEach(() => {
exitSpy = jest
.spyOn(process, "exit")
.mockImplementation((() => undefined) as never)
})

afterEach(async () => {
if (workerMod) {
await workerMod.shutdown("test")
workerMod = null
}
exitSpy?.mockRestore()
exitSpy = null
nock.cleanAll()
jest.clearAllTimers()
jest.resetModules()
await new Promise((r) => setTimeout(r, 100))
})

test("filtered events are not published (integration)", async () => {
// We'll mock the pipeline's EventFilter so the worker drops events
// with type === 'blocked'. Do the mock before importing the worker.
jest.doMock("../../src/pipeline", () => {
return {
EventFilter: class {
allow(event: StreamEvent) {
return event.data?.type !== "blocked"
}
},
}
})

process.env.NODE_ENV = "test"
process.env.API_URL = "http://mock-api"
process.env.POLL_INTERVAL_MS = "50"

const now = new Date().toISOString()
const good = { streamId: "s", data: { type: "ok" }, timestamp: now }
const bad = { streamId: "s2", data: { type: "blocked" }, timestamp: now }

let sent = false
nock("http://mock-api")
.get("/streams/pending")
.times(100)
.reply(() => {
if (!sent) {
sent = true
return [200, [good, bad]]
}
return [200, []]
})

const published: ProcessedStreamEvent[] = []
const publishedPromise = new Promise<void>((resolve) => {
nock("http://mock-api")
.post("/streams/processed")
.reply(200, function (_u, body: unknown) {
published.push(body as ProcessedStreamEvent)
resolve()
return "ok"
})
})

workerMod = await import("../../src/worker")
await publishedPromise

// Only the unblocked event should have been published
expect(published.length).toBeGreaterThanOrEqual(1)
expect(published.some((p) => p.streamId === good.streamId)).toBe(true)
expect(published.some((p) => p.streamId === bad.streamId)).toBe(false)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
import nock from "nock"
import type { ProcessedStreamEvent } from "../../src/session"

jest.setTimeout(20000)

function awaitWithTimeout<T>(p: Promise<T>, ms: number, msg = "timeout") {
return Promise.race([
p,
new Promise<T>((_, rej) => setTimeout(() => rej(new Error(msg)), ms)),
])
}

let workerMod: { shutdown(signal: string): Promise<void> } | null = null
let exitSpy: jest.SpyInstance | null = null

beforeEach(() => {
exitSpy = jest
.spyOn(process, "exit")
.mockImplementation((() => undefined) as never)
})

afterEach(async () => {
if (workerMod) {
await workerMod.shutdown("test")
workerMod = null
}
exitSpy?.mockRestore()
exitSpy = null
nock.cleanAll()
jest.clearAllTimers()
jest.resetModules()
await new Promise((r) => setTimeout(r, 100))
})

test("single event: polled -> session -> published", async () => {
// Arrange env and nock before importing the worker (worker starts on import)
process.env.NODE_ENV = "test"
process.env.API_URL = "http://mock-api"
process.env.POLL_INTERVAL_MS = "50"

const now = new Date().toISOString()
const event = { streamId: "s1", data: { type: "t1", v: 1 }, timestamp: now }

let sent = false
nock("http://mock-api")
.get("/streams/pending")
.times(100)
.reply(() => {
if (!sent) {
sent = true
return [200, [event]]
}
return [200, []]
})

let publishedBody: ProcessedStreamEvent | null = null
const publishedPromise = new Promise<void>((resolve) => {
nock("http://mock-api")
.post("/streams/processed")
.reply(200, function (_uri, body: unknown) {
publishedBody = body as ProcessedStreamEvent
resolve()
return "ok"
})
})

// Act: import worker (starts polling)
workerMod = await import("../../src/worker")
// wait for publish
await awaitWithTimeout(publishedPromise, 5000, "publish timeout")

expect(publishedBody).not.toBeNull()
const actual = publishedBody as unknown as ProcessedStreamEvent
expect(actual.streamId).toBe("s1")
})

test("multiple events same stream -> routed to same session", async () => {
process.env.API_URL = "http://mock-api"
process.env.POLL_INTERVAL_MS = "50"

const now = new Date().toISOString()
const e1 = { streamId: "same", data: { type: "t1", i: 1 }, timestamp: now }
const e2 = { streamId: "same", data: { type: "t1", i: 2 }, timestamp: now }

let sentOnce = false
nock("http://mock-api")
.get("/streams/pending")
.times(100)
.reply(() => {
if (!sentOnce) {
sentOnce = true
return [200, [e1, e2]]
}
return [200, []]
})

const received: ProcessedStreamEvent[] = []
const publishedPromise = new Promise<void>((resolve) => {
nock("http://mock-api")
.post("/streams/processed")
.times(2)
.reply(200, function (_u, body: unknown) {
received.push(body as ProcessedStreamEvent)
if (received.length === 2) resolve()
return "ok"
})
})

workerMod = await import("../../src/worker")
await awaitWithTimeout(publishedPromise, 5000, "publish timeout")

expect(received.length).toBe(2)
expect(received[0].sessionId).toBe(received[1].sessionId)
expect(received[0].streamId).toBe("same")
})

test("capacity exceeded -> event dropped, not published", async () => {
process.env.API_URL = "http://mock-api"
process.env.POLL_INTERVAL_MS = "50"
process.env.MAX_CONCURRENT_SESSIONS = "1"

const now = new Date().toISOString()
const a = { streamId: "a", data: { type: "t" }, timestamp: now }
const b = { streamId: "b", data: { type: "t" }, timestamp: now }

let once = false
nock("http://mock-api")
.get("/streams/pending")
.times(100)
.reply(() => {
if (!once) {
once = true
return [200, [a, b]]
}
return [200, []]
})

const published: ProcessedStreamEvent[] = []
const publishedPromise = new Promise<void>((resolve) => {
nock("http://mock-api")
.post("/streams/processed")
.reply(200, function (_u, body: unknown) {
published.push(body as ProcessedStreamEvent)
// resolve after at most one publish (capacity should drop the other)
if (published.length >= 1) resolve()
return "ok"
})
})

workerMod = await import("../../src/worker")
await awaitWithTimeout(publishedPromise, 5000, "publish timeout")

expect(published.length).toBe(1)
})

test("graceful shutdown flushes pending publishes", async () => {
process.env.API_URL = "http://mock-api"
process.env.POLL_INTERVAL_MS = "50"

const now = new Date().toISOString()
const event = { streamId: "slow", data: { type: "t" }, timestamp: now }

let sent = false
nock("http://mock-api")
.get("/streams/pending")
.times(100)
.reply(() => {
if (!sent) {
sent = true
return [200, [event]]
}
return [200, []]
})

let published = false
nock("http://mock-api")
.post("/streams/processed")
.reply(200, async function () {
// simulate slow publish
await new Promise((r) => setTimeout(r, 200))
published = true
return "ok"
})

const workerMod = await import("../../src/worker")

// give worker a moment to pick up the event
await new Promise((r) => setTimeout(r, 100))
// request shutdown and wait for it to complete (should wait for publish)
await workerMod.shutdown("test")
expect(published).toBe(true)
})

test("api error then recovery -> worker retries next poll", async () => {
process.env.API_URL = "http://mock-api"
process.env.POLL_INTERVAL_MS = "50"

const now = new Date().toISOString()
const event = { streamId: "r1", data: { type: "t" }, timestamp: now }

let calls = 0
nock("http://mock-api")
.get("/streams/pending")
.times(100)
.reply(() => {
calls++
if (calls === 1) return [500, "fail"]
if (calls === 2) return [200, []]
return [200, [event]]
})

const publishedPromise = new Promise<void>((resolve) => {
nock("http://mock-api")
.post("/streams/processed")
.reply(200, function () {
resolve()
return "ok"
})
})

const workerMod = await import("../../src/worker")
await awaitWithTimeout(publishedPromise, 5000, "publish timeout")
await workerMod.shutdown("test")
})
12 changes: 9 additions & 3 deletions xstreamroll-processing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@
"jest": {
"preset": "ts-jest",
"testEnvironment": "node",
"testMatch": ["**/__tests__/**/*.test.ts"],
"testMatch": [
"**/__tests__/**/*.test.ts"
],
"forceExit": true,
"coverageDirectory": "coverage",
"coverageReporters": ["lcov", "text"]
"coverageReporters": [
"lcov",
"text"
]
},
"dependencies": {
"axios": "^1.6.0",
Expand All @@ -29,6 +34,7 @@
"jest": "^29.7.0",
"ts-jest": "^29.2.5",
"ts-node": "^10.9.0",
"typescript": "^5.3.0"
"typescript": "^5.3.0",
"nock": "^13.3.0"
}
}
23 changes: 18 additions & 5 deletions xstreamroll-processing/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ let shuttingDown = false
async function pollOnce(): Promise<void> {
let events: StreamEvent[] = []
try {
const response = await axiosInstance.get<StreamEvent[]>(`${API_URL}/streams/pending`)
const response = await axiosInstance.get<StreamEvent[]>(
`${API_URL}/streams/pending`,
)
events = Array.isArray(response.data) ? response.data : []
} catch (err) {
const message = err instanceof Error ? err.message : String(err)
Expand All @@ -47,7 +49,11 @@ async function pollOnce(): Promise<void> {
}

for (const event of events) {
if (!event || typeof event.streamId !== "string" || event.streamId.length === 0) {
if (
!event ||
typeof event.streamId !== "string" ||
event.streamId.length === 0
) {
console.warn(`[${WORKER_ID}] dropping malformed event`, event)
continue
}
Expand Down Expand Up @@ -86,7 +92,9 @@ async function start(): Promise<void> {
void loop()
}

const gracefulShutdown = new GracefulShutdown({ timeoutMs: 15_000 })
const gracefulShutdown = new GracefulShutdown({
timeoutMs: 15_000,
})

gracefulShutdown.register({
name: "stop poll loop",
Expand All @@ -111,14 +119,19 @@ gracefulShutdown.register({
},
})

gracefulShutdown.install()
if (env.NODE_ENV !== "test") {
gracefulShutdown.install()
}

/** Exported for testing: triggers the graceful-shutdown sequence. */
export const shutdown = (signal: string): Promise<void> =>
gracefulShutdown.requestShutdown(signal as ShutdownReason)

function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms))
return new Promise((resolve) => {
const timer = setTimeout(() => resolve(), ms)
if (typeof timer.unref === "function") timer.unref()
})
}

void start()
Loading