vendor/shopware/core/Framework/MessageQueue/Subscriber/MessageFailedHandler.php line 40

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\Subscriber;
  3. use Shopware\Core\Framework\Increment\Exception\IncrementGatewayNotFoundException;
  4. use Shopware\Core\Framework\Increment\IncrementGatewayRegistry;
  5. use Shopware\Core\Framework\Log\Package;
  6. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  7. use Symfony\Component\Messenger\Envelope;
  8. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  9. use Symfony\Component\Messenger\Stamp\ReceivedStamp;
  10. /**
  11.  * @deprecated tag:v6.5.0 - reason:becomes-internal - EventSubscribers will become internal in v6.5.0
  12.  */
  13. #[Package('system-settings')]
  14. class MessageFailedHandler implements EventSubscriberInterface
  15. {
  16.     private string $defaultTransportName;
  17.     private IncrementGatewayRegistry $gatewayRegistry;
  18.     /**
  19.      * @internal
  20.      */
  21.     public function __construct(IncrementGatewayRegistry $gatewayRegistrystring $defaultTransportName)
  22.     {
  23.         $this->defaultTransportName $defaultTransportName;
  24.         $this->gatewayRegistry $gatewayRegistry;
  25.     }
  26.     public static function getSubscribedEvents(): array
  27.     {
  28.         return [
  29.             // must have higher priority than SendFailedMessageToFailureTransportListener
  30.             WorkerMessageFailedEvent::class => ['onMessageFailed'99],
  31.         ];
  32.     }
  33.     public function onMessageFailed(WorkerMessageFailedEvent $event): void
  34.     {
  35.         if ($event->willRetry()) {
  36.             return;
  37.         }
  38.         $message $event->getEnvelope();
  39.         if (!$this->wasReceivedByDefaultTransport($message)) {
  40.             return;
  41.         }
  42.         $name = \get_class($message->getMessage());
  43.         try {
  44.             $gateway $this->gatewayRegistry->get(IncrementGatewayRegistry::MESSAGE_QUEUE_POOL);
  45.         } catch (IncrementGatewayNotFoundException $exception) {
  46.             return;
  47.         }
  48.         $gateway->decrement('message_queue_stats'$name);
  49.     }
  50.     private function wasReceivedByDefaultTransport(Envelope $message): bool
  51.     {
  52.         foreach ($message->all(ReceivedStamp::class) as $stamp) {
  53.             if ($stamp instanceof ReceivedStamp && $stamp->getTransportName() === $this->defaultTransportName) {
  54.                 return true;
  55.             }
  56.         }
  57.         return false;
  58.     }
  59. }