func()

in go/pkg/fakes/cas.go [145:247]


// Write implements the ByteStream Write RPC for the test fake.
//
// It reads every request from stream, validates the resource name carried by
// the first request, checks that follow-up requests keep the same (or an empty)
// resource name and a contiguous write offset, and accumulates the data chunks.
// Once the client has set FinishWrite and closed the stream, the accumulated
// bytes (zstd-decoded first for "compressed-blobs" uploads) are stored in f.Buf
// and compared against the digest named in the resource name. On success a
// WriteResponse whose CommittedSize is the digest size is sent back.
//
// The returned error is also recorded in f.Err so callers can verify that the
// client didn't drop the stream early, meaning the request won't error.
func (f *Writer) Write(stream bsgrpc.ByteStream_WriteServer) (err error) {
	defer func() { f.Err = err }()

	const badResourceName = "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\""

	req, err := stream.Recv()
	if err == io.EOF {
		return status.Error(codes.InvalidArgument, "no write request received")
	}
	if err != nil {
		return err
	}

	path := strings.Split(req.ResourceName, "/")
	if (len(path) != 6 && len(path) != 7) || path[0] != "instance" || path[1] != "uploads" || (path[3] != "blobs" && path[3] != "compressed-blobs") {
		return status.Error(codes.InvalidArgument, badResourceName)
	}
	// indexOffset for all 4+ paths - `compressed-blobs` paths have one more element.
	indexOffset := 0
	compressed := path[3] == "compressed-blobs"
	if compressed {
		indexOffset = 1
		// TODO(rubensf): Change this to all the possible compressors in https://github.com/bazelbuild/remote-apis/pull/168.
		if path[4] != "zstd" {
			return status.Error(codes.InvalidArgument, "test fake expected valid compressor, eg zstd")
		}
	}
	// A `compressed-blobs` name with only 6 segments passes the length check
	// above but has no size segment; reject it here instead of panicking on an
	// out-of-range index below.
	if len(path) < 6+indexOffset {
		return status.Error(codes.InvalidArgument, badResourceName)
	}

	size, err := strconv.ParseInt(path[5+indexOffset], 10, 64)
	if err != nil {
		return status.Error(codes.InvalidArgument, badResourceName)
	}
	dg, err := digest.New(path[4+indexOffset], size)
	if err != nil {
		return status.Error(codes.InvalidArgument, "test fake expected valid digest as part of resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
	}
	if uuid.Parse(path[2]) == nil {
		return status.Error(codes.InvalidArgument, badResourceName)
	}

	var (
		buf  bytes.Buffer
		off  int64
		done bool
	)
	res := req.ResourceName
	for {
		if req.ResourceName != res && req.ResourceName != "" {
			return status.Errorf(codes.InvalidArgument, "follow-up request had resource name %q different from original %q", req.ResourceName, res)
		}
		if req.WriteOffset != off {
			return status.Errorf(codes.InvalidArgument, "request had incorrect offset %d, expected %d", req.WriteOffset, off)
		}
		// done is only set by a previous iteration, so reaching this point with
		// done == true means another request arrived after FinishWrite.
		if done {
			return status.Error(codes.InvalidArgument, "received write request after the client finished writing")
		}
		// 2 MB is the protocol max.
		if len(req.Data) > 2*1024*1024 {
			return status.Error(codes.InvalidArgument, "data chunk greater than 2MB")
		}

		// bytes.Buffer.Write can't error
		_, _ = buf.Write(req.Data)
		off += int64(len(req.Data))
		if req.FinishWrite {
			done = true
		}

		req, err = stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
	}

	if !done {
		return status.Error(codes.InvalidArgument, "reached end of stream before the client finished writing")
	}

	if compressed {
		if !f.ExpectCompressed {
			return status.Error(codes.FailedPrecondition, "fake expected a call with uncompressed bytes")
		}
		// The compressor was already validated to be zstd when the resource
		// name was parsed, so no second check is needed here.
		f.Buf, err = zstdDecoder.DecodeAll(buf.Bytes(), nil)
		if err != nil {
			return status.Errorf(codes.InvalidArgument, "served bytes can't be decompressed: %v", err)
		}
	} else {
		if f.ExpectCompressed {
			return status.Error(codes.FailedPrecondition, "fake expected a call with compressed bytes")
		}
		f.Buf = buf.Bytes()
	}

	// The digest is always computed over the uncompressed bytes held in f.Buf.
	cDg := digest.NewFromBlob(f.Buf)
	if dg != cDg {
		return status.Errorf(codes.InvalidArgument, "mismatched digest: received %s, computed %s", dg, cDg)
	}
	return stream.SendAndClose(&bspb.WriteResponse{CommittedSize: dg.Size})
}