bool GrpcRunner::DoWrite()

in e2e-examples/gcs/benchmark/grpc_runner.cc [399:543]


bool GrpcRunner::DoWrite(
    int thread_id, std::shared_ptr<StorageStubProvider> storage_stub_provider) {
  const int64_t max_chunk_size =
      (parameters_.chunk_size < 0) ? 2097152 : parameters_.chunk_size;

  if (parameters_.object_stop > 0) {
    std::cerr << "write doesn't support object_stop" << std::endl;
    return false;
  }
  if (parameters_.write_size <= 0) {
    std::cerr << "write_size should be greater than 0." << std::endl;
    return false;
  }

  while (true) {
    auto work = work_queue_->pop(thread_id);
    auto work_tid = std::get<0>(work);
    auto work_run = std::get<1>(work);
    if (work_run == 0) {
      break;
    }
    while (true) {
      auto storage = storage_stub_provider->GetStorageStub();

      std::string object = object_resolver_.Resolve(work_tid, work_run);
      absl::Time run_start = absl::Now();
      absl::crc32c_t object_crc32c(0);

      std::string upload_id;
      if (parameters_.resumable) {
        grpc::ClientContext context;
        ApplyRoutingHeaders(&context, parameters_.bucket);
        ApplyCallTimeout(&context, parameters_.timeout);
        StartResumableWriteRequest start_request;
        auto resource =
            start_request.mutable_write_object_spec()->mutable_resource();
        resource->set_bucket(ToV2BucketName(parameters_.bucket));
        resource->set_name(object);
        StartResumableWriteResponse start_response;
        auto status = storage.stub->StartResumableWrite(&context, start_request,
                                                        &start_response);
        if (!status.ok()) {
          std::cerr << "StartResumableWrite failed code=" << status.error_code()
                    << std::endl;
          return false;
        }
        upload_id = start_response.upload_id();
      }

      grpc::ClientContext context;
      ApplyRoutingHeaders(&context, parameters_.bucket);
      ApplyCallTimeout(&context, parameters_.timeout);
      WriteObjectResponse reply;
      std::unique_ptr<grpc::ClientWriter<WriteObjectRequest>> writer(
          storage.stub->WriteObject(&context, &reply));

      int64_t total_bytes = 0;
      std::vector<RunnerWatcher::Chunk> chunks;
      chunks.reserve(256);

      for (int64_t o = 0; o < parameters_.write_size; o += max_chunk_size) {
        bool first_request = o == 0;
        bool last_request = (o + max_chunk_size) >= parameters_.write_size;
        int64_t chunk_size =
            std::min(max_chunk_size, parameters_.write_size - o);

        WriteObjectRequest request;
        if (first_request) {
          if (parameters_.resumable) {
            request.set_upload_id(upload_id);
          } else {
            auto resource =
                request.mutable_write_object_spec()->mutable_resource();
            resource->set_bucket(ToV2BucketName(parameters_.bucket));
            resource->set_name(object);
          }
        }

        absl::Cord content = GetRandomData(chunk_size);
        request.mutable_checksummed_data()->set_content(content);
        if (parameters_.crc32c) {
          auto& content = request.mutable_checksummed_data()->content();
          auto crc32c = ComputeCrc32c(content);
          request.mutable_checksummed_data()->set_crc32c((uint32_t)crc32c);
          object_crc32c = absl::ConcatCrc32c(
              object_crc32c, crc32c,
              request.mutable_checksummed_data()->content().size());
        }

        request.set_write_offset(o);
        if (last_request) {
          request.set_finish_write(true);
          if (parameters_.crc32c) {
            request.mutable_object_checksums()->set_crc32c(
                (uint32_t)object_crc32c);
          }
        }

        if (!writer->Write(request)) break;

        RunnerWatcher::Chunk chunk = {absl::Now(), chunk_size};
        chunks.push_back(chunk);
        total_bytes += chunk_size;
      }
      writer->WritesDone();

      auto status = writer->Finish();
      absl::Time run_end = absl::Now();

      if (!status.ok()) {
        std::cerr << "Upload Failure!" << std::endl;
        std::cerr << "Peer:   " << context.peer() << std::endl;
        std::cerr << "Start:  " << run_start << std::endl;
        std::cerr << "End:    " << run_end << std::endl;
        std::cerr << "Elapsed: " << (run_end - run_start) << std::endl;
        std::cerr << "Bucket: " << parameters_.bucket.c_str() << std::endl;
        std::cerr << "Object: " << object.c_str() << std::endl;
        std::cerr << "Bytes:  " << total_bytes << std::endl;
        std::cerr << "Status: " << std::endl;
        std::cerr << "- Code:    " << status.error_code() << std::endl;
        std::cerr << "- Message: " << status.error_message() << std::endl;
        std::cerr << "- Details: " << status.error_details() << std::endl;
      }

      storage_stub_provider->ReportResult(storage.handle, status, context,
                                          run_end - run_start, total_bytes);

      watcher_->NotifyCompleted(
          OperationType::Write, work_tid, GetChannelId(storage.handle),
          context.peer(), parameters_.bucket, object, status, total_bytes,
          run_start, run_end - run_start, std::move(chunks));

      if (status.ok()) {
        break;
      } else if (parameters_.trying) {
        // let's try the same if keep_trying is set and it failed
        continue;
      } else {
        return false;
      }
    }
  }

  return true;
}