bool GrpcRunner::DoRandomRead()

in e2e-examples/gcs/benchmark/grpc_runner.cc [294:397]


bool GrpcRunner::DoRandomRead(
    int thread_id, std::shared_ptr<StorageStubProvider> storage_stub_provider) {
  if (parameters_.read_limit <= 0) {
    std::cerr << "read_limit should be greater than 0." << std::endl;
    return false;
  }
  int64_t read_span =
      parameters_.read_limit - std::max(int64_t(0), parameters_.read_offset);
  if (read_span <= 0) {
    std::cerr << "read_limit should be greater than read_offset." << std::endl;
    return false;
  }
  if (parameters_.chunk_size == 0) {
    std::cerr << "chunk_size should be greater than 0." << std::endl;
    return false;
  }
  int64_t chunks = read_span / parameters_.chunk_size;
  if (chunks <= 0) {
    std::cerr
        << "read_limit should be greater than or equal to readable window."
        << std::endl;
    return false;
  }

  auto storage = storage_stub_provider->GetStorageStub();
  std::string object = object_resolver_.Resolve(thread_id, 0);
  absl::BitGen gen;
  for (int run = 0; run < parameters_.runs; run++) {
    int64_t offset = absl::Uniform(gen, 0, chunks) * parameters_.chunk_size;
    ReadObjectRequest request;
    request.set_bucket(ToV2BucketName(parameters_.bucket));
    request.set_object(object);
    request.set_read_offset(offset);
    request.set_read_limit(parameters_.chunk_size);

    absl::Time run_start = absl::Now();
    grpc::ClientContext context;
    ApplyRoutingHeaders(&context, parameters_.bucket);
    ApplyCallTimeout(&context, parameters_.timeout);
    std::unique_ptr<grpc::ClientReader<ReadObjectResponse>> reader =
        storage.stub->ReadObject(&context, request);

    int64_t total_bytes = 0;
    std::vector<RunnerWatcher::Chunk> chunks;
    chunks.reserve(256);

    ReadObjectResponse response;
    while (reader->Read(&response)) {
      const auto& content = response.checksummed_data().content();
      int64_t content_size = content.size();

      if (parameters_.crc32c) {
        uint32_t content_crc = response.checksummed_data().crc32c();
        uint32_t calculated_crc = (uint32_t)ComputeCrc32c(content);
        if (content_crc != calculated_crc) {
          std::cerr << "CRC32 is not identical. " << content_crc << " vs "
                    << calculated_crc << std::endl;
          break;
        }
      }

      RunnerWatcher::Chunk chunk = {absl::Now(), content_size};
      chunks.push_back(chunk);
      total_bytes += content_size;
    }

    auto status = reader->Finish();
    absl::Time run_end = absl::Now();

    if (!status.ok()) {
      std::cerr << "Download Failure!" << std::endl;
      std::cerr << "Peer:   " << context.peer() << std::endl;
      std::cerr << "Start:  " << run_start << std::endl;
      std::cerr << "End:    " << run_end << std::endl;
      std::cerr << "Elapsed: " << (run_end - run_start) << std::endl;
      std::cerr << "Bucket: " << parameters_.bucket.c_str() << std::endl;
      std::cerr << "Object: " << object.c_str() << std::endl;
      std::cerr << "Bytes:  " << total_bytes << std::endl;
      std::cerr << "Status: " << std::endl;
      std::cerr << "- Code:    " << status.error_code() << std::endl;
      std::cerr << "- Message: " << status.error_message() << std::endl;
      std::cerr << "- Details: " << status.error_details() << std::endl;
    }

    storage_stub_provider->ReportResult(storage.handle, status, context,
                                        run_end - run_start, total_bytes);

    watcher_->NotifyCompleted(
        OperationType::Read, thread_id, GetChannelId(storage.handle),
        context.peer(), parameters_.bucket, object, status, total_bytes,
        run_start, run_end - run_start, std::move(chunks));

    if (status.ok()) {
      ;
    } else if (parameters_.trying) {
      // let's try the same if keep_trying is set and it failed
      run -= 1;
    } else {
      return false;
    }
  }

  return true;
}