bool GrpcRunner::DoRead()

in e2e-examples/gcs/benchmark/grpc_runner.cc [200:292]


bool GrpcRunner::DoRead(
    int thread_id, std::shared_ptr<StorageStubProvider> storage_stub_provider) {
  while (true) {
    auto work = work_queue_->pop(thread_id);
    auto work_tid = std::get<0>(work);
    auto work_run = std::get<1>(work);
    if (work_run == 0) {
      break;
    }
    while (true) {
      auto storage = storage_stub_provider->GetStorageStub();

      std::string object = object_resolver_.Resolve(work_tid, work_run);
      ReadObjectRequest request;
      request.set_bucket(ToV2BucketName(parameters_.bucket));
      request.set_object(object);
      if (parameters_.read_offset >= 0) {
        request.set_read_offset(parameters_.read_offset);
      }
      if (parameters_.read_limit >= 0) {
        request.set_read_limit(parameters_.read_limit);
      }

      absl::Time run_start = absl::Now();
      grpc::ClientContext context;
      ApplyCallTimeout(&context, parameters_.timeout);
      ApplyRoutingHeaders(&context, parameters_.bucket);
      std::unique_ptr<grpc::ClientReader<ReadObjectResponse>> reader =
          storage.stub->ReadObject(&context, request);

      int64_t total_bytes = 0;
      std::vector<RunnerWatcher::Chunk> chunks;
      chunks.reserve(256);

      ReadObjectResponse response;
      while (reader->Read(&response)) {
        const auto& content = response.checksummed_data().content();
        int64_t content_size = content.size();

        if (parameters_.crc32c) {
          uint32_t content_crc = response.checksummed_data().crc32c();
          uint32_t calculated_crc = (uint32_t)ComputeCrc32c(content);
          if (content_crc != calculated_crc) {
            std::cerr << "CRC32 is not identical. " << content_crc << " vs "
                      << calculated_crc << std::endl;
            break;
          }
        }

        RunnerWatcher::Chunk chunk = {absl::Now(), content_size};
        chunks.push_back(chunk);
        total_bytes += content_size;
      }

      auto status = reader->Finish();
      absl::Time run_end = absl::Now();

      if (!status.ok()) {
        std::cerr << "Download Failure!" << std::endl;
        std::cerr << "Peer:   " << context.peer() << std::endl;
        std::cerr << "Start:  " << run_start << std::endl;
        std::cerr << "End:    " << run_end << std::endl;
        std::cerr << "Elapsed: " << (run_end - run_start) << std::endl;
        std::cerr << "Bucket: " << parameters_.bucket.c_str() << std::endl;
        std::cerr << "Object: " << object.c_str() << std::endl;
        std::cerr << "Bytes:  " << total_bytes << std::endl;
        std::cerr << "Status: " << std::endl;
        std::cerr << "- Code:    " << status.error_code() << std::endl;
        std::cerr << "- Message: " << status.error_message() << std::endl;
        std::cerr << "- Details: " << status.error_details() << std::endl;
      }

      storage_stub_provider->ReportResult(storage.handle, status, context,
                                          run_end - run_start, total_bytes);

      watcher_->NotifyCompleted(
          OperationType::Read, work_tid, GetChannelId(storage.handle),
          context.peer(), parameters_.bucket, object, status, total_bytes,
          run_start, run_end - run_start, std::move(chunks));

      if (status.ok()) {
        break;
      } else if (parameters_.trying) {
        // let's try the same if keep_trying is set and it failed
        continue;
      } else {
        return false;
      }
    }
  }

  return true;
}