in src/fuse/FuseClients.cc [50:176]
// Initializes this FuseClients instance from `appInfo` and `fuseConfig`.
//
// In order, this function:
//   1. records the mount name, mount point and optional remount prefix;
//   2. loads the token (environment variable first, token file otherwise);
//   3. copies tunables out of `fuseConfig` and creates the RDMA buffer pool
//      and the iov / ior tables;
//   4. creates and starts the network client and the mgmtd client (each only
//      if not already set), then establishes a client session;
//   5. creates the storage and meta clients;
//   6. launches the I/O ring workers, the watchers, the periodic-sync worker
//      and runner, registers the config-update callback, and creates the
//      notify-inval executor.
//
// @param appInfo     Supplies the cluster id (used as the mount name), the
//                    service groups and the release version.
// @param mountPoint  Mount point path; stored lexically normalized.
// @param tokenFile   File the token is read from when the HF3FS_FUSE_TOKEN
//                    environment variable is not set.
// @param fuseConfig  Configuration object. Its address is stored in `config`
//                    and it is captured by reference in the config-update
//                    callback, so it must stay valid for as long as those are
//                    used.
// @return Void{} on success; otherwise the error propagated by one of the
//         RETURN_ON_ERROR checks (token file load, client start, hostname
//         lookup, client session establishment).
Result<Void> FuseClients::init(const flat::AppInfo &appInfo,
                               const String &mountPoint,
                               const String &tokenFile,
                               FuseConfig &fuseConfig) {
  config = &fuseConfig;

  // The cluster id doubles as the FUSE mount name.
  fuseMount = appInfo.clusterId;
  XLOGF_IF(FATAL,
           fuseMount.size() >= 32,
           "FUSE only support mount name shorter than 32 characters, but {} got.",
           fuseMount);

  fuseMountpoint = Path(mountPoint).lexically_normal();
  if (fuseConfig.remount_prefix()) {
    fuseRemountPref = Path(*fuseConfig.remount_prefix()).lexically_normal();
  }

  // The environment variable takes precedence over the token file. Note that
  // only the file-based token is whitespace-trimmed.
  if (const char *env_p = std::getenv("HF3FS_FUSE_TOKEN")) {
    XLOGF(INFO, "Use token from env var");
    fuseToken = std::string(env_p);
  } else {
    XLOGF(INFO, "Use token from config");
    auto tokenRes = loadFile(tokenFile);
    RETURN_ON_ERROR(tokenRes);
    fuseToken = folly::trimWhitespace(*tokenRes);
  }

  enableWritebackCache = fuseConfig.enable_writeback_cache();
  memsetBeforeRead = fuseConfig.memset_before_read();
  maxIdleThreads = fuseConfig.max_idle_threads();

  // hardware_concurrency() returns 0 when the core count cannot be determined;
  // in that case the configured limit is used as-is. Otherwise the limit is
  // capped at half the logical cores, rounded up.
  const int logicalCores = static_cast<int>(std::thread::hardware_concurrency());
  if (logicalCores != 0) {
    maxThreads = std::min(fuseConfig.max_threads(), (logicalCores + 1) / 2);
  } else {
    maxThreads = fuseConfig.max_threads();
  }

  bufPool = net::RDMABufPool::create(fuseConfig.io_bufs().max_buf_size(), fuseConfig.rdma_buf_pool_size());

  // The iov table is rooted at the remount prefix when one is configured,
  // otherwise at the mount point.
  iovs.init(fuseRemountPref.value_or(fuseMountpoint), fuseConfig.iov_limit());
  iors.init(fuseConfig.iov_limit());
  userConfig.init(fuseConfig);

  // An already-set client / mgmtd client is left untouched.
  if (!client) {
    client = std::make_unique<net::Client>(fuseConfig.client());
    RETURN_ON_ERROR(client->start());
  }
  auto ctxCreator = [this](net::Address addr) { return client->serdeCtx(addr); };
  if (!mgmtdClient) {
    mgmtdClient = std::make_shared<client::MgmtdClientForClient>(
        appInfo.clusterId,
        std::make_unique<stubs::RealStubFactory<mgmtd::MgmtdServiceStub>>(ctxCreator),
        fuseConfig.mgmtd());
  }

  // The physical host name feeds the client id and the session's universal id;
  // the container host name only appears in the human-readable description.
  auto physicalHostnameRes = SysResource::hostname(/*physicalMachineName=*/true);
  RETURN_ON_ERROR(physicalHostnameRes);
  auto containerHostnameRes = SysResource::hostname(/*physicalMachineName=*/false);
  RETURN_ON_ERROR(containerHostnameRes);

  auto clientId = ClientId::random(*physicalHostnameRes);
  mgmtdClient->setClientSessionPayload({clientId.uuid.toHexString(),
                                        flat::NodeType::FUSE,
                                        flat::ClientSessionData::create(
                                            /*universalId=*/*physicalHostnameRes,
                                            /*description=*/fmt::format("fuse: {}", *containerHostnameRes),
                                            appInfo.serviceGroups,
                                            appInfo.releaseVersion),
                                        // TODO: use real user info
                                        flat::UserInfo{}});
  mgmtdClient->setConfigListener(ApplicationBase::updateConfig);

  // NOTE(review): the results of these two blocking waits are discarded here;
  // presumably best-effort because establishClientSession() below reports
  // failure -- confirm.
  folly::coro::blockingWait(mgmtdClient->start(&client->tpg().bgThreadPool().randomPick()));
  folly::coro::blockingWait(mgmtdClient->refreshRoutingInfo(/*force=*/false));
  RETURN_ON_ERROR(establishClientSession(*mgmtdClient));

  storageClient = storage::client::StorageClient::create(clientId, fuseConfig.storage(), *mgmtdClient);
  metaClient =
      std::make_shared<meta::client::MetaClient>(clientId,
                                                 fuseConfig.meta(),
                                                 std::make_unique<meta::client::MetaClient::StubFactory>(ctxCreator),
                                                 mgmtdClient,
                                                 storageClient,
                                                 true /* dynStripe */);
  metaClient->start(client->tpg().bgThreadPool());

  // Three job queues, appended in this order and sized from
  // io_jobq_sizes().hi(), io_jobq_size() and io_jobq_sizes().lo().
  // make_unique keeps each queue owned at every point, instead of passing a
  // raw `new` pointer into emplace_back.
  // NOTE(review): assumes iojqs holds std::unique_ptr elements, as the
  // original emplace_back(new ...) implies -- confirm against the header.
  iojqs.reserve(3);
  iojqs.emplace_back(std::make_unique<BoundedQueue<IoRingJob>>(fuseConfig.io_jobq_sizes().hi()));
  iojqs.emplace_back(std::make_unique<BoundedQueue<IoRingJob>>(fuseConfig.io_jobq_size()));
  iojqs.emplace_back(std::make_unique<BoundedQueue<IoRingJob>>(fuseConfig.io_jobq_sizes().lo()));

  jitter = fuseConfig.submit_wait_jitter();

  // Spread the I/O ring worker coroutines round-robin over the background
  // thread pool; all of them run under cancelIos' cancellation token.
  auto &tp = client->tpg().bgThreadPool();
  auto coros = fuseConfig.batch_io_coros();
  for (int i = 0; i < coros; ++i) {
    auto exec = &tp.get(i % tp.size());
    co_withCancellation(cancelIos.getToken(), ioRingWorker(i, coros)).scheduleOn(exec).start();
  }

  // One watcher bound to each index 0..2 -- presumably one per job queue
  // above (confirm against FuseClients::watch).
  ioWatches.reserve(3);
  for (int i = 0; i < 3; ++i) {
    ioWatches.emplace_back(folly::partial(&FuseClients::watch, this, i));
  }

  periodicSyncWorker = std::make_unique<CoroutinesPool<InodeId>>(config->periodic_sync().worker());
  periodicSyncWorker->start(folly::partial(&FuseClients::periodicSync, this), tp);

  // The interval callback returns the configured interval scaled by a random
  // factor between 0.7 and 1.3. It is handed to the runner and only reads the
  // `config` member, so capture just `this` rather than everything by
  // reference: no local of this stack frame can end up captured by accident.
  periodicSyncRunner = std::make_unique<BackgroundRunner>(&tp.pickNextFree());
  periodicSyncRunner->start("PeriodSync", folly::partial(&FuseClients::periodicSyncScan, this), [this]() {
    return config->periodic_sync().interval() * folly::Random::randDouble(0.7, 1.3);
  });

  // Only these two settings are re-read when the callback registered on
  // fuseConfig fires; the other values copied above keep their initial value.
  onFuseConfigUpdated = fuseConfig.addCallbackGuard([&fuseConfig = fuseConfig, this] {
    memsetBeforeRead = fuseConfig.memset_before_read();
    jitter = std::chrono::duration_cast<std::chrono::nanoseconds>(fuseConfig.submit_wait_jitter());
  });

  notifyInvalExec =
      std::make_unique<folly::IOThreadPoolExecutor>(fuseConfig.notify_inval_threads(),
                                                    std::make_shared<folly::NamedThreadFactory>("NotifyInvalThread"));

  return Void{};
}