/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cachelib/navy/scheduler/ThreadPoolJobScheduler.h"
#include <folly/Format.h>
#include <folly/logging/xlog.h>
#include <folly/system/ThreadName.h>
#include <cassert>
#include "cachelib/common/Utils.h"
namespace facebook {
namespace cachelib {
namespace navy {
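
// Factory for the ordered scheduler: jobs enqueued with the same key are
// guaranteed to execute in FIFO order.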
std::unique_ptr<JobScheduler> createOrderedThreadPoolJobScheduler(
unsigned int readerThreads,
unsigned int writerThreads,
unsigned int reqOrderShardPower) {
return std::make_unique<OrderedThreadPoolJobScheduler>(
readerThreads, writerThreads, reqOrderShardPower);
}
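
// Spins up `numThreads` workers, each owning one JobQueue and processing it
// until stop is requested. Thread names take the form "navy_<pool>_<index>",
// with the pool name truncated to six characters, presumably to stay within
// platform thread-name length limits.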
ThreadPoolExecutor::ThreadPoolExecutor(uint32_t numThreads,
folly::StringPiece name)
: name_{name}, queues_(numThreads) {
XDCHECK_GT(numThreads, 0u);
workers_.reserve(numThreads);
for (uint32_t i = 0; i < numThreads; i++) {
queues_[i] = std::make_unique<JobQueue>();
workers_.emplace_back(
[&q = queues_[i],
threadName = folly::sformat("navy_{}_{}", name.subpiece(0, 6), i)] {
folly::setThreadName(threadName);
q->process();
});
}
}
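
// Spreads jobs across queues round-robin via a relaxed atomic counter; no
// ordering is implied between jobs enqueued through this path.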
void ThreadPoolExecutor::enqueue(Job job,
folly::StringPiece name,
JobQueue::QueuePos pos) {
auto index = nextQueue_.fetch_add(1, std::memory_order_relaxed);
queues_[index % queues_.size()]->enqueue(std::move(job), name, pos);
}
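
// Deterministically maps `key` to a queue, so jobs sharing a key always land
// on the same queue (and therefore the same worker thread).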
void ThreadPoolExecutor::enqueueWithKey(Job job,
folly::StringPiece name,
JobQueue::QueuePos pos,
uint64_t key) {
queues_[key % queues_.size()]->enqueue(std::move(job), name, pos);
}
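
// Drains every queue and returns the total enqueue count reported by the
// queues; callers compare successive totals to detect newly spawned work.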
uint64_t ThreadPoolExecutor::finish() {
uint64_t ec = 0;
for (auto& q : queues_) {
ec += q->finish();
}
return ec;
}
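
// Signals every queue to stop, then joins the worker threads.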
void ThreadPoolExecutor::join() {
for (auto& q : queues_) {
q->requestStop();
}
XLOGF(INFO, "JobScheduler: join threads for {}", name_);
for (auto& t : workers_) {
t.join();
}
XLOGF(INFO, "JobScheduler: join threads for {} successful", name_);
workers_.clear();
}
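
// Aggregates stats across queues: counters are summed, maxQueueLen is the
// largest per-queue high-water mark, and maxPendingJobs approximates the
// total backlog high-water mark by summing the per-queue maxima.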
ThreadPoolExecutor::Stats ThreadPoolExecutor::getStats() const {
Stats stats;
for (const auto& q : queues_) {
auto js = q->getStats();
stats.jobsDone += js.jobsDone;
stats.jobsHighReschedule += js.jobsHighReschedule;
stats.reschedules += js.reschedules;
stats.maxQueueLen = std::max(stats.maxQueueLen, js.maxQueueLen);
stats.maxPendingJobs += js.maxQueueLen;
}
return stats;
}
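
// Separate reader and writer pools; their sizes are configured independently.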
ThreadPoolJobScheduler::ThreadPoolJobScheduler(uint32_t readerThreads,
uint32_t writerThreads)
: reader_(readerThreads, "reader_pool"),
writer_(writerThreads, "writer_pool") {}
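
// Routes the job by type: reads go to the reader pool; writes, reclaims, and
// flushes go to the writer pool. Reclaim and flush jobs are enqueued at the
// front so they jump ahead of pending writes.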
void ThreadPoolJobScheduler::enqueue(Job job,
folly::StringPiece name,
JobType type) {
switch (type) {
case JobType::Read:
reader_.enqueue(std::move(job), name, JobQueue::QueuePos::Back);
break;
case JobType::Write:
writer_.enqueue(std::move(job), name, JobQueue::QueuePos::Back);
break;
case JobType::Reclaim:
writer_.enqueue(std::move(job), name, JobQueue::QueuePos::Front);
break;
case JobType::Flush:
writer_.enqueue(std::move(job), name, JobQueue::QueuePos::Front);
break;
default:
XLOGF(ERR,
"JobScheduler: unrecognized job type: {}",
static_cast<uint32_t>(type));
XDCHECK(false);
}
}
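
// Same routing as enqueue(), but the key pins the job to a fixed queue in
// its pool so jobs sharing a key execute in order.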
void ThreadPoolJobScheduler::enqueueWithKey(Job job,
folly::StringPiece name,
JobType type,
uint64_t key) {
switch (type) {
case JobType::Read:
reader_.enqueueWithKey(std::move(job), name, JobQueue::QueuePos::Back, key);
break;
case JobType::Write:
writer_.enqueueWithKey(std::move(job), name, JobQueue::QueuePos::Back, key);
break;
case JobType::Reclaim:
writer_.enqueueWithKey(std::move(job), name, JobQueue::QueuePos::Front,
key);
break;
case JobType::Flush:
writer_.enqueueWithKey(std::move(job), name, JobQueue::QueuePos::Front,
key);
break;
default:
XLOGF(ERR,
"JobScheduler: unrecognized job type: {}",
static_cast<uint32_t>(type));
XDCHECK(false);
}
}
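
// Stops and joins the worker threads of both pools.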
void ThreadPoolJobScheduler::join() {
reader_.join();
writer_.join();
}
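
// Blocks until all scheduled jobs, including any jobs they enqueue while
// running, have completed.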
void ThreadPoolJobScheduler::finish() {
uint64_t enqueueTotalCount = 0;
while (true) {
const uint64_t ec = reader_.finish() + writer_.finish();
    // After draining the queues once, we can't guarantee that a finished job
    // didn't enqueue another job on a queue we had already drained. So we
    // keep finishing the queues until the total enqueue count stops growing.
if (ec == enqueueTotalCount) {
break;
}
enqueueTotalCount = ec;
}
}
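
// Exports each pool's stats through the counter visitor.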
void ThreadPoolJobScheduler::getCounters(const CounterVisitor& visitor) const {
auto getStats = [&visitor](ThreadPoolExecutor::Stats stats,
folly::StringPiece name) {
const std::string maxQueueLen =
folly::sformat("navy_{}_max_queue_len", name);
const std::string reschedules = folly::sformat("navy_{}_reschedules", name);
const std::string highReschedules =
folly::sformat("navy_{}_jobs_high_reschedule", name);
const std::string jobsDone = folly::sformat("navy_{}_jobs_done", name);
const std::string maxPendingJobs =
folly::sformat("navy_max_{}_pending_jobs", name);
visitor(maxQueueLen, stats.maxQueueLen);
visitor(reschedules, stats.reschedules);
visitor(highReschedules, stats.jobsHighReschedule);
visitor(jobsDone, stats.jobsDone);
visitor(maxPendingJobs, stats.maxPendingJobs);
};
getStats(reader_.getStats(), reader_.getName());
getStats(writer_.getStats(), writer_.getName());
}

namespace {
constexpr size_t numShards(size_t power) { return 1ULL << power; }
} // namespace
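
// The ordered scheduler guarantees FIFO execution for jobs that share a key:
// keys map to one of 2^numShardsPower shards (key modulo shard count), and at
// most one job per shard is outstanding in the underlying pools at a time;
// the rest are spooled and released one by one as their predecessors finish.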
OrderedThreadPoolJobScheduler::OrderedThreadPoolJobScheduler(
size_t readerThreads, size_t writerThreads, size_t numShardsPower)
: mutexes_(numShards(numShardsPower)),
pendingJobs_(numShards(numShardsPower)),
shouldSpool_(numShards(numShardsPower), false),
numShardsPower_(numShardsPower),
scheduler_(readerThreads, writerThreads) {}
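
// If the shard for this key already has a job in flight, spool the new job;
// otherwise mark the shard busy and dispatch it immediately.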
void OrderedThreadPoolJobScheduler::enqueueWithKey(Job job,
folly::StringPiece name,
JobType type,
uint64_t key) {
const auto shard = key % numShards(numShardsPower_);
JobParams params{std::move(job), type, name, key};
std::lock_guard<std::mutex> l(mutexes_[shard]);
if (shouldSpool_[shard]) {
    // Spool the job: an earlier job mapped to this shard is still in flight.
pendingJobs_[shard].emplace_back(std::move(params));
numSpooled_.inc();
currSpooled_.inc();
} else {
shouldSpool_[shard] = true;
scheduleJobLocked(std::move(params), shard);
}
}
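
// Dispatches the job to the underlying scheduler, wrapped so that when it
// finally completes (JobExitCode::Done) the next spooled job for this shard
// is released. A Reschedule exit means the job will run again, so the shard
// stays occupied.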
void OrderedThreadPoolJobScheduler::scheduleJobLocked(JobParams params,
uint64_t shard) {
scheduler_.enqueueWithKey(
[this, j = std::move(params.job), shard]() mutable {
auto ret = j();
if (ret == JobExitCode::Done) {
scheduleNextJob(shard);
} else {
XDCHECK_EQ(ret, JobExitCode::Reschedule);
}
return ret;
},
params.name,
params.type,
params.key);
}
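
// Runs after a job for `shard` completes: dispatch the oldest spooled job,
// or clear the spool flag if the shard has no pending work.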
void OrderedThreadPoolJobScheduler::scheduleNextJob(uint64_t shard) {
std::lock_guard<std::mutex> l(mutexes_[shard]);
if (pendingJobs_[shard].empty()) {
shouldSpool_[shard] = false;
return;
}
currSpooled_.dec();
scheduleJobLocked(std::move(pendingJobs_[shard].front()), shard);
pendingJobs_[shard].pop_front();
}
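
// Keyless jobs bypass the ordering layer entirely.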
void OrderedThreadPoolJobScheduler::enqueue(Job job,
folly::StringPiece name,
JobType type) {
scheduler_.enqueue(std::move(job), name, type);
}
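
// Once the underlying scheduler is quiescent, no spooled jobs can remain.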
void OrderedThreadPoolJobScheduler::finish() {
scheduler_.finish();
XDCHECK_EQ(currSpooled_.get(), 0ULL);
}
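
// Reports the underlying pool counters plus spooling stats: the total number
// of jobs ever spooled and the current spool size.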
void OrderedThreadPoolJobScheduler::getCounters(const CounterVisitor& v) const {
scheduler_.getCounters(v);
v("navy_req_order_spooled", numSpooled_.get());
v("navy_req_order_curr_spool_size", currSpooled_.get());
}
} // namespace navy
} // namespace cachelib
} // namespace facebook