diff --git a/src/Contract/MonitoringMessageInterface.php b/src/Contract/MonitoringMessageInterface.php new file mode 100644 index 0000000..814917b --- /dev/null +++ b/src/Contract/MonitoringMessageInterface.php @@ -0,0 +1,9 @@ +isMonitoringMessage($event)) { + return; + } + $messageClass = $this->getShortClassName($event->getEnvelope()->getMessage()::class); $this->bus->dispatch(new PushMetricMessage( @@ -49,6 +54,10 @@ public function onMessageReceived(WorkerMessageReceivedEvent $event): void public function onMessageHandled(WorkerMessageHandledEvent $event): void { + if ($this->isMonitoringMessage($event)) { + return; + } + $messageClass = $this->getShortClassName($event->getEnvelope()->getMessage()::class); $this->bus->dispatch(new PushMetricMessage( @@ -63,6 +72,10 @@ public function onMessageHandled(WorkerMessageHandledEvent $event): void public function onMessageFailed(WorkerMessageFailedEvent $event): void { + if ($this->isMonitoringMessage($event)) { + return; + } + $messageClass = $this->getShortClassName($event->getEnvelope()->getMessage()::class); $errorClass = $this->getShortClassName($event->getThrowable()::class); @@ -77,6 +90,11 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void )); } + private function isMonitoringMessage(WorkerMessageReceivedEvent|WorkerMessageHandledEvent|WorkerMessageFailedEvent $event): bool + { + return $event->getEnvelope()->getMessage() instanceof MonitoringMessageInterface; + } + private function getShortClassName(string $fqcn): string { $parts = explode('\\', $fqcn); diff --git a/tests/Subscriber/MessengerMonitoringSubscriberTest.php b/tests/Subscriber/MessengerMonitoringSubscriberTest.php new file mode 100644 index 0000000..2453395 --- /dev/null +++ b/tests/Subscriber/MessengerMonitoringSubscriberTest.php @@ -0,0 +1,143 @@ +bus = $this->createMock(MessageBusInterface::class); + $this->naming = $this->createMock(ZabbixNamingProviderInterface::class); + $this->naming->method('getItemKey')->willReturnCallback(static fn (string $key) => 'app.' . $key); + + $this->subscriber = new MessengerMonitoringSubscriber( + $this->bus, + $this->naming, + 'test', + ); + } + + public function testPushMetricMessageImplementsMonitoringMessageInterface(): void + { + $message = new PushMetricMessage('test.key', 1); + $this->assertInstanceOf(MonitoringMessageInterface::class, $message); + } + + public function testPushEventMessageImplementsMonitoringMessageInterface(): void + { + $message = new PushEventMessage('test.key', ['foo' => 'bar']); + $this->assertInstanceOf(MonitoringMessageInterface::class, $message); + } + + public function testOnMessageReceivedDoesNotDispatchForPushMetricMessage(): void + { + $monitoringMessage = new PushMetricMessage('test.key', 42); + $envelope = new Envelope($monitoringMessage, [new ReceivedStamp('async')]); + $event = new WorkerMessageReceivedEvent($envelope, 'async'); + + $this->bus->expects($this->never())->method('dispatch'); + + $this->subscriber->onMessageReceived($event); + } + + public function testOnMessageHandledDoesNotDispatchForPushMetricMessage(): void + { + $monitoringMessage = new PushMetricMessage('test.key', 42); + $envelope = new Envelope($monitoringMessage); + $event = new WorkerMessageHandledEvent($envelope, 'async'); + + $this->bus->expects($this->never())->method('dispatch'); + + $this->subscriber->onMessageHandled($event); + } + + public function testOnMessageFailedDoesNotDispatchForPushMetricMessage(): void + { + $monitoringMessage = new PushMetricMessage('test.key', 42); + $envelope = new Envelope($monitoringMessage); + $event = new WorkerMessageFailedEvent($envelope, 'async', new \RuntimeException('Test error')); + + $this->bus->expects($this->never())->method('dispatch'); + + $this->subscriber->onMessageFailed($event); + } + + public function testOnMessageReceivedDoesNotDispatchForPushEventMessage(): void + { + $monitoringMessage = new PushEventMessage('test.key', ['data' => 'value']); + $envelope = new Envelope($monitoringMessage, [new ReceivedStamp('async')]); + $event = new WorkerMessageReceivedEvent($envelope, 'async'); + + $this->bus->expects($this->never())->method('dispatch'); + + $this->subscriber->onMessageReceived($event); + } + + public function testOnMessageHandledDoesNotDispatchForPushEventMessage(): void + { + $monitoringMessage = new PushEventMessage('test.key', ['data' => 'value']); + $envelope = new Envelope($monitoringMessage); + $event = new WorkerMessageHandledEvent($envelope, 'async'); + + $this->bus->expects($this->never())->method('dispatch'); + + $this->subscriber->onMessageHandled($event); + } + + public function testOnMessageFailedDoesNotDispatchForPushEventMessage(): void + { + $monitoringMessage = new PushEventMessage('test.key', ['data' => 'value']); + $envelope = new Envelope($monitoringMessage); + $event = new WorkerMessageFailedEvent($envelope, 'async', new \RuntimeException('Test error')); + + $this->bus->expects($this->never())->method('dispatch'); + + $this->subscriber->onMessageFailed($event); + } + + public function testNoRecursiveMessageLoopOccursForPushMetricMessage(): void + { + $monitoringMessage = new PushMetricMessage('test.metric', 100); + $envelope = new Envelope($monitoringMessage); + + $this->bus->expects($this->never())->method('dispatch'); + + $this->subscriber->onMessageReceived(new WorkerMessageReceivedEvent($envelope, 'async')); + $this->subscriber->onMessageHandled(new WorkerMessageHandledEvent($envelope, 'async')); + $this->subscriber->onMessageFailed(new WorkerMessageFailedEvent($envelope, 'async', new \RuntimeException())); + } + + public function testNoRecursiveMessageLoopOccursForPushEventMessage(): void + { + $monitoringMessage = new PushEventMessage('test.event', ['data' => 'value']); + $envelope = new Envelope($monitoringMessage); + + $this->bus->expects($this->never())->method('dispatch'); + + $this->subscriber->onMessageReceived(new WorkerMessageReceivedEvent($envelope, 'async')); + $this->subscriber->onMessageHandled(new WorkerMessageHandledEvent($envelope, 'async')); + $this->subscriber->onMessageFailed(new WorkerMessageFailedEvent($envelope, 'async', new \RuntimeException())); + } +} \ No newline at end of file