in go/pkg/cas/upload.go [1050:1135]
// streamFromReader uploads the contents of r to the ByteStream service as a
// single blob identified by digest. If compressed is true, the resource name
// declares the zstd compressed-blobs form, so r is expected to yield the
// compressed bytes. If updateCacheStats is true, the upload is additionally
// counted toward u.stats.CacheHits or u.stats.CacheMisses.
//
// If the server closes the stream early (stream.Send returns io.EOF), the
// blob most likely already exists remotely; this is verified via the
// CommittedSize returned by CloseAndRecv and treated as a cache hit.
func (u *uploader) streamFromReader(ctx context.Context, r io.Reader, digest *repb.Digest, compressed, updateCacheStats bool) error {
	// withTimeout bounds each individual Send call below, not the whole
	// upload; cancel releases the timeout machinery on return.
	ctx, cancel, withTimeout := withPerCallTimeout(ctx, u.Config.ByteStreamWrite.Timeout)
	defer cancel()
	stream, err := u.byteStream.Write(ctx)
	if err != nil {
		return err
	}
	// CloseAndRecv below also closes the send side; this deferred CloseSend
	// is a safety net for the early-return paths.
	defer stream.CloseSend()

	// The resource name is required only on the first WriteRequest and is
	// cleared after the first Send. uuid.New() makes the upload id unique.
	req := &bspb.WriteRequest{}
	if compressed {
		req.ResourceName = fmt.Sprintf("%s/uploads/%s/compressed-blobs/zstd/%s/%d", u.InstanceName, uuid.New(), digest.Hash, digest.SizeBytes)
	} else {
		req.ResourceName = fmt.Sprintf("%s/uploads/%s/blobs/%s/%d", u.InstanceName, uuid.New(), digest.Hash, digest.SizeBytes)
	}

	// Borrow a chunk buffer from the pool to avoid a per-upload allocation;
	// it is returned on all exit paths via the deferred Put.
	buf := u.streamBufs.Get().(*[]byte)
	defer u.streamBufs.Put(buf)

chunkLoop:
	for {
		// Before reading, check if the context is canceled.
		if ctx.Err() != nil {
			return ctx.Err()
		}
		// Read the next chunk from the pipe.
		// Use ReadFull to ensure we aren't sending tiny blobs over RPC.
		region := trace.StartRegion(ctx, "ReadFull in streamFromReader")
		n, err := io.ReadFull(r, *buf)
		region.End()
		switch {
		case err == io.EOF || err == io.ErrUnexpectedEOF:
			// Reader exhausted. ErrUnexpectedEOF means a short (possibly
			// empty) final chunk; either way this request is the last one.
			req.FinishWrite = true
		case err != nil:
			return err
		}
		req.Data = (*buf)[:n] // must limit by `:n` in ErrUnexpectedEOF case
		// Send the chunk, bounded by the per-call timeout.
		withTimeout(func() {
			trace.WithRegion(ctx, "stream.Send", func() {
				err = stream.Send(req)
			})
		})
		switch {
		case err == io.EOF:
			// The server closed the stream.
			// Most likely the file is already uploaded, see the CommittedSize check below.
			break chunkLoop
		case err != nil:
			return err
		case req.FinishWrite:
			break chunkLoop
		}
		// Prepare the next request. The offset advances by the bytes just
		// sent; the resource name must be omitted after the first request.
		req.ResourceName = "" // send the resource name only in the first request
		req.WriteOffset += int64(len(req.Data))
	}

	// Finalize the request. Whether we finished the write or the server cut
	// it short, the server must report the full blob as committed.
	// NOTE(review): for compressed uploads, whether the server's
	// CommittedSize counts compressed or uncompressed (digest) bytes depends
	// on its REAPI interpretation — confirm against the target server.
	switch res, err := stream.CloseAndRecv(); {
	case err != nil:
		return err
	case res.CommittedSize != digest.SizeBytes:
		return fmt.Errorf("unexpected commitSize: got %d, want %d", res.CommittedSize, digest.SizeBytes)
	}

	// Update stats. If we never set FinishWrite, the server closed the
	// stream early because it already had the blob — count a cache hit.
	cacheHit := !req.FinishWrite
	if !cacheHit {
		atomic.AddInt64(&u.stats.Streamed.Bytes, digest.SizeBytes)
		atomic.AddInt64(&u.stats.Streamed.Digests, 1)
	}
	if updateCacheStats {
		st := &u.stats.CacheMisses
		if cacheHit {
			st = &u.stats.CacheHits
		}
		atomic.AddInt64(&st.Bytes, digest.SizeBytes)
		atomic.AddInt64(&st.Digests, 1)
	}
	return nil
}