in src/meta/components/SessionManager.cc [78:186]
/**
 * Scan the file sessions of shard `shard_` and prune the ones that should go away.
 *
 * A session is selected for pruning when either
 *   - its id is present in `prune_->sessions` (the entry is erased from that map once matched), or
 *   - its client is not in the active-client set fetched at the start of the run AND its
 *     timestamp is at least one minute older than the moment the run started.
 *
 * Selected sessions are handled page by page:
 *   - if `sync_on_prune_session()` is set, one CloseTask per session is enqueued on
 *     `manager.closeWorkers_` (these are not added to the returned count);
 *   - otherwise they are removed in a single read-write transaction and counted.
 *
 * After the scan, `prune_->finished` is incremented. The task that brings the counter up to
 * `FileSession::kShard` additionally drains whatever is left in `prune_->sessions`, removing
 * those sessions in batches of `kBatch`. Failures in this drain phase are logged and sampled
 * into `pruneFailed` but do not fail the task.
 *
 * @param manager  Owner providing `mgmtd_`, `config_`, `kvEngine_` and `closeWorkers_`.
 * @return Number of sessions removed by the transactional path of the scan loop, or the error
 *         from fetching active clients, scanning, or the transactional removal.
 *
 * NOTE(review): every early error return skips the `prune_->finished` increment, so the drain
 * phase will not run for this round if any shard fails — confirm this is intended.
 */
CoTryTask<size_t> SessionManager::ScanTask::run(SessionManager &manager) {
  XLOGF(DBG, "ScanTask-{} start", shard_);

  // The reference time is captured before the active-client list is fetched; it is the
  // baseline for the one-minute grace period applied to each session below.
  auto ts = UtcClock::now();
  auto active = co_await getActiveClients(*manager.mgmtd_, false, manager.config_.session_timeout());
  CO_RETURN_ON_ERROR(active);

  size_t total = 0;
  // Last session of the previous page, handed back to FileSession::scan as the resume cursor.
  std::optional<FileSession> prev;
  while (true) {
    // scan sessions
    auto txn = manager.kvEngine_->createReadonlyTransaction();
    auto sessions = co_await kv::WithTransaction(kv::FDBRetryStrategy{})
                        .run(std::move(txn), [&](auto &txn) -> CoTryTask<std::vector<FileSession>> {
                          co_return co_await FileSession::scan(txn, shard_, prev);
                        });
    CO_RETURN_ON_ERROR(sessions);
    if (sessions->empty()) {
      break;
    }

    // Record the cursor before filtering: the loop below moves entries out of the page,
    // so the last element must be copied while it is still intact.
    prev = sessions->back();

    // filter dead sessions
    std::vector<FileSession> deadSessions;
    for (auto &session : *sessions) {
      if (prune_->sessions.rlock()->contains(session.sessionId)) {
        // need prune this session
        XLOGF(INFO, "Need prune session {}", session);
        prune_->sessions.wlock()->erase(session.sessionId);
      } else {
        // check client is active or not
        if (active->contains(session.clientId)) {
          continue;
        }
        if (session.timestamp + 1_min > ts) {
          // concurrent create session and scan
          auto now = UtcClock::now();
          XLOGF_IF(WARN, session.timestamp > now + 5_s, "Session timestamp {} > now {}", session.timestamp, now);
          continue;
        }
        XLOGF(WARN, "SessionManager found dead session {}", session);
      }
      // The page is not read again after this loop, so hand the entry over instead of copying.
      deadSessions.push_back(std::move(session));
    }

    // prune dead sessions
    if (manager.config_.sync_on_prune_session()) {
      for (auto &session : deadSessions) {
        co_await manager.closeWorkers_->enqueue(std::make_unique<CloseTask>(session));
      }
    } else {
      auto txn = manager.kvEngine_->createReadWriteTransaction();
      // The lambda only reads deadSessions, so it is safe for the retry strategy to re-run it.
      auto result =
          co_await kv::WithTransaction(kv::FDBRetryStrategy{}).run(std::move(txn), [&](auto &txn) -> CoTryTask<Void> {
            for (auto &session : deadSessions) {
              CO_RETURN_ON_ERROR(co_await session.remove(txn));
            }
            co_return Void{};
          });
      if (result.hasError()) {
        pruneFailed.addSample(deadSessions.size());
        XLOGF(ERR, "ScanTask-{} prune failed, error {}", shard_, result.error());
        CO_RETURN_ERROR(result);
      }
      total += deadSessions.size();
      pruned.addSample(deadSessions.size());
    }
  }

  // Only the task that completes the last shard goes on to drain the leftover prune requests.
  auto finished = prune_->finished.fetch_add(1) + 1;
  if (finished < FileSession::kShard) {
    co_return total;
  }

  while (!prune_->sessions.rlock()->empty()) {
    static constexpr size_t kBatch = 64;
    std::vector<FileSession> batch;
    batch.reserve(kBatch);

    // Pull up to kBatch entries out of the map under the write lock, then release the lock
    // before suspending on the transaction.
    auto wlock = prune_->sessions.wlock();
    auto iter = wlock->begin();
    while (iter != wlock->end() && batch.size() < kBatch) {
      XLOGF(INFO, "Need prune session {}", iter->second);
      // The node is erased on the next line, so its value can be moved out.
      batch.push_back(std::move(iter->second));
      iter = wlock->erase(iter);
    }
    wlock.unlock();

    auto txn = manager.kvEngine_->createReadWriteTransaction();
    auto prune = co_await kv::WithTransaction(kv::FDBRetryStrategy{})
                     .run(std::move(txn), [&](IReadWriteTransaction &txn) -> CoTryTask<Void> {
                       for (auto &session : batch) {
                         CO_RETURN_ON_ERROR(co_await session.remove(txn));
                       }
                       co_return Void{};
                     });
    if (prune.hasError()) {
      // NOTE(review): the batch has already been erased from prune_->sessions, so a failed
      // batch is not retried here — presumably deliberate best-effort; confirm.
      pruneFailed.addSample(batch.size());
      XLOGF(WARN, "Prune session failed, error {}", prune.error());
    }
  }
  co_return total;
}