diff --git a/src/Adapter.php b/src/Adapter.php index 5b25660..2370316 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -38,11 +38,7 @@ public function withChannel(BackedEnum|string $channel): self */ public function runExisting(callable $handlerCallback): void { - $channel = $this->queueProvider->getChannel(); - (new ExistingMessagesConsumer($channel, $this->queueProvider - ->getQueueSettings() - ->getName(), $this->serializer)) - ->consume($handlerCallback); + (new ExistingMessagesConsumer($this->queueProvider, $this->serializer))->consume($handlerCallback); } /** diff --git a/src/ExistingMessagesConsumer.php b/src/ExistingMessagesConsumer.php index a0a1071..504653d 100644 --- a/src/ExistingMessagesConsumer.php +++ b/src/ExistingMessagesConsumer.php @@ -4,7 +4,6 @@ namespace Yiisoft\Queue\AMQP; -use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Message\AMQPMessage; use Throwable; use Yiisoft\Queue\Message\MessageInterface; @@ -18,8 +17,7 @@ final class ExistingMessagesConsumer private bool $messageConsumed = false; public function __construct( - private readonly AMQPChannel $channel, - private readonly string $queueName, + private readonly QueueProviderInterface $queueProvider, private readonly MessageSerializerInterface $serializer ) { } @@ -29,34 +27,40 @@ public function __construct( */ public function consume(callable $callback): void { - $this->channel->basic_consume( - $this->queueName, - '', - false, - false, - false, - false, - function (AMQPMessage $amqpMessage) use ($callback): void { - try { - $message = $this->serializer->unserialize($amqpMessage->getBody()); - if ($this->messageConsumed = $callback($message)) { - $this->channel->basic_ack($amqpMessage->getDeliveryTag()); - } - } catch (Throwable $exception) { - $this->messageConsumed = false; - $consumerTag = $amqpMessage->getConsumerTag(); - if ($consumerTag !== null) { - $this->channel->basic_cancel($consumerTag); - } + $channel = $this->queueProvider->getChannel(); + $consumerTag = uniqid(more_entropy: true); + try { + $channel->basic_consume( + $this->queueProvider->getQueueSettings()->getName(), + $consumerTag, + false, + false, + false, + false, + function (AMQPMessage $amqpMessage) use ($callback, $channel): void { + try { + $message = $this->serializer->unserialize($amqpMessage->getBody()); + if ($this->messageConsumed = $callback($message)) { + $channel->basic_ack($amqpMessage->getDeliveryTag()); + } else { + $channel->basic_nack($amqpMessage->getDeliveryTag(), false, true); + } + } catch (Throwable $exception) { + $this->messageConsumed = false; + $channel->basic_nack($amqpMessage->getDeliveryTag(), false, true); - throw $exception; + throw $exception; + } } - } - ); + ); - do { - $this->messageConsumed = false; - $this->channel->wait(null, true); - } while ($this->messageConsumed === true); + do { + $this->messageConsumed = false; + $channel->wait(null, true); + } while ($this->messageConsumed === true); + } finally { + $channel->basic_cancel($consumerTag); + $this->queueProvider->channelClose(); + } } } diff --git a/src/QueueProvider.php b/src/QueueProvider.php index 3af0873..0c89838 100644 --- a/src/QueueProvider.php +++ b/src/QueueProvider.php @@ -11,11 +11,14 @@ use Yiisoft\Queue\AMQP\Settings\ExchangeSettingsInterface; use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface; +/** + * @internal + */ final class QueueProvider implements QueueProviderInterface { public const EXCHANGE_NAME_DEFAULT = 'yii-queue'; - private ?AMQPChannel $channel = null; + private ?int $channelId = null; public function __construct( private readonly AbstractConnection $connection, @@ -28,24 +31,38 @@ public function __construct( } } + public function __clone() + { + $this->channelId = null; + } + public function __destruct() { - $this->channel?->close(); + if ($this->channelId !== null) { + $this->connection->channel($this->channelId)->close(); + } } + /** + * Returns an AMQPChannel instance. + * IMPORTANT: Do NOT memorise the channel instance, as this will cause memory leaks on channel close! + */ public function getChannel(): AMQPChannel { - if ($this->channel === null) { - $this->channel = $this->connection->channel(); - $this->channel->queue_declare(...$this->queueSettings->getPositionalSettings()); - - if ($this->exchangeSettings !== null) { - $this->channel->exchange_declare(...$this->exchangeSettings->getPositionalSettings()); - $this->channel->queue_bind($this->queueSettings->getName(), $this->exchangeSettings->getName()); - } + if ($this->channelId !== null) { + return $this->connection->channel($this->channelId); + } + + $this->channelId = $this->connection->get_free_channel_id(); + $channel = $this->connection->channel($this->getChannelId()); + $channel->queue_declare(...$this->queueSettings->getPositionalSettings()); + + if ($this->exchangeSettings !== null) { + $channel->exchange_declare(...$this->exchangeSettings->getPositionalSettings()); + $channel->queue_bind($this->queueSettings->getName(), $this->exchangeSettings->getName()); } - return $this->channel; + return $channel; } public function getQueueSettings(): QueueSettingsInterface @@ -74,8 +91,10 @@ public function withChannelName(string $channel): self } $instance = clone $this; - $instance->channel = null; $instance->queueSettings = $instance->queueSettings->withName($channel); + if ($this->channelId !== null) { + $instance->channelId = null; + } return $instance; } @@ -112,4 +131,21 @@ public function withMessageProperties(array $properties): QueueProviderInterface return $new; } + + public function channelClose(): void + { + if ($this->channelId !== null) { + $this->connection->channel($this->channelId)->close(); + $this->channelId = null; + } + } + + private function getChannelId(): int + { + if ($this->channelId === null) { + $this->channelId = $this->connection->get_free_channel_id(); + } + + return $this->channelId; + } } diff --git a/src/QueueProviderInterface.php b/src/QueueProviderInterface.php index 32b181b..394932d 100644 --- a/src/QueueProviderInterface.php +++ b/src/QueueProviderInterface.php @@ -8,6 +8,9 @@ use Yiisoft\Queue\AMQP\Settings\ExchangeSettingsInterface; use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface; +/** + * @internal + */ interface QueueProviderInterface { public function getChannel(): AMQPChannel; @@ -25,4 +28,6 @@ public function withQueueSettings(QueueSettingsInterface $queueSettings): self; public function withExchangeSettings(?ExchangeSettingsInterface $exchangeSettings): self; public function withMessageProperties(array $properties): self; + + public function channelClose(): void; } diff --git a/tests/Integration/ConsumeExistingMessagesTest.php b/tests/Integration/ConsumeExistingMessagesTest.php new file mode 100644 index 0000000..84d1450 --- /dev/null +++ b/tests/Integration/ConsumeExistingMessagesTest.php @@ -0,0 +1,92 @@ +push(new Message('test', ['payload' => 'test'])); + } + + // wait for messages to be ready to consume + sleep(1); + + $processingCount = 0; + $adapter->runExisting(static function() use (&$processingCount): bool { + $processingCount++; + return true; + }); + + self::assertEquals($messageCount, $processingCount); + } + + public function testConsumeExistingMessagesByOne(): void + { + $loop = new SimpleLoop(); + $serializer = new JsonMessageSerializer(); + $queueProvider = new QueueProvider( + new AMQPStreamConnection( + getenv('RABBITMQ_HOST'), + getenv('RABBITMQ_PORT'), + getenv('RABBITMQ_USER'), + getenv('RABBITMQ_PASSWORD'), + ), + new QueueSettings() + ); + $adapter = new Adapter($queueProvider, $serializer, $loop); + + $messageCount = 10; + for ($i = 0; $i < $messageCount; $i++) { + $adapter->push(new Message('test', ['payload' => 'test'])); + } + + // wait for messages to be ready to consume + sleep(1); + + $processingCount = 0; + $messageProcessed = true; + + // Call the `runExisting` method $messageCount times + while ($messageProcessed) { + $messageProcessed = false; + $adapter->runExisting(static function () use (&$processingCount, &$messageProcessed): bool { + if ($messageProcessed) { + return false; + } + + $messageProcessed = true; + $processingCount++; + + return true; + }); + } + + self::assertEquals($messageCount, $processingCount); + } +}