Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cache/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ type diskCache struct {
maxBlobSize int64
maxProxyBlobSize int64
accessLogger *log.Logger
containsQueue chan proxyCheck

// Limit the number of simultaneous proxy Contains checks.
containsSem *semaphore.Weighted

// Limit the number of simultaneous file removals.
fileRemovalSem *semaphore.Weighted
Expand Down
78 changes: 39 additions & 39 deletions cache/disk/findmissing.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import (

pb "github.com/buchgr/bazel-remote/v2/genproto/build/bazel/remote/execution/v2"

"golang.org/x/sync/semaphore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// maxConcurrentContainsChecks bounds the number of simultaneous proxy
// Contains lookups across all FindMissingBlobs requests.
const maxConcurrentContainsChecks = 512

type proxyCheck struct {
wg *sync.WaitGroup
digest **pb.Digest
Expand Down Expand Up @@ -109,26 +114,27 @@ func (c *diskCache) findMissingCasBlobsInternal(ctx context.Context, blobs []*pb
continue
}

// Adding to the containsQueue channel may have blocked on a previous iteration,
// so check to see if the context has cancelled.
select {
case <-ctx.Done():
// Acquire blocks while maxConcurrentContainsChecks checks are
// in flight, and returns early if the context is cancelled.
if err := c.containsSem.Acquire(ctx, 1); err != nil {
if cancelledDueToFailFast {
return errMissingBlob
}
return errRequestCancelled
default:
}

wg.Add(1)
c.containsQueue <- proxyCheck{
go func(req proxyCheck) {
defer c.containsSem.Release(1)
c.processContainsCheck(req)
}(proxyCheck{
wg: &wg,
digest: &chunk[i],
ctx: ctx,
// When failFast is true, onProxyMiss will have been set to a function that
// will cancel the context, causing the remaining proxyChecks to short-circuit.
onProxyMiss: cancelContextForFailFast,
}
})
}
}
}
Expand Down Expand Up @@ -206,43 +212,37 @@ func (c *diskCache) findMissingLocalCAS(blobs []*pb.Digest) int {
return missing
}

func (c *diskCache) containsWorker() {
var ok bool
for req := range c.containsQueue {
if req.ctx != nil {
select {
case <-req.ctx.Done():
// Fast-fail if the context has already been cancelled.
c.accessLogger.Printf("GRPC CAS HEAD %s CANCELLED", (*req.digest).Hash)
req.wg.Done()
continue
default:
}
// processContainsCheck performs a single proxy Contains lookup and calls
// req.wg.Done exactly once.
func (c *diskCache) processContainsCheck(req proxyCheck) {
defer req.wg.Done()

if req.ctx != nil {
select {
case <-req.ctx.Done():
// Fast-fail if the context has already been cancelled.
c.accessLogger.Printf("GRPC CAS HEAD %s CANCELLED", (*req.digest).Hash)
return
default:
}
}

ok, _ = c.proxy.Contains(req.ctx, cache.CAS, (*req.digest).Hash, (*req.digest).SizeBytes)
if ok {
c.accessLogger.Printf("GRPC CAS HEAD %s OK", (*req.digest).Hash)
// The blob exists on the proxy, remove it from the
// list of missing blobs.
*(req.digest) = nil
} else {
c.accessLogger.Printf("GRPC CAS HEAD %s NOT FOUND", (*req.digest).Hash)
if req.onProxyMiss != nil {
req.onProxyMiss()
}
ok, _ := c.proxy.Contains(req.ctx, cache.CAS, (*req.digest).Hash, (*req.digest).SizeBytes)
if ok {
c.accessLogger.Printf("GRPC CAS HEAD %s OK", (*req.digest).Hash)
// The blob exists on the proxy, remove it from the
// list of missing blobs.
*(req.digest) = nil
} else {
c.accessLogger.Printf("GRPC CAS HEAD %s NOT FOUND", (*req.digest).Hash)
if req.onProxyMiss != nil {
req.onProxyMiss()
}
req.wg.Done()
}
}

func (c *diskCache) spawnContainsQueueWorkers() {
// TODO: make these configurable?
const queueSize = 2048
const numWorkers = 512

c.containsQueue = make(chan proxyCheck, queueSize)
for i := 0; i < numWorkers; i++ {
go c.containsWorker()
func (c *diskCache) initContainsCheckLimiter() {
if c.containsSem == nil {
c.containsSem = semaphore.NewWeighted(maxConcurrentContainsChecks)
}
}
Loading
Loading