diff --git a/packages/Amqp/tests/Integration/DynamicStreamChannelRetryTest.php b/packages/Amqp/tests/Integration/DynamicStreamChannelRetryTest.php new file mode 100644 index 000000000..96cf5b295 --- /dev/null +++ b/packages/Amqp/tests/Integration/DynamicStreamChannelRetryTest.php @@ -0,0 +1,99 @@ +markTestSkipped('Stream tests require AMQP lib'); + } + } + + public function test_resend_works_for_dynamic_channel_wrapping_amqp_stream_channels(): void + { + $queueTenantA = 'stream_queue_tenant_a_' . Uuid::v7()->toRfc4122(); + $queueTenantB = 'stream_queue_tenant_b_' . Uuid::v7()->toRfc4122(); + $handler = new DynamicStreamRetryHandler(); + + $ecotoneLite = $this->bootstrapForTesting( + [DynamicStreamRetryHandler::class], + [ + $handler, + ...$this->getConnectionFactoryReferences(), + ], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ->withExtensionObjects([ + AmqpQueue::createStreamQueue($queueTenantA), + AmqpStreamChannelBuilder::create( + channelName: 'async_tenant_a', + startPosition: 'first', + amqpConnectionReferenceName: AmqpLibConnection::class, + queueName: $queueTenantA, + )->withFinalFailureStrategy(FinalFailureStrategy::RESEND), + AmqpQueue::createStreamQueue($queueTenantB), + AmqpStreamChannelBuilder::create( + channelName: 'async_tenant_b', + startPosition: 'first', + amqpConnectionReferenceName: AmqpLibConnection::class, + queueName: $queueTenantB, + )->withFinalFailureStrategy(FinalFailureStrategy::RESEND), + DynamicMessageChannelBuilder::createRoundRobin( + thisMessageChannelName: 'async', + channelNames: ['async_tenant_a', 'async_tenant_b'], + ), + ]), + ); + + $ecotoneLite->getCommandBus()->sendWithRouting('execute.dynamic_stream', 'test_message'); + + $ecotoneLite->run('async', ExecutionPollingMetadata::createWithFinishWhenNoMessages(failAtError: false)->withExecutionTimeLimitInMilliseconds(5000)); + + self::assertTrue($handler->failedOnce, 'Handler should have failed on first attempt'); + + $ecotoneLite->run('async', ExecutionPollingMetadata::createWithFinishWhenNoMessages(failAtError: true)->withExecutionTimeLimitInMilliseconds(5000)); + + self::assertTrue($handler->succeeded, 'Handler should have succeeded on retry after resend'); + } +} + +class DynamicStreamRetryHandler +{ + public bool $failedOnce = false; + public bool $succeeded = false; + + #[Asynchronous('async')] + #[CommandHandler('execute.dynamic_stream', 'dynamic_stream_endpoint')] + public function handle(string $command): void + { + if (! $this->failedOnce) { + $this->failedOnce = true; + throw new Exception('Simulated failure to trigger resend'); + } + $this->succeeded = true; + } +} diff --git a/packages/Ecotone/src/Messaging/Channel/DynamicChannel/DynamicMessageChannel.php b/packages/Ecotone/src/Messaging/Channel/DynamicChannel/DynamicMessageChannel.php index b7de16c7b..8934122b8 100644 --- a/packages/Ecotone/src/Messaging/Channel/DynamicChannel/DynamicMessageChannel.php +++ b/packages/Ecotone/src/Messaging/Channel/DynamicChannel/DynamicMessageChannel.php @@ -46,7 +46,9 @@ public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message Assert::notNullAndEmpty($channelName, "Channel name to poll message from cannot be null. If you want to skip message receiving, return 'nullChannel' instead."); $channel = $this->resolveMessageChannel($channelName); - $message = $channel->receiveWithTimeout($pollingMetadata); + $message = $channel->receiveWithTimeout( + $pollingMetadata->setPolledChannelName($channelName) + ); $this->loggingGateway->info("Decided to received message from `{$channelName}` for `{$this->channelName}`", $message, ['channel_name' => $this->channelName, 'chosen_channel_name' => $channelName]); return $message; diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/PollableChannelPollerAdapter.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/PollableChannelPollerAdapter.php index c41f4ed4d..ea20e54c6 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/PollableChannelPollerAdapter.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/PollableChannelPollerAdapter.php @@ -27,6 +27,7 @@ public function __construct(private string $pollableChannelName, private Pollabl public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message { + $pollingMetadata = $pollingMetadata->setPolledChannelName($this->pollableChannelName); $timeoutInMilliseconds = $pollingMetadata->getFixedRateInMilliseconds(); $message = $timeoutInMilliseconds ? $this->pollableChannel->receiveWithTimeout($pollingMetadata) diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingMetadata.php b/packages/Ecotone/src/Messaging/Endpoint/PollingMetadata.php index 73774a5e9..16ecd9a9f 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingMetadata.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingMetadata.php @@ -26,6 +26,7 @@ final class PollingMetadata implements DefinedObject public const DEFAULT_FINISH_WHEN_NO_MESSAGES = false; private bool $withSignalInterceptors; + private ?string $polledChannelName = null; /** @@ -442,6 +443,19 @@ public function getCronExpression(): ?string return $this->cronExpression; } + public function getPolledChannelName(): ?string + { + return $this->polledChannelName; + } + + public function setPolledChannelName(string $polledChannelName): self + { + $copy = $this->createCopy(); + $copy->polledChannelName = $polledChannelName; + + return $copy; + } + public function hasFixedRateExpression(): bool { return $this->fixedRateExpression !== null; diff --git a/packages/Kafka/src/Inbound/InboundMessageConverter.php b/packages/Kafka/src/Inbound/InboundMessageConverter.php index 8c0679c98..94714f00f 100644 --- a/packages/Kafka/src/Inbound/InboundMessageConverter.php +++ b/packages/Kafka/src/Inbound/InboundMessageConverter.php @@ -35,7 +35,7 @@ public function toMessage( ConversionService $conversionService, BatchCommitCoordinator $batchCommitCoordinator, ): MessageBuilder { - $kafkaConsumerConfiguration = $this->kafkaAdmin->getRdKafkaConfiguration($endpointId, $channelName); + $kafkaConsumerConfiguration = $this->kafkaAdmin->getRdKafkaConfiguration($channelName); $headerMapper = $kafkaConsumerConfiguration->getHeaderMapper(); $acknowledgeMode = $kafkaConsumerConfiguration->getAcknowledgeMode(); @@ -49,6 +49,7 @@ public function toMessage( $this->loggingGateway, $this->kafkaAdmin, $endpointId, + $channelName, $this->finalFailureStrategy, $acknowledgeMode === KafkaAcknowledgementCallback::AUTO_ACK, $batchCommitCoordinator, diff --git a/packages/Kafka/src/Inbound/KafkaAcknowledgementCallback.php b/packages/Kafka/src/Inbound/KafkaAcknowledgementCallback.php index fb3ec2399..236b8f829 100644 --- a/packages/Kafka/src/Inbound/KafkaAcknowledgementCallback.php +++ b/packages/Kafka/src/Inbound/KafkaAcknowledgementCallback.php @@ -29,6 +29,7 @@ private function __construct( private LoggingGateway $loggingGateway, private KafkaAdmin $kafkaAdmin, private string $endpointId, + private string $channelName, private BatchCommitCoordinator $batchCommitCoordinator, ) { } @@ -39,6 +40,7 @@ public static function create( LoggingGateway $loggingGateway, KafkaAdmin $kafkaAdmin, string $endpointId, + string $channelName, FinalFailureStrategy $finalFailureStrategy, bool $isAutoAcked, BatchCommitCoordinator $batchCommitCoordinator, @@ -51,6 +53,7 @@ public static function create( $loggingGateway, $kafkaAdmin, $endpointId, + $channelName, $batchCommitCoordinator, ); } @@ -105,8 +108,8 @@ public function reject(): void public function resend(): void { try { - $this->kafkaAdmin->getProducer($this->endpointId); - $topic = $this->kafkaAdmin->getTopicForProducer($this->endpointId); + $this->kafkaAdmin->getProducer($this->channelName); + $topic = $this->kafkaAdmin->getTopicForProducer($this->channelName); $topic->producev( $this->message->partition, 0, diff --git a/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php b/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php index 041c2e967..c432f1cc7 100644 --- a/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php +++ b/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php @@ -34,7 +34,8 @@ public function __construct( public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message { $endpointId = $pollingMetadata->getEndpointId(); - $consumer = $this->kafkaAdmin->getConsumer($endpointId, $this->channelName); + $channelName = $pollingMetadata->getPolledChannelName() ?? $this->channelName; + $consumer = $this->kafkaAdmin->getConsumer($endpointId, $channelName); if ($this->batchCommitCoordinator === null || $this->batchCommitCoordinator->consumer !== $consumer) { $this->batchCommitCoordinator = new BatchCommitCoordinator( @@ -60,7 +61,7 @@ public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) { return $this->inboundMessageConverter->toMessage( $endpointId, - $this->channelName, + $channelName, $consumer, $message, $this->conversionService, diff --git a/packages/Kafka/tests/Integration/DynamicChannelRetryTest.php b/packages/Kafka/tests/Integration/DynamicChannelRetryTest.php new file mode 100644 index 000000000..ec6bfaa3d --- /dev/null +++ b/packages/Kafka/tests/Integration/DynamicChannelRetryTest.php @@ -0,0 +1,97 @@ +toRfc4122(); + $topicTenantB = 'topic_async_tenant_b_' . Uuid::v7()->toRfc4122(); + $handler = new DynamicChannelRetryHandler(); + + $ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting( + [DynamicChannelRetryHandler::class], + [$handler, KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection()], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::KAFKA_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + KafkaMessageChannelBuilder::create( + channelName: 'async_tenant_a', + topicName: $topicTenantA, + ) + ->withFinalFailureStrategy(FinalFailureStrategy::RESEND) + ->withReceiveTimeout(3000), + KafkaMessageChannelBuilder::create( + channelName: 'async_tenant_b', + topicName: $topicTenantB, + ) + ->withFinalFailureStrategy(FinalFailureStrategy::RESEND) + ->withReceiveTimeout(3000), + DynamicMessageChannelBuilder::createRoundRobin( + thisMessageChannelName: 'async', + channelNames: ['async_tenant_a', 'async_tenant_b'], + ), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotoneTestSupport->sendCommandWithRoutingKey('execute.dynamic', 'test_message'); + + $ecotoneTestSupport->run('async', ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + maxExecutionTimeInMilliseconds: 10000, + failAtError: false, + )); + + self::assertTrue($handler->failedOnce, 'Handler should have failed on first attempt'); + + $ecotoneTestSupport->run('async', ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + maxExecutionTimeInMilliseconds: 10000, + failAtError: true, + )); + + self::assertTrue($handler->succeeded, 'Handler should have succeeded on retry after resend'); + } +} + +class DynamicChannelRetryHandler +{ + public bool $failedOnce = false; + public bool $succeeded = false; + + #[Asynchronous('async')] + #[CommandHandler('execute.dynamic', 'dynamic_endpoint')] + public function handle(string $command): void + { + if (! $this->failedOnce) { + $this->failedOnce = true; + throw new Exception('Simulated failure to trigger resend'); + } + $this->succeeded = true; + } +}