func()

in go/pkg/client/cas.go [979:1043]


// readBlobStreamed downloads the blob identified by d and writes it to w.
//
// offset is the position in the blob at which reading starts. A positive limit
// smaller than the remaining size caps the number of bytes expected; otherwise
// the read is expected to deliver d.Size-offset bytes.
//
// The download runs under c.Retrier; every attempt resumes at offset plus the
// number of bytes already written to w. After a successful download the number
// of bytes written must match the expected size, and when the whole blob was
// requested the digest of the written data is checked against d.
//
// The returned metadata is always non-nil: Requested is d.Size, RealMoved is
// the sum of the wire bytes reported by every attempt, and LogicalMoved is the
// number of bytes written through the write tracker. An empty blob (d.Size == 0)
// returns immediately without contacting the backend.
func (c *Client) readBlobStreamed(ctx context.Context, d digest.Digest, offset, limit int64, w io.Writer) (*MovedBytesMetadata, error) {
	stats := &MovedBytesMetadata{}
	stats.Requested = d.Size
	if d.Size == 0 {
		// Do not download empty blobs.
		return stats, nil
	}

	// sz is the number of bytes this call is expected to write to w.
	sz := d.Size - offset
	if limit > 0 && limit < sz {
		sz = limit
	}

	wt := newWriteTracker(w)
	// stats is a pointer, so the caller observes the value recorded here on
	// every return path.
	defer func() { stats.LogicalMoved = wt.n }()

	closure := func() (err error) {
		name, wc, done, e := c.maybeCompressReadBlob(d, wt)
		if e != nil {
			return e
		}

		defer func() {
			// Close the stream writer first, then wait for the result reported
			// on done (presumably the downstream/decompression stage; confirm
			// against maybeCompressReadBlob).
			errC := wc.Close()
			errD := <-done
			close(done)

			// Never mask a read failure: close/finalize errors are surfaced
			// only when the read itself succeeded, and a successful read must
			// not hide a failed close or finalize step.
			if err == nil && errC != nil {
				err = errC
			}
			if err == nil && errD != nil {
				err = fmt.Errorf("Failed to finalize writing downloaded data downstream: %v", errD)
			}
		}()

		// NOTE(review): on a retry the offset advances by wt.n but limit is
		// passed unchanged; confirm this is intended when limit > 0.
		// The := below assigns the named result err (same scope), so the
		// deferred function above sees the read error.
		wireBytes, err := c.readStreamed(ctx, name, offset+wt.n, limit, wc)
		stats.RealMoved += wireBytes
		return err
	}
	// Only retry on transient backend issues.
	if err := c.Retrier.Do(ctx, closure); err != nil {
		return stats, err
	}
	if wt.n != sz {
		// Report the requested digest and the actual byte count; the tracker's
		// own digest has not been computed at this point.
		return stats, fmt.Errorf("partial read of digest %s returned %d bytes, expected %d bytes", d, wt.n, sz)
	}

	// The digest is verified for complete reads only, since the hash cannot be
	// reliably calculated without the full blob. Partial reads are covered by
	// the size check above.
	if d.Size == sz {
		// Signal for writeTracker to take the digest of the data.
		if err := wt.Close(); err != nil {
			return stats, err
		}
		// Wait for the digest to be ready.
		if err := <-wt.ready; err != nil {
			return stats, err
		}
		close(wt.ready)
		if wt.dg != d {
			return stats, fmt.Errorf("calculated digest %s != expected digest %s", wt.dg, d)
		}
	}

	return stats, nil
}