diff --git a/composer-require-check.json b/composer-require-check.json index 7410049..6b675ba 100644 --- a/composer-require-check.json +++ b/composer-require-check.json @@ -40,6 +40,7 @@ "SPL", "standard", "hash", - "pcntl" + "pcntl", + "posix" ] } diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php new file mode 100644 index 0000000..973e104 --- /dev/null +++ b/src/Context/ForkContext.php @@ -0,0 +1,164 @@ + + */ +final class ForkContext extends AbstractContext +{ + private const DEFAULT_START_TIMEOUT = 5; + + public static function isSupported(): bool + { + return \extension_loaded('pcntl') + && \extension_loaded('posix') + && \function_exists('pcntl_fork') // pcntl_fork may be disabled. + && EventLoop::getDriver()->getHandle() === null; + } + + /** + * @param string|non-empty-list $script Path to PHP script or array with first element as path and + * following elements options to the PHP script (e.g.: ['bin/worker.php', 'Option1Value', 'Option2Value']). + * @param positive-int $childConnectTimeout Number of seconds the child will attempt to connect to the parent + * before failing. + * + * @throws ContextException If starting the process fails. + */ + public static function start( + IpcHub $ipcHub, + string|array $script, + ?Cancellation $cancellation = null, + int $childConnectTimeout = self::DEFAULT_START_TIMEOUT, + Serializer $serializer = new NativeSerializer(), + ): self { + $key = $ipcHub->generateKey(); + + // Fork + if (($pid = \pcntl_fork()) < 0) { + throw new ContextException("Forking failed: " . \posix_strerror(\posix_get_last_error())); + } + + // Parent + if ($pid > 0) { + try { + $socket = $ipcHub->accept($key, $cancellation); + $ipcChannel = new StreamChannel($socket, $socket, $serializer); + + $socket = $ipcHub->accept($key, $cancellation); + $resultChannel = new StreamChannel($socket, $socket, $serializer); + } catch (\Throwable $exception) { + $cancellation?->throwIfRequested(); + + throw new ContextException("Connecting failed after forking", previous: $exception); + } + + return new self($pid, $ipcChannel, $resultChannel); + } + + // Child + \define("AMP_CONTEXT", "fork"); + \define("AMP_CONTEXT_ID", \getmypid()); + + if (\is_string($script)) { + $script = [$script]; + } + + $connectCancellation = new TimeoutCancellation((float) $childConnectTimeout); + Internal\runContext($ipcHub->getUri(), $key, $connectCancellation, $script, $serializer); + + exit(0); + } + + private ?int $exited = null; + + private bool $weKilled = false; + + /** + * @param StreamChannel $ipcChannel + */ + private function __construct( + private readonly int $pid, + StreamChannel $ipcChannel, + StreamChannel $resultChannel, + ) { + parent::__construct($ipcChannel, $resultChannel); + } + + public function receive(?Cancellation $cancellation = null): mixed + { + $this->checkExit(false); // Will throw if the process exited unexpectedly. + + return parent::receive($cancellation); + } + + public function send(mixed $data): void + { + $this->checkExit(false); // Will throw if the process exited unexpectedly. + + parent::send($data); + } + + private function checkExit(bool $wait): ?int + { + if ($this->exited === null) { + if (\pcntl_waitpid($this->pid, $status, $wait ? 0 : \WNOHANG) === 0) { + return null; + } + + $this->exited = match (true) { + \pcntl_wifsignaled($status) => \pcntl_wtermsig($status), + \pcntl_wifexited($status) => \pcntl_wexitstatus($status) - 128, + \pcntl_wifstopped($status) => \pcntl_wstopsig($status), + default => -1, + }; + } + + if (!$this->weKilled && $this->exited > 0) { + throw new ContextException("Worker exited due to signal {$this->exited}", $this->exited); + } + + return $this->exited; + } + + public function close(): void + { + if ($this->checkExit(false) === null) { + $this->weKilled = true; + \posix_kill($this->pid, \SIGKILL); + + $this->checkExit(true); + } + + parent::close(); + } + + public function join(?Cancellation $cancellation = null): mixed + { + $result = $this->receiveExitResult($cancellation); + + return $result->getResult(); + } +} diff --git a/src/Context/ForkContextFactory.php b/src/Context/ForkContextFactory.php new file mode 100644 index 0000000..d6665d6 --- /dev/null +++ b/src/Context/ForkContextFactory.php @@ -0,0 +1,49 @@ + $script + * + * @throws ContextException + */ + public function start(string|array $script, ?Cancellation $cancellation = null): ForkContext + { + return ForkContext::start( + ipcHub: $this->ipcHub, + script: $script, + cancellation: $cancellation, + childConnectTimeout: $this->childConnectTimeout, + ); + } +} diff --git a/src/Context/Internal/AbstractContext.php b/src/Context/Internal/AbstractContext.php index 9044154..65b7c26 100644 --- a/src/Context/Internal/AbstractContext.php +++ b/src/Context/Internal/AbstractContext.php @@ -42,7 +42,7 @@ public function receive(?Cancellation $cancellation = null): mixed $this->ipcChannel->close(); throw new ContextException( - "The context stopped responding, potentially due to a fatal error or calling exit", + "The context stopped responding during receive, potentially due to a fatal error or calling exit", previous: $exception, ); } @@ -74,7 +74,7 @@ public function send(mixed $data): void $this->ipcChannel->close(); throw new ContextException( - "The context stopped responding, potentially due to a fatal error or calling exit", + "The context stopped responding during send, potentially due to a fatal error or calling exit", previous: $exception, ); } diff --git a/src/Context/Internal/functions.php b/src/Context/Internal/functions.php index 60ddd90..4e98659 100644 --- a/src/Context/Internal/functions.php +++ b/src/Context/Internal/functions.php @@ -6,82 +6,88 @@ use Amp\Cancellation; use Amp\Future; use Amp\Parallel\Ipc; +use Amp\Serialization\NativeSerializer; use Amp\Serialization\SerializationException; +use Amp\Serialization\Serializer; use Amp\Sync\ChannelException; -use Revolt\EventLoop; /** @internal */ -function runContext(string $uri, string $key, Cancellation $connectCancellation, array $argv): void -{ - EventLoop::queue(function () use ($argv, $uri, $key, $connectCancellation): void { - /** @noinspection PhpUnusedLocalVariableInspection */ - $argc = \count($argv); +function runContext( + string $uri, + string $key, + Cancellation $connectCancellation, + array $argv, + Serializer $serializer = new NativeSerializer(), +): void { + /** @noinspection PhpUnusedLocalVariableInspection */ + $argc = \count($argv); - try { - $socket = Ipc\connect($uri, $key, $connectCancellation); - $ipcChannel = new StreamChannel($socket, $socket); + try { + $socket = Ipc\connect($uri, $key, $connectCancellation); + $ipcChannel = new StreamChannel($socket, $socket, $serializer); + + $socket = Ipc\connect($uri, $key, $connectCancellation); + $resultChannel = new StreamChannel($socket, $socket, $serializer); + } catch (\Throwable $exception) { + \file_put_contents('php://stderr', $exception->getMessage(), \FILE_APPEND); + exit(255); + } - $socket = Ipc\connect($uri, $key, $connectCancellation); - $resultChannel = new StreamChannel($socket, $socket); - } catch (\Throwable $exception) { - \file_put_contents('php://stderr', $exception->getMessage(), \FILE_APPEND); - exit(255); + try { + if (!isset($argv[0])) { + throw new \Error("No script path given"); } - try { - if (!isset($argv[0])) { - throw new \Error("No script path given"); - } + if (!\is_file($argv[0])) { + throw new \Error(\sprintf( + "No script found at '%s' (be sure to provide the full path to the script)", + $argv[0], + )); + } - if (!\is_file($argv[0])) { - throw new \Error(\sprintf( - "No script found at '%s' (be sure to provide the full path to the script)", - $argv[0], - )); - } + try { + // Protect current scope by requiring script within another function. + // Using $argc, so it is available to the required script. + $callable = (function () use ($argc, $argv): callable { + /** @psalm-suppress UnresolvableInclude */ + return require $argv[0]; + })(); + } catch (\TypeError $exception) { + throw new \Error(\sprintf( + "Script '%s' did not return a callable function: %s", + $argv[0], + $exception->getMessage(), + ), 0, $exception); + } catch (\ParseError $exception) { + throw new \Error(\sprintf( + "Script '%s' contains a parse error: %s", + $argv[0], + $exception->getMessage(), + ), 0, $exception); + } - try { - // Protect current scope by requiring script within another function. - // Using $argc, so it is available to the required script. - $callable = (function () use ($argc, $argv): callable { - /** @psalm-suppress UnresolvableInclude */ - return require $argv[0]; - })(); - } catch (\TypeError $exception) { - throw new \Error(\sprintf( - "Script '%s' did not return a callable function: %s", - $argv[0], - $exception->getMessage(), - ), 0, $exception); - } catch (\ParseError $exception) { - throw new \Error(\sprintf( - "Script '%s' contains a parse error: %s", - $argv[0], - $exception->getMessage(), - ), 0, $exception); - } + $returnValue = $callable(new ContextChannel($ipcChannel)); + $result = new ExitSuccess($returnValue instanceof Future ? $returnValue->await() : $returnValue); + } catch (\Throwable $exception) { + $result = new ExitFailure($exception); + } - $returnValue = $callable(new ContextChannel($ipcChannel)); - $result = new ExitSuccess($returnValue instanceof Future ? $returnValue->await() : $returnValue); - } catch (\Throwable $exception) { - $result = new ExitFailure($exception); - } + $ipcChannel->close(); + try { try { - try { - $resultChannel->send($result); - } catch (SerializationException $exception) { - // Serializing the result failed. Send the reason why. - $resultChannel->send(new ExitFailure($exception)); - } - } catch (ChannelException) { - // The parent may have already closed the channel after reading - // the result (e.g. during shutdown). Nothing left to do. - } catch (\Throwable $exception) { - \file_put_contents('php://stderr', $exception->getMessage(), \FILE_APPEND); - exit(255); + $resultChannel->send($result); + } catch (SerializationException $exception) { + // Serializing the result failed. Send the reason why. + $resultChannel->send(new ExitFailure($exception)); + } finally { + $resultChannel->close(); } - }); - - EventLoop::run(); + } catch (ChannelException) { + // The parent may have already closed the channel after reading + // the result (e.g. during shutdown). Nothing left to do. + } catch (\Throwable $exception) { + \file_put_contents('php://stderr', $exception->getMessage(), \FILE_APPEND); + exit(255); + } } diff --git a/src/Context/Internal/process-runner.php b/src/Context/Internal/process-runner.php index 2372e0c..eed24b2 100644 --- a/src/Context/Internal/process-runner.php +++ b/src/Context/Internal/process-runner.php @@ -87,5 +87,7 @@ exit(255); } - runContext($uri, $key, $cancellation, $argv); + EventLoop::queue(runContext(...), $uri, $key, $cancellation, $argv); + + EventLoop::run(); })(); diff --git a/src/Context/ProcessContext.php b/src/Context/ProcessContext.php index 8b303a0..65f34b2 100644 --- a/src/Context/ProcessContext.php +++ b/src/Context/ProcessContext.php @@ -282,12 +282,12 @@ public function __destruct() */ public function join(?Cancellation $cancellation = null): mixed { - $data = $this->receiveExitResult($cancellation); + $result = $this->receiveExitResult($cancellation); $code = $this->process->join(); try { - return $data->getResult(); + return $result->getResult(); } finally { if ($code !== 0) { // If an ExitFailure throws above, the exception will be automatically attached as the previous diff --git a/src/Context/ThreadContext.php b/src/Context/ThreadContext.php index adccf2a..004703b 100644 --- a/src/Context/ThreadContext.php +++ b/src/Context/ThreadContext.php @@ -104,7 +104,9 @@ public static function start( // such as select() will not be interrupted. })); - Internal\runContext($uri, $key, new TimeoutCancellation($connectTimeout), $argv); + EventLoop::queue(Internal\runContext(...), $uri, $key, new TimeoutCancellation($connectTimeout), $argv); + + EventLoop::run(); return 0; // @codeCoverageIgnoreEnd @@ -207,10 +209,8 @@ public function close(): void public function join(?Cancellation $cancellation = null): mixed { - $data = $this->receiveExitResult($cancellation); - - $this->close(); + $result = $this->receiveExitResult($cancellation); - return $data->getResult(); + return $result->getResult(); } } diff --git a/test/Context/ForkContextTest.php b/test/Context/ForkContextTest.php new file mode 100644 index 0000000..ea51f88 --- /dev/null +++ b/test/Context/ForkContextTest.php @@ -0,0 +1,23 @@ +markTestSkipped('Not supported on the current platform/driver'); + } + + return (new ForkContextFactory())->start($script); + } +}