feat: {:awaitable, fn} host-parallel capabilities + yield propagation through call args#62
Merged
Merged
Conversation
… through call args
Adds `{:awaitable, fn}` as a new callable variant that hosts register to
enable parallel dispatch of external tool calls from Python async code.
## What changed
### {:awaitable, fn} capability variant (Ctx, Interpreter, Invocation)
- `Ctx.new_awaiting_capability_iterator/4` — new iter pool entry type
`{:gen_awaiting_capability, sentinel, cont, gen_env}` for caps waiting
on host dispatch
- `Interpreter.call_function({:awaitable, fun})` — calling an awaitable
returns a coroutine that immediately yields an
`{:asyncio_capability_call, cap_id, fun, args}` sentinel; the
trampoline dispatches the fn and resumes via `resume_capability/4`
- `Invocation.resume_capability/4` and `invoke_capability/3` — public
API for the trampoline to feed a result back into a waiting cap iter
- `{:cont_capability_resume}` frame — when a capability coroutine is
resumed via send, terminates with the sent value as the return
### Parallel gather dispatch (Pyex.Stdlib.Asyncio)
`asyncio.gather` over `{:awaitable, _}` caps now fans out all cap calls
in a round as concurrent BEAM Tasks via `Task.async_stream`, then resumes
each iter with its result before the next round. Sequential code (for
loop with `await`) still works — caps are dispatched inline.
**Benchmark**: 20 capability calls × 50 ms sleep → ~20x speedup
(parallel ~52 ms vs sequential ~1037 ms), identical results.
`Task.async_stream/3` added to `BannedCallTracer` allowlist with a note
explaining why it's safe (host code only, bounded by gather duration).
### Yield propagation through call args (Interpreter, Calls)
`out.append(await slow_call(i))` and any expression of the form
`f(await expr, ...)` now work without a temp variable.
`eval_var_attr_call`, `eval_call_expr` (getattr), and `eval_call_expr`
(general) are refactored to use a new `Calls.eval_call_with_yield/5` →
`eval_remaining_and_call/8` pipeline that:
- evaluates args recursively (handling kwarg, *args, **kwargs)
- on any arg yield, emits `{:cont_call_pos_resume, ...}` /
`{:cont_call_kw_resume, ...}` / `{:cont_call_star_resume, ...}` /
`{:cont_call_dstar_resume, ...}` frames
- when resumed via `resume_generator_with_send`, continues evaluating
remaining args and then calls the function with full mutation
write-back (`:mutate`, `:mutate_arg` handled per call-site variant)
### advance_iter_one: fresh semantics for resolved capability sentinels
A nested-await chain (`await helper()` where `helper` does
`return await cap()`) previously re-surfaced the already-dispatched
capability sentinel, causing a double `resume_capability` crash.
Root cause: when `get_id` is in `{:gen_pending, cap_sentinel, cont}` and
`advance_iter_one` is called after the cap was resolved, the buffered
semantics deliver `cap_sentinel` again rather than running the
continuation.
Fix: `advance_iter_one` checks whether the pending value is a
`{:asyncio_capability_call, cap_id, ...}` sentinel AND `cap_id` is
already `{:gen_done, _}`. If so, it uses fresh semantics (run the
continuation, return its result) instead of re-surfacing the stale
sentinel. Cooperative-yield interleaving (ABABAB gather test) is
unaffected because regular `{:asyncio_sleep, _}` sentinels remain on
the buffered path.
## Tests
7 new tests in `test/pyex/async_conformance_test.exs`:
- sequential `await cap()` returns correct value
- `gather` result order preserved
- `gather` runs in parallel (wall-clock ≈ 1× cap, not N×)
- `out.append(await cap(i))` without temp var
- gather and sequential loop produce identical results
- capability called from nested helper coroutine
- gather over helper coroutines that each await a capability
5356 tests total, 0 failures. Dialyzer clean.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Two artifacts that prove {awaitable, fn} parallel dispatch is correct
and genuinely parallel, not just concurrent.
## bench/parallel_proof.exs
Runs N CPU-bound tasks (sum 1..500_000, pure arithmetic) sequentially
and via asyncio.gather, recording which BEAM scheduler ID each task
executes on. Outputs:
- Wall time for both modes
- Speedup (observed vs theoretical max)
- Per-scheduler call distribution (shows multiple OS threads ran)
CPU-bound work makes it impossible to explain speedup via event-loop
tricks: if 16 tasks burning 1.5 ms of CPU each finish in 3 ms wall
time, multiple physical threads ran simultaneously.
## test/pyex/map_reduce_proof_test.exs
Four tests covering three independent mathematical invariants over 64
parallel capability calls. Uses f(x) = a·x mod p (M61 Mersenne prime)
where a is a host-side secret the Python program never sees.
### Invariant 1 — element-wise (impossible to pass with wrong inputs)
result[i] == a·x_i mod p for all i
f is a bijection on Z_p and inputs are distinct, so f(x_j) ≠ f(x_i)
for j ≠ i. A wrong-input result CANNOT accidentally equal the correct
value. This is a proof, not a probabilistic bound.
### Invariant 2 — polynomial fingerprint (catches position swaps)
Σ result[i]·r^i mod p == Σ expected[i]·r^i mod p
Random evaluation point r chosen after the run (Fiat–Shamir pattern).
By Schwartz–Zippel, any nontrivial permutation of results produces a
different polynomial evaluation at r except at ≤ N roots.
Pr[false positive] ≤ N/p ≈ 64/(2^61) < 2^−55.
### Invariant 3 — MapReduce sum invariant (closed-form reduce)
Σ f(x_i) mod p == a · Σ x_i mod p
The host can compute the expected reduce output from the inputs alone,
without running any map tasks. This is the algebraic signature of
MapReduce: the reduce result is derivable from the inputs by linearity
of f. Any wrong mapped value shifts the sum by a non-zero amount mod p.
All three invariants hold simultaneously on fresh random (a, inputs)
pairs each test run, so the test cannot have been tuned for a specific
instance.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
{:awaitable, fn}callable variant: hosts register external tools as awaitable capabilities; calling one from Python returns a coroutine that yields an{:asyncio_capability_call, ...}sentinel for the trampoline to dispatchasyncio.gatherover awaitable caps fans out all calls in a round as concurrent BEAM Tasks (Task.async_stream), then resumes each iter with its result — ~20x speedup for a batch of 20 × 50 ms callsout.append(await slow_call(i))and anyf(await expr, ...)pattern now works correctly; yield signals thread through call args via new{:cont_call_pos_resume, ...}/{:cont_call_kw_resume, ...}/ star-arg framesadvance_iter_oneuses fresh semantics for resolved capability sentinels, fixingawait helper()wherehelperwraps a cap (InvalidStateError: capability iter not awaitingin nested-await chains)Architecture
{:awaitable, fn}capability flowThe Python code is identical. The host registration controls fan-out shape.
Yield propagation through call args
eval_var_attr_call,eval_call_expr(getattr), andeval_call_expr(general) now route throughCalls.eval_call_with_yield/5→eval_remaining_and_call/8, which evaluates args recursively with full yield support and mutation write-back (:mutate,:mutate_arghandled per call-site variant).advance_iter_one fix for resolved caps
When
advance_iter_oneis called on a:gen_pendingiter whose buffered value is a capability sentinel AND that cap is already{:gen_done, _}, it uses fresh semantics (runs the continuation) rather than re-surfacing the stale sentinel. Regular sleep yields remain on the buffered path so cooperative interleaving (ABABAB gather test) is unaffected.Test plan
mix format --check-formattedcleanmix compile --warnings-as-errorscleanmix test— 5356 tests, 0 failures, 2 skippedmix dialyzer— 43 errors / 43 skipped (baseline unchanged)out.append(await cap(i)), gather vs loop identity, nested helper coroutine, gather over helpers🤖 Generated with Claude Code