From 8a434e2b7da4072ab04860fe5c93877f23978cb5 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Sat, 4 Apr 2026 19:02:58 +0000 Subject: [PATCH] fix: resolve RESEND failure strategy crash for dynamic channels wrapping Kafka sub-channels When a dynamic channel (e.g. "async") wraps Kafka sub-channels, the RESEND failure strategy crashed with "Publisher configuration for async not found" because KafkaAcknowledgementCallback used the endpoint ID instead of the actual channel name for publisher lookup. Added polledChannelName to PollingMetadata so the actual channel name propagates through the polling flow. DynamicMessageChannel overrides it with the resolved sub-channel name before delegating. --- .../DynamicStreamChannelRetryTest.php | 99 +++++++++++++++++++ .../DynamicChannel/DynamicMessageChannel.php | 4 +- .../PollableChannelPollerAdapter.php | 1 + .../Messaging/Endpoint/PollingMetadata.php | 14 +++ .../src/Inbound/InboundMessageConverter.php | 3 +- .../Inbound/KafkaAcknowledgementCallback.php | 7 +- .../Inbound/KafkaInboundChannelAdapter.php | 5 +- .../Integration/DynamicChannelRetryTest.php | 97 ++++++++++++++++++ 8 files changed, 224 insertions(+), 6 deletions(-) create mode 100644 packages/Amqp/tests/Integration/DynamicStreamChannelRetryTest.php create mode 100644 packages/Kafka/tests/Integration/DynamicChannelRetryTest.php diff --git a/packages/Amqp/tests/Integration/DynamicStreamChannelRetryTest.php b/packages/Amqp/tests/Integration/DynamicStreamChannelRetryTest.php new file mode 100644 index 000000000..96cf5b295 --- /dev/null +++ b/packages/Amqp/tests/Integration/DynamicStreamChannelRetryTest.php @@ -0,0 +1,99 @@ +markTestSkipped('Stream tests require AMQP lib'); + } + } + + public function test_resend_works_for_dynamic_channel_wrapping_amqp_stream_channels(): void + { + $queueTenantA = 'stream_queue_tenant_a_' . Uuid::v7()->toRfc4122(); + $queueTenantB = 'stream_queue_tenant_b_' . Uuid::v7()->toRfc4122(); + $handler = new DynamicStreamRetryHandler(); + + $ecotoneLite = $this->bootstrapForTesting( + [DynamicStreamRetryHandler::class], + [ + $handler, + ...$this->getConnectionFactoryReferences(), + ], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ->withExtensionObjects([ + AmqpQueue::createStreamQueue($queueTenantA), + AmqpStreamChannelBuilder::create( + channelName: 'async_tenant_a', + startPosition: 'first', + amqpConnectionReferenceName: AmqpLibConnection::class, + queueName: $queueTenantA, + )->withFinalFailureStrategy(FinalFailureStrategy::RESEND), + AmqpQueue::createStreamQueue($queueTenantB), + AmqpStreamChannelBuilder::create( + channelName: 'async_tenant_b', + startPosition: 'first', + amqpConnectionReferenceName: AmqpLibConnection::class, + queueName: $queueTenantB, + )->withFinalFailureStrategy(FinalFailureStrategy::RESEND), + DynamicMessageChannelBuilder::createRoundRobin( + thisMessageChannelName: 'async', + channelNames: ['async_tenant_a', 'async_tenant_b'], + ), + ]), + ); + + $ecotoneLite->getCommandBus()->sendWithRouting('execute.dynamic_stream', 'test_message'); + + $ecotoneLite->run('async', ExecutionPollingMetadata::createWithFinishWhenNoMessages(failAtError: false)->withExecutionTimeLimitInMilliseconds(5000)); + + self::assertTrue($handler->failedOnce, 'Handler should have failed on first attempt'); + + $ecotoneLite->run('async', ExecutionPollingMetadata::createWithFinishWhenNoMessages(failAtError: true)->withExecutionTimeLimitInMilliseconds(5000)); + + self::assertTrue($handler->succeeded, 'Handler should have succeeded on retry after resend'); + } +} + +class DynamicStreamRetryHandler +{ + public bool $failedOnce = false; + public bool $succeeded = false; + + #[Asynchronous('async')] + #[CommandHandler('execute.dynamic_stream', 'dynamic_stream_endpoint')] + public function handle(string $command): void + { + if (! $this->failedOnce) { + $this->failedOnce = true; + throw new Exception('Simulated failure to trigger resend'); + } + $this->succeeded = true; + } +} diff --git a/packages/Ecotone/src/Messaging/Channel/DynamicChannel/DynamicMessageChannel.php b/packages/Ecotone/src/Messaging/Channel/DynamicChannel/DynamicMessageChannel.php index b7de16c7b..8934122b8 100644 --- a/packages/Ecotone/src/Messaging/Channel/DynamicChannel/DynamicMessageChannel.php +++ b/packages/Ecotone/src/Messaging/Channel/DynamicChannel/DynamicMessageChannel.php @@ -46,7 +46,9 @@ public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message Assert::notNullAndEmpty($channelName, "Channel name to poll message from cannot be null. If you want to skip message receiving, return 'nullChannel' instead."); $channel = $this->resolveMessageChannel($channelName); - $message = $channel->receiveWithTimeout($pollingMetadata); + $message = $channel->receiveWithTimeout( + $pollingMetadata->setPolledChannelName($channelName) + ); $this->loggingGateway->info("Decided to received message from `{$channelName}` for `{$this->channelName}`", $message, ['channel_name' => $this->channelName, 'chosen_channel_name' => $channelName]); return $message; diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/PollableChannelPollerAdapter.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/PollableChannelPollerAdapter.php index c41f4ed4d..ea20e54c6 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/PollableChannelPollerAdapter.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/PollableChannelPollerAdapter.php @@ -27,6 +27,7 @@ public function __construct(private string $pollableChannelName, private Pollabl public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message { + $pollingMetadata = $pollingMetadata->setPolledChannelName($this->pollableChannelName); $timeoutInMilliseconds = $pollingMetadata->getFixedRateInMilliseconds(); $message = $timeoutInMilliseconds ? $this->pollableChannel->receiveWithTimeout($pollingMetadata) diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingMetadata.php b/packages/Ecotone/src/Messaging/Endpoint/PollingMetadata.php index 73774a5e9..16ecd9a9f 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingMetadata.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingMetadata.php @@ -26,6 +26,7 @@ final class PollingMetadata implements DefinedObject public const DEFAULT_FINISH_WHEN_NO_MESSAGES = false; private bool $withSignalInterceptors; + private ?string $polledChannelName = null; /** @@ -442,6 +443,19 @@ public function getCronExpression(): ?string return $this->cronExpression; } + public function getPolledChannelName(): ?string + { + return $this->polledChannelName; + } + + public function setPolledChannelName(string $polledChannelName): self + { + $copy = $this->createCopy(); + $copy->polledChannelName = $polledChannelName; + + return $copy; + } + public function hasFixedRateExpression(): bool { return $this->fixedRateExpression !== null; diff --git a/packages/Kafka/src/Inbound/InboundMessageConverter.php b/packages/Kafka/src/Inbound/InboundMessageConverter.php index 8c0679c98..94714f00f 100644 --- a/packages/Kafka/src/Inbound/InboundMessageConverter.php +++ b/packages/Kafka/src/Inbound/InboundMessageConverter.php @@ -35,7 +35,7 @@ public function toMessage( ConversionService $conversionService, BatchCommitCoordinator $batchCommitCoordinator, ): MessageBuilder { - $kafkaConsumerConfiguration = $this->kafkaAdmin->getRdKafkaConfiguration($endpointId, $channelName); + $kafkaConsumerConfiguration = $this->kafkaAdmin->getRdKafkaConfiguration($channelName); $headerMapper = $kafkaConsumerConfiguration->getHeaderMapper(); $acknowledgeMode = $kafkaConsumerConfiguration->getAcknowledgeMode(); @@ -49,6 +49,7 @@ public function toMessage( $this->loggingGateway, $this->kafkaAdmin, $endpointId, + $channelName, $this->finalFailureStrategy, $acknowledgeMode === KafkaAcknowledgementCallback::AUTO_ACK, $batchCommitCoordinator, diff --git a/packages/Kafka/src/Inbound/KafkaAcknowledgementCallback.php b/packages/Kafka/src/Inbound/KafkaAcknowledgementCallback.php index fb3ec2399..236b8f829 100644 --- a/packages/Kafka/src/Inbound/KafkaAcknowledgementCallback.php +++ b/packages/Kafka/src/Inbound/KafkaAcknowledgementCallback.php @@ -29,6 +29,7 @@ private function __construct( private LoggingGateway $loggingGateway, private KafkaAdmin $kafkaAdmin, private string $endpointId, + private string $channelName, private BatchCommitCoordinator $batchCommitCoordinator, ) { } @@ -39,6 +40,7 @@ public static function create( LoggingGateway $loggingGateway, KafkaAdmin $kafkaAdmin, string $endpointId, + string $channelName, FinalFailureStrategy $finalFailureStrategy, bool $isAutoAcked, BatchCommitCoordinator $batchCommitCoordinator, @@ -51,6 +53,7 @@ public static function create( $loggingGateway, $kafkaAdmin, $endpointId, + $channelName, $batchCommitCoordinator, ); } @@ -105,8 +108,8 @@ public function reject(): void public function resend(): void { try { - $this->kafkaAdmin->getProducer($this->endpointId); - $topic = $this->kafkaAdmin->getTopicForProducer($this->endpointId); + $this->kafkaAdmin->getProducer($this->channelName); + $topic = $this->kafkaAdmin->getTopicForProducer($this->channelName); $topic->producev( $this->message->partition, 0, diff --git a/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php b/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php index 041c2e967..c432f1cc7 100644 --- a/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php +++ b/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php @@ -34,7 +34,8 @@ public function __construct( public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message { $endpointId = $pollingMetadata->getEndpointId(); - $consumer = $this->kafkaAdmin->getConsumer($endpointId, $this->channelName); + $channelName = $pollingMetadata->getPolledChannelName() ?? $this->channelName; + $consumer = $this->kafkaAdmin->getConsumer($endpointId, $channelName); if ($this->batchCommitCoordinator === null || $this->batchCommitCoordinator->consumer !== $consumer) { $this->batchCommitCoordinator = new BatchCommitCoordinator( @@ -60,7 +61,7 @@ public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) { return $this->inboundMessageConverter->toMessage( $endpointId, - $this->channelName, + $channelName, $consumer, $message, $this->conversionService, diff --git a/packages/Kafka/tests/Integration/DynamicChannelRetryTest.php b/packages/Kafka/tests/Integration/DynamicChannelRetryTest.php new file mode 100644 index 000000000..ec6bfaa3d --- /dev/null +++ b/packages/Kafka/tests/Integration/DynamicChannelRetryTest.php @@ -0,0 +1,97 @@ +toRfc4122(); + $topicTenantB = 'topic_async_tenant_b_' . Uuid::v7()->toRfc4122(); + $handler = new DynamicChannelRetryHandler(); + + $ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting( + [DynamicChannelRetryHandler::class], + [$handler, KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection()], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::KAFKA_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + KafkaMessageChannelBuilder::create( + channelName: 'async_tenant_a', + topicName: $topicTenantA, + ) + ->withFinalFailureStrategy(FinalFailureStrategy::RESEND) + ->withReceiveTimeout(3000), + KafkaMessageChannelBuilder::create( + channelName: 'async_tenant_b', + topicName: $topicTenantB, + ) + ->withFinalFailureStrategy(FinalFailureStrategy::RESEND) + ->withReceiveTimeout(3000), + DynamicMessageChannelBuilder::createRoundRobin( + thisMessageChannelName: 'async', + channelNames: ['async_tenant_a', 'async_tenant_b'], + ), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotoneTestSupport->sendCommandWithRoutingKey('execute.dynamic', 'test_message'); + + $ecotoneTestSupport->run('async', ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + maxExecutionTimeInMilliseconds: 10000, + failAtError: false, + )); + + self::assertTrue($handler->failedOnce, 'Handler should have failed on first attempt'); + + $ecotoneTestSupport->run('async', ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + maxExecutionTimeInMilliseconds: 10000, + failAtError: true, + )); + + self::assertTrue($handler->succeeded, 'Handler should have succeeded on retry after resend'); + } +} + +class DynamicChannelRetryHandler +{ + public bool $failedOnce = false; + public bool $succeeded = false; + + #[Asynchronous('async')] + #[CommandHandler('execute.dynamic', 'dynamic_endpoint')] + public function handle(string $command): void + { + if (! $this->failedOnce) { + $this->failedOnce = true; + throw new Exception('Simulated failure to trigger resend'); + } + $this->succeeded = true; + } +}