in go/pkg/fakes/cas.go [673:728]
func (f *CAS) Read(req *bspb.ReadRequest, stream bsgrpc.ByteStream_ReadServer) error {
if req.ReadOffset != 0 || req.ReadLimit != 0 {
return status.Error(codes.Unimplemented, "test fake does not implement read_offset or limit")
}
path := strings.Split(req.ResourceName, "/")
if (len(path) != 4 && len(path) != 5) || path[0] != "instance" || (path[1] != "blobs" && path[1] != "compressed-blobs") {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}
// indexOffset for all 2+ paths - `compressed-blobs` has one more URI element.
indexOffset := 0
if path[1] == "compressed-blobs" {
indexOffset = 1
}
size, err := strconv.Atoi(path[3+indexOffset])
if err != nil {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}
dg := digest.TestNew(path[2+indexOffset], int64(size))
f.maybeSleep()
f.maybeBlock(dg)
blob, ok := f.blobs[dg]
f.mu.Lock()
f.reads[dg]++
f.mu.Unlock()
if !ok {
return status.Errorf(codes.NotFound, "test fake missing blob with digest %s was requested", dg)
}
if path[1] == "compressed-blobs" {
if path[2] != "zstd" {
return status.Error(codes.InvalidArgument, "test fake expected valid compressor, eg zstd")
}
blob = zstdEncoder.EncodeAll(blob, nil)
}
ue := uploadinfo.EntryFromBlob(blob)
ch, err := chunker.New(ue, false, 2*1024*1024)
if err != nil {
return status.Errorf(codes.Internal, "test fake failed to create chunker: %v", err)
}
resp := &bspb.ReadResponse{}
for ch.HasNext() {
chunk, err := ch.Next()
resp.Data = chunk.Data
if err != nil {
return err
}
err = stream.Send(resp)
if err != nil {
return err
}
}
return nil
}