in go/pkg/fakes/cas.go [561:670]
func (f *CAS) Write(stream bsgrpc.ByteStream_WriteServer) (err error) {
off := int64(0)
buf := new(bytes.Buffer)
req, err := stream.Recv()
if err == io.EOF {
return status.Error(codes.InvalidArgument, "no write request received")
}
if err != nil {
return err
}
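// ByteStream upload resource names have the form "instance/uploads/<uuid>/blobs/<hash>/<size>" or "instance/uploads/<uuid>/compressed-blobs/<compressor>/<hash>/<size>".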
path := strings.Split(req.ResourceName, "/")
// A "blobs" resource name has 6 elements and a "compressed-blobs" one has 7; tying the length to the kind avoids indexing past the end of path below.
if (len(path) != 6 && len(path) != 7) || path[0] != "instance" || path[1] != "uploads" || (len(path) == 6 && path[3] != "blobs") || (len(path) == 7 && path[3] != "compressed-blobs") {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}
// `compressed-blobs` resource names carry an extra <compressor> element, so path indexes from 4 onward are shifted by indexOffset.
indexOffset := 0
if path[3] == "compressed-blobs" {
indexOffset = 1
// TODO(rubensf): Change this to all the possible compressors in https://github.com/bazelbuild/remote-apis/pull/168.
if path[4] != "zstd" {
return status.Error(codes.InvalidArgument, "test fake expected a valid compressor, e.g. zstd")
}
}
size, err := strconv.ParseInt(path[5+indexOffset], 10, 64)
if err != nil {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}
dg, err := digest.New(path[4+indexOffset], size)
if err != nil {
return status.Error(codes.InvalidArgument, "test fake expected a valid digest as part of the resource name: \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}
if uuid.Parse(path[2]) == nil {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}
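// Apply any artificial delay or per-digest blocking configured by the test, then update the request and concurrency counters used in test assertions.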
f.maybeSleep()
f.maybeBlock(dg)
f.mu.Lock()
f.writeReqs++
f.concReqs++
defer func() {
f.mu.Lock()
f.concReqs--
f.mu.Unlock()
}()
if f.concReqs > f.maxConcReqs {
f.maxConcReqs = f.concReqs
}
f.mu.Unlock()
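// Consume the rest of the write stream, checking that every chunk keeps the original resource name, arrives at the expected offset, and stays within the chunk size limit before its data is appended to buf.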
res := req.ResourceName
done := false
for {
if req.ResourceName != res && req.ResourceName != "" {
return status.Errorf(codes.InvalidArgument, "follow-up request had resource name %q different from original %q", req.ResourceName, res)
}
if req.WriteOffset != off {
return status.Errorf(codes.InvalidArgument, "request had incorrect offset %d, expected %d", req.WriteOffset, off)
}
if done {
return status.Errorf(codes.InvalidArgument, "received write request after the client finished writing")
}
// 2 MB is the protocol max.
if len(req.Data) > 2*1024*1024 {
return status.Errorf(codes.InvalidArgument, "data chunk greater than 2MB")
}
// bytes.Buffer.Write can't error
_, _ = buf.Write(req.Data)
off += int64(len(req.Data))
if req.FinishWrite {
done = true
}
req, err = stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
}
if !done {
return status.Errorf(codes.InvalidArgument, "reached end of stream before the client finished writing")
}
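// Compressed uploads arrive as zstd-compressed bytes; decompress them so that storage and digest verification operate on the raw content.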
uncompressedBuf := buf.Bytes()
if path[3] == "compressed-blobs" {
if path[4] != "zstd" {
return status.Errorf(codes.InvalidArgument, "%s compressor isn't supported", path[4])
}
var err error
uncompressedBuf, err = zstdDecoder.DecodeAll(buf.Bytes(), nil)
if err != nil {
return status.Errorf(codes.InvalidArgument, "served bytes can't be decompressed: %v", err)
}
}
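// Store the uncompressed blob under the digest claimed in the resource name, then verify that the content actually matches that digest before acknowledging the committed size.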
f.mu.Lock()
f.blobs[dg] = uncompressedBuf
f.writes[dg]++
f.mu.Unlock()
cDg := digest.NewFromBlob(uncompressedBuf)
if dg != cDg {
return status.Errorf(codes.InvalidArgument, "mismatched digest: received %s, computed %s", dg, cDg)
}
return stream.SendAndClose(&bspb.WriteResponse{CommittedSize: dg.Size})
}
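
// buildUploadResourceName is an illustrative sketch rather than part of the fake: it shows
// one way a test could construct a resource name that the validation in Write above accepts.
// It assumes the fake's hardcoded "instance" instance name, the zstd compressor for
// compressed uploads, and that fmt is imported; uuidStr is assumed to be a valid UUID string.
func buildUploadResourceName(uuidStr string, dg digest.Digest, compressed bool) string {
if compressed {
return fmt.Sprintf("instance/uploads/%s/compressed-blobs/zstd/%s/%d", uuidStr, dg.Hash, dg.Size)
}
return fmt.Sprintf("instance/uploads/%s/blobs/%s/%d", uuidStr, dg.Hash, dg.Size)
}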