From 32274bdc5f94dca63b5ef53f4b6260c07b2cd340 Mon Sep 17 00:00:00 2001 From: viktorprogger Date: Sat, 14 Jun 2025 19:29:43 +0500 Subject: [PATCH 1/7] Fix consuming existing messages --- src/ExistingMessagesConsumer.php | 54 +++++++++++++++++--------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/src/ExistingMessagesConsumer.php b/src/ExistingMessagesConsumer.php index a0a1071..e8596dd 100644 --- a/src/ExistingMessagesConsumer.php +++ b/src/ExistingMessagesConsumer.php @@ -29,34 +29,36 @@ public function __construct( */ public function consume(callable $callback): void { - $this->channel->basic_consume( - $this->queueName, - '', - false, - false, - false, - false, - function (AMQPMessage $amqpMessage) use ($callback): void { - try { - $message = $this->serializer->unserialize($amqpMessage->getBody()); - if ($this->messageConsumed = $callback($message)) { - $this->channel->basic_ack($amqpMessage->getDeliveryTag()); - } - } catch (Throwable $exception) { - $this->messageConsumed = false; - $consumerTag = $amqpMessage->getConsumerTag(); - if ($consumerTag !== null) { - $this->channel->basic_cancel($consumerTag); - } + $consumerTag = uniqid(more_entropy: true); + try { + $this->channel->basic_consume( + $this->queueName, + $consumerTag, + false, + false, + false, + false, + function (AMQPMessage $amqpMessage) use ($callback): void { + try { + $message = $this->serializer->unserialize($amqpMessage->getBody()); + if ($this->messageConsumed = $callback($message)) { + $this->channel->basic_ack($amqpMessage->getDeliveryTag()); + } + } catch (Throwable $exception) { + $this->messageConsumed = false; + $this->channel->basic_nack($amqpMessage->getDeliveryTag(), false, true); - throw $exception; + throw $exception; + } } - } - ); + ); - do { - $this->messageConsumed = false; - $this->channel->wait(null, true); - } while ($this->messageConsumed === true); + do { + $this->messageConsumed = false; + $this->channel->wait(null, true); + } while ($this->messageConsumed === true); + } finally { + $this->channel->basic_cancel($consumerTag, false, false); + } } } From 6d897bb27b05fc9a41f0566a6f77317ca926c7e9 Mon Sep 17 00:00:00 2001 From: viktorprogger Date: Sat, 28 Jun 2025 13:05:25 +0500 Subject: [PATCH 2/7] Close channel after message consuming is stopped --- src/ExistingMessagesConsumer.php | 1 + src/QueueProvider.php | 44 ++++++--- .../ConsumeExistingMessagesTest.php | 93 +++++++++++++++++++ 3 files changed, 126 insertions(+), 12 deletions(-) create mode 100644 tests/Integration/ConsumeExistingMessagesTest.php diff --git a/src/ExistingMessagesConsumer.php b/src/ExistingMessagesConsumer.php index e8596dd..3a6612e 100644 --- a/src/ExistingMessagesConsumer.php +++ b/src/ExistingMessagesConsumer.php @@ -59,6 +59,7 @@ function (AMQPMessage $amqpMessage) use ($callback): void { } while ($this->messageConsumed === true); } finally { $this->channel->basic_cancel($consumerTag, false, false); + $this->channel->close(); } } } diff --git a/src/QueueProvider.php b/src/QueueProvider.php index 3af0873..98c9dea 100644 --- a/src/QueueProvider.php +++ b/src/QueueProvider.php @@ -15,7 +15,7 @@ final class QueueProvider implements QueueProviderInterface { public const EXCHANGE_NAME_DEFAULT = 'yii-queue'; - private ?AMQPChannel $channel = null; + private ?int $channelId = null; public function __construct( private readonly AbstractConnection $connection, @@ -28,24 +28,33 @@ public function __construct( } } + public function __clone() + { + $this->channelId = null; + } + public function __destruct() { - $this->channel?->close(); + if ($this->channelId !== null) { + $this->connection->channel($this->channelId)->close(); + } } + /** + * Returns an AMQPChannel instance. + * IMPORTANT: Do NOT memorise the channel instance, as this will cause memory leaks on channel close! + */ public function getChannel(): AMQPChannel { - if ($this->channel === null) { - $this->channel = $this->connection->channel(); - $this->channel->queue_declare(...$this->queueSettings->getPositionalSettings()); - - if ($this->exchangeSettings !== null) { - $this->channel->exchange_declare(...$this->exchangeSettings->getPositionalSettings()); - $this->channel->queue_bind($this->queueSettings->getName(), $this->exchangeSettings->getName()); - } + $channel = $this->connection->channel($this->getChannelId()); + $channel->queue_declare(...$this->queueSettings->getPositionalSettings()); + + if ($this->exchangeSettings !== null) { + $channel->exchange_declare(...$this->exchangeSettings->getPositionalSettings()); + $channel->queue_bind($this->queueSettings->getName(), $this->exchangeSettings->getName()); } - return $this->channel; + return $channel; } public function getQueueSettings(): QueueSettingsInterface @@ -74,8 +83,10 @@ public function withChannelName(string $channel): self } $instance = clone $this; - $instance->channel = null; $instance->queueSettings = $instance->queueSettings->withName($channel); + if ($this->channelId !== null) { + $instance->channelId = null; + } return $instance; } @@ -112,4 +123,13 @@ public function withMessageProperties(array $properties): QueueProviderInterface return $new; } + + private function getChannelId(): int + { + if ($this->channelId === null) { + $this->channelId = $this->connection->get_free_channel_id(); + } + + return $this->channelId; + } } diff --git a/tests/Integration/ConsumeExistingMessagesTest.php b/tests/Integration/ConsumeExistingMessagesTest.php new file mode 100644 index 0000000..f5f9380 --- /dev/null +++ b/tests/Integration/ConsumeExistingMessagesTest.php @@ -0,0 +1,93 @@ +push(new Message('test', ['payload' => 'test'])); + } + + // wait for messages to be ready to consume + sleep(1); + + $processingCount = 0; + $adapter->runExisting(static function() use (&$processingCount): bool { + $processingCount++; + return true; + }); + + self::assertEquals($messageCount, $processingCount); + } + + public function testConsumeExistingMessagesByOne(): void + { + $loop = new SimpleLoop(); + $serializer = new JsonMessageSerializer(); + $queueProvider = new QueueProvider( + new AMQPStreamConnection( + getenv('RABBITMQ_HOST'), + getenv('RABBITMQ_PORT'), + getenv('RABBITMQ_USER'), + getenv('RABBITMQ_PASSWORD'), + ), + new QueueSettings() + ); + $adapter = new Adapter($queueProvider, $serializer, $loop); + + $messageCount = 10; + for ($i = 0; $i < $messageCount; $i++) { + $adapter->push(new Message('test', ['payload' => 'test'])); + } + + // wait for messages to be ready to consume + sleep(1); + + $processingCount = 0; + $messageProcessed = true; + + // Call the `runExisting` method $messageCount times + while ($messageProcessed) { + $messageProcessed = false; + $adapter->runExisting(static function () use (&$processingCount, &$messageProcessed): bool { + if ($messageProcessed) { + return false; + } + + $messageProcessed = true; + $processingCount++; + + return true; + }); + }; + + self::assertEquals($messageCount, $processingCount); + } +} From a38c42fa8d5b73f76beb0ec07dca5471aa94713d Mon Sep 17 00:00:00 2001 From: StyleCI Bot Date: Sat, 28 Jun 2025 08:05:45 +0000 Subject: [PATCH 3/7] Apply fixes from StyleCI --- tests/Integration/ConsumeExistingMessagesTest.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/Integration/ConsumeExistingMessagesTest.php b/tests/Integration/ConsumeExistingMessagesTest.php index f5f9380..84d1450 100644 --- a/tests/Integration/ConsumeExistingMessagesTest.php +++ b/tests/Integration/ConsumeExistingMessagesTest.php @@ -7,7 +7,6 @@ use Yiisoft\Queue\AMQP\Adapter; use Yiisoft\Queue\AMQP\QueueProvider; use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings; -use Yiisoft\Queue\AMQP\Tests\Integration\TestCase; use Yiisoft\Queue\Cli\SimpleLoop; use Yiisoft\Queue\Message\JsonMessageSerializer; use Yiisoft\Queue\Message\Message; @@ -86,7 +85,7 @@ public function testConsumeExistingMessagesByOne(): void return true; }); - }; + } self::assertEquals($messageCount, $processingCount); } From 3c2d19b11a5e6bf473d59398f828f7bbc3203338 Mon Sep 17 00:00:00 2001 From: viktorprogger Date: Sat, 28 Jun 2025 13:18:57 +0500 Subject: [PATCH 4/7] bugfix --- src/ExistingMessagesConsumer.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ExistingMessagesConsumer.php b/src/ExistingMessagesConsumer.php index 3a6612e..dbd543e 100644 --- a/src/ExistingMessagesConsumer.php +++ b/src/ExistingMessagesConsumer.php @@ -43,6 +43,8 @@ function (AMQPMessage $amqpMessage) use ($callback): void { $message = $this->serializer->unserialize($amqpMessage->getBody()); if ($this->messageConsumed = $callback($message)) { $this->channel->basic_ack($amqpMessage->getDeliveryTag()); + } else { + $this->channel->basic_nack($amqpMessage->getDeliveryTag(), false, true); } } catch (Throwable $exception) { $this->messageConsumed = false; From e08ade7cae682619c28c453b415e2ee93482921a Mon Sep 17 00:00:00 2001 From: viktorprogger Date: Sat, 28 Jun 2025 13:31:46 +0500 Subject: [PATCH 5/7] Close channel inside the queue provider --- src/Adapter.php | 6 +----- src/ExistingMessagesConsumer.php | 22 +++++++++++----------- src/QueueProvider.php | 16 ++++++++++++++++ src/QueueProviderInterface.php | 5 +++++ 4 files changed, 33 insertions(+), 16 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index 5b25660..2370316 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -38,11 +38,7 @@ public function withChannel(BackedEnum|string $channel): self */ public function runExisting(callable $handlerCallback): void { - $channel = $this->queueProvider->getChannel(); - (new ExistingMessagesConsumer($channel, $this->queueProvider - ->getQueueSettings() - ->getName(), $this->serializer)) - ->consume($handlerCallback); + (new ExistingMessagesConsumer($this->queueProvider, $this->serializer))->consume($handlerCallback); } /** diff --git a/src/ExistingMessagesConsumer.php b/src/ExistingMessagesConsumer.php index dbd543e..3a5fbd3 100644 --- a/src/ExistingMessagesConsumer.php +++ b/src/ExistingMessagesConsumer.php @@ -18,8 +18,7 @@ final class ExistingMessagesConsumer private bool $messageConsumed = false; public function __construct( - private readonly AMQPChannel $channel, - private readonly string $queueName, + private readonly QueueProviderInterface $queueProvider, private readonly MessageSerializerInterface $serializer ) { } @@ -29,26 +28,27 @@ public function __construct( */ public function consume(callable $callback): void { + $channel = $this->queueProvider->getChannel(); $consumerTag = uniqid(more_entropy: true); try { - $this->channel->basic_consume( - $this->queueName, + $channel->basic_consume( + $this->queueProvider->getQueueSettings()->getName(), $consumerTag, false, false, false, false, - function (AMQPMessage $amqpMessage) use ($callback): void { + function (AMQPMessage $amqpMessage) use ($callback, $channel): void { try { $message = $this->serializer->unserialize($amqpMessage->getBody()); if ($this->messageConsumed = $callback($message)) { - $this->channel->basic_ack($amqpMessage->getDeliveryTag()); + $channel->basic_ack($amqpMessage->getDeliveryTag()); } else { - $this->channel->basic_nack($amqpMessage->getDeliveryTag(), false, true); + $channel->basic_nack($amqpMessage->getDeliveryTag(), false, true); } } catch (Throwable $exception) { $this->messageConsumed = false; - $this->channel->basic_nack($amqpMessage->getDeliveryTag(), false, true); + $channel->basic_nack($amqpMessage->getDeliveryTag(), false, true); throw $exception; } @@ -57,11 +57,11 @@ function (AMQPMessage $amqpMessage) use ($callback): void { do { $this->messageConsumed = false; - $this->channel->wait(null, true); + $channel->wait(null, true); } while ($this->messageConsumed === true); } finally { - $this->channel->basic_cancel($consumerTag, false, false); - $this->channel->close(); + $channel->basic_cancel($consumerTag); + $this->queueProvider->channelClose(); } } } diff --git a/src/QueueProvider.php b/src/QueueProvider.php index 98c9dea..0c89838 100644 --- a/src/QueueProvider.php +++ b/src/QueueProvider.php @@ -11,6 +11,9 @@ use Yiisoft\Queue\AMQP\Settings\ExchangeSettingsInterface; use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface; +/** + * @internal + */ final class QueueProvider implements QueueProviderInterface { public const EXCHANGE_NAME_DEFAULT = 'yii-queue'; @@ -46,6 +49,11 @@ public function __destruct() */ public function getChannel(): AMQPChannel { + if ($this->channelId !== null) { + return $this->connection->channel($this->channelId); + } + + $this->channelId = $this->connection->get_free_channel_id(); $channel = $this->connection->channel($this->getChannelId()); $channel->queue_declare(...$this->queueSettings->getPositionalSettings()); @@ -124,6 +132,14 @@ public function withMessageProperties(array $properties): QueueProviderInterface return $new; } + public function channelClose(): void + { + if ($this->channelId !== null) { + $this->connection->channel($this->channelId)->close(); + $this->channelId = null; + } + } + private function getChannelId(): int { if ($this->channelId === null) { diff --git a/src/QueueProviderInterface.php b/src/QueueProviderInterface.php index 32b181b..394932d 100644 --- a/src/QueueProviderInterface.php +++ b/src/QueueProviderInterface.php @@ -8,6 +8,9 @@ use Yiisoft\Queue\AMQP\Settings\ExchangeSettingsInterface; use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface; +/** + * @internal + */ interface QueueProviderInterface { public function getChannel(): AMQPChannel; @@ -25,4 +28,6 @@ public function withQueueSettings(QueueSettingsInterface $queueSettings): self; public function withExchangeSettings(?ExchangeSettingsInterface $exchangeSettings): self; public function withMessageProperties(array $properties): self; + + public function channelClose(): void; } From bec15b58bbaf1be3e0402710d95ab8c9d6c33bb4 Mon Sep 17 00:00:00 2001 From: StyleCI Bot Date: Sat, 28 Jun 2025 11:38:41 +0000 Subject: [PATCH 6/7] Apply fixes from StyleCI --- src/ExistingMessagesConsumer.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ExistingMessagesConsumer.php b/src/ExistingMessagesConsumer.php index 3a5fbd3..504653d 100644 --- a/src/ExistingMessagesConsumer.php +++ b/src/ExistingMessagesConsumer.php @@ -4,7 +4,6 @@ namespace Yiisoft\Queue\AMQP; -use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Message\AMQPMessage; use Throwable; use Yiisoft\Queue\Message\MessageInterface; From 34c8732dc182138ff54294a0ad13fef8724d4545 Mon Sep 17 00:00:00 2001 From: StyleCI Bot Date: Sun, 29 Jun 2025 12:11:53 +0000 Subject: [PATCH 7/7] Apply fixes from StyleCI --- tests/Benchmark/QueueConsumeBench.php | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/Benchmark/QueueConsumeBench.php b/tests/Benchmark/QueueConsumeBench.php index b18258c..f19b7c5 100644 --- a/tests/Benchmark/QueueConsumeBench.php +++ b/tests/Benchmark/QueueConsumeBench.php @@ -68,8 +68,6 @@ private static function getAdapter(): Adapter ), new QueueSettings(), ); - $adapter = new Adapter($queueProvider, $serializer, $loop); - - return $adapter; + return new Adapter($queueProvider, $serializer, $loop); } }