diff --git a/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php b/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php index af60033ee..3f991bffe 100644 --- a/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php +++ b/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php @@ -1,7 +1,7 @@ partitionHeader && ! $this->automaticInitialization) { throw new ConfigurationException("Cannot set partition header for projection {$this->projectionName} with automatic initialization disabled"); @@ -150,6 +153,14 @@ public function rebuildAsyncChannelName(): ?string return $this->rebuildAsyncChannelName; } + public function isOpenSourceEligible(): bool + { + return ! $this->isPartitioned() + && $this->backfillAsyncChannelName === null + && ! $this->hasRebuild + && ! $this->hasDeployment; + } + public function compile(MessagingContainerBuilder $builder): Definition|Reference { $routerProcessor = $this->buildExecutionRouter($builder); @@ -158,6 +169,7 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc new Reference(MessageHeadersPropagatorInterceptor::class), $this->projectionName, $routerProcessor, + Reference::to(LicenceDecider::class), $this->initChannel, $this->deleteChannel, $this->flushChannel, diff --git a/packages/Ecotone/src/Projecting/Config/PartitionProviderRegistryModule.php b/packages/Ecotone/src/Projecting/Config/PartitionProviderRegistryModule.php index 4e9a4477f..7694dce90 100644 --- a/packages/Ecotone/src/Projecting/Config/PartitionProviderRegistryModule.php +++ b/packages/Ecotone/src/Projecting/Config/PartitionProviderRegistryModule.php @@ -1,7 +1,7 @@ userlandPartitionProviderReferences) && ! $messagingConfiguration->isRunningForEnterpriseLicence()) { + throw LicensingException::create('Custom #[PartitionProvider] implementations require Ecotone Enterprise licence.'); + } + $partitionProviderReferences = ExtensionObjectResolver::resolve( PartitionProviderReference::class, $extensionObjects diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php index a03b02d84..8da160d12 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php @@ -1,7 +1,7 @@ partitionBatchSize, rebuildAsyncChannelName: $rebuildAttribute?->asyncChannelName, + hasRebuild: $rebuildAttribute !== null, + hasDeployment: $projectionDeployment !== null, ); $asyncAttribute = self::getProjectionAsynchronousAttribute($annotationRegistrationService, $projectionClassName); @@ -180,6 +183,15 @@ public static function create(AnnotationFinder $annotationRegistrationService, I public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void { + if (! $messagingConfiguration->isRunningForEnterpriseLicence()) { + if (! empty($this->pollingProjections)) { + throw LicensingException::create('#[Polling] projections require Ecotone Enterprise licence.'); + } + if (! empty($this->eventStreamingProjections)) { + throw LicensingException::create('#[Streaming] projections require Ecotone Enterprise licence.'); + } + } + foreach ($this->lifecycleHandlers as $lifecycleHandler) { $messagingConfiguration->registerMessageHandler($lifecycleHandler); } diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php b/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php index 1dd04dc00..333e3ddf7 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php @@ -1,7 +1,7 @@ isRunningForEnterpriseLicence()) { - throw ConfigurationException::create('Projections are part of Ecotone Enterprise. To use projections, please acquire an enterprise licence.'); + foreach ($projectionBuilders as $builder) { + if (! $builder instanceof EcotoneProjectionExecutorBuilder || ! $builder->isOpenSourceEligible()) { + throw LicensingException::create('Projections with enterprise features (Partitioned, Streaming, Polling, ProjectionRebuild, ProjectionDeployment, async backfill) require Ecotone Enterprise licence.'); + } + } } $messagingConfiguration->registerServiceDefinition( @@ -122,10 +126,14 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $asyncAttribute = $projectionBuilder instanceof EcotoneProjectionExecutorBuilder ? $projectionBuilder->getAsyncAttribute() : null; if ($asyncAttribute !== null) { + $endpointAnnotations = $asyncAttribute->getEndpointAnnotations(); + if ($messagingConfiguration->isRunningForEnterpriseLicence()) { + $endpointAnnotations = array_merge($endpointAnnotations, [new WithoutDatabaseTransaction(), new WithoutMessageCollector()]); + } $handlerBuilder = $handlerBuilder->withEndpointAnnotations([ AttributeDefinition::fromObject(new Asynchronous( $asyncAttribute->getChannelName(), - array_merge($asyncAttribute->getEndpointAnnotations(), [new WithoutDatabaseTransaction(), new WithoutMessageCollector()]), + $endpointAnnotations, )), ]); } diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModuleRoutingExtension.php b/packages/Ecotone/src/Projecting/Config/ProjectingModuleRoutingExtension.php index dcae07a4f..a5ea9e313 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModuleRoutingExtension.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModuleRoutingExtension.php @@ -1,7 +1,7 @@ userlandStateStorageReferences) && ! $messagingConfiguration->isRunningForEnterpriseLicence()) { + throw LicensingException::create('Custom #[StateStorage] implementations require Ecotone Enterprise licence.'); + } + $stateStorageReferences = ExtensionObjectResolver::resolve( ProjectionStateStorageReference::class, $extensionObjects diff --git a/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php b/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php index 4aeeb41fd..e6aec936b 100644 --- a/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php +++ b/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php @@ -1,7 +1,7 @@ userlandStreamSourceReferences) && ! $messagingConfiguration->isRunningForEnterpriseLicence()) { + throw LicensingException::create('Custom #[StreamSource] implementations require Ecotone Enterprise licence.'); + } + $streamSourceReferences = ExtensionObjectResolver::resolve( StreamSourceReference::class, $extensionObjects diff --git a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php index 215b17035..685dc061f 100644 --- a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php +++ b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php @@ -1,13 +1,14 @@ getMetadata(); $metadata[ProjectingHeaders::PROJECTION_STATE] = $userState ?? null; $metadata[ProjectingHeaders::PROJECTION_EVENT_NAME] = $event->getEventName(); - $metadata[ProjectingHeaders::PROJECTION_NAME] = $this->projectionName; + if ($this->licenceDecider->hasEnterpriseLicence()) { + $metadata[ProjectingHeaders::PROJECTION_NAME] = $this->projectionName; + } $metadata[ProjectingHeaders::PROJECTION_LIVE] = $this->isLive; $metadata[MessageHeaders::STREAM_BASED_SOURCED] = true; // this one is required for correct header propagation in EventStreamEmitter... $metadata[MessageHeaders::REPLY_CHANNEL] = $responseQueue = new QueueChannel('response_channel'); @@ -65,37 +69,30 @@ function () use ($requestMessage) { public function init(): void { if ($this->initChannel) { - $this->messagingEntrypoint->sendWithHeaders([], [ - ProjectingHeaders::PROJECTION_NAME => $this->projectionName, - ], $this->initChannel); + $this->messagingEntrypoint->sendWithHeaders([], $this->withProjectionName([]), $this->initChannel); } } public function delete(): void { if ($this->deleteChannel) { - $this->messagingEntrypoint->sendWithHeaders([], [ - ProjectingHeaders::PROJECTION_NAME => $this->projectionName, - ], $this->deleteChannel); + $this->messagingEntrypoint->sendWithHeaders([], $this->withProjectionName([]), $this->deleteChannel); } } public function flush(mixed $userState = null): void { if ($this->flushChannel) { - $this->messagingEntrypoint->sendWithHeaders([], [ - ProjectingHeaders::PROJECTION_NAME => $this->projectionName, + $this->messagingEntrypoint->sendWithHeaders([], $this->withProjectionName([ ProjectingHeaders::PROJECTION_STATE => $userState, - ], $this->flushChannel); + ]), $this->flushChannel); } } public function reset(?string $partitionKey = null): void { if ($this->resetChannel) { - $headers = [ - ProjectingHeaders::PROJECTION_NAME => $this->projectionName, - ]; + $headers = $this->withProjectionName([]); if ($partitionKey !== null) { $headers[ProjectingHeaders::REBUILD_PARTITION_KEY] = $partitionKey; @@ -110,4 +107,12 @@ public function reset(?string $partitionKey = null): void $this->messagingEntrypoint->sendWithHeaders([], $headers, $this->resetChannel); } } + + private function withProjectionName(array $headers): array + { + if ($this->licenceDecider->hasEnterpriseLicence()) { + $headers[ProjectingHeaders::PROJECTION_NAME] = $this->projectionName; + } + return $headers; + } } diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionRegistry.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionRegistry.php index 9b3ab3f16..375a7d7de 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionRegistry.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionRegistry.php @@ -1,7 +1,7 @@ expectException(ConfigurationException::class); - $this->expectExceptionMessage('Projections are part of Ecotone Enterprise. To use projections, please acquire an enterprise licence.'); + $this->expectException(LicensingException::class); $projection = new #[ProjectionV2('test'), FromStream('test_stream')] class { #[EventHandler('*')] diff --git a/packages/Ecotone/tests/Projecting/ProjectionV2EnterpriseTest.php b/packages/Ecotone/tests/Projecting/ProjectionV2EnterpriseTest.php new file mode 100644 index 000000000..02620b5aa --- /dev/null +++ b/packages/Ecotone/tests/Projecting/ProjectionV2EnterpriseTest.php @@ -0,0 +1,451 @@ +handledEvents[] = $event; + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ); + + $ecotone->withEvents([Event::createWithType('test-event', ['name' => 'Test'])]); + $ecotone->publishEventWithRoutingKey('trigger', []); + + $this->assertCount(1, $projection->handledEvents); + } + + public function test_global_async_projection_works_without_licence(): void + { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), Asynchronous('async')] class { + public array $handledEvents = []; + + #[EventHandler('*')] + public function handle(array $event): void + { + $this->handledEvents[] = $event; + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->addExtensionObject(SimpleMessageChannelBuilder::createQueueChannel('async')) + ); + + $this->assertNotNull($ecotone); + } + + public function test_sync_backfill_works_without_licence(): void + { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), ProjectionBackfill] class { + public array $handledEvents = []; + + #[EventHandler('*')] + public function handle(array $event): void + { + $this->handledEvents[] = $event; + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ); + + $this->assertNotNull($ecotone); + } + + public function test_lifecycle_hooks_work_without_licence(): void + { + $projection = new #[ProjectionV2('test'), FromStream('test_stream')] class { + public bool $initialized = false; + public bool $deleted = false; + public bool $reset = false; + public bool $flushed = false; + public array $handledEvents = []; + + #[EventHandler('*')] + public function handle(array $event): void + { + $this->handledEvents[] = $event; + } + + #[ProjectionInitialization] + public function init(): void + { + $this->initialized = true; + } + + #[ProjectionDelete] + public function remove(): void + { + $this->deleted = true; + } + + #[ProjectionReset] + public function resetState(): void + { + $this->reset = true; + } + + #[ProjectionFlush] + public function flush(): void + { + $this->flushed = true; + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ); + + $this->assertNotNull($ecotone); + } + + public function test_projection_execution_batch_size_works_without_licence(): void + { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), ProjectionExecution(eventLoadingBatchSize: 50)] class { + public array $handledEvents = []; + + #[EventHandler('*')] + public function handle(array $event): void + { + $this->handledEvents[] = $event; + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ); + + $this->assertNotNull($ecotone); + } + + public function test_partitioned_projection_requires_licence(): void + { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), Partitioned] class { + #[EventHandler('*')] + public function handle(array $event): void + { + } + }; + + $this->expectException(LicensingException::class); + + EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ); + } + + public function test_streaming_projection_requires_licence(): void + { + $projection = new #[ProjectionV2('test'), Streaming('streaming_channel')] class { + #[EventHandler('*')] + public function handle(array $event): void + { + } + }; + + $this->expectException(LicensingException::class); + + EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ); + } + + public function test_polling_projection_requires_licence(): void + { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), Polling('test_endpoint')] class { + #[EventHandler('*')] + public function handle(array $event): void + { + } + }; + + $this->expectException(LicensingException::class); + + EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ); + } + + public function test_async_backfill_requires_licence(): void + { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), ProjectionBackfill(asyncChannelName: 'backfill_channel')] class { + #[EventHandler('*')] + public function handle(array $event): void + { + } + }; + + $this->expectException(LicensingException::class); + + EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ); + } + + public function test_rebuild_requires_licence(): void + { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), ProjectionRebuild] class { + #[EventHandler('*')] + public function handle(array $event): void + { + } + }; + + $this->expectException(LicensingException::class); + + EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ); + } + + public function test_deployment_requires_licence(): void + { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), ProjectionDeployment(manualKickOff: true)] class { + #[EventHandler('*')] + public function handle(array $event): void + { + } + }; + + $this->expectException(LicensingException::class); + + EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ); + } + + public function test_custom_stream_source_requires_licence(): void + { + $projection = new #[ProjectionV2('test'), FromStream('test_stream')] class { + #[EventHandler('*')] + public function handle(array $event): void + { + } + }; + + $customStreamSource = new #[Attribute\StreamSource] class implements StreamSource { + public function canHandle(string $projectionName): bool + { + return true; + } + + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + { + return new StreamPage([], '0'); + } + }; + + $this->expectException(LicensingException::class); + + EcotoneLite::bootstrapFlowTesting( + [$projection::class, $customStreamSource::class], + [$projection, $customStreamSource], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ); + } + + public function test_custom_state_storage_requires_licence(): void + { + $projection = new #[ProjectionV2('test'), FromStream('test_stream')] class { + #[EventHandler('*')] + public function handle(array $event): void + { + } + }; + + $customStateStorage = new #[Attribute\StateStorage] class implements ProjectionStateStorage { + public function canHandle(string $projectionName): bool + { + return true; + } + + public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState + { + return null; + } + + public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState + { + return null; + } + + public function savePartition(ProjectionPartitionState $projectionState): void + { + } + + public function delete(string $projectionName): void + { + } + + public function init(string $projectionName): void + { + } + + public function beginTransaction(): Transaction + { + return new NoOpTransaction(); + } + }; + + $this->expectException(LicensingException::class); + + EcotoneLite::bootstrapFlowTesting( + [$projection::class, $customStateStorage::class], + [$projection, $customStateStorage], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ); + } + + public function test_custom_partition_provider_requires_licence(): void + { + $projection = new #[ProjectionV2('test'), FromStream('test_stream')] class { + #[EventHandler('*')] + public function handle(array $event): void + { + } + }; + + $customPartitionProvider = new #[Attribute\PartitionProvider] class implements PartitionProvider { + public function canHandle(string $projectionName): bool + { + return true; + } + + public function count(StreamFilter $filter): int + { + return 0; + } + + public function partitions(StreamFilter $filter, ?int $limit = null, int $offset = 0): iterable + { + return []; + } + }; + + $this->expectException(LicensingException::class); + + EcotoneLite::bootstrapFlowTesting( + [$projection::class, $customPartitionProvider::class], + [$projection, $customPartitionProvider], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ); + } + + public function test_projection_name_header_is_not_available_without_licence(): void + { + $projection = new #[ProjectionV2('test'), FromStream('test_stream')] class { + public array $handledEvents = []; + + #[EventHandler('*')] + public function handle(array $event, #[ProjectionName] string $projectionName): void + { + $this->handledEvents[] = ['event' => $event, 'projectionName' => $projectionName]; + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ); + + $ecotone->withEvents([Event::createWithType('test-event', ['name' => 'Test'], [MessageHeaders::EVENT_AGGREGATE_ID => '1'])]); + + try { + $ecotone->publishEventWithRoutingKey('trigger', []); + self::fail('Should have thrown exception'); + } catch (MethodInvocationException $e) { + self::assertStringContainsString('projection.name', $e->getMessage(), 'Exception should mention missing projection.name header. Got: ' . $e->getMessage()); + } + } +} diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index dc7c728e9..57740b01d 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -1,7 +1,7 @@