Skip to content

Commit e8f6057

Browse files
weaverryansroze
andcommitted
Adding global retry support, events & more to messenger transport
Co-authored-by: Samuel ROZE <samuel.roze@gmail.com>
1 parent fb109a7 commit e8f6057

File tree

4 files changed

+50
-0
lines changed

4 files changed

+50
-0
lines changed

DependencyInjection/Configuration.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,6 +1111,20 @@ function ($a) {
11111111
->prototype('variable')
11121112
->end()
11131113
->end()
1114+
->arrayNode('retry_strategy')
1115+
->addDefaultsIfNotSet()
1116+
->validate()
1117+
->ifTrue(function ($v) { return null !== $v['service'] && (isset($v['max_retries']) || isset($v['delay']) || isset($v['multiplier']) || isset($v['max_delay'])); })
1118+
->thenInvalid('"service" cannot be used along with the other retry_strategy options.')
1119+
->end()
1120+
->children()
1121+
->scalarNode('service')->defaultNull()->info('Service id to override the retry strategy entirely')->end()
1122+
->integerNode('max_retries')->defaultValue(3)->min(0)->end()
1123+
->integerNode('delay')->defaultValue(1000)->min(0)->info('Time in ms to delay (or the initial value when multiplier is used)')->end()
1124+
->floatNode('multiplier')->defaultValue(2)->min(1)->info('If greater than 1, delay will grow exponentially for each retry: this delay = (delay * (multiple ^ retries))')->end()
1125+
->integerNode('max_delay')->defaultValue(0)->min(0)->info('Max time in ms that a retry should ever be delayed (0 = infinite)')->end()
1126+
->end()
1127+
->end()
11141128
->end()
11151129
->end()
11161130
->end()

DependencyInjection/FrameworkExtension.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1653,6 +1653,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16531653
}
16541654

16551655
$senderAliases = [];
1656+
$transportRetryReferences = [];
16561657
foreach ($config['transports'] as $name => $transport) {
16571658
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {
16581659
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enabling it or running "composer require symfony/serializer-pack".');
@@ -1665,6 +1666,21 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16651666
;
16661667
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
16671668
$senderAliases[$name] = $transportId;
1669+
1670+
if (null !== $transport['retry_strategy']['service']) {
1671+
$transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']);
1672+
} else {
1673+
$retryServiceId = sprintf('messenger.retry.multiplier_retry_strategy.%s', $name);
1674+
$retryDefinition = new ChildDefinition('messenger.retry.abstract_multiplier_retry_strategy');
1675+
$retryDefinition
1676+
->replaceArgument(0, $transport['retry_strategy']['max_retries'])
1677+
->replaceArgument(1, $transport['retry_strategy']['delay'])
1678+
->replaceArgument(2, $transport['retry_strategy']['multiplier'])
1679+
->replaceArgument(3, $transport['retry_strategy']['max_delay']);
1680+
$container->setDefinition($retryServiceId, $retryDefinition);
1681+
1682+
$transportRetryReferences[$name] = new Reference($retryServiceId);
1683+
}
16681684
}
16691685

16701686
$messageToSendersMapping = [];
@@ -1686,6 +1702,9 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16861702
->replaceArgument(0, $messageToSendersMapping)
16871703
->replaceArgument(1, $messagesToSendAndHandle)
16881704
;
1705+
1706+
$container->getDefinition('messenger.retry_strategy_locator')
1707+
->replaceArgument(0, $transportRetryReferences);
16891708
}
16901709

16911710
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

Resources/config/console.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,12 @@
8282
<argument type="service" id="logger" on-invalid="null" />
8383
<argument type="collection" /> <!-- Receiver names -->
8484
<argument type="collection" /> <!-- Message bus names -->
85+
<argument type="service" id="messenger.retry_strategy_locator" />
86+
<argument type="service" id="event_dispatcher" />
8587

88+
<tag name="console.command" command="messenger:consume" />
8689
<tag name="console.command" command="messenger:consume-messages" />
90+
<tag name="monolog.logger" channel="messenger" />
8791
</service>
8892

8993
<service id="console.command.messenger_debug" class="Symfony\Component\Messenger\Command\DebugCommand">

Resources/config/messenger.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,5 +64,18 @@
6464
<tag name="messenger.transport_factory" />
6565
<argument type="service" id="messenger.transport.serializer" />
6666
</service>
67+
68+
<!-- retry -->
69+
<service id="messenger.retry_strategy_locator">
70+
<tag name="container.service_locator" />
71+
<argument type="collection" />
72+
</service>
73+
74+
<service id="messenger.retry.abstract_multiplier_retry_strategy" class="Symfony\Component\Messenger\Retry\MultiplierRetryStrategy" abstract="true">
75+
<argument /> <!-- max retries -->
76+
<argument /> <!-- delay ms -->
77+
<argument /> <!-- multiplier -->
78+
<argument /> <!-- max delay ms -->
79+
</service>
6780
</services>
6881
</container>

0 commit comments

Comments
 (0)