Skip to content

Commit 7e2fb94

Browse files
committed
redis_pool: implement working multiplex command path (Stage 2 v0)
Stateless commands now ride shared mux lanes end to end: dispatch in redis_process_cmd/_kw_cmd builds the command, classifies it, picks the least-loaded lane (argmin in_flight), enqueues a zend_future_t waiter, writes (batched) to the shared socket and suspends. A reactor READABLE C callback (redis_mux_pump) drains replies non-blocking, frames them, and resolves waiters' futures in FIFO order — Redis's per-connection reply order is the match, no correlation IDs. The resumed coroutine materializes its frame through a read-only memory stream + the ordinary atomic resp_cb into return_value. Stateful/blocking/SELECT and pinned coroutines fall back to checkout (Stage 1). Verified under ASAN against live Redis: concurrent and interleaved GET/SET return correct, correctly ordered replies (20/20 interleaved, 50/50 concurrent) with no per-operation leaks; the checkout suite (tests/async 7/7) is unaffected. v0 caveats, tracked in TRUE_ASYNC_POOL.md 9b: plain TCP only (raw recv on the lane fd), blocking write baton (no WRITABLE drain / backpressure yet), a one-time reactor-loop teardown leak, no broken-lane recovery, no .phpt yet. Adds the step-by-step mux walkthrough (5.6).
1 parent b024d44 commit 7e2fb94

4 files changed

Lines changed: 400 additions & 7 deletions

File tree

TRUE_ASYNC_POOL.md

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@ best of both worlds.
99
- **Stage 1 (checkout pool): implemented & tested.** `redis_pool.{c,h}`, wired
1010
into the command path; concurrent coroutines, transaction pinning and
1111
concurrent-MULTI isolation pass under ASAN.
12-
- **Stage 2 (multiplexing): designed, not yet implemented.** See §5.
12+
- **Stage 2 (multiplexing): working (v0).** The mux command path is implemented
13+
and verified — concurrent commands over shared lanes return correct, correctly
14+
ordered replies under interleaving, with no per-operation leaks (ASAN). v0
15+
caveats: plain TCP only (no SSL on the lane), blocking write baton (no WRITABLE
16+
drain/backpressure yet), a one-time reactor-loop teardown leak (§9a), and no
17+
`.phpt` coverage yet. See §5.
1318

1419
---
1520

@@ -361,6 +366,46 @@ coroutine until the lane drains. A non-multiplexable command, or a lane marked
361366
broken, transparently falls back to `redis_pool_acquire_conn` (Stage 1);
362367
stateful sequences (MULTI/SUB/BLPOP/SELECT) always run on a private connection.
363368

369+
### 5.6 Walkthrough — a `$redis->get('x')` over mux
370+
371+
The concrete end-to-end flow (implemented and working; see Status):
372+
373+
1. **Dispatch** (`redis_process_cmd`): build the command bytes
374+
(`*2\r\n$3\r\nGET\r\n$1\r\nx\r\n`) using the template socket's serializer.
375+
Gate on `redis_pool_should_mux` (pool + mux, in a coroutine, no pinned conn)
376+
and `redis_cmd_is_multiplexable` (GET → yes).
377+
2. **Pick a lane** (`redis_mux_pick`): `argmin(in_flight)` across the lanes,
378+
lazily opening the socket (and its READABLE poll event → pump) on first use.
379+
3. **Register a waiter**: a `zend_future_t` plus a FIFO node pushed at the lane's
380+
tail; `in_flight++`. The FIFO order is the wire order.
381+
4. **Write** (`redis_mux_flush`): append the bytes to `out_buf`; the write-baton
382+
holder flushes them. Concurrent senders just append → one batched write =
383+
implicit pipelining.
384+
5. **Arm READABLE**: start the lane poll event so the reactor invokes the pump
385+
when replies arrive.
386+
6. **Await** (`redis_mux_await`): the coroutine suspends on its Future; control
387+
returns to the scheduler and other coroutines pile their commands onto the
388+
same lane.
389+
7. **Reply pump** (`redis_mux_pump`, a reactor C callback firing on socket
390+
readability, between coroutines): non-blocking `recv` into `in_buf`; for each
391+
complete RESP frame (`redis_resp_frame_len`) pop the FIFO head waiter (in
392+
order) and `ZEND_FUTURE_COMPLETE(future, frame)` → resolves the Future →
393+
resumes that coroutine.
394+
8. **Materialize** (back in the resumed coroutine): wrap the frame in a read-only
395+
memory stream and run the ordinary atomic `resp_cb``return_value`.
396+
397+
Reply matching needs no correlation IDs: steps 3–6 fix the order, step 7 hands
398+
replies out FIFO, and Redis guarantees reply order within a connection.
399+
400+
```
401+
coroutines A,B,C → GET on one lane
402+
write: [cmdA][cmdB][cmdC] ── one batched write (pipeline)
403+
wire ← replyA replyB replyC (in order)
404+
pump: replyA → FIFO.pop = A → wake A
405+
replyB → FIFO.pop = B → wake B
406+
replyC → FIFO.pop = C → wake C
407+
```
408+
364409
---
365410

