vendor/symfony/messenger/EventListener/SendFailedMessageToFailureTransportListener.php line 46

Open in your IDE?
  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <[email protected]>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger\EventListener;
  11. use Psr\Container\ContainerInterface;
  12. use Psr\Log\LoggerInterface;
  13. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  14. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  15. use Symfony\Component\Messenger\Stamp\DelayStamp;
  16. use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
  17. use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
  18. use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
  /**
   * Sends a rejected message to a "failure transport".
   *
   * Listens for WorkerMessageFailedEvent and, when the event reports that the
   * message will not be retried, forwards the envelope to the failure sender
   * associated with the receiver the message was consumed from.
   *
   * @author Ryan Weaver <[email protected]>
   */
  class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface
  {
      /** @var ContainerInterface|SenderInterface locator of failure senders, or (deprecated) a single sender */
      private $failureSenders;

      /** @var LoggerInterface|null */
      private $logger;

      /**
       * @param ContainerInterface   $failureSenders Locator queried with the receiver name to obtain
       *                                             the failure sender. Any other value (e.g. a bare
       *                                             SenderInterface) triggers a deprecation notice.
       * @param LoggerInterface|null $logger         Optional logger; nothing is logged when null.
       */
      public function __construct($failureSenders, ?LoggerInterface $logger = null)
      {
          if (!$failureSenders instanceof ContainerInterface) {
              trigger_deprecation(
                  'symfony/messenger',
                  '5.3',
                  'Passing a SenderInterface value as 1st argument to "%s()" is deprecated, pass a ServiceLocator instead.',
                  __METHOD__
              );
          }

          $this->failureSenders = $failureSenders;
          $this->logger = $logger;
      }

      /**
       * Forwards a definitively failed message to its failure sender.
       *
       * Does nothing when the message will be retried, when no failure sender
       * is available for the receiver, or when the envelope already carries a
       * SentToFailureTransportStamp.
       */
      public function onMessageFailed(WorkerMessageFailedEvent $event)
      {
          if ($event->willRetry()) {
              return;
          }

          if (!$this->hasFailureTransports($event)) {
              return;
          }

          $failureSender = $this->getFailureSender($event->getReceiverName());
          // Defensive guard only: getFailureSender() declares a non-nullable
          // SenderInterface return type.
          if (null === $failureSender) {
              return;
          }

          $envelope = $event->getEnvelope();

          // avoid re-sending to the failed sender
          if (null !== $envelope->last(SentToFailureTransportStamp::class)) {
              return;
          }

          // Record the originating receiver; the zero-valued delay and redelivery
          // stamps presumably reset state left over from earlier retries
          // (NOTE(review): confirm against DelayStamp / RedeliveryStamp).
          $envelope = $envelope->with(
              new SentToFailureTransportStamp($event->getReceiverName()),
              new DelayStamp(0),
              new RedeliveryStamp(0)
          );

          if (null !== $this->logger) {
              $this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
                  'class' => \get_class($envelope->getMessage()),
                  'transport' => \get_class($failureSender),
              ]);
          }

          $failureSender->send($envelope);
      }

      /**
       * Registers onMessageFailed() for WorkerMessageFailedEvent with priority -100.
       */
      public static function getSubscribedEvents()
      {
          return [
              WorkerMessageFailedEvent::class => ['onMessageFailed', -100],
          ];
      }

      /**
       * Returns the failure sender for the given receiver: the configured sender
       * itself when a bare SenderInterface was injected, otherwise the entry the
       * locator holds under the receiver name.
       */
      private function getFailureSender(string $receiverName): SenderInterface
      {
          if ($this->failureSenders instanceof SenderInterface) {
              return $this->failureSenders;
          }

          return $this->failureSenders->get($receiverName);
      }

      /**
       * Tells whether a failure sender can be resolved for the event's receiver:
       * either the locator has an entry under the receiver name, or a bare
       * SenderInterface was injected.
       */
      private function hasFailureTransports(WorkerMessageFailedEvent $event): bool
      {
          if ($this->failureSenders instanceof ContainerInterface && $this->failureSenders->has($event->getReceiverName())) {
              return true;
          }

          return $this->failureSenders instanceof SenderInterface;
      }
  }