Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Fixed
- **#161 `spawn_thread`/`ThreadPool`/`ThreadChannel`: transferred closures lost their class scope** — a `static` closure declared inside a class arrived in the worker unscoped, so its first `self::`/`static::` threw `Cannot access "self" when no class scope is active`; a `$this`-bound closure got `Z_OBJCE($this)` instead of its declaring class, breaking `self::` and private-member visibility under inheritance. The snapshot now carries `scope`/`called_scope` by name and re-resolves them in the target thread; a missing class throws `Cannot restore closure scope: class "X" not found in the target thread` instead of silently dropping the scope. Closures scoped to anonymous classes are rejected at transfer time. Tests `tests/thread/072`–`073`; `057` re-pinned to native scope semantics.
- **`ThreadPool` synchronous task: snapshot use-after-free when the task spawns an un-awaited coroutine** — a sync-mode task body ran inline in the worker and its per-task snapshot arena (which backs every spawned closure's op_array) was freed the instant the body returned, while a coroutine the body had spawned was still pending; running it later dereferenced freed memory (Windows debug-heap crash; ASAN-caught on Linux). The body now runs as a coroutine in its own per-task **nursery** `Scope`: `Async\spawn()` inside the body lands in that scope on its own (no scope-pointer hijacking), and on task exit the scope is cancelled and *drained* — awaited until every spawned coroutine is physically disposed — before the snapshot is freed. ABI bumped to v0.20.0: new `zend_async_scope_await_after_cancellation_fn` exposes the C core of `Scope::awaitAfterCancellation` so the worker reuses the canonical zombie-aware drain instead of hand-rolling it. Regression test `tests/thread_pool/065-task_scope_nursery_no_uaf.phpt`.
- **`ThreadPool`: fatal in a task no longer leaves a use-after-free or a leaked libuv loop** — a fatal (e.g. OOM) in a task body now rejects the future with `ThreadTransferException` and tears the pool down cleanly. The snapshot's op_array name strings (`function_name`, `filename`) are materialized into refcounted heap strings so holders that outlive the snapshot arena (the closure, `PG(last_error_file)`) are freed by refcount instead of dangling. The `zend_bailout()` that a fatal re-raises through a parked `ThreadChannel` send/recv or the worker's slot wait is now caught so the channel/slot trigger is disposed before re-raising — an undisposed trigger's open `uv_async` would block `uv_loop_close` and leak the reactor's loop internals. Debug builds dump any libuv handle that survives reactor shutdown. Regression tests `tests/thread_pool/066`–`068`.
- **`ThreadPool` coroutine-mode task: a fatal now reports its cause instead of resolving to `null`** — in `coroutine: true` mode a task that hit a fatal/OOM (or `exit()`/`die()`) resolved its future to a silent `null`, because the completion path only checked for a thrown exception and a bailout is not one. It now detects the bailout (no exception, `UNDEF` result) and rejects the future with a `ThreadTransferException` carrying the fatal message, matching synchronous mode. Tests `tests/thread_pool/067`, `068`.
Expand Down
53 changes: 31 additions & 22 deletions fuzzy-tests/_harness/Steps.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@
require_once __DIR__ . '/StepRegistry.php';

final class StandardSteps {
/**
* Worker-thread payloads below are lexically declared inside this class,
* but chaos workers boot bare (no harness classes). Strip the class scope
* so the transfer does not require StandardSteps on the worker side.
*/
private static function unscoped(\Closure $fn): \Closure {
return \Closure::bind($fn, null, null);
}

public static function register(StepRegistry $r): StepRegistry {
// ---- Given: setup ----

Expand Down Expand Up @@ -3660,7 +3669,7 @@ function(Context $ctx, string $coro, string $nExpr, string $pool) {
for ($i = 0; $i < $n; $i++) {
$ctx->inc("tp_submit_attempts_$pool");
try {
$f = $p->submit(static fn(int $idx): int => $idx, $i);
$f = $p->submit(self::unscoped(static fn(int $idx): int => $idx), $i);
$ctx->threadPoolFutures[$pool][] = $f;
$ctx->inc("tp_submitted_$pool");
} catch (\Throwable $e) {
Expand Down Expand Up @@ -3760,7 +3769,7 @@ function(Context $ctx, string $coro, string $nExpr, string $pool) {
$items = range(0, $n - 1);
$ctx->inc("tp_map_attempts_$pool");
try {
$res = $ctx->threadPools[$pool]->map($items, static fn(int $i): int => $i * $i);
$res = $ctx->threadPools[$pool]->map($items, self::unscoped(static fn(int $i): int => $i * $i));
$ctx->inc("tp_map_succeeded_$pool");
$ctx->inc("tp_map_results_$pool", count($res));
} catch (\Throwable $e) {
Expand Down Expand Up @@ -3831,11 +3840,11 @@ function(Context $ctx, string $coro, string $nExpr) {
for ($i = 0; $i < $n; $i++) {
$ctx->inc("thr_spawn_attempts_$coro");
try {
$h = \Async\spawn_thread(static function() use ($i): array {
$h = \Async\spawn_thread(self::unscoped(static function() use ($i): array {
$x = 0.0;
for ($j = 0; $j < 20000; $j++) { $x += sqrt($j); }
return ['idx' => $i, 'x' => $x];
});
}));
$ctx->threadHandles[$coro][] = $h;
$ctx->inc("thr_spawned_$coro");
} catch (\Throwable $e) {
Expand All @@ -3854,11 +3863,11 @@ function(Context $ctx, string $coro, string $nExpr) {
for ($i = 0; $i < $n; $i++) {
$ctx->inc("thr_spawn_attempts_$coro");
try {
$h = \Async\spawn_thread(static function() use ($i): void {
$h = \Async\spawn_thread(self::unscoped(static function() use ($i): void {
$x = 0.0;
for ($j = 0; $j < 20000; $j++) { $x += sqrt($j); }
throw new \RuntimeException('thread boom ' . $i);
});
}));
$ctx->threadHandles[$coro][] = $h;
$ctx->inc("thr_spawned_$coro");
} catch (\Throwable $e) {
Expand All @@ -3880,11 +3889,11 @@ function(Context $ctx, string $coro, string $nExpr) {
for ($i = 0; $i < $n; $i++) {
$ctx->inc("thr_spawn_attempts_$coro");
try {
\Async\spawn_thread(static function() use ($i): array {
\Async\spawn_thread(self::unscoped(static function() use ($i): array {
$x = 0.0;
for ($j = 0; $j < 40000; $j++) { $x += sqrt($j); }
return ['idx' => $i, 'x' => $x, 'buf' => str_repeat('w', 64)];
});
}));
$ctx->inc("thr_spawned_$coro");
} catch (\Throwable $e) {
$ctx->inc("thr_spawn_failed_$coro");
Expand Down Expand Up @@ -3981,9 +3990,9 @@ function(Context $ctx, string $coro, string $f, string $valExpr) {
$state = $ctx->futureStates[$f];
$ctx->inc("rf_xfer_attempts_$f");
try {
$h = \Async\spawn_thread(static function() use ($state, $val) {
$h = \Async\spawn_thread(self::unscoped(static function() use ($state, $val) {
$state->complete($val);
});
}));
$ctx->remoteFutureThreads[$f] = $h;
$ctx->inc("rf_xfer_ok_$f");
} catch (\Throwable $e) {
Expand All @@ -4004,9 +4013,9 @@ function(Context $ctx, string $coro, string $f, string $msg) {
$state = $ctx->futureStates[$f];
$ctx->inc("rf_xfer_attempts_$f");
try {
$h = \Async\spawn_thread(static function() use ($state, $msg) {
$h = \Async\spawn_thread(self::unscoped(static function() use ($state, $msg) {
$state->error(new \RuntimeException($msg));
});
}));
$ctx->remoteFutureThreads[$f] = $h;
$ctx->inc("rf_xfer_ok_$f");
} catch (\Throwable $e) {
Expand All @@ -4031,9 +4040,9 @@ function(Context $ctx, string $coro, string $f) {
$state = $ctx->futureStates[$f];
$ctx->inc("rf_xfer_attempts_$f");
try {
$h = \Async\spawn_thread(static function() use ($state) {
$h = \Async\spawn_thread(self::unscoped(static function() use ($state) {
throw new \RuntimeException("worker crashed before complete");
});
}));
$ctx->remoteFutureThreads[$f] = $h;
$ctx->inc("rf_xfer_ok_$f");
} catch (\Throwable $e) {
Expand Down Expand Up @@ -4108,10 +4117,10 @@ function(Context $ctx, string $coro, string $f) {
$state = $ctx->futureStates[$f];
$ctx->inc("rf_double_xfer_attempts_$f");
try {
$h2 = \Async\spawn_thread(static function() use ($state) {
$h2 = \Async\spawn_thread(self::unscoped(static function() use ($state) {
// unreachable if transfer is blocked
try { $state->complete("dup"); } catch (\Throwable $e) {}
});
}));
// Transfer was allowed — join the rogue so we don't leak it.
try { \Async\await($h2); } catch (\Throwable $e) {}
$ctx->inc("rf_double_xfer_allowed_$f");
Expand Down Expand Up @@ -4180,12 +4189,12 @@ function(Context $ctx, string $coro, string $nExpr, string $tc) {
}
$ch = $ctx->threadChannels[$tc];
$ctx->inc("tc_thread_send_attempts_$tc");
$h = \Async\spawn_thread(static function() use ($ch, $n): int {
$h = \Async\spawn_thread(self::unscoped(static function() use ($ch, $n): int {
for ($i = 0; $i < $n; $i++) {
$ch->send($i);
}
return $n;
});
}));
for ($i = 0; $i < $n; $i++) {
try {
$ch->recv();
Expand Down Expand Up @@ -4218,12 +4227,12 @@ function(Context $ctx, string $coro, string $nExpr, string $tc) {
}
$ch = $ctx->threadChannels[$tc];
$ctx->inc("tc_thread_recv_attempts_$tc");
$h = \Async\spawn_thread(static function() use ($ch, $n): int {
$h = \Async\spawn_thread(self::unscoped(static function() use ($ch, $n): int {
for ($i = 0; $i < $n; $i++) {
$ch->recv();
}
return $n;
});
}));
for ($i = 0; $i < $n; $i++) {
try {
$ch->send($i);
Expand Down Expand Up @@ -4256,14 +4265,14 @@ function(Context $ctx, string $coro, string $tc) {
}
$ch = $ctx->threadChannels[$tc];
$ctx->inc("tc_close_race_attempts_$tc");
$h = \Async\spawn_thread(static function() use ($ch): string {
$h = \Async\spawn_thread(self::unscoped(static function() use ($ch): string {
try {
$ch->recv();
return "no-throw";
} catch (\Throwable $e) {
return "threw";
}
});
}));
$ch->close();
try {
$outcome = \Async\await($h);
Expand Down
9 changes: 3 additions & 6 deletions tests/thread/057-spawn_thread_this_subclass_self.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@ if (!PHP_ZTS) die('skip ZTS required');
if (!function_exists('Async\spawn_thread')) die('skip spawn_thread not available');
?>
--DESCRIPTION--
Documents the current scope semantics for transferred closures.
In native PHP `self::X` resolves to the closure's *defining* class (Base).
Across spawn_thread the worker scope is currently set to Z_OBJCE($this),
so `self::X` resolves to Child::X. Same closure called locally still
gives Base::X — this test pins both behaviors so a future fix is
visible as an EXPECT diff.
The transferred closure carries its scope by name, so the worker matches
the local result: self::X is Base::X on both sides.
--FILE--
<?php

Expand Down Expand Up @@ -41,4 +38,4 @@ spawn(function() use ($boot) {
?>
--EXPECT--
local: base
worker: child
worker: base
35 changes: 35 additions & 0 deletions tests/thread/072-spawn_thread_static_closure_self.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
--TEST--
spawn_thread() - static closure declared in a class keeps self::/static:: scope
--SKIPIF--
<?php
if (!PHP_ZTS) die('skip ZTS required');
if (!function_exists('Async\spawn_thread')) die('skip spawn_thread not available');
?>
--FILE--
<?php

use function Async\spawn;
use function Async\spawn_thread;
use function Async\await;

class Demo {
private const TAG = 'reached';

public static function makeWorker(): \Closure {
return static function(): string {
return self::TAG . '|' . static::class;
};
}
}

$boot = function() {
eval('class Demo { private const TAG = "reached"; }');
};

spawn(function() use ($boot) {
$t = spawn_thread(Demo::makeWorker(), bootloader: $boot);
echo await($t), "\n";
});
?>
--EXPECT--
reached|Demo
32 changes: 32 additions & 0 deletions tests/thread/073-spawn_thread_closure_scope_class_missing.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
--TEST--
spawn_thread() - class-scoped closure with no bootloader → clear scope error
--SKIPIF--
<?php
if (!PHP_ZTS) die('skip ZTS required');
if (!function_exists('Async\spawn_thread')) die('skip spawn_thread not available');
?>
--FILE--
<?php

use function Async\spawn;
use function Async\spawn_thread;
use function Async\await;

class Demo {
public static function makeWorker(): \Closure {
return static function(): int { return 1; };
}
}

spawn(function() {
$t = spawn_thread(Demo::makeWorker());
try {
echo await($t), "\n";
echo "ERROR: should not reach here\n";
} catch (\Throwable $e) {
echo $e->getMessage(), "\n";
}
});
?>
--EXPECTF--
%ACannot restore closure scope: class "Demo" not found%A
Loading
Loading