From 931dc5a9c2724389cad977ca3fc638bf15c2badc Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Sun, 5 Apr 2026 21:18:02 +0200 Subject: [PATCH] fix: skip InstantRetry when running inside an already active database transaction --- .../DbalTransactionInterceptor.php | 13 ++- .../DbalTransaction/DbalTransactionModule.php | 2 + .../CommandDispatchingAsyncHandler.php | 65 +++++++++++++ ...balTransactionAsynchronousEndpointTest.php | 97 +++++++++++++++++++ .../TransactionStatusTracker.php | 31 ++++++ .../InstantRetryAttributeModule.php | 3 +- .../InstantRetry/InstantRetryInterceptor.php | 14 ++- .../InstantRetry/InstantRetryModule.php | 4 +- 8 files changed, 222 insertions(+), 7 deletions(-) create mode 100644 packages/Dbal/tests/Fixture/InstantRetryTransaction/CommandDispatchingAsyncHandler.php create mode 100644 packages/Ecotone/src/Modelling/Config/DatabaseTransaction/TransactionStatusTracker.php diff --git a/packages/Dbal/src/DbalTransaction/DbalTransactionInterceptor.php b/packages/Dbal/src/DbalTransaction/DbalTransactionInterceptor.php index add8b30a2..5595708ea 100644 --- a/packages/Dbal/src/DbalTransaction/DbalTransactionInterceptor.php +++ b/packages/Dbal/src/DbalTransaction/DbalTransactionInterceptor.php @@ -13,6 +13,7 @@ use Ecotone\Messaging\Handler\Recoverability\RetryRunner; use Ecotone\Messaging\Handler\Recoverability\RetryTemplateBuilder; use Ecotone\Messaging\Message; +use Ecotone\Modelling\Config\DatabaseTransaction\TransactionStatusTracker; use Enqueue\Dbal\DbalConnectionFactory; use Enqueue\Dbal\DbalContext; use Enqueue\Dbal\ManagerRegistryConnectionFactory; @@ -34,7 +35,7 @@ class DbalTransactionInterceptor * @param array $connectionFactories * @param string[] $disableTransactionOnAsynchronousEndpoints */ - public function __construct(private array $connectionFactories, private array $disableTransactionOnAsynchronousEndpoints, private RetryRunner $retryRunner, private LoggingGateway $logger) + public function __construct(private array $connectionFactories, private array $disableTransactionOnAsynchronousEndpoints, private RetryRunner $retryRunner, private LoggingGateway $logger, private TransactionStatusTracker $transactionStatusTracker) { } @@ -88,6 +89,12 @@ public function transactional(MethodInvocation $methodInvocation, Message $messa }, $retryStrategy, $message, ConnectionException::class, 'Starting Database transaction has failed due to network work, retrying in order to self heal.'); $this->logger->info('Database Transaction started', $message); } + + $transactionStarted = $connections !== []; + if ($transactionStarted) { + $this->transactionStatusTracker->markAsInsideTransaction(); + } + try { $result = $methodInvocation->proceed(); @@ -134,6 +141,10 @@ public function transactional(MethodInvocation $methodInvocation, Message $messa } throw $exception; + } finally { + if ($transactionStarted) { + $this->transactionStatusTracker->markAsOutsideTransaction(); + } } return $result; diff --git a/packages/Dbal/src/DbalTransaction/DbalTransactionModule.php b/packages/Dbal/src/DbalTransaction/DbalTransactionModule.php index a34ca5a98..f57bfd967 100644 --- a/packages/Dbal/src/DbalTransaction/DbalTransactionModule.php +++ b/packages/Dbal/src/DbalTransaction/DbalTransactionModule.php @@ -20,6 +20,7 @@ use Ecotone\Messaging\Handler\Recoverability\RetryRunner; use Ecotone\Messaging\Precedence; use Ecotone\Modelling\CommandBus; +use Ecotone\Modelling\Config\DatabaseTransaction\TransactionStatusTracker; use Ecotone\Projecting\Config\ProjectingConsoleCommands; use Enqueue\Dbal\DbalConnectionFactory; @@ -67,6 +68,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $dbalConfiguration->getDisabledTransactionsOnAsynchronousEndpointNames(), new Reference(RetryRunner::class), new Reference(LoggingGateway::class), + new Reference(TransactionStatusTracker::class), ]); $messagingConfiguration diff --git a/packages/Dbal/tests/Fixture/InstantRetryTransaction/CommandDispatchingAsyncHandler.php b/packages/Dbal/tests/Fixture/InstantRetryTransaction/CommandDispatchingAsyncHandler.php new file mode 100644 index 000000000..79c0fc592 --- /dev/null +++ b/packages/Dbal/tests/Fixture/InstantRetryTransaction/CommandDispatchingAsyncHandler.php @@ -0,0 +1,65 @@ +sendWithRouting('execute.sql.command', $payload); + } + + #[CommandHandler('execute.sql.command')] + public function executeSqlCommand(string $payload): void + { + $connection = $this->getTransactionalConnection(); + + $this->commandHandlerCallCount++; + if ($this->commandHandlerCallCount === 1) { + $connection->executeStatement("INSERT INTO persons (person_id, name) VALUES (1, 'duplicate')"); + } + + $connection->executeStatement( + "INSERT INTO persons (person_id, name) VALUES (?, ?)", + [$this->commandHandlerCallCount + 100, 'attempt-' . $this->commandHandlerCallCount], + ); + } + + public function getCommandHandlerCallCount(): int + { + return $this->commandHandlerCallCount; + } + + private function getTransactionalConnection(): \Doctrine\DBAL\Connection + { + $factory = CachedConnectionFactory::createFor( + new DbalReconnectableConnectionFactory($this->connectionFactory), + ); + + /** @var DbalContext $context */ + $context = $factory->createContext(); + + return $context->getDbalConnection(); + } +} diff --git a/packages/Dbal/tests/Integration/Transaction/DbalTransactionAsynchronousEndpointTest.php b/packages/Dbal/tests/Integration/Transaction/DbalTransactionAsynchronousEndpointTest.php index eba91f665..363fbbafe 100644 --- a/packages/Dbal/tests/Integration/Transaction/DbalTransactionAsynchronousEndpointTest.php +++ b/packages/Dbal/tests/Integration/Transaction/DbalTransactionAsynchronousEndpointTest.php @@ -19,8 +19,10 @@ use Exception; use PHPUnit\Framework\Attributes\Group; use Test\Ecotone\Dbal\DbalMessagingTestCase; +use Ecotone\Modelling\Config\InstantRetry\InstantRetryConfiguration; use Test\Ecotone\Dbal\Fixture\ConnectionBreakingConfiguration; use Test\Ecotone\Dbal\Fixture\ConnectionBreakingModule; +use Test\Ecotone\Dbal\Fixture\InstantRetryTransaction\CommandDispatchingAsyncHandler; use Test\Ecotone\Dbal\Fixture\ORM\FailureMode\MultipleInternalCommandsService; use Test\Ecotone\Dbal\Fixture\ORM\Person\Person; @@ -437,4 +439,99 @@ public function test_turning_off_transactions_for_polling_consumer() } $this->assertFalse($aggregateCommitted); } + + public function test_command_bus_instant_retry_inside_async_transaction_retries_on_fresh_state(): void + { + if ($this->isUsingSqlite()) { + $this->markTestSkipped('SQLite does not enforce transaction abort on SQL errors like PostgreSQL'); + } + + $connectionFactory = $this->getConnectionFactory(); + $handler = new CommandDispatchingAsyncHandler($connectionFactory); + + $connection = $connectionFactory->createContext()->getDbalConnection(); + $connection->executeStatement("INSERT INTO persons (person_id, name) VALUES (1, 'pre-existing')"); + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [CommandDispatchingAsyncHandler::class], + [ + $handler, + DbalConnectionFactory::class => $connectionFactory, + ], + ServiceConfiguration::createWithDefaults() + ->withExtensionObjects([ + DbalConfiguration::createWithDefaults() + ->withTransactionOnAsynchronousEndpoints(true) + ->withTransactionOnCommandBus(false), + DbalBackedMessageChannelBuilder::create('async'), + InstantRetryConfiguration::createWithDefaults() + ->withCommandBusRetry(isEnabled: true, retryTimes: 3), + ]) + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ModulePackageList::DBAL_PACKAGE, + ])), + ); + + $ecotoneLite->sendCommandWithRoutingKey('dispatch.sql.command', 'test'); + $ecotoneLite->run('async', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1, failAtError: false)); + + $this->assertSame( + 1, + $handler->getCommandHandlerCallCount(), + 'CommandBus InstantRetry should be skipped when running inside an already active async endpoint transaction. ' + . 'Expected command handler to be called once (no retries inside broken transaction), but it was called ' . $handler->getCommandHandlerCallCount() . ' times.', + ); + } + + public function test_instant_retry_on_async_endpoint_retries_after_transaction_rollback_on_fresh_state(): void + { + if ($this->isUsingSqlite()) { + $this->markTestSkipped('SQLite does not enforce transaction abort on SQL errors like PostgreSQL'); + } + + $connectionFactory = $this->getConnectionFactory(); + $handler = new CommandDispatchingAsyncHandler($connectionFactory); + + $connection = $connectionFactory->createContext()->getDbalConnection(); + $connection->executeStatement("INSERT INTO persons (person_id, name) VALUES (1, 'pre-existing')"); + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [CommandDispatchingAsyncHandler::class], + [ + $handler, + DbalConnectionFactory::class => $connectionFactory, + ], + ServiceConfiguration::createWithDefaults() + ->withExtensionObjects([ + DbalConfiguration::createWithDefaults() + ->withTransactionOnAsynchronousEndpoints(true) + ->withTransactionOnCommandBus(false), + DbalBackedMessageChannelBuilder::create('async'), + InstantRetryConfiguration::createWithDefaults() + ->withAsynchronousEndpointsRetry(isEnabled: true, retryTimes: 3), + ]) + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ModulePackageList::DBAL_PACKAGE, + ])), + ); + + $ecotoneLite->sendCommandWithRoutingKey('dispatch.sql.command', 'test'); + $ecotoneLite->run('async', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1, failAtError: false)); + + $this->assertSame( + 2, + $handler->getCommandHandlerCallCount(), + 'Expected retry to happen after transaction rollback. ' + . 'Handler was called ' . $handler->getCommandHandlerCallCount() . ' times.', + ); + + $result = $connection->executeQuery('SELECT name FROM persons WHERE person_id = 102')->fetchOne(); + $this->assertSame( + 'attempt-2', + $result, + 'Retry should have succeeded on fresh transaction state, inserting person_id=102.', + ); + } } diff --git a/packages/Ecotone/src/Modelling/Config/DatabaseTransaction/TransactionStatusTracker.php b/packages/Ecotone/src/Modelling/Config/DatabaseTransaction/TransactionStatusTracker.php new file mode 100644 index 000000000..cc40c170d --- /dev/null +++ b/packages/Ecotone/src/Modelling/Config/DatabaseTransaction/TransactionStatusTracker.php @@ -0,0 +1,31 @@ +isInsideTransaction = true; + } + + public function markAsOutsideTransaction(): void + { + $this->isInsideTransaction = false; + } + + public function isInsideTransaction(): bool + { + return $this->isInsideTransaction; + } +} diff --git a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php index ceb67e1ae..6b8d36c19 100644 --- a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php +++ b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php @@ -21,6 +21,7 @@ use Ecotone\Messaging\Support\LicensingException; use Ecotone\Modelling\Attribute\InstantRetry; use Ecotone\Modelling\CommandBus; +use Ecotone\Modelling\Config\DatabaseTransaction\TransactionStatusTracker; use Symfony\Component\Uid\Uuid; #[ModuleAnnotation] @@ -152,7 +153,7 @@ private function registerInterceptor( ?string $relatedEndpointId, ): void { $instantRetryId = Uuid::v7()->toRfc4122(); - $messagingConfiguration->registerServiceDefinition($instantRetryId, Definition::createFor(InstantRetryInterceptor::class, [$retryAttempt, $exceptions, Reference::to(RetryStatusTracker::class), $relatedEndpointId])); + $messagingConfiguration->registerServiceDefinition($instantRetryId, Definition::createFor(InstantRetryInterceptor::class, [$retryAttempt, $exceptions, Reference::to(RetryStatusTracker::class), Reference::to(TransactionStatusTracker::class), $relatedEndpointId])); $messagingConfiguration ->registerAroundMethodInterceptor( diff --git a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryInterceptor.php b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryInterceptor.php index 76fe0afb3..482f4fb60 100644 --- a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryInterceptor.php +++ b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryInterceptor.php @@ -7,6 +7,7 @@ use Ecotone\Messaging\Handler\Logger\LoggingGateway; use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation; use Ecotone\Messaging\Message; +use Ecotone\Modelling\Config\DatabaseTransaction\TransactionStatusTracker; use Exception; /** @@ -15,10 +16,11 @@ class InstantRetryInterceptor { public function __construct( - private int $maxRetryAttempts, - private array $exceptions, - private RetryStatusTracker $retryStatusTracker, - private ?string $relatedEndpointId = null, + private int $maxRetryAttempts, + private array $exceptions, + private RetryStatusTracker $retryStatusTracker, + private TransactionStatusTracker $transactionStatusTracker, + private ?string $relatedEndpointId = null, ) { } @@ -34,6 +36,10 @@ public function retry(MethodInvocation $methodInvocation, Message $message, #[Re return $methodInvocation->proceed(); } + if ($this->transactionStatusTracker->isInsideTransaction()) { + return $methodInvocation->proceed(); + } + try { $isSuccessful = false; $retries = 0; diff --git a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryModule.php b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryModule.php index b9c60dd2f..54f9d8fd5 100644 --- a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryModule.php +++ b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryModule.php @@ -17,6 +17,7 @@ use Ecotone\Messaging\Handler\Processor\MethodInvoker\AroundInterceptorBuilder; use Ecotone\Messaging\Precedence; use Ecotone\Modelling\CommandBus; +use Ecotone\Modelling\Config\DatabaseTransaction\TransactionStatusTracker; use Symfony\Component\Uid\Uuid; #[ModuleAnnotation] @@ -44,6 +45,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO { $configuration = ExtensionObjectResolver::resolveUnique(InstantRetryConfiguration::class, $extensionObjects, InstantRetryConfiguration::createWithDefaults()); $messagingConfiguration->registerServiceDefinition(RetryStatusTracker::class, Definition::createFor(RetryStatusTracker::class, [false])); + $messagingConfiguration->registerServiceDefinition(TransactionStatusTracker::class, Definition::createFor(TransactionStatusTracker::class, [false])); if ($configuration->isEnabledForCommandBus()) { $this->registerInterceptor($messagingConfiguration, $interfaceToCallRegistry, $configuration->getCommandBusRetryTimes(), $configuration->getCommandBuExceptions(), CommandBus::class, Precedence::GLOBAL_INSTANT_RETRY_PRECEDENCE); @@ -83,7 +85,7 @@ private function registerInterceptor( int $precedence, ): void { $instantRetryId = Uuid::v7()->toRfc4122(); - $messagingConfiguration->registerServiceDefinition($instantRetryId, Definition::createFor(InstantRetryInterceptor::class, [$retryAttempt, $exceptions, Reference::to(RetryStatusTracker::class)])); + $messagingConfiguration->registerServiceDefinition($instantRetryId, Definition::createFor(InstantRetryInterceptor::class, [$retryAttempt, $exceptions, Reference::to(RetryStatusTracker::class), Reference::to(TransactionStatusTracker::class)])); $messagingConfiguration ->registerAroundMethodInterceptor(