diff --git a/src/internal/queue/event.ts b/src/internal/queue/event.ts index dc19a34de..671fd0397 100644 --- a/src/internal/queue/event.ts +++ b/src/internal/queue/event.ts @@ -21,6 +21,16 @@ export interface BasePayload { const { pgQueueEnable, region, isMultitenant } = getConfig() +function withPayloadVersion( + payload: TPayload, + version: string +): TPayload { + return { + ...payload, + $version: payload.$version ?? version, + } +} + export type StaticThis> = BaseEventConstructor interface BaseEventConstructor> { @@ -101,19 +111,20 @@ export class Event> { return Queue.getInstance().insert( messages.map((message) => { - const sendOptions = (this.getSendOptions(message.payload) as PgBoss.JobInsert) || {} - if (!message.payload.$version) { - ;(message.payload as (typeof message)['payload']).$version = this.version - } + const payloadWithVersion = withPayloadVersion( + message.payload as (typeof message)['payload'], + this.version + ) + const sendOptions = (this.getSendOptions(payloadWithVersion) as PgBoss.JobInsert) || {} - if (message.payload.scheduleAt) { - sendOptions.startAfter = new Date(message.payload.scheduleAt) + if (payloadWithVersion.scheduleAt) { + sendOptions.startAfter = new Date(payloadWithVersion.scheduleAt) } return { ...sendOptions, name: this.getQueueName(), - data: message.payload, + data: payloadWithVersion, deadLetter: this.deadLetterQueueName(), } }) @@ -125,10 +136,7 @@ export class Event> { payload: Omit, opts?: SendOptions & { tnx?: Knex } ) { - if (!payload.$version) { - ;(payload as T['payload']).$version = this.version - } - const that = new this(payload) + const that = new this(withPayloadVersion(payload as T['payload'], this.version)) return that.send(opts) } @@ -136,10 +144,7 @@ export class Event> { this: StaticThis, payload: Omit ) { - if (!payload.$version) { - ;(payload as T['payload']).$version = this.version - } - const that = new this(payload) + const that = new this(withPayloadVersion(payload as T['payload'], this.version)) return that.invoke() } @@ -148,10 +153,7 @@ export class Event> { payload: Omit, options?: SendOptions & { sendWhenError?: (error: unknown) => boolean } ) { - if (!payload.$version) { - ;(payload as T['payload']).$version = this.version - } - const that = new this(payload) + const that = new this(withPayloadVersion(payload as T['payload'], this.version)) return that.invokeOrSend(options) } diff --git a/src/test/event.test.ts b/src/test/event.test.ts new file mode 100644 index 000000000..35a4368e7 --- /dev/null +++ b/src/test/event.test.ts @@ -0,0 +1,141 @@ +'use strict' + +type EventModule = typeof import('../internal/queue/event') +type QueueModule = typeof import('../internal/queue/queue') + +type TestPayload = { + name: string + tenant: { + ref: string + host: string + } + reqId?: string + scheduleAt?: Date +} + +async function loadQueueModules(opts?: { pgQueueEnable?: boolean }) { + jest.resetModules() + + const configModule = await import('../config') + configModule.getConfig({ reload: true }) + configModule.mergeConfig({ + pgQueueEnable: opts?.pgQueueEnable ?? false, + }) + + const eventModule = (await import('../internal/queue/event')) as EventModule + const queueModule = (await import('../internal/queue/queue')) as QueueModule + + return { eventModule, queueModule } +} + +function defineTestEvent(EventBase: EventModule['Event']) { + return class TestEvent extends EventBase { + static readonly version = 'v-test' + protected static queueName = 'test-event' + } +} + +function createPayload(overrides: Partial = {}): TestPayload { + return { + name: 'test-object', + tenant: { + ref: 'test-tenant', + host: 'localhost', + }, + reqId: 'req-123', + ...overrides, + } +} + +describe('Event payload versioning', () => { + afterEach(() => { + jest.restoreAllMocks() + jest.resetModules() + }) + + it('does not mutate payloads passed to static send', async () => { + const { eventModule } = await loadQueueModules() + const TestEvent = defineTestEvent(eventModule.Event) + const payload = createPayload() + + jest.spyOn(TestEvent.prototype, 'send').mockImplementation(function ( + this: InstanceType + ) { + expect(this.payload.$version).toBe('v-test') + return Promise.resolve('queued') + }) + + await TestEvent.send(payload) + + expect(payload).toEqual(createPayload()) + expect(payload).not.toHaveProperty('$version') + }) + + it('does not mutate payloads passed to static invoke', async () => { + const { eventModule } = await loadQueueModules() + const TestEvent = defineTestEvent(eventModule.Event) + const payload = createPayload() + + jest.spyOn(TestEvent.prototype, 'invoke').mockImplementation(function ( + this: InstanceType + ) { + expect(this.payload.$version).toBe('v-test') + return Promise.resolve(null) + }) + + await TestEvent.invoke(payload) + + expect(payload).toEqual(createPayload()) + expect(payload).not.toHaveProperty('$version') + }) + + it('does not mutate payloads passed to static invokeOrSend', async () => { + const { eventModule } = await loadQueueModules() + const TestEvent = defineTestEvent(eventModule.Event) + const payload = createPayload() + + jest.spyOn(TestEvent.prototype, 'invokeOrSend').mockImplementation(function ( + this: InstanceType + ) { + expect(this.payload.$version).toBe('v-test') + return Promise.resolve(null) + }) + + await TestEvent.invokeOrSend(payload) + + expect(payload).toEqual(createPayload()) + expect(payload).not.toHaveProperty('$version') + }) + + it('does not mutate message payloads when batching', async () => { + const { eventModule, queueModule } = await loadQueueModules({ pgQueueEnable: true }) + const TestEvent = defineTestEvent(eventModule.Event) + const payload = createPayload({ scheduleAt: new Date('2026-04-07T10:00:00.000Z') }) + const insert = jest.fn().mockResolvedValue('job-id') + + jest + .spyOn(queueModule.Queue, 'getInstance') + .mockReturnValue({ insert } as unknown as ReturnType) + + const message = new TestEvent(payload) + + await TestEvent.batchSend([message]) + + expect(payload).toEqual(createPayload({ scheduleAt: new Date('2026-04-07T10:00:00.000Z') })) + expect(payload).not.toHaveProperty('$version') + expect(insert).toHaveBeenCalledWith([ + expect.objectContaining({ + name: 'test-event', + deadLetter: 'test-event-dead-letter', + data: expect.objectContaining({ + $version: 'v-test', + name: 'test-object', + tenant: expect.objectContaining({ + ref: 'test-tenant', + }), + }), + startAfter: new Date('2026-04-07T10:00:00.000Z'), + }), + ]) + }) +})