Result FuseClients::init()

in src/fuse/FuseClients.cc [50:176]


Result<Void> FuseClients::init(const flat::AppInfo &appInfo,
                               const String &mountPoint,
                               const String &tokenFile,
                               FuseConfig &fuseConfig) {
  config = &fuseConfig;

  fuseMount = appInfo.clusterId;
  XLOGF_IF(FATAL,
           fuseMount.size() >= 32,
           "FUSE only support mount name shorter than 32 characters, but {} got.",
           fuseMount);

  fuseMountpoint = Path(mountPoint).lexically_normal();

  if (fuseConfig.remount_prefix()) {
    fuseRemountPref = Path(*fuseConfig.remount_prefix()).lexically_normal();
  }

  if (const char *env_p = std::getenv("HF3FS_FUSE_TOKEN")) {
    XLOGF(INFO, "Use token from env var");
    fuseToken = std::string(env_p);
  } else {
    XLOGF(INFO, "Use token from config");
    auto tokenRes = loadFile(tokenFile);
    RETURN_ON_ERROR(tokenRes);
    fuseToken = folly::trimWhitespace(*tokenRes);
  }
  enableWritebackCache = fuseConfig.enable_writeback_cache();
  memsetBeforeRead = fuseConfig.memset_before_read();
  maxIdleThreads = fuseConfig.max_idle_threads();
  int logicalCores = std::thread::hardware_concurrency();
  if (logicalCores != 0) {
    maxThreads = std::min(fuseConfig.max_threads(), (logicalCores + 1) / 2);
  } else {
    maxThreads = fuseConfig.max_threads();
  }
  bufPool = net::RDMABufPool::create(fuseConfig.io_bufs().max_buf_size(), fuseConfig.rdma_buf_pool_size());

  iovs.init(fuseRemountPref.value_or(fuseMountpoint), fuseConfig.iov_limit());
  iors.init(fuseConfig.iov_limit());
  userConfig.init(fuseConfig);

  if (!client) {
    client = std::make_unique<net::Client>(fuseConfig.client());
    RETURN_ON_ERROR(client->start());
  }
  auto ctxCreator = [this](net::Address addr) { return client->serdeCtx(addr); };
  if (!mgmtdClient) {
    mgmtdClient = std::make_shared<client::MgmtdClientForClient>(
        appInfo.clusterId,
        std::make_unique<stubs::RealStubFactory<mgmtd::MgmtdServiceStub>>(ctxCreator),
        fuseConfig.mgmtd());
  }

  auto physicalHostnameRes = SysResource::hostname(/*physicalMachineName=*/true);
  RETURN_ON_ERROR(physicalHostnameRes);

  auto containerHostnameRes = SysResource::hostname(/*physicalMachineName=*/false);
  RETURN_ON_ERROR(containerHostnameRes);

  auto clientId = ClientId::random(*physicalHostnameRes);

  mgmtdClient->setClientSessionPayload({clientId.uuid.toHexString(),
                                        flat::NodeType::FUSE,
                                        flat::ClientSessionData::create(
                                            /*universalId=*/*physicalHostnameRes,
                                            /*description=*/fmt::format("fuse: {}", *containerHostnameRes),
                                            appInfo.serviceGroups,
                                            appInfo.releaseVersion),
                                        // TODO: use real user info
                                        flat::UserInfo{}});

  mgmtdClient->setConfigListener(ApplicationBase::updateConfig);

  folly::coro::blockingWait(mgmtdClient->start(&client->tpg().bgThreadPool().randomPick()));
  folly::coro::blockingWait(mgmtdClient->refreshRoutingInfo(/*force=*/false));
  RETURN_ON_ERROR(establishClientSession(*mgmtdClient));

  storageClient = storage::client::StorageClient::create(clientId, fuseConfig.storage(), *mgmtdClient);

  metaClient =
      std::make_shared<meta::client::MetaClient>(clientId,
                                                 fuseConfig.meta(),
                                                 std::make_unique<meta::client::MetaClient::StubFactory>(ctxCreator),
                                                 mgmtdClient,
                                                 storageClient,
                                                 true /* dynStripe */);
  metaClient->start(client->tpg().bgThreadPool());

  iojqs.reserve(3);
  iojqs.emplace_back(new BoundedQueue<IoRingJob>(fuseConfig.io_jobq_sizes().hi()));
  iojqs.emplace_back(new BoundedQueue<IoRingJob>(fuseConfig.io_jobq_size()));
  iojqs.emplace_back(new BoundedQueue<IoRingJob>(fuseConfig.io_jobq_sizes().lo()));

  jitter = fuseConfig.submit_wait_jitter();

  auto &tp = client->tpg().bgThreadPool();
  auto coros = fuseConfig.batch_io_coros();
  for (int i = 0; i < coros; ++i) {
    auto exec = &tp.get(i % tp.size());
    co_withCancellation(cancelIos.getToken(), ioRingWorker(i, coros)).scheduleOn(exec).start();
  }

  ioWatches.reserve(3);
  for (int i = 0; i < 3; ++i) {
    ioWatches.emplace_back(folly::partial(&FuseClients::watch, this, i));
  }

  periodicSyncWorker = std::make_unique<CoroutinesPool<InodeId>>(config->periodic_sync().worker());
  periodicSyncWorker->start(folly::partial(&FuseClients::periodicSync, this), tp);

  periodicSyncRunner = std::make_unique<BackgroundRunner>(&tp.pickNextFree());
  periodicSyncRunner->start("PeriodSync", folly::partial(&FuseClients::periodicSyncScan, this), [&]() {
    return config->periodic_sync().interval() * folly::Random::randDouble(0.7, 1.3);
  });

  onFuseConfigUpdated = fuseConfig.addCallbackGuard([&fuseConfig = fuseConfig, this] {
    memsetBeforeRead = fuseConfig.memset_before_read();
    jitter = std::chrono::duration_cast<std::chrono::nanoseconds>(fuseConfig.submit_wait_jitter());
  });

  notifyInvalExec =
      std::make_unique<folly::IOThreadPoolExecutor>(fuseConfig.notify_inval_threads(),
                                                    std::make_shared<folly::NamedThreadFactory>("NotifyInvalThread"));

  return Void{};
}