From 9fa3611570c9129f13f24360654d9385f48f0a66 Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Thu, 24 Jul 2025 19:57:34 +0300 Subject: [PATCH 1/5] Add ConnectionBlocked, ConnectionUnblocked frames --- .../Protocol/Frame/ConnectionBlocked.php | 28 +++++++++++++++++++ .../Protocol/Frame/ConnectionUnblocked.php | 23 +++++++++++++++ src/Internal/Protocol/Protocol.php | 2 ++ 3 files changed, 53 insertions(+) create mode 100644 src/Internal/Protocol/Frame/ConnectionBlocked.php create mode 100644 src/Internal/Protocol/Frame/ConnectionUnblocked.php diff --git a/src/Internal/Protocol/Frame/ConnectionBlocked.php b/src/Internal/Protocol/Frame/ConnectionBlocked.php new file mode 100644 index 0000000..7c3758d --- /dev/null +++ b/src/Internal/Protocol/Frame/ConnectionBlocked.php @@ -0,0 +1,28 @@ +readString()); + } + + public function write(Io\WriteBytes $writer): void + { + $writer->writeString($this->reason); + } +} diff --git a/src/Internal/Protocol/Frame/ConnectionUnblocked.php b/src/Internal/Protocol/Frame/ConnectionUnblocked.php new file mode 100644 index 0000000..f17f08c --- /dev/null +++ b/src/Internal/Protocol/Frame/ConnectionUnblocked.php @@ -0,0 +1,23 @@ + Frame\ConnectionOpenOk::class, ClassMethod::CONNECTION_CLOSE => Frame\ConnectionClose::class, ClassMethod::CONNECTION_CLOSE_OK => Frame\ConnectionCloseOk::class, + ClassMethod::CONNECTION_BLOCKED => Frame\ConnectionBlocked::class, + ClassMethod::CONNECTION_UNBLOCKED => Frame\ConnectionUnblocked::class, ], ClassType::CHANNEL => [ ClassMethod::CHANNEL_OPEN_OK => Frame\ChannelOpenOkFrame::class, From 172f84c6ec1324adf514bdda3e57e99a8ee5e1a6 Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Thu, 24 Jul 2025 19:59:23 +0300 Subject: [PATCH 2/5] Set hooks in constructor of AmqpConnection, rename ioLoop() to setup() --- .php-cs-fixer.dist.php | 4 ++-- src/Internal/Io/AmqpConnection.php | 4 +++- src/Internal/Io/AmqpConnectionFactory.php | 6 +++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/.php-cs-fixer.dist.php b/.php-cs-fixer.dist.php index 8007ba7..2221b43 100644 --- a/.php-cs-fixer.dist.php +++ b/.php-cs-fixer.dist.php @@ -7,7 +7,7 @@ use PhpCsFixer\Runner\Parallel\ParallelConfigFactory; use PHPyh\CodingStandard\PhpCsFixerCodingStandard; -$config = (new Config()) +$config = new Config() ->setFinder( Finder::create() ->in(__DIR__ . '/src') @@ -20,7 +20,7 @@ ->setParallelConfig(ParallelConfigFactory::detect()) ->setCacheFile(__DIR__ . '/var/' . basename(__FILE__) . '.cache'); -(new PhpCsFixerCodingStandard())->applyTo($config, [ +new PhpCsFixerCodingStandard()->applyTo($config, [ 'numeric_literal_separator' => false, ]); diff --git a/src/Internal/Io/AmqpConnection.php b/src/Internal/Io/AmqpConnection.php index 600b442..7b70613 100644 --- a/src/Internal/Io/AmqpConnection.php +++ b/src/Internal/Io/AmqpConnection.php @@ -33,6 +33,7 @@ final class AmqpConnection implements Writer public function __construct( private readonly Socket $socket, + private readonly Hooks $hooks, ) { $this->buffer = Buffer::empty(); $this->reader = new Protocol\Reader( @@ -91,8 +92,9 @@ public function write(string $bytes): void $this->lastWrite = Amp\now(); } - public function ioLoop(Hooks $hooks): void + public function setup(): void { + $hooks = $this->hooks; $reader = $this->reader; $isClosed = &$this->closed; diff --git a/src/Internal/Io/AmqpConnectionFactory.php b/src/Internal/Io/AmqpConnectionFactory.php index 8685f17..286740c 100644 --- a/src/Internal/Io/AmqpConnectionFactory.php +++ b/src/Internal/Io/AmqpConnectionFactory.php @@ -61,7 +61,7 @@ public function connect(): AmqpConnection Frame\ConnectionOpenOk::class, ); - $connection->ioLoop($this->hooks); + $connection->setup(); $this->hooks->anyOf(0, Frame\ConnectionClose::class, static function () use ($connection): void { $connection->writeFrame(Protocol\Method::connectionCloseOk()); @@ -93,7 +93,7 @@ private function createConnection(): AmqpConnection foreach ($this->config->connectionUrls() as $url) { try { - return new AmqpConnection($this->createSocket($url)); + return new AmqpConnection($this->createSocket($url), $this->hooks); } catch (\Throwable $e) { $exceptions[] = "{$url}: {$e->getMessage()}"; } @@ -109,7 +109,7 @@ private function createConnection(): AmqpConnection */ private function createSocket(string $url): Socket\Socket { - $context = (new Socket\ConnectContext()) + $context = new Socket\ConnectContext() ->withConnectTimeout($this->config->connectionTimeout); if ($this->config->tcpNoDelay) { From faad31c7b6c24e61ae31fdc51c6e6dbc9c098fe1 Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Thu, 24 Jul 2025 20:14:08 +0300 Subject: [PATCH 3/5] Add AmqpConnection::ensureNotBlocked() method --- src/Exception/ConnectionIsBlocked.php | 23 +++++++++++++++++++ src/Internal/Io/AmqpConnection.php | 32 +++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 src/Exception/ConnectionIsBlocked.php diff --git a/src/Exception/ConnectionIsBlocked.php b/src/Exception/ConnectionIsBlocked.php new file mode 100644 index 0000000..27aa05c --- /dev/null +++ b/src/Exception/ConnectionIsBlocked.php @@ -0,0 +1,23 @@ +hooks; + + $blockedReason = &$this->blockedReason; + + $hooks->anyOf( + 0, + Protocol\Frame\ConnectionBlocked::class, + static function (Protocol\Frame\ConnectionBlocked $blocked) use (&$blockedReason): void { + $blockedReason = $blocked->reason; + }, + ); + + $hooks->anyOf( + 0, + Protocol\Frame\ConnectionUnblocked::class, + static function () use (&$blockedReason): void { + $blockedReason = null; + }, + ); + $reader = $this->reader; $isClosed = &$this->closed; @@ -129,6 +154,13 @@ public function heartbeat(int $interval): void }); } + public function ensureNotBlocked(): void + { + if ($this->blockedReason !== null) { + throw new ConnectionIsBlocked($this->blockedReason); + } + } + public function close(): void { if (!$this->socket->isClosed()) { From 680d367ec61c873f3fb0cb9bf2ed9a87add9e8f6 Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Thu, 24 Jul 2025 20:32:40 +0300 Subject: [PATCH 4/5] Use ensureNotBlocked in Channel::publish*() methods --- src/Channel.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Channel.php b/src/Channel.php index b25833c..40923f4 100644 --- a/src/Channel.php +++ b/src/Channel.php @@ -86,6 +86,8 @@ public function publish( bool $mandatory = false, bool $immediate = false, ): ?PublishConfirmation { + $this->connection->ensureNotBlocked(); + /** @var ?PublishConfirmation $confirmation */ $confirmation = null; @@ -110,6 +112,8 @@ public function publish( */ public function publishBatch(array $publishMessages): PublishBatchConfirmation { + $this->connection->ensureNotBlocked(); + /** @var list $confirmations */ $confirmations = []; From ef09ea500881ff18bd1e5c85f6cdd956695617f1 Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Thu, 24 Jul 2025 22:52:25 +0300 Subject: [PATCH 5/5] Add rabbitmq_low_memory service to docker-compose.yaml --- docker-compose.yaml | 9 +++++++++ docker/rabbitmq/rabbitmq.conf | 1 + 2 files changed, 10 insertions(+) create mode 100644 docker/rabbitmq/rabbitmq.conf diff --git a/docker-compose.yaml b/docker-compose.yaml index 7bb1952..a24a7b0 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -14,3 +14,12 @@ services: ports: - "127.0.0.1:5672:5672" - "127.0.0.1:15672:15672" + + rabbitmq_low_memory: + image: rabbitmq:4.0-management-alpine + restart: always + ports: + - "127.0.0.1:5673:5672" + - "127.0.0.1:15673:15672" + volumes: + - ./docker/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf diff --git a/docker/rabbitmq/rabbitmq.conf b/docker/rabbitmq/rabbitmq.conf new file mode 100644 index 0000000..76549a9 --- /dev/null +++ b/docker/rabbitmq/rabbitmq.conf @@ -0,0 +1 @@ +vm_memory_high_watermark.absolute = 130MiB