Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion packages/Dbal/src/DbalTransaction/DbalTransactionInterceptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Ecotone\Messaging\Handler\Recoverability\RetryRunner;
use Ecotone\Messaging\Handler\Recoverability\RetryTemplateBuilder;
use Ecotone\Messaging\Message;
use Ecotone\Modelling\Config\DatabaseTransaction\TransactionStatusTracker;
use Enqueue\Dbal\DbalConnectionFactory;
use Enqueue\Dbal\DbalContext;
use Enqueue\Dbal\ManagerRegistryConnectionFactory;
Expand All @@ -34,7 +35,7 @@ class DbalTransactionInterceptor
* @param array<string, DbalConnectionFactory|ManagerRegistryConnectionFactory> $connectionFactories
* @param string[] $disableTransactionOnAsynchronousEndpoints
*/
public function __construct(private array $connectionFactories, private array $disableTransactionOnAsynchronousEndpoints, private RetryRunner $retryRunner, private LoggingGateway $logger)
public function __construct(private array $connectionFactories, private array $disableTransactionOnAsynchronousEndpoints, private RetryRunner $retryRunner, private LoggingGateway $logger, private TransactionStatusTracker $transactionStatusTracker)
{
}

Expand Down Expand Up @@ -88,6 +89,12 @@ public function transactional(MethodInvocation $methodInvocation, Message $messa
}, $retryStrategy, $message, ConnectionException::class, 'Starting Database transaction has failed due to network work, retrying in order to self heal.');
$this->logger->info('Database Transaction started', $message);
}

$transactionStarted = $connections !== [];
if ($transactionStarted) {
$this->transactionStatusTracker->markAsInsideTransaction();
}

try {
$result = $methodInvocation->proceed();

Expand Down Expand Up @@ -134,6 +141,10 @@ public function transactional(MethodInvocation $methodInvocation, Message $messa
}

throw $exception;
} finally {
if ($transactionStarted) {
$this->transactionStatusTracker->markAsOutsideTransaction();
}
}

return $result;
Expand Down
2 changes: 2 additions & 0 deletions packages/Dbal/src/DbalTransaction/DbalTransactionModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Ecotone\Messaging\Handler\Recoverability\RetryRunner;
use Ecotone\Messaging\Precedence;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\Config\DatabaseTransaction\TransactionStatusTracker;
use Ecotone\Projecting\Config\ProjectingConsoleCommands;
use Enqueue\Dbal\DbalConnectionFactory;

Expand Down Expand Up @@ -67,6 +68,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
$dbalConfiguration->getDisabledTransactionsOnAsynchronousEndpointNames(),
new Reference(RetryRunner::class),
new Reference(LoggingGateway::class),
new Reference(TransactionStatusTracker::class),
]);

$messagingConfiguration
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\Dbal\Fixture\InstantRetryTransaction;

use Ecotone\Dbal\DbalReconnectableConnectionFactory;
use Ecotone\Enqueue\CachedConnectionFactory;
use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\CommandBus;
use Enqueue\Dbal\DbalContext;
use Interop\Queue\ConnectionFactory;

/**
* licence Apache-2.0
*/
final class CommandDispatchingAsyncHandler
{
private int $commandHandlerCallCount = 0;

public function __construct(private ConnectionFactory $connectionFactory)
{
}

#[Asynchronous('async')]
#[CommandHandler('dispatch.sql.command', endpointId: 'dispatchSqlCommandEndpoint')]
public function dispatch(string $payload, CommandBus $commandBus): void
{
$commandBus->sendWithRouting('execute.sql.command', $payload);
}

#[CommandHandler('execute.sql.command')]
public function executeSqlCommand(string $payload): void
{
$connection = $this->getTransactionalConnection();

$this->commandHandlerCallCount++;
if ($this->commandHandlerCallCount === 1) {
$connection->executeStatement("INSERT INTO persons (person_id, name) VALUES (1, 'duplicate')");
}

$connection->executeStatement(
"INSERT INTO persons (person_id, name) VALUES (?, ?)",
[$this->commandHandlerCallCount + 100, 'attempt-' . $this->commandHandlerCallCount],
);
}

public function getCommandHandlerCallCount(): int
{
return $this->commandHandlerCallCount;
}

private function getTransactionalConnection(): \Doctrine\DBAL\Connection
{
$factory = CachedConnectionFactory::createFor(
new DbalReconnectableConnectionFactory($this->connectionFactory),
);

/** @var DbalContext $context */
$context = $factory->createContext();

return $context->getDbalConnection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
use Exception;
use PHPUnit\Framework\Attributes\Group;
use Test\Ecotone\Dbal\DbalMessagingTestCase;
use Ecotone\Modelling\Config\InstantRetry\InstantRetryConfiguration;
use Test\Ecotone\Dbal\Fixture\ConnectionBreakingConfiguration;
use Test\Ecotone\Dbal\Fixture\ConnectionBreakingModule;
use Test\Ecotone\Dbal\Fixture\InstantRetryTransaction\CommandDispatchingAsyncHandler;
use Test\Ecotone\Dbal\Fixture\ORM\FailureMode\MultipleInternalCommandsService;
use Test\Ecotone\Dbal\Fixture\ORM\Person\Person;

Expand Down Expand Up @@ -437,4 +439,99 @@ public function test_turning_off_transactions_for_polling_consumer()
}
$this->assertFalse($aggregateCommitted);
}

