Conversation
- Send SYN/ACK/FIN/RST as TYPE_WINDOW_UPDATE, not TYPE_DATA - Interpret length as window delta for TYPE_WINDOW_UPDATE frames - Serialize writes with _write_lock to prevent frame interleaving Ref: github.com/libp2p/go-yamux/v5 (stream.go, session.go, const.go)
fix(yamux): match go-yamux frame types and add write serialization
Port go-yamux's window auto-tuning to py-libp2p. The receive window starts at 256KB and doubles each RTT epoch up to 16MB, with 50% hysteresis to avoid flooding small updates. Includes background RTT measurement via ping/pong with exponential smoothing. This eliminates ~4,000 round-trips per 1GB transfer (down to ~64), significantly improving throughput for perf tests.
feat: add yamux receive window auto-tuning
|
Thanks @asabya . Please read the issues, and elaborate on them. AI PR Review: #1269 — yamux fixesReview generated using 1. Summary of ChangesThis PR addresses yamux interoperability with go-yamux and throughput on multiplexed connections:
Related issues (not “Fixes” in the PR body, but linked in the ecosystem):
Affected modules: Breaking changes: None intended; behavior change is wire-compatible with correct yamux peers and fixes interop with Go. 2. Branch Sync Status and Merge ConflictsBranch sync statusFrom
Merge conflict analysis
3. Strengths
4. Issues FoundCriticalNone confirmed with a failing test locally. Two areas deserve maintainer scrutiny before treating this as production-ready:
async def _auto_tune_and_send_window_update(
self: "YamuxStream", bytes_consumed: int
) -> None:
"""
Auto-tune receive window size based on RTT and send window update.
Ports go-yamux's auto-tuning: starts at 256KB, doubles each RTT epoch
up to 16MB. Only sends update when delta >= 50% of target (hysteresis).
"""
async with self.window_lock:
delta = self.target_recv_window - self.recv_window
# Hysteresis: skip if delta < 50% of target (matches go-yamux GrowTo)
if delta < self.target_recv_window // 2:
return
# Auto-tune: if within 4x RTT of last epoch, double the target
now = trio.current_time()
rtt = self.conn.rtt()
if rtt > 0 and self.epoch_start > 0 and (now - self.epoch_start) < rtt * 4:
self.target_recv_window = min(
self.target_recv_window * 2, MAX_WINDOW_SIZE
)
delta = self.target_recv_window - self.recv_window
self.epoch_start = now
self.recv_window += delta
logger.debug(
f"Stream {self.stream_id}: Auto-tune window update "
f"delta={delta}, target={self.target_recv_window}"
)
await self.send_window_update(delta, skip_lock=True)
if len(buffer) > 0:
chunk = bytes(buffer)
buffer.clear()
data += chunk
# Auto-tune and send window update for the chunk we just read
await self._auto_tune_and_send_window_update(len(chunk))Major
async def _measure_rtt_loop(self) -> None:
"""Background task that periodically measures RTT via ping/pong."""
# Initial delay to let the connection establish
await trio.sleep(0.5)
while not self.event_shutting_down.is_set():
try:
self._ping_id += 1
self._ping_event = trio.Event()
self._ping_sent_time = trio.current_time()
header = struct.pack(
YAMUX_HEADER_FORMAT, 0, TYPE_PING, FLAG_SYN, 0, self._ping_id
)
await self.secured_conn.write(header)
async def _write_frame(self, data: bytes) -> None:
"""Write a frame to the connection, serializing all writes."""
if len(data) >= HEADER_SIZE:
_, typ, flags, sid, length = struct.unpack(
YAMUX_HEADER_FORMAT, data[:HEADER_SIZE]
)
flag_names = []
if flags & FLAG_SYN:
flag_names.append("SYN")
if flags & FLAG_ACK:
flag_names.append("ACK")
if flags & FLAG_FIN:
flag_names.append("FIN")
if flags & FLAG_RST:
flag_names.append("RST")
type_names = {0: "DATA", 1: "WINDOW_UPDATE", 2: "PING", 3: "GO_AWAY"}
logger.info(
f"YAMUX TX: type={type_names.get(typ, typ)} "
f"flags={'+'.join(flag_names) or '0'} "
f"stream={sid} length={length} "
f"is_initiator={self.is_initiator_value} "
f"payload_bytes={len(data) - HEADER_SIZE}"
)
async with self._write_lock:
await self.secured_conn.write(data)
Example expected layout (not present on the branch at review time):
Minor
logger.info(
f"YAMUX TX: type={type_names.get(typ, typ)} "
f"flags={'+'.join(flag_names) or '0'} "
f"stream={sid} length={length} "
f"is_initiator={self.is_initiator_value} "
f"payload_bytes={len(data) - HEADER_SIZE}"
)
logger.info(
f"YAMUX RX: type={type_names.get(typ, typ)} "
f"flags={'+'.join(flag_names) or '0'} "
f"stream={stream_id} length={length} "
f"is_initiator={self.is_initiator_value}"
)
async def close(self) -> None:
async with self.close_lock:
if not self.send_closed:
logger.debug(f"Half-closing stream {self.stream_id} (local end)")
try:
header = struct.pack(
YAMUX_HEADER_FORMAT,
0,
TYPE_WINDOW_UPDATE,
FLAG_FIN,
self.stream_id,
0,
)
await self.conn._write_frame(header)
except RawConnError as e:
logger.debug(f"Error sending FIN, connection likely closed: {e}")
finally:
self.send_closed = True
# Only set fully closed if both directions are closed
if self.send_closed and self.recv_closed:
self.closed = True
else:
# Stream is half-closed but not fully closed
self.closed = False
async def reset(self) -> None:
if not self.closed:
async with self.close_lock:
logger.debug(f"Resetting stream {self.stream_id}")
try:
header = struct.pack(
YAMUX_HEADER_FORMAT,
0,
TYPE_WINDOW_UPDATE,
FLAG_RST,
self.stream_id,
0,
)
await self.conn._write_frame(header)
except RawConnError as e:
logger.debug(f"Error sending RST, connection likely closed: {e}")
finally:
self.closed = True
self.send_closed = True
self.recv_closed = True
self.reset_received = True # Mark as reset5. Security Review
Overall security impact: Low (no new crypto or identity bypass identified). 6. Documentation and Examples
7. Newsfragment Requirement
Severity: BLOCKER for merge under stated py-libp2p release/process rules: add at least:
Content should be user-impact focused (throughput, interop with Go peers), one line or short paragraph each, ending with newline. 8. Tests and ValidationCommands run: Lint (
|
| Dimension | Rating |
|---|---|
| Quality | Good — direction and tests are strong; a few design details need tightening. |
| Security impact | None to Low |
| Merge readiness | Needs fixes — newsfragments + issue linkage are process blockers; PING/write-lock and auto-tune accounting should be addressed or explicitly signed off by maintainers. |
| Confidence | Medium — full suite green locally; remaining concerns are from code review (serialization gap, flow-control accounting). |
12. Spec and cross-implementation comparison
Sources used for this section (on the reviewer’s machine):
| Source | Path (reviewer workspace) |
|---|---|
| libp2p yamux spec (copy of Hashicorp framing rules) | /home/luca/PNL_Launchpad_Curriculum/Libp2p/specs/yamux/README.md |
| go-libp2p muxer adapter | /home/luca/PNL_Launchpad_Curriculum/Libp2p/go-libp2p/p2p/muxer/yamux/ — thin wrapper over github.com/libp2p/go-yamux/v5 (go.mod lists v5.0.1). Framing implementation: Go module cache, e.g. ~/go/pkg/mod/github.com/libp2p/go-yamux/v5@v5.0.0/ (exact patch version may differ after go mod download). |
| js-libp2p | @chainsafe/libp2p-yamux (e.g. js-libp2p/packages/integration-tests/package.json); sources: /home/luca/PNL_Launchpad_Curriculum/Libp2p/js-libp2p/node_modules/@chainsafe/libp2p-yamux/src/. |
| py-libp2p (this PR) | libp2p/stream_muxer/yamux/yamux.py (within this repo) |
Upstream canonical framing text is also linked from the spec: Hashicorp yamux spec.md.
12.1 What the libp2p yamux spec says
From specs/yamux/README.md (mirrored spec):
- Types:
0x0Data,0x1Window update,0x2Ping,0x3Go away. - Flags: SYN / ACK / FIN / RST may be sent with a data or window update message (not exclusively with Data).
- Length field: For Window update, length is a delta on the window. For Data, length is the payload size after the header.
- Opening streams: First frame may be data or window update with SYN; peer replies with data or window update with ACK or RST.
- Closing: Half-close uses data or window update with FIN; RST may accompany data or window update.
- Flow control: Default 256 KiB per stream; either side may advertise a larger window via window updates (including as part of SYN/ACK).
- Accounting: “Both sides should track the number of bytes sent in Data frames only” for window consumption.
Implication for the old py behavior: Sending SYN/ACK/FIN/RST as TYPE_DATA was not aligned with the spec’s “data or window update” wording for those control transitions, and it disagrees with how go-yamux and @chainsafe/libp2p-yamux encode those transitions (see below). The PR’s move to TYPE_WINDOW_UPDATE for those frames is spec-aligned.
12.2 go-libp2p / go-yamux v5
go-libp2p does not reimplement yamux framing; it wraps go-yamux/v5 (stream.go / conn.go only adapt errors and interfaces).
In go-yamux v5 (stream.go in the module cache):
- Window updates and stream opens use
typeWindowUpdatewith flags fromsendFlags()(SYN/ACK), andsendMsg→sendCh→ dedicatedsend()loop serializes all writes (including Ping and GoAway) on one path—no concurrent interleaving onnet.Conn.Write.
// sendWindowUpdate — FIN/RST/control window path
hdr := encode(typeWindowUpdate, flags, s.id, delta)
return s.session.sendMsg(hdr, nil, deadline)
// sendClose — FIN
hdr := encode(typeWindowUpdate, flags|flagFIN, s.id, 0)
return s.session.sendMsg(hdr, nil, nil)
// sendReset — RST
hdr := encode(typeWindowUpdate, flagRST, s.id, errCode)
return s.session.sendMsg(hdr, nil, nil)Auto-tuning uses recvBuf.GrowTo, RTT from the session, and now.Sub(epochStart) < rtt*4 to double the receive window up to MaxStreamWindowSize—the same family of behavior the PR ports to Python.
Compare to py PR: Matching TYPE_WINDOW_UPDATE + SYN/ACK/FIN/RST and adding _write_lock brings py closer to go-yamux’s single send path. The remaining gap is PING still calling secured_conn.write directly (§4), which go-yamux does not do.
12.3 js-libp2p / @chainsafe/libp2p-yamux
FrameType and Flag match the spec (frame.ts). Control-plane sends use WindowUpdate:
- Open stream:
newStream()ends withstream.sendWindowUpdate()— first frame is WindowUpdate with SYN (viagetSendFlags()), length = delta (initial grant), not a Data frame. - FIN / RST:
sendCloseWriteandsendResetuseFrameType.WindowUpdatewith FIN / RST andlength0 (same pattern as py PR).
muxer.ts — all outbound frames (including Ping) go through sendFrame, which pushes encoded headers (and data) onto the muxer’s source queue—effectively one ordered outbound pipeline consumed by the connection sink:
private sendFrame (header: FrameHeader, data?: Uint8ArrayList): void {
this.log?.trace('sending frame %o', header)
if (header.type === FrameType.Data) {
if (data === undefined) {
throw new InvalidFrameError('Invalid frame')
}
this.source.push(
new Uint8ArrayList(encodeHeader(header), data)
)
} else {
this.source.push(encodeHeader(header))
}
}stream.ts — sendWindowUpdate implements RTT-based doubling (rtt * 4, epochStart) and computes delta = recvWindow - recvWindowCapacity before emitting a WindowUpdate (different shape than py’s target_recv_window - recv_window + //2 hysteresis, but same high-level goal: batch updates and grow windows).
Compare to py PR: Frame types for open/close/reset match Chainsafe’s approach. Serialization: JS funnels Ping through sendFrame; py should do the equivalent via _write_frame. Auto-tune: worth line-by-line parity with sendWindowUpdate + go-yamux GrowTo, not only with PR comments.
12.4 Summary table (PR #1269 vs spec vs Go vs JS)
| Topic | libp2p spec (specs/yamux) |
go-yamux v5 (go-libp2p) | @chainsafe/libp2p-yamux (js-libp2p) | py-libp2p after #1269 |
|---|---|---|---|---|
| SYN / ACK / FIN / RST frame type | May use Data or Window update | Window update (+ flags) | Window update (+ flags) | Window update (+ flags) |
| Window length on update | Delta | Delta (GrowTo / encode) |
delta in sendWindowUpdate |
Delta in send_window_update; SYN/ACK carry DEFAULT_WINDOW_SIZE where intended |
| Ping type | Ping, stream ID 0 | typePing via sendMsg |
FrameType.Ping via sendFrame |
TYPE_PING — but write path bypasses _write_lock (issue in §4) |
| Serialize all mux writes | (implied: correct framing order) | Single send() loop |
Single source.push pipeline |
_write_lock + _write_frame for most paths |
| Initial / max window | 256 KiB default, configurable | initialStreamWindow / MaxStreamWindowSize |
INITIAL_STREAM_WINDOW / MAX_STREAM_WINDOW |
DEFAULT_WINDOW_SIZE / MAX_WINDOW_SIZE |
| Auto-tune / RTT | (not specified in detail) | GrowTo, rtt*4, epoch |
sendWindowUpdate, rtt*4, epoch |
_auto_tune_and_send_window_update, _measure_rtt_loop — verify against §4 |
- Send SYN/ACK with length=0 (matches go-yamux GrowTo delta behavior), fixing inflated send_window when go peer ADDs ACK length to initial 256K - Change SYN/ACK handlers from SET to ADD for send_window (matches go-yamux incrSendWindow) - Route PING through _write_frame to honor single-writer serialization - Restore ConnectionClosedError in FIN/RST exception handlers - Demote per-frame TX/RX logging from INFO to DEBUG - Rewrite _auto_tune_and_send_window_update with two-pass GrowTo logic matching go-yamux, remove unused bytes_consumed parameter - Add recv_window overflow guards (clamp to 0 with warning) - Add newsfragments for libp2p#1270 (feature) and libp2p#1271 (bugfix)
The two-pass GrowTo logic was overwriting pass 1 delta with pass 2 delta, causing the peer to receive only the incremental (pass 2) window update. This left the peer's send_window consistently short, degrading throughput and causing perf test timeouts during download. Change delta = extra_delta to delta += extra_delta so the peer receives the full window growth (pass 1 + pass 2).
Fix/yamux go interop
|
I have added fixes for the review. here is the perf PR libp2p/unified-testing#62 |
- python-v0.x x python-v0.x (tcp, noise, yamux)
- python-v0.x x python-v0.x (tcp, noise, mplex)
- python-v0.x x python-v0.x (ws, noise, yamux)
- python-v0.x x python-v0.x (ws, noise, mplex)
- python-v0.x x python-v0.x (ws, tls, yamux)
- python-v0.x x python-v0.x (ws, tls, mplex)
→ Total time: 00:20:19
✗ 6 test(s) failed |
The yamux window auto-tuning (256KB → 16MB) caused two failures: 1. Noise encrypt() rejects data > 65535 bytes. Fix: chunk writes into ≤65519-byte (65535 - 16 MAC) segments before encrypting, matching go-libp2p's noise/rw.go Write() approach. 2. Yamux frames sized by send_window (up to 16MB) overwhelm transports. Fix: cap per-frame payload at 64KB - 12 (header), matching go-yamux's MaxMessageSize default.
The chunking loop skipped empty messages (b"") which broke the Noise XX handshake msg#1. Use a fast path for messages ≤ MAX_PLAINTEXT_LENGTH (including empty) and only chunk larger messages.
|
This PR broke Noise and WS+TLS TLDR; |
|
@acul71 , @asabya : Wish if we could arrive at a good conclusion on this PR. Also, address libp2p/unified-testing#62 (comment) |
What was wrong?
yamux window update and auto stream size compatibility with go-yamux
Issue #
How was it fixed?
Cute Animal Picture