Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .claude/commands/create-pr.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ Create a GitHub Pull Request for the current branch using the repository's PR te

### Step 1: Gather Context

1. Run `git log main...HEAD --oneline` to see all commits on this branch
2. Run `git diff main...HEAD --stat` to see which files changed
3. Run `git diff main...HEAD` to read the actual code changes
4. Read the PR template at `.github/PULL_REQUEST_TEMPLATE.md`
1.Read the PR template at `.github/PULL_REQUEST_TEMPLATE.md`

### Step 2: Understand the "Why"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Doctrine\DBAL\Exception\ConnectionException;
use Ecotone\Dbal\DbalReconnectableConnectionFactory;
use Ecotone\Enqueue\CachedConnectionFactory;
use Ecotone\Messaging\Attribute\WithoutDatabaseTransaction;
use Ecotone\Messaging\Endpoint\PollingMetadata;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation;
Expand Down Expand Up @@ -37,8 +38,12 @@ public function __construct(private array $connectionFactories, private array $d
{
}

public function transactional(MethodInvocation $methodInvocation, Message $message, ?DbalTransaction $DbalTransaction, ?PollingMetadata $pollingMetadata)
public function transactional(MethodInvocation $methodInvocation, Message $message, ?DbalTransaction $DbalTransaction, ?PollingMetadata $pollingMetadata, ?WithoutDatabaseTransaction $withoutDatabaseTransaction = null)
{
if ($withoutDatabaseTransaction !== null) {
return $methodInvocation->proceed();
}

$endpointId = $pollingMetadata?->getEndpointId();

$connections = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
if ($dbalConfiguration->isTransactionOnConsoleCommands()) {
$pointcut .= '||(' . ConsoleCommand::class . ')';
}
$pointcut = '(' . $pointcut . ')&&not(' . WithoutDbalTransaction::class . ')';
$pointcut .= '&&not('. ProjectingConsoleCommands::class . '::backfillProjection)';
$connectionFactories = $dbalConfiguration->getDefaultConnectionReferenceNames() ?: [DbalConnectionFactory::class];

Expand Down
15 changes: 0 additions & 15 deletions packages/Dbal/src/DbalTransaction/WithoutDbalTransaction.php

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Test\Ecotone\Dbal\Integration\Transaction;

use Ecotone\Dbal\DbalConnection;
use Ecotone\Dbal\DbalTransaction\WithoutDbalTransaction;
use Ecotone\Messaging\Attribute\WithoutDatabaseTransaction;
use Ecotone\Dbal\MultiTenant\MultiTenantConfiguration;
use Ecotone\Lite\EcotoneLite;
use Ecotone\Lite\Test\FlowTestSupport;
Expand Down Expand Up @@ -127,7 +127,7 @@ public function test_it_can_disable_transactions_on_interface(): void
$consoleCommands = new class () {
public bool $prepared = false;
#[CommandHandler('command.prepare')]
#[WithoutDbalTransaction]
#[WithoutDatabaseTransaction]
public function prepare(#[Reference] DbalConnectionFactory $dbalConnectionFactory): void
{
$dbalConnectionFactory->createContext()->getDbalConnection()->executeStatement(<<<SQL
Expand All @@ -140,7 +140,7 @@ public function prepare(#[Reference] DbalConnectionFactory $dbalConnectionFactor
}

#[ConsoleCommand('console.register.nontransactional')]
#[WithoutDbalTransaction]
#[WithoutDatabaseTransaction]
public function nontransactional(string $orderId, #[Reference] DbalConnectionFactory $dbalConnectionFactory): void
{
$dbalConnectionFactory->createContext()->getDbalConnection()->executeStatement(<<<SQL
Expand Down
17 changes: 16 additions & 1 deletion packages/Ecotone/src/Messaging/Attribute/Asynchronous.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,30 @@
class Asynchronous
{
private string|array $channelName;
/** @var AsynchronousEndpointAttribute[] */
private array $endpointAnnotations;

public function __construct(string|array $channelName)
/**
* @param AsynchronousEndpointAttribute[] $endpointAnnotations
*/
public function __construct(string|array $channelName, array $endpointAnnotations = [])
{
Assert::notNullAndEmpty($channelName, 'Channel name can not be empty string');
Assert::allInstanceOfType($endpointAnnotations, AsynchronousEndpointAttribute::class);
$this->channelName = $channelName;
$this->endpointAnnotations = $endpointAnnotations;
}

public function getChannelName(): array
{
return is_string($this->channelName) ? [$this->channelName] : $this->channelName;
}

/**
* @return AsynchronousEndpointAttribute[]
*/
public function getEndpointAnnotations(): array
{
return $this->endpointAnnotations;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Ecotone\Messaging\Attribute;

/**
* licence Apache-2.0
*/
interface AsynchronousEndpointAttribute
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Ecotone\Messaging\Attribute;

use Attribute;

/**
* licence Apache-2.0
*/
#[Attribute]
class WithoutDatabaseTransaction implements AsynchronousEndpointAttribute
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Ecotone\Messaging\Attribute;

use Attribute;

/**
* licence Enterprise
*/
#[Attribute]
class WithoutMessageCollector implements AsynchronousEndpointAttribute
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Ecotone\Messaging\Channel\Collector;

use Ecotone\Messaging\Attribute\Parameter\Reference;
use Ecotone\Messaging\Attribute\WithoutMessageCollector;
use Ecotone\Messaging\Config\ConfiguredMessagingSystem;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation;
Expand All @@ -24,8 +25,12 @@ public function send(
MethodInvocation $methodInvocation,
Message $message,
#[Reference] ConfiguredMessagingSystem $configuredMessagingSystem,
#[Reference] LoggingGateway $logger
#[Reference] LoggingGateway $logger,
?WithoutMessageCollector $withoutMessageCollector = null,
): mixed {
if ($withoutMessageCollector !== null) {
return $methodInvocation->proceed();
}
/** For example Command Bus inside Command Bus */
if ($this->collectorStorage->isEnabled()) {
return $methodInvocation->proceed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Ecotone\AnnotationFinder\AnnotationFinder;
use Ecotone\AnnotationFinder\AnnotationFinderFactory;
use Ecotone\Lite\Test\TestConfiguration;
use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint;
use Ecotone\Messaging\Channel\ChannelInterceptorBuilder;
use Ecotone\Messaging\Channel\EventDrivenChannelInterceptorAdapter;
Expand Down Expand Up @@ -39,6 +40,8 @@
use Ecotone\Messaging\Conversion\ConversionService;
use Ecotone\Messaging\Endpoint\ChannelAdapterConsumerBuilder;
use Ecotone\Messaging\Endpoint\MessageHandlerConsumerBuilder;
use Ecotone\Messaging\Endpoint\PollingConsumer\AsyncEndpointAnnotationContext;
use Ecotone\Messaging\Endpoint\PollingConsumer\AsyncHandlerAnnotationRegistry;
use Ecotone\Messaging\Endpoint\PollingMetadata;
use Ecotone\Messaging\Gateway\MessagingEntrypointService;
use Ecotone\Messaging\Handler\Bridge\BridgeBuilder;
Expand All @@ -54,13 +57,15 @@
use Ecotone\Messaging\Handler\Processor\MethodInvoker\InterceptorWithPointCut;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInterceptorBuilder;
use Ecotone\Messaging\Handler\Recoverability\RetryTemplateBuilder;
use Ecotone\Messaging\Handler\Type;
use Ecotone\Messaging\Handler\ServiceActivator\UninterruptibleServiceActivator;
use Ecotone\Messaging\Handler\Transformer\RoutingSlipPrepender;
use Ecotone\Messaging\InMemoryConfigurationVariableService;
use Ecotone\Messaging\MessagingException;
use Ecotone\Messaging\NullableMessageChannel;
use Ecotone\Messaging\PollableChannel;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Messaging\Support\LicensingException;
use Ecotone\Modelling\Config\MessageBusChannel;
use Exception;

Expand Down Expand Up @@ -274,6 +279,11 @@ private function prepareAndOptimizeConfiguration(InterfaceToCallRegistry $interf
$this->aroundMethodInterceptors = $this->orderMethodInterceptors($this->aroundMethodInterceptors);
$this->afterCallMethodInterceptors = $this->orderMethodInterceptors($this->afterCallMethodInterceptors);

$this->registerServiceDefinition(
AsyncEndpointAnnotationContext::class,
new Definition(AsyncEndpointAnnotationContext::class)
);

foreach ($this->channelAdapters as $channelAdapter) {
$channelAdapter->withEndpointAnnotations(array_merge($channelAdapter->getEndpointAnnotations(), [new AttributeDefinition(AsynchronousRunningEndpoint::class, [$channelAdapter->getEndpointId()])]));
}
Expand Down Expand Up @@ -305,7 +315,7 @@ private function prepareAndOptimizeConfiguration(InterfaceToCallRegistry $interf
krsort($this->channelInterceptorBuilders);

$this->configureDefaultMessageChannels();
$this->configureAsynchronousEndpoints();
$this->configureAsynchronousEndpoints($interfaceToCallRegistry);

foreach ($this->requiredConsumerEndpointIds as $requiredConsumerEndpointId) {
if (! array_key_exists($requiredConsumerEndpointId, $this->messageHandlerBuilders) && ! array_key_exists($requiredConsumerEndpointId, $this->channelAdapters)) {
Expand Down Expand Up @@ -394,11 +404,11 @@ public function registerChannelInterceptor(ChannelInterceptorBuilder $channelInt
return $this;
}

/**
* @return void
*/
private function configureAsynchronousEndpoints(): void
private function configureAsynchronousEndpoints(InterfaceToCallRegistry $interfaceToCallRegistry): void
{
/** @var array<string, AttributeDefinition[]> $asyncHandlerAnnotations */
$asyncHandlerAnnotations = [];

foreach ($this->asynchronousEndpoints as $targetEndpointId => $asynchronousMessageChannels) {
$asynchronousMessageChannel = array_shift($asynchronousMessageChannels);
if (! isset($this->channelBuilders[$asynchronousMessageChannel]) && ! isset($this->defaultChannelBuilders[$asynchronousMessageChannel])) {
Expand All @@ -413,6 +423,26 @@ private function configureAsynchronousEndpoints(): void
$this->messageHandlerBuilders[$key] = $messageHandlerBuilder->withInputChannelName($handlerExecutionChannel);
$this->registerMessageChannel(SimpleMessageChannelBuilder::createDirectMessageChannel($handlerExecutionChannel));

$handlerInterface = $messageHandlerBuilder->getInterceptedInterface($interfaceToCallRegistry);
/** @var Asynchronous|null $asyncAttribute */
$asyncAttribute = $handlerInterface->findSingleAnnotation(Type::object(Asynchronous::class));
if ($asyncAttribute === null) {
foreach ($messageHandlerBuilder->getEndpointAnnotations() as $annotation) {
if ($annotation->getClassName() === Asynchronous::class) {
$asyncAttribute = $annotation->instance();
break;
}
}
}
$endpointAnnotations = $asyncAttribute ? $asyncAttribute->getEndpointAnnotations() : [];
if ($endpointAnnotations && ! $this->isRunningForEnterpriseLicence) {
throw LicensingException::create("Endpoint annotations on #[Asynchronous] attribute for endpoint `{$targetEndpointId}` require Ecotone Enterprise licence.");
}
$asyncHandlerAnnotations[$handlerExecutionChannel] = array_map(
fn ($a) => AttributeDefinition::fromObject($a),
$endpointAnnotations
);

$consequentialChannels = $asynchronousMessageChannels;
$consequentialChannels[] = $handlerExecutionChannel;
/**
Expand Down Expand Up @@ -452,6 +482,11 @@ private function configureAsynchronousEndpoints(): void
}
}

$this->registerServiceDefinition(
AsyncHandlerAnnotationRegistry::class,
new Definition(AsyncHandlerAnnotationRegistry::class, [$asyncHandlerAnnotations])
);

$asynchronousChannels = array_map(
fn (MessageChannelBuilder $channel) => $channel->getMessageChannelName(),
array_filter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
use Ecotone\Messaging\Config\Container\PollingMetadataReference;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Endpoint\Interceptor\PcntlTerminationListener;
use Ecotone\Messaging\Endpoint\PollingConsumer\AsyncEndpointAnnotationContext;
use Ecotone\Messaging\Endpoint\PollingConsumer\AsyncHandlerAnnotationRegistry;
use Ecotone\Messaging\Endpoint\PollingConsumer\InterceptedConsumerRunner;
use Ecotone\Messaging\Endpoint\PollingConsumer\PollingConsumerErrorChannelInterceptor;
use Ecotone\Messaging\Gateway\MessagingEntrypointService;
Expand Down Expand Up @@ -71,6 +73,8 @@ public function registerConsumer(MessagingContainerBuilder $builder): void
new Reference(LoggingGateway::class),
new Reference(MessagingEntrypointService::class),
new Reference(ExpressionEvaluationService::REFERENCE),
new Reference(AsyncHandlerAnnotationRegistry::class),
new Reference(AsyncEndpointAnnotationContext::class),
]);
$builder->registerPollingEndpoint($this->endpointId, $consumerRunner);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace Ecotone\Messaging\Endpoint\PollingConsumer;

/**
* licence Enterprise
*/
class AsyncEndpointAnnotationContext
{
/** @var object[] */
private array $currentAnnotations = [];

/**
* @param object[] $annotations
*/
public function setAnnotations(array $annotations): void
{
$this->currentAnnotations = $annotations;
}

/**
* @return object[]
*/
public function getCurrentAnnotations(): array
{
return $this->currentAnnotations;
}

public function clear(): void
{
$this->currentAnnotations = [];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Ecotone\Messaging\Endpoint\PollingConsumer;

/**
* licence Enterprise
*/
class AsyncHandlerAnnotationRegistry
{
/**
* @param array<string, object[]> $channelToAnnotations
*/
public function __construct(private array $channelToAnnotations)
{
}

/**
* @return object[]
*/
public function getAnnotationsForChannel(string $channel): array
{
return $this->channelToAnnotations[$channel] ?? [];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public function __construct(
private LoggingGateway $logger,
private MessagingEntrypointService $messagingEntrypoint,
private ExpressionEvaluationService $expressionEvaluationService,
private AsyncHandlerAnnotationRegistry $asyncHandlerAnnotationRegistry,
private AsyncEndpointAnnotationContext $asyncEndpointAnnotationContext,
) {
}

Expand All @@ -51,7 +53,7 @@ public function createConsumer(?ExecutionPollingMetadata $executionPollingMetada
$interceptedConsumer = new ScheduledTaskConsumer(
SyncTaskScheduler::createWithEmptyTriggerContext($this->clock, $pollingMetadata),
$trigger,
new PollToGatewayTaskExecutor($this->messagePoller, $interceptedGateway, $this->messagingEntrypoint),
new PollToGatewayTaskExecutor($this->messagePoller, $interceptedGateway, $this->messagingEntrypoint, $this->asyncHandlerAnnotationRegistry, $this->asyncEndpointAnnotationContext),
);

if ($interceptors) {
Expand Down
Loading
Loading