// Excerpt: func() in go/pkg/fakes/cas.go [673:728]


// Read implements the ByteStream Read RPC for the fake CAS, streaming the
// requested blob back to the caller in chunks.
//
// The resource name must have exactly one of these forms:
//
//	instance/blobs/<hash>/<size>
//	instance/compressed-blobs/<compressor>/<hash>/<size>
//
// Only the "zstd" compressor is accepted; for compressed reads the stored blob
// is zstd-encoded before it is sent. Every request with a well-formed resource
// name increments the per-digest read counter, whether or not the blob exists.
//
// Errors:
//   - Unimplemented if read_offset or read_limit is non-zero.
//   - InvalidArgument if the resource name is malformed or names a compressor
//     other than "zstd".
//   - NotFound if no blob is stored under the requested digest.
//   - Internal if the chunker cannot be created.
//   - Errors from the chunker or from stream.Send are returned unchanged.
func (f *CAS) Read(req *bspb.ReadRequest, stream bsgrpc.ByteStream_ReadServer) error {
	const badNameMsg = "test fake expected resource name of the form \"instance/blobs|compressed-blobs/<compressor?>/<hash>/<size>\""

	if req.ReadOffset != 0 || req.ReadLimit != 0 {
		return status.Error(codes.Unimplemented, "test fake does not implement read_offset or limit")
	}

	path := strings.Split(req.ResourceName, "/")
	if len(path) < 2 || path[0] != "instance" {
		return status.Error(codes.InvalidArgument, badNameMsg)
	}

	// hashIdx is the position of the <hash> element; <size> always follows it.
	// Each form requires an exact segment count so that a short
	// `compressed-blobs` name cannot index past the end of path, and a `blobs`
	// name with a stray extra segment is rejected instead of being misparsed.
	var hashIdx int
	switch {
	case path[1] == "blobs" && len(path) == 4:
		hashIdx = 2
	case path[1] == "compressed-blobs" && len(path) == 5:
		hashIdx = 3
	default:
		return status.Error(codes.InvalidArgument, badNameMsg)
	}

	// Parse straight to int64 so the accepted size range does not depend on
	// the platform's int width.
	size, err := strconv.ParseInt(path[hashIdx+1], 10, 64)
	if err != nil {
		return status.Error(codes.InvalidArgument, badNameMsg)
	}
	dg := digest.TestNew(path[hashIdx], size)
	f.maybeSleep()
	f.maybeBlock(dg)

	// Look up the blob and bump the read counter in one critical section;
	// reading f.blobs outside the lock would race with any concurrent writer.
	// NOTE(review): assumes f.mu guards f.blobs as it does f.reads — confirm.
	f.mu.Lock()
	blob, ok := f.blobs[dg]
	f.reads[dg]++
	f.mu.Unlock()
	if !ok {
		return status.Errorf(codes.NotFound, "test fake missing blob with digest %s was requested", dg)
	}

	if path[1] == "compressed-blobs" {
		if path[2] != "zstd" {
			return status.Error(codes.InvalidArgument, "test fake expected valid compressor, eg zstd")
		}
		blob = zstdEncoder.EncodeAll(blob, nil)
	}
	ue := uploadinfo.EntryFromBlob(blob)
	ch, err := chunker.New(ue, false, 2*1024*1024)
	if err != nil {
		return status.Errorf(codes.Internal, "test fake failed to create chunker: %v", err)
	}

	for ch.HasNext() {
		chunk, err := ch.Next()
		// Check the error before touching chunk: it may be nil on failure.
		if err != nil {
			return err
		}
		// Send a fresh message per chunk; gRPC does not allow a message to be
		// modified after it has been handed to Send.
		if err := stream.Send(&bspb.ReadResponse{Data: chunk.Data}); err != nil {
			return err
		}
	}
	return nil
}