drpc: enable stream multiplexing #51
shubhamdhama wants to merge 11 commits into stream-multiplexing
Conversation
I'll update tests in a separate PR and address some of the limitations in subsequent PRs. Keeping the PR size small for ease of review.
We are not using it and are not planning to use it anytime soon, so until then it's just a burden to maintain.
Enable multiple concurrent streams over a single transport. This is the
foundational change that replaces the single-stream-at-a-time model with true
multiplexing, allowing clients and servers to run multiple RPCs concurrently on
a shared connection.
Background:
Previously, the Manager enforced single-stream semantics: a semaphore (sem)
allowed only one active stream, and each new stream had to wait for the previous
one to finish (waitForPreviousStream). Stream IDs were required to be
monotonically increasing (checkStreamMonotonicity), and a single PacketAssembler
was shared across all invoke sequences. This was simple and correct for
one-at-a-time RPCs but incompatible with multiplexing.
Structural changes:
Manager:
- Remove the semaphore (sem) and acquireSemaphore/waitForPreviousStream.
Multiple streams can now be created concurrently without blocking on each
other.
- Remove checkStreamMonotonicity. With multiplexing, frames from different
streams arrive interleaved; monotonicity is not meaningful.
- Remove lastFrameID/lastFrameKind tracking fields (only used by the
monotonicity check).
- Replace the single shared PacketAssembler with a per-stream invokesAssembler
map (map[uint64]*invokeAssembler). Each stream's invoke/metadata frame
sequence is assembled independently.
- Remove SoftCancel option (see error semantics below).
- Remove GetLatest from streamRegistry; manageReader now dispatches frames by
looking up the stream ID in the registry directly.
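The Manager changes above can be sketched as follows. All names here (frame, dispatcher, handleFrame) are illustrative stand-ins for the drpcmanager internals, not the actual code; the point is that invoke/metadata frames go to a per-stream assembler map while other frames are dispatched by a direct registry lookup, so interleaved streams assemble independently:

```go
package main

import "fmt"

// frame is a stand-in for a wire frame tagged with its stream ID.
type frame struct {
	streamID uint64
	kind     string
	data     []byte
}

type invokeAssembler struct{ parts [][]byte }

type dispatcher struct {
	streams    map[uint64]chan frame       // stream registry stand-in
	assemblers map[uint64]*invokeAssembler // per-stream invoke assembly
}

func newDispatcher() *dispatcher {
	return &dispatcher{
		streams:    make(map[uint64]chan frame),
		assemblers: make(map[uint64]*invokeAssembler),
	}
}

func (d *dispatcher) handleFrame(f frame) {
	switch f.kind {
	case "Invoke", "InvokeMetadata":
		// each stream's invoke sequence assembles independently
		ia, ok := d.assemblers[f.streamID]
		if !ok {
			ia = &invokeAssembler{}
			d.assemblers[f.streamID] = ia
		}
		ia.parts = append(ia.parts, f.data)
	default:
		// deliver to the active stream, if registered; otherwise drop
		if ch, ok := d.streams[f.streamID]; ok {
			ch <- f
		}
	}
}

func main() {
	d := newDispatcher()
	// interleaved invoke frames from two different streams
	d.handleFrame(frame{streamID: 1, kind: "InvokeMetadata", data: []byte("m1")})
	d.handleFrame(frame{streamID: 2, kind: "InvokeMetadata", data: []byte("m2")})
	d.handleFrame(frame{streamID: 1, kind: "Invoke", data: []byte("i1")})
	fmt.Println(len(d.assemblers[1].parts), len(d.assemblers[2].parts))
}
```

Because the assemblers are keyed by stream ID, there is no cross-stream ordering requirement, which is why the monotonicity check can go.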
Server:
- ServeOne now spawns a goroutine per incoming RPC via sync.WaitGroup, rather
than handling RPCs sequentially. Errors from individual RPC handlers are
logged (via opts.Log) rather than terminating the connection.
Stream:
- NewWithOptions no longer calls wr.Reset() on the shared Writer. With
multiplexing, multiple streams share the same Writer; resetting it would
discard buffered frames from other streams.
- SendCancel no longer returns a (busy, error) tuple. It blocks on the stream's
write lock instead of returning busy=true when another write is in progress.
This guarantees the cancel frame is sent (or fails with an IO error), at the
cost of waiting for any in-progress write to finish. A future writer queue
will eliminate this blocking.
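The old versus new SendCancel behavior can be sketched as below; cancelStream and writeCancelFrame are stand-ins, not the real drpcstream types:

```go
package main

import (
	"fmt"
	"sync"
)

type cancelStream struct {
	mu sync.Mutex // the stream's write lock, held during any in-progress write
}

func (s *cancelStream) writeCancelFrame(cause error) error {
	return nil // stand-in for encoding and writing a KindCancel frame
}

// Old behavior: report busy when a write is in progress so the caller can
// fall back to a hard cancel.
func (s *cancelStream) sendCancelOld(cause error) (busy bool, err error) {
	if !s.mu.TryLock() {
		return true, nil
	}
	defer s.mu.Unlock()
	return false, s.writeCancelFrame(cause)
}

// New behavior: block until the write lock frees, so the cancel frame is
// always sent (or fails with an IO error).
func (s *cancelStream) SendCancel(cause error) error {
	s.mu.Lock() // may wait for a large in-progress write
	defer s.mu.Unlock()
	return s.writeCancelFrame(cause)
}

func main() {
	s := &cancelStream{}
	s.mu.Lock() // simulate an in-progress write
	busy, _ := s.sendCancelOld(nil)
	s.mu.Unlock()
	err := s.SendCancel(nil)
	fmt.Println(busy, err)
}
```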
Error and cancellation semantics:
The central design principle is that the manageReader goroutine is the single
authority on transport health. It is the only goroutine that reads from the
transport, and if the transport fails, it will detect the failure and terminate
the connection. Write-side errors propagate to the caller but do not directly
terminate the connection; the reader will independently detect the broken
transport (since an IO write failure implies the transport is broken, and the
next read will also fail). This matches gRPC's approach: when loopyWriter
encounters an IO error, it does not close the connection. Instead, it relies on
the reader to detect the failure and clean up.
Error classification:
Connection-scoped (terminates all streams):
- Transport read error: manageReader fails to read a frame.
- Frame assembly error: corrupted wire data that cannot be parsed.
- Protocol error: e.g., receiving an invoke on an existing stream, or an
unknown non-control packet kind.
- Manager.Close(): explicit shutdown by the application.
Stream-scoped (only affects that stream):
- Application error: the RPC handler returns an error, which is sent via
SendError (KindError) and terminates only that stream.
- Remote close: receiving KindClose or KindCloseSend terminates or half-closes
only that stream.
- Remote cancel: receiving KindCancel terminates only that stream.
- Remote error: receiving KindError terminates only that stream.
- Write error (MsgSend, SendError, CloseSend, SendCancel): the error
propagates to the caller. The stream is terminated locally. The manageReader
goroutine will detect the transport failure on its next read and terminate
the connection.
Context cancellation:
When a stream's context is cancelled, manageStream:
1. Attempts to send a KindCancel frame (SendCancel). This blocks until any
in-progress write on that stream completes, then sends the cancel. If the
send fails (IO error), the error is logged. The reader will catch the
transport failure.
2. Cancels the stream locally (stream.Cancel), which terminates the stream and
causes any blocked Send/Recv to return the context error.
3. Waits for the stream to finish (stream.Finished).
The SoftCancel option is removed. Previously, SoftCancel=false would terminate
the entire connection when a stream's context was cancelled (calling m.terminate
if the stream wasn't already finished). With multiplexing, cancelling one stream
must never kill the connection. SoftCancel=true behavior (send cancel, then
cancel locally) is now the only behavior, simplified to always block for the
write lock rather than returning "busy" and falling back to a hard cancel.
Manager termination:
When the manager terminates (from any connection-scoped error), it closes the
transport and the stream registry. Each active stream's manageStream goroutine
detects termination via m.sigs.term, cancels its stream, and waits for it to
finish. Manager.Close() then waits for all stream goroutines (m.wg.Wait) and the
reader goroutine before returning.
Known limitations:
- The shared drpcwire.Writer is protected by a mutex. All streams serialize
their writes through this single writer.
- SendCancel blocks on the stream's write lock. If a stream has a large
in-progress write, the cancel is delayed.
- packetBuffer is single-slot (Put blocks until Get+Done). A slow consumer
stream blocks manageReader, stalling frame delivery to all streams. This needs
to be addressed with per-stream buffering or async delivery.
- Conn.Invoke() holds a mutex for the entire unary RPC duration, serializing
concurrent unary RPCs. Streaming RPCs (NewStream) are not affected.
Add Stream.WriteInvoke that writes the InvokeMetadata and Invoke frames under a single write lock acquisition. This prevents SendCancel from slipping in between the two frames when a context is cancelled during stream setup. Without this, the following race is possible:
1. Client creates the stream and starts the manageStream goroutine.
2. doNewStream sends InvokeMetadata and releases the write lock.
3. The context cancels. manageStream calls SendCancel, acquires the write lock, sends KindCancel, and terminates the stream.
4. doNewStream tries to send Invoke, sees the stream terminated, and returns an error.
The server receives InvokeMetadata then Cancel, but never the Invoke. It has no registered stream to cancel, so the Cancel is dropped and the partial invokeAssembler entry leaks until the connection closes. With WriteInvoke, SendCancel blocks until both frames are written, so the server always sees a complete invoke before any cancel.
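The fix can be sketched as follows; invokeStream is a stand-in type, and the frame kinds are strings for illustration. Because both frames are emitted under one lock acquisition, SendCancel (which also takes the lock) can only observe the wire before or after the complete invoke, never between the two frames:

```go
package main

import (
	"fmt"
	"sync"
)

type invokeStream struct {
	mu   sync.Mutex
	wire []string // frame kinds in the order they reach the transport
}

func (s *invokeStream) writeFrame(kind string) { s.wire = append(s.wire, kind) }

// WriteInvoke writes both setup frames under a single lock acquisition.
func (s *invokeStream) WriteInvoke() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.writeFrame("InvokeMetadata")
	s.writeFrame("Invoke")
	return nil
}

// SendCancel blocks until WriteInvoke has emitted both frames.
func (s *invokeStream) SendCancel() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.writeFrame("Cancel")
}

func main() {
	s := &invokeStream{}
	_ = s.WriteInvoke()
	s.SendCancel()
	fmt.Println(s.wire)
}
```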
…s.Close Previously, when a manager terminated, each stream's manageStream goroutine independently detected m.sigs.term.Signal and cancelled its own stream. This was asynchronous. Instead, cancel all streams directly in activeStreams.Close(), called from Manager.terminate. This makes termination synchronous and immediate, and removes the term.Signal case from manageStream.
With multiplexing, multiple Invoke calls run concurrently. The shared c.wbuf buffer and its protecting mutex are replaced with per-call allocation. Stats collection is removed (unused by cockroach).
Fix compilation errors and remove tests for removed features:
- Update New/NewWithOptions calls to include the ManagerKind argument
- Update activeStreams.Close calls to include the error argument
- Remove tests for SoftCancel, cross-stream monotonicity, Unblocked semantics, the SendCancel busy return, and ForEach (all removed)
- Adapt OldStreamFramesIgnored to use stream.Finished instead of Unblocked
The old packetBuffer was single-slot: Put blocked until the consumer called Get+Done. This meant manageReader was stuck delivering to one stream and couldn't serve others — a deadlock under multiplexing. Replace it with a ring-buffer packetQueue (capacity 256) that copies data on Put and returns immediately. Get drains queued messages before returning the close error, ensuring graceful shutdown delivers all buffered data.
Replace the per-stream drpcwire.Writer with a shared MuxWriter that uses a dedicated drain goroutine. This decouples stream writes from transport I/O, enabling true concurrent stream multiplexing.
Core architecture:
MuxWriter (drpcwire/mux_writer.go):
- Single instance per Manager, shared across all streams
- Dedicated goroutine continuously drains buffered frames to the transport
- Non-blocking WriteFrame: appends to the buffer under lock, signals the goroutine
- Double-buffer swap (buf/spare) minimizes time spent under lock
Stream changes (drpcstream/stream.go):
- wr field: *drpcwire.Writer -> *drpcwire.MuxWriter
- Removed: ManualFlush option, RawFlush, rawFlushLocked, checkRecvFlush
- sendPacketLocked/rawWriteLocked: WriteFrame only, no Flush
Manager integration (drpcmanager/manager.go):
- Creates MuxWriter with onError: m.terminate
- terminate(): wr.Stop() THEN tr.Close(). Stop makes WriteFrame reject immediately; the transport close unblocks any in-flight Write in the drain goroutine
- Close(): <-wr.Done() to wait for the drain goroutine to exit
Key design decisions:
1. No explicit flush needed. The drain goroutine continuously pulls from the buffer. Natural batching occurs because appends accumulate while the goroutine is mid-Write. The old cork pattern (delay flush until first recv) is unnecessary: appending is a memcpy under lock, and the goroutine controls when transport I/O happens.
2. sync.Cond over channels. Signal coalescing: multiple WriteFrame calls while run() is in Write produce a single wakeup. No allocation overhead. Stop uses a closed bool plus Broadcast. Consistent with packetQueue.
3. Two-phase shutdown (Stop/Done split to avoid deadlock). Stop() is non-blocking: it sets closed, Broadcasts, and returns immediately. Done() returns a channel that closes when run() exits. This split is critical for the onError path: run() -> Write fails -> sets closed -> onError -> terminate -> Stop (finds closed=true, no-op) -> run() returns. If Stop blocked until run() exited, this path would self-join.
4. run() owns its lifecycle on write failure. When Write fails, run() sets closed=true itself before calling onError. The subsequent onError -> terminate -> Stop path finds closed already set. No coordination is needed; the flag is idempotent.
5. No per-stream FrameWriter wrapper. A per-stream FrameWriter wrapping *MuxWriter was considered, but its only value was a closed check before append. That check lives in MuxWriter.WriteFrame directly; streams hold *MuxWriter and call WriteFrame.
What this unlocks:
- Concurrent multiplexing: streams no longer serialize on writes
- Simplified stream: all flush/cork complexity removed
- Natural batching from the continuous drain
- Direct error propagation: transport write failures fire manager termination via the onError callback
Breaking changes:
- drpcstream.Options.ManualFlush removed
- Stream.RawFlush(), SetManualFlush() removed
- Stream constructor: *drpcwire.Writer -> *drpcwire.MuxWriter
Test coverage: 8 concurrency tests for MuxWriter covering concurrent WriteFrame, write errors, the onError->Stop deadlock path, a blocked Write unblocked by Close, concurrent Stop, abort semantics (Stop discards buffered data), and write-during-active-drain. A data race in the initial implementation (reading buf capacity without the lock) was caught by these tests and fixed.
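The design decisions above can be sketched in one self-contained type. This is an assumed shape, not the actual drpcwire code: WriteFrame appends under a lock and signals a sync.Cond, run() swaps in a spare buffer and writes to the transport outside the lock, Stop is non-blocking, and Done reports when run() has exited:

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"sync"
)

var errClosed = errors.New("mux writer closed")

type MuxWriter struct {
	mu     sync.Mutex
	cond   *sync.Cond
	buf    []byte // frames appended here under lock
	spare  []byte // swapped in while run() is writing
	closed bool
	done   chan struct{}
}

func NewMuxWriter(w io.Writer, onError func(error)) *MuxWriter {
	mw := &MuxWriter{done: make(chan struct{})}
	mw.cond = sync.NewCond(&mw.mu)
	go mw.run(w, onError)
	return mw
}

// WriteFrame is a memcpy under the lock plus a coalesced wakeup; no
// transport I/O happens on the caller's goroutine.
func (mw *MuxWriter) WriteFrame(frame []byte) error {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	if mw.closed {
		return errClosed
	}
	mw.buf = append(mw.buf, frame...)
	mw.cond.Signal()
	return nil
}

func (mw *MuxWriter) run(w io.Writer, onError func(error)) {
	defer close(mw.done)
	mw.mu.Lock()
	for {
		for len(mw.buf) == 0 && !mw.closed {
			mw.cond.Wait()
		}
		if mw.closed {
			mw.mu.Unlock()
			return // abort semantics: buffered data is discarded on Stop
		}
		out := mw.buf
		mw.buf, mw.spare = mw.spare[:0], out // double-buffer swap
		mw.mu.Unlock()
		if _, err := w.Write(out); err != nil {
			mw.mu.Lock()
			mw.closed = true // run() owns its lifecycle on write failure
			mw.mu.Unlock()
			onError(err) // terminate -> Stop finds closed set: no-op
			return
		}
		mw.mu.Lock()
	}
}

// Stop is non-blocking so the onError -> terminate -> Stop path cannot
// self-join; callers wait on Done for run() to exit.
func (mw *MuxWriter) Stop() {
	mw.mu.Lock()
	mw.closed = true
	mw.cond.Broadcast()
	mw.mu.Unlock()
}

func (mw *MuxWriter) Done() <-chan struct{} { return mw.done }

// syncWriter records writes and signals when the drain goroutine flushes.
type syncWriter struct {
	buf   bytes.Buffer
	wrote chan struct{}
}

func (w *syncWriter) Write(p []byte) (int, error) {
	n, err := w.buf.Write(p)
	select {
	case w.wrote <- struct{}{}:
	default:
	}
	return n, err
}

func main() {
	w := &syncWriter{wrote: make(chan struct{}, 1)}
	mw := NewMuxWriter(w, func(error) {})
	_ = mw.WriteFrame([]byte("hello"))
	<-w.wrote // wait for the drain goroutine to flush
	mw.Stop()
	<-mw.Done()
	fmt.Println(w.buf.String())
}
```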
Fix a race condition where a unary RPC with a cancelled context could
return io.EOF instead of codes.Canceled. Two changes, mirroring how
gRPC handles this:
1. Early ctx.Err() check in NewClientStream before creating the stream.
2. Deferred stream.CheckCancelError in doInvoke to convert io.EOF to
the cancel error if the stream was cancelled mid-operation.
The problem:
With multiplexing, each stream gets a manageStream goroutine that
watches ctx.Done() and calls SendCancel + Cancel when the context is
cancelled. This races with doInvoke, which writes invoke and message
frames through the same stream. The race has three outcomes depending
on who acquires the stream's write lock first:
1. doInvoke wins the lock, completes all writes, and the RPC succeeds
even though it should have been cancelled.
2. SendCancel wins, sets send=io.EOF before doInvoke runs.
rawWriteLocked sees send.IsSet() and returns io.EOF. Invoke's
ToRPCErr passes io.EOF through unchanged, so the caller gets the
wrong error code.
3. doInvoke finishes writes, then MsgRecv sees the cancellation and
returns codes.Canceled. This is the correct outcome but only
happens by luck of timing.
Why this didn't happen before multiplexing:
The old single-stream manager used a non-blocking SendCancel that
returned (busy=true) when the write lock was held by an in-progress
write. With SoftCancel=false (the default), the fallback path was:
manageStream calls stream.Cancel(ctx.Err()). The stream is not
finished because doInvoke holds the write lock, so the manager calls
m.terminate(), which closes the entire transport. The in-flight
Writer.Write() fails with an IO error, and checkCancelError sees
cancel.IsSet() and returns context.Canceled.
The correct error surfaced, but through connection termination. This was
fine in single-stream mode where one stream is one connection. With
multiplexing, we cannot terminate the entire connection for one stream's
cancellation. The new SendCancel blocks on the write lock to guarantee
the cancel frame is sent, and that introduced this race.
How gRPC handles this (verified against grpc-go source):
gRPC uses two mechanisms. First, newAttemptLocked (stream.go:408) checks
cs.ctx.Err() before creating the transport stream. This catches the
already-cancelled case without allocating resources. Second, for unary
RPCs, csAttempt.sendMsg (stream.go:1092) swallows write errors and
returns nil when !cs.desc.ClientStreams. The real error always surfaces
from RecvMsg, which detects context cancellation via
recvBufferReader.readClient (transport.go:239) and returns
status.Error(codes.Canceled, ...). This means gRPC never returns io.EOF
from a unary RPC because it never short-circuits on a send error.
For streaming RPCs, gRPC returns io.EOF from Send() after cancel (the
stream is done for writing) and codes.Canceled from Recv() (the actual
reason). Our grpccompat tests confirm this by comparing gRPC and DRPC
error results for identical cancel scenarios.
Our fix:
Rather than restructuring doInvoke to swallow send errors like gRPC, we
use the stream's existing CheckCancelError mechanism.
NewClientStream checks ctx.Err() before creating the stream. This
mirrors gRPC's newAttemptLocked check and avoids wasting a stream ID,
spawning a goroutine, and allocating stream resources.
doInvoke defers stream.CheckCancelError on its return value. If any
operation in doInvoke fails because SendCancel won the write lock race
(returning io.EOF via the send signal), CheckCancelError replaces it
with the cancel signal's error (context.Canceled). This is the same
function the stream already uses internally for transport write
failures. CheckCancelError is exported (was checkCancelError) so that
doInvoke in the drpcconn package can call it.
On TOCTOU:
The NewClientStream check is technically TOCTOU: the context could be
cancelled immediately after the check passes. This is acceptable because
Go's context cancellation model is cooperative, not preemptive. The
context package provides Done() "for use in select statements," and
operations check at natural boundaries rather than continuously. The
standard library follows this pattern: http.Client.Do checks between
redirect hops, database/sql checks before query execution, and gRPC
checks in newAttemptLocked before creating the transport stream. If the
context is cancelled mid-operation, manageStream handles cleanup and the
deferred CheckCancelError corrects the error code.
// New returns a new cache.
func New() *Cache { return &Cache{} }
You don't have to remove it from server.go?
Lines 167 to 170 in f88990a
Yes it's removed in a previous commit.
pa drpcwire.PacketAssembler // assembles invoke/metadata frames into packets
// invokesAssembler is owned by the manageReader goroutine, used in
// handleInvokeFrame.
invokesAssembler map[uint64]*invokeAssembler
nit: not necessarily in this PR but we should find a better name to replace invokeAssembler.
Agreed. I'll change either in this or next one.
if err := curr.HandleFrame(incomingFrame); err != nil {
switch {
// if the packet is for an active stream, deliver it.
case ok && stream != nil:
stream != nil is redundant right?
Right. It was added before I put the guarantee in activeStreams.
// allow a new stream to begin.
m.sem.Recv()
if err := stream.SendCancel(ctx.Err()); err != nil {
In case of an IO error, we could very much close the connection. Are you logging it and let the reader to handle it for consistency with gRPC?
Right. But I am not sure if it's a good move; I am inclined toward terminating the manager. I did some research here, and TCP does not guarantee that an IO error on write will also lead to a read error.
What do you think?
// the return result is only valid until the next call to NewClientStream or
// NewServerStream.
func (m *Manager) Unblocked() <-chan struct{} {
if prev := m.streams.GetLatest(); prev != nil {
Nice. This will always handout the same connection from the pool. Can you capture the behavior change (concurrent requests use the same connection from the pool) in the commit messages?
Right. Actually, I think I will delete this method in my subsequent PRs (and if I don't, I will add the note you suggested). The reason to remove it is that the pool will enforce the one-stream-one-connection mechanism by some other means, so this method's usage will likely be removed from there.
// flight at termination time (fin fires once the last operation completes).
// Whichever call site runs last sees term set and both locks free, and sets fin.
func (s *Stream) checkFinished() {
if s.sigs.term.IsSet() && s.write.Unlocked() && s.read.Unlocked() {
I don't fully understand the details just yet. The IO could be message or cancel. If we are waiting for cancel it makes sense but if we are waiting for only message I don't know how this two phase shutdown is useful. Reviewing commit by commit may not be providing full picture. I'll revisit this commit once I'm done reviewing all the commits.
Just to be on the same page: this "two-phase shutdown" is not something introduced in this PR. I wanted to document the existing behavior of what checkFinished does and why it's important.
Either way, I just want to note that if, in the end, we still feel we can or should eliminate checkFinished, I am all in for simplification. But that would be an improvement/clean-up in its own right, and I would keep such clean-up beyond the scope of this PR.
Now that I think more on this and after the arguments that I applied for read mutex, I agree with your argument during our in-person discussion.
Here's a version ready to post:
I think we can eliminate checkFinished (and the fin signal, and the inspectMutex types).
Details
checkFinished bridges a two-phase shutdown: term fires when termination arrives, fin fires when all in-flight operations have completed. The idea is that Finished() waits on fin, so manageStream doesn't exit until every read/write has unwound. But this guarantee isn't actually load-bearing.
When a cancel arrives while a write is in progress, checkFinished delays fin until that write returns. But it doesn't prevent the write from completing. The goroutine is already past the send/term checks and inside WriteFrame. That write goes through regardless. All checkFinished does is delay when manageStream exits, and nothing after Finished() relies on exclusive access to the stream. manageStream just does streams.Remove and wg.Done(), neither of which touches the stream.
If we collapse fin into term (i.e. Finished() returns term.Signal()), the only observable difference is that manageStream exits while the write goroutine is still unwinding. The goroutine checks send/term on the next loop iteration, returns an error, and exits on its own. No correctness issue.
One side benefit: s.ctx.sig.Set(context.Canceled) currently fires with fin (after all ops complete). Moving it into terminate means the stream's context is canceled immediately, so handler code selecting on stream.Context().Done() reacts faster.
The read inspectMutex can also go. Reads are never concurrent at the stream level (only the application goroutine calls Recv), and pbuf.Close in terminate already synchronizes with any in-progress Get/Done pair. The write inspectMutex can be downgraded to a plain sync.Mutex since we no longer need the Unlocked() observability.
mu sync.Mutex
cond sync.Cond
buf [][]byte // ring buffer of byte slices
I'm debating between keeping this as []byte vs using packet instead.
I'm inclined toward []byte, unless we have some argument that would tip the scale to the other side.
// (manageReader) from the consumer (RPC handler), preventing a slow handler
// from blocking frame delivery to other streams.
//
// TODO: benchmark whether power-of-2 masking improves performance over modulo.
Yes. If it is in the hot path, we should avoid modulo.
Right. I want to start with a readable code and run some perf benchmarks to justify the tradeoff. I'm not creating an issue for it because chances are we may not need it but keeping it as a TODO to ensure we don't forget this possibility either.
// Done advances the read pointer, making the slot available for reuse.
// It must be called exactly once after each successful Get.
func (pq *packetQueue) Done() {
The semantics of the queue look odd to me. Any reason to do so?
Now that I think about it, it's not a "queue"; it's a circular buffer.
It needs to expose three APIs:
- Put (Push, Write)
- Get (Read)
- Done (or TailAdvance)
The semantic here is, if you do a Get, you sort of take a lock on the tail and when you do Done(), you release the lock on tail and its data is "consumed" so that buffer can be reused for future operations.
I asked Mr. Opus about this and I liked his suggestion. TL;DR: I'm thinking of renaming it to ringBuffer, since nothing here relates to a packet; the structure is agnostic to packets (but keep the variable name as packetRing or packetBuffer).
And change the APIs to,
Put(data)
Acquire() → (data) // "I'm borrowing this slot"
Release() // "I'm done, slot is yours again"
Claude response
**What the data structure actually is**
A bounded, single-producer/single-consumer ring buffer where the consumer borrows a reference to the tail slot (zero-copy read) and must explicitly release it. The held flag exists precisely because the slot can't be reused until the consumer says it's done with the data. This is not a queue — a queue's dequeue is atomic (pop and you're done). Here, the read is a lease.
Naming options
| Name | Precedent / Rationale | Downside |
|---|---|---|
| ringBuffer | Exactly what it is structurally. Go stdlib, Prometheus, and most systems code use this. | Generic — doesn't convey the borrow semantics |
| packetRing | Domain-specific ring buffer. Linux kernel uses "ring" for this pattern (ring_buffer, io_uring). | Fine, but packet might be overly specific if data is just []byte |
| byteRing | Emphasizes the payload type | Loses the "bounded, blocking" aspect |
| slotBuffer | Emphasizes the slot-based borrow/release pattern | Not a widely recognized term |
My preference: packetRing if it stays packet-specific, or just ringBuffer if you want the standard name. The two-phase read is a detail of the API, not the identity of the data structure.
API naming
This is where the bigger improvement lives. Your current API:
Put(data) // produce
Get() → (data) // borrow
Done() // release
The problem: Get and Done are generic names that don't hint at the contract between them. A reader has to understand that Get returns a borrowed reference and Done releases it. Two alternatives:
Option A — Acquire/Release vocabulary (systems programming convention):
Put(data)
Acquire() → (data) // "I'm borrowing this slot"
Release() // "I'm done, slot is yours again"
This is the most semantically precise. sync.Pool uses Get/Put, but that's a different pattern. The acquire/release pair from lock-free programming maps directly: you acquire exclusive access to the tail slot's data, then release it.
Option B — Peek/Consume vocabulary (queue/buffer convention):
Put(data)
Peek() → (data) // look at head of queue without removing
Consume() // advance past it
Disruptor pattern, some Java concurrent queues. Slightly less precise because Peek traditionally means "look but don't touch" with no obligation to Consume, whereas here the contract is mandatory.
Option C — Keep Get, rename Done:
Put(data)
Get() → (data)
Advance() // or Commit()
Minimal rename. Advance makes the "move the tail pointer" action explicit. Commit has transaction semantics (you're committing to having consumed the item).
}
if pq.count == 0 {
// Queue is empty and closed — return the close error.
return nil, pq.err
You don't need broadcast here?
I don't think so. We have not consumed anything, so awakening Put won't do anything meaningful.
shubhamdhama left a comment
Thanks for the really high-quality review. I will address the feedback where we are on the same page and wait for your response where we aren't.
if err := s.handleRPC(stream, rpc); err != nil {
return errs.Wrap(err)
}
// TODO: add worker pool
man := drpcmanager.NewWithOptions(tr, s.opts.Manager)
man := drpcmanager.NewWithOptions(tr, drpcmanager.Server, s.opts.Manager)
var wg sync.WaitGroup
The way I see it, manageStream is Manager's implementation detail. It is Manager's responsibility to ensure all the goroutines it spawns are finished when it's Closed.
This wg is to ensure all the handleRPC goroutines are finished.
This WaitGroup and the other one may be correlated, but they serve purposes at different layers: a manageStream and a handleRPC can have different lifetimes.
I started doing some mental exercise below. TL;DR: I think the current model is solid.
Let's take an RPC handler, for example:
func RPCHandler()
- some logic
- Send()
- some logic
- Recv()
- some logic
And here is a timeline where manageStream exits before RPCHandler,
- RPCHandler is at "3. some logic"
- context get cancelled
- manageStream detects the context cancellation
- manageStream cancels the stream and exit (finished).
- RPCHandler moves to "4" and detects stream is done and exits (finished).
Now here is an example where RPCHandler exits before manageStream,
- RPCHandler is "5". Decides to exit without doing stream.Close or anything.
- drpcmux/handle_rpc.go#L73: does a CloseSend
- handleRPC has exited (finished)
- Client replies with a CloseSend and stream is finished
- manageStream detects stream is finished and exits (finished)
Let's take the case where the server context is cancelled:
- server context is cancelled
- NewServerStream exits
- for loop stops
- defer waits for all the handleRPC goroutines to be finished
- some handleRPC will exit seeing the context cancelled
- other handleRPC calls will be waiting on some stream operation (like Send or Recv). These operations can exit only when manageStream detects the cancellation and cancels the stream. Let's take one manageStream as an example.
- manageStream marks the stream signals as done. But not exited yet.
- stuck handleRPC's Send/Recv completes with an error and handleRPC exits
- wg.Wait() in ServeOne exits
- manageStream has still not finished
- man.Close() of defer executes
- man.Close waits for all its goroutines to finish including manageStream
- manageStream exits and everything is fine again
| // (manageReader) from the consumer (RPC handler), preventing a slow handler | ||
| // from blocking frame delivery to other streams. | ||
| // | ||
| // TODO: benchmark whether power-of-2 masking improves performance over modulo. |
There was a problem hiding this comment.
Right. I want to start with a readable code and run some perf benchmarks to justify the tradeoff. I'm not creating an issue for it because chances are we may not need it but keeping it as a TODO to ensure we don't forget this possibility either.
| } | ||
| if pq.count == 0 { | ||
| // Queue is empty and closed — return the close error. | ||
| return nil, pq.err |
There was a problem hiding this comment.
I don't think so. We have not consumed anything to awakening the Put won't do anything meaningful.
|
|
||
| // Done advances the read pointer, making the slot available for reuse. | ||
| // It must be called exactly once after each successful Get. | ||
| func (pq *packetQueue) Done() { |
There was a problem hiding this comment.
Now I think about it, it's not a "queue". It's a circular buffer.
It need to expose three APIs,
- Put (Push, Write)
- Get (Read)
- Done (or TailAdvance)
The semantic here is, if you do a Get, you sort of take a lock on the tail and when you do Done(), you release the lock on tail and its data is "consumed" so that buffer can be reused for future operations.
I asked Mr. Opus about this and I liked his suggestion. TLDR; I'm thinking of renaming it to ringBuffer since here we have nothing related to a packet. This structure is pretty much agnostic to packet (but keep the variable name as packetRing or packetBuffer).
And change the APIs to,
Put(data)
Acquire() → (data) // "I'm borrowing this slot"
Release() // "I'm done, slot is yours again"
Claude response
**What the data structure actually is**
A bounded, single-producer/single-consumer ring buffer where the consumer borrows a reference to the tail slot (zero-copy read) and must explicitly release it. The held flag exists precisely because the slot can't be reused until the consumer says it's done with the data. This is not a queue — a queue's dequeue is atomic (pop and you're done). Here, the read is a lease.
Naming options
| Name | Precedent / Rationale | Downside |
|---|---|---|
| `ringBuffer` | Exactly what it is structurally. Go stdlib, Prometheus, and most systems code use this. | Generic — doesn't convey the borrow semantics |
| `packetRing` | Domain-specific ring buffer. Linux kernel uses "ring" for this pattern (`ring_buffer`, `io_uring`). | Fine, but packet might be overly specific if data is just `[]byte` |
| `byteRing` | Emphasizes the payload type | Loses the "bounded, blocking" aspect |
| `slotBuffer` | Emphasizes the slot-based borrow/release pattern | Not a widely recognized term |
My preference: packetRing if it stays packet-specific, or just ringBuffer if you want the standard name. The two-phase read is a detail of the API, not the identity of the data structure.
API naming
This is where the bigger improvement lives. Your current API:
Put(data) // produce
Get() → (data) // borrow
Done() // release
The problem: Get and Done are generic names that don't hint at the contract between them. A reader has to understand that Get returns a borrowed reference and Done releases it. Two alternatives:
Option A — Acquire/Release vocabulary (systems programming convention):
Put(data)
Acquire() → (data) // "I'm borrowing this slot"
Release() // "I'm done, slot is yours again"
This is the most semantically precise. sync.Pool uses Get/Put, but that's a different pattern. The acquire/release pair from lock-free programming maps directly: you acquire exclusive access to the tail slot's data, then release it.
Option B — Peek/Consume vocabulary (queue/buffer convention):
Put(data)
Peek() → (data) // look at head of queue without removing
Consume() // advance past it
This matches the Disruptor pattern and some Java concurrent queues. It's slightly less precise because Peek traditionally means "look but don't touch" with no obligation to Consume, whereas here the contract is mandatory.
Option C — Keep Get, rename Done:
Put(data)
Get() → (data)
Advance() // or Commit()
Minimal rename. Advance makes the "move the tail pointer" action explicit. Commit has transaction semantics (you're committing to having consumed the item).
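To make the proposed Acquire/Release contract concrete, here is a minimal, self-contained sketch of such a ring buffer. The names (`ringBuffer`, `Put`, `Acquire`, `Release`) follow the proposal above; the condition-variable plumbing and the single-borrow `held` flag are illustrative, not drpc's actual implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// ringBuffer is a bounded ring where the consumer borrows the tail
// slot (zero-copy read) and must explicitly release it before the
// slot can be reused.
type ringBuffer struct {
	mu    sync.Mutex
	cond  *sync.Cond
	slots [][]byte
	head  int  // next slot to write
	tail  int  // next slot to read
	count int  // occupied slots
	held  bool // tail slot is currently borrowed
}

func newRingBuffer(size int) *ringBuffer {
	rb := &ringBuffer{slots: make([][]byte, size)}
	rb.cond = sync.NewCond(&rb.mu)
	return rb
}

// Put blocks until a slot is free, then stores data.
func (rb *ringBuffer) Put(data []byte) {
	rb.mu.Lock()
	defer rb.mu.Unlock()
	for rb.count == len(rb.slots) {
		rb.cond.Wait()
	}
	rb.slots[rb.head] = data
	rb.head = (rb.head + 1) % len(rb.slots)
	rb.count++
	rb.cond.Broadcast()
}

// Acquire borrows the tail slot's data; the slot stays occupied
// until Release is called.
func (rb *ringBuffer) Acquire() []byte {
	rb.mu.Lock()
	defer rb.mu.Unlock()
	for rb.count == 0 || rb.held {
		rb.cond.Wait()
	}
	rb.held = true
	return rb.slots[rb.tail]
}

// Release returns the borrowed slot to the buffer for reuse.
func (rb *ringBuffer) Release() {
	rb.mu.Lock()
	defer rb.mu.Unlock()
	rb.held = false
	rb.slots[rb.tail] = nil
	rb.tail = (rb.tail + 1) % len(rb.slots)
	rb.count--
	rb.cond.Broadcast()
}

func main() {
	rb := newRingBuffer(2)
	rb.Put([]byte("a"))
	rb.Put([]byte("b"))
	data := rb.Acquire() // borrow the tail slot
	fmt.Println(string(data))
	rb.Release() // now the slot can be reused
	fmt.Println(string(rb.Acquire()))
	rb.Release()
}
```

The acquire/release vocabulary makes the lease explicit at every call site, which is the main argument for Option A.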
// allow a new stream to begin.
m.sem.Recv()
if err := stream.SendCancel(ctx.Err()); err != nil {
Right, but I'm not sure it's a good move; I'm inclined toward terminating the manager. I did some research here, and TCP gives no guarantee that an I/O error on a write will lead to a read error as well.
What do you think?
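To illustrate that stance in miniature: since a failed write carries no guarantee that reads will also start failing, the writer side has to tear the connection state down explicitly rather than wait for a read error to unwind things. The `manager`/`sendCancel` names below are toy stand-ins, not drpc's API:

```go
package main

import (
	"errors"
	"fmt"
)

// manager is a toy stand-in for the transport manager.
type manager struct{ closed bool }

// terminate tears down the manager unconditionally.
func (m *manager) terminate() { m.closed = true }

// sendCancel simulates a cancel frame whose write fails.
func sendCancel() error { return errors.New("write: broken pipe") }

func main() {
	m := &manager{}
	if err := sendCancel(); err != nil {
		// TCP does not promise that our reads will error just
		// because this write did, so terminate explicitly instead
		// of relying on the read loop to notice.
		m.terminate()
	}
	fmt.Println(m.closed)
}
```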
// the return result is only valid until the next call to NewClientStream or
// NewServerStream.
func (m *Manager) Unblocked() <-chan struct{} {
    if prev := m.streams.GetLatest(); prev != nil {
Right. Actually, I think I will delete this method in subsequent PRs (and if I don't, I will add the note you suggested). The reason to remove it is that the pool will enforce the one-stream-per-connection mechanism by some other means, so this method's usage there will likely go away.
}

man := drpcmanager.NewWithOptions(tr, s.opts.Manager)
defer func() { err = errs.Combine(err, man.Close()) }()
Cool. I will make sure to resolve this when merging the PR: either I will squash this commit with the other one, or I will move this change into the previous commit.
//
// Client streams return Unavailable errors when the remote closes the
// connection, while server streams return Canceled errors.
Kind() StreamKind
Any reason for this? Otherwise it's just dead code.
assert.That(t, errors.Is(err, io.EOF))
}

func TestConcurrentStreams(t *testing.T) {
I will keep this in mind; I have some mixed thoughts here. I would love for things to be deterministic, but just to be on the same page (and I know you would agree with me on this): unless we have a deterministic runtime/operating system, we may still not be able to simulate a failure. But I see your point about increasing the surface of determinism.
🚨 Most of the commits will be squashed and merged because they are not completely atomic.
drpcmanager: remove InactivityTimeout option

We are not using it and not planning to use it anytime soon, so until then
it's just a burden to maintain.
drpc: enable stream multiplexing

Enable multiple concurrent streams over a single transport. This is the
foundational change that replaces the single-stream-at-a-time model with true
multiplexing, allowing clients and servers to run multiple RPCs concurrently on
a shared connection.