77
88namespace Magento \AsynchronousOperations \Model ;
99
10+ use Magento \Framework \App \ObjectManager ;
1011use Magento \Framework \MessageQueue \CallbackInvokerInterface ;
1112use Magento \Framework \MessageQueue \ConsumerConfigurationInterface ;
1213use Magento \Framework \MessageQueue \ConsumerInterface ;
1314use Magento \Framework \MessageQueue \EnvelopeInterface ;
1415use Magento \Framework \MessageQueue \QueueInterface ;
1516use Magento \Framework \Registry ;
17+ use Magento \Framework \MessageQueue \Consumer \ConfigInterface as ConsumerConfig ;
1618
1719/**
1820 * Class Consumer used to process OperationInterface messages.
@@ -41,24 +43,32 @@ class MassConsumer implements ConsumerInterface
4143 */
4244 private $ registry ;
4345
46+ /**
47+ * @var ConsumerConfig
48+ */
49+ private $ consumerConfig ;
50+
4451 /**
4552 * Initialize dependencies.
4653 *
4754 * @param CallbackInvokerInterface $invoker
4855 * @param ConsumerConfigurationInterface $configuration
4956 * @param MassConsumerEnvelopeCallbackFactory $massConsumerEnvelopeCallback
5057 * @param Registry $registry
58+ * @param ConsumerConfig|null $consumerConfig
5159 */
5260 public function __construct (
5361 CallbackInvokerInterface $ invoker ,
5462 ConsumerConfigurationInterface $ configuration ,
5563 MassConsumerEnvelopeCallbackFactory $ massConsumerEnvelopeCallback ,
56- Registry $ registry
64+ Registry $ registry ,
65+ ?ConsumerConfig $ consumerConfig = null
5766 ) {
5867 $ this ->invoker = $ invoker ;
5968 $ this ->configuration = $ configuration ;
6069 $ this ->massConsumerEnvelopeCallback = $ massConsumerEnvelopeCallback ;
6170 $ this ->registry = $ registry ;
71+ $ this ->consumerConfig = $ consumerConfig ?: ObjectManager::getInstance ()->get (ConsumerConfig::class);
6272 }
6373
6474 /**
@@ -75,12 +85,16 @@ public function process($maxNumberOfMessages = null)
7585 if (!isset ($ maxNumberOfMessages )) {
7686 $ queue ->subscribe ($ this ->getTransactionCallback ($ queue ));
7787 } else {
88+ $ connectionName = $ this ->consumerConfig
89+ ->getConsumer ($ this ->configuration ->getConsumerName ())
90+ ->getConnection ();
7891 $ this ->invoker ->invoke (
7992 $ queue ,
8093 $ maxNumberOfMessages ,
8194 $ this ->getTransactionCallback ($ queue ),
8295 $ maxIdleTime ,
83- $ sleep
96+ $ sleep ,
97+ $ connectionName
8498 );
8599 }
86100
0 commit comments