#include "FuseClients.h"
#include <folly/Random.h>
#include <folly/ScopeGuard.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/functional/Partial.h>
#include <folly/logging/xlog.h>
#include <fuse3/fuse_lowlevel.h>
#include <memory>
#include <thread>
#include <utility>
#include "common/app/ApplicationBase.h"
#include "common/monitor/Recorder.h"
#include "common/utils/BackgroundRunner.h"
#include "common/utils/Coroutine.h"
#include "common/utils/Duration.h"
#include "common/utils/FileUtils.h"
#include "common/utils/SysResource.h"
#include "fbs/meta/Common.h"
#include "fbs/mgmtd/Rpc.h"
#include "stubs/MetaService/MetaServiceStub.h"
#include "stubs/common/RealStubFactory.h"
#include "stubs/mgmtd/MgmtdServiceStub.h"
namespace hf3fs::fuse {
namespace {
monitor::ValueRecorder dirtyInodesCnt("fuse.dirty_inodes");
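
// Establish a client session with mgmtd, blocking the caller; retries with exponential
// backoff (10ms doubled up to 1s) for at most 40 attempts before giving up.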
Result<Void> establishClientSession(client::IMgmtdClientForClient &mgmtdClient) {
return folly::coro::blockingWait([&]() -> CoTryTask<void> {
auto retryInterval = std::chrono::milliseconds(10);
constexpr auto maxRetryInterval = std::chrono::milliseconds(1000);
Result<Void> res = Void{};
for (int i = 0; i < 40; ++i) {
res = co_await mgmtdClient.extendClientSession();
if (res) break;
      XLOGF(CRITICAL, "Failed to establish client session: {}, retryCount: {}", res.error(), i);
co_await folly::coro::sleep(retryInterval);
retryInterval = std::min(2 * retryInterval, maxRetryInterval);
}
co_return res;
}());
}
} // namespace

FuseClients::~FuseClients() { stop(); }

Result<Void> FuseClients::init(const flat::AppInfo &appInfo,
const String &mountPoint,
const String &tokenFile,
FuseConfig &fuseConfig) {
config = &fuseConfig;
fuseMount = appInfo.clusterId;
  XLOGF_IF(FATAL,
           fuseMount.size() >= 32,
           "FUSE only supports mount names shorter than 32 characters, but got '{}'.",
           fuseMount);
fuseMountpoint = Path(mountPoint).lexically_normal();
if (fuseConfig.remount_prefix()) {
fuseRemountPref = Path(*fuseConfig.remount_prefix()).lexically_normal();
}
if (const char *env_p = std::getenv("HF3FS_FUSE_TOKEN")) {
XLOGF(INFO, "Use token from env var");
fuseToken = std::string(env_p);
} else {
XLOGF(INFO, "Use token from config");
auto tokenRes = loadFile(tokenFile);
RETURN_ON_ERROR(tokenRes);
fuseToken = folly::trimWhitespace(*tokenRes);
}
enableWritebackCache = fuseConfig.enable_writeback_cache();
memsetBeforeRead = fuseConfig.memset_before_read();
maxIdleThreads = fuseConfig.max_idle_threads();
int logicalCores = std::thread::hardware_concurrency();
if (logicalCores != 0) {
maxThreads = std::min(fuseConfig.max_threads(), (logicalCores + 1) / 2);
} else {
maxThreads = fuseConfig.max_threads();
}
bufPool = net::RDMABufPool::create(fuseConfig.io_bufs().max_buf_size(), fuseConfig.rdma_buf_pool_size());
iovs.init(fuseRemountPref.value_or(fuseMountpoint), fuseConfig.iov_limit());
iors.init(fuseConfig.iov_limit());
userConfig.init(fuseConfig);
if (!client) {
client = std::make_unique<net::Client>(fuseConfig.client());
RETURN_ON_ERROR(client->start());
}
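
  // The serde context creator is shared by the mgmtd and meta service stubs below.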
auto ctxCreator = [this](net::Address addr) { return client->serdeCtx(addr); };
if (!mgmtdClient) {
mgmtdClient = std::make_shared<client::MgmtdClientForClient>(
appInfo.clusterId,
std::make_unique<stubs::RealStubFactory<mgmtd::MgmtdServiceStub>>(ctxCreator),
fuseConfig.mgmtd());
}
auto physicalHostnameRes = SysResource::hostname(/*physicalMachineName=*/true);
RETURN_ON_ERROR(physicalHostnameRes);
auto containerHostnameRes = SysResource::hostname(/*physicalMachineName=*/false);
RETURN_ON_ERROR(containerHostnameRes);
auto clientId = ClientId::random(*physicalHostnameRes);
mgmtdClient->setClientSessionPayload({clientId.uuid.toHexString(),
flat::NodeType::FUSE,
flat::ClientSessionData::create(
/*universalId=*/*physicalHostnameRes,
/*description=*/fmt::format("fuse: {}", *containerHostnameRes),
appInfo.serviceGroups,
appInfo.releaseVersion),
// TODO: use real user info
flat::UserInfo{}});
mgmtdClient->setConfigListener(ApplicationBase::updateConfig);
folly::coro::blockingWait(mgmtdClient->start(&client->tpg().bgThreadPool().randomPick()));
folly::coro::blockingWait(mgmtdClient->refreshRoutingInfo(/*force=*/false));
RETURN_ON_ERROR(establishClientSession(*mgmtdClient));
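
  // The storage and meta clients share the same client id and get routing info from mgmtd.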
storageClient = storage::client::StorageClient::create(clientId, fuseConfig.storage(), *mgmtdClient);
metaClient =
std::make_shared<meta::client::MetaClient>(clientId,
fuseConfig.meta(),
std::make_unique<meta::client::MetaClient::StubFactory>(ctxCreator),
mgmtdClient,
storageClient,
true /* dynStripe */);
metaClient->start(client->tpg().bgThreadPool());
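
  // Three bounded I/O job queues, one per priority: high, normal, low.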
iojqs.reserve(3);
iojqs.emplace_back(new BoundedQueue<IoRingJob>(fuseConfig.io_jobq_sizes().hi()));
iojqs.emplace_back(new BoundedQueue<IoRingJob>(fuseConfig.io_jobq_size()));
iojqs.emplace_back(new BoundedQueue<IoRingJob>(fuseConfig.io_jobq_sizes().lo()));
jitter = fuseConfig.submit_wait_jitter();
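
  // Spawn the batch I/O worker coroutines on the background thread pool; they are all
  // cancellable through cancelIos.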
auto &tp = client->tpg().bgThreadPool();
auto coros = fuseConfig.batch_io_coros();
for (int i = 0; i < coros; ++i) {
auto exec = &tp.get(i % tp.size());
co_withCancellation(cancelIos.getToken(), ioRingWorker(i, coros)).scheduleOn(exec).start();
}
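
  // One watcher thread per priority moves ready io-ring jobs into the corresponding job queue.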
ioWatches.reserve(3);
for (int i = 0; i < 3; ++i) {
ioWatches.emplace_back(folly::partial(&FuseClients::watch, this, i));
}
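
  // Periodic sync: a background scanner collects dirty inodes at a jittered interval and a
  // coroutine pool flushes them.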
periodicSyncWorker = std::make_unique<CoroutinesPool<InodeId>>(config->periodic_sync().worker());
periodicSyncWorker->start(folly::partial(&FuseClients::periodicSync, this), tp);
periodicSyncRunner = std::make_unique<BackgroundRunner>(&tp.pickNextFree());
periodicSyncRunner->start("PeriodSync", folly::partial(&FuseClients::periodicSyncScan, this), [&]() {
return config->periodic_sync().interval() * folly::Random::randDouble(0.7, 1.3);
});
onFuseConfigUpdated = fuseConfig.addCallbackGuard([&fuseConfig = fuseConfig, this] {
memsetBeforeRead = fuseConfig.memset_before_read();
jitter = std::chrono::duration_cast<std::chrono::nanoseconds>(fuseConfig.submit_wait_jitter());
});
notifyInvalExec =
std::make_unique<folly::IOThreadPoolExecutor>(fuseConfig.notify_inval_threads(),
std::make_shared<folly::NamedThreadFactory>("NotifyInvalThread"));
return Void{};
}

void FuseClients::stop() {
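  // Tear down components roughly in reverse order of initialization.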
if (notifyInvalExec) {
notifyInvalExec->stop();
notifyInvalExec.reset();
}
if (onFuseConfigUpdated) {
onFuseConfigUpdated.reset();
}
cancelIos.requestCancellation();
for (auto &t : ioWatches) {
t.request_stop();
}
if (periodicSyncRunner) {
folly::coro::blockingWait(periodicSyncRunner->stopAll());
periodicSyncRunner.reset();
}
if (periodicSyncWorker) {
periodicSyncWorker->stopAndJoin();
periodicSyncWorker.reset();
}
if (metaClient) {
metaClient->stop();
metaClient.reset();
}
if (storageClient) {
storageClient->stop();
storageClient.reset();
}
if (mgmtdClient) {
folly::coro::blockingWait(mgmtdClient->stop());
mgmtdClient.reset();
}
if (client) {
client->stopAndJoin();
client.reset();
}
}

CoTask<void> FuseClients::ioRingWorker(int i, int ths) {
// a worker thread has its own priority, but it can also execute jobs from queues with a higher priority
// checkHigher is used to make sure the job queue with the thread's own priority doesn't starve
bool checkHigher = true;
while (true) {
auto res = co_await folly::coro::co_awaitTry([this, &checkHigher, i, ths]() -> CoTask<void> {
IoRingJob job;
auto hiThs = config->io_worker_coros().hi(), loThs = config->io_worker_coros().lo();
auto prio = i < hiThs ? 0 : i < (ths - loThs) ? 1 : 2;
if (!config->enable_priority()) {
job = co_await iojqs[prio]->co_dequeue();
} else {
bool gotJob = false;
// if checkHigher, dequeue from a higher job queue if it is full
while (!gotJob) {
if (checkHigher) {
for (int nprio = 0; nprio < prio; ++nprio) {
if (iojqs[nprio]->full()) {
auto dres = iojqs[nprio]->try_dequeue();
if (dres) {
                // got a job from a higher-priority queue; next time prefer a job of this thread's own priority unless its queue is empty
checkHigher = false;
gotJob = true;
job = std::move(*dres);
break;
}
}
}
if (gotJob) {
break;
}
}
// if checkHigher, check from higher prio to lower; otherwise, reverse the checking direction
for (int nprio = checkHigher ? 0 : prio; checkHigher ? nprio <= prio : nprio >= 0;
nprio += checkHigher ? 1 : -1) {
auto [sres, dres] =
co_await folly::coro::collectAnyNoDiscard(folly::coro::sleep(config->io_job_deq_timeout()),
iojqs[nprio]->co_dequeue());
if (dres.hasValue()) {
// if the job is the thread's own priority, next time it can check from higher priority queues
if (!checkHigher && nprio == prio) {
checkHigher = true;
}
gotJob = true;
job = std::move(*dres);
break;
} else if (sres.hasValue()) {
continue;
} else {
dres.throwUnlessValue();
}
}
}
}
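      // Process the dequeued job; may loop to keep draining the same io ring.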
while (true) {
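        // Resolve the RcInode for each queued sqe while holding inodesMutex; consecutive sqes on
        // the same file reuse the previous lookup.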
auto lookupFiles =
[this](std::vector<std::shared_ptr<RcInode>> &ins, const IoArgs *args, const IoSqe *sqes, int sqec) {
auto lastIid = 0ull;
std::lock_guard lock(inodesMutex);
for (int i = 0; i < sqec; ++i) {
auto idn = args[sqes[i].index].fileIid;
if (i && idn == lastIid) {
ins.emplace_back(ins.back());
continue;
}
lastIid = idn;
auto iid = meta::InodeId(idn);
auto it = inodes.find(iid);
ins.push_back(it == inodes.end() ? (std::shared_ptr<RcInode>()) : it->second);
}
};
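        // Resolve the shared-memory buffer for each sqe, validating the buffer id and that the
        // requested offset/length fit inside the registered shm.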
auto lookupBufs =
[this](std::vector<Result<lib::ShmBufForIO>> &bufs, const IoArgs *args, const IoSqe *sqe, int sqec) {
auto lastId = Uuid::zero();
std::shared_ptr<lib::ShmBuf> lastShm;
std::lock_guard lock(iovs.shmLock);
for (int i = 0; i < sqec; ++i) {
auto &arg = args[sqe[i].index];
Uuid id;
memcpy(id.data, arg.bufId, sizeof(id.data));
std::shared_ptr<lib::ShmBuf> shm;
if (i && id == lastId) {
shm = lastShm;
} else {
auto it = iovs.shmsById.find(id);
if (it == iovs.shmsById.end()) {
bufs.emplace_back(makeError(StatusCode::kInvalidArg, "buf id not found"));
continue;
}
auto iovd = it->second;
shm = iovs.iovs->table[iovd].load();
if (!shm) {
bufs.emplace_back(makeError(StatusCode::kInvalidArg, "buf id not found"));
continue;
} else if (shm->size < arg.bufOff + arg.ioLen) {
bufs.emplace_back(makeError(StatusCode::kInvalidArg, "invalid buf off and/or io len"));
continue;
}
lastId = id;
lastShm = shm;
}
bufs.emplace_back(lib::ShmBufForIO(std::move(shm), arg.bufOff));
}
};
co_await job.ior->process(job.sqeProcTail,
job.toProc,
*storageClient,
config->storage_io(),
userConfig,
std::move(lookupFiles),
std::move(lookupBufs));
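        // If the high-priority queue is full or this job belongs to a different priority, wake the
        // watchers via the semaphore; otherwise fetch the ring's next batch and enqueue it as
        // high priority, processing it inline if the enqueue fails.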
if (iojqs[0]->full() || job.ior->priority != prio) {
sem_post(iors.sems[job.ior->priority].get()); // wake the watchers
} else {
auto jobs = job.ior->jobsToProc(1);
if (!jobs.empty()) {
job = jobs.front();
if (!iojqs[0]->try_enqueue(job)) {
continue;
}
}
}
break;
}
}());
    if (UNLIKELY(res.hasException())) {
      if (res.hasException<OperationCancelled>()) {
        XLOGF(INFO, "io worker #{} cancelled", i);
        break;
      } else {
        XLOGF(FATAL, "got exception in io worker #{}", i);
      }
}
}
}

void FuseClients::watch(int prio, std::stop_token stop) {
while (!stop.stop_requested()) {
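    // Wait for an io-ring submission signal on the per-priority semaphore; the absolute timeout
    // (now + jitter) only bounds the wait so stop requests are noticed promptly.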
struct timespec ts;
if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
continue;
}
auto nsec = ts.tv_nsec + jitter.load().count();
ts.tv_nsec = nsec % 1000000000;
ts.tv_sec += nsec / 1000000000;
if (sem_timedwait(iors.sems[prio].get(), &ts) < 0 && errno == ETIMEDOUT) {
continue;
}
auto gotJobs = false;
do {
gotJobs = false;
auto n = iors.ioRings->slots.nextAvail.load();
for (int i = 0; i < n; ++i) {
auto ior = iors.ioRings->table[i].load();
if (ior && ior->priority == prio) {
auto jobs = ior->jobsToProc(config->max_jobs_per_ioring());
for (auto &&job : jobs) {
gotJobs = true;
iojqs[prio]->enqueue(std::move(job));
}
}
}
    } while (gotJobs);  // loop until no more jobs are found, then block in the next iteration
}
}

CoTask<void> FuseClients::periodicSyncScan() {
if (!config->periodic_sync().enable() || config->readonly()) {
co_return;
}
XLOGF(INFO, "periodicSyncScan run");
std::set<InodeId> dirty;
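  // Grab up to `limit` dirty inodes for this pass; when over the limit, resume scanning from the
  // last inode synced so later entries are not starved.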
{
auto guard = dirtyInodes.lock();
auto limit = config->periodic_sync().limit();
dirtyInodesCnt.set(guard->size());
if (guard->size() <= limit) {
dirty = std::exchange(*guard, {});
} else {
XLOGF(WARN, "dirty inodes {} > limit {}", guard->size(), limit);
auto iter = guard->find(lastSynced);
while (dirty.size() < limit) {
if (iter == guard->end()) {
iter = guard->begin();
XLOGF_IF(FATAL, iter == guard->end(), "iter == guard->end() shouldn't happen");
} else {
auto inode = *iter;
lastSynced = inode;
iter = guard->erase(iter);
dirty.insert(inode);
}
}
}
}
for (auto inode : dirty) {
co_await periodicSyncWorker->enqueue(inode);
}
co_return;
}

} // namespace hf3fs::fuse