in go/pkg/client/cas.go [622:684]
// BatchWriteBlobs uploads the given blobs, keyed by digest, using one
// BatchUpdateBlobs RPC per attempt.
//
// The batch is rejected before any RPC is issued when the sum of the sizes
// recorded in the digest keys exceeds c.MaxBatchSize, or when the number of
// blobs exceeds c.MaxBatchDigests. The size check uses the Size field of each
// digest key, not the length of the associated data.
//
// The upload is driven by c.Retrier.Do. If an attempt completes but some blobs
// fail, and every failure is retriable according to c.Retrier.ShouldRetry, the
// last retriable error is returned to the retrier and any subsequent attempt
// sends only the blobs that failed. If at least one failure is not retriable,
// an error summarizing the number of failures and describing the last failed
// blob is returned instead.
func (c *Client) BatchWriteBlobs(ctx context.Context, blobs map[digest.Digest][]byte) error {
	// Validate the batch limits first so that an oversized batch is rejected
	// without constructing any request messages.
	var sz int64
	for k := range blobs {
		sz += int64(k.Size)
	}
	if sz > int64(c.MaxBatchSize) {
		return fmt.Errorf("batch update of %d total bytes exceeds maximum of %d", sz, c.MaxBatchSize)
	}
	if len(blobs) > int(c.MaxBatchDigests) {
		return fmt.Errorf("batch update of %d total blobs exceeds maximum of %d", len(blobs), c.MaxBatchDigests)
	}

	// The final length is known, so size the slice once instead of growing it.
	reqs := make([]*repb.BatchUpdateBlobsRequest_Request, 0, len(blobs))
	for k, b := range blobs {
		reqs = append(reqs, &repb.BatchUpdateBlobsRequest_Request{
			Digest: k.ToProto(),
			Data:   b,
		})
	}

	opts := c.RPCOpts()
	closure := func() error {
		var resp *repb.BatchUpdateBlobsResponse
		err := c.CallWithTimeout(ctx, "BatchUpdateBlobs", func(ctx context.Context) (e error) {
			resp, e = c.cas.BatchUpdateBlobs(ctx, &repb.BatchUpdateBlobsRequest{
				InstanceName: c.InstanceName,
				Requests:     reqs,
			}, opts...)
			return e
		})
		if err != nil {
			return err
		}

		var (
			numErrs        int
			errDg          *repb.Digest // digest of the last failed blob
			errMsg         string       // message of the last failure
			failedReqs     []*repb.BatchUpdateBlobsRequest_Request
			retriableError error
		)
		allRetriable := true
		for _, r := range resp.Responses {
			st := status.FromProto(r.Status)
			if st.Code() == codes.OK {
				continue
			}
			e := StatusDetailedError(st)
			if c.Retrier.ShouldRetry(e) {
				// Re-queue this blob; its data is looked up by the digest
				// echoed back in the response.
				failedReqs = append(failedReqs, &repb.BatchUpdateBlobsRequest_Request{
					Digest: r.Digest,
					Data:   blobs[digest.NewFromProtoUnvalidated(r.Digest)],
				})
				retriableError = e
			} else {
				allRetriable = false
			}
			numErrs++
			errDg = r.Digest
			errMsg = e.Error()
		}
		// reqs is captured by the closure: a later attempt uploads only the
		// blobs that failed with a retriable error in this one.
		reqs = failedReqs
		if numErrs == 0 {
			return nil
		}
		if allRetriable {
			return retriableError // Retriable errors only, retry the failed requests.
		}
		return fmt.Errorf("uploading blobs as part of a batch resulted in %d failures, including blob %s: %s", numErrs, errDg, errMsg)
	}
	return c.Retrier.Do(ctx, closure)
}