CoTryTask<size_t> SessionManager::ScanTask::run(SessionManager &manager)

in src/meta/components/SessionManager.cc [78:186]


// Scan shard `shard_` of the file-session table and prune dead sessions.
//
// A scanned session is treated as dead when either
//   * its id is present in `prune_->sessions` (an explicit prune request), or
//   * its client is not in the active-client list fetched from mgmtd and the session
//     timestamp is at least one minute older than the moment this scan started
//     (younger sessions may have been created concurrently with the scan).
//
// Dead sessions are handed to `closeWorkers_` when `sync_on_prune_session` is enabled;
// otherwise they are removed directly, one read-write transaction per scanned page.
//
// Every ScanTask sharing `prune_` bumps `prune_->finished` exactly once, including when
// its own scan fails. The task that brings the counter to `FileSession::kShard` removes
// the prune requests that no shard scan matched.
//
// Returns the number of sessions this shard removed directly (sessions handed to close
// workers and leftovers removed at the end are not counted), or the first error hit
// while scanning or pruning this shard.
CoTryTask<size_t> SessionManager::ScanTask::run(SessionManager &manager) {
  XLOGF(DBG, "ScanTask-{} start", shard_);

  // Scan this shard page by page and prune the dead sessions found in it. This is kept
  // in a lambda so that every early error return still reaches the bookkeeping below.
  auto scanShard = [&]() -> CoTryTask<size_t> {
    // get all active clients
    auto ts = UtcClock::now();
    auto active = co_await getActiveClients(*manager.mgmtd_, false, manager.config_.session_timeout());
    CO_RETURN_ON_ERROR(active);

    size_t total = 0;
    std::optional<FileSession> prev;
    while (true) {
      // scan the next page of sessions, continuing after `prev`
      auto txn = manager.kvEngine_->createReadonlyTransaction();
      auto sessions = co_await kv::WithTransaction(kv::FDBRetryStrategy{})
                          .run(std::move(txn), [&](auto &txn) -> CoTryTask<std::vector<FileSession>> {
                            co_return co_await FileSession::scan(txn, shard_, prev);
                          });
      CO_RETURN_ON_ERROR(sessions);
      if (sessions->empty()) {
        break;
      }

      // filter dead sessions
      std::vector<FileSession> deadSessions;
      for (auto &session : *sessions) {
        if (prune_->sessions.rlock()->contains(session.sessionId)) {
          // explicit prune request: handle it here and drop it from the leftover set
          XLOGF(INFO, "Need prune session {}", session);
          prune_->sessions.wlock()->erase(session.sessionId);
        } else {
          // check client is active or not
          if (active->contains(session.clientId)) {
            continue;
          }
          if (session.timestamp + 1_min > ts) {
            // concurrent create session and scan
            auto now = UtcClock::now();
            XLOGF_IF(WARN, session.timestamp > now + 5_s, "Session timestamp {} > now {}", session.timestamp, now);
            continue;
          }
          XLOGF(WARN, "SessionManager found dead session {}", session);
        }
        deadSessions.push_back(session);
      }
      prev = sessions->back();

      // prune dead sessions
      if (manager.config_.sync_on_prune_session()) {
        // NOTE(review): sessions handed to close workers are not added to `total` or `pruned`.
        for (auto &session : deadSessions) {
          co_await manager.closeWorkers_->enqueue(std::make_unique<CloseTask>(session));
        }
      } else {
        auto txn = manager.kvEngine_->createReadWriteTransaction();
        auto result =
            co_await kv::WithTransaction(kv::FDBRetryStrategy{}).run(std::move(txn), [&](auto &txn) -> CoTryTask<Void> {
              for (auto &session : deadSessions) {
                CO_RETURN_ON_ERROR(co_await session.remove(txn));
              }
              co_return Void{};
            });
        if (result.hasError()) {
          pruneFailed.addSample(deadSessions.size());
          XLOGF(ERR, "ScanTask-{} prune failed, error {}", shard_, result.error());
          CO_RETURN_ERROR(result);
        }
        total += deadSessions.size();
        pruned.addSample(deadSessions.size());
      }
    }
    co_return total;
  };

  auto scanned = co_await scanShard();

  // Count this shard as finished whether or not its scan succeeded. Otherwise a single
  // failing shard keeps the counter below kShard, and the leftover prune requests in
  // `prune_->sessions` are never handled.
  auto finished = prune_->finished.fetch_add(1) + 1;
  if (finished < FileSession::kShard) {
    co_return scanned;
  }

  // Last shard to finish: remove the prune requests that no shard scan matched, at most
  // kBatch sessions per transaction. A failed batch is logged and dropped.
  static constexpr size_t kBatch = 64;
  while (!prune_->sessions.rlock()->empty()) {
    std::vector<FileSession> batch;
    batch.reserve(kBatch);
    auto wlock = prune_->sessions.wlock();
    auto iter = wlock->begin();
    while (iter != wlock->end() && batch.size() < kBatch) {
      XLOGF(INFO, "Need prune session {}", iter->second);
      batch.push_back(std::move(iter->second));  // entry is erased right below
      iter = wlock->erase(iter);
    }
    wlock.unlock();  // do not hold the lock across the transaction
    auto txn = manager.kvEngine_->createReadWriteTransaction();
    auto prune = co_await kv::WithTransaction(kv::FDBRetryStrategy{})
                     .run(std::move(txn), [&](IReadWriteTransaction &txn) -> CoTryTask<Void> {
                       for (auto &session : batch) {
                         CO_RETURN_ON_ERROR(co_await session.remove(txn));
                       }
                       co_return Void{};
                     });
    if (prune.hasError()) {
      pruneFailed.addSample(batch.size());
      XLOGF(WARN, "Prune session failed, error {}", prune.error());
    }
  }

  co_return scanned;
}