From cd6ef42c8722a1fe6326ae9a41791fc7925f2406 Mon Sep 17 00:00:00 2001 From: lance-tan-ai Date: Tue, 24 Feb 2026 09:33:34 -0800 Subject: [PATCH 1/5] grpcproxy: set finish_write=true on final ByteStream WriteRequest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ByteStream Write protocol requires the last WriteRequest to have finish_write=true to signal that the upload is complete. Without it, servers that strictly enforce the spec (e.g. EngFlow) reject the write with: "Request completed before all data was sent: finished_writing=false". The previous loop only called CloseAndRecv() after reading n==0, which closes the gRPC stream client-side but never sends a message with finish_write=true. io.Reader is also permitted to return n>0 alongside io.EOF in the same call, so the last data chunk could be sent without the finish flag and the n==0 branch might never be reached. Fix: track whether the most recent Read returned io.EOF (finishWrite), set FinishWrite on the WriteRequest for that chunk, and call CloseAndRecv immediately after — handling both the "EOF with data" and "EOF alone" cases. Co-authored-by: Cursor --- cache/grpcproxy/grpcproxy.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/cache/grpcproxy/grpcproxy.go b/cache/grpcproxy/grpcproxy.go index 568fefdf0..2ca21b813 100644 --- a/cache/grpcproxy/grpcproxy.go +++ b/cache/grpcproxy/grpcproxy.go @@ -167,15 +167,22 @@ func (r *remoteGrpcProxyCache) UploadFile(item backendproxy.UploadReq) { firstIteration := true for { - n, err := item.Rc.Read(buf) - if err != nil && err != io.EOF { - logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash) + n, readErr := item.Rc.Read(buf) + if readErr != nil && readErr != io.EOF { + logResponse(r.errorLogger, "Write", readErr.Error(), item.Kind, item.Hash) err := stream.CloseSend() if err != nil { logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash) } return } + + // The ByteStream Write protocol requires finish_write=true on the + // last WriteRequest. We set it when the reader signals EOF, whether + // that comes with the last data chunk (n>0, readErr==io.EOF) or as + // a standalone termination (n==0, readErr==io.EOF). + finishWrite := readErr == io.EOF + if n > 0 { rn := "" if firstIteration { @@ -185,14 +192,16 @@ func (r *remoteGrpcProxyCache) UploadFile(item backendproxy.UploadReq) { req := &bs.WriteRequest{ ResourceName: rn, Data: buf[:n], + FinishWrite: finishWrite, } - err := stream.Send(req) - if err != nil { + if err := stream.Send(req); err != nil { logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash) return } - } else { - _, err = stream.CloseAndRecv() + } + + if finishWrite { + _, err := stream.CloseAndRecv() if err != nil { logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash) return From 615caf2157d53077686e50e01004f4bf6ea5d547 Mon Sep 17 00:00:00 2001 From: lance-tan-ai Date: Tue, 24 Feb 2026 09:50:42 -0800 Subject: [PATCH 2/5] grpcproxy: send explicit FinishWrite message when EOF arrives alone MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous fix set FinishWrite=true on the data chunk when Read returned (n>0, io.EOF) simultaneously, but missed the case where Read returns (n>0, nil) for the last chunk and then (0, io.EOF) on the following call — which is the more common io.Reader behaviour. In that case the data was fully sent across previous iterations but no message with FinishWrite=true was ever transmitted; CloseAndRecv just closed the stream client-side, leaving the server reporting finished_writing=false. Fix: when finishWrite is true and n==0, send a zero-data WriteRequest with FinishWrite=true before calling CloseAndRecv so the server always receives an explicit completion signal. Co-authored-by: Cursor --- cache/grpcproxy/grpcproxy.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/cache/grpcproxy/grpcproxy.go b/cache/grpcproxy/grpcproxy.go index 2ca21b813..af0380160 100644 --- a/cache/grpcproxy/grpcproxy.go +++ b/cache/grpcproxy/grpcproxy.go @@ -201,6 +201,25 @@ func (r *remoteGrpcProxyCache) UploadFile(item backendproxy.UploadReq) { } if finishWrite { + if n == 0 { + // All data was sent in previous iterations without FinishWrite. + // io.Reader may return (n>0, nil) for the last chunk then + // (0, io.EOF) on the next call. Send a zero-data terminal + // message so the server sees finish_write=true. + rn := "" + if firstIteration { + firstIteration = false + rn = resourceName + } + req := &bs.WriteRequest{ + ResourceName: rn, + FinishWrite: true, + } + if err := stream.Send(req); err != nil { + logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash) + return + } + } _, err := stream.CloseAndRecv() if err != nil { logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash) From 17c196b7e92ae97e0b2f9f2ae6496d78485b09d1 Mon Sep 17 00:00:00 2001 From: lance-tan-ai Date: Tue, 24 Feb 2026 09:58:55 -0800 Subject: [PATCH 3/5] grpcproxy: set write_offset correctly on all WriteRequests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ByteStream Write protocol requires each WriteRequest to carry the byte offset of its data relative to the start of the resource. Without it the server rejects the terminal finish_write=true message with: write_offset: Current write_offset (0) does not match expected one () Track a running writeOffset, increment it after each data chunk, and pass it on every WriteRequest — including the zero-data terminal message sent when io.Reader returns (0, io.EOF) separately from the last chunk. Co-authored-by: Cursor --- cache/grpcproxy/grpcproxy.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cache/grpcproxy/grpcproxy.go b/cache/grpcproxy/grpcproxy.go index af0380160..c6c918c46 100644 --- a/cache/grpcproxy/grpcproxy.go +++ b/cache/grpcproxy/grpcproxy.go @@ -166,6 +166,7 @@ func (r *remoteGrpcProxyCache) UploadFile(item backendproxy.UploadReq) { resourceName := fmt.Sprintf(template, uuid.New().String(), item.Hash, item.LogicalSize) firstIteration := true + writeOffset := int64(0) for { n, readErr := item.Rc.Read(buf) if readErr != nil && readErr != io.EOF { @@ -191,6 +192,7 @@ func (r *remoteGrpcProxyCache) UploadFile(item backendproxy.UploadReq) { } req := &bs.WriteRequest{ ResourceName: rn, + WriteOffset: writeOffset, Data: buf[:n], FinishWrite: finishWrite, } @@ -198,6 +200,7 @@ func (r *remoteGrpcProxyCache) UploadFile(item backendproxy.UploadReq) { logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash) return } + writeOffset += int64(n) } if finishWrite { @@ -205,7 +208,8 @@ func (r *remoteGrpcProxyCache) UploadFile(item backendproxy.UploadReq) { // All data was sent in previous iterations without FinishWrite. // io.Reader may return (n>0, nil) for the last chunk then // (0, io.EOF) on the next call. Send a zero-data terminal - // message so the server sees finish_write=true. + // message with the correct write_offset so the server sees + // finish_write=true at the right position. rn := "" if firstIteration { firstIteration = false @@ -213,6 +217,7 @@ func (r *remoteGrpcProxyCache) UploadFile(item backendproxy.UploadReq) { } req := &bs.WriteRequest{ ResourceName: rn, + WriteOffset: writeOffset, FinishWrite: true, } if err := stream.Send(req); err != nil { From 00ac0b10f2d29a2ba19ee088aa2bd8b9d3fb4986 Mon Sep 17 00:00:00 2001 From: lance-tan-ai Date: Tue, 24 Feb 2026 10:51:43 -0800 Subject: [PATCH 4/5] grpcproxy: pass size_bytes through on AC GetActionResult proxy requests Get() received the action digest size as the size parameter but the AC branch ignored it, always sending SizeBytes=-1 to the upstream. Servers that require a valid digest (e.g. EngFlow) reject this with: action_digest: invalid or missing action digest Fix: pass size through directly; it comes from the client's original GetActionResult request and is always the correct action digest size. Co-authored-by: Cursor --- cache/grpcproxy/grpcproxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cache/grpcproxy/grpcproxy.go b/cache/grpcproxy/grpcproxy.go index c6c918c46..5ae4fa086 100644 --- a/cache/grpcproxy/grpcproxy.go +++ b/cache/grpcproxy/grpcproxy.go @@ -299,7 +299,7 @@ func (r *remoteGrpcProxyCache) Get(ctx context.Context, kind cache.EntryKind, ha case cache.AC: digest := pb.Digest{ Hash: hash, - SizeBytes: -1, + SizeBytes: size, } req := &pb.GetActionResultRequest{ActionDigest: &digest} From ffc66a3cd6358cb40c7738ab3028f6c2d8a976f4 Mon Sep 17 00:00:00 2001 From: lance-tan-ai Date: Tue, 24 Feb 2026 11:19:05 -0800 Subject: [PATCH 5/5] fix AC proxy reads: thread action digest size_bytes via context grpc_ac.go passes -1 for the ActionResult size (unknown at request time), but the same -1 was also being used as the action digest size_bytes in the proxy's GetActionResult call, which gRPC remote caches reject as invalid. Thread the action digest size_bytes via context so the proxy can construct a correct Digest without changing the Proxy.Get interface. Co-authored-by: Cursor --- cache/cache.go | 8 ++++++++ cache/grpcproxy/grpcproxy.go | 8 +++++++- server/grpc_ac.go | 4 ++++ 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/cache/cache.go b/cache/cache.go index ce5760448..de9954abd 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -7,6 +7,14 @@ import ( "io" ) +type contextKey int + +const ( + // ActionDigestSizeBytesKey carries the action digest size_bytes to the proxy, + // which needs it to issue a valid GetActionResult request. + ActionDigestSizeBytesKey contextKey = iota +) + // EntryKind describes the kind of cache entry type EntryKind int diff --git a/cache/grpcproxy/grpcproxy.go b/cache/grpcproxy/grpcproxy.go index 5ae4fa086..66dd3a056 100644 --- a/cache/grpcproxy/grpcproxy.go +++ b/cache/grpcproxy/grpcproxy.go @@ -297,9 +297,15 @@ func (r *remoteGrpcProxyCache) Get(ctx context.Context, kind cache.EntryKind, ha // is enabled. We can treat them as AC in this scope fallthrough case cache.AC: + actionDigestSize := int64(-1) + if v := ctx.Value(cache.ActionDigestSizeBytesKey); v != nil { + if sz, ok := v.(int64); ok { + actionDigestSize = sz + } + } digest := pb.Digest{ Hash: hash, - SizeBytes: size, + SizeBytes: actionDigestSize, } req := &pb.GetActionResultRequest{ActionDigest: &digest} diff --git a/server/grpc_ac.go b/server/grpc_ac.go index 5cbefa981..a2b9b5e4a 100644 --- a/server/grpc_ac.go +++ b/server/grpc_ac.go @@ -70,6 +70,10 @@ func (s *grpcServer) GetActionResult(ctx context.Context, // checked by the the disk cache. const unknownActionResultSize = -1 + // Thread the action digest size_bytes to the proxy via context; the + // Proxy.Get interface only carries the (unknown) ActionResult size. + ctx = context.WithValue(ctx, cache.ActionDigestSizeBytesKey, req.ActionDigest.SizeBytes) + if !s.depsCheck { logPrefix = "GRPC AC GET NODEPSCHECK"