e2e-examples/gcs/sample/channel_manager.cc (62 lines of code) (raw):
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "channel_manager.h"
#include <cassert>
ChannelHandle::ChannelHandle(ChannelManager* manager,
std::shared_ptr<grpc::Channel> channel)
: manager_(manager),
channel_(std::move(channel)),
on_rpc_done_called_(false) {}
// Notifies the manager that this handle is being destroyed, passing along the
// raw pointer of the channel the handle refers to.
ChannelHandle::~ChannelHandle() {
  auto* const raw_channel = channel_.get();
  manager_->OnChannelHandleDestroyed(raw_channel);
}
// Forwards the final `status` of the RPC issued through this handle to the
// manager, together with the raw pointer of the handle's channel.
//
// Expected to be called at most once per handle; a second call trips the
// assertion when assertions are enabled.
void ChannelHandle::OnRpcDone(grpc::Status status) {
  assert(on_rpc_done_called_ == false);
  auto* const raw_channel = channel_.get();
  manager_->OnRpcDone(raw_channel, status);
  // Remember that completion has been reported for this handle.
  on_rpc_done_called_ = true;
}
// Creates a manager with an initially empty handle count.
//
// max_channel_count: size limit that GetHandle() compares the pool size
//   against before adding another channel when the least-used one is busy.
// channel_creator: callable invoked whenever a new channel has to be created
//   (when growing the pool, or when replacing a channel after a failed RPC).
//
// `channel_creator` is a by-value sink parameter, so it is moved into the
// member rather than copied, matching how ChannelHandle stores its channel.
ChannelManager::ChannelManager(
    size_t max_channel_count,
    std::function<std::shared_ptr<grpc::Channel>()> channel_creator)
    : max_channel_count_(max_channel_count),
      channel_handle_count_(0),
      channel_creator_(std::move(channel_creator)) {}
// Every handle handed out by GetHandle() must already have been destroyed
// when the manager goes away. This is verified under the lock when
// assertions are enabled.
ChannelManager::~ChannelManager() {
  absl::MutexLock guard(&lock_);
  assert(channel_handle_count_ == 0);
}
// Returns a handle to a channel from the pool, creating a channel on demand.
//
// The existing channel with the smallest in-use count is preferred (the first
// such entry wins on ties). A new channel is created instead when the pool is
// empty, or when even the least-used channel is busy and the pool holds fewer
// than `max_channel_count_` channels. Creating channels lazily like this
// avoids keeping underutilized channels around.
//
// The chosen entry's in-use count and the manager's outstanding-handle count
// are both incremented before the handle is returned.
ChannelHandle ChannelManager::GetHandle() {
  absl::MutexLock guard(&lock_);

  // Scan the pool for the least-used channel.
  ChannelState* least_busy = nullptr;
  size_t lowest_use = SIZE_MAX;
  for (auto& state : channel_states_) {
    if (!(state.in_use_count < lowest_use)) continue;
    lowest_use = state.in_use_count;
    least_busy = &state;
  }

  // Decide whether the pool has to grow.
  bool need_new_channel = (least_busy == nullptr);
  if (!need_new_channel) {
    const bool no_idle_channel = least_busy->in_use_count > 0;
    const bool pool_has_room = channel_states_.size() < max_channel_count_;
    need_new_channel = no_idle_channel && pool_has_room;
  }
  if (need_new_channel) {
    channel_states_.push_back(ChannelState{channel_creator_(), 0});
    least_busy = &channel_states_.back();
  }

  ++least_busy->in_use_count;
  ++channel_handle_count_;
  return ChannelHandle(this, least_busy->channel);
}
// Records the completion of an RPC that ran on `channel`.
//
// The pool entry whose channel is `channel` gets its in-use count decremented.
// If the RPC finished with CANCELLED or DEADLINE_EXCEEDED, the channel is
// treated as unhealthy: the entry is switched to a freshly created channel and
// its in-use count is reset to zero. If no entry refers to `channel` (for
// example because that entry has already been switched to a new channel),
// nothing is changed.
void ChannelManager::OnRpcDone(grpc::Channel* channel, grpc::Status status) {
  absl::MutexLock l(&lock_);
  for (auto& cs : channel_states_) {
    if (cs.channel.get() != channel) continue;
    cs.in_use_count -= 1;
    const auto code = status.error_code();
    if (code == grpc::StatusCode::CANCELLED ||
        code == grpc::StatusCode::DEADLINE_EXCEEDED) {
      // Replace the unhealthy channel with the newly created one. This must
      // be an assignment: a comparison here would discard the new channel and
      // leave the unhealthy one in the pool with a zeroed in-use count.
      cs.channel = channel_creator_();
      cs.in_use_count = 0;
    }
    break;
  }
}
// Called when a ChannelHandle is destroyed; lowers the count of outstanding
// handles under the lock. The `channel` argument is currently not consulted.
void ChannelManager::OnChannelHandleDestroyed(grpc::Channel* channel) {
  static_cast<void>(channel);
  absl::MutexLock guard(&lock_);
  --channel_handle_count_;
}