diff --git a/docker-compose.yaml b/docker-compose.yaml index 7bb1952..cc8ed60 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -14,3 +14,12 @@ services: ports: - "127.0.0.1:5672:5672" - "127.0.0.1:15672:15672" + + rabbitmq_low_memory: + image: rabbitmq:4.0-management-alpine + restart: always + ports: + - "127.0.0.1:5673:5672" + - "127.0.0.1:15673:15672" + volumes: + - ./docker/rabbitmq_low_memory/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf diff --git a/docker/rabbitmq_low_memory/rabbitmq.conf b/docker/rabbitmq_low_memory/rabbitmq.conf new file mode 100644 index 0000000..76549a9 --- /dev/null +++ b/docker/rabbitmq_low_memory/rabbitmq.conf @@ -0,0 +1 @@ +vm_memory_high_watermark.absolute = 130MiB diff --git a/src/Channel.php b/src/Channel.php index b25833c..40923f4 100644 --- a/src/Channel.php +++ b/src/Channel.php @@ -86,6 +86,8 @@ public function publish( bool $mandatory = false, bool $immediate = false, ): ?PublishConfirmation { + $this->connection->ensureNotBlocked(); + /** @var ?PublishConfirmation $confirmation */ $confirmation = null; @@ -110,6 +112,8 @@ public function publish( */ public function publishBatch(array $publishMessages): PublishBatchConfirmation { + $this->connection->ensureNotBlocked(); + /** @var list $confirmations */ $confirmations = []; diff --git a/src/Client.php b/src/Client.php index f37971a..fe03cbc 100644 --- a/src/Client.php +++ b/src/Client.php @@ -31,14 +31,16 @@ final class Client public function __construct( public readonly Config $config, + public readonly EventDispatcher $eventDispatcher = new EventDispatcher(), ) { $properties = Properties::createDefault(); $this->hooks = new Hooks(); $this->connectionFactory = new AmqpConnectionFactory( - $this->config, - $properties, - $this->hooks, + config: $this->config, + properties: $properties, + hooks: $this->hooks, + eventDispatcher: $this->eventDispatcher, ); $this->channelFactory = new ChannelFactory( diff --git a/src/Event/ConnectionBlocked.php b/src/Event/ConnectionBlocked.php new file mode 100644 index 0000000..a0db315 --- /dev/null +++ b/src/Event/ConnectionBlocked.php @@ -0,0 +1,12 @@ +, non-empty-list> + */ + private array $listeners = []; + + /** + * @template TEvent of Events + * @param class-string|non-empty-list> $events + * @param callable(TEvent): void $listener + */ + public function listen(string|array $events, callable $listener): self + { + $dispatcher = clone $this; + + foreach ((array) $events as $event) { + /** @phpstan-ignore assign.propertyType */ + $dispatcher->listeners[$event][] = $listener; + } + + return $dispatcher; + } + + /** + * @param Events $event + */ + public function dispatch(object $event): void + { + foreach ($this->listeners[$event::class] ?? [] as $listener) { + /** @phpstan-ignore argument.type */ + EventLoop::queue($listener, $event); + } + } +} diff --git a/src/Exception/ConnectionIsBlocked.php b/src/Exception/ConnectionIsBlocked.php new file mode 100644 index 0000000..27aa05c --- /dev/null +++ b/src/Exception/ConnectionIsBlocked.php @@ -0,0 +1,23 @@ +buffer = Buffer::empty(); $this->reader = new Protocol\Reader( @@ -91,8 +102,31 @@ public function write(string $bytes): void $this->lastWrite = Amp\now(); } - public function ioLoop(Hooks $hooks): void + public function setup(): void { + $hooks = $this->hooks; + $eventDispatcher = $this->eventDispatcher; + + $blockedReason = &$this->blockedReason; + + $hooks->anyOf( + 0, + Protocol\Frame\ConnectionBlocked::class, + static function (Protocol\Frame\ConnectionBlocked $blocked) use (&$blockedReason, $eventDispatcher): void { + $blockedReason = $blocked->reason; + $eventDispatcher->dispatch(new ConnectionBlocked($blockedReason)); + }, + ); + + $hooks->anyOf( + 0, + Protocol\Frame\ConnectionUnblocked::class, + static function () use (&$blockedReason, $eventDispatcher): void { + $blockedReason = null; + $eventDispatcher->dispatch(new ConnectionUnblocked()); + }, + ); + $reader = $this->reader; $isClosed = &$this->closed; @@ -127,6 +161,13 @@ public function heartbeat(int $interval): void }); } + public function ensureNotBlocked(): void + { + if ($this->blockedReason !== null) { + throw new ConnectionIsBlocked($this->blockedReason); + } + } + public function close(): void { if (!$this->socket->isClosed()) { diff --git a/src/Internal/Io/AmqpConnectionFactory.php b/src/Internal/Io/AmqpConnectionFactory.php index 8685f17..cf6a89b 100644 --- a/src/Internal/Io/AmqpConnectionFactory.php +++ b/src/Internal/Io/AmqpConnectionFactory.php @@ -7,6 +7,7 @@ use Amp\Cancellation; use Amp\Socket; use Thesis\Amqp\Config; +use Thesis\Amqp\EventDispatcher; use Thesis\Amqp\Exception; use Thesis\Amqp\Internal\Hooks; use Thesis\Amqp\Internal\Properties; @@ -24,6 +25,7 @@ public function __construct( private Config $config, private Properties $properties, private Hooks $hooks, + private EventDispatcher $eventDispatcher, ) {} public function connect(): AmqpConnection @@ -61,7 +63,7 @@ public function connect(): AmqpConnection Frame\ConnectionOpenOk::class, ); - $connection->ioLoop($this->hooks); + $connection->setup(); $this->hooks->anyOf(0, Frame\ConnectionClose::class, static function () use ($connection): void { $connection->writeFrame(Protocol\Method::connectionCloseOk()); @@ -93,7 +95,11 @@ private function createConnection(): AmqpConnection foreach ($this->config->connectionUrls() as $url) { try { - return new AmqpConnection($this->createSocket($url)); + return new AmqpConnection( + socket: $this->createSocket($url), + hooks: $this->hooks, + eventDispatcher: $this->eventDispatcher, + ); } catch (\Throwable $e) { $exceptions[] = "{$url}: {$e->getMessage()}"; } diff --git a/src/Internal/Protocol/Frame/ConnectionBlocked.php b/src/Internal/Protocol/Frame/ConnectionBlocked.php new file mode 100644 index 0000000..7c3758d --- /dev/null +++ b/src/Internal/Protocol/Frame/ConnectionBlocked.php @@ -0,0 +1,28 @@ +readString()); + } + + public function write(Io\WriteBytes $writer): void + { + $writer->writeString($this->reason); + } +} diff --git a/src/Internal/Protocol/Frame/ConnectionUnblocked.php b/src/Internal/Protocol/Frame/ConnectionUnblocked.php new file mode 100644 index 0000000..f17f08c --- /dev/null +++ b/src/Internal/Protocol/Frame/ConnectionUnblocked.php @@ -0,0 +1,23 @@ + Frame\ConnectionOpenOk::class, ClassMethod::CONNECTION_CLOSE => Frame\ConnectionClose::class, ClassMethod::CONNECTION_CLOSE_OK => Frame\ConnectionCloseOk::class, + ClassMethod::CONNECTION_BLOCKED => Frame\ConnectionBlocked::class, + ClassMethod::CONNECTION_UNBLOCKED => Frame\ConnectionUnblocked::class, ], ClassType::CHANNEL => [ ClassMethod::CHANNEL_OPEN_OK => Frame\ChannelOpenOkFrame::class, diff --git a/tests/RabbitMQTest.php b/tests/RabbitMQTest.php new file mode 100644 index 0000000..4b375aa --- /dev/null +++ b/tests/RabbitMQTest.php @@ -0,0 +1,63 @@ +listen(ConnectionBlocked::class, static function () use ($deferredBlocked): void { + $deferredBlocked->complete(); + }) + ->listen(ConnectionUnblocked::class, static function () use ($deferredUnblocked): void { + $deferredUnblocked->complete(); + }), + ); + $publishChannel = $publishClient->channel(); + + $fixClient = new Client($publishClient->config); + $fixChannel = $fixClient->channel(); + + $queue = $publishChannel->queueDeclare(autoDelete: true); + + $message = new Message(body: str_repeat('x', 1024 * 10)); + + try { + while (true) { + $publishChannel->publish($message, routingKey: $queue->name); + } + } catch (ConnectionIsBlocked) { + } + + $deferredBlocked->getFuture()->await(); + + $fixChannel->queueDelete($queue->name); + + $deferredUnblocked->getFuture()->await(); + } +}