func()

in go/pkg/cas/upload.go [1050:1135]


func (u *uploader) streamFromReader(ctx context.Context, r io.Reader, digest *repb.Digest, compressed, updateCacheStats bool) error {
	ctx, cancel, withTimeout := withPerCallTimeout(ctx, u.Config.ByteStreamWrite.Timeout)
	defer cancel()

	stream, err := u.byteStream.Write(ctx)
	if err != nil {
		return err
	}
	defer stream.CloseSend()

	req := &bspb.WriteRequest{}
	if compressed {
		req.ResourceName = fmt.Sprintf("%s/uploads/%s/compressed-blobs/zstd/%s/%d", u.InstanceName, uuid.New(), digest.Hash, digest.SizeBytes)
	} else {
		req.ResourceName = fmt.Sprintf("%s/uploads/%s/blobs/%s/%d", u.InstanceName, uuid.New(), digest.Hash, digest.SizeBytes)
	}

	buf := u.streamBufs.Get().(*[]byte)
	defer u.streamBufs.Put(buf)

chunkLoop:
	for {
		// Before reading, check if the context if canceled.
		if ctx.Err() != nil {
			return ctx.Err()
		}

		// Read the next chunk from the pipe.
		// Use ReadFull to ensure we aren't sending tiny blobs over RPC.
		region := trace.StartRegion(ctx, "ReadFull in streamFromReader")
		n, err := io.ReadFull(r, *buf)
		region.End()
		switch {
		case err == io.EOF || err == io.ErrUnexpectedEOF:
			req.FinishWrite = true
		case err != nil:
			return err
		}
		req.Data = (*buf)[:n] // must limit by `:n` in ErrUnexpectedEOF case

		// Send the chunk.
		withTimeout(func() {
			trace.WithRegion(ctx, "stream.Send", func() {
				err = stream.Send(req)
			})
		})
		switch {
		case err == io.EOF:
			// The server closed the stream.
			// Most likely the file is already uploaded, see the CommittedSize check below.
			break chunkLoop
		case err != nil:
			return err
		case req.FinishWrite:
			break chunkLoop
		}

		// Prepare the next request.
		req.ResourceName = "" // send the resource name only in the first request
		req.WriteOffset += int64(len(req.Data))
	}

	// Finalize the request.
	switch res, err := stream.CloseAndRecv(); {
	case err != nil:
		return err
	case res.CommittedSize != digest.SizeBytes:
		return fmt.Errorf("unexpected commitSize: got %d, want %d", res.CommittedSize, digest.SizeBytes)
	}

	// Update stats.
	cacheHit := !req.FinishWrite
	if !cacheHit {
		atomic.AddInt64(&u.stats.Streamed.Bytes, digest.SizeBytes)
		atomic.AddInt64(&u.stats.Streamed.Digests, 1)
	}
	if updateCacheStats {
		st := &u.stats.CacheMisses
		if cacheHit {
			st = &u.stats.CacheHits
		}
		atomic.AddInt64(&st.Bytes, digest.SizeBytes)
		atomic.AddInt64(&st.Digests, 1)
	}
	return nil
}