CoTask<void> FuseClients::ioRingWorker(int i, int ths)

in src/fuse/FuseClients.cc [218:367]


// Main loop of io worker #i (out of `ths` workers).
//
// Each iteration dequeues one IoRingJob and hands it to `job.ior->process()`. It then either wakes the watchers of the
// job's io ring priority, or takes the next job of the same io ring and re-queues or processes it. The loop exits only
// when the worker is cancelled (OperationCancelled); any other exception is logged as FATAL.
//
// The worker's own priority is derived from its index: the first `io_worker_coros().hi()` workers serve priority 0,
// the last `io_worker_coros().lo()` workers serve priority 2, and the rest serve priority 1. A smaller number means a
// higher priority.
CoTask<void> FuseClients::ioRingWorker(int i, int ths) {
  // a worker thread has its own priority, but it can also execute jobs from queues with a higher priority
  // checkHigher is used to make sure the job queue with the thread's own priority doesn't starve
  bool checkHigher = true;

  while (true) {
    auto res = co_await folly::coro::co_awaitTry([this, &checkHigher, i, ths]() -> CoTask<void> {
      IoRingJob job;
      // re-read on every iteration, so the current config values are always used
      auto hiThs = config->io_worker_coros().hi(), loThs = config->io_worker_coros().lo();
      auto prio = i < hiThs ? 0 : i < (ths - loThs) ? 1 : 2;
      if (!config->enable_priority()) {
        job = co_await iojqs[prio]->co_dequeue();
      } else {
        bool gotJob = false;

        // if checkHigher, dequeue from a higher job queue if it is full
        while (!gotJob) {
          if (checkHigher) {
            for (int nprio = 0; nprio < prio; ++nprio) {
              if (iojqs[nprio]->full()) {
                auto dres = iojqs[nprio]->try_dequeue();
                if (dres) {
                  // got a job from higher priority queue, next time pick a same priority job unless the queue is empty
                  checkHigher = false;
                  gotJob = true;
                  job = std::move(*dres);
                  break;
                }
              }
            }

            if (gotJob) {
              break;
            }
          }

          // if checkHigher, check from higher prio to lower; otherwise, reverse the checking direction
          for (int nprio = checkHigher ? 0 : prio; checkHigher ? nprio <= prio : nprio >= 0;
               nprio += checkHigher ? 1 : -1) {
            // wait on this queue for at most io_job_deq_timeout; NoDiscard keeps both results so a job dequeued
            // concurrently with the timeout is not lost
            auto [sres, dres] =
                co_await folly::coro::collectAnyNoDiscard(folly::coro::sleep(config->io_job_deq_timeout()),
                                                          iojqs[nprio]->co_dequeue());
            if (dres.hasValue()) {
              // if the job is the thread's own priority, next time it can check from higher priority queues
              if (!checkHigher && nprio == prio) {
                checkHigher = true;
              }
              gotJob = true;
              job = std::move(*dres);
              break;
            } else if (sres.hasValue()) {
              // timed out on this queue, try the next one
              continue;
            } else {
              dres.throwUnlessValue();
            }
          }
        }
      }

      while (true) {
        // Resolves the inode of every sqe (by fileIid) under inodesMutex. A null entry is pushed for unknown inodes.
        // Consecutive sqes on the same inode reuse the previous result.
        auto lookupFiles =
            [this](std::vector<std::shared_ptr<RcInode>> &ins, const IoArgs *args, const IoSqe *sqes, int sqec) {
              auto lastIid = 0ull;

              std::lock_guard lock(inodesMutex);
              for (int k = 0; k < sqec; ++k) {
                auto idn = args[sqes[k].index].fileIid;
                if (k && idn == lastIid) {
                  ins.emplace_back(ins.back());
                  continue;
                }

                lastIid = idn;
                auto iid = meta::InodeId(idn);
                auto it = inodes.find(iid);
                ins.push_back(it == inodes.end() ? (std::shared_ptr<RcInode>()) : it->second);
              }
            };
        // Resolves the registered shm buffer of every sqe (by bufId) under iovs.shmLock. It also checks that the
        // requested [bufOff, bufOff + ioLen) range lies inside the buffer. Failures are pushed as kInvalidArg errors.
        auto lookupBufs =
            [this](std::vector<Result<lib::ShmBufForIO>> &bufs, const IoArgs *args, const IoSqe *sqe, int sqec) {
              // cache of the most recent *successful* id -> shm lookup; lastShm is non-null only when lastId is valid
              auto lastId = Uuid::zero();
              std::shared_ptr<lib::ShmBuf> lastShm;

              std::lock_guard lock(iovs.shmLock);
              for (int k = 0; k < sqec; ++k) {
                auto &arg = args[sqe[k].index];
                Uuid id;
                memcpy(id.data, arg.bufId, sizeof(id.data));

                std::shared_ptr<lib::ShmBuf> shm;
                if (lastShm && id == lastId) {
                  shm = lastShm;
                } else {
                  auto it = iovs.shmsById.find(id);
                  if (it == iovs.shmsById.end()) {
                    bufs.emplace_back(makeError(StatusCode::kInvalidArg, "buf id not found"));
                    continue;
                  }

                  shm = iovs.iovs->table[it->second].load();
                  if (!shm) {
                    bufs.emplace_back(makeError(StatusCode::kInvalidArg, "buf id not found"));
                    continue;
                  }

                  lastId = id;
                  lastShm = shm;
                }

                // bufOff/ioLen come from the io args, so check them for every sqe, including cache hits. Avoid
                // computing bufOff + ioLen, which could overflow and slip past the check.
                if (arg.bufOff > shm->size || arg.ioLen > shm->size - arg.bufOff) {
                  bufs.emplace_back(makeError(StatusCode::kInvalidArg, "invalid buf off and/or io len"));
                  continue;
                }

                bufs.emplace_back(lib::ShmBufForIO(std::move(shm), arg.bufOff));
              }
            };

        co_await job.ior->process(job.sqeProcTail,
                                  job.toProc,
                                  *storageClient,
                                  config->storage_io(),
                                  userConfig,
                                  std::move(lookupFiles),
                                  std::move(lookupBufs));

        // Two cases:
        // - If queue 0 is full, or the ring's priority differs from this worker's, wake the watchers of that priority.
        // - Otherwise take the next job of the same ring and try to queue it; if it can't be queued, process it here.
        // NOTE(review): the follow-up job is offered to iojqs[0] regardless of `prio`; confirm this is intended
        // rather than iojqs[prio].
        if (iojqs[0]->full() || job.ior->priority != prio) {
          sem_post(iors.sems[job.ior->priority].get());  // wake the watchers
        } else {
          auto jobs = job.ior->jobsToProc(1);
          if (!jobs.empty()) {
            job = std::move(jobs.front());
            if (!iojqs[0]->try_enqueue(job)) {
              continue;
            }
          }
        }

        break;
      }
    }());
    if (UNLIKELY(res.hasException())) {
      if (res.hasException<OperationCancelled>()) {
        XLOGF(INFO, "io worker #{} cancelled", i);
        break;
      }
      XLOGF(FATAL, "got exception in io worker #{}: {}", i, res.exception().what().toStdString());
    }
  }
}