e2e-examples/gcs/benchmark/grpc_runner.cc (458 lines of code) (raw):

// Copyright 2022 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "grpc_runner.h" #include <grpc/status.h> #include <grpcpp/channel.h> #include <grpcpp/client_context.h> #include <grpcpp/create_channel.h> #include <grpcpp/grpcpp.h> #include <grpcpp/security/credentials.h> #include <stdlib.h> #include <functional> #include <thread> #include "absl/crc/crc32c.h" #include "absl/random/random.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" #include "absl/strings/str_replace.h" #include "absl/strings/string_view.h" #include "absl/time/clock.h" #include "absl/time/time.h" #include "channel_creator.h" #include "channel_policy.h" #include "e2e-examples/gcs/benchmark/random_data.h" #include "google/storage/v2/storage.grpc.pb.h" using ::google::storage::v2::Object; using ::google::storage::v2::ReadObjectRequest; using ::google::storage::v2::ReadObjectResponse; using ::google::storage::v2::StartResumableWriteRequest; using ::google::storage::v2::StartResumableWriteResponse; using ::google::storage::v2::WriteObjectRequest; using ::google::storage::v2::WriteObjectResponse; extern int run_ctest( std::function<std::shared_ptr<grpc::Channel>()> channel_creator, const Parameters& parameters); extern int run_mtest( std::function<std::shared_ptr<grpc::Channel>()> channel_creator, const Parameters& parameters); static std::shared_ptr<grpc::Channel> CreateBenchmarkGrpcChannel( const Parameters& parameters) { return CreateGrpcChannel(parameters.host, parameters.access_token, parameters.network, parameters.cred, parameters.ssl_cert, parameters.rr, parameters.td, parameters.tx_zerocopy); } static int32_t GetChannelId(void* stub_handle) { static std::unordered_map<void*, int32_t> handle_to_id_map; static absl::Mutex lock; absl::MutexLock l(&lock); auto i = handle_to_id_map.find(stub_handle); if (i != handle_to_id_map.end()) { return i->second; } auto new_id = int32_t(handle_to_id_map.size() + 1); handle_to_id_map[stub_handle] = new_id; return new_id; } static std::string ToV2BucketName(absl::string_view bucket_name) { static const absl::string_view V2_BUCKET_NAME_PREFIX = "projects/_/buckets/"; return absl::StrCat(V2_BUCKET_NAME_PREFIX, bucket_name); } static void ApplyRoutingHeaders(grpc::ClientContext* context, absl::string_view bucket_name) { context->AddMetadata("x-goog-request-params", "bucket=" + ToV2BucketName(bucket_name)); } static void ApplyCallTimeout(grpc::ClientContext* context, absl::Duration timeout) { if (timeout != absl::InfiniteDuration()) { context->set_deadline( std::chrono::system_clock::now() + std::chrono::milliseconds(absl::ToInt64Milliseconds(timeout))); } } namespace { absl::crc32c_t ComputeCrc32c(const absl::Cord& cord) { absl::crc32c_t crc(0); for (absl::string_view chunk : cord.Chunks()) { crc = absl::ExtendCrc32c(crc, chunk); } return crc; } } // namespace GrpcRunner::GrpcRunner(Parameters parameters, std::shared_ptr<RunnerWatcher> watcher) : parameters_(parameters), object_resolver_(parameters_.object, parameters_.object_format, parameters_.object_start, parameters_.object_stop), watcher_(watcher) {} bool GrpcRunner::Run() { std::function<std::shared_ptr<grpc::Channel>()> channel_creator = [&]() { return CreateBenchmarkGrpcChannel(parameters_); }; if (parameters_.ctest > 0) { return run_ctest(channel_creator, parameters_); } if (parameters_.mtest > 0) { return run_mtest(channel_creator, parameters_); } // Initializes a gRPC channel pool. std::shared_ptr<StorageStubProvider> stub_pool; if (parameters_.cpolicy == "const") { stub_pool = CreateConstChannelPool(channel_creator); } else if (parameters_.cpolicy == "pool") { if (parameters_.carg <= 0) { std::cerr << "Invalid carg: " << parameters_.carg << std::endl; return false; } stub_pool = CreateRoundRobinChannelPool(channel_creator, parameters_.carg); } else if (parameters_.cpolicy == "bpool") { if (parameters_.carg <= 0) { std::cerr << "Invalid carg: " << parameters_.carg << std::endl; return false; } stub_pool = CreateRoundRobinPlusChannelPool(channel_creator, parameters_.carg); } else if (parameters_.cpolicy == "spool") { if (parameters_.carg <= 0) { std::cerr << "Invalid carg: " << parameters_.carg << std::endl; return false; } stub_pool = CreateSmartRoundRobinChannelPool(channel_creator, parameters_.carg); } // Spawns benchmark threads and waits until they're done. std::vector<std::thread> threads; std::vector<bool> returns(parameters_.threads); work_queue_.reset(new WorkQueue(parameters_.threads, parameters_.runs, parameters_.steal_work)); for (int i = 1; i <= parameters_.threads; i++) { int thread_id = i; std::shared_ptr<StorageStubProvider> storage_stub_provider; if (stub_pool != nullptr) { storage_stub_provider = stub_pool; } else if (parameters_.cpolicy == "perthread") { storage_stub_provider = CreateConstChannelPool(channel_creator); } else if (parameters_.cpolicy == "percall") { storage_stub_provider = CreateCreateNewChannelStubProvider(channel_creator); } threads.emplace_back([thread_id, storage_stub_provider, &returns, this]() { bool r = this->DoOperation(thread_id, storage_stub_provider); if (!r && !parameters_.wait_threads) { std::cerr << "Thread id=" << thread_id << " stopped." << std::endl; exit(1); } returns[thread_id - 1] = r; }); } std::for_each(threads.begin(), threads.end(), [](std::thread& t) { t.join(); }); return std::all_of(returns.begin(), returns.end(), [](bool v) { return v; }); } bool GrpcRunner::DoOperation( int thread_id, std::shared_ptr<StorageStubProvider> storage_stub_provider) { switch (parameters_.operation_type) { case OperationType::Read: return DoRead(thread_id, storage_stub_provider); case OperationType::RandomRead: return DoRandomRead(thread_id, storage_stub_provider); case OperationType::Write: return DoWrite(thread_id, storage_stub_provider); default: return false; } } bool GrpcRunner::DoRead( int thread_id, std::shared_ptr<StorageStubProvider> storage_stub_provider) { while (true) { auto work = work_queue_->pop(thread_id); auto work_tid = std::get<0>(work); auto work_run = std::get<1>(work); if (work_run == 0) { break; } while (true) { auto storage = storage_stub_provider->GetStorageStub(); std::string object = object_resolver_.Resolve(work_tid, work_run); ReadObjectRequest request; request.set_bucket(ToV2BucketName(parameters_.bucket)); request.set_object(object); if (parameters_.read_offset >= 0) { request.set_read_offset(parameters_.read_offset); } if (parameters_.read_limit >= 0) { request.set_read_limit(parameters_.read_limit); } absl::Time run_start = absl::Now(); grpc::ClientContext context; ApplyCallTimeout(&context, parameters_.timeout); ApplyRoutingHeaders(&context, parameters_.bucket); std::unique_ptr<grpc::ClientReader<ReadObjectResponse>> reader = storage.stub->ReadObject(&context, request); int64_t total_bytes = 0; std::vector<RunnerWatcher::Chunk> chunks; chunks.reserve(256); ReadObjectResponse response; while (reader->Read(&response)) { const auto& content = response.checksummed_data().content(); int64_t content_size = content.size(); if (parameters_.crc32c) { uint32_t content_crc = response.checksummed_data().crc32c(); uint32_t calculated_crc = (uint32_t)ComputeCrc32c(content); if (content_crc != calculated_crc) { std::cerr << "CRC32 is not identical. " << content_crc << " vs " << calculated_crc << std::endl; break; } } RunnerWatcher::Chunk chunk = {absl::Now(), content_size}; chunks.push_back(chunk); total_bytes += content_size; } auto status = reader->Finish(); absl::Time run_end = absl::Now(); if (!status.ok()) { std::cerr << "Download Failure!" << std::endl; std::cerr << "Peer: " << context.peer() << std::endl; std::cerr << "Start: " << run_start << std::endl; std::cerr << "End: " << run_end << std::endl; std::cerr << "Elapsed: " << (run_end - run_start) << std::endl; std::cerr << "Bucket: " << parameters_.bucket.c_str() << std::endl; std::cerr << "Object: " << object.c_str() << std::endl; std::cerr << "Bytes: " << total_bytes << std::endl; std::cerr << "Status: " << std::endl; std::cerr << "- Code: " << status.error_code() << std::endl; std::cerr << "- Message: " << status.error_message() << std::endl; std::cerr << "- Details: " << status.error_details() << std::endl; } storage_stub_provider->ReportResult(storage.handle, status, context, run_end - run_start, total_bytes); watcher_->NotifyCompleted( OperationType::Read, work_tid, GetChannelId(storage.handle), context.peer(), parameters_.bucket, object, status, total_bytes, run_start, run_end - run_start, std::move(chunks)); if (status.ok()) { break; } else if (parameters_.trying) { // let's try the same if keep_trying is set and it failed continue; } else { return false; } } } return true; } bool GrpcRunner::DoRandomRead( int thread_id, std::shared_ptr<StorageStubProvider> storage_stub_provider) { if (parameters_.read_limit <= 0) { std::cerr << "read_limit should be greater than 0." << std::endl; return false; } int64_t read_span = parameters_.read_limit - std::max(int64_t(0), parameters_.read_offset); if (read_span <= 0) { std::cerr << "read_limit should be greater than read_offset." << std::endl; return false; } if (parameters_.chunk_size == 0) { std::cerr << "chunk_size should be greater than 0." << std::endl; return false; } int64_t chunks = read_span / parameters_.chunk_size; if (chunks <= 0) { std::cerr << "read_limit should be greater than or equal to readable window." << std::endl; return false; } auto storage = storage_stub_provider->GetStorageStub(); std::string object = object_resolver_.Resolve(thread_id, 0); absl::BitGen gen; for (int run = 0; run < parameters_.runs; run++) { int64_t offset = absl::Uniform(gen, 0, chunks) * parameters_.chunk_size; ReadObjectRequest request; request.set_bucket(ToV2BucketName(parameters_.bucket)); request.set_object(object); request.set_read_offset(offset); request.set_read_limit(parameters_.chunk_size); absl::Time run_start = absl::Now(); grpc::ClientContext context; ApplyRoutingHeaders(&context, parameters_.bucket); ApplyCallTimeout(&context, parameters_.timeout); std::unique_ptr<grpc::ClientReader<ReadObjectResponse>> reader = storage.stub->ReadObject(&context, request); int64_t total_bytes = 0; std::vector<RunnerWatcher::Chunk> chunks; chunks.reserve(256); ReadObjectResponse response; while (reader->Read(&response)) { const auto& content = response.checksummed_data().content(); int64_t content_size = content.size(); if (parameters_.crc32c) { uint32_t content_crc = response.checksummed_data().crc32c(); uint32_t calculated_crc = (uint32_t)ComputeCrc32c(content); if (content_crc != calculated_crc) { std::cerr << "CRC32 is not identical. " << content_crc << " vs " << calculated_crc << std::endl; break; } } RunnerWatcher::Chunk chunk = {absl::Now(), content_size}; chunks.push_back(chunk); total_bytes += content_size; } auto status = reader->Finish(); absl::Time run_end = absl::Now(); if (!status.ok()) { std::cerr << "Download Failure!" << std::endl; std::cerr << "Peer: " << context.peer() << std::endl; std::cerr << "Start: " << run_start << std::endl; std::cerr << "End: " << run_end << std::endl; std::cerr << "Elapsed: " << (run_end - run_start) << std::endl; std::cerr << "Bucket: " << parameters_.bucket.c_str() << std::endl; std::cerr << "Object: " << object.c_str() << std::endl; std::cerr << "Bytes: " << total_bytes << std::endl; std::cerr << "Status: " << std::endl; std::cerr << "- Code: " << status.error_code() << std::endl; std::cerr << "- Message: " << status.error_message() << std::endl; std::cerr << "- Details: " << status.error_details() << std::endl; } storage_stub_provider->ReportResult(storage.handle, status, context, run_end - run_start, total_bytes); watcher_->NotifyCompleted( OperationType::Read, thread_id, GetChannelId(storage.handle), context.peer(), parameters_.bucket, object, status, total_bytes, run_start, run_end - run_start, std::move(chunks)); if (status.ok()) { ; } else if (parameters_.trying) { // let's try the same if keep_trying is set and it failed run -= 1; } else { return false; } } return true; } bool GrpcRunner::DoWrite( int thread_id, std::shared_ptr<StorageStubProvider> storage_stub_provider) { const int64_t max_chunk_size = (parameters_.chunk_size < 0) ? 2097152 : parameters_.chunk_size; if (parameters_.object_stop > 0) { std::cerr << "write doesn't support object_stop" << std::endl; return false; } if (parameters_.write_size <= 0) { std::cerr << "write_size should be greater than 0." << std::endl; return false; } while (true) { auto work = work_queue_->pop(thread_id); auto work_tid = std::get<0>(work); auto work_run = std::get<1>(work); if (work_run == 0) { break; } while (true) { auto storage = storage_stub_provider->GetStorageStub(); std::string object = object_resolver_.Resolve(work_tid, work_run); absl::Time run_start = absl::Now(); absl::crc32c_t object_crc32c(0); std::string upload_id; if (parameters_.resumable) { grpc::ClientContext context; ApplyRoutingHeaders(&context, parameters_.bucket); ApplyCallTimeout(&context, parameters_.timeout); StartResumableWriteRequest start_request; auto resource = start_request.mutable_write_object_spec()->mutable_resource(); resource->set_bucket(ToV2BucketName(parameters_.bucket)); resource->set_name(object); StartResumableWriteResponse start_response; auto status = storage.stub->StartResumableWrite(&context, start_request, &start_response); if (!status.ok()) { std::cerr << "StartResumableWrite failed code=" << status.error_code() << std::endl; return false; } upload_id = start_response.upload_id(); } grpc::ClientContext context; ApplyRoutingHeaders(&context, parameters_.bucket); ApplyCallTimeout(&context, parameters_.timeout); WriteObjectResponse reply; std::unique_ptr<grpc::ClientWriter<WriteObjectRequest>> writer( storage.stub->WriteObject(&context, &reply)); int64_t total_bytes = 0; std::vector<RunnerWatcher::Chunk> chunks; chunks.reserve(256); for (int64_t o = 0; o < parameters_.write_size; o += max_chunk_size) { bool first_request = o == 0; bool last_request = (o + max_chunk_size) >= parameters_.write_size; int64_t chunk_size = std::min(max_chunk_size, parameters_.write_size - o); WriteObjectRequest request; if (first_request) { if (parameters_.resumable) { request.set_upload_id(upload_id); } else { auto resource = request.mutable_write_object_spec()->mutable_resource(); resource->set_bucket(ToV2BucketName(parameters_.bucket)); resource->set_name(object); } } absl::Cord content = GetRandomData(chunk_size); request.mutable_checksummed_data()->set_content(content); if (parameters_.crc32c) { auto& content = request.mutable_checksummed_data()->content(); auto crc32c = ComputeCrc32c(content); request.mutable_checksummed_data()->set_crc32c((uint32_t)crc32c); object_crc32c = absl::ConcatCrc32c( object_crc32c, crc32c, request.mutable_checksummed_data()->content().size()); } request.set_write_offset(o); if (last_request) { request.set_finish_write(true); if (parameters_.crc32c) { request.mutable_object_checksums()->set_crc32c( (uint32_t)object_crc32c); } } if (!writer->Write(request)) break; RunnerWatcher::Chunk chunk = {absl::Now(), chunk_size}; chunks.push_back(chunk); total_bytes += chunk_size; } writer->WritesDone(); auto status = writer->Finish(); absl::Time run_end = absl::Now(); if (!status.ok()) { std::cerr << "Upload Failure!" << std::endl; std::cerr << "Peer: " << context.peer() << std::endl; std::cerr << "Start: " << run_start << std::endl; std::cerr << "End: " << run_end << std::endl; std::cerr << "Elapsed: " << (run_end - run_start) << std::endl; std::cerr << "Bucket: " << parameters_.bucket.c_str() << std::endl; std::cerr << "Object: " << object.c_str() << std::endl; std::cerr << "Bytes: " << total_bytes << std::endl; std::cerr << "Status: " << std::endl; std::cerr << "- Code: " << status.error_code() << std::endl; std::cerr << "- Message: " << status.error_message() << std::endl; std::cerr << "- Details: " << status.error_details() << std::endl; } storage_stub_provider->ReportResult(storage.handle, status, context, run_end - run_start, total_bytes); watcher_->NotifyCompleted( OperationType::Write, work_tid, GetChannelId(storage.handle), context.peer(), parameters_.bucket, object, status, total_bytes, run_start, run_end - run_start, std::move(chunks)); if (status.ok()) { break; } else if (parameters_.trying) { // let's try the same if keep_trying is set and it failed continue; } else { return false; } } } return true; }