diff --git a/packages/bus-core/src/service-bus/bus-instance.spec.ts b/packages/bus-core/src/service-bus/bus-instance.spec.ts new file mode 100644 index 00000000..3991e853 --- /dev/null +++ b/packages/bus-core/src/service-bus/bus-instance.spec.ts @@ -0,0 +1,254 @@ +import { BusInstance } from './bus-instance' +import { Transport } from '../transport' +import { + Command, + Event, + MessageAttributes +} from '@node-ts/bus-messages' +import { CoreDependencies, MiddlewareDispatcher } from '../util' +import { HandlerRegistry } from '../handler' +import { WorkflowRegistry } from '../workflow/registry' +import { Logger } from '../logger' +import { ContainerAdapter } from '../container' +import { Receiver } from '../receiver' +import { messageHandlingContext } + from '../message-handling-context' // To test prepareTransportOptions behavior + +// Mock external modules and dependencies +jest.mock('../workflow/registry') +jest.mock('../util/middleware-dispatcher') +jest.mock('../handler/handler-registry') +jest.mock('../logger') +// jest.mock('uuid', () => ({ v4: () => 'mock-uuid' })) // If generateUuid is directly used and needs mocking + +// Define mock types explicitly +type MockTransport = jest.Mocked +type MockLogger = jest.Mocked +type MockCoreDependencies = jest.Mocked +type MockWorkflowRegistry = jest.Mocked +type MockMiddlewareDispatcher = jest.Mocked> +type MockHandlerRegistry = jest.Mocked +type MockContainerAdapter = jest.Mocked +type MockReceiver = jest.Mocked + + +describe('BusInstance', () => { + let busInstance: BusInstance + let mockTransport: MockTransport + let mockLogger: MockLogger + let mockLoggerFactory: jest.Mock + let mockCoreDependencies: MockCoreDependencies + let mockWorkflowRegistry: MockWorkflowRegistry + let mockMessageReadMiddleware: MockMiddlewareDispatcher + let mockHandlerRegistry: MockHandlerRegistry + let mockContainer: MockContainerAdapter | undefined + let mockReceiver: MockReceiver | undefined + + const concurrency = 1 + const sendOnly = false + + // Sample messages and attributes for testing + class TestCommand implements Command { + $name = 'test-command' + $version = 1 + constructor(public readonly id: string) {} + } + + class TestEvent implements Event { + $name = 'test-event' + $version = 1 + constructor(public readonly id: string) {} + } + + beforeEach(() => { + mockTransport = { + connect: jest.fn(), + disconnect: jest.fn(), + publish: jest.fn(), + send: jest.fn(), + sendBatch: jest.fn().mockResolvedValue(undefined), + publishBatch: jest.fn().mockResolvedValue(undefined), + readNextMessage: jest.fn(), + deleteMessage: jest.fn(), + returnMessage: jest.fn(), + fail: jest.fn(), + initialize: jest.fn(), + dispose: jest.fn(), + start: jest.fn(), + stop: jest.fn(), + prepare: jest.fn() + } + + mockLogger = { + debug: jest.fn(), + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + fatal: jest.fn(), + trace: jest.fn() + } + mockLoggerFactory = jest.fn(() => mockLogger) + + mockCoreDependencies = { + loggerFactory: mockLoggerFactory, + messageSerializer: { + serialize: jest.fn(message => JSON.stringify(message)), + deserialize: jest.fn(messageStr => JSON.parse(messageStr)) + }, + handlerRegistry: new HandlerRegistry(mockLoggerFactory) as any, // Actual instance or mock + interruptSignals: [] + // ... other properties if needed by BusInstance constructor or prepareTransportOptions + } as unknown as MockCoreDependencies // Use unknown for partial mock + + mockWorkflowRegistry = + new WorkflowRegistry(undefined, undefined) as MockWorkflowRegistry + mockMessageReadMiddleware = + new MiddlewareDispatcher() as MockMiddlewareDispatcher + mockHandlerRegistry = new HandlerRegistry(mockLoggerFactory) as MockHandlerRegistry + mockContainer = undefined + mockReceiver = undefined + + + busInstance = new BusInstance( + mockTransport, + concurrency, + mockWorkflowRegistry, + mockCoreDependencies, + mockMessageReadMiddleware, + mockHandlerRegistry, + mockContainer, + sendOnly, + mockReceiver + ) + }) + + describe('sendBatch', () => { + const commands: TestCommand[] = [ + new TestCommand('1'), + new TestCommand('2') + ] + + it('should call transport.sendBatch with commands and attributes when attributes are provided', async () => { + const messageAttributes: Partial = { correlationId: 'test-corid', attributes: { 'custom': 'value' } } + const expectedAttributesMatcher = expect.objectContaining({ + correlationId: 'test-corid', + attributes: { 'custom': 'value' } + }) + + await busInstance.sendBatch(commands, messageAttributes) + + expect(mockLogger.debug).toHaveBeenCalledWith( + 'Sending command batch', + { commands, messageAttributes } + ) + expect(mockTransport.sendBatch).toHaveBeenCalledWith( + commands, + expectedAttributesMatcher + ) + }) + + it('should call transport.sendBatch with commands and generated attributes when no attributes are provided', async () => { + // Mock generateUuid if not already done and if it's a direct dependency of prepareTransportOptions + // For this test, we'll rely on the fact that prepareTransportOptions generates a correlationId. + const expectedAttributesMatcher = expect.objectContaining({ + correlationId: expect.any(String), // Should be generated + attributes: {} // Default + }) + + await busInstance.sendBatch(commands) + + expect(mockLogger.debug).toHaveBeenCalledWith( + 'Sending command batch', + { commands, messageAttributes: {} } // Default is empty object + ) + expect(mockTransport.sendBatch).toHaveBeenCalledWith( + commands, + expectedAttributesMatcher + ) + }) + + it('should use correlationId from message handling context if available and no explicit correlationId is passed', async () => { + const contextCorrelationId = 'context-corid' + const handlingContext = { + message: {} as any, + attributes: { correlationId: contextCorrelationId, attributes: {}, stickyAttributes: {} } + } + const expectedAttributesMatcher = expect.objectContaining({ + correlationId: contextCorrelationId + }) + + await messageHandlingContext.run(handlingContext, async () => { + await busInstance.sendBatch(commands, { attributes: { 'key': 'val'} }); + }); + + expect(mockTransport.sendBatch).toHaveBeenCalledWith( + commands, + expectedAttributesMatcher + ); + }); + }) + + describe('publishBatch', () => { + const events: TestEvent[] = [ + new TestEvent('ev1'), + new TestEvent('ev2') + ] + + it('should call transport.publishBatch with events and attributes when attributes are provided', async () => { + const messageAttributes: Partial = { correlationId: 'test-corid-event', attributes: { 'eventProp': 'eventValue' } } + const expectedAttributesMatcher = expect.objectContaining({ + correlationId: 'test-corid-event', + attributes: { 'eventProp': 'eventValue' } + }) + + await busInstance.publishBatch(events, messageAttributes) + + expect(mockLogger.debug).toHaveBeenCalledWith( + 'Publishing event batch', + { events, messageAttributes } + ) + expect(mockTransport.publishBatch).toHaveBeenCalledWith( + events, + expectedAttributesMatcher + ) + }) + + it('should call transport.publishBatch with events and generated attributes when no attributes are provided', async () => { + const expectedAttributesMatcher = expect.objectContaining({ + correlationId: expect.any(String), // Should be generated + attributes: {} // Default + }) + + await busInstance.publishBatch(events) + + expect(mockLogger.debug).toHaveBeenCalledWith( + 'Publishing event batch', + { events, messageAttributes: {} } // Default is empty object + ) + expect(mockTransport.publishBatch).toHaveBeenCalledWith( + events, + expectedAttributesMatcher + ) + }) + + it('should use correlationId from message handling context if available and no explicit correlationId is passed for publish', async () => { + const contextCorrelationId = 'context-corid-publish' + const handlingContext = { + message: {} as any, + attributes: { correlationId: contextCorrelationId, attributes: {}, stickyAttributes: {} } + } + const expectedAttributesMatcher = expect.objectContaining({ + correlationId: contextCorrelationId + }) + + await messageHandlingContext.run(handlingContext, async () => { + await busInstance.publishBatch(events, { attributes: { 'key': 'val'} }); + }); + + expect(mockTransport.publishBatch).toHaveBeenCalledWith( + events, + expectedAttributesMatcher + ); + }); + }) +}) diff --git a/packages/bus-core/src/service-bus/bus-instance.ts b/packages/bus-core/src/service-bus/bus-instance.ts index 54605cbd..052be357 100644 --- a/packages/bus-core/src/service-bus/bus-instance.ts +++ b/packages/bus-core/src/service-bus/bus-instance.ts @@ -267,6 +267,38 @@ export class BusInstance { } } + /** + * Sends a batch of commands to the transport + * @param commands An array of commands to send + * @param messageAttributes A set of attributes to attach to the outgoing message when sent + */ + async sendBatch( + commands: TCommand[], + messageAttributes: Partial = {} + ): Promise { + this.logger.debug('Sending command batch', { commands, messageAttributes }); + const attributes = this.prepareTransportOptions(messageAttributes); + // TODO: Consider outbox pattern for batch operations if required in future. + // For now, directly calling transport. + return this.transport.sendBatch(commands, attributes); + } + + /** + * Publishes a batch of events to the transport + * @param events An array of events to publish + * @param messageAttributes A set of attributes to attach to the outgoing message when published + */ + async publishBatch( + events: TEvent[], + messageAttributes: Partial = {} + ): Promise { + this.logger.debug('Publishing event batch', { events, messageAttributes }); + const attributes = this.prepareTransportOptions(messageAttributes); + // TODO: Consider outbox pattern for batch operations if required in future. + // For now, directly calling transport. + return this.transport.publishBatch(events, attributes); + } + /** * Instructs the bus that the current message being handled cannot be processed even with * retries and instead should immediately be routed to the dead letter queue diff --git a/packages/bus-core/src/transport/in-memory-queue.ts b/packages/bus-core/src/transport/in-memory-queue.ts index 48f05423..5b96c519 100644 --- a/packages/bus-core/src/transport/in-memory-queue.ts +++ b/packages/bus-core/src/transport/in-memory-queue.ts @@ -86,6 +86,26 @@ export class InMemoryQueue implements Transport { this.addToQueue(command, messageOptions) } + async sendBatch( + commands: TCommand[], + messageOptions?: MessageAttributes + ): Promise { + this.logger.debug(`Sending batch of ${commands.length} commands to in-memory queue.`); + await Promise.all( + commands.map(command => this.send(command, messageOptions)) + ); + } + + async publishBatch( + events: TEvent[], + messageOptions?: MessageAttributes + ): Promise { + this.logger.debug(`Publishing batch of ${events.length} events to in-memory queue.`); + await Promise.all( + events.map(event => this.publish(event, messageOptions)) + ); + } + async fail( transportMessage: TransportMessage ): Promise { diff --git a/packages/bus-core/src/transport/transport.ts b/packages/bus-core/src/transport/transport.ts index 628a2f42..0dc90e6d 100644 --- a/packages/bus-core/src/transport/transport.ts +++ b/packages/bus-core/src/transport/transport.ts @@ -48,6 +48,28 @@ export interface Transport { messageOptions?: MessageAttributes ): Promise + /** + * Sends a batch of commands to the underlying transport. + * @param commands An array of domain commands to be sent + * @param messageOptions Options that control the behaviour around how the messages are sent and + * additional information that travels with them. + */ + sendBatch( + commands: TCommand[], + messageOptions?: MessageAttributes + ): Promise + + /** + * Publishes a batch of events to the underlying transport. + * @param events An array of domain events to be published + * @param messageOptions Options that control the behaviour around how the messages are sent and + * additional information that travels with them. + */ + publishBatch( + events: TEvent[], + messageOptions?: MessageAttributes + ): Promise + /** * Forwards @param transportMessage to the dead letter queue. The message must have been read in from the * queue and have a receipt handle. diff --git a/packages/bus-rabbitmq/src/rabbitmq-transport.ts b/packages/bus-rabbitmq/src/rabbitmq-transport.ts index 33bbf0a1..ac104ac1 100644 --- a/packages/bus-rabbitmq/src/rabbitmq-transport.ts +++ b/packages/bus-rabbitmq/src/rabbitmq-transport.ts @@ -103,6 +103,22 @@ export class RabbitMqTransport implements Transport { await this.publishMessage(command, messageAttributes) } + async sendBatch( + _commands: TCommand[], + _messageOptions?: MessageAttributes + ): Promise { + this.logger.warn('sendBatch is called but not supported by RabbitMqTransport.'); + throw new Error('Batch operations are not supported by RabbitMqTransport.'); + } + + async publishBatch( + _events: TEvent[], + _messageOptions?: MessageAttributes + ): Promise { + this.logger.warn('publishBatch is called but not supported by RabbitMqTransport.'); + throw new Error('Batch operations are not supported by RabbitMqTransport.'); + } + async fail(transportMessage: TransportMessage): Promise { const rawMessage = transportMessage.raw as GetMessage const serializedPayload = this.coreDependencies.messageSerializer.serialize( diff --git a/packages/bus-sqs/src/sqs-transport.ts b/packages/bus-sqs/src/sqs-transport.ts index a9f6f20b..c0ad9463 100644 --- a/packages/bus-sqs/src/sqs-transport.ts +++ b/packages/bus-sqs/src/sqs-transport.ts @@ -4,6 +4,9 @@ import { CreateTopicCommand, MessageAttributeValue, PublishCommand, + PublishBatchCommand, + PublishBatchCommandInput, + PublishBatchRequestEntry, SNSClient, SubscribeCommand } from '@aws-sdk/client-sns' @@ -31,7 +34,8 @@ import { TransportMessage, CoreDependencies, Logger, - TransportInitializationOptions + TransportInitializationOptions, + uuid } from '@node-ts/bus-core' import { generatePolicy } from './generate-policy' import { @@ -132,6 +136,114 @@ export class SqsTransport implements Transport { await this.publishMessage(command, messageAttributes) } + async sendBatch( + commands: TCommand[], + messageOptions: MessageAttributes = { attributes: {}, stickyAttributes: {} } + ): Promise { + if (commands.length === 0) { + this.logger.debug('sendBatch called with an empty command list.') + return + } + + this.logger.info(`Attempting to send a batch of ${commands.length} commands.`) + await this.assertSnsTopicsForMessages(commands) + + const messagesByTopic = await this.groupMessagesByTopic(commands) + + for (const [topicArn, topicMessages] of messagesByTopic.entries()) { + this.logger.debug(`Sending batch for topic ${topicArn}`, { count: topicMessages.length }) + await this.executePublishBatchInternal(topicMessages, topicArn, messageOptions) + } + } + + async publishBatch( + events: TEvent[], + messageOptions: MessageAttributes = { attributes: {}, stickyAttributes: {} } + ): Promise { + if (events.length === 0) { + this.logger.debug('publishBatch called with an empty event list.') + return + } + + this.logger.info(`Attempting to publish a batch of ${events.length} events.`) + await this.assertSnsTopicsForMessages(events) + + const messagesByTopic = await this.groupMessagesByTopic(events) + + for (const [topicArn, topicMessages] of messagesByTopic.entries()) { + this.logger.debug(`Publishing batch for topic ${topicArn}`, { count: topicMessages.length }) + await this.executePublishBatchInternal(topicMessages, topicArn, messageOptions) + } + } + + private async assertSnsTopicsForMessages(messages: Message[]): Promise { + const uniqueMessageNames = [...new Set(messages.map(m => m.$name))]; + this.logger.debug('Asserting SNS topics for message types in batch', { uniqueMessageNames }); + for (const messageName of uniqueMessageNames) { + const sampleMessage = messages.find(m => m.$name === messageName); + if (sampleMessage) { + await this.assertSnsTopic(sampleMessage); + } + } + } + + private async groupMessagesByTopic(messages: Message[]): Promise> { + const messagesByTopic: Map = new Map(); + for (const message of messages) { + const topicName = this.resolveTopicName(message.$name); + const topicArn = this.resolveTopicArn( + this.sqsConfiguration.awsAccountId!, + this.sqsConfiguration.awsRegion!, + topicName + ); + if (!messagesByTopic.has(topicArn)) { + messagesByTopic.set(topicArn, []); + } + messagesByTopic.get(topicArn)!.push(message); + } + return messagesByTopic; + } + + private async executePublishBatchInternal( + messages: Message[], + topicArn: string, + commonMessageAttributes: MessageAttributes + ): Promise { + const attributeMap = toMessageAttributeMap(commonMessageAttributes) + const batchSize = 10; // SNS PublishBatch limit + + for (let i = 0; i < messages.length; i += batchSize) { + const chunk = messages.slice(i, i + batchSize); + const entries: PublishBatchRequestEntry[] = chunk.map(message => ({ + Id: message.id || uuid(), // Ensure message has an ID or generate one. uuid() is from @node-ts/bus-core + Message: this.coreDependencies.messageSerializer.serialize(message), + MessageAttributes: attributeMap, // Apply common attributes to all messages in batch + Subject: message.$name + })); + + const commandInput: PublishBatchCommandInput = { + TopicArn: topicArn, + PublishBatchRequestEntries: entries + }; + const command = new PublishBatchCommand(commandInput); + + this.logger.debug(`Sending chunk of ${entries.length} messages to SNS topic ${topicArn}`, { commandInput }); + try { + const result = await this.sns.send(command); + if (result.Failed && result.Failed.length > 0) { + this.logger.error('Some messages failed to publish in batch', { failedMessages: result.Failed, topicArn }); + // Potentially throw an error or handle partial failures as needed by the application + } + if (result.Successful && result.Successful.length > 0) { + this.logger.debug(`${result.Successful.length} messages successfully published in batch to topic ${topicArn}`); + } + } catch (error) { + this.logger.error(`Error sending message batch to SNS topic ${topicArn}`, { error, topicArn }); + throw error; // Rethrow or handle as appropriate + } + } + } + async fail(transportMessage: TransportMessage): Promise { /* SQS doesn't support forwarding a message to another queue. This approach will copy the message to the dead letter diff --git a/packages/bus-test/src/transport.integration.ts b/packages/bus-test/src/transport.integration.ts index c96dbc0f..be9bc71a 100644 --- a/packages/bus-test/src/transport.integration.ts +++ b/packages/bus-test/src/transport.integration.ts @@ -234,5 +234,125 @@ export const transportTests = ( ) }) }) + + describe('when sending a batch of commands', () => { + const command1 = new TestCommand(uuid.v4(), new Date()) + const command2 = new TestCommand(uuid.v4(), new Date()) + const batchCommands = [command1, command2] + const messageOptions: MessageAttributes = { + correlationId: uuid.v4(), + attributes: { batch: 'true', sender: 'sendBatch-test' } + } + let receivedCount = 0 + const totalMessages = batchCommands.length + let allMessagesHandled: Promise + + beforeEach(() => { + handleChecker.reset() // Reset mock before each test in this describe + receivedCount = 0 + allMessagesHandled = new Promise(resolve => { + const onCommandReceived = () => { + receivedCount++ + if (receivedCount === totalMessages) { + testCommandHandlerEmitter.off('received', onCommandReceived) + resolve() + } + } + testCommandHandlerEmitter.on('received', onCommandReceived) + }) + }) + + it('should handle all commands in the batch or throw not supported error', async () => { + try { + await bus.sendBatch(batchCommands, messageOptions) + // If no error, transport supports batching + await allMessagesHandled + + handleChecker.verify( + h => h.check( + It.is((cmd: TestCommand) => cmd.id === command1.id), + It.isObjectWith(messageOptions) + ), + Times.once() + ) + handleChecker.verify( + h => h.check( + It.is((cmd: TestCommand) => cmd.id === command2.id), + It.isObjectWith(messageOptions) + ), + Times.once() + ) + } catch (e: any) { + // Transport does not support batching + expect(e.message).toEqual( + 'Batch operations are not supported by RabbitMqTransport.' + ) + handleChecker.verify( + h => h.check(It.isAny(), It.isAny()), + Times.never() + ) + } + }) + }) + + describe('when publishing a batch of events', () => { + const event1 = new TestEvent(uuid.v4()) + const event2 = new TestEvent(uuid.v4()) + const batchEvents = [event1, event2] + const messageOptions: MessageAttributes = { + correlationId: uuid.v4(), + attributes: { batch: 'true', publisher: 'publishBatch-test' } + } + let receivedCount = 0 + const totalMessages = batchEvents.length + let allMessagesHandled: Promise + + beforeEach(() => { + handleChecker.reset() // Reset mock before each test in this describe + receivedCount = 0 + allMessagesHandled = new Promise(resolve => { + const onEventReceived = () => { + receivedCount++ + if (receivedCount === totalMessages) { + testEventHandlerEmitter.off('received', onEventReceived) + resolve() + } + } + testEventHandlerEmitter.on('received', onEventReceived) + }) + }) + + it('should handle all events in the batch or throw not supported error', async () => { + try { + await bus.publishBatch(batchEvents, messageOptions) + // If no error, transport supports batching + await allMessagesHandled + + handleChecker.verify( + h => h.check( + It.is((evt: TestEvent) => evt.id === event1.id), + It.isObjectWith(messageOptions) + ), + Times.once() + ) + handleChecker.verify( + h => h.check( + It.is((evt: TestEvent) => evt.id === event2.id), + It.isObjectWith(messageOptions) + ), + Times.once() + ) + } catch (e: any) { + // Transport does not support batching + expect(e.message).toEqual( + 'Batch operations are not supported by RabbitMqTransport.' + ) + handleChecker.verify( + h => h.check(It.isAny(), It.isAny()), + Times.never() + ) + } + }) + }) }) }