diff --git a/composer.json b/composer.json index 428f884..33147f7 100644 --- a/composer.json +++ b/composer.json @@ -33,6 +33,7 @@ "require-dev": { "infection/infection": "^0.27.8||^0.29.0", "maglnet/composer-require-checker": "^4.4", + "phpbench/phpbench": "^1.4", "phpunit/phpunit": "^9.5", "rector/rector": "^2.0.15", "roave/infection-static-analysis-plugin": "^1.16", diff --git a/phpbench.json b/phpbench.json new file mode 100644 index 0000000..0443602 --- /dev/null +++ b/phpbench.json @@ -0,0 +1,8 @@ +{ + "$schema":"./vendor/phpbench/phpbench/phpbench.schema.json", + "runner.bootstrap": "vendor/autoload.php", + "runner.path": "tests/Benchmark", + "runner.revs": 100000, + "runner.iterations": 5, + "runner.warmup": 5 +} diff --git a/src/ExistingMessagesConsumer.php b/src/ExistingMessagesConsumer.php index a0a1071..e8596dd 100644 --- a/src/ExistingMessagesConsumer.php +++ b/src/ExistingMessagesConsumer.php @@ -29,34 +29,36 @@ public function __construct( */ public function consume(callable $callback): void { - $this->channel->basic_consume( - $this->queueName, - '', - false, - false, - false, - false, - function (AMQPMessage $amqpMessage) use ($callback): void { - try { - $message = $this->serializer->unserialize($amqpMessage->getBody()); - if ($this->messageConsumed = $callback($message)) { - $this->channel->basic_ack($amqpMessage->getDeliveryTag()); - } - } catch (Throwable $exception) { - $this->messageConsumed = false; - $consumerTag = $amqpMessage->getConsumerTag(); - if ($consumerTag !== null) { - $this->channel->basic_cancel($consumerTag); - } + $consumerTag = uniqid(more_entropy: true); + try { + $this->channel->basic_consume( + $this->queueName, + $consumerTag, + false, + false, + false, + false, + function (AMQPMessage $amqpMessage) use ($callback): void { + try { + $message = $this->serializer->unserialize($amqpMessage->getBody()); + if ($this->messageConsumed = $callback($message)) { + $this->channel->basic_ack($amqpMessage->getDeliveryTag()); + } + } catch (Throwable $exception) { + $this->messageConsumed = false; + $this->channel->basic_nack($amqpMessage->getDeliveryTag(), false, true); - throw $exception; + throw $exception; + } } - } - ); + ); - do { - $this->messageConsumed = false; - $this->channel->wait(null, true); - } while ($this->messageConsumed === true); + do { + $this->messageConsumed = false; + $this->channel->wait(null, true); + } while ($this->messageConsumed === true); + } finally { + $this->channel->basic_cancel($consumerTag, false, false); + } } } diff --git a/tests/Benchmark/QueueBench.php b/tests/Benchmark/QueueBench.php new file mode 100644 index 0000000..b9c8306 --- /dev/null +++ b/tests/Benchmark/QueueBench.php @@ -0,0 +1,72 @@ +adapter = $adapter; + } + + #[\PhpBench\Attributes\BeforeMethods('cleanupQueue')] + #[OutputMode('throughput')] + #[OutputTimeUnit('seconds', 3)] + #[Skip] + public function benchPush(): void + { + $this->adapter->push(new Message('test', ['payload' => 'test'])); + } + + public function cleanupQueue(): void + { + $this->adapter->runExisting(static fn (): bool => true); + } + + #[\PhpBench\Attributes\Iterations(5)] + #[\PhpBench\Attributes\Revs(self::CONSUME_REVISIONS)] + #[\PhpBench\Attributes\BeforeMethods('pushMessagesForConsume')] + #[OutputMode('throughput')] + #[OutputTimeUnit('seconds', 3)] + public function benchConsume(): void + { + $this->adapter->runExisting(static fn (): bool => false); + } + + public function pushMessagesForConsume(): void + { + for ($i = 0; $i < self::CONSUME_REVISIONS; $i++) { + $this->adapter->push(new Message('test', ['payload' => 'test'])); + } + } +}