From 1f77077bfa9d01696f4bf4a037369b0a8f56409a Mon Sep 17 00:00:00 2001 From: Maximilian Graf Schimmelmann Date: Wed, 11 Mar 2026 11:59:39 +0100 Subject: [PATCH] BC-XX Prevent messaging loop when handeling zabbix messages. --- src/Message/EnsureZabbixSetupMessage.php | 3 +- .../MessengerMonitoringSubscriber.php | 36 ++++++++++++++++--- .../MessengerMonitoringSubscriberTest.php | 27 +++++++++++++- 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/src/Message/EnsureZabbixSetupMessage.php b/src/Message/EnsureZabbixSetupMessage.php index c91a83a..e1f2b55 100644 --- a/src/Message/EnsureZabbixSetupMessage.php +++ b/src/Message/EnsureZabbixSetupMessage.php @@ -4,10 +4,11 @@ namespace BytesCommerce\ZabbixApi\Message; +use BytesCommerce\ZabbixApi\Contract\MonitoringMessageInterface; use Symfony\Component\Messenger\Attribute\AsMessage; #[AsMessage('async')] -final readonly class EnsureZabbixSetupMessage +final readonly class EnsureZabbixSetupMessage implements MonitoringMessageInterface { public function __construct(public bool $force = false) { diff --git a/src/Subscriber/MessengerMonitoringSubscriber.php b/src/Subscriber/MessengerMonitoringSubscriber.php index 7810f88..f7fcff4 100644 --- a/src/Subscriber/MessengerMonitoringSubscriber.php +++ b/src/Subscriber/MessengerMonitoringSubscriber.php @@ -9,6 +9,7 @@ use BytesCommerce\ZabbixApi\Message\PushMetricMessage; use Symfony\Component\DependencyInjection\Attribute\Autowire; use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; @@ -16,6 +17,8 @@ final readonly class MessengerMonitoringSubscriber implements EventSubscriberInterface { + private const string ZABBIX_MESSAGE_NAMESPACE = 'BytesCommerce\\ZabbixApi\\Message\\'; + public function __construct( private MessageBusInterface $bus, private ZabbixNamingProviderInterface $naming, @@ -39,7 +42,12 @@ public function onMessageReceived(WorkerMessageReceivedEvent $event): void return; } - $messageClass = $this->getShortClassName($event->getEnvelope()->getMessage()::class); + $message = $event->getEnvelope()->getMessage(); + while ($message instanceof Envelope) { + $message = $message->getMessage(); + } + + $messageClass = $this->getShortClassName($message::class); $this->bus->dispatch(new PushMetricMessage( key: $this->naming->getItemKey('messenger.received'), @@ -58,7 +66,12 @@ public function onMessageHandled(WorkerMessageHandledEvent $event): void return; } - $messageClass = $this->getShortClassName($event->getEnvelope()->getMessage()::class); + $message = $event->getEnvelope()->getMessage(); + while ($message instanceof Envelope) { + $message = $message->getMessage(); + } + + $messageClass = $this->getShortClassName($message::class); $this->bus->dispatch(new PushMetricMessage( key: $this->naming->getItemKey('messenger.handled'), @@ -76,7 +89,12 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void return; } - $messageClass = $this->getShortClassName($event->getEnvelope()->getMessage()::class); + $message = $event->getEnvelope()->getMessage(); + while ($message instanceof Envelope) { + $message = $message->getMessage(); + } + + $messageClass = $this->getShortClassName($message::class); $errorClass = $this->getShortClassName($event->getThrowable()::class); $this->bus->dispatch(new PushMetricMessage( @@ -92,7 +110,17 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void private function isMonitoringMessage(WorkerMessageReceivedEvent|WorkerMessageHandledEvent|WorkerMessageFailedEvent $event): bool { - return $event->getEnvelope()->getMessage() instanceof MonitoringMessageInterface; + $message = $event->getEnvelope()->getMessage(); + + while ($message instanceof Envelope) { + $message = $message->getMessage(); + } + + if ($message instanceof MonitoringMessageInterface) { + return true; + } + + return str_starts_with($message::class, self::ZABBIX_MESSAGE_NAMESPACE); } private function getShortClassName(string $fqcn): string diff --git a/tests/Subscriber/MessengerMonitoringSubscriberTest.php b/tests/Subscriber/MessengerMonitoringSubscriberTest.php index 2453395..06e78e0 100644 --- a/tests/Subscriber/MessengerMonitoringSubscriberTest.php +++ b/tests/Subscriber/MessengerMonitoringSubscriberTest.php @@ -6,6 +6,7 @@ use BytesCommerce\ZabbixApi\Contract\MonitoringMessageInterface; use BytesCommerce\ZabbixApi\Contract\ZabbixNamingProviderInterface; +use BytesCommerce\ZabbixApi\Message\EnsureZabbixSetupMessage; use BytesCommerce\ZabbixApi\Message\PushEventMessage; use BytesCommerce\ZabbixApi\Message\PushMetricMessage; use BytesCommerce\ZabbixApi\Subscriber\MessengerMonitoringSubscriber; @@ -140,4 +141,28 @@ public function testNoRecursiveMessageLoopOccursForPushEventMessage(): void $this->subscriber->onMessageHandled(new WorkerMessageHandledEvent($envelope, 'async')); $this->subscriber->onMessageFailed(new WorkerMessageFailedEvent($envelope, 'async', new \RuntimeException())); } -} \ No newline at end of file + + public function testNoRecursiveMessageLoopOccursForEnsureZabbixSetupMessage(): void + { + $monitoringMessage = new EnsureZabbixSetupMessage(); + $envelope = new Envelope($monitoringMessage); + + $this->bus->expects($this->never())->method('dispatch'); + + $this->subscriber->onMessageReceived(new WorkerMessageReceivedEvent($envelope, 'async')); + $this->subscriber->onMessageHandled(new WorkerMessageHandledEvent($envelope, 'async')); + $this->subscriber->onMessageFailed(new WorkerMessageFailedEvent($envelope, 'async', new \RuntimeException())); + } + + public function testNoRecursiveMessageLoopOccursForNestedEnvelope(): void + { + $monitoringMessage = new PushMetricMessage('test.metric', 100); + $envelope = new Envelope(new Envelope($monitoringMessage)); + + $this->bus->expects($this->never())->method('dispatch'); + + $this->subscriber->onMessageReceived(new WorkerMessageReceivedEvent($envelope, 'async')); + $this->subscriber->onMessageHandled(new WorkerMessageHandledEvent($envelope, 'async')); + $this->subscriber->onMessageFailed(new WorkerMessageFailedEvent($envelope, 'async', new \RuntimeException())); + } +}