366411
## 6. PHP-level API
@@ -417,12 +462,13 @@ pool, released on destroy.
417462
attributes, partial-frame safe; unit-tested across 21 cases).
418463
- [x] `redis_mux_t` lane struct + waiter (a `zend_future_t`) + lane array on the
419464
pool + lifecycle (lazy slots, teardown). ASAN-clean construct/destroy.
420-
- [ ] Lazy lane open + `argmin(in_flight)` selection.
421-
- [ ] Reply pump: recv + frame + FIFO pop + `ZEND_ASYNC_CALLBACKS_NOTIFY`.
422-
- [ ] Coroutine-side materialization via memory-stream + atomic `resp_cb`.
423-
- [ ] Write path: optimistic non-blocking write + WRITABLE drain + batching.
424-
- [ ] Dispatch in `redis_process_cmd`/`_kw_cmd`; degrade to checkout.
425-
- [ ] Backpressure: bounded FIFO + out-buffer high-water park the producer.
465+
- [x] Lazy lane open + `argmin(in_flight)` selection.
466+
- [x] Reply pump: recv + frame + FIFO pop + `ZEND_FUTURE_COMPLETE`.
467+
- [x] Coroutine-side materialization via memory-stream + atomic `resp_cb`.
468+
- [x] Dispatch in `redis_process_cmd`/`_kw_cmd`; degrade to checkout.
469+
- [~] Write path: v0 blocking write baton + batching (WRITABLE drain TODO, §9b).
470+
- [ ] Backpressure: bounded FIFO + out-buffer high-water park the producer (§9b).
471+
- [ ] Lane teardown leak fix (§9b) + broken-lane recovery + TLS on lanes.
426472
- [ ] Tests: reply ordering under interleaving, fallback, broken socket, batching.
427473

428474
---
@@ -492,6 +538,24 @@ multiplex path ships or as profiling dictates:
492538
4. **Micro:** `resp_line_end` scans for `\r\n` byte-by-byte; `memchr` is faster,
493539
but the lines here are short headers, so the gain is marginal. Low priority.
494540

541+
### 9b. Technical debt — multiplex v0
542+
543+
5. **Lane teardown leak (reactor loop).** A lane's READABLE poll event keeps the
544+
libuv loop alive at process shutdown: lanes are freed during object teardown
545+
(`free_redis_object``redis_pool_destroy`), past the reactor's final
546+
close-drain, so the deferred `uv_close` never completes. One-time and
547+
non-growing (constant 2 allocations regardless of command count); the
548+
per-operation path is leak-clean. Checkout connections (php_stream-managed) do
549+
not hit this — only the explicit `ZEND_ASYNC_NEW_SOCKET_EVENT` does. Fix needs
550+
closing lanes within the reactor-active phase (a pool-close/shutdown hook).
551+
6. **Blocking write baton, no backpressure.** v0 flushes `out_buf` with blocking
552+
`php_stream_write`; there is no WRITABLE-drain path nor an in-flight/out-buffer
553+
high-water mark yet (§5.3/§5.5).
554+
7. **Plain TCP only.** The pump reads via raw `recv(MSG_DONTWAIT)`, bypassing the
555+
stream filters — TLS on a mux lane is not supported in v0.
556+
8. **Single-lane teardown of pending waiters.** A broken lane must fail its
557+
in-flight futures and fall back to checkout (§5.5) — not yet implemented.
558+
495559
---
496560

