src/common/utils/DynamicCoroutinesPool.cc (75 lines of code) (raw):
#include "common/utils/DynamicCoroutinesPool.h"
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Invoke.h>
#include "common/monitor/Recorder.h"
namespace hf3fs {
namespace {
// File-local value recorder named "common.coroutines_num.current".
// The DynamicCoroutinesPool constructor derives an instance-tagged recorder
// from it, and the pool publishes its currentRunning_ counter through that
// tagged recorder.
monitor::ValueRecorder currentCoroutinesNum{"common.coroutines_num.current", std::nullopt, false};
} // namespace
// Builds a pool from `config`. The constructor body is empty and launches no
// coroutines itself; start() does that.
//
// @param config  Supplies queue_size(), threads_num() and coroutines_num().
// @param name    Passed to the executor's NamedThreadFactory and used as the
//                instance tag of this pool's recorder.
//
// NOTE(review): C++ initializes members in their declaration order (in the
// header), not in the order written below. Confirm that guard_'s callback
// cannot fire before executor_ and coroutinesNumRecorder_ are constructed.
DynamicCoroutinesPool::DynamicCoroutinesPool(const Config &config, std::string_view name)
: config_(config),
queue_(config_.queue_size()),
// The callback re-applies config_.coroutines_num() via setCoroutinesNum() and
// ignores the returned Result. It reaches this object through the by-reference
// capture, so it must not run after the pool is gone; stopAndJoin() dismisses
// the guard. Presumably the callback is invoked on config hot updates.
guard_(config_.addCallbackGuard([&] { setCoroutinesNum(config_.coroutines_num()); })),
// Both elements of the pair are threads_num(); presumably these are the
// executor's thread-count bounds (TODO confirm against the executor type).
executor_(std::make_pair(config.threads_num(), config.threads_num()),
std::make_shared<folly::NamedThreadFactory>(name)),
// Per-instance recorder derived from the file-local currentCoroutinesNum.
coroutinesNumRecorder_(currentCoroutinesNum.getRecorderWithTag(monitor::instanceTagSet(std::string{name}))) {}
// Brings the pool up to its configured size.
//
// Reads the configured coroutines number and forwards it to
// setCoroutinesNum(), whose result is returned unchanged.
Result<Void> DynamicCoroutinesPool::start() {
  const uint32_t configured = config_.coroutines_num();
  return setCoroutinesNum(configured);
}
// Stops the pool: shrinks it to zero coroutines, waits until baton_ is posted,
// then joins the executor.
//
// Only the first call performs the shutdown; later calls log at DBG level and
// return Void{}.
//
// @return Void{} on success. If setCoroutinesNum(0) reports an error,
//         RETURN_AND_LOG_ON_ERROR presumably returns it early, in which case
//         the wait and the join below are skipped while stopped_ stays set.
Result<Void> DynamicCoroutinesPool::stopAndJoin() {
// A true result is treated as "an earlier call already stopped the pool"
// (presumably test_and_set() returns the previous flag value, as
// std::atomic_flag does).
if (stopped_.test_and_set()) {
XLOGF(DBG, "coroutines pool {} is already stopped.", fmt::ptr(this));
return Void{};
}
guard_->dismiss(); // disable hot updates of the coroutines number.
// Enqueues one nullptr per remaining coroutine; run() exits when it dequeues one.
RETURN_AND_LOG_ON_ERROR(setCoroutinesNum(0));
// Decrements currentRunning_ once on behalf of this caller.
// NOTE(review): this only balances if currentRunning_ starts at 1 (declared
// in the header, not visible here) — confirm.
afterCoroutineStop();
// afterCoroutineStop() posts baton_ once currentRunning_ reaches zero.
folly::coro::blockingWait(baton_);
executor_.join();
return Void{};
}
// Resizes the pool to `num` worker coroutines.
//
// The whole call runs under mutex_. It first brings the executor's thread
// count in line with config_.threads_num(), then either launches additional
// run() coroutines on executor_ or enqueues one nullptr per surplus coroutine
// (run() returns when it dequeues a nullptr).
//
// coroutinesNum_ is adjusted immediately in both directions. currentRunning_
// is raised here when growing, but is only lowered by afterCoroutineStop() as
// coroutines actually finish.
//
// @param num  Desired number of worker coroutines.
// @return     Void{} on every path visible in this function.
Result<Void> DynamicCoroutinesPool::setCoroutinesNum(uint32_t num) {
  std::unique_lock lock{mutex_};

  // Follow the configured thread count before touching the coroutines.
  if (executor_.numThreads() != config_.threads_num()) {
    executor_.setNumThreads(config_.threads_num(), true);
  }

  // Nothing to resize: just republish the current running count.
  if (coroutinesNum_ == num) {
    auto running = currentRunning_.lock();
    coroutinesNumRecorder_->set(*running);
    return Void{};
  }

  XLOGF(INFO, "pool {} coroutines num {} -> {}", fmt::ptr(this), coroutinesNum_, num);

  if (num > coroutinesNum_) {
    // Grow: remember the delta up front, since the loop advances coroutinesNum_.
    auto added = num - coroutinesNum_;
    while (coroutinesNum_ < num) {
      run().scheduleOn(&executor_).start();
      ++coroutinesNum_;
    }
    auto running = currentRunning_.lock();
    *running += added;
    coroutinesNumRecorder_->set(*running);
    return Void{};
  }

  // Shrink: one nullptr per coroutine that should exit.
  while (coroutinesNum_ > num) {
    queue_.enqueue(nullptr);
    --coroutinesNum_;
  }
  return Void{};
}
// Worker loop executed by each pool coroutine.
//
// Repeatedly dequeues an entry from queue_ and awaits it. A nullptr entry is
// the signal to exit (setCoroutinesNum() enqueues those when shrinking). If an
// awaited task finishes with an exception, the exception is logged at FATAL
// level. However the coroutine leaves, afterCoroutineStop() runs via SCOPE_EXIT.
CoTask<void> DynamicCoroutinesPool::run() {
  SCOPE_EXIT { afterCoroutineStop(); };
  for (;;) {
    auto job = co_await queue_.co_dequeue();
    if (job == nullptr) {
      // Exit signal.
      co_return;
    }

    // co_awaitTry captures the task's outcome so a failure can be inspected here.
    auto outcome = co_await folly::coro::co_awaitTry(std::move(*job));
    const bool failed = outcome.hasException();
    if (UNLIKELY(failed)) {
      XLOGF(FATAL, "DynamicCoroutinesPool has exception: {}", outcome.exception().what());
      co_return;
    }
  }
}
// Bookkeeping for one finished participant.
//
// With currentRunning_ locked: decrements the counter, publishes the new
// value through coroutinesNumRecorder_, and posts baton_ when the counter
// reaches zero. Called from run() on exit and once from stopAndJoin().
void DynamicCoroutinesPool::afterCoroutineStop() {
  auto running = currentRunning_.lock();
  const auto remaining = --*running;
  coroutinesNumRecorder_->set(remaining);
  if (remaining != 0) {
    return;
  }
  // Last one out: release whoever is waiting on baton_ (see stopAndJoin()).
  baton_.post();
}
} // namespace hf3fs