in src/fuse/FuseClients.cc [218:367]
CoTask<void> FuseClients::ioRingWorker(int i, int ths) {
  // each worker coroutine has its own priority, but it may also execute jobs from higher-priority queues.
  // checkHigher alternates that stealing so the queue at the worker's own priority does not starve.
bool checkHigher = true;
while (true) {
auto res = co_await folly::coro::co_awaitTry([this, &checkHigher, i, ths]() -> CoTask<void> {
IoRingJob job;
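      // the first hi() workers run at priority 0 (highest), the last lo() workers at priority 2, the rest at priority 1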
auto hiThs = config->io_worker_coros().hi(), loThs = config->io_worker_coros().lo();
auto prio = i < hiThs ? 0 : i < (ths - loThs) ? 1 : 2;
if (!config->enable_priority()) {
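        // priority scheduling disabled: just block on this worker's own queue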
job = co_await iojqs[prio]->co_dequeue();
} else {
bool gotJob = false;
        // when checkHigher is set, first try to steal from a higher-priority queue that is full
while (!gotJob) {
if (checkHigher) {
for (int nprio = 0; nprio < prio; ++nprio) {
if (iojqs[nprio]->full()) {
auto dres = iojqs[nprio]->try_dequeue();
if (dres) {
                // stole a job from a higher-priority queue; next round start from this worker's own priority so its queue is not starved
checkHigher = false;
gotJob = true;
job = std::move(*dres);
break;
}
}
}
if (gotJob) {
break;
}
}
          // when checkHigher is set, scan from the highest priority (0) down to this worker's own;
          // otherwise scan from the worker's own priority back up to the highest
for (int nprio = checkHigher ? 0 : prio; checkHigher ? nprio <= prio : nprio >= 0;
nprio += checkHigher ? 1 : -1) {
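            // race the dequeue against a sleep of io_job_deq_timeout so the scan can move on if this queue stays empty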
auto [sres, dres] =
co_await folly::coro::collectAnyNoDiscard(folly::coro::sleep(config->io_job_deq_timeout()),
iojqs[nprio]->co_dequeue());
if (dres.hasValue()) {
              // dequeued at the worker's own priority, so higher-priority queues can be checked again next round
if (!checkHigher && nprio == prio) {
checkHigher = true;
}
gotJob = true;
job = std::move(*dres);
break;
} else if (sres.hasValue()) {
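              // the timeout won the race; try the next queue in the scan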
continue;
} else {
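              // neither the sleep nor the dequeue succeeded; rethrow the dequeue error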
dres.throwUnlessValue();
}
}
}
}
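      // process the dequeued job; the worker may then pick up follow-up batches from the same io ring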
while (true) {
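        // resolve the inode referenced by each sqe; consecutive sqes on the same inode reuse the previous lookup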
auto lookupFiles =
[this](std::vector<std::shared_ptr<RcInode>> &ins, const IoArgs *args, const IoSqe *sqes, int sqec) {
auto lastIid = 0ull;
std::lock_guard lock(inodesMutex);
for (int i = 0; i < sqec; ++i) {
auto idn = args[sqes[i].index].fileIid;
if (i && idn == lastIid) {
ins.emplace_back(ins.back());
continue;
}
lastIid = idn;
auto iid = meta::InodeId(idn);
auto it = inodes.find(iid);
ins.push_back(it == inodes.end() ? (std::shared_ptr<RcInode>()) : it->second);
}
};
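        // resolve the shared-memory buffer for each sqe; a fresh lookup validates the buffer id and the
        // offset/length bounds, while consecutive sqes on the same buffer reuse the previous result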
auto lookupBufs =
[this](std::vector<Result<lib::ShmBufForIO>> &bufs, const IoArgs *args, const IoSqe *sqe, int sqec) {
auto lastId = Uuid::zero();
std::shared_ptr<lib::ShmBuf> lastShm;
std::lock_guard lock(iovs.shmLock);
for (int i = 0; i < sqec; ++i) {
auto &arg = args[sqe[i].index];
Uuid id;
memcpy(id.data, arg.bufId, sizeof(id.data));
std::shared_ptr<lib::ShmBuf> shm;
if (i && id == lastId) {
shm = lastShm;
} else {
auto it = iovs.shmsById.find(id);
if (it == iovs.shmsById.end()) {
bufs.emplace_back(makeError(StatusCode::kInvalidArg, "buf id not found"));
continue;
}
auto iovd = it->second;
shm = iovs.iovs->table[iovd].load();
if (!shm) {
bufs.emplace_back(makeError(StatusCode::kInvalidArg, "buf id not found"));
continue;
} else if (shm->size < arg.bufOff + arg.ioLen) {
bufs.emplace_back(makeError(StatusCode::kInvalidArg, "invalid buf off and/or io len"));
continue;
}
lastId = id;
lastShm = shm;
}
bufs.emplace_back(lib::ShmBufForIO(std::move(shm), arg.bufOff));
}
};
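        // hand the batch to the io ring, which performs the actual io through the storage client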
co_await job.ior->process(job.sqeProcTail,
job.toProc,
*storageClient,
config->storage_io(),
userConfig,
std::move(lookupFiles),
std::move(lookupBufs));
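        // if the front queue is saturated, or this ring runs at a different priority than this worker,
        // wake the watchers to redistribute the work; otherwise pull the ring's next batch and try to
        // enqueue it, processing it inline here if the enqueue fails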
if (iojqs[0]->full() || job.ior->priority != prio) {
sem_post(iors.sems[job.ior->priority].get()); // wake the watchers
} else {
auto jobs = job.ior->jobsToProc(1);
if (!jobs.empty()) {
job = jobs.front();
if (!iojqs[0]->try_enqueue(job)) {
continue;
}
}
}
break;
}
}());
    if (UNLIKELY(res.hasException())) {
      if (res.hasException<OperationCancelled>()) {
        XLOGF(INFO, "io worker #{} cancelled", i);
        break;
      } else {
        XLOGF(FATAL, "got exception in io worker #{}", i);
      }
    }
}
}