From e87d726baa18f389b16fabc846d649d5c4f840c5 Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Thu, 24 Jul 2025 19:57:34 +0300 Subject: [PATCH 01/10] Add ConnectionBlocked, ConnectionUnblocked frames --- .../Protocol/Frame/ConnectionBlocked.php | 28 +++++++++++++++++++ .../Protocol/Frame/ConnectionUnblocked.php | 23 +++++++++++++++ src/Internal/Protocol/Protocol.php | 2 ++ 3 files changed, 53 insertions(+) create mode 100644 src/Internal/Protocol/Frame/ConnectionBlocked.php create mode 100644 src/Internal/Protocol/Frame/ConnectionUnblocked.php diff --git a/src/Internal/Protocol/Frame/ConnectionBlocked.php b/src/Internal/Protocol/Frame/ConnectionBlocked.php new file mode 100644 index 0000000..7c3758d --- /dev/null +++ b/src/Internal/Protocol/Frame/ConnectionBlocked.php @@ -0,0 +1,28 @@ +readString()); + } + + public function write(Io\WriteBytes $writer): void + { + $writer->writeString($this->reason); + } +} diff --git a/src/Internal/Protocol/Frame/ConnectionUnblocked.php b/src/Internal/Protocol/Frame/ConnectionUnblocked.php new file mode 100644 index 0000000..f17f08c --- /dev/null +++ b/src/Internal/Protocol/Frame/ConnectionUnblocked.php @@ -0,0 +1,23 @@ + Frame\ConnectionOpenOk::class, ClassMethod::CONNECTION_CLOSE => Frame\ConnectionClose::class, ClassMethod::CONNECTION_CLOSE_OK => Frame\ConnectionCloseOk::class, + ClassMethod::CONNECTION_BLOCKED => Frame\ConnectionBlocked::class, + ClassMethod::CONNECTION_UNBLOCKED => Frame\ConnectionUnblocked::class, ], ClassType::CHANNEL => [ ClassMethod::CHANNEL_OPEN_OK => Frame\ChannelOpenOkFrame::class, From a7c122455d0bb610701651f790f196b241e9bfba Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Thu, 24 Jul 2025 19:59:23 +0300 Subject: [PATCH 02/10] Set hooks in constructor of AmqpConnection, rename ioLoop() to setup() --- .php-cs-fixer.dist.php | 4 ++-- src/Internal/Io/AmqpConnection.php | 4 +++- src/Internal/Io/AmqpConnectionFactory.php | 6 +++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/.php-cs-fixer.dist.php b/.php-cs-fixer.dist.php index 8007ba7..2221b43 100644 --- a/.php-cs-fixer.dist.php +++ b/.php-cs-fixer.dist.php @@ -7,7 +7,7 @@ use PhpCsFixer\Runner\Parallel\ParallelConfigFactory; use PHPyh\CodingStandard\PhpCsFixerCodingStandard; -$config = (new Config()) +$config = new Config() ->setFinder( Finder::create() ->in(__DIR__ . '/src') @@ -20,7 +20,7 @@ ->setParallelConfig(ParallelConfigFactory::detect()) ->setCacheFile(__DIR__ . '/var/' . basename(__FILE__) . '.cache'); -(new PhpCsFixerCodingStandard())->applyTo($config, [ +new PhpCsFixerCodingStandard()->applyTo($config, [ 'numeric_literal_separator' => false, ]); diff --git a/src/Internal/Io/AmqpConnection.php b/src/Internal/Io/AmqpConnection.php index 600b442..7b70613 100644 --- a/src/Internal/Io/AmqpConnection.php +++ b/src/Internal/Io/AmqpConnection.php @@ -33,6 +33,7 @@ final class AmqpConnection implements Writer public function __construct( private readonly Socket $socket, + private readonly Hooks $hooks, ) { $this->buffer = Buffer::empty(); $this->reader = new Protocol\Reader( @@ -91,8 +92,9 @@ public function write(string $bytes): void $this->lastWrite = Amp\now(); } - public function ioLoop(Hooks $hooks): void + public function setup(): void { + $hooks = $this->hooks; $reader = $this->reader; $isClosed = &$this->closed; diff --git a/src/Internal/Io/AmqpConnectionFactory.php b/src/Internal/Io/AmqpConnectionFactory.php index 8685f17..286740c 100644 --- a/src/Internal/Io/AmqpConnectionFactory.php +++ b/src/Internal/Io/AmqpConnectionFactory.php @@ -61,7 +61,7 @@ public function connect(): AmqpConnection Frame\ConnectionOpenOk::class, ); - $connection->ioLoop($this->hooks); + $connection->setup(); $this->hooks->anyOf(0, Frame\ConnectionClose::class, static function () use ($connection): void { $connection->writeFrame(Protocol\Method::connectionCloseOk()); @@ -93,7 +93,7 @@ private function createConnection(): AmqpConnection foreach ($this->config->connectionUrls() as $url) { try { - return new AmqpConnection($this->createSocket($url)); + return new AmqpConnection($this->createSocket($url), $this->hooks); } catch (\Throwable $e) { $exceptions[] = "{$url}: {$e->getMessage()}"; } @@ -109,7 +109,7 @@ private function createConnection(): AmqpConnection */ private function createSocket(string $url): Socket\Socket { - $context = (new Socket\ConnectContext()) + $context = new Socket\ConnectContext() ->withConnectTimeout($this->config->connectionTimeout); if ($this->config->tcpNoDelay) { From 21f186e0f550a2b4b776f338a9fb17315da339bc Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Thu, 24 Jul 2025 20:14:08 +0300 Subject: [PATCH 03/10] Add AmqpConnection::ensureNotBlocked() method --- src/Exception/ConnectionIsBlocked.php | 23 +++++++++++++++++++ src/Internal/Io/AmqpConnection.php | 32 +++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 src/Exception/ConnectionIsBlocked.php diff --git a/src/Exception/ConnectionIsBlocked.php b/src/Exception/ConnectionIsBlocked.php new file mode 100644 index 0000000..27aa05c --- /dev/null +++ b/src/Exception/ConnectionIsBlocked.php @@ -0,0 +1,23 @@ +hooks; + + $blockedReason = &$this->blockedReason; + + $hooks->anyOf( + 0, + Protocol\Frame\ConnectionBlocked::class, + static function (Protocol\Frame\ConnectionBlocked $blocked) use (&$blockedReason): void { + $blockedReason = $blocked->reason; + }, + ); + + $hooks->anyOf( + 0, + Protocol\Frame\ConnectionUnblocked::class, + static function () use (&$blockedReason): void { + $blockedReason = null; + }, + ); + $reader = $this->reader; $isClosed = &$this->closed; @@ -129,6 +154,13 @@ public function heartbeat(int $interval): void }); } + public function ensureNotBlocked(): void + { + if ($this->blockedReason !== null) { + throw new ConnectionIsBlocked($this->blockedReason); + } + } + public function close(): void { if (!$this->socket->isClosed()) { From 8dd1096d2529ed204dfdac68bfd95e16cc05b7de Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Thu, 24 Jul 2025 20:32:40 +0300 Subject: [PATCH 04/10] Use ensureNotBlocked in Channel::publish*() methods --- src/Channel.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Channel.php b/src/Channel.php index b25833c..40923f4 100644 --- a/src/Channel.php +++ b/src/Channel.php @@ -86,6 +86,8 @@ public function publish( bool $mandatory = false, bool $immediate = false, ): ?PublishConfirmation { + $this->connection->ensureNotBlocked(); + /** @var ?PublishConfirmation $confirmation */ $confirmation = null; @@ -110,6 +112,8 @@ public function publish( */ public function publishBatch(array $publishMessages): PublishBatchConfirmation { + $this->connection->ensureNotBlocked(); + /** @var list $confirmations */ $confirmations = []; From 5d19f1f7c4878862650b81a396ac7616c714d0f4 Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Thu, 24 Jul 2025 22:52:25 +0300 Subject: [PATCH 05/10] Add rabbitmq_low_memory service to docker-compose.yaml --- docker-compose.yaml | 9 +++++++++ docker/rabbitmq/rabbitmq.conf | 1 + 2 files changed, 10 insertions(+) create mode 100644 docker/rabbitmq/rabbitmq.conf diff --git a/docker-compose.yaml b/docker-compose.yaml index 7bb1952..a24a7b0 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -14,3 +14,12 @@ services: ports: - "127.0.0.1:5672:5672" - "127.0.0.1:15672:15672" + + rabbitmq_low_memory: + image: rabbitmq:4.0-management-alpine + restart: always + ports: + - "127.0.0.1:5673:5672" + - "127.0.0.1:15673:15672" + volumes: + - ./docker/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf diff --git a/docker/rabbitmq/rabbitmq.conf b/docker/rabbitmq/rabbitmq.conf new file mode 100644 index 0000000..76549a9 --- /dev/null +++ b/docker/rabbitmq/rabbitmq.conf @@ -0,0 +1 @@ +vm_memory_high_watermark.absolute = 130MiB From 0152edfd3665d6a58eae8a03df747bf2d4cc4ede Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Sat, 26 Jul 2025 22:36:06 +0300 Subject: [PATCH 06/10] Revert new w/o parentheses --- .php-cs-fixer.dist.php | 4 ++-- src/Internal/Io/AmqpConnectionFactory.php | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.php-cs-fixer.dist.php b/.php-cs-fixer.dist.php index 2221b43..8007ba7 100644 --- a/.php-cs-fixer.dist.php +++ b/.php-cs-fixer.dist.php @@ -7,7 +7,7 @@ use PhpCsFixer\Runner\Parallel\ParallelConfigFactory; use PHPyh\CodingStandard\PhpCsFixerCodingStandard; -$config = new Config() +$config = (new Config()) ->setFinder( Finder::create() ->in(__DIR__ . '/src') @@ -20,7 +20,7 @@ ->setParallelConfig(ParallelConfigFactory::detect()) ->setCacheFile(__DIR__ . '/var/' . basename(__FILE__) . '.cache'); -new PhpCsFixerCodingStandard()->applyTo($config, [ +(new PhpCsFixerCodingStandard())->applyTo($config, [ 'numeric_literal_separator' => false, ]); diff --git a/src/Internal/Io/AmqpConnectionFactory.php b/src/Internal/Io/AmqpConnectionFactory.php index 286740c..025a7ee 100644 --- a/src/Internal/Io/AmqpConnectionFactory.php +++ b/src/Internal/Io/AmqpConnectionFactory.php @@ -109,7 +109,7 @@ private function createConnection(): AmqpConnection */ private function createSocket(string $url): Socket\Socket { - $context = new Socket\ConnectContext() + $context = (new Socket\ConnectContext()) ->withConnectTimeout($this->config->connectionTimeout); if ($this->config->tcpNoDelay) { From 285f46a29fd1ecc4f0b798bcce3a61bd2532fdb0 Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Sun, 27 Jul 2025 10:08:08 +0300 Subject: [PATCH 07/10] Rename docker/rabbitmq to docker/rabbitmq_low_memory --- docker-compose.yaml | 2 +- docker/{rabbitmq => rabbitmq_low_memory}/rabbitmq.conf | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename docker/{rabbitmq => rabbitmq_low_memory}/rabbitmq.conf (100%) diff --git a/docker-compose.yaml b/docker-compose.yaml index a24a7b0..cc8ed60 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -22,4 +22,4 @@ services: - "127.0.0.1:5673:5672" - "127.0.0.1:15673:15672" volumes: - - ./docker/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf + - ./docker/rabbitmq_low_memory/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf diff --git a/docker/rabbitmq/rabbitmq.conf b/docker/rabbitmq_low_memory/rabbitmq.conf similarity index 100% rename from docker/rabbitmq/rabbitmq.conf rename to docker/rabbitmq_low_memory/rabbitmq.conf From e6787186a6567986244ff7d1138552ccf29ae2c1 Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Sun, 27 Jul 2025 10:28:05 +0300 Subject: [PATCH 08/10] Add EventDispatcher and ConnectionBlocked/ConnectionUnblocked events --- src/Event/ConnectionBlocked.php | 12 ++++++++ src/Event/ConnectionUnblocked.php | 7 +++++ src/EventDispatcher.php | 47 +++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+) create mode 100644 src/Event/ConnectionBlocked.php create mode 100644 src/Event/ConnectionUnblocked.php create mode 100644 src/EventDispatcher.php diff --git a/src/Event/ConnectionBlocked.php b/src/Event/ConnectionBlocked.php new file mode 100644 index 0000000..a0db315 --- /dev/null +++ b/src/Event/ConnectionBlocked.php @@ -0,0 +1,12 @@ +, non-empty-list> + */ + private array $listeners = []; + + /** + * @template TEvent of Events + * @param class-string|non-empty-list> $events + * @param callable(TEvent): void $listener + */ + public function listen(string|array $events, callable $listener): self + { + $dispatcher = clone $this; + + foreach ((array) $events as $event) { + /** @phpstan-ignore assign.propertyType */ + $dispatcher->listeners[$event][] = $listener; + } + + return $dispatcher; + } + + /** + * @param Events $event + */ + public function dispatch(object $event): void + { + foreach ($this->listeners[$event::class] ?? [] as $listener) { + /** @phpstan-ignore argument.type */ + EventLoop::queue($listener, $event); + } + } +} From 686a787983db87c36f28ea0f714fb6f80b6735e7 Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Sun, 27 Jul 2025 10:40:29 +0300 Subject: [PATCH 09/10] Integrate EventDispatcher into connection --- src/Client.php | 8 +++++--- src/Internal/Io/AmqpConnection.php | 11 +++++++++-- src/Internal/Io/AmqpConnectionFactory.php | 8 +++++++- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/Client.php b/src/Client.php index f37971a..fe03cbc 100644 --- a/src/Client.php +++ b/src/Client.php @@ -31,14 +31,16 @@ final class Client public function __construct( public readonly Config $config, + public readonly EventDispatcher $eventDispatcher = new EventDispatcher(), ) { $properties = Properties::createDefault(); $this->hooks = new Hooks(); $this->connectionFactory = new AmqpConnectionFactory( - $this->config, - $properties, - $this->hooks, + config: $this->config, + properties: $properties, + hooks: $this->hooks, + eventDispatcher: $this->eventDispatcher, ); $this->channelFactory = new ChannelFactory( diff --git a/src/Internal/Io/AmqpConnection.php b/src/Internal/Io/AmqpConnection.php index 42e09de..6ebda30 100644 --- a/src/Internal/Io/AmqpConnection.php +++ b/src/Internal/Io/AmqpConnection.php @@ -8,6 +8,9 @@ use Amp\Socket\Socket; use Revolt\EventLoop; use Thesis\AmpBridge\ReaderWriter as AmpReaderWriter; +use Thesis\Amqp\Event\ConnectionBlocked; +use Thesis\Amqp\Event\ConnectionUnblocked; +use Thesis\Amqp\EventDispatcher; use Thesis\Amqp\Exception\ConnectionIsBlocked; use Thesis\Amqp\Exception\UnexpectedFrameReceived; use Thesis\Amqp\Internal\Hooks; @@ -40,6 +43,7 @@ final class AmqpConnection implements Writer public function __construct( private readonly Socket $socket, private readonly Hooks $hooks, + private readonly EventDispatcher $eventDispatcher, ) { $this->buffer = Buffer::empty(); $this->reader = new Protocol\Reader( @@ -101,22 +105,25 @@ public function write(string $bytes): void public function setup(): void { $hooks = $this->hooks; + $eventDispatcher = $this->eventDispatcher; $blockedReason = &$this->blockedReason; $hooks->anyOf( 0, Protocol\Frame\ConnectionBlocked::class, - static function (Protocol\Frame\ConnectionBlocked $blocked) use (&$blockedReason): void { + static function (Protocol\Frame\ConnectionBlocked $blocked) use (&$blockedReason, $eventDispatcher): void { $blockedReason = $blocked->reason; + $eventDispatcher->dispatch(new ConnectionBlocked($blockedReason)); }, ); $hooks->anyOf( 0, Protocol\Frame\ConnectionUnblocked::class, - static function () use (&$blockedReason): void { + static function () use (&$blockedReason, $eventDispatcher): void { $blockedReason = null; + $eventDispatcher->dispatch(new ConnectionUnblocked()); }, ); diff --git a/src/Internal/Io/AmqpConnectionFactory.php b/src/Internal/Io/AmqpConnectionFactory.php index 025a7ee..cf6a89b 100644 --- a/src/Internal/Io/AmqpConnectionFactory.php +++ b/src/Internal/Io/AmqpConnectionFactory.php @@ -7,6 +7,7 @@ use Amp\Cancellation; use Amp\Socket; use Thesis\Amqp\Config; +use Thesis\Amqp\EventDispatcher; use Thesis\Amqp\Exception; use Thesis\Amqp\Internal\Hooks; use Thesis\Amqp\Internal\Properties; @@ -24,6 +25,7 @@ public function __construct( private Config $config, private Properties $properties, private Hooks $hooks, + private EventDispatcher $eventDispatcher, ) {} public function connect(): AmqpConnection @@ -93,7 +95,11 @@ private function createConnection(): AmqpConnection foreach ($this->config->connectionUrls() as $url) { try { - return new AmqpConnection($this->createSocket($url), $this->hooks); + return new AmqpConnection( + socket: $this->createSocket($url), + hooks: $this->hooks, + eventDispatcher: $this->eventDispatcher, + ); } catch (\Throwable $e) { $exceptions[] = "{$url}: {$e->getMessage()}"; } From c7f73861aa75e691c25dd74f15df105e1d034397 Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Sun, 27 Jul 2025 11:24:13 +0300 Subject: [PATCH 10/10] Add RabbitMQTest with skipped testConnectionBlockedUnblockedOnLowMemory() --- tests/RabbitMQTest.php | 63 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 tests/RabbitMQTest.php diff --git a/tests/RabbitMQTest.php b/tests/RabbitMQTest.php new file mode 100644 index 0000000..4b375aa --- /dev/null +++ b/tests/RabbitMQTest.php @@ -0,0 +1,63 @@ +listen(ConnectionBlocked::class, static function () use ($deferredBlocked): void { + $deferredBlocked->complete(); + }) + ->listen(ConnectionUnblocked::class, static function () use ($deferredUnblocked): void { + $deferredUnblocked->complete(); + }), + ); + $publishChannel = $publishClient->channel(); + + $fixClient = new Client($publishClient->config); + $fixChannel = $fixClient->channel(); + + $queue = $publishChannel->queueDeclare(autoDelete: true); + + $message = new Message(body: str_repeat('x', 1024 * 10)); + + try { + while (true) { + $publishChannel->publish($message, routingKey: $queue->name); + } + } catch (ConnectionIsBlocked) { + } + + $deferredBlocked->getFuture()->await(); + + $fixChannel->queueDelete($queue->name); + + $deferredUnblocked->getFuture()->await(); + } +}