diff --git a/cache/cache.go b/cache/cache.go index ce5760448..de9954abd 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -7,6 +7,14 @@ import ( "io" ) +type contextKey int + +const ( + // ActionDigestSizeBytesKey carries the action digest size_bytes to the proxy, + // which needs it to issue a valid GetActionResult request. + ActionDigestSizeBytesKey contextKey = iota +) + // EntryKind describes the kind of cache entry type EntryKind int diff --git a/cache/grpcproxy/grpcproxy.go b/cache/grpcproxy/grpcproxy.go index 568fefdf0..66dd3a056 100644 --- a/cache/grpcproxy/grpcproxy.go +++ b/cache/grpcproxy/grpcproxy.go @@ -166,16 +166,24 @@ func (r *remoteGrpcProxyCache) UploadFile(item backendproxy.UploadReq) { resourceName := fmt.Sprintf(template, uuid.New().String(), item.Hash, item.LogicalSize) firstIteration := true + writeOffset := int64(0) for { - n, err := item.Rc.Read(buf) - if err != nil && err != io.EOF { - logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash) + n, readErr := item.Rc.Read(buf) + if readErr != nil && readErr != io.EOF { + logResponse(r.errorLogger, "Write", readErr.Error(), item.Kind, item.Hash) err := stream.CloseSend() if err != nil { logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash) } return } + + // The ByteStream Write protocol requires finish_write=true on the + // last WriteRequest. We set it when the reader signals EOF, whether + // that comes with the last data chunk (n>0, readErr==io.EOF) or as + // a standalone termination (n==0, readErr==io.EOF). + finishWrite := readErr == io.EOF + if n > 0 { rn := "" if firstIteration { @@ -184,15 +192,40 @@ func (r *remoteGrpcProxyCache) UploadFile(item backendproxy.UploadReq) { } req := &bs.WriteRequest{ ResourceName: rn, + WriteOffset: writeOffset, Data: buf[:n], + FinishWrite: finishWrite, } - err := stream.Send(req) - if err != nil { + if err := stream.Send(req); err != nil { logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash) return } - } else { - _, err = stream.CloseAndRecv() + writeOffset += int64(n) + } + + if finishWrite { + if n == 0 { + // All data was sent in previous iterations without FinishWrite. + // io.Reader may return (n>0, nil) for the last chunk then + // (0, io.EOF) on the next call. Send a zero-data terminal + // message with the correct write_offset so the server sees + // finish_write=true at the right position. + rn := "" + if firstIteration { + firstIteration = false + rn = resourceName + } + req := &bs.WriteRequest{ + ResourceName: rn, + WriteOffset: writeOffset, + FinishWrite: true, + } + if err := stream.Send(req); err != nil { + logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash) + return + } + } + _, err := stream.CloseAndRecv() if err != nil { logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash) return @@ -264,9 +297,15 @@ func (r *remoteGrpcProxyCache) Get(ctx context.Context, kind cache.EntryKind, ha // is enabled. We can treat them as AC in this scope fallthrough case cache.AC: + actionDigestSize := int64(-1) + if v := ctx.Value(cache.ActionDigestSizeBytesKey); v != nil { + if sz, ok := v.(int64); ok { + actionDigestSize = sz + } + } digest := pb.Digest{ Hash: hash, - SizeBytes: -1, + SizeBytes: actionDigestSize, } req := &pb.GetActionResultRequest{ActionDigest: &digest} diff --git a/server/grpc_ac.go b/server/grpc_ac.go index 5cbefa981..a2b9b5e4a 100644 --- a/server/grpc_ac.go +++ b/server/grpc_ac.go @@ -70,6 +70,10 @@ func (s *grpcServer) GetActionResult(ctx context.Context, // checked by the the disk cache. const unknownActionResultSize = -1 + // Thread the action digest size_bytes to the proxy via context; the + // Proxy.Get interface only carries the (unknown) ActionResult size. + ctx = context.WithValue(ctx, cache.ActionDigestSizeBytesKey, req.ActionDigest.SizeBytes) + if !s.depsCheck { logPrefix = "GRPC AC GET NODEPSCHECK"