diff --git a/.claude/commands/create-pr.md b/.claude/commands/create-pr.md index 486ae212e..be5d58508 100644 --- a/.claude/commands/create-pr.md +++ b/.claude/commands/create-pr.md @@ -17,10 +17,7 @@ Create a GitHub Pull Request for the current branch using the repository's PR te ### Step 1: Gather Context -1. Run `git log main...HEAD --oneline` to see all commits on this branch -2. Run `git diff main...HEAD --stat` to see which files changed -3. Run `git diff main...HEAD` to read the actual code changes -4. Read the PR template at `.github/PULL_REQUEST_TEMPLATE.md` +1.Read the PR template at `.github/PULL_REQUEST_TEMPLATE.md` ### Step 2: Understand the "Why" diff --git a/packages/Dbal/src/DbalTransaction/DbalTransactionInterceptor.php b/packages/Dbal/src/DbalTransaction/DbalTransactionInterceptor.php index 56e71168d..add8b30a2 100644 --- a/packages/Dbal/src/DbalTransaction/DbalTransactionInterceptor.php +++ b/packages/Dbal/src/DbalTransaction/DbalTransactionInterceptor.php @@ -6,6 +6,7 @@ use Doctrine\DBAL\Exception\ConnectionException; use Ecotone\Dbal\DbalReconnectableConnectionFactory; use Ecotone\Enqueue\CachedConnectionFactory; +use Ecotone\Messaging\Attribute\WithoutDatabaseTransaction; use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Handler\Logger\LoggingGateway; use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation; @@ -37,8 +38,12 @@ public function __construct(private array $connectionFactories, private array $d { } - public function transactional(MethodInvocation $methodInvocation, Message $message, ?DbalTransaction $DbalTransaction, ?PollingMetadata $pollingMetadata) + public function transactional(MethodInvocation $methodInvocation, Message $message, ?DbalTransaction $DbalTransaction, ?PollingMetadata $pollingMetadata, ?WithoutDatabaseTransaction $withoutDatabaseTransaction = null) { + if ($withoutDatabaseTransaction !== null) { + return $methodInvocation->proceed(); + } + $endpointId = $pollingMetadata?->getEndpointId(); $connections = []; diff --git a/packages/Dbal/src/DbalTransaction/DbalTransactionModule.php b/packages/Dbal/src/DbalTransaction/DbalTransactionModule.php index 4f1c0df19..a34ca5a98 100644 --- a/packages/Dbal/src/DbalTransaction/DbalTransactionModule.php +++ b/packages/Dbal/src/DbalTransaction/DbalTransactionModule.php @@ -59,7 +59,6 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO if ($dbalConfiguration->isTransactionOnConsoleCommands()) { $pointcut .= '||(' . ConsoleCommand::class . ')'; } - $pointcut = '(' . $pointcut . ')&¬(' . WithoutDbalTransaction::class . ')'; $pointcut .= '&¬('. ProjectingConsoleCommands::class . '::backfillProjection)'; $connectionFactories = $dbalConfiguration->getDefaultConnectionReferenceNames() ?: [DbalConnectionFactory::class]; diff --git a/packages/Dbal/src/DbalTransaction/WithoutDbalTransaction.php b/packages/Dbal/src/DbalTransaction/WithoutDbalTransaction.php deleted file mode 100644 index 1b6c5d6a4..000000000 --- a/packages/Dbal/src/DbalTransaction/WithoutDbalTransaction.php +++ /dev/null @@ -1,15 +0,0 @@ -createContext()->getDbalConnection()->executeStatement(<<createContext()->getDbalConnection()->executeStatement(<<channelName = $channelName; + $this->endpointAnnotations = $endpointAnnotations; } public function getChannelName(): array { return is_string($this->channelName) ? [$this->channelName] : $this->channelName; } + + /** + * @return AsynchronousEndpointAttribute[] + */ + public function getEndpointAnnotations(): array + { + return $this->endpointAnnotations; + } } diff --git a/packages/Ecotone/src/Messaging/Attribute/AsynchronousEndpointAttribute.php b/packages/Ecotone/src/Messaging/Attribute/AsynchronousEndpointAttribute.php new file mode 100644 index 000000000..b865ac4cf --- /dev/null +++ b/packages/Ecotone/src/Messaging/Attribute/AsynchronousEndpointAttribute.php @@ -0,0 +1,12 @@ +proceed(); + } /** For example Command Bus inside Command Bus */ if ($this->collectorStorage->isEnabled()) { return $methodInvocation->proceed(); diff --git a/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php b/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php index db788097d..417972585 100644 --- a/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php +++ b/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php @@ -9,6 +9,7 @@ use Ecotone\AnnotationFinder\AnnotationFinder; use Ecotone\AnnotationFinder\AnnotationFinderFactory; use Ecotone\Lite\Test\TestConfiguration; +use Ecotone\Messaging\Attribute\Asynchronous; use Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint; use Ecotone\Messaging\Channel\ChannelInterceptorBuilder; use Ecotone\Messaging\Channel\EventDrivenChannelInterceptorAdapter; @@ -39,6 +40,8 @@ use Ecotone\Messaging\Conversion\ConversionService; use Ecotone\Messaging\Endpoint\ChannelAdapterConsumerBuilder; use Ecotone\Messaging\Endpoint\MessageHandlerConsumerBuilder; +use Ecotone\Messaging\Endpoint\PollingConsumer\AsyncEndpointAnnotationContext; +use Ecotone\Messaging\Endpoint\PollingConsumer\AsyncHandlerAnnotationRegistry; use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Gateway\MessagingEntrypointService; use Ecotone\Messaging\Handler\Bridge\BridgeBuilder; @@ -54,6 +57,7 @@ use Ecotone\Messaging\Handler\Processor\MethodInvoker\InterceptorWithPointCut; use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInterceptorBuilder; use Ecotone\Messaging\Handler\Recoverability\RetryTemplateBuilder; +use Ecotone\Messaging\Handler\Type; use Ecotone\Messaging\Handler\ServiceActivator\UninterruptibleServiceActivator; use Ecotone\Messaging\Handler\Transformer\RoutingSlipPrepender; use Ecotone\Messaging\InMemoryConfigurationVariableService; @@ -61,6 +65,7 @@ use Ecotone\Messaging\NullableMessageChannel; use Ecotone\Messaging\PollableChannel; use Ecotone\Messaging\Support\Assert; +use Ecotone\Messaging\Support\LicensingException; use Ecotone\Modelling\Config\MessageBusChannel; use Exception; @@ -274,6 +279,11 @@ private function prepareAndOptimizeConfiguration(InterfaceToCallRegistry $interf $this->aroundMethodInterceptors = $this->orderMethodInterceptors($this->aroundMethodInterceptors); $this->afterCallMethodInterceptors = $this->orderMethodInterceptors($this->afterCallMethodInterceptors); + $this->registerServiceDefinition( + AsyncEndpointAnnotationContext::class, + new Definition(AsyncEndpointAnnotationContext::class) + ); + foreach ($this->channelAdapters as $channelAdapter) { $channelAdapter->withEndpointAnnotations(array_merge($channelAdapter->getEndpointAnnotations(), [new AttributeDefinition(AsynchronousRunningEndpoint::class, [$channelAdapter->getEndpointId()])])); } @@ -305,7 +315,7 @@ private function prepareAndOptimizeConfiguration(InterfaceToCallRegistry $interf krsort($this->channelInterceptorBuilders); $this->configureDefaultMessageChannels(); - $this->configureAsynchronousEndpoints(); + $this->configureAsynchronousEndpoints($interfaceToCallRegistry); foreach ($this->requiredConsumerEndpointIds as $requiredConsumerEndpointId) { if (! array_key_exists($requiredConsumerEndpointId, $this->messageHandlerBuilders) && ! array_key_exists($requiredConsumerEndpointId, $this->channelAdapters)) { @@ -394,11 +404,11 @@ public function registerChannelInterceptor(ChannelInterceptorBuilder $channelInt return $this; } - /** - * @return void - */ - private function configureAsynchronousEndpoints(): void + private function configureAsynchronousEndpoints(InterfaceToCallRegistry $interfaceToCallRegistry): void { + /** @var array $asyncHandlerAnnotations */ + $asyncHandlerAnnotations = []; + foreach ($this->asynchronousEndpoints as $targetEndpointId => $asynchronousMessageChannels) { $asynchronousMessageChannel = array_shift($asynchronousMessageChannels); if (! isset($this->channelBuilders[$asynchronousMessageChannel]) && ! isset($this->defaultChannelBuilders[$asynchronousMessageChannel])) { @@ -413,6 +423,26 @@ private function configureAsynchronousEndpoints(): void $this->messageHandlerBuilders[$key] = $messageHandlerBuilder->withInputChannelName($handlerExecutionChannel); $this->registerMessageChannel(SimpleMessageChannelBuilder::createDirectMessageChannel($handlerExecutionChannel)); + $handlerInterface = $messageHandlerBuilder->getInterceptedInterface($interfaceToCallRegistry); + /** @var Asynchronous|null $asyncAttribute */ + $asyncAttribute = $handlerInterface->findSingleAnnotation(Type::object(Asynchronous::class)); + if ($asyncAttribute === null) { + foreach ($messageHandlerBuilder->getEndpointAnnotations() as $annotation) { + if ($annotation->getClassName() === Asynchronous::class) { + $asyncAttribute = $annotation->instance(); + break; + } + } + } + $endpointAnnotations = $asyncAttribute ? $asyncAttribute->getEndpointAnnotations() : []; + if ($endpointAnnotations && ! $this->isRunningForEnterpriseLicence) { + throw LicensingException::create("Endpoint annotations on #[Asynchronous] attribute for endpoint `{$targetEndpointId}` require Ecotone Enterprise licence."); + } + $asyncHandlerAnnotations[$handlerExecutionChannel] = array_map( + fn ($a) => AttributeDefinition::fromObject($a), + $endpointAnnotations + ); + $consequentialChannels = $asynchronousMessageChannels; $consequentialChannels[] = $handlerExecutionChannel; /** @@ -452,6 +482,11 @@ private function configureAsynchronousEndpoints(): void } } + $this->registerServiceDefinition( + AsyncHandlerAnnotationRegistry::class, + new Definition(AsyncHandlerAnnotationRegistry::class, [$asyncHandlerAnnotations]) + ); + $asynchronousChannels = array_map( fn (MessageChannelBuilder $channel) => $channel->getMessageChannelName(), array_filter( diff --git a/packages/Ecotone/src/Messaging/Endpoint/InterceptedChannelAdapterBuilder.php b/packages/Ecotone/src/Messaging/Endpoint/InterceptedChannelAdapterBuilder.php index a650e1da3..313f2cba0 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/InterceptedChannelAdapterBuilder.php +++ b/packages/Ecotone/src/Messaging/Endpoint/InterceptedChannelAdapterBuilder.php @@ -12,6 +12,8 @@ use Ecotone\Messaging\Config\Container\PollingMetadataReference; use Ecotone\Messaging\Config\Container\Reference; use Ecotone\Messaging\Endpoint\Interceptor\PcntlTerminationListener; +use Ecotone\Messaging\Endpoint\PollingConsumer\AsyncEndpointAnnotationContext; +use Ecotone\Messaging\Endpoint\PollingConsumer\AsyncHandlerAnnotationRegistry; use Ecotone\Messaging\Endpoint\PollingConsumer\InterceptedConsumerRunner; use Ecotone\Messaging\Endpoint\PollingConsumer\PollingConsumerErrorChannelInterceptor; use Ecotone\Messaging\Gateway\MessagingEntrypointService; @@ -71,6 +73,8 @@ public function registerConsumer(MessagingContainerBuilder $builder): void new Reference(LoggingGateway::class), new Reference(MessagingEntrypointService::class), new Reference(ExpressionEvaluationService::REFERENCE), + new Reference(AsyncHandlerAnnotationRegistry::class), + new Reference(AsyncEndpointAnnotationContext::class), ]); $builder->registerPollingEndpoint($this->endpointId, $consumerRunner); } diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/AsyncEndpointAnnotationContext.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/AsyncEndpointAnnotationContext.php new file mode 100644 index 000000000..cb3531ae2 --- /dev/null +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/AsyncEndpointAnnotationContext.php @@ -0,0 +1,35 @@ +currentAnnotations = $annotations; + } + + /** + * @return object[] + */ + public function getCurrentAnnotations(): array + { + return $this->currentAnnotations; + } + + public function clear(): void + { + $this->currentAnnotations = []; + } +} diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/AsyncHandlerAnnotationRegistry.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/AsyncHandlerAnnotationRegistry.php new file mode 100644 index 000000000..1aa7ac866 --- /dev/null +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/AsyncHandlerAnnotationRegistry.php @@ -0,0 +1,26 @@ + $channelToAnnotations + */ + public function __construct(private array $channelToAnnotations) + { + } + + /** + * @return object[] + */ + public function getAnnotationsForChannel(string $channel): array + { + return $this->channelToAnnotations[$channel] ?? []; + } +} diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedConsumerRunner.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedConsumerRunner.php index cfbbf088c..d24a97ad9 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedConsumerRunner.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedConsumerRunner.php @@ -31,6 +31,8 @@ public function __construct( private LoggingGateway $logger, private MessagingEntrypointService $messagingEntrypoint, private ExpressionEvaluationService $expressionEvaluationService, + private AsyncHandlerAnnotationRegistry $asyncHandlerAnnotationRegistry, + private AsyncEndpointAnnotationContext $asyncEndpointAnnotationContext, ) { } @@ -51,7 +53,7 @@ public function createConsumer(?ExecutionPollingMetadata $executionPollingMetada $interceptedConsumer = new ScheduledTaskConsumer( SyncTaskScheduler::createWithEmptyTriggerContext($this->clock, $pollingMetadata), $trigger, - new PollToGatewayTaskExecutor($this->messagePoller, $interceptedGateway, $this->messagingEntrypoint), + new PollToGatewayTaskExecutor($this->messagePoller, $interceptedGateway, $this->messagingEntrypoint, $this->asyncHandlerAnnotationRegistry, $this->asyncEndpointAnnotationContext), ); if ($interceptors) { diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedPollingConsumerBuilder.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedPollingConsumerBuilder.php index 738a26217..406770418 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedPollingConsumerBuilder.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedPollingConsumerBuilder.php @@ -113,6 +113,8 @@ public function registerConsumer(MessagingContainerBuilder $builder, MessageHand new Reference(LoggingGateway::class), new Reference(MessagingEntrypointService::class), new Reference(ExpressionEvaluationService::REFERENCE), + new Reference(AsyncHandlerAnnotationRegistry::class), + new Reference(AsyncEndpointAnnotationContext::class), ]); $builder->registerPollingEndpoint($endpointId, $consumerRunner, $this->withContinuesPolling()); } diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollToGatewayTaskExecutor.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollToGatewayTaskExecutor.php index 79573ca4c..d8d248c5b 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollToGatewayTaskExecutor.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollToGatewayTaskExecutor.php @@ -5,6 +5,7 @@ use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Gateway\MessagingEntrypointService; use Ecotone\Messaging\Handler\NonProxyGateway; +use Ecotone\Messaging\Message; use Ecotone\Messaging\MessageHeaders; use Ecotone\Messaging\MessagePoller; use Ecotone\Messaging\Scheduling\TaskExecutor; @@ -19,7 +20,9 @@ class PollToGatewayTaskExecutor implements TaskExecutor public function __construct( private MessagePoller $messagePoller, private NonProxyGateway $gateway, - private MessagingEntrypointService $messagingEntrypoint + private MessagingEntrypointService $messagingEntrypoint, + private AsyncHandlerAnnotationRegistry $asyncHandlerAnnotationRegistry, + private AsyncEndpointAnnotationContext $asyncEndpointAnnotationContext, ) { } @@ -34,12 +37,36 @@ public function execute(PollingMetadata $pollingMetadata): void } if ($message) { - $this->gateway->execute([ - MessageBuilder::fromMessage($message) - ->setHeader(MessageHeaders::CONSUMER_POLLING_METADATA, $pollingMetadata) - ->build(), - ]); + $routingSlip = $this->resolveRoutingSlip($message); + if ($routingSlip !== []) { + $this->asyncEndpointAnnotationContext->setAnnotations( + $this->asyncHandlerAnnotationRegistry->getAnnotationsForChannel($routingSlip[0]) + ); + } + try { + $this->gateway->execute([ + MessageBuilder::fromMessage($message) + ->setHeader(MessageHeaders::CONSUMER_POLLING_METADATA, $pollingMetadata) + ->build(), + ]); + } finally { + $this->asyncEndpointAnnotationContext->clear(); + } gc_collect_cycles(); } } + + /** + * @return string[] + */ + private function resolveRoutingSlip(Message $message): array + { + if (! $message->getHeaders()->containsKey(MessageHeaders::ROUTING_SLIP)) { + return []; + } + + $routingSlip = $message->getHeaders()->get(MessageHeaders::ROUTING_SLIP); + + return $routingSlip ? explode(',', $routingSlip) : []; + } } diff --git a/packages/Ecotone/src/Messaging/Handler/Processor/MethodInvoker/Converter/AsyncEndpointAnnotationBuilder.php b/packages/Ecotone/src/Messaging/Handler/Processor/MethodInvoker/Converter/AsyncEndpointAnnotationBuilder.php new file mode 100644 index 000000000..32dfef624 --- /dev/null +++ b/packages/Ecotone/src/Messaging/Handler/Processor/MethodInvoker/Converter/AsyncEndpointAnnotationBuilder.php @@ -0,0 +1,37 @@ +getName() === $this->parameterName; + } + + public function compile(InterfaceToCall $interfaceToCall): Definition + { + return new Definition(AsyncEndpointAnnotationConverter::class, [ + Reference::to(AsyncEndpointAnnotationContext::class), + $this->attributeClassName, + ]); + } +} diff --git a/packages/Ecotone/src/Messaging/Handler/Processor/MethodInvoker/Converter/AsyncEndpointAnnotationConverter.php b/packages/Ecotone/src/Messaging/Handler/Processor/MethodInvoker/Converter/AsyncEndpointAnnotationConverter.php new file mode 100644 index 000000000..292e5db77 --- /dev/null +++ b/packages/Ecotone/src/Messaging/Handler/Processor/MethodInvoker/Converter/AsyncEndpointAnnotationConverter.php @@ -0,0 +1,32 @@ +context->getCurrentAnnotations() as $annotation) { + if ($annotation instanceof $this->attributeClassName) { + return $annotation; + } + } + + return null; + } +} diff --git a/packages/Ecotone/src/Messaging/Handler/Processor/MethodInvoker/MethodArgumentsFactory.php b/packages/Ecotone/src/Messaging/Handler/Processor/MethodInvoker/MethodArgumentsFactory.php index 8ac0dcbd6..d982ef9ba 100644 --- a/packages/Ecotone/src/Messaging/Handler/Processor/MethodInvoker/MethodArgumentsFactory.php +++ b/packages/Ecotone/src/Messaging/Handler/Processor/MethodInvoker/MethodArgumentsFactory.php @@ -2,6 +2,8 @@ namespace Ecotone\Messaging\Handler\Processor\MethodInvoker; +use Ecotone\Messaging\Attribute\AsynchronousEndpointAttribute; +use Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint; use Ecotone\Messaging\Config\Container\AttributeDefinition; use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Handler\InterfaceParameter; @@ -92,7 +94,7 @@ public static function createInterceptedInterfaceAnnotationMethodParameters( /** * @param AttributeDefinition[] $endpointAnnotations */ - public static function getAnnotationValueConverter(InterfaceParameter $interfaceParameter, InterfaceToCall $interceptedInterface, array $endpointAnnotations): AttributeBuilder|AttributeDefinitionBuilder|null + public static function getAnnotationValueConverter(InterfaceParameter $interfaceParameter, InterfaceToCall $interceptedInterface, array $endpointAnnotations): AttributeBuilder|AttributeDefinitionBuilder|Converter\AsyncEndpointAnnotationBuilder|null { $interfaceParameterType = $interfaceParameter->getTypeDescriptor()->withoutNull(); // Endpoint Annotations @@ -131,9 +133,34 @@ public static function getAnnotationValueConverter(InterfaceParameter $interface } } + // Async Endpoint - resolve at runtime from handler annotations via routing slip + if ($interfaceParameter->isAnnotation() + && $interfaceParameterType->isClassOrInterface() + && is_a($interfaceParameterType->toString(), AsynchronousEndpointAttribute::class, true) + && self::hasAsynchronousRunningEndpoint($endpointAnnotations)) { + return new Converter\AsyncEndpointAnnotationBuilder( + $interfaceParameter->getName(), + $interfaceParameterType->toString(), + ); + } + return null; } + /** + * @param AttributeDefinition[] $endpointAnnotations + */ + public static function hasAsynchronousRunningEndpoint(array $endpointAnnotations): bool + { + foreach ($endpointAnnotations as $annotation) { + if ($annotation instanceof AttributeDefinition && $annotation->getClassName() === AsynchronousRunningEndpoint::class) { + return true; + } + } + + return false; + } + /** * @param ParameterConverterBuilder[] $passedMethodParameterConverters * @return bool diff --git a/packages/Ecotone/src/Messaging/Handler/TypeResolver.php b/packages/Ecotone/src/Messaging/Handler/TypeResolver.php index ec8399361..f6124b1c2 100644 --- a/packages/Ecotone/src/Messaging/Handler/TypeResolver.php +++ b/packages/Ecotone/src/Messaging/Handler/TypeResolver.php @@ -10,6 +10,7 @@ use Ecotone\Messaging\Config\Container\AttributeReference; use Ecotone\Messaging\Config\Container\ContainerBuilder; use Ecotone\Messaging\Config\Container\Definition; +use Ecotone\Messaging\Config\Container\DefinitionHelper; use Ecotone\Messaging\Config\Container\InterfaceParameterReference; use Ecotone\Messaging\Config\Container\InterfaceToCallReference; use Ecotone\Messaging\Config\Container\Reference; @@ -216,7 +217,7 @@ private function registerAttributeDefinition(ContainerBuilder $builder, Attribut { $reference = new AttributeReference($attributeDefinition->getClassName(), $className, $methodName); if (! $builder->has($reference)) { - $builder->register($reference, $attributeDefinition); + $builder->register($reference, DefinitionHelper::resolvePotentialComplexAttribute($attributeDefinition)); } return $attributeDefinition; } diff --git a/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php b/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php index 2c7f1f982..9c2f2e56c 100644 --- a/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php +++ b/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php @@ -8,6 +8,7 @@ namespace Ecotone\Projecting\Config; use Ecotone\AnnotationFinder\AnnotatedDefinition; +use Ecotone\Messaging\Attribute\Asynchronous; use Ecotone\Messaging\Config\ConfigurationException; use Ecotone\Messaging\Config\Container\Definition; use Ecotone\Messaging\Config\Container\MessagingContainerBuilder; @@ -101,11 +102,24 @@ public function setResetChannel(string $inputChannel): void $this->resetChannel = $inputChannel; } + private ?Asynchronous $asyncAttribute = null; + public function setAsyncChannel(string $asynchronousChannelName): void { $this->asyncChannelName = $asynchronousChannelName; } + public function setAsyncAttribute(Asynchronous $asyncAttribute): void + { + $this->asyncAttribute = $asyncAttribute; + $this->asyncChannelName = $asyncAttribute->getChannelName()[0]; + } + + public function getAsyncAttribute(): ?Asynchronous + { + return $this->asyncAttribute; + } + public function automaticInitialization(): bool { return $this->automaticInitialization; diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php index 119fc8df4..a03b02d84 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php @@ -107,14 +107,15 @@ public static function create(AnnotationFinder $annotationRegistrationService, I rebuildAsyncChannelName: $rebuildAttribute?->asyncChannelName, ); - $asynchronousChannelName = self::getProjectionAsynchronousChannel($annotationRegistrationService, $projectionClassName); + $asyncAttribute = self::getProjectionAsynchronousAttribute($annotationRegistrationService, $projectionClassName); + $asynchronousChannelName = $asyncAttribute ? $asyncAttribute->getChannelName()[0] : null; $isPolling = $pollingAttribute !== null; $isEventStreaming = $streamingAttribute !== null; self::verifyCorrectApiUsage($isPolling, $asynchronousChannelName, $projectionAttribute, $isEventStreaming, $partitionAttribute !== null); - if ($asynchronousChannelName !== null) { - $projectionBuilder->setAsyncChannel($asynchronousChannelName); + if ($asyncAttribute !== null) { + $projectionBuilder->setAsyncAttribute($asyncAttribute); } if ($isPolling) { @@ -247,14 +248,14 @@ public function getModulePackageName(): string /** * @param class-string $projectionClassName */ - private static function getProjectionAsynchronousChannel(AnnotationFinder $annotationRegistrationService, string $projectionClassName): ?string + private static function getProjectionAsynchronousAttribute(AnnotationFinder $annotationRegistrationService, string $projectionClassName): ?Asynchronous { $attributes = $annotationRegistrationService->getAnnotationsForClass($projectionClassName); foreach ($attributes as $attribute) { if ($attribute instanceof Asynchronous) { $asynchronousChannelName = $attribute->getChannelName(); Assert::isTrue(count($asynchronousChannelName) === 1, "Make use of single channel name in Asynchronous annotation for Projection: {$projectionClassName}"); - return array_pop($asynchronousChannelName); + return $attribute; } } return null; diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index efb99728d..57581bb0d 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -11,6 +11,9 @@ use Ecotone\Messaging\Attribute\ModuleAnnotation; use Ecotone\Messaging\Config\Annotation\AnnotationModule; use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ExtensionObjectResolver; +use Ecotone\Messaging\Attribute\Asynchronous; +use Ecotone\Messaging\Attribute\WithoutDatabaseTransaction; +use Ecotone\Messaging\Attribute\WithoutMessageCollector; use Ecotone\Messaging\Config\Configuration; use Ecotone\Messaging\Config\ConfigurationException; use Ecotone\Messaging\Config\Container\AttributeDefinition; @@ -99,25 +102,35 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO ); $projectionRegistryMap[$projectionName] = new Reference($projectingManagerReference); - $messagingConfiguration->registerMessageHandler( - MessageProcessorActivatorBuilder::create() - ->chainInterceptedProcessor( - MethodInvokerBuilder::create( - $projectingManagerReference, - InterfaceToCallReference::create(ProjectingManager::class, 'execute'), - [ - $projectionBuilder->partitionHeader() - ? HeaderBuilder::create('partitionKeyValue', $projectionBuilder->partitionHeader()) - : ($projectionBuilder->isPartitioned() - ? new PartitionHeaderBuilder('partitionKeyValue') - : ValueBuilder::create('partitionKeyValue', null)), - HeaderBuilder::createOptional('manualInitialization', ProjectingHeaders::MANUAL_INITIALIZATION), - ], - ) + $handlerBuilder = MessageProcessorActivatorBuilder::create() + ->chainInterceptedProcessor( + MethodInvokerBuilder::create( + $projectingManagerReference, + InterfaceToCallReference::create(ProjectingManager::class, 'execute'), + [ + $projectionBuilder->partitionHeader() + ? HeaderBuilder::create('partitionKeyValue', $projectionBuilder->partitionHeader()) + : ($projectionBuilder->isPartitioned() + ? new PartitionHeaderBuilder('partitionKeyValue') + : ValueBuilder::create('partitionKeyValue', null)), + HeaderBuilder::createOptional('manualInitialization', ProjectingHeaders::MANUAL_INITIALIZATION), + ], ) - ->withEndpointId(self::endpointIdForProjection($projectionName)) - ->withInputChannelName(self::inputChannelForProjectingManager($projectionName)) - ); + ) + ->withEndpointId(self::endpointIdForProjection($projectionName)) + ->withInputChannelName(self::inputChannelForProjectingManager($projectionName)); + + $asyncAttribute = $projectionBuilder instanceof EcotoneProjectionExecutorBuilder ? $projectionBuilder->getAsyncAttribute() : null; + if ($asyncAttribute !== null) { + $handlerBuilder = $handlerBuilder->withEndpointAnnotations([ + AttributeDefinition::fromObject(new Asynchronous( + $asyncAttribute->getChannelName(), + array_merge($asyncAttribute->getEndpointAnnotations(), [new WithoutDatabaseTransaction(), new WithoutMessageCollector()]), + )), + ]); + } + + $messagingConfiguration->registerMessageHandler($handlerBuilder); $messagingConfiguration->registerMessageHandler( MessageProcessorActivatorBuilder::create() diff --git a/packages/Ecotone/tests/Messaging/Unit/Config/AsyncEndpointAnnotationTest.php b/packages/Ecotone/tests/Messaging/Unit/Config/AsyncEndpointAnnotationTest.php new file mode 100644 index 000000000..98ab55c05 --- /dev/null +++ b/packages/Ecotone/tests/Messaging/Unit/Config/AsyncEndpointAnnotationTest.php @@ -0,0 +1,332 @@ +receivedAttribute = null; + + $handler = new class () { + #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('test-value')])] + #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] + public function handle(string $payload): void + { + } + }; + + $interceptor = new class ($collector) { + public function __construct(private \stdClass $collector) + { + } + + #[Around(pointcut: AsynchronousRunningEndpoint::class)] + public function intercept(MethodInvocation $methodInvocation, ?CustomAsyncAttribute $attr = null): mixed + { + $this->collector->receivedAttribute = $attr; + + return $methodInvocation->proceed(); + } + }; + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [$handler::class, $interceptor::class], + [$handler, $interceptor], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('async'), + ], + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotoneLite->sendCommandWithRoutingKey('doWork', 'test'); + $ecotoneLite->run('async'); + + $this->assertNotNull($collector->receivedAttribute); + $this->assertInstanceOf(CustomAsyncAttribute::class, $collector->receivedAttribute); + $this->assertSame('test-value', $collector->receivedAttribute->value); + } + + public function test_before_interceptor_receives_handler_attribute_on_async_endpoint(): void + { + $collector = new \stdClass(); + $collector->receivedAttribute = null; + + $handler = new class () { + #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('before-value')])] + #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] + public function handle(string $payload): void + { + } + }; + + $interceptor = new class ($collector) { + public function __construct(private \stdClass $collector) + { + } + + #[Before(pointcut: AsynchronousRunningEndpoint::class)] + public function intercept(string $payload, ?CustomAsyncAttribute $attr = null): string + { + $this->collector->receivedAttribute = $attr; + + return $payload; + } + }; + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [$handler::class, $interceptor::class], + [$handler, $interceptor], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('async'), + ], + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotoneLite->sendCommandWithRoutingKey('doWork', 'test'); + $ecotoneLite->run('async'); + + $this->assertNotNull($collector->receivedAttribute); + $this->assertInstanceOf(CustomAsyncAttribute::class, $collector->receivedAttribute); + $this->assertSame('before-value', $collector->receivedAttribute->value); + } + + public function test_multiple_handlers_on_same_channel_resolve_correct_attribute(): void + { + $collector = new \stdClass(); + $collector->receivedAttributes = []; + + $handler = new class () { + #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('handler-one')])] + #[CommandHandler('doWorkOne', endpointId: 'doWorkOne.endpoint')] + public function handleOne(string $payload): void + { + } + + #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('handler-two')])] + #[CommandHandler('doWorkTwo', endpointId: 'doWorkTwo.endpoint')] + public function handleTwo(string $payload): void + { + } + }; + + $interceptor = new class ($collector) { + public function __construct(private \stdClass $collector) + { + } + + #[Around(pointcut: AsynchronousRunningEndpoint::class)] + public function intercept(MethodInvocation $methodInvocation, ?CustomAsyncAttribute $attr = null): mixed + { + $this->collector->receivedAttributes[] = $attr; + + return $methodInvocation->proceed(); + } + }; + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [$handler::class, $interceptor::class], + [$handler, $interceptor], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('async'), + ], + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotoneLite->sendCommandWithRoutingKey('doWorkOne', 'test'); + $ecotoneLite->run('async'); + + $ecotoneLite->sendCommandWithRoutingKey('doWorkTwo', 'test'); + $ecotoneLite->run('async'); + + $this->assertCount(2, $collector->receivedAttributes); + $this->assertSame('handler-one', $collector->receivedAttributes[0]->value); + $this->assertSame('handler-two', $collector->receivedAttributes[1]->value); + } + + public function test_handler_without_custom_attribute_returns_null(): void + { + $collector = new \stdClass(); + $collector->receivedAttribute = 'not-set'; + + $handler = new class () { + #[Asynchronous('async')] + #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] + public function handle(string $payload): void + { + } + }; + + $interceptor = new class ($collector) { + public function __construct(private \stdClass $collector) + { + } + + #[Around(pointcut: AsynchronousRunningEndpoint::class)] + public function intercept(MethodInvocation $methodInvocation, ?CustomAsyncAttribute $attr = null): mixed + { + $this->collector->receivedAttribute = $attr; + + return $methodInvocation->proceed(); + } + }; + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [$handler::class, $interceptor::class], + [$handler, $interceptor], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('async'), + ] + ); + + $ecotoneLite->sendCommandWithRoutingKey('doWork', 'test'); + $ecotoneLite->run('async'); + + $this->assertNull($collector->receivedAttribute); + } + + public function test_endpoint_annotations_require_enterprise_licence(): void + { + $this->expectException(LicensingException::class); + + $handler = new class () { + #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('test')])] + #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] + public function handle(string $payload): void + { + } + }; + + EcotoneLite::bootstrapFlowTesting( + [$handler::class], + [$handler], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('async'), + ], + ); + } + + public function test_collector_holds_events_published_during_async_handler_and_discards_on_failure(): void + { + $handler = new class () { + #[Asynchronous('async')] + #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] + public function handle(string $payload, EventBus $eventBus): void + { + $eventBus->publish(new stdClass()); + throw new RuntimeException('Handler failure after publishing event'); + } + }; + + $eventHandler = new class () { + #[Asynchronous('events')] + #[EventHandler(endpointId: 'event.endpoint')] + public function handle(stdClass $event): void + { + } + }; + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [$handler::class, $eventHandler::class], + [$handler, $eventHandler], + configuration: ServiceConfiguration::createWithDefaults() + ->withExtensionObjects([ + PollableChannelConfiguration::neverRetry('events')->withCollector(true), + ]), + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('async'), + SimpleMessageChannelBuilder::createQueueChannel('events'), + ], + ); + + $ecotoneLite->sendCommandWithRoutingKey('doWork', 'test'); + + try { + $ecotoneLite->run('async'); + } catch (RuntimeException) { + } + + $message = $ecotoneLite->getMessageChannel('events')->receive(); + self::assertNull($message, 'Collector should discard events when handler fails'); + } + + public function test_without_message_collector_events_are_sent_directly_and_survive_handler_failure(): void + { + $handler = new class () { + #[Asynchronous('async', endpointAnnotations: [new WithoutMessageCollector()])] + #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] + public function handle(string $payload, EventBus $eventBus): void + { + $eventBus->publish(new stdClass()); + throw new RuntimeException('Handler failure after publishing event'); + } + }; + + $eventHandler = new class () { + #[Asynchronous('events')] + #[EventHandler(endpointId: 'event.endpoint')] + public function handle(stdClass $event): void + { + } + }; + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [$handler::class, $eventHandler::class], + [$handler, $eventHandler], + configuration: ServiceConfiguration::createWithDefaults() + ->withExtensionObjects([ + PollableChannelConfiguration::neverRetry('events')->withCollector(true), + ]), + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('async'), + SimpleMessageChannelBuilder::createQueueChannel('events'), + ], + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotoneLite->sendCommandWithRoutingKey('doWork', 'test'); + + try { + $ecotoneLite->run('async'); + } catch (RuntimeException) { + } + + $message = $ecotoneLite->getMessageChannel('events')->receive(); + self::assertNotNull($message, 'Without collector, events should be sent directly and survive handler failure'); + } +} diff --git a/packages/PdoEventSourcing/tests/Projecting/WithoutDbalTransactionProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/WithoutDbalTransactionProjectionTest.php new file mode 100644 index 000000000..58fb385fa --- /dev/null +++ b/packages/PdoEventSourcing/tests/Projecting/WithoutDbalTransactionProjectionTest.php @@ -0,0 +1,336 @@ +getConnection(); + $collector = new stdClass(); + $collector->callCount = 0; + + $projection = new + #[ProjectionV2('batch_transaction_test')] + #[Asynchronous('async_projection')] + #[ProjectionExecution(eventLoadingBatchSize: 1)] + #[FromStream(Ticket::class)] + class ($connection, $collector) { + public const NAME = 'batch_transaction_test'; + public const CHANNEL = 'async_projection'; + + public function __construct(private Connection $connection, private stdClass $collector) + { + } + + #[QueryHandler('getBatchTransactionTickets')] + public function getTickets(): array + { + return $this->connection->executeQuery('SELECT * FROM batch_transaction_tickets ORDER BY ticket_id ASC')->fetchAllAssociative(); + } + + #[EventHandler] + public function addTicket(TicketWasRegistered $event): void + { + $this->collector->callCount++; + + $this->connection->executeStatement( + 'INSERT INTO batch_transaction_tickets VALUES (?,?)', + [$event->getTicketId(), $event->getTicketType()] + ); + + if ($this->collector->callCount === 2) { + throw new RuntimeException('Simulated failure on second event batch'); + } + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement('CREATE TABLE IF NOT EXISTS batch_transaction_tickets (ticket_id VARCHAR(36) PRIMARY KEY, ticket_type VARCHAR(25))'); + } + + #[ProjectionDelete] + public function delete(): void + { + $this->connection->executeStatement('DROP TABLE IF EXISTS batch_transaction_tickets'); + } + + #[ProjectionReset] + public function reset(): void + { + $this->connection->executeStatement('DELETE FROM batch_transaction_tickets'); + } + }; + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], $projection::CHANNEL); + + $ecotone->deleteProjection($projection::NAME) + ->initializeProjection($projection::NAME); + + $ecotone->sendCommand(new RegisterTicket('ticket-1', 'User1', 'alert')); + $ecotone->sendCommand(new RegisterTicket('ticket-2', 'User2', 'info')); + + try { + $ecotone->run($projection::CHANNEL); + } catch (RuntimeException) { + } + + self::assertEquals(2, $collector->callCount); + + $tickets = $ecotone->sendQueryWithRouting('getBatchTransactionTickets'); + self::assertCount(1, $tickets, 'First batch should be committed even though second batch failed'); + self::assertEquals('ticket-1', $tickets[0]['ticket_id']); + } + + public function test_partitioned_async_projection_first_batch_committed_independently_when_second_batch_fails(): void + { + $connection = $this->getConnection(); + $collector = new stdClass(); + $collector->callCount = 0; + + $projection = new + #[ProjectionV2('partitioned_batch_transaction_test')] + #[Partitioned] + #[Asynchronous('async_partitioned_projection')] + #[ProjectionExecution(eventLoadingBatchSize: 1)] + #[FromStream(stream: Ticket::class, aggregateType: Ticket::class)] + class ($connection, $collector) { + public const NAME = 'partitioned_batch_transaction_test'; + public const CHANNEL = 'async_partitioned_projection'; + + public function __construct(private Connection $connection, private stdClass $collector) + { + } + + #[QueryHandler('getPartitionedBatchTransactionTickets')] + public function getTickets(): array + { + return $this->connection->executeQuery('SELECT * FROM partitioned_batch_transaction_tickets ORDER BY ticket_id ASC')->fetchAllAssociative(); + } + + #[EventHandler] + public function addTicket(TicketWasRegistered $event): void + { + $this->collector->callCount++; + + $this->connection->executeStatement( + 'INSERT INTO partitioned_batch_transaction_tickets VALUES (?,?)', + [$event->getTicketId(), $event->getTicketType()] + ); + } + + #[EventHandler] + public function closeTicket(TicketWasClosed $event): void + { + $this->collector->callCount++; + + $this->connection->executeStatement( + 'DELETE FROM partitioned_batch_transaction_tickets WHERE ticket_id = ?', + [$event->getTicketId()] + ); + + if ($this->collector->callCount === 2) { + throw new RuntimeException('Simulated failure on second event batch'); + } + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement('CREATE TABLE IF NOT EXISTS partitioned_batch_transaction_tickets (ticket_id VARCHAR(36) PRIMARY KEY, ticket_type VARCHAR(25))'); + } + + #[ProjectionDelete] + public function delete(): void + { + $this->connection->executeStatement('DROP TABLE IF EXISTS partitioned_batch_transaction_tickets'); + } + + #[ProjectionReset] + public function reset(): void + { + $this->connection->executeStatement('DELETE FROM partitioned_batch_transaction_tickets'); + } + }; + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], $projection::CHANNEL); + + $ecotone->deleteProjection($projection::NAME) + ->initializeProjection($projection::NAME); + + $ecotone->sendCommand(new RegisterTicket('ticket-1', 'User1', 'alert')); + $ecotone->sendCommand(new CloseTicket('ticket-1')); + + try { + $ecotone->run($projection::CHANNEL); + } catch (RuntimeException) { + } + + self::assertEquals(2, $collector->callCount); + + $tickets = $ecotone->sendQueryWithRouting('getPartitionedBatchTransactionTickets'); + self::assertCount(1, $tickets, 'First batch (addTicket) should be committed even though second batch (closeTicket) failed'); + self::assertEquals('ticket-1', $tickets[0]['ticket_id']); + } + + public function test_async_projection_has_collector_disabled_by_default(): void + { + $connection = $this->getConnection(); + $collector = new stdClass(); + $collector->callCount = 0; + + $projection = new + #[ProjectionV2('collector_disabled_test')] + #[Asynchronous('async_projection')] + #[ProjectionExecution(eventLoadingBatchSize: 1)] + #[FromStream(Ticket::class)] + class ($connection, $collector) { + public const NAME = 'collector_disabled_test'; + public const CHANNEL = 'async_projection'; + + public function __construct(private Connection $connection, private stdClass $collector) + { + } + + #[EventHandler] + public function addTicket(TicketWasRegistered $event, EventBus $eventBus): void + { + $this->collector->callCount++; + + $this->connection->executeStatement( + 'INSERT INTO collector_disabled_tickets VALUES (?,?)', + [$event->getTicketId(), $event->getTicketType()] + ); + + $eventBus->publish(new stdClass()); + + throw new RuntimeException('Simulated failure after publishing event'); + } + + #[QueryHandler('getCollectorDisabledTickets')] + public function getTickets(): array + { + return $this->connection->executeQuery('SELECT * FROM collector_disabled_tickets ORDER BY ticket_id ASC')->fetchAllAssociative(); + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement('CREATE TABLE IF NOT EXISTS collector_disabled_tickets (ticket_id VARCHAR(36) PRIMARY KEY, ticket_type VARCHAR(25))'); + } + + #[ProjectionDelete] + public function delete(): void + { + $this->connection->executeStatement('DROP TABLE IF EXISTS collector_disabled_tickets'); + } + + #[ProjectionReset] + public function reset(): void + { + $this->connection->executeStatement('DELETE FROM collector_disabled_tickets'); + } + }; + + $notificationHandler = new class () { + #[Asynchronous('notifications')] + #[EventHandler(endpointId: 'notification.endpoint')] + public function handle(stdClass $event): void + { + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: array_merge([$projection::class, $notificationHandler::class], [Ticket::class, TicketEventConverter::class]), + containerOrAvailableServices: [$projection, $notificationHandler, new TicketEventConverter(), self::getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::DBAL_PACKAGE, + ModulePackageList::EVENT_SOURCING_PACKAGE, + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ])) + ->withExtensionObjects([ + PollableChannelConfiguration::neverRetry('notifications')->withCollector(true), + ]), + runForProductionEventStore: true, + enableAsynchronousProcessing: [ + DbalBackedMessageChannelBuilder::create($projection::CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel('notifications'), + ], + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->deleteProjection($projection::NAME) + ->initializeProjection($projection::NAME); + + $ecotone->sendCommand(new RegisterTicket('ticket-1', 'User1', 'alert')); + + try { + $ecotone->run($projection::CHANNEL); + } catch (RuntimeException) { + } + + self::assertEquals(1, $collector->callCount); + + $notification = $ecotone->getMessageChannel('notifications')->receive(); + self::assertNotNull($notification, 'Event published during projection should bypass collector and be sent directly to channel'); + } + + private function bootstrapEcotone(array $classesToResolve, array $services, string $channel): FlowTestSupport + { + return EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: array_merge($classesToResolve, [Ticket::class, TicketEventConverter::class]), + containerOrAvailableServices: array_merge($services, [new TicketEventConverter(), self::getConnectionFactory()]), + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::DBAL_PACKAGE, + ModulePackageList::EVENT_SOURCING_PACKAGE, + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ])), + runForProductionEventStore: true, + enableAsynchronousProcessing: [ + DbalBackedMessageChannelBuilder::create($channel), + ], + licenceKey: LicenceTesting::VALID_LICENCE, + ); + } +}