diff --git a/src/ClusterWatcher.php b/src/ClusterWatcher.php index 14398d9..86a323f 100644 --- a/src/ClusterWatcher.php +++ b/src/ClusterWatcher.php @@ -36,6 +36,9 @@ final class ClusterWatcher use ForbidCloning; use ForbidSerialization; + /** + * The default worker shutdown timeout in seconds. + */ public const WORKER_TIMEOUT = 5; private readonly ContextFactory $contextFactory; @@ -120,8 +123,10 @@ public function getMessageIterator(): ConcurrentIterator /** * @param int $count Number of cluster workers to spawn. + * @param float|null $workerShutdownTimeout The maximum time to wait for a worker to shut down, in seconds, + * or null to wait indefinitely. */ - public function start(int $count): void + public function start(int $count, ?float $workerShutdownTimeout = ClusterWatcher::WORKER_TIMEOUT): void { if ($this->running || $this->queue->isComplete()) { throw new \Error("The cluster watcher is already running or has already run"); @@ -137,7 +142,7 @@ public function start(int $count): void try { for ($i = 0; $i < $count; ++$i) { $id = $this->nextId++; - $this->workers[$id] = $this->startWorker($id); + $this->workers[$id] = $this->startWorker($id, $workerShutdownTimeout); } } catch (\Throwable $exception) { $this->stop(); @@ -146,9 +151,11 @@ public function start(int $count): void } /** - * @param positive-int $id + * @param positive-int $id The worker ID. + * @param float|null $shutdownTimeout The maximum time to wait for the worker to shut down, in seconds, + * or null to wait indefinitely. */ - private function startWorker(int $id): ContextClusterWorker + private function startWorker(int $id, ?float $shutdownTimeout): ContextClusterWorker { $context = $this->contextFactory->start($this->script); @@ -203,12 +210,13 @@ private function startWorker(int $id): ContextClusterWorker $socket, $deferredCancellation, $id, + $shutdownTimeout, ): void { async($this->provider->provideFor(...), $socket, $deferredCancellation->getCancellation())->ignore(); try { try { - $worker->run(); + $worker->run($shutdownTimeout); $worker->info("Worker {$id} terminated cleanly" . ($this->running ? ", restarting..." : "")); @@ -230,7 +238,7 @@ private function startWorker(int $id): ContextClusterWorker } if ($this->running) { - $this->workers[$id] = $this->startWorker($this->nextId++); + $this->workers[$id] = $this->startWorker($this->nextId++, $shutdownTimeout); } } catch (\Throwable $exception) { $this->stop(); diff --git a/src/Internal/ContextClusterWorker.php b/src/Internal/ContextClusterWorker.php index ac1f07a..2aa5c87 100644 --- a/src/Internal/ContextClusterWorker.php +++ b/src/Internal/ContextClusterWorker.php @@ -4,7 +4,6 @@ use Amp\Cancellation; use Amp\CancelledException; -use Amp\Cluster\ClusterWatcher; use Amp\Cluster\ClusterWorker; use Amp\Cluster\ClusterWorkerMessage; use Amp\DeferredCancellation; @@ -72,7 +71,13 @@ public function send(mixed $data): void $this->context->send(new WatcherMessage(WatcherMessageType::Data, $data)); } - public function run(): void + /** + * Run the worker. + * + * @param float|null $shutdownTimeout The maximum time to wait for the worker to shut down, in seconds, + * or null to wait indefinitely. + */ + public function run(?float $shutdownTimeout): void { $watcher = EventLoop::repeat(self::PING_TIMEOUT / 2, weakClosure(function (): void { if ($this->lastActivity < \time() - self::PING_TIMEOUT) { @@ -112,10 +117,15 @@ public function run(): void } try { - $this->joinFuture->await(new TimeoutCancellation(ClusterWatcher::WORKER_TIMEOUT)); + if ($shutdownTimeout === null) { + $this->joinFuture->await(); + } else { + $this->joinFuture->await(new TimeoutCancellation($shutdownTimeout)); + } } catch (CancelledException) { $this->close(); - // Give it a second to reap the result. Generally this never should time out, unless something is seriously broken. + // Give it a second to reap the result. Generally this never should time out, unless something is + // seriously broken. $this->joinFuture->await(new TimeoutCancellation(1)); } } catch (\Throwable $exception) {