in go/pkg/client/cas.go [1057:1115]
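// MissingBlobs queries the CAS for the given digests and returns the subset the server reports
// as missing, splitting the query into batches and issuing the batches concurrently.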
func (c *Client) MissingBlobs(ctx context.Context, ds []digest.Digest) ([]digest.Digest, error) {
	var batches [][]digest.Digest
	var missing []digest.Digest
	var resultMutex sync.Mutex
	// maxQueryLimit caps how many digests go into a single FindMissingBlobs request.
	const maxQueryLimit = 10000
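	// Split ds into query batches of at most maxQueryLimit digests each.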
	for len(ds) > 0 {
		batchSize := maxQueryLimit
		if len(ds) < maxQueryLimit {
			batchSize = len(ds)
		}
		var batch []digest.Digest
		for i := 0; i < batchSize; i++ {
			batch = append(batch, ds[i])
		}
		ds = ds[batchSize:]
		LogContextInfof(ctx, log.Level(3), "Created query batch of %d blobs", len(batch))
		batches = append(batches, batch)
	}
	LogContextInfof(ctx, log.Level(3), "%d query batches created", len(batches))
	eg, eCtx := errgroup.WithContext(ctx)
	for i, batch := range batches {
		i, batch := i, batch // https://golang.org/doc/faq#closures_and_goroutines
		eg.Go(func() error {
			if err := c.casUploaders.Acquire(eCtx, 1); err != nil {
				return err
			}
			defer c.casUploaders.Release(1)
			if i%logInterval == 0 {
				LogContextInfof(ctx, log.Level(3), "%d missing batches left to query", len(batches)-i)
			}
			var batchPb []*repb.Digest
			for _, dg := range batch {
				batchPb = append(batchPb, dg.ToProto())
			}
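			// Ask the CAS which digests in this batch it does not already have.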
			req := &repb.FindMissingBlobsRequest{
				InstanceName: c.InstanceName,
				BlobDigests:  batchPb,
			}
			resp, err := c.FindMissingBlobs(eCtx, req)
			if err != nil {
				return err
			}
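			// Merge this batch's missing digests into the shared result under the mutex.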
			resultMutex.Lock()
			for _, d := range resp.MissingBlobDigests {
				missing = append(missing, digest.NewFromProtoUnvalidated(d))
			}
			resultMutex.Unlock()
			if eCtx.Err() != nil {
				return eCtx.Err()
			}
			return nil
		})
	}
	LogContextInfof(ctx, log.Level(3), "Waiting for remaining query jobs")
	err := eg.Wait()
	LogContextInfof(ctx, log.Level(3), "Done")
	return missing, err
}
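
A minimal caller-side sketch of how MissingBlobs can be used to decide what to upload: hash the
candidate blobs, ask the CAS which digests it lacks, and keep only those. The helper name
uploadCandidates, the contents parameter, and the use of digest.NewFromBlob for hashing are
assumptions for illustration, not part of cas.go.

// uploadCandidates returns the blobs from contents that the CAS does not yet have.
// Assumes the client and digest packages from remote-apis-sdks are imported, along with context.
func uploadCandidates(ctx context.Context, c *client.Client, contents [][]byte) ([][]byte, error) {
	byDigest := make(map[digest.Digest][]byte, len(contents))
	ds := make([]digest.Digest, 0, len(contents))
	for _, b := range contents {
		d := digest.NewFromBlob(b)
		byDigest[d] = b
		ds = append(ds, d)
	}
	missing, err := c.MissingBlobs(ctx, ds)
	if err != nil {
		return nil, err
	}
	toUpload := make([][]byte, 0, len(missing))
	for _, d := range missing {
		toUpload = append(toUpload, byDigest[d])
	}
	return toUpload, nil
}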