func()

in go/pkg/fakes/cas.go [561:670]


// Write implements the ByteStream Write RPC for the fake CAS.
//
// The first request's resource name must have the form
// "instance/uploads/<uuid>/blobs/<hash>/<size>" or
// "instance/uploads/<uuid>/compressed-blobs/zstd/<hash>/<size>". All chunks
// received on the stream are accumulated in memory; once the client has sent
// FinishWrite and closed its side, the payload is decompressed if needed,
// checked against the digest named in the resource name, and only then stored
// in f.blobs. Any protocol violation is reported as codes.InvalidArgument;
// errors from stream.Recv other than io.EOF are returned unchanged.
func (f *CAS) Write(stream bsgrpc.ByteStream_WriteServer) (err error) {
	const badResourceName = "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\""

	off := int64(0)
	var buf bytes.Buffer

	req, err := stream.Recv()
	if err == io.EOF {
		return status.Error(codes.InvalidArgument, "no write request received")
	}
	if err != nil {
		return err
	}

	path := strings.Split(req.ResourceName, "/")
	if (len(path) != 6 && len(path) != 7) || path[0] != "instance" || path[1] != "uploads" || (path[3] != "blobs" && path[3] != "compressed-blobs") {
		return status.Error(codes.InvalidArgument, badResourceName)
	}
	// indexOffset shifts the hash/size indices - `compressed-blobs` paths have
	// one more element (the compressor name) before them.
	compressed := path[3] == "compressed-blobs"
	indexOffset := 0
	if compressed {
		indexOffset = 1
		// A compressed name needs all 7 segments; without this guard a
		// 6-segment name would index past the end of path below and panic.
		if len(path) != 7 {
			return status.Error(codes.InvalidArgument, badResourceName)
		}
		// TODO(rubensf): Change this to all the possible compressors in https://github.com/bazelbuild/remote-apis/pull/168.
		if path[4] != "zstd" {
			return status.Error(codes.InvalidArgument, "test fake expected valid compressor, eg zstd")
		}
	}
	size, err := strconv.ParseInt(path[5+indexOffset], 10, 64)
	if err != nil {
		return status.Error(codes.InvalidArgument, badResourceName)
	}
	dg, err := digest.New(path[4+indexOffset], size)
	if err != nil {
		return status.Error(codes.InvalidArgument, "test fake expected a valid digest as part of the resource name: \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
	}
	// A nil result from uuid.Parse is treated as a malformed upload UUID.
	if uuid.Parse(path[2]) == nil {
		return status.Error(codes.InvalidArgument, badResourceName)
	}

	f.maybeSleep()
	f.maybeBlock(dg)

	// Request accounting: count this write and track the high-water mark of
	// concurrently running requests. The deferred decrement re-acquires the
	// lock when Write returns.
	f.mu.Lock()
	f.writeReqs++
	f.concReqs++
	defer func() {
		f.mu.Lock()
		f.concReqs--
		f.mu.Unlock()
	}()
	if f.concReqs > f.maxConcReqs {
		f.maxConcReqs = f.concReqs
	}
	f.mu.Unlock()

	res := req.ResourceName
	done := false
	for {
		// Follow-up requests may omit the resource name, but must not change it.
		if req.ResourceName != res && req.ResourceName != "" {
			return status.Errorf(codes.InvalidArgument, "follow-up request had resource name %q different from original %q", req.ResourceName, res)
		}
		// Offsets must be contiguous with the bytes received so far.
		if req.WriteOffset != off {
			return status.Errorf(codes.InvalidArgument, "request had incorrect offset %d, expected %d", req.WriteOffset, off)
		}
		if done {
			return status.Errorf(codes.InvalidArgument, "received write request after the client finished writing")
		}
		// 2 MB is the protocol max.
		if len(req.Data) > 2*1024*1024 {
			return status.Errorf(codes.InvalidArgument, "data chunk greater than 2MB")
		}

		// bytes.Buffer.Write can't error
		_, _ = buf.Write(req.Data)
		off += int64(len(req.Data))
		if req.FinishWrite {
			done = true
		}

		req, err = stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
	}

	if !done {
		return status.Errorf(codes.InvalidArgument, "reached end of stream before the client finished writing")
	}

	// The compressor was already validated against the resource name above,
	// so a compressed upload here is always zstd.
	blob := buf.Bytes()
	if compressed {
		blob, err = zstdDecoder.DecodeAll(blob, nil)
		if err != nil {
			return status.Errorf(codes.InvalidArgument, "served bytes can't be decompressed: %v", err)
		}
	}

	// Verify the content before storing it, so a bad upload never leaves a
	// blob in the CAS under a digest it does not match.
	if cDg := digest.NewFromBlob(blob); dg != cDg {
		return status.Errorf(codes.InvalidArgument, "mismatched digest: received %s, computed %s", dg, cDg)
	}

	f.mu.Lock()
	f.blobs[dg] = blob
	f.writes[dg]++
	f.mu.Unlock()
	return stream.SendAndClose(&bspb.WriteResponse{CommittedSize: dg.Size})
}