Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions packages/Amqp/tests/Integration/DynamicStreamChannelRetryTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\Amqp\Integration;

use Ecotone\Amqp\AmqpQueue;
use Ecotone\Amqp\AmqpStreamChannelBuilder;
use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Messaging\Channel\DynamicChannel\DynamicMessageChannelBuilder;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
use Ecotone\Messaging\Endpoint\FinalFailureStrategy;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
use Ecotone\Test\LicenceTesting;
use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnection;
use Exception;
use Symfony\Component\Uid\Uuid;
use Test\Ecotone\Amqp\AmqpMessagingTestCase;

/**
* @internal
*/
final class DynamicStreamChannelRetryTest extends AmqpMessagingTestCase
{
public function setUp(): void
{
if (getenv('AMQP_IMPLEMENTATION') !== 'lib') {
$this->markTestSkipped('Stream tests require AMQP lib');
}
}

public function test_resend_works_for_dynamic_channel_wrapping_amqp_stream_channels(): void
{
$queueTenantA = 'stream_queue_tenant_a_' . Uuid::v7()->toRfc4122();
$queueTenantB = 'stream_queue_tenant_b_' . Uuid::v7()->toRfc4122();
$handler = new DynamicStreamRetryHandler();

$ecotoneLite = $this->bootstrapForTesting(
[DynamicStreamRetryHandler::class],
[
$handler,
...$this->getConnectionFactoryReferences(),
],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withLicenceKey(LicenceTesting::VALID_LICENCE)
->withExtensionObjects([
AmqpQueue::createStreamQueue($queueTenantA),
AmqpStreamChannelBuilder::create(
channelName: 'async_tenant_a',
startPosition: 'first',
amqpConnectionReferenceName: AmqpLibConnection::class,
queueName: $queueTenantA,
)->withFinalFailureStrategy(FinalFailureStrategy::RESEND),
AmqpQueue::createStreamQueue($queueTenantB),
AmqpStreamChannelBuilder::create(
channelName: 'async_tenant_b',
startPosition: 'first',
amqpConnectionReferenceName: AmqpLibConnection::class,
queueName: $queueTenantB,
)->withFinalFailureStrategy(FinalFailureStrategy::RESEND),
DynamicMessageChannelBuilder::createRoundRobin(
thisMessageChannelName: 'async',
channelNames: ['async_tenant_a', 'async_tenant_b'],
),
]),
);

$ecotoneLite->getCommandBus()->sendWithRouting('execute.dynamic_stream', 'test_message');

$ecotoneLite->run('async', ExecutionPollingMetadata::createWithFinishWhenNoMessages(failAtError: false)->withExecutionTimeLimitInMilliseconds(5000));

self::assertTrue($handler->failedOnce, 'Handler should have failed on first attempt');

$ecotoneLite->run('async', ExecutionPollingMetadata::createWithFinishWhenNoMessages(failAtError: true)->withExecutionTimeLimitInMilliseconds(5000));

self::assertTrue($handler->succeeded, 'Handler should have succeeded on retry after resend');
}
}

