Skip to content

feat: {:awaitable, fn} host-parallel capabilities + yield propagation through call args#62

Merged
ivarvong merged 2 commits into
mainfrom
feat/awaitable-capabilities
May 11, 2026
Merged

feat: {:awaitable, fn} host-parallel capabilities + yield propagation through call args#62
ivarvong merged 2 commits into
mainfrom
feat/awaitable-capabilities

Conversation

@ivarvong
Copy link
Copy Markdown
Owner

Summary

  • New {:awaitable, fn} callable variant: hosts register external tools as awaitable capabilities; calling one from Python returns a coroutine that yields an {:asyncio_capability_call, ...} sentinel for the trampoline to dispatch
  • asyncio.gather over awaitable caps fans out all calls in a round as concurrent BEAM Tasks (Task.async_stream), then resumes each iter with its result — ~20x speedup for a batch of 20 × 50 ms calls
  • out.append(await slow_call(i)) and any f(await expr, ...) pattern now works correctly; yield signals thread through call args via new {:cont_call_pos_resume, ...} / {:cont_call_kw_resume, ...} / star-arg frames
  • advance_iter_one uses fresh semantics for resolved capability sentinels, fixing await helper() where helper wraps a cap (InvalidStateError: capability iter not awaiting in nested-await chains)

Architecture

{:awaitable, fn} capability flow

# Python code — identical regardless of sequential vs parallel
async def main():
    results = await asyncio.gather(*[search(q) for q in queries])
# Sequential (builtin) — Pyex drives one at a time
%{"tools" => %{"search" => {:builtin, fn [q] -> ... end}}}

# Parallel (awaitable) — gather dispatches all as BEAM Tasks concurrently  
%{"tools" => %{"search" => {:awaitable, fn [q] -> ... end}}}

The Python code is identical. The host registration controls fan-out shape.

Yield propagation through call args

eval_var_attr_call, eval_call_expr (getattr), and eval_call_expr (general) now route through Calls.eval_call_with_yield/5eval_remaining_and_call/8, which evaluates args recursively with full yield support and mutation write-back (:mutate, :mutate_arg handled per call-site variant).

advance_iter_one fix for resolved caps

When advance_iter_one is called on a :gen_pending iter whose buffered value is a capability sentinel AND that cap is already {:gen_done, _}, it uses fresh semantics (runs the continuation) rather than re-surfacing the stale sentinel. Regular sleep yields remain on the buffered path so cooperative interleaving (ABABAB gather test) is unaffected.

Test plan

  • mix format --check-formatted clean
  • mix compile --warnings-as-errors clean
  • mix test — 5356 tests, 0 failures, 2 skipped
  • mix dialyzer — 43 errors / 43 skipped (baseline unchanged)
  • 7 new conformance tests covering: sequential await, gather order, parallel speedup, out.append(await cap(i)), gather vs loop identity, nested helper coroutine, gather over helpers

🤖 Generated with Claude Code

ivarvong and others added 2 commits May 10, 2026 15:20
… through call args

Adds `{:awaitable, fn}` as a new callable variant that hosts register to
enable parallel dispatch of external tool calls from Python async code.

## What changed

### {:awaitable, fn} capability variant (Ctx, Interpreter, Invocation)

- `Ctx.new_awaiting_capability_iterator/4` — new iter pool entry type
  `{:gen_awaiting_capability, sentinel, cont, gen_env}` for caps waiting
  on host dispatch
- `Interpreter.call_function({:awaitable, fun})` — calling an awaitable
  returns a coroutine that immediately yields an
  `{:asyncio_capability_call, cap_id, fun, args}` sentinel; the
  trampoline dispatches the fn and resumes via `resume_capability/4`
- `Invocation.resume_capability/4` and `invoke_capability/3` — public
  API for the trampoline to feed a result back into a waiting cap iter
- `{:cont_capability_resume}` frame — when a capability coroutine is
  resumed via send, terminates with the sent value as the return

### Parallel gather dispatch (Pyex.Stdlib.Asyncio)

`asyncio.gather` over `{:awaitable, _}` caps now fans out all cap calls
in a round as concurrent BEAM Tasks via `Task.async_stream`, then resumes
each iter with its result before the next round.  Sequential code (for
loop with `await`) still works — caps are dispatched inline.

**Benchmark**: 20 capability calls × 50 ms sleep → ~20x speedup
(parallel ~52 ms vs sequential ~1037 ms), identical results.

`Task.async_stream/3` added to `BannedCallTracer` allowlist with a note
explaining why it's safe (host code only, bounded by gather duration).