public function test_command_bus_instant_retry_inside_async_transaction_retries_on_fresh_state(): void
{
if ($this->isUsingSqlite()) {
$this->markTestSkipped('SQLite does not enforce transaction abort on SQL errors like PostgreSQL');
}

$connectionFactory = $this->getConnectionFactory();
$handler = new CommandDispatchingAsyncHandler($connectionFactory);

$connection = $connectionFactory->createContext()->getDbalConnection();
$connection->executeStatement("INSERT INTO persons (person_id, name) VALUES (1, 'pre-existing')");

$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
[CommandDispatchingAsyncHandler::class],
[
$handler,
DbalConnectionFactory::class => $connectionFactory,
],
ServiceConfiguration::createWithDefaults()
->withExtensionObjects([
DbalConfiguration::createWithDefaults()
->withTransactionOnAsynchronousEndpoints(true)
->withTransactionOnCommandBus(false),
DbalBackedMessageChannelBuilder::create('async'),
InstantRetryConfiguration::createWithDefaults()
->withCommandBusRetry(isEnabled: true, retryTimes: 3),
])
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([
ModulePackageList::ASYNCHRONOUS_PACKAGE,
ModulePackageList::DBAL_PACKAGE,
])),
);

$ecotoneLite->sendCommandWithRoutingKey('dispatch.sql.command', 'test');
$ecotoneLite->run('async', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1, failAtError: false));

$this->assertSame(
1,
$handler->getCommandHandlerCallCount(),
'CommandBus InstantRetry should be skipped when running inside an already active async endpoint transaction. '
. 'Expected command handler to be called once (no retries inside broken transaction), but it was called ' . $handler->getCommandHandlerCallCount() . ' times.',
);
}

public function test_instant_retry_on_async_endpoint_retries_after_transaction_rollback_on_fresh_state(): void
{
if ($this->isUsingSqlite()) {
$this->markTestSkipped('SQLite does not enforce transaction abort on SQL errors like PostgreSQL');
}

$connectionFactory = $this->getConnectionFactory();
$handler = new CommandDispatchingAsyncHandler($connectionFactory);

$connection = $connectionFactory->createContext()->getDbalConnection();
$connection->executeStatement("INSERT INTO persons (person_id, name) VALUES (1, 'pre-existing')");

$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
[CommandDispatchingAsyncHandler::class],
[
$handler,
DbalConnectionFactory::class => $connectionFactory,
],
ServiceConfiguration::createWithDefaults()
->withExtensionObjects([
DbalConfiguration::createWithDefaults()
->withTransactionOnAsynchronousEndpoints(true)
->withTransactionOnCommandBus(false),
DbalBackedMessageChannelBuilder::create('async'),
InstantRetryConfiguration::createWithDefaults()
->withAsynchronousEndpointsRetry(isEnabled: true, retryTimes: 3),
])
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([
ModulePackageList::ASYNCHRONOUS_PACKAGE,
ModulePackageList::DBAL_PACKAGE,
])),
);

$ecotoneLite->sendCommandWithRoutingKey('dispatch.sql.command', 'test');
$ecotoneLite->run('async', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1, failAtError: false));

$this->assertSame(
2,
$handler->getCommandHandlerCallCount(),
'Expected retry to happen after transaction rollback. '
. 'Handler was called ' . $handler->getCommandHandlerCallCount() . ' times.',
);

$result = $connection->executeQuery('SELECT name FROM persons WHERE person_id = 102')->fetchOne();
$this->assertSame(
'attempt-2',
$result,
'Retry should have succeeded on fresh transaction state, inserting person_id=102.',
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace Ecotone\Modelling\Config\DatabaseTransaction;

/**
* licence Apache-2.0
*/
final class TransactionStatusTracker
{
public function __construct(
private bool $isInsideTransaction
) {
}

public function markAsInsideTransaction(): void
{
$this->isInsideTransaction = true;
}

public function markAsOutsideTransaction(): void
{
$this->isInsideTransaction = false;
}

public function isInsideTransaction(): bool
{
return $this->isInsideTransaction;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Ecotone\Messaging\Support\LicensingException;
use Ecotone\Modelling\Attribute\InstantRetry;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\Config\DatabaseTransaction\TransactionStatusTracker;
use Symfony\Component\Uid\Uuid;

#[ModuleAnnotation]
Expand Down Expand Up @@ -152,7 +153,7 @@ private function registerInterceptor(
?string $relatedEndpointId,
): void {
$instantRetryId = Uuid::v7()->toRfc4122();
$messagingConfiguration->registerServiceDefinition($instantRetryId, Definition::createFor(InstantRetryInterceptor::class, [$retryAttempt, $exceptions, Reference::to(RetryStatusTracker::class), $relatedEndpointId]));
$messagingConfiguration->registerServiceDefinition($instantRetryId, Definition::createFor(InstantRetryInterceptor::class, [$retryAttempt, $exceptions, Reference::to(RetryStatusTracker::class), Reference::to(TransactionStatusTracker::class), $relatedEndpointId]));

$messagingConfiguration
->registerAroundMethodInterceptor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation;
use Ecotone\Messaging\Message;
use Ecotone\Modelling\Config\DatabaseTransaction\TransactionStatusTracker;
use Exception;

/**
Expand All @@ -15,10 +16,11 @@
class InstantRetryInterceptor
{
public function __construct(
private int $maxRetryAttempts,
private array $exceptions,
private RetryStatusTracker $retryStatusTracker,
private ?string $relatedEndpointId = null,
private int $maxRetryAttempts,
private array $exceptions,
private RetryStatusTracker $retryStatusTracker,
private TransactionStatusTracker $transactionStatusTracker,
private ?string $relatedEndpointId = null,
) {
}

Expand All @@ -34,6 +36,10 @@ public function retry(MethodInvocation $methodInvocation, Message $message, #[Re
return $methodInvocation->proceed();
}

if ($this->transactionStatusTracker->isInsideTransaction()) {
return $methodInvocation->proceed();
}

try {
$isSuccessful = false;
$retries = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Ecotone\Messaging\Handler\Processor\MethodInvoker\AroundInterceptorBuilder;
use Ecotone\Messaging\Precedence;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\Config\DatabaseTransaction\TransactionStatusTracker;
use Symfony\Component\Uid\Uuid;

#[ModuleAnnotation]
Expand Down Expand Up @@ -44,6 +45,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
{
$configuration = ExtensionObjectResolver::resolveUnique(InstantRetryConfiguration::class, $extensionObjects, InstantRetryConfiguration::createWithDefaults());
$messagingConfiguration->registerServiceDefinition(RetryStatusTracker::class, Definition::createFor(RetryStatusTracker::class, [false]));
$messagingConfiguration->registerServiceDefinition(TransactionStatusTracker::class, Definition::createFor(TransactionStatusTracker::class, [false]));

if ($configuration->isEnabledForCommandBus()) {
$this->registerInterceptor($messagingConfiguration, $interfaceToCallRegistry, $configuration->getCommandBusRetryTimes(), $configuration->getCommandBuExceptions(), CommandBus::class, Precedence::GLOBAL_INSTANT_RETRY_PRECEDENCE);
Expand Down Expand Up @@ -83,7 +85,7 @@ private function registerInterceptor(
int $precedence,
): void {
$instantRetryId = Uuid::v7()->toRfc4122();
$messagingConfiguration->registerServiceDefinition($instantRetryId, Definition::createFor(InstantRetryInterceptor::class, [$retryAttempt, $exceptions, Reference::to(RetryStatusTracker::class)]));
$messagingConfiguration->registerServiceDefinition($instantRetryId, Definition::createFor(InstantRetryInterceptor::class, [$retryAttempt, $exceptions, Reference::to(RetryStatusTracker::class), Reference::to(TransactionStatusTracker::class)]));

$messagingConfiguration
->registerAroundMethodInterceptor(
Expand Down
Loading