in go/pkg/client/cas.go [979:1043]
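// readBlobStreamed copies the blob identified by d into w, starting at
// offset and, when limit > 0, stopping after limit bytes. It retries
// transient backend failures, returns stats on the logical and wire bytes
// moved, and verifies the digest when the entire blob was requested.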
func (c *Client) readBlobStreamed(ctx context.Context, d digest.Digest, offset, limit int64, w io.Writer) (*MovedBytesMetadata, error) {
stats := &MovedBytesMetadata{}
stats.Requested = d.Size
if d.Size == 0 {
// Do not download empty blobs.
return stats, nil
}
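	// sz is the number of bytes we expect to deliver: everything after
	// offset, further capped by limit when one is set.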
sz := d.Size - offset
if limit > 0 && limit < sz {
sz = limit
}
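	// wt wraps w, counting the logical bytes written through it and, once
	// closed, computing the digest of the data it has seen.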
wt := newWriteTracker(w)
defer func() { stats.LogicalMoved = wt.n }()
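	// closure performs a single download attempt; c.Retrier.Do below
	// retries it on transient errors.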
closure := func() (err error) {
name, wc, done, e := c.maybeCompressReadBlob(d, wt)
if e != nil {
return e
}
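		// On exit, close wc and wait (via done) for the downstream writer
		// to finish flushing whatever it received.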
defer func() {
errC := wc.Close()
errD := <-done
close(done)
			// Surface close/finalize errors only when the read itself
			// succeeded, so that an earlier error is never masked.
			if err == nil && errC != nil {
				err = errC
			}
			if err == nil && errD != nil {
				err = fmt.Errorf("failed to finalize writing downloaded data downstream: %v", errD)
			}
}()
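		// Resume from offset+wt.n: wt persists across retries, so a retried
		// attempt picks up where the previous one left off.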
wireBytes, err := c.readStreamed(ctx, name, offset+wt.n, limit, wc)
stats.RealMoved += wireBytes
		return err
}
// Only retry on transient backend issues.
if err := c.Retrier.Do(ctx, closure); err != nil {
return stats, err
}
if wt.n != sz {
return stats, fmt.Errorf("partial read of digest %s returned %d bytes", wt.dg, sz)
}
	// Verify the digest only for complete reads: the hash can't be reliably
	// calculated without the full blob.
if d.Size == sz {
// Signal for writeTracker to take the digest of the data.
if err := wt.Close(); err != nil {
return stats, err
}
// Wait for the digest to be ready.
if err := <-wt.ready; err != nil {
return stats, err
}
close(wt.ready)
if wt.dg != d {
return stats, fmt.Errorf("calculated digest %s != expected digest %s", wt.dg, d)
}
}
return stats, nil
}