From 8fa92ff658e0a1647e7801fd456f9584151fe2b5 Mon Sep 17 00:00:00 2001 From: azjezz Date: Tue, 28 May 2024 22:43:07 +0100 Subject: [PATCH 1/4] feat: allow configuring worker shutdown timeout Signed-off-by: azjezz --- src/ClusterWatcher.php | 18 +++++++++++++----- src/Internal/ContextClusterWorker.php | 14 ++++++++++++-- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/ClusterWatcher.php b/src/ClusterWatcher.php index 14398d9..0e789a5 100644 --- a/src/ClusterWatcher.php +++ b/src/ClusterWatcher.php @@ -36,6 +36,9 @@ final class ClusterWatcher use ForbidCloning; use ForbidSerialization; + /** + * The default worker shutdown timeout in seconds. + */ public const WORKER_TIMEOUT = 5; private readonly ContextFactory $contextFactory; @@ -120,8 +123,10 @@ public function getMessageIterator(): ConcurrentIterator /** * @param int $count Number of cluster workers to spawn. + * @param null|int|float $workerShutdownTimeout The maximum time to wait for a worker to shut down, in seconds, + * or null to wait indefinitely. */ - public function start(int $count): void + public function start(int $count, null|int|float $workerShutdownTimeout = ClusterWatcher::WORKER_TIMEOUT): void { if ($this->running || $this->queue->isComplete()) { throw new \Error("The cluster watcher is already running or has already run"); @@ -137,7 +142,7 @@ public function start(int $count): void try { for ($i = 0; $i < $count; ++$i) { $id = $this->nextId++; - $this->workers[$id] = $this->startWorker($id); + $this->workers[$id] = $this->startWorker($id, $workerShutdownTimeout); } } catch (\Throwable $exception) { $this->stop(); @@ -146,9 +151,11 @@ public function start(int $count): void } /** - * @param positive-int $id + * @param positive-int $id The worker ID. + * @param null|int|float $shutdownTimeout The maximum time to wait for the worker to shut down, in seconds, + * or null to wait indefinitely. */ - private function startWorker(int $id): ContextClusterWorker + private function startWorker(int $id, null|int|float $shutdownTimeout = ClusterWatcher::WORKER_TIMEOUT): ContextClusterWorker { $context = $this->contextFactory->start($this->script); @@ -203,12 +210,13 @@ private function startWorker(int $id): ContextClusterWorker $socket, $deferredCancellation, $id, + $shutdownTimeout, ): void { async($this->provider->provideFor(...), $socket, $deferredCancellation->getCancellation())->ignore(); try { try { - $worker->run(); + $worker->run($shutdownTimeout); $worker->info("Worker {$id} terminated cleanly" . ($this->running ? ", restarting..." : "")); diff --git a/src/Internal/ContextClusterWorker.php b/src/Internal/ContextClusterWorker.php index 7e3bddb..eb1c81c 100644 --- a/src/Internal/ContextClusterWorker.php +++ b/src/Internal/ContextClusterWorker.php @@ -70,7 +70,13 @@ public function send(mixed $data): void $this->context->send(new WatcherMessage(WatcherMessageType::Data, $data)); } - public function run(): void + /** + * Run the worker. + * + * @param null|int|float $shutdownTimeout The maximum time to wait for the worker to shut down, in seconds, + * or null to wait indefinitely. + */ + public function run(null|int|float $shutdownTimeout = ClusterWatcher::WORKER_TIMEOUT): void { $watcher = EventLoop::repeat(self::PING_TIMEOUT / 2, weakClosure(function (): void { if ($this->lastActivity < \time() - self::PING_TIMEOUT) { @@ -109,7 +115,11 @@ public function run(): void }; } - $this->joinFuture->await(new TimeoutCancellation(ClusterWatcher::WORKER_TIMEOUT)); + if ($shutdownTimeout === null) { + $this->joinFuture->await(); + } else { + $this->joinFuture->await(new TimeoutCancellation($shutdownTimeout)); + } } catch (\Throwable $exception) { $this->joinFuture->ignore(); throw $exception; From 51b6370b834bf431805f9669737ea544b5e2f99a Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 17 May 2026 09:47:33 -0500 Subject: [PATCH 2/4] Change parameter to float --- src/ClusterWatcher.php | 8 ++++---- src/Internal/ContextClusterWorker.php | 7 ++++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/ClusterWatcher.php b/src/ClusterWatcher.php index 0e789a5..b23e654 100644 --- a/src/ClusterWatcher.php +++ b/src/ClusterWatcher.php @@ -123,10 +123,10 @@ public function getMessageIterator(): ConcurrentIterator /** * @param int $count Number of cluster workers to spawn. - * @param null|int|float $workerShutdownTimeout The maximum time to wait for a worker to shut down, in seconds, + * @param float|null $workerShutdownTimeout The maximum time to wait for a worker to shut down, in seconds, * or null to wait indefinitely. */ - public function start(int $count, null|int|float $workerShutdownTimeout = ClusterWatcher::WORKER_TIMEOUT): void + public function start(int $count, ?float $workerShutdownTimeout = ClusterWatcher::WORKER_TIMEOUT): void { if ($this->running || $this->queue->isComplete()) { throw new \Error("The cluster watcher is already running or has already run"); @@ -152,10 +152,10 @@ public function start(int $count, null|int|float $workerShutdownTimeout = Cluste /** * @param positive-int $id The worker ID. - * @param null|int|float $shutdownTimeout The maximum time to wait for the worker to shut down, in seconds, + * @param float|null $shutdownTimeout The maximum time to wait for the worker to shut down, in seconds, * or null to wait indefinitely. */ - private function startWorker(int $id, null|int|float $shutdownTimeout = ClusterWatcher::WORKER_TIMEOUT): ContextClusterWorker + private function startWorker(int $id, ?float $shutdownTimeout = ClusterWatcher::WORKER_TIMEOUT): ContextClusterWorker { $context = $this->contextFactory->start($this->script); diff --git a/src/Internal/ContextClusterWorker.php b/src/Internal/ContextClusterWorker.php index f762327..efbf2bb 100644 --- a/src/Internal/ContextClusterWorker.php +++ b/src/Internal/ContextClusterWorker.php @@ -75,10 +75,10 @@ public function send(mixed $data): void /** * Run the worker. * - * @param null|int|float $shutdownTimeout The maximum time to wait for the worker to shut down, in seconds, + * @param float|null $shutdownTimeout The maximum time to wait for the worker to shut down, in seconds, * or null to wait indefinitely. */ - public function run(null|int|float $shutdownTimeout = ClusterWatcher::WORKER_TIMEOUT): void + public function run(?float $shutdownTimeout): void { $watcher = EventLoop::repeat(self::PING_TIMEOUT / 2, weakClosure(function (): void { if ($this->lastActivity < \time() - self::PING_TIMEOUT) { @@ -125,7 +125,8 @@ public function run(null|int|float $shutdownTimeout = ClusterWatcher::WORKER_TIM } } catch (CancelledException) { $this->close(); - // Give it a second to reap the result. Generally this never should time out, unless something is seriously broken. + // Give it a second to reap the result. Generally this never should time out, unless something is + // seriously broken. $this->joinFuture->await(new TimeoutCancellation(1)); } } catch (\Throwable $exception) { From 6960040d5f7660b6ab47bc629a9f72c7db335a73 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 17 May 2026 09:49:22 -0500 Subject: [PATCH 3/4] Remove parameter default --- src/ClusterWatcher.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ClusterWatcher.php b/src/ClusterWatcher.php index b23e654..86a323f 100644 --- a/src/ClusterWatcher.php +++ b/src/ClusterWatcher.php @@ -155,7 +155,7 @@ public function start(int $count, ?float $workerShutdownTimeout = ClusterWatcher * @param float|null $shutdownTimeout The maximum time to wait for the worker to shut down, in seconds, * or null to wait indefinitely. */ - private function startWorker(int $id, ?float $shutdownTimeout = ClusterWatcher::WORKER_TIMEOUT): ContextClusterWorker + private function startWorker(int $id, ?float $shutdownTimeout): ContextClusterWorker { $context = $this->contextFactory->start($this->script); @@ -238,7 +238,7 @@ private function startWorker(int $id, ?float $shutdownTimeout = ClusterWatcher:: } if ($this->running) { - $this->workers[$id] = $this->startWorker($this->nextId++); + $this->workers[$id] = $this->startWorker($this->nextId++, $shutdownTimeout); } } catch (\Throwable $exception) { $this->stop(); From 4ff29be32f023eba36666c1446cb9610ff5823ca Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 17 May 2026 09:51:03 -0500 Subject: [PATCH 4/4] Remove unused import --- src/Internal/ContextClusterWorker.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Internal/ContextClusterWorker.php b/src/Internal/ContextClusterWorker.php index efbf2bb..2aa5c87 100644 --- a/src/Internal/ContextClusterWorker.php +++ b/src/Internal/ContextClusterWorker.php @@ -4,7 +4,6 @@ use Amp\Cancellation; use Amp\CancelledException; -use Amp\Cluster\ClusterWatcher; use Amp\Cluster\ClusterWorker; use Amp\Cluster\ClusterWorkerMessage; use Amp\DeferredCancellation;