diff --git a/.php-cs-fixer.dist.php b/.php-cs-fixer.dist.php index 8007ba7..2221b43 100644 --- a/.php-cs-fixer.dist.php +++ b/.php-cs-fixer.dist.php @@ -7,7 +7,7 @@ use PhpCsFixer\Runner\Parallel\ParallelConfigFactory; use PHPyh\CodingStandard\PhpCsFixerCodingStandard; -$config = (new Config()) +$config = new Config() ->setFinder( Finder::create() ->in(__DIR__ . '/src') @@ -20,7 +20,7 @@ ->setParallelConfig(ParallelConfigFactory::detect()) ->setCacheFile(__DIR__ . '/var/' . basename(__FILE__) . '.cache'); -(new PhpCsFixerCodingStandard())->applyTo($config, [ +new PhpCsFixerCodingStandard()->applyTo($config, [ 'numeric_literal_separator' => false, ]); diff --git a/docker-compose.yaml b/docker-compose.yaml index 7bb1952..a24a7b0 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -14,3 +14,12 @@ services: ports: - "127.0.0.1:5672:5672" - "127.0.0.1:15672:15672" + + rabbitmq_low_memory: + image: rabbitmq:4.0-management-alpine + restart: always + ports: + - "127.0.0.1:5673:5672" + - "127.0.0.1:15673:15672" + volumes: + - ./docker/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf diff --git a/docker/rabbitmq/rabbitmq.conf b/docker/rabbitmq/rabbitmq.conf new file mode 100644 index 0000000..76549a9 --- /dev/null +++ b/docker/rabbitmq/rabbitmq.conf @@ -0,0 +1 @@ +vm_memory_high_watermark.absolute = 130MiB diff --git a/src/Channel.php b/src/Channel.php index b25833c..40923f4 100644 --- a/src/Channel.php +++ b/src/Channel.php @@ -86,6 +86,8 @@ public function publish( bool $mandatory = false, bool $immediate = false, ): ?PublishConfirmation { + $this->connection->ensureNotBlocked(); + /** @var ?PublishConfirmation $confirmation */ $confirmation = null; @@ -110,6 +112,8 @@ public function publish( */ public function publishBatch(array $publishMessages): PublishBatchConfirmation { + $this->connection->ensureNotBlocked(); + /** @var list $confirmations */ $confirmations = []; diff --git a/src/Exception/ConnectionIsBlocked.php b/src/Exception/ConnectionIsBlocked.php new file mode 100644 index 0000000..27aa05c --- /dev/null +++ b/src/Exception/ConnectionIsBlocked.php @@ -0,0 +1,23 @@ +buffer = Buffer::empty(); $this->reader = new Protocol\Reader( @@ -91,8 +98,28 @@ public function write(string $bytes): void $this->lastWrite = Amp\now(); } - public function ioLoop(Hooks $hooks): void + public function setup(): void { + $hooks = $this->hooks; + + $blockedReason = &$this->blockedReason; + + $hooks->anyOf( + 0, + Protocol\Frame\ConnectionBlocked::class, + static function (Protocol\Frame\ConnectionBlocked $blocked) use (&$blockedReason): void { + $blockedReason = $blocked->reason; + }, + ); + + $hooks->anyOf( + 0, + Protocol\Frame\ConnectionUnblocked::class, + static function () use (&$blockedReason): void { + $blockedReason = null; + }, + ); + $reader = $this->reader; $isClosed = &$this->closed; @@ -127,6 +154,13 @@ public function heartbeat(int $interval): void }); } + public function ensureNotBlocked(): void + { + if ($this->blockedReason !== null) { + throw new ConnectionIsBlocked($this->blockedReason); + } + } + public function close(): void { if (!$this->socket->isClosed()) { diff --git a/src/Internal/Io/AmqpConnectionFactory.php b/src/Internal/Io/AmqpConnectionFactory.php index 8685f17..286740c 100644 --- a/src/Internal/Io/AmqpConnectionFactory.php +++ b/src/Internal/Io/AmqpConnectionFactory.php @@ -61,7 +61,7 @@ public function connect(): AmqpConnection Frame\ConnectionOpenOk::class, ); - $connection->ioLoop($this->hooks); + $connection->setup(); $this->hooks->anyOf(0, Frame\ConnectionClose::class, static function () use ($connection): void { $connection->writeFrame(Protocol\Method::connectionCloseOk()); @@ -93,7 +93,7 @@ private function createConnection(): AmqpConnection foreach ($this->config->connectionUrls() as $url) { try { - return new AmqpConnection($this->createSocket($url)); + return new AmqpConnection($this->createSocket($url), $this->hooks); } catch (\Throwable $e) { $exceptions[] = "{$url}: {$e->getMessage()}"; } @@ -109,7 +109,7 @@ private function createConnection(): AmqpConnection */ private function createSocket(string $url): Socket\Socket { - $context = (new Socket\ConnectContext()) + $context = new Socket\ConnectContext() ->withConnectTimeout($this->config->connectionTimeout); if ($this->config->tcpNoDelay) { diff --git a/src/Internal/Protocol/Frame/ConnectionBlocked.php b/src/Internal/Protocol/Frame/ConnectionBlocked.php new file mode 100644 index 0000000..7c3758d --- /dev/null +++ b/src/Internal/Protocol/Frame/ConnectionBlocked.php @@ -0,0 +1,28 @@ +readString()); + } + + public function write(Io\WriteBytes $writer): void + { + $writer->writeString($this->reason); + } +} diff --git a/src/Internal/Protocol/Frame/ConnectionUnblocked.php b/src/Internal/Protocol/Frame/ConnectionUnblocked.php new file mode 100644 index 0000000..f17f08c --- /dev/null +++ b/src/Internal/Protocol/Frame/ConnectionUnblocked.php @@ -0,0 +1,23 @@ + Frame\ConnectionOpenOk::class, ClassMethod::CONNECTION_CLOSE => Frame\ConnectionClose::class, ClassMethod::CONNECTION_CLOSE_OK => Frame\ConnectionCloseOk::class, + ClassMethod::CONNECTION_BLOCKED => Frame\ConnectionBlocked::class, + ClassMethod::CONNECTION_UNBLOCKED => Frame\ConnectionUnblocked::class, ], ClassType::CHANNEL => [ ClassMethod::CHANNEL_OPEN_OK => Frame\ChannelOpenOkFrame::class,