497561
## 10. References

redis.c

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,36 @@ redis_process_cmd(INTERNAL_FUNCTION_PARAMETERS, redis_cmd_cb cmd_cb,
710710
int cmd_len;
711711
char *cmd;
712712

713+
if (Z_TYPE_P(getThis()) == IS_OBJECT) {
714+
redis_object *obj = PHPREDIS_ZVAL_GET_OBJECT(redis_object, getThis());
715+
if (redis_pool_should_mux(obj)) {
716+
if (UNEXPECTED(cmd_cb(INTERNAL_FUNCTION_PARAM_PASSTHRU, obj->sock, &cmd,
717+
&cmd_len, NULL, &ctx) == FAILURE)) {
718+
RETURN_FALSE;
719+
}
720+
if (redis_cmd_is_multiplexable(cmd, cmd_len)) {
721+
redis_mux_dispatch(obj, cmd, cmd_len, resp_cb, ctx, INTERNAL_FUNCTION_PARAM_PASSTHRU);
722+
return;
723+
}
724+
/* Stateful command: run it on a private checkout connection. */
725+
redis_sock = redis_sock_get(getThis(), 0);
726+
if (UNEXPECTED(redis_sock == NULL)) {
727+
efree(cmd);
728+
RETURN_FALSE;
729+
}
730+
if (redis_process_request(redis_sock, cmd, cmd_len) != SUCCESS) {
731+
RETURN_FALSE;
732+
}
733+
if (IS_ATOMIC(redis_sock)) {
734+
resp_cb(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, NULL, ctx);
735+
} else {
736+
REDIS_PROCESS_RESPONSE_CLOSURE(resp_cb, ctx);
737+
}
738+
redis_pool_maybe_release(getThis());
739+
return;
740+
}
741+
}
742+
713743
redis_sock = redis_sock_get(getThis(), 0);
714744
if (UNEXPECTED(redis_sock == NULL)) {
715745
RETURN_FALSE;
@@ -747,6 +777,35 @@ redis_process_kw_cmd(INTERNAL_FUNCTION_PARAMETERS, const char *kw,
747777
int cmd_len;
748778
char *cmd;
749779

780+
if (Z_TYPE_P(getThis()) == IS_OBJECT) {
781+
redis_object *obj = PHPREDIS_ZVAL_GET_OBJECT(redis_object, getThis());
782+
if (redis_pool_should_mux(obj)) {
783+
if (UNEXPECTED(cmd_cb(INTERNAL_FUNCTION_PARAM_PASSTHRU, obj->sock, (char*)kw, &cmd,
784+
&cmd_len, NULL, &ctx) == FAILURE)) {
785+
RETURN_FALSE;
786+
}
787+
if (redis_cmd_is_multiplexable(cmd, cmd_len)) {
788+
redis_mux_dispatch(obj, cmd, cmd_len, resp_cb, ctx, INTERNAL_FUNCTION_PARAM_PASSTHRU);
789+
return;
790+
}
791+
redis_sock = redis_sock_get(getThis(), 0);
792+
if (UNEXPECTED(redis_sock == NULL)) {
793+
efree(cmd);
794+
RETURN_FALSE;
795+
}
796+
if (redis_process_request(redis_sock, cmd, cmd_len) != SUCCESS) {
797+
RETURN_FALSE;
798+
}
799+
if (IS_ATOMIC(redis_sock)) {
800+
resp_cb(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, NULL, ctx);
801+
} else {
802+
REDIS_PROCESS_RESPONSE_CLOSURE(resp_cb, ctx);
803+
}
804+
redis_pool_maybe_release(getThis());
805+
return;
806+
}
807+
}
808+
750809
redis_sock = redis_sock_get(getThis(), 0);
751810
if (UNEXPECTED(redis_sock == NULL)) {
752811
RETURN_FALSE;

0 commit comments

Comments
 (0)