class DynamicStreamRetryHandler
{
public bool $failedOnce = false;
public bool $succeeded = false;

#[Asynchronous('async')]
#[CommandHandler('execute.dynamic_stream', 'dynamic_stream_endpoint')]
public function handle(string $command): void
{
if (! $this->failedOnce) {
$this->failedOnce = true;
throw new Exception('Simulated failure to trigger resend');
}
$this->succeeded = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message
Assert::notNullAndEmpty($channelName, "Channel name to poll message from cannot be null. If you want to skip message receiving, return 'nullChannel' instead.");

$channel = $this->resolveMessageChannel($channelName);
$message = $channel->receiveWithTimeout($pollingMetadata);
$message = $channel->receiveWithTimeout(
$pollingMetadata->setPolledChannelName($channelName)
);
$this->loggingGateway->info("Decided to received message from `{$channelName}` for `{$this->channelName}`", $message, ['channel_name' => $this->channelName, 'chosen_channel_name' => $channelName]);

return $message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public function __construct(private string $pollableChannelName, private Pollabl

public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message
{
$pollingMetadata = $pollingMetadata->setPolledChannelName($this->pollableChannelName);
$timeoutInMilliseconds = $pollingMetadata->getFixedRateInMilliseconds();
$message = $timeoutInMilliseconds
? $this->pollableChannel->receiveWithTimeout($pollingMetadata)
Expand Down
14 changes: 14 additions & 0 deletions packages/Ecotone/src/Messaging/Endpoint/PollingMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ final class PollingMetadata implements DefinedObject
public const DEFAULT_FINISH_WHEN_NO_MESSAGES = false;

private bool $withSignalInterceptors;
private ?string $polledChannelName = null;


/**
Expand Down Expand Up @@ -442,6 +443,19 @@ public function getCronExpression(): ?string
return $this->cronExpression;
}

public function getPolledChannelName(): ?string
{
return $this->polledChannelName;
}

public function setPolledChannelName(string $polledChannelName): self
{
$copy = $this->createCopy();
$copy->polledChannelName = $polledChannelName;

return $copy;
}

public function hasFixedRateExpression(): bool
{
return $this->fixedRateExpression !== null;
Expand Down
3 changes: 2 additions & 1 deletion packages/Kafka/src/Inbound/InboundMessageConverter.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function toMessage(
ConversionService $conversionService,
BatchCommitCoordinator $batchCommitCoordinator,
): MessageBuilder {
$kafkaConsumerConfiguration = $this->kafkaAdmin->getRdKafkaConfiguration($endpointId, $channelName);
$kafkaConsumerConfiguration = $this->kafkaAdmin->getRdKafkaConfiguration($channelName);
$headerMapper = $kafkaConsumerConfiguration->getHeaderMapper();
$acknowledgeMode = $kafkaConsumerConfiguration->getAcknowledgeMode();

Expand All @@ -49,6 +49,7 @@ public function toMessage(
$this->loggingGateway,
$this->kafkaAdmin,
$endpointId,
$channelName,
$this->finalFailureStrategy,
$acknowledgeMode === KafkaAcknowledgementCallback::AUTO_ACK,
$batchCommitCoordinator,
Expand Down
7 changes: 5 additions & 2 deletions packages/Kafka/src/Inbound/KafkaAcknowledgementCallback.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ private function __construct(
private LoggingGateway $loggingGateway,
private KafkaAdmin $kafkaAdmin,
private string $endpointId,
private string $channelName,
private BatchCommitCoordinator $batchCommitCoordinator,
) {
}
Expand All @@ -39,6 +40,7 @@ public static function create(
LoggingGateway $loggingGateway,
KafkaAdmin $kafkaAdmin,
string $endpointId,
string $channelName,
FinalFailureStrategy $finalFailureStrategy,
bool $isAutoAcked,
BatchCommitCoordinator $batchCommitCoordinator,
Expand All @@ -51,6 +53,7 @@ public static function create(
$loggingGateway,
$kafkaAdmin,
$endpointId,
$channelName,
$batchCommitCoordinator,
);
}
Expand Down Expand Up @@ -105,8 +108,8 @@ public function reject(): void
public function resend(): void
{
try {
$this->kafkaAdmin->getProducer($this->endpointId);
$topic = $this->kafkaAdmin->getTopicForProducer($this->endpointId);
$this->kafkaAdmin->getProducer($this->channelName);
$topic = $this->kafkaAdmin->getTopicForProducer($this->channelName);
$topic->producev(
$this->message->partition,
0,
Expand Down
5 changes: 3 additions & 2 deletions packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public function __construct(
public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message
{
$endpointId = $pollingMetadata->getEndpointId();
$consumer = $this->kafkaAdmin->getConsumer($endpointId, $this->channelName);
$channelName = $pollingMetadata->getPolledChannelName() ?? $this->channelName;
$consumer = $this->kafkaAdmin->getConsumer($endpointId, $channelName);

if ($this->batchCommitCoordinator === null || $this->batchCommitCoordinator->consumer !== $consumer) {
$this->batchCommitCoordinator = new BatchCommitCoordinator(
Expand All @@ -60,7 +61,7 @@ public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message
if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
return $this->inboundMessageConverter->toMessage(
$endpointId,
$this->channelName,
$channelName,
$consumer,
$message,
$this->conversionService,
Expand Down
97 changes: 97 additions & 0 deletions packages/Kafka/tests/Integration/DynamicChannelRetryTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\Kafka\Integration;

use Ecotone\Kafka\Channel\KafkaMessageChannelBuilder;
use Ecotone\Kafka\Configuration\KafkaBrokerConfiguration;
use Ecotone\Lite\EcotoneLite;
use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Messaging\Channel\DynamicChannel\DynamicMessageChannelBuilder;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
use Ecotone\Messaging\Endpoint\FinalFailureStrategy;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Test\LicenceTesting;
use Exception;
use PHPUnit\Framework\Attributes\RunTestsInSeparateProcesses;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Uid\Uuid;
use Test\Ecotone\Kafka\ConnectionTestCase;

/**
* @internal
*/
#[RunTestsInSeparateProcesses]
final class DynamicChannelRetryTest extends TestCase
{
public function test_resend_works_for_dynamic_channel_wrapping_kafka_channels(): void
{
$topicTenantA = 'topic_async_tenant_a_' . Uuid::v7()->toRfc4122();
$topicTenantB = 'topic_async_tenant_b_' . Uuid::v7()->toRfc4122();
$handler = new DynamicChannelRetryHandler();

$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
[DynamicChannelRetryHandler::class],
[$handler, KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection()],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::KAFKA_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withExtensionObjects([
KafkaMessageChannelBuilder::create(
channelName: 'async_tenant_a',
topicName: $topicTenantA,
)
->withFinalFailureStrategy(FinalFailureStrategy::RESEND)
->withReceiveTimeout(3000),
KafkaMessageChannelBuilder::create(
channelName: 'async_tenant_b',
topicName: $topicTenantB,
)
->withFinalFailureStrategy(FinalFailureStrategy::RESEND)
->withReceiveTimeout(3000),
DynamicMessageChannelBuilder::createRoundRobin(
thisMessageChannelName: 'async',
channelNames: ['async_tenant_a', 'async_tenant_b'],
),
]),
licenceKey: LicenceTesting::VALID_LICENCE,
);

$ecotoneTestSupport->sendCommandWithRoutingKey('execute.dynamic', 'test_message');

$ecotoneTestSupport->run('async', ExecutionPollingMetadata::createWithTestingSetup(
amountOfMessagesToHandle: 1,
maxExecutionTimeInMilliseconds: 10000,
failAtError: false,
));

self::assertTrue($handler->failedOnce, 'Handler should have failed on first attempt');

$ecotoneTestSupport->run('async', ExecutionPollingMetadata::createWithTestingSetup(
amountOfMessagesToHandle: 1,
maxExecutionTimeInMilliseconds: 10000,
failAtError: true,
));

self::assertTrue($handler->succeeded, 'Handler should have succeeded on retry after resend');
}
}

class DynamicChannelRetryHandler
{
public bool $failedOnce = false;
public bool $succeeded = false;

#[Asynchronous('async')]
#[CommandHandler('execute.dynamic', 'dynamic_endpoint')]
public function handle(string $command): void
{
if (! $this->failedOnce) {
$this->failedOnce = true;
throw new Exception('Simulated failure to trigger resend');
}
$this->succeeded = true;
}
}
Loading