e2e-examples/gcs/benchmark/channel_policy.cc (283 lines of code) (raw):
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "channel_policy.h"
#include <unordered_map>
#include "absl/synchronization/mutex.h"
#include "channel_poller.h"
class ConstChannelPool : public StorageStubProvider {
public:
ConstChannelPool(
std::function<std::shared_ptr<grpc::Channel>()> channel_creator)
: channel_creator_(channel_creator) {
channel_ = channel_creator();
channel_poller_.reset(new ChannelPoller(channel_));
}
~ConstChannelPool() {
// Resetting channel_ first so that channel can be destroyed when
// channel_poller is destroyed.
channel_.reset();
}
StorageStubProvider::StubHolder GetStorageStub() override {
StorageStubProvider::StubHolder holder = {
google::storage::v2::Storage::NewStub(channel_), (void*)channel_.get()};
return holder;
}
void ReportResult(void* handle, const grpc::Status& status,
const grpc::ClientContext& context,
absl::Duration elapsed_time, int64_t bytes) override {
if (status.error_code() == grpc::StatusCode::CANCELLED) {
channel_ = channel_creator_();
}
}
private:
std::function<std::shared_ptr<grpc::Channel>()> channel_creator_;
std::shared_ptr<grpc::Channel> channel_;
std::unique_ptr<ChannelPoller> channel_poller_;
};
std::shared_ptr<StorageStubProvider> CreateConstChannelPool(
std::function<std::shared_ptr<grpc::Channel>()> channel_creator) {
return std::make_shared<ConstChannelPool>(channel_creator);
}
class CreateNewChannelStubProvider : public StorageStubProvider {
public:
CreateNewChannelStubProvider(
std::function<std::shared_ptr<grpc::Channel>()> channel_creator)
: channel_creator_(channel_creator) {}
StorageStubProvider::StubHolder GetStorageStub() override {
auto channel = channel_creator_();
StorageStubProvider::StubHolder holder = {
google::storage::v2::Storage::NewStub(channel), (void*)channel.get()};
return holder;
}
void ReportResult(void* handle, const grpc::Status& status,
const grpc::ClientContext& context,
absl::Duration elapsed_time, int64_t bytes) override {}
private:
std::function<std::shared_ptr<grpc::Channel>()> channel_creator_;
};
std::shared_ptr<StorageStubProvider> CreateCreateNewChannelStubProvider(
std::function<std::shared_ptr<grpc::Channel>()> channel_creator) {
return std::make_shared<CreateNewChannelStubProvider>(channel_creator);
}
class RoundRobinChannelPool : public StorageStubProvider {
public:
RoundRobinChannelPool(
std::function<std::shared_ptr<grpc::Channel>()> channel_creator,
int size) {
channel_creator_ = channel_creator;
for (int i = 0; i < size; i++) {
channels_.push_back(channel_creator());
}
}
StorageStubProvider::StubHolder GetStorageStub() override {
absl::MutexLock l(&lock_);
cursor_ = (cursor_ + 1) % channels_.size();
StorageStubProvider::StubHolder holder = {
google::storage::v2::Storage::NewStub(channels_[cursor_]),
(void*)channels_[cursor_].get()};
return holder;
}
void ReportResult(void* handle, const grpc::Status& status,
const grpc::ClientContext& context,
absl::Duration elapsed_time, int64_t bytes) override {
if (status.error_code() == grpc::StatusCode::CANCELLED ||
status.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED) {
auto i = std::find_if(channels_.begin(), channels_.end(),
[handle](std::shared_ptr<grpc::Channel> val) {
return (void*)val.get() == handle;
});
if (i != channels_.end()) {
std::cout << "Evict the channel (peer=" << context.peer()
<< ") due to error:" << status.error_code() << std::endl;
*i = channel_creator_();
}
}
}
private:
absl::Mutex lock_;
std::function<std::shared_ptr<grpc::Channel>()> channel_creator_;
std::vector<std::shared_ptr<grpc::Channel>> channels_;
int cursor_ = 0;
};
std::shared_ptr<StorageStubProvider> CreateRoundRobinChannelPool(
std::function<std::shared_ptr<grpc::Channel>()> channel_creator, int size) {
return std::make_shared<RoundRobinChannelPool>(channel_creator, size);
}
class RoundRobinPlusChannelPool : public StorageStubProvider {
public:
RoundRobinPlusChannelPool(
std::function<std::shared_ptr<grpc::Channel>()> channel_creator,
int size) {
channel_creator_ = channel_creator;
for (int i = 0; i < size; i++) {
channel_states_.push_back(ChannelState{channel_creator(), 0});
}
}
StorageStubProvider::StubHolder GetStorageStub() override {
absl::MutexLock l(&lock_);
// Finds the channel with the least number of use.
auto least = channel_states_.begin();
for (auto i = channel_states_.begin(); i != channel_states_.end(); ++i) {
if (i->in_use_count < least->in_use_count) {
least = i;
}
}
// Increases in-use count for the channel to be returned.
least->in_use_count += 1;
StorageStubProvider::StubHolder holder = {
google::storage::v2::Storage::NewStub(least->channel),
(void*)least->channel.get()};
return holder;
}
void ReportResult(void* handle, const grpc::Status& status,
const grpc::ClientContext& context,
absl::Duration elapsed_time, int64_t bytes) override {
absl::MutexLock l(&lock_);
// Decreases in-use count for the channel
auto i = std::find_if(channel_states_.begin(), channel_states_.end(),
[handle](const ChannelState& val) {
return (void*)val.channel.get() == handle;
});
if (i != channel_states_.end()) {
i->in_use_count -= 1;
}
// If the error indicates that the channel is hopeless,
// replace it with the newly created one.
if (status.error_code() == grpc::StatusCode::CANCELLED ||
status.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED) {
auto i = std::find_if(channel_states_.begin(), channel_states_.end(),
[handle](const ChannelState& val) {
return (void*)val.channel.get() == handle;
});
if (i != channel_states_.end()) {
std::cerr << "Evict the channel (peer=" << context.peer()
<< ") due to error:" << status.error_code() << std::endl;
i->channel = channel_creator_();
i->in_use_count = 0;
}
}
}
private:
absl::Mutex lock_;
std::function<std::shared_ptr<grpc::Channel>()> channel_creator_;
struct ChannelState {
std::shared_ptr<grpc::Channel> channel;
int32_t in_use_count;
};
std::vector<ChannelState> channel_states_;
};
std::shared_ptr<StorageStubProvider> CreateRoundRobinPlusChannelPool(
std::function<std::shared_ptr<grpc::Channel>()> channel_creator, int size) {
return std::make_shared<RoundRobinPlusChannelPool>(channel_creator, size);
}
class SmartRoundRobinChannelPool : public StorageStubProvider {
public:
SmartRoundRobinChannelPool(
std::function<std::shared_ptr<grpc::Channel>()> channel_creator,
int size) {
channel_creator_ = channel_creator;
for (int i = 0; i < size; i++) {
channels_.push_back(channel_creator());
}
InitChannelStateMap();
}
StorageStubProvider::StubHolder GetStorageStub() override {
absl::MutexLock l(&lock_);
cursor_ = (cursor_ + 1) % channels_.size();
StorageStubProvider::StubHolder holder = {
google::storage::v2::Storage::NewStub(channels_[cursor_]),
(void*)channels_[cursor_].get()};
return holder;
}
void ReportResult(void* handle, const grpc::Status& status,
const grpc::ClientContext& context,
absl::Duration elapsed_time, int64_t bytes) override {
absl::MutexLock l(&lock_);
// if error, evict it right away.
if (!status.ok()) {
if (status.error_code() == grpc::CANCELLED ||
status.error_code() == grpc::DEADLINE_EXCEEDED) {
auto i = std::find_if(channels_.begin(), channels_.end(),
[handle](std::shared_ptr<grpc::Channel> val) {
return (void*)val.get() == handle;
});
if (i != channels_.end()) {
std::cout << "Evict the channel (peer=" << context.peer()
<< ") due to error:" << status.error_code() << std::endl;
*i = channel_creator_();
}
return;
} else {
return;
}
}
// Update the state of the corresponding channel.
auto iter = channep_state_map_.find(handle);
if (iter != channep_state_map_.end()) {
iter->second.count += 1;
iter->second.total_bytes += bytes;
iter->second.total_time += elapsed_time;
iter->second.peer = context.peer();
}
// once the update counter exceeds the threadhold, it evaluates
// the last performer to be evicted.
score_count += 1;
if (score_count > (int)channep_state_map_.size() * 3) {
void* least_key = 0;
int valid_count = 0;
int64_t least_score = 0xffffffffffffl;
std::string least_peer;
int64_t best_score = 0;
for (auto s : channep_state_map_) {
if (s.second.count == 0) {
continue;
}
valid_count += 1;
int64_t score = s.second.total_bytes /
absl::ToInt64Milliseconds(s.second.total_time);
if (score < least_score) {
least_key = s.first;
least_score = score;
least_peer = s.second.peer;
}
if (score > best_score) {
best_score = score;
}
}
if (valid_count < int(channep_state_map_.size()) / 2) {
std::cerr << "No quorum: " << valid_count << " of "
<< channep_state_map_.size() / 2 << std::endl;
} else if (least_score >= best_score / 3) {
std::cerr << "No least to evict: least_score " << least_score
<< ", best_score: " << best_score << std::endl;
score_count = 0;
InitChannelStateMap();
} else {
auto i = std::find_if(channels_.begin(), channels_.end(),
[least_key](std::shared_ptr<grpc::Channel> val) {
return (void*)val.get() == least_key;
});
if (i != channels_.end()) {
std::cout << "Evict the channel (peer=" << least_peer
<< ") because it underperformed score: " << least_score
<< ", best_score: " << best_score << std::endl;
*i = channel_creator_();
}
score_count = 0;
InitChannelStateMap();
}
}
}
private:
void InitChannelStateMap() {
channep_state_map_.clear();
for (auto c : channels_) {
channep_state_map_[(void*)c.get()] =
ChannelState{0, 0, absl::Duration(), ""};
}
}
private:
absl::Mutex lock_;
std::function<std::shared_ptr<grpc::Channel>()> channel_creator_;
std::vector<std::shared_ptr<grpc::Channel>> channels_;
int cursor_ = 0;
struct ChannelState {
int count;
int64_t total_bytes;
absl::Duration total_time;
std::string peer;
};
std::unordered_map<void*, ChannelState> channep_state_map_;
int score_count = 0;
};
std::shared_ptr<StorageStubProvider> CreateSmartRoundRobinChannelPool(
std::function<std::shared_ptr<grpc::Channel>()> channel_creator, int size) {
return std::make_shared<SmartRoundRobinChannelPool>(channel_creator, size);
}