Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand Down
2 changes: 1 addition & 1 deletion packages/Ecotone/src/Projecting/Attribute/ProjectionV2.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand All @@ -13,6 +13,7 @@
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Config\LicenceDecider;
use Ecotone\Messaging\Gateway\MessagingEntrypointService;
use Ecotone\Messaging\Handler\ChannelResolver;
use Ecotone\Messaging\Handler\Router\RouterProcessor;
Expand Down Expand Up @@ -51,6 +52,8 @@ public function __construct(
private ?string $resetChannel = null,
private ?int $rebuildPartitionBatchSize = null,
private ?string $rebuildAsyncChannelName = null,
private bool $hasRebuild = false,
private bool $hasDeployment = false,
) {
if ($this->partitionHeader && ! $this->automaticInitialization) {
throw new ConfigurationException("Cannot set partition header for projection {$this->projectionName} with automatic initialization disabled");
Expand Down Expand Up @@ -150,6 +153,14 @@ public function rebuildAsyncChannelName(): ?string
return $this->rebuildAsyncChannelName;
}

public function isOpenSourceEligible(): bool
{
return ! $this->isPartitioned()
&& $this->backfillAsyncChannelName === null
&& ! $this->hasRebuild
&& ! $this->hasDeployment;
}

public function compile(MessagingContainerBuilder $builder): Definition|Reference
{
$routerProcessor = $this->buildExecutionRouter($builder);
Expand All @@ -158,6 +169,7 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc
new Reference(MessageHeadersPropagatorInterceptor::class),
$this->projectionName,
$routerProcessor,
Reference::to(LicenceDecider::class),
$this->initChannel,
$this->deleteChannel,
$this->flushChannel,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand All @@ -19,6 +19,7 @@
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
use Ecotone\Messaging\Support\LicensingException;
use Ecotone\Projecting\Attribute\PartitionProvider as PartitionProviderAttribute;
use Ecotone\Projecting\Attribute\ProjectionV2;
use Ecotone\Projecting\PartitionProviderReference;
Expand Down Expand Up @@ -56,6 +57,10 @@ public static function create(AnnotationFinder $annotationFinder, InterfaceToCal

public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void
{
if (! empty($this->userlandPartitionProviderReferences) && ! $messagingConfiguration->isRunningForEnterpriseLicence()) {
throw LicensingException::create('Custom #[PartitionProvider] implementations require Ecotone Enterprise licence.');
}

$partitionProviderReferences = ExtensionObjectResolver::resolve(
PartitionProviderReference::class,
$extensionObjects
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand Down Expand Up @@ -31,6 +31,7 @@
use Ecotone\Messaging\Handler\ServiceActivator\MessageProcessorActivatorBuilder;
use Ecotone\Messaging\Handler\ServiceActivator\ServiceActivatorBuilder;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Messaging\Support\LicensingException;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\NamedEvent;
use Ecotone\Projecting\Attribute\Partitioned;
Expand Down Expand Up @@ -105,6 +106,8 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
partitioned: $partitionAttribute !== null,
rebuildPartitionBatchSize: $rebuildAttribute?->partitionBatchSize,
rebuildAsyncChannelName: $rebuildAttribute?->asyncChannelName,
hasRebuild: $rebuildAttribute !== null,
hasDeployment: $projectionDeployment !== null,
);

$asyncAttribute = self::getProjectionAsynchronousAttribute($annotationRegistrationService, $projectionClassName);
Expand Down Expand Up @@ -180,6 +183,15 @@ public static function create(AnnotationFinder $annotationRegistrationService, I

public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void
{
if (! $messagingConfiguration->isRunningForEnterpriseLicence()) {
if (! empty($this->pollingProjections)) {
throw LicensingException::create('#[Polling] projections require Ecotone Enterprise licence.');
}
if (! empty($this->eventStreamingProjections)) {
throw LicensingException::create('#[Streaming] projections require Ecotone Enterprise licence.');
}
}

foreach ($this->lifecycleHandlers as $lifecycleHandler) {
$messagingConfiguration->registerMessageHandler($lifecycleHandler);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand Down
16 changes: 12 additions & 4 deletions packages/Ecotone/src/Projecting/Config/ProjectingModule.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand All @@ -15,14 +15,14 @@
use Ecotone\Messaging\Attribute\WithoutDatabaseTransaction;
use Ecotone\Messaging\Attribute\WithoutMessageCollector;
use Ecotone\Messaging\Config\Configuration;
use Ecotone\Messaging\Config\ConfigurationException;
use Ecotone\Messaging\Config\Container\AttributeDefinition;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\InterfaceToCallReference;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Support\LicensingException;
use Ecotone\Messaging\Endpoint\Interceptor\TerminationListener;
use Ecotone\Messaging\Gateway\MessagingEntrypointService;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
Expand Down Expand Up @@ -67,7 +67,11 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
$projectionBuilders = ExtensionObjectResolver::resolve(ProjectionExecutorBuilder::class, $extensionObjects);

if (! empty($projectionBuilders) && ! $messagingConfiguration->isRunningForEnterpriseLicence()) {
throw ConfigurationException::create('Projections are part of Ecotone Enterprise. To use projections, please acquire an enterprise licence.');
foreach ($projectionBuilders as $builder) {
if (! $builder instanceof EcotoneProjectionExecutorBuilder || ! $builder->isOpenSourceEligible()) {
throw LicensingException::create('Projections with enterprise features (Partitioned, Streaming, Polling, ProjectionRebuild, ProjectionDeployment, async backfill) require Ecotone Enterprise licence.');
}
}
}

$messagingConfiguration->registerServiceDefinition(
Expand Down Expand Up @@ -122,10 +126,14 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO

$asyncAttribute = $projectionBuilder instanceof EcotoneProjectionExecutorBuilder ? $projectionBuilder->getAsyncAttribute() : null;
if ($asyncAttribute !== null) {
$endpointAnnotations = $asyncAttribute->getEndpointAnnotations();
if ($messagingConfiguration->isRunningForEnterpriseLicence()) {
$endpointAnnotations = array_merge($endpointAnnotations, [new WithoutDatabaseTransaction(), new WithoutMessageCollector()]);
}
$handlerBuilder = $handlerBuilder->withEndpointAnnotations([
AttributeDefinition::fromObject(new Asynchronous(
$asyncAttribute->getChannelName(),
array_merge($asyncAttribute->getEndpointAnnotations(), [new WithoutDatabaseTransaction(), new WithoutMessageCollector()]),
$endpointAnnotations,
)),
]);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand All @@ -21,6 +21,7 @@
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
use Ecotone\Messaging\Support\LicensingException;
use Ecotone\Projecting\Attribute\ProjectionV2;
use Ecotone\Projecting\Attribute\StateStorage as StateStorageAttribute;
use Ecotone\Projecting\InMemory\InMemoryProjectionStateStorage;
Expand Down Expand Up @@ -58,6 +59,10 @@ public static function create(AnnotationFinder $annotationFinder, InterfaceToCal

public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void
{
if (! empty($this->userlandStateStorageReferences) && ! $messagingConfiguration->isRunningForEnterpriseLicence()) {
throw LicensingException::create('Custom #[StateStorage] implementations require Ecotone Enterprise licence.');
}

$stateStorageReferences = ExtensionObjectResolver::resolve(
ProjectionStateStorageReference::class,
$extensionObjects
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

Expand All @@ -20,6 +20,7 @@
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
use Ecotone\Messaging\Support\LicensingException;
use Ecotone\Projecting\Attribute\StreamSource as StreamSourceAttribute;
use Ecotone\Projecting\StreamSource;
use Ecotone\Projecting\StreamSourceReference;
Expand Down Expand Up @@ -48,6 +49,10 @@ public static function create(AnnotationFinder $annotationFinder, InterfaceToCal

public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void
{
if (! empty($this->userlandStreamSourceReferences) && ! $messagingConfiguration->isRunningForEnterpriseLicence()) {
throw LicensingException::create('Custom #[StreamSource] implementations require Ecotone Enterprise licence.');
}

$streamSourceReferences = ExtensionObjectResolver::resolve(
StreamSourceReference::class,
$extensionObjects
Expand Down
35 changes: 20 additions & 15 deletions packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
<?php

/*
* licence Enterprise
* licence Apache-2.0
*/
declare(strict_types=1);

namespace Ecotone\Projecting;

use Ecotone\Messaging\Channel\QueueChannel;
use Ecotone\Messaging\Config\LicenceDecider;
use Ecotone\Messaging\Gateway\MessagingEntrypointService;
use Ecotone\Messaging\Handler\MessageProcessor;
use Ecotone\Messaging\MessageHeaders;
Expand All @@ -22,8 +23,9 @@ class EcotoneProjectorExecutor implements ProjectorExecutor
public function __construct(
private MessagingEntrypointService $messagingEntrypoint,
private MessageHeadersPropagatorInterceptor $messageHeadersPropagatorInterceptor,
private string $projectionName, // this is required for event stream emitter so it can create a stream with this name
private string $projectionName,
private MessageProcessor $routerProcessor,
private LicenceDecider $licenceDecider,
private ?string $initChannel = null,
private ?string $deleteChannel = null,
private ?string $flushChannel = null,
Expand All @@ -37,7 +39,9 @@ public function project(Event $event, mixed $userState = null): mixed
$metadata = $event->getMetadata();
$metadata[ProjectingHeaders::PROJECTION_STATE] = $userState ?? null;
$metadata[ProjectingHeaders::PROJECTION_EVENT_NAME] = $event->getEventName();
$metadata[ProjectingHeaders::PROJECTION_NAME] = $this->projectionName;
if ($this->licenceDecider->hasEnterpriseLicence()) {
$metadata[ProjectingHeaders::PROJECTION_NAME] = $this->projectionName;
}
$metadata[ProjectingHeaders::PROJECTION_LIVE] = $this->isLive;
$metadata[MessageHeaders::STREAM_BASED_SOURCED] = true; // this one is required for correct header propagation in EventStreamEmitter...
$metadata[MessageHeaders::REPLY_CHANNEL] = $responseQueue = new QueueChannel('response_channel');
Expand Down Expand Up @@ -65,37 +69,30 @@ function () use ($requestMessage) {
public function init(): void
{
if ($this->initChannel) {
$this->messagingEntrypoint->sendWithHeaders([], [
ProjectingHeaders::PROJECTION_NAME => $this->projectionName,
], $this->initChannel);
$this->messagingEntrypoint->sendWithHeaders([], $this->withProjectionName([]), $this->initChannel);
}
}

public function delete(): void
{
if ($this->deleteChannel) {
$this->messagingEntrypoint->sendWithHeaders([], [
ProjectingHeaders::PROJECTION_NAME => $this->projectionName,
], $this->deleteChannel);
$this->messagingEntrypoint->sendWithHeaders([], $this->withProjectionName([]), $this->deleteChannel);
}
}

public function flush(mixed $userState = null): void
{
if ($this->flushChannel) {
$this->messagingEntrypoint->sendWithHeaders([], [
ProjectingHeaders::PROJECTION_NAME => $this->projectionName,
$this->messagingEntrypoint->sendWithHeaders([], $this->withProjectionName([
ProjectingHeaders::PROJECTION_STATE => $userState,
], $this->flushChannel);
]), $this->flushChannel);
}
}

public function reset(?string $partitionKey = null): void
{
if ($this->resetChannel) {
$headers = [
ProjectingHeaders::PROJECTION_NAME => $this->projectionName,
];
$headers = $this->withProjectionName([]);

if ($partitionKey !== null) {
$headers[ProjectingHeaders::REBUILD_PARTITION_KEY] = $partitionKey;
Expand All @@ -110,4 +107,12 @@ public function reset(?string $partitionKey = null): void
$this->messagingEntrypoint->sendWithHeaders([], $headers, $this->resetChannel);
}
}

private function withProjectionName(array $headers): array
{
if ($this->licenceDecider->hasEnterpriseLicence()) {
$headers[ProjectingHeaders::PROJECTION_NAME] = $this->projectionName;
}
return $headers;
}
}
Loading
Loading