in go/pkg/client/cas.go [690:754]
// BatchDownloadBlobs fetches the given blobs from the CAS into memory with the
// BatchReadBlobs RPC and returns their contents keyed by digest.
//
// The call fails up front, returning a nil map and without issuing any RPC,
// when dgs has more than c.MaxBatchDigests entries or when the digest sizes add
// up to more than c.MaxBatchSize bytes. Zero-sized digests are never placed in
// the request; if at least one is present, the result maps digest.Empty to nil.
//
// Individual blobs can fail inside an otherwise successful RPC. When every
// such failure is one that c.Retrier.ShouldRetry accepts, the request is
// narrowed to the failed digests and the last retriable error is handed to
// c.Retrier so that a retry re-requests only those blobs. Otherwise a summary
// error reporting the failure count and the last failed blob is produced.
//
// Once the limit checks have passed, the map is returned together with whatever
// error c.Retrier.Do yields, so on failure it may still hold the blobs that
// were read successfully.
func (c *Client) BatchDownloadBlobs(ctx context.Context, dgs []digest.Digest) (map[digest.Digest][]byte, error) {
	if len(dgs) > int(c.MaxBatchDigests) {
		return nil, fmt.Errorf("batch read of %d total blobs exceeds maximum of %d", len(dgs), c.MaxBatchDigests)
	}

	var (
		totalBytes int64
		sawEmpty   bool
	)
	req := &repb.BatchReadBlobsRequest{InstanceName: c.InstanceName}
	for _, dg := range dgs {
		if dg.Size != 0 {
			totalBytes += int64(dg.Size)
			req.Digests = append(req.Digests, dg.ToProto())
			continue
		}
		// Empty blobs need no round trip; just remember that one was requested.
		sawEmpty = true
	}
	if totalBytes > int64(c.MaxBatchSize) {
		return nil, fmt.Errorf("batch read of %d total bytes exceeds maximum of %d", totalBytes, c.MaxBatchSize)
	}

	blobs := make(map[digest.Digest][]byte)
	if sawEmpty {
		blobs[digest.Empty] = nil
	}
	opts := c.RPCOpts()

	// attempt performs one BatchReadBlobs round trip. It shares req and blobs
	// with the enclosing function so that each invocation only asks for what is
	// still missing and accumulates into the same result map.
	attempt := func() error {
		var resp *repb.BatchReadBlobsResponse
		call := func(ctx context.Context) error {
			var rpcErr error
			resp, rpcErr = c.cas.BatchReadBlobs(ctx, req, opts...)
			return rpcErr
		}
		if err := c.CallWithTimeout(ctx, "BatchReadBlobs", call); err != nil {
			return err
		}

		var (
			failures      int
			lastFailed    = &repb.Digest{}
			lastMessage   string
			retryDigests  []*repb.Digest
			retryErr      error
			onlyRetriable = true
		)
		for _, r := range resp.Responses {
			st := status.FromProto(r.Status)
			if st.Code() == codes.OK {
				blobs[digest.NewFromProtoUnvalidated(r.Digest)] = r.Data
				continue
			}
			if blobErr := st.Err(); c.Retrier.ShouldRetry(blobErr) {
				retryDigests = append(retryDigests, r.Digest)
				retryErr = blobErr
			} else {
				onlyRetriable = false
			}
			failures++
			lastFailed, lastMessage = r.Digest, r.Status.Message
		}

		// Narrow the shared request so a subsequent attempt skips blobs that
		// have already been stored in the result map.
		req.Digests = retryDigests

		switch {
		case failures == 0:
			return nil
		case onlyRetriable:
			// Retriable errors only, retry the failed digests.
			return retryErr
		default:
			return fmt.Errorf("downloading blobs as part of a batch resulted in %d failures, including blob %s: %s", failures, lastFailed, lastMessage)
		}
	}
	return blobs, c.Retrier.Do(ctx, attempt)
}