e2e-examples/gcs/benchmark/gcscpp_runner.cc (226 lines of code) (raw):
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "gcscpp_runner.h"

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "absl/random/random.h"
#include "absl/strings/cord.h"
#include "absl/time/time.h"
#include "e2e-examples/gcs/benchmark/random_data.h"
#include "google/cloud/grpc_options.h"
#include "google/cloud/storage/client.h"
#include "google/cloud/storage/grpc_plugin.h"
// Builds a runner that benchmarks through the google-cloud-cpp storage client.
//
// `parameters` is the benchmark configuration; it is stored in the runner and
// its object-related fields configure the object-name resolver. `watcher` is
// the sink that is notified about every completed operation.
GcscppRunner::GcscppRunner(Parameters parameters,
                           std::shared_ptr<RunnerWatcher> watcher)
    // Both arguments are received by value, so they are moved into place
    // instead of being copied a second time.
    : parameters_(std::move(parameters)),
      // Deliberately reads the stored member, not the moved-from argument.
      object_resolver_(parameters_.object, parameters_.object_format,
                       parameters_.object_start, parameters_.object_stop),
      watcher_(std::move(watcher)) {}
// Creates the storage client described by `parameters`.
//
// When `parameters.client` is "gcscpp-grpc" the client is created with
// MakeGrpcClient(); for any other value a Client is constructed directly from
// the collected options.
static google::cloud::storage::Client CreateClient(
    const Parameters& parameters) {
  namespace gc = ::google::cloud;
  namespace gcs = ::google::cloud::storage;

  gc::Options opts;

  if (parameters.write_size > 0) {
    // Unless chunk_size was given explicitly (i.e. it is non-negative), make
    // the upload buffer as large as the whole write so that the upload can be
    // done in a single RPC.
    const auto upload_buffer_size = static_cast<std::size_t>(
        (parameters.chunk_size < 0) ? parameters.write_size
                                    : parameters.chunk_size);
    opts.set<gcs::UploadBufferSizeOption>(upload_buffer_size);
  }

  if (parameters.client != "gcscpp-grpc") {
    // Non-gRPC client: only the endpoint and the API version are tunable.
    if (!parameters.host.empty()) {
      opts.set<gcs::RestEndpointOption>(parameters.host);
    }
    if (!parameters.target_api_version.empty()) {
      opts.set<gcs::internal::TargetApiVersionOption>(
          parameters.target_api_version);
    }
    return gcs::Client(std::move(opts));
  }

  // gRPC client: fall back to the public endpoint when no host was given.
  std::string target = parameters.host;
  if (target.empty()) {
    target = "storage.googleapis.com";
  }
  if (parameters.td) {
    // Prefix the target with the google-c2p resolver scheme.
    target.insert(0, "google-c2p:///");
  }

  if (parameters.carg != 0) {
    opts.set<gc::GrpcNumChannelsOption>(parameters.carg);
  }
  if (parameters.tx_zerocopy) {
    grpc::ChannelArguments channel_arguments;
    channel_arguments.SetInt(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED, 1);
    opts.set<gc::GrpcChannelArgumentsNativeOption>(channel_arguments);
  }
  opts.set<gc::EndpointOption>(target);
  return gcs::MakeGrpcClient(opts);
}
// Runs the configured benchmark on `parameters_.threads` worker threads and
// blocks until all of them have finished.
//
// Every worker gets its own copy of the storage client and a 1-based thread
// id. If a worker fails and `parameters_.wait_threads` is false, the whole
// process is terminated with exit code 1 right away.
//
// Returns true only if every worker reported success.
bool GcscppRunner::Run() {
  auto client = CreateClient(parameters_);

  // One result slot per worker. This must not be std::vector<bool>: that
  // specialization packs its elements into shared words, so workers writing
  // to *different* slots concurrently would race with each other.
  std::vector<char> returns(parameters_.threads);

  // Spawns benchmark threads and waits until they're done.
  std::vector<std::thread> threads;
  for (int i = 1; i <= parameters_.threads; i++) {
    int thread_id = i;
    threads.emplace_back([thread_id, client, &returns, this]() {
      bool r = this->DoOperation(thread_id, client);
      if (!r && !parameters_.wait_threads) {
        std::cerr << "Thread id=" << thread_id << " stopped." << std::endl;
        exit(1);
      }
      returns[thread_id - 1] = r ? 1 : 0;
    });
  }
  for (std::thread& t : threads) {
    t.join();
  }

  return std::all_of(returns.begin(), returns.end(),
                     [](char v) { return v != 0; });
}
// Runs the benchmark selected by `parameters_.operation_type` for the given
// thread and returns its result. Operation types other than Read, RandomRead
// and Write are not handled here and yield false.
bool GcscppRunner::DoOperation(int thread_id,
                               google::cloud::storage::Client storage_client) {
  const auto operation = parameters_.operation_type;
  if (operation == OperationType::Read) {
    return DoRead(thread_id, storage_client);
  }
  if (operation == OperationType::RandomRead) {
    return DoRandomRead(thread_id, storage_client);
  }
  if (operation == OperationType::Write) {
    return DoWrite(thread_id, storage_client);
  }
  return false;
}
// Returns the peer recorded in `headers`.
//
// The ":grpc-context-peer" entry is preferred; ":curl-peer" is used when the
// former is absent. An empty string is returned when neither key is present.
std::string ExtractPeer(
    std::multimap<std::string, std::string> const& headers) {
  // Keys in order of preference; the first one found wins.
  for (char const* key : {":grpc-context-peer", ":curl-peer"}) {
    auto const entry = headers.find(key);
    if (entry != headers.end()) {
      return entry->second;
    }
  }
  return "";
}
// Downloads whole objects, one per run, and reports every completed download
// to the watcher.
//
// For each of `parameters_.runs` runs the object name is resolved from the
// thread id and the run index, the object is read through a 4 MiB buffer, and
// one chunk (timestamp + byte count) is recorded per read() call.
//
// Returns false as soon as a download cannot be opened or ends with a non-OK
// status; returns true when every run succeeded.
bool GcscppRunner::DoRead(int thread_id,
                          google::cloud::storage::Client storage_client) {
  std::vector<char> buffer(4 * 1024 * 1024);
  auto const buffer_size = static_cast<std::streamsize>(buffer.size());

  for (int run = 0; run < parameters_.runs; run++) {
    std::string object = object_resolver_.Resolve(thread_id, run);
    absl::Time run_start = absl::Now();

    auto reader = storage_client.ReadObject(parameters_.bucket, object);
    if (!reader) {
      std::cerr << "Error reading object: " << reader.status() << "\n";
      return false;
    }

    int64_t total_bytes = 0;
    std::vector<RunnerWatcher::Chunk> chunks;
    chunks.reserve(256);
    // Stop on any stream state change (eof, fail or bad), not only on eof, so
    // that a broken stream can never keep this loop spinning.
    while (reader.good()) {
      reader.read(buffer.data(), buffer_size);
      int64_t content_size = reader.gcount();
      RunnerWatcher::Chunk chunk = {absl::Now(), content_size};
      chunks.push_back(chunk);
      total_bytes += content_size;
    }
    reader.Close();
    absl::Time run_end = absl::Now();

    // A download that broke half-way must not be reported as a success.
    if (!reader.status().ok()) {
      std::cerr << "Error reading object: " << reader.status() << "\n";
      return false;
    }

    watcher_->NotifyCompleted(OperationType::Read, thread_id, 0,
                              ExtractPeer(reader.headers()), parameters_.bucket,
                              object, grpc::Status::OK, total_bytes, run_start,
                              run_end - run_start, std::move(chunks));
  }
  return true;
}
// Repeatedly downloads one randomly chosen, chunk-aligned range of a single
// object and reports every completed download to the watcher.
//
// The readable span is `read_limit - max(0, read_offset)` and is divided into
// `span / chunk_size` slots. Each run picks one slot uniformly at random and
// reads `chunk_size` bytes starting at `slot * chunk_size`. The object name
// is resolved once, from the thread id and run index 0.
//
// Returns false if the parameters are inconsistent, if a download cannot be
// opened, or if a download ends with a non-OK status; true otherwise.
bool GcscppRunner::DoRandomRead(int thread_id,
                                google::cloud::storage::Client storage_client) {
  if (parameters_.read_limit <= 0) {
    std::cerr << "read_limit should be greater than 0." << std::endl;
    return false;
  }
  int64_t read_span =
      parameters_.read_limit - std::max(int64_t(0), parameters_.read_offset);
  if (read_span <= 0) {
    std::cerr << "read_limit should be greater than read_offset." << std::endl;
    return false;
  }
  if (parameters_.chunk_size == 0) {
    std::cerr << "chunk_size should be greater than 0." << std::endl;
    return false;
  }
  // Number of chunk-sized slots in the span. A negative chunk_size yields a
  // non-positive count and is rejected here as well.
  const int64_t slot_count = read_span / parameters_.chunk_size;
  if (slot_count <= 0) {
    std::cerr
        << "read_limit should be greater than or equal to readable window."
        << std::endl;
    return false;
  }

  std::string object = object_resolver_.Resolve(thread_id, 0);
  absl::BitGen gen;
  std::vector<char> buffer(4 * 1024 * 1024);
  // Size of a single read() request; it does not change between iterations.
  const auto request_size = static_cast<std::streamsize>(std::min(
      buffer.size(), static_cast<std::size_t>(parameters_.chunk_size)));

  for (int run = 0; run < parameters_.runs; run++) {
    int64_t offset = absl::Uniform(gen, 0, slot_count) * parameters_.chunk_size;
    absl::Time run_start = absl::Now();
    auto reader =
        storage_client.ReadObject(parameters_.bucket, object,
                                  google::cloud::storage::ReadRange(
                                      offset, offset + parameters_.chunk_size));
    if (!reader) {
      std::cerr << "Error reading object: " << reader.status() << "\n";
      return false;
    }

    int64_t total_bytes = 0;
    std::vector<RunnerWatcher::Chunk> chunks;
    chunks.reserve(256);
    while (total_bytes < parameters_.chunk_size) {
      reader.read(buffer.data(), request_size);
      int64_t content_size = reader.gcount();
      RunnerWatcher::Chunk chunk = {absl::Now(), content_size};
      chunks.push_back(chunk);
      total_bytes += content_size;
      // Once the stream has ended or failed, no further bytes will arrive.
      // Without this check a short or broken download would loop forever
      // and keep appending empty chunks.
      if (!reader.good()) {
        break;
      }
    }
    reader.Close();
    absl::Time run_end = absl::Now();

    // A download that broke half-way must not be reported as a success.
    if (!reader.status().ok()) {
      std::cerr << "Error reading object: " << reader.status() << "\n";
      return false;
    }

    watcher_->NotifyCompleted(OperationType::Read, thread_id, 0,
                              ExtractPeer(reader.headers()), parameters_.bucket,
                              object, grpc::Status::OK, total_bytes, run_start,
                              run_end - run_start, std::move(chunks));
  }
  return true;
}
// Uploads one object per run and reports every completed upload to the
// watcher.
//
// Each object is `parameters_.write_size` bytes long and is written in pieces
// of at most `chunk_size` bytes (or in a single piece when chunk_size is
// negative). Every piece is taken from the start of the same random buffer,
// and one chunk (timestamp + byte count) is recorded per write() call.
//
// Returns false if the parameters are invalid, if an upload cannot be
// started, or if an upload does not complete successfully; true otherwise.
bool GcscppRunner::DoWrite(int thread_id,
                           google::cloud::storage::Client storage_client) {
  // Validate everything before allocating the payload.
  if (parameters_.object_stop > 0) {
    std::cerr << "write doesn't support object_stop" << std::endl;
    return false;
  }
  if (parameters_.write_size <= 0) {
    std::cerr << "write_size should be greater than 0." << std::endl;
    return false;
  }
  if (parameters_.chunk_size == 0) {
    // A zero step would keep the upload loop below from ever advancing.
    std::cerr << "chunk_size should be greater than 0." << std::endl;
    return false;
  }

  // A negative chunk_size means "not specified": write everything at once.
  const int64_t max_chunk_size = (parameters_.chunk_size < 0)
                                     ? parameters_.write_size
                                     : parameters_.chunk_size;
  absl::Cord content = GetRandomData(max_chunk_size);
  absl::string_view content_data = content.Flatten();

  for (int run = 0; run < parameters_.runs; run++) {
    std::string object = object_resolver_.Resolve(thread_id, run);
    absl::Time run_start = absl::Now();
    int64_t total_bytes = 0;
    std::vector<RunnerWatcher::Chunk> chunks;
    chunks.reserve(256);

    auto writer = storage_client.WriteObject(parameters_.bucket, object);
    if (!writer) {
      std::cerr << "Error writing object: " << writer.last_status() << "\n";
      return false;
    }
    for (int64_t o = 0; o < parameters_.write_size; o += max_chunk_size) {
      int64_t chunk_size = std::min(max_chunk_size, parameters_.write_size - o);
      writer.write(content_data.data(), chunk_size);
      RunnerWatcher::Chunk chunk = {absl::Now(), chunk_size};
      chunks.push_back(chunk);
      total_bytes += chunk_size;
    }
    writer.Close();
    absl::Time run_end = absl::Now();

    // The outcome of the upload is only known after Close(); a failed upload
    // must not be reported as a success.
    auto const& metadata = writer.metadata();
    if (!metadata) {
      std::cerr << "Error writing object: " << metadata.status() << "\n";
      return false;
    }

    watcher_->NotifyCompleted(OperationType::Write, thread_id, 0,
                              ExtractPeer(writer.headers()), parameters_.bucket,
                              object, grpc::Status::OK, total_bytes, run_start,
                              run_end - run_start, std::move(chunks));
  }
  return true;
}