From 8ac48fac0d20e4458060179b60bf6572d0f71863 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 1 Mar 2025 09:36:05 -0600 Subject: [PATCH 01/18] Add ForkContext --- composer-require-check.json | 3 +- src/Context/ForkContext.php | 122 +++++++++++++++++++++++++++++ src/Context/ForkContextFactory.php | 49 ++++++++++++ src/Context/Internal/functions.php | 17 ++-- 4 files changed, 185 insertions(+), 6 deletions(-) create mode 100644 src/Context/ForkContext.php create mode 100644 src/Context/ForkContextFactory.php diff --git a/composer-require-check.json b/composer-require-check.json index 0d0c4fc..621bf16 100644 --- a/composer-require-check.json +++ b/composer-require-check.json @@ -39,6 +39,7 @@ "SPL", "standard", "hash", - "pcntl" + "pcntl", + "posix" ] } diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php new file mode 100644 index 0000000..76e9c76 --- /dev/null +++ b/src/Context/ForkContext.php @@ -0,0 +1,122 @@ + + */ +final class ForkContext extends AbstractContext +{ + private const DEFAULT_START_TIMEOUT = 5; + + /** + * @param string|non-empty-list $script Path to PHP script or array with first element as path and + * following elements options to the PHP script (e.g.: ['bin/worker.php', 'Option1Value', 'Option2Value']). + * @param positive-int $childConnectTimeout Number of seconds the child will attempt to connect to the parent + * before failing. + * + * @throws ContextException If starting the process fails. + */ + public static function start( + IpcHub $ipcHub, + string|array $script, + ?Cancellation $cancellation = null, + int $childConnectTimeout = self::DEFAULT_START_TIMEOUT, + Serializer $serializer = new NativeSerializer(), + ): self { + $key = $ipcHub->generateKey(); + + // Fork + if (($pid = \pcntl_fork()) < 0) { + throw new ContextException("Forking failed: " . \posix_strerror(\posix_get_last_error())); + } + + // Parent + if ($pid > 0) { + try { + $socket = $ipcHub->accept($key, $cancellation); + $ipcChannel = new StreamChannel($socket, $socket, $serializer); + + $socket = $ipcHub->accept($key, $cancellation); + $resultChannel = new StreamChannel($socket, $socket, $serializer); + } catch (\Throwable $exception) { + $cancellation?->throwIfRequested(); + + throw new ContextException("Connecting failed after forking", previous: $exception); + } + + return new self($pid, $ipcChannel, $resultChannel); + } + + // Child + \define("AMP_CONTEXT", "parallel"); + \define("AMP_CONTEXT_ID", \getmypid()); + + if (\is_string($script)) { + $script = [$script]; + } + + $connectCancellation = new TimeoutCancellation((float) $childConnectTimeout); + Internal\runContext($ipcHub->getUri(), $key, $connectCancellation, $script, $serializer); + + exit(0); + } + + private bool $exited = false; + + /** + * @param StreamChannel $ipcChannel + */ + private function __construct( + private readonly int $pid, + StreamChannel $ipcChannel, + StreamChannel $resultChannel, + ) { + parent::__construct($ipcChannel, $resultChannel); + } + + public function __destruct() + { + $this->close(); + } + + public function close(): void + { + if (!$this->exited) { + \posix_kill($this->pid, \SIGKILL); + $this->exited = true; + } + + parent::close(); + } + + public function join(?Cancellation $cancellation = null): mixed + { + $data = $this->receiveExitResult($cancellation); + + $this->close(); + + return $data->getResult(); + } +} diff --git a/src/Context/ForkContextFactory.php b/src/Context/ForkContextFactory.php new file mode 100644 index 0000000..d6665d6 --- /dev/null +++ b/src/Context/ForkContextFactory.php @@ -0,0 +1,49 @@ + $script + * + * @throws ContextException + */ + public function start(string|array $script, ?Cancellation $cancellation = null): ForkContext + { + return ForkContext::start( + ipcHub: $this->ipcHub, + script: $script, + cancellation: $cancellation, + childConnectTimeout: $this->childConnectTimeout, + ); + } +} diff --git a/src/Context/Internal/functions.php b/src/Context/Internal/functions.php index e0f130d..c31bc28 100644 --- a/src/Context/Internal/functions.php +++ b/src/Context/Internal/functions.php @@ -6,22 +6,29 @@ use Amp\Cancellation; use Amp\Future; use Amp\Parallel\Ipc; +use Amp\Serialization\NativeSerializer; use Amp\Serialization\SerializationException; +use Amp\Serialization\Serializer; use Revolt\EventLoop; /** @internal */ -function runContext(string $uri, string $key, Cancellation $connectCancellation, array $argv): void -{ - EventLoop::queue(function () use ($argv, $uri, $key, $connectCancellation): void { +function runContext( + string $uri, + string $key, + Cancellation $connectCancellation, + array $argv, + Serializer $serializer = new NativeSerializer(), +): void { + EventLoop::queue(function () use ($argv, $uri, $key, $connectCancellation, $serializer): void { /** @noinspection PhpUnusedLocalVariableInspection */ $argc = \count($argv); try { $socket = Ipc\connect($uri, $key, $connectCancellation); - $ipcChannel = new StreamChannel($socket, $socket); + $ipcChannel = new StreamChannel($socket, $socket, $serializer); $socket = Ipc\connect($uri, $key, $connectCancellation); - $resultChannel = new StreamChannel($socket, $socket); + $resultChannel = new StreamChannel($socket, $socket, $serializer); } catch (\Throwable $exception) { \trigger_error($exception->getMessage(), E_USER_ERROR); } From 31d8b70286542a1fe1750218f4ba588404614704 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 1 Mar 2025 14:48:49 -0600 Subject: [PATCH 02/18] Updates --- src/Context/ForkContext.php | 54 +++++++++-- src/Context/Internal/functions.php | 115 ++++++++++++------------ src/Context/Internal/process-runner.php | 4 +- src/Context/ThreadContext.php | 4 +- 4 files changed, 109 insertions(+), 68 deletions(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index 76e9c76..e73ac7d 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -12,7 +12,7 @@ /** * USE AT YOUR OWN RISK! This context is not used by default in {@see DefaultContextFactory} because the timing of its - * use must be purposeful and situational. + * creation must be purposeful and situational. * * Forking is not recommended at arbitrary points in an application since the entire state of the parent process is * inherited into the child process, including the event-loop! @@ -83,7 +83,9 @@ public static function start( exit(0); } - private bool $exited = false; + private ?int $exited = null; + + private bool $weKilled = false; /** * @param StreamChannel $ipcChannel @@ -101,11 +103,49 @@ public function __destruct() $this->close(); } + public function receive(?Cancellation $cancellation = null): mixed + { + $this->checkExit(false); + + return parent::receive($cancellation); + } + + public function send(mixed $data): void + { + $this->checkExit(false); + + parent::send($data); + } + + private function checkExit(bool $wait): ?int + { + if ($this->exited === null) { + if (\pcntl_waitpid($this->pid, $status, $wait ? 0 : \WNOHANG) === 0) { + return null; + } + + $this->exited = match (true) { + \pcntl_wifsignaled($status) => \pcntl_wtermsig($status), + \pcntl_wifexited($status) => \pcntl_wexitstatus($status) - 128, + \pcntl_wifstopped($status) => \pcntl_wstopsig($status), + default => -1, + }; + } + + if (!$this->weKilled && $this->exited > 0) { + throw new ContextException("Worker exited due to signal {$this->exited}", $this->exited); + } + + return $this->exited; + } + public function close(): void { if (!$this->exited) { + $this->weKilled = true; \posix_kill($this->pid, \SIGKILL); - $this->exited = true; + + $this->checkExit(true); } parent::close(); @@ -113,9 +153,11 @@ public function close(): void public function join(?Cancellation $cancellation = null): mixed { - $data = $this->receiveExitResult($cancellation); - - $this->close(); + try { + $data = $this->receiveExitResult($cancellation); + } finally { + $this->close(); + } return $data->getResult(); } diff --git a/src/Context/Internal/functions.php b/src/Context/Internal/functions.php index c31bc28..7dcaf4d 100644 --- a/src/Context/Internal/functions.php +++ b/src/Context/Internal/functions.php @@ -9,7 +9,6 @@ use Amp\Serialization\NativeSerializer; use Amp\Serialization\SerializationException; use Amp\Serialization\Serializer; -use Revolt\EventLoop; /** @internal */ function runContext( @@ -19,73 +18,69 @@ function runContext( array $argv, Serializer $serializer = new NativeSerializer(), ): void { - EventLoop::queue(function () use ($argv, $uri, $key, $connectCancellation, $serializer): void { - /** @noinspection PhpUnusedLocalVariableInspection */ - $argc = \count($argv); + /** @noinspection PhpUnusedLocalVariableInspection */ + $argc = \count($argv); - try { - $socket = Ipc\connect($uri, $key, $connectCancellation); - $ipcChannel = new StreamChannel($socket, $socket, $serializer); - - $socket = Ipc\connect($uri, $key, $connectCancellation); - $resultChannel = new StreamChannel($socket, $socket, $serializer); - } catch (\Throwable $exception) { - \trigger_error($exception->getMessage(), E_USER_ERROR); - } - - try { - if (!isset($argv[0])) { - throw new \Error("No script path given"); - } + try { + $socket = Ipc\connect($uri, $key, $connectCancellation); + $ipcChannel = new StreamChannel($socket, $socket, $serializer); - if (!\is_file($argv[0])) { - throw new \Error(\sprintf( - "No script found at '%s' (be sure to provide the full path to the script)", - $argv[0], - )); - } + $socket = Ipc\connect($uri, $key, $connectCancellation); + $resultChannel = new StreamChannel($socket, $socket, $serializer); + } catch (\Throwable $exception) { + \trigger_error($exception->getMessage(), E_USER_ERROR); + } - try { - // Protect current scope by requiring script within another function. - // Using $argc, so it is available to the required script. - $callable = (function () use ($argc, $argv): callable { - /** @psalm-suppress UnresolvableInclude */ - return require $argv[0]; - })(); - } catch (\TypeError $exception) { - throw new \Error(\sprintf( - "Script '%s' did not return a callable function: %s", - $argv[0], - $exception->getMessage(), - ), 0, $exception); - } catch (\ParseError $exception) { - throw new \Error(\sprintf( - "Script '%s' contains a parse error: %s", - $argv[0], - $exception->getMessage(), - ), 0, $exception); - } + try { + if (!isset($argv[0])) { + throw new \Error("No script path given"); + } - $returnValue = $callable(new ContextChannel($ipcChannel)); - $result = new ExitSuccess($returnValue instanceof Future ? $returnValue->await() : $returnValue); - } catch (\Throwable $exception) { - $result = new ExitFailure($exception); + if (!\is_file($argv[0])) { + throw new \Error(\sprintf( + "No script found at '%s' (be sure to provide the full path to the script)", + $argv[0], + )); } try { - try { - $resultChannel->send($result); - } catch (SerializationException $exception) { - // Serializing the result failed. Send the reason why. - $resultChannel->send(new ExitFailure($exception)); - } - } catch (\Throwable $exception) { - \trigger_error(\sprintf( - "Could not send result to parent: '%s'; be sure to shutdown the child before ending the parent", + // Protect current scope by requiring script within another function. + // Using $argc, so it is available to the required script. + $callable = (function () use ($argc, $argv): callable { + /** @psalm-suppress UnresolvableInclude */ + return require $argv[0]; + })(); + } catch (\TypeError $exception) { + throw new \Error(\sprintf( + "Script '%s' did not return a callable function: %s", + $argv[0], + $exception->getMessage(), + ), 0, $exception); + } catch (\ParseError $exception) { + throw new \Error(\sprintf( + "Script '%s' contains a parse error: %s", + $argv[0], $exception->getMessage(), - ), E_USER_ERROR); + ), 0, $exception); } - }); - EventLoop::run(); + $returnValue = $callable(new ContextChannel($ipcChannel)); + $result = new ExitSuccess($returnValue instanceof Future ? $returnValue->await() : $returnValue); + } catch (\Throwable $exception) { + $result = new ExitFailure($exception); + } + + try { + try { + $resultChannel->send($result); + } catch (SerializationException $exception) { + // Serializing the result failed. Send the reason why. + $resultChannel->send(new ExitFailure($exception)); + } + } catch (\Throwable $exception) { + \trigger_error(\sprintf( + "Could not send result to parent: '%s'; be sure to shutdown the child before ending the parent", + $exception->getMessage(), + ), E_USER_ERROR); + } } diff --git a/src/Context/Internal/process-runner.php b/src/Context/Internal/process-runner.php index d10a60f..ae6f94d 100644 --- a/src/Context/Internal/process-runner.php +++ b/src/Context/Internal/process-runner.php @@ -85,5 +85,7 @@ \trigger_error($exception->getMessage(), E_USER_ERROR); } - runContext($uri, $key, $cancellation, $argv); + EventLoop::queue(runContext(...), $uri, $key, $cancellation, $argv); + + EventLoop::run(); })(); diff --git a/src/Context/ThreadContext.php b/src/Context/ThreadContext.php index ef3544d..6f942a4 100644 --- a/src/Context/ThreadContext.php +++ b/src/Context/ThreadContext.php @@ -104,7 +104,9 @@ public static function start( // such as select() will not be interrupted. })); - Internal\runContext($uri, $key, new TimeoutCancellation($connectTimeout), $argv); + EventLoop::queue(Internal\runContext(...), $uri, $key, new TimeoutCancellation($connectTimeout), $argv); + + EventLoop::run(); return 0; // @codeCoverageIgnoreEnd From 03a5cddb84164a57c609fb22b3f8350ee247cff3 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 1 Mar 2025 14:50:55 -0600 Subject: [PATCH 03/18] Fix --- src/Context/ForkContext.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index e73ac7d..465c8ce 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -141,7 +141,7 @@ private function checkExit(bool $wait): ?int public function close(): void { - if (!$this->exited) { + if ($this->checkExit(false) === null) { $this->weKilled = true; \posix_kill($this->pid, \SIGKILL); From ffedc390043c40facd101e69acd59474acf9e653 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Fri, 7 Mar 2025 17:39:50 +0100 Subject: [PATCH 04/18] Fixup tests --- src/Context/ForkContext.php | 13 ++++++++----- test/Context/ForkContextTest.php | 30 ++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 5 deletions(-) create mode 100644 test/Context/ForkContextTest.php diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index 465c8ce..8859113 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -30,6 +30,11 @@ final class ForkContext extends AbstractContext { private const DEFAULT_START_TIMEOUT = 5; + public static function isSupported(): bool + { + return \function_exists('pcntl_fork'); + } + /** * @param string|non-empty-list $script Path to PHP script or array with first element as path and * following elements options to the PHP script (e.g.: ['bin/worker.php', 'Option1Value', 'Option2Value']). @@ -153,11 +158,9 @@ public function close(): void public function join(?Cancellation $cancellation = null): mixed { - try { - $data = $this->receiveExitResult($cancellation); - } finally { - $this->close(); - } + $data = $this->receiveExitResult($cancellation); + + $this->close(); return $data->getResult(); } diff --git a/test/Context/ForkContextTest.php b/test/Context/ForkContextTest.php new file mode 100644 index 0000000..da80077 --- /dev/null +++ b/test/Context/ForkContextTest.php @@ -0,0 +1,30 @@ +markTestSkipped('pcntl_fork required'); + } + + return (new ForkContextFactory())->start($script); + } + + public function testThrowingProcessOnReceive(): void + { + // tmp + $this->expectNotToPerformAssertions(); + } + + public function testThrowingProcessOnSend(): void + { + // tmp + $this->expectNotToPerformAssertions(); + } +} From 981895dd9e2122e73725f5792ce155698ce650a5 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Mon, 10 Mar 2025 13:05:23 +0100 Subject: [PATCH 05/18] Fixup --- src/Context/ForkContext.php | 5 ++++- test/Context/ForkContextTest.php | 5 +++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index 8859113..fdf7f73 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -9,6 +9,8 @@ use Amp\Serialization\NativeSerializer; use Amp\Serialization\Serializer; use Amp\TimeoutCancellation; +use Revolt\EventLoop; +use Revolt\EventLoop\Driver\UvDriver; /** * USE AT YOUR OWN RISK! This context is not used by default in {@see DefaultContextFactory} because the timing of its @@ -32,7 +34,8 @@ final class ForkContext extends AbstractContext public static function isSupported(): bool { - return \function_exists('pcntl_fork'); + return \function_exists('pcntl_fork') + && !EventLoop::getDriver() instanceof UvDriver; } /** diff --git a/test/Context/ForkContextTest.php b/test/Context/ForkContextTest.php index da80077..c662b88 100644 --- a/test/Context/ForkContextTest.php +++ b/test/Context/ForkContextTest.php @@ -3,14 +3,15 @@ namespace Amp\Parallel\Test\Context; use Amp\Parallel\Context\Context; +use Amp\Parallel\Context\ForkContext; use Amp\Parallel\Context\ForkContextFactory; class ForkContextTest extends AbstractContextTest { public function createContext(string|array $script): Context { - if (!\function_exists('pcntl_fork')) { - $this->markTestSkipped('pcntl_fork required'); + if (!ForkContext::isSupported()) { + $this->markTestSkipped('Not supported on the current platform/driver'); } return (new ForkContextFactory())->start($script); From ec3d242b7c2f91221311d3adfcaaa0c0f21ccb84 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sat, 22 Mar 2025 00:33:51 +0100 Subject: [PATCH 06/18] Fork context (#214) * Skip another test * cs-fix --- test/Context/ForkContextTest.php | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/Context/ForkContextTest.php b/test/Context/ForkContextTest.php index c662b88..47604a1 100644 --- a/test/Context/ForkContextTest.php +++ b/test/Context/ForkContextTest.php @@ -28,4 +28,10 @@ public function testThrowingProcessOnSend(): void // tmp $this->expectNotToPerformAssertions(); } + + public function testImmediateJoin(): void + { + // tmp + $this->expectNotToPerformAssertions(); + } } From e8207c4799e03be1ac0bc8b7390efa6299ee27c8 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 20 Apr 2025 10:10:10 -0500 Subject: [PATCH 07/18] Fix copy/pasta mistake --- src/Context/ForkContext.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index fdf7f73..0a2f1b3 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -78,7 +78,7 @@ public static function start( } // Child - \define("AMP_CONTEXT", "parallel"); + \define("AMP_CONTEXT", "fork"); \define("AMP_CONTEXT_ID", \getmypid()); if (\is_string($script)) { From 141c62935e85bcd89dcca9a23d79a87b6c34b8f7 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 20 Apr 2025 10:13:48 -0500 Subject: [PATCH 08/18] Look for StreamSelectLoop --- src/Context/ForkContext.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index 0a2f1b3..8d7b918 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -9,6 +9,7 @@ use Amp\Serialization\NativeSerializer; use Amp\Serialization\Serializer; use Amp\TimeoutCancellation; +use React\EventLoop\StreamSelectLoop; use Revolt\EventLoop; use Revolt\EventLoop\Driver\UvDriver; @@ -34,8 +35,7 @@ final class ForkContext extends AbstractContext public static function isSupported(): bool { - return \function_exists('pcntl_fork') - && !EventLoop::getDriver() instanceof UvDriver; + return \function_exists('pcntl_fork') && EventLoop::getDriver() instanceof StreamSelectLoop; } /** From a538c04b86e03e50bb0c04328a9e683d3d6c5c9b Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 20 Apr 2025 10:35:50 -0500 Subject: [PATCH 09/18] Unused import --- src/Context/ForkContext.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index 8d7b918..b678f9a 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -11,7 +11,6 @@ use Amp\TimeoutCancellation; use React\EventLoop\StreamSelectLoop; use Revolt\EventLoop; -use Revolt\EventLoop\Driver\UvDriver; /** * USE AT YOUR OWN RISK! This context is not used by default in {@see DefaultContextFactory} because the timing of its From 8a606f852932ce09766b4e12e52f13a7d4b07393 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 16 May 2026 09:57:25 -0500 Subject: [PATCH 10/18] Fix loop type check --- src/Context/ForkContext.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index b678f9a..15ca8f7 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -9,7 +9,6 @@ use Amp\Serialization\NativeSerializer; use Amp\Serialization\Serializer; use Amp\TimeoutCancellation; -use React\EventLoop\StreamSelectLoop; use Revolt\EventLoop; /** @@ -34,7 +33,7 @@ final class ForkContext extends AbstractContext public static function isSupported(): bool { - return \function_exists('pcntl_fork') && EventLoop::getDriver() instanceof StreamSelectLoop; + return \function_exists('pcntl_fork') && EventLoop::getDriver()->getHandle() === null; } /** From 125ac6ff4781afdefc20b95ba01def9f7bb050aa Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 16 May 2026 10:17:50 -0500 Subject: [PATCH 11/18] Style fix after merge --- src/Context/Internal/functions.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Context/Internal/functions.php b/src/Context/Internal/functions.php index 750ab09..80fbbd0 100644 --- a/src/Context/Internal/functions.php +++ b/src/Context/Internal/functions.php @@ -8,8 +8,8 @@ use Amp\Parallel\Ipc; use Amp\Serialization\NativeSerializer; use Amp\Serialization\SerializationException; -use Amp\Sync\ChannelException; use Amp\Serialization\Serializer; +use Amp\Sync\ChannelException; /** @internal */ function runContext( From 45e791bc72b66848bb5c43ba2e48536e7a858559 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 16 May 2026 10:32:46 -0500 Subject: [PATCH 12/18] Close IPC channel immediately after callback finishes executing --- src/Context/Internal/functions.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Context/Internal/functions.php b/src/Context/Internal/functions.php index 80fbbd0..4e98659 100644 --- a/src/Context/Internal/functions.php +++ b/src/Context/Internal/functions.php @@ -72,12 +72,16 @@ function runContext( $result = new ExitFailure($exception); } + $ipcChannel->close(); + try { try { $resultChannel->send($result); } catch (SerializationException $exception) { // Serializing the result failed. Send the reason why. $resultChannel->send(new ExitFailure($exception)); + } finally { + $resultChannel->close(); } } catch (ChannelException) { // The parent may have already closed the channel after reading From e73237951f35a5892dc5d8ca4cdd9e4ef5cd39b3 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 16 May 2026 10:33:30 -0500 Subject: [PATCH 13/18] Remove commented-out tests --- test/Context/ForkContextTest.php | 22 ++++------------------ 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/test/Context/ForkContextTest.php b/test/Context/ForkContextTest.php index 47604a1..ea51f88 100644 --- a/test/Context/ForkContextTest.php +++ b/test/Context/ForkContextTest.php @@ -6,6 +6,10 @@ use Amp\Parallel\Context\ForkContext; use Amp\Parallel\Context\ForkContextFactory; +/** + * @requires extension pcntl + * @requires extension posix + */ class ForkContextTest extends AbstractContextTest { public function createContext(string|array $script): Context @@ -16,22 +20,4 @@ public function createContext(string|array $script): Context return (new ForkContextFactory())->start($script); } - - public function testThrowingProcessOnReceive(): void - { - // tmp - $this->expectNotToPerformAssertions(); - } - - public function testThrowingProcessOnSend(): void - { - // tmp - $this->expectNotToPerformAssertions(); - } - - public function testImmediateJoin(): void - { - // tmp - $this->expectNotToPerformAssertions(); - } } From 70f93abbb5a3b31ced7dfc90a12b8d52ff8deb77 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 16 May 2026 10:33:43 -0500 Subject: [PATCH 14/18] A couple notes --- src/Context/ForkContext.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index 15ca8f7..f460f5c 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -111,14 +111,14 @@ public function __destruct() public function receive(?Cancellation $cancellation = null): mixed { - $this->checkExit(false); + $this->checkExit(false); // Will throw if the process exited unexpectedly. return parent::receive($cancellation); } public function send(mixed $data): void { - $this->checkExit(false); + $this->checkExit(false); // Will throw if the process exited unexpectedly. parent::send($data); } From b4c1bcf73c16501adb82ba584318890cd0c39793 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 16 May 2026 10:49:07 -0500 Subject: [PATCH 15/18] Distinguish between send and receive --- src/Context/Internal/AbstractContext.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Context/Internal/AbstractContext.php b/src/Context/Internal/AbstractContext.php index 9044154..65b7c26 100644 --- a/src/Context/Internal/AbstractContext.php +++ b/src/Context/Internal/AbstractContext.php @@ -42,7 +42,7 @@ public function receive(?Cancellation $cancellation = null): mixed $this->ipcChannel->close(); throw new ContextException( - "The context stopped responding, potentially due to a fatal error or calling exit", + "The context stopped responding during receive, potentially due to a fatal error or calling exit", previous: $exception, ); } @@ -74,7 +74,7 @@ public function send(mixed $data): void $this->ipcChannel->close(); throw new ContextException( - "The context stopped responding, potentially due to a fatal error or calling exit", + "The context stopped responding during send, potentially due to a fatal error or calling exit", previous: $exception, ); } From bf4389c8a0c5c80a0eaa7295c9709a81ddd22eb1 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 16 May 2026 11:07:02 -0500 Subject: [PATCH 16/18] Check for extensions --- src/Context/ForkContext.php | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index f460f5c..816bd7f 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -33,7 +33,10 @@ final class ForkContext extends AbstractContext public static function isSupported(): bool { - return \function_exists('pcntl_fork') && EventLoop::getDriver()->getHandle() === null; + return \extension_loaded('pcntl') + && \extension_loaded('posix') + && \function_exists('pcntl_fork') // pcntl_fork may be disabled. + && EventLoop::getDriver()->getHandle() === null; } /** @@ -104,11 +107,6 @@ private function __construct( parent::__construct($ipcChannel, $resultChannel); } - public function __destruct() - { - $this->close(); - } - public function receive(?Cancellation $cancellation = null): mixed { $this->checkExit(false); // Will throw if the process exited unexpectedly. From 9be63d62f97441fa7d5ec16849a9a6691041f193 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 16 May 2026 11:16:00 -0500 Subject: [PATCH 17/18] Don't call close in join --- src/Context/ForkContext.php | 2 -- src/Context/ThreadContext.php | 2 -- 2 files changed, 4 deletions(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index 816bd7f..248b837 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -159,8 +159,6 @@ public function join(?Cancellation $cancellation = null): mixed { $data = $this->receiveExitResult($cancellation); - $this->close(); - return $data->getResult(); } } diff --git a/src/Context/ThreadContext.php b/src/Context/ThreadContext.php index fbabe0f..dd8ef9a 100644 --- a/src/Context/ThreadContext.php +++ b/src/Context/ThreadContext.php @@ -211,8 +211,6 @@ public function join(?Cancellation $cancellation = null): mixed { $data = $this->receiveExitResult($cancellation); - $this->close(); - return $data->getResult(); } } From 6e98e5235482261e9014ede82b59de129d020e3e Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 16 May 2026 11:18:49 -0500 Subject: [PATCH 18/18] Change variable name to match interface suffix --- src/Context/ForkContext.php | 4 ++-- src/Context/ProcessContext.php | 4 ++-- src/Context/ThreadContext.php | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Context/ForkContext.php b/src/Context/ForkContext.php index 248b837..973e104 100644 --- a/src/Context/ForkContext.php +++ b/src/Context/ForkContext.php @@ -157,8 +157,8 @@ public function close(): void public function join(?Cancellation $cancellation = null): mixed { - $data = $this->receiveExitResult($cancellation); + $result = $this->receiveExitResult($cancellation); - return $data->getResult(); + return $result->getResult(); } } diff --git a/src/Context/ProcessContext.php b/src/Context/ProcessContext.php index 8b303a0..65f34b2 100644 --- a/src/Context/ProcessContext.php +++ b/src/Context/ProcessContext.php @@ -282,12 +282,12 @@ public function __destruct() */ public function join(?Cancellation $cancellation = null): mixed { - $data = $this->receiveExitResult($cancellation); + $result = $this->receiveExitResult($cancellation); $code = $this->process->join(); try { - return $data->getResult(); + return $result->getResult(); } finally { if ($code !== 0) { // If an ExitFailure throws above, the exception will be automatically attached as the previous diff --git a/src/Context/ThreadContext.php b/src/Context/ThreadContext.php index dd8ef9a..004703b 100644 --- a/src/Context/ThreadContext.php +++ b/src/Context/ThreadContext.php @@ -209,8 +209,8 @@ public function close(): void public function join(?Cancellation $cancellation = null): mixed { - $data = $this->receiveExitResult($cancellation); + $result = $this->receiveExitResult($cancellation); - return $data->getResult(); + return $result->getResult(); } }