### Yield propagation through call args (Interpreter, Calls)

`out.append(await slow_call(i))` and any expression of the form
`f(await expr, ...)` now work without a temp variable.

`eval_var_attr_call`, `eval_call_expr` (getattr), and `eval_call_expr`
(general) are refactored to use a new `Calls.eval_call_with_yield/5` →
`eval_remaining_and_call/8` pipeline that:
- evaluates args recursively (handling kwarg, *args, **kwargs)
- on any arg yield, emits `{:cont_call_pos_resume, ...}` /
  `{:cont_call_kw_resume, ...}` / `{:cont_call_star_resume, ...}` /
  `{:cont_call_dstar_resume, ...}` frames
- when resumed via `resume_generator_with_send`, continues evaluating
  remaining args and then calls the function with full mutation
  write-back (`:mutate`, `:mutate_arg` handled per call-site variant)

### advance_iter_one: fresh semantics for resolved capability sentinels

A nested-await chain (`await helper()` where `helper` does
`return await cap()`) previously re-surfaced the already-dispatched
capability sentinel, causing a double `resume_capability` crash.

Root cause: when `get_id` is in `{:gen_pending, cap_sentinel, cont}` and
`advance_iter_one` is called after the cap was resolved, the buffered
semantics deliver `cap_sentinel` again rather than running the
continuation.

Fix: `advance_iter_one` checks whether the pending value is a
`{:asyncio_capability_call, cap_id, ...}` sentinel AND `cap_id` is
already `{:gen_done, _}`.  If so, it uses fresh semantics (run the
continuation, return its result) instead of re-surfacing the stale
sentinel.  Cooperative-yield interleaving (ABABAB gather test) is
unaffected because regular `{:asyncio_sleep, _}` sentinels remain on
the buffered path.

## Tests

7 new tests in `test/pyex/async_conformance_test.exs`:
- sequential `await cap()` returns correct value
- `gather` result order preserved
- `gather` runs in parallel (wall-clock ≈ 1× cap, not N×)
- `out.append(await cap(i))` without temp var
- gather and sequential loop produce identical results
- capability called from nested helper coroutine
- gather over helper coroutines that each await a capability

5356 tests total, 0 failures.  Dialyzer clean.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Two artifacts that prove {awaitable, fn} parallel dispatch is correct
and genuinely parallel, not just concurrent.

## bench/parallel_proof.exs

Runs N CPU-bound tasks (sum 1..500_000, pure arithmetic) sequentially
and via asyncio.gather, recording which BEAM scheduler ID each task
executes on.  Outputs:
- Wall time for both modes
- Speedup (observed vs theoretical max)
- Per-scheduler call distribution (shows multiple OS threads ran)

CPU-bound work makes it impossible to explain speedup via event-loop
tricks: if 16 tasks burning 1.5 ms of CPU each finish in 3 ms wall
time, multiple physical threads ran simultaneously.

## test/pyex/map_reduce_proof_test.exs

Four tests covering three independent mathematical invariants over 64
parallel capability calls.  Uses f(x) = a·x mod p (M61 Mersenne prime)
where a is a host-side secret the Python program never sees.

### Invariant 1 — element-wise (impossible to pass with wrong inputs)

  result[i] == a·x_i mod p  for all i

f is a bijection on Z_p and inputs are distinct, so f(x_j) ≠ f(x_i)
for j ≠ i.  A wrong-input result CANNOT accidentally equal the correct
value.  This is a proof, not a probabilistic bound.

### Invariant 2 — polynomial fingerprint (catches position swaps)

  Σ result[i]·r^i mod p  ==  Σ expected[i]·r^i mod p

Random evaluation point r chosen after the run (Fiat–Shamir pattern).
By Schwartz–Zippel, any nontrivial permutation of results produces a
different polynomial evaluation at r except at ≤ N roots.
Pr[false positive] ≤ N/p ≈ 64/(2^61) < 2^−55.

### Invariant 3 — MapReduce sum invariant (closed-form reduce)

  Σ f(x_i) mod p  ==  a · Σ x_i mod p

The host can compute the expected reduce output from the inputs alone,
without running any map tasks.  This is the algebraic signature of
MapReduce: the reduce result is derivable from the inputs by linearity
of f.  Any wrong mapped value shifts the sum by a non-zero amount mod p.

All three invariants hold simultaneously on fresh random (a, inputs)
pairs each test run, so the test cannot have been tuned for a specific
instance.
@ivarvong ivarvong merged commit bf69860 into main May 11, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant