in src/rpc.cc [1672:1781]
void timeoutThreadEntry() {
async::setCurrentThreadName("timeout");
while (!terminate_.load(std::memory_order_relaxed)) {
Deferrer defer;
auto now = std::chrono::steady_clock::now();
if (lastRanMisc.load() + std::chrono::milliseconds(250) <= now) {
lastRanMisc.store(now);
collectFloatingConnections(now);
collectGarbage();
findPeersImpl(defer);
defer.execute();
now = std::chrono::steady_clock::now();
}
auto timeout = timeout_.load(std::memory_order_relaxed);
timeout = std::min(timeout, now + std::chrono::seconds(1));
while (now < timeout && !terminate_.load(std::memory_order_relaxed)) {
timeoutSem_.wait_for(timeout - now);
now = std::chrono::steady_clock::now();
continue;
}
auto newTimeout = now + std::chrono::seconds(5);
timeout_.store(newTimeout);
auto process = [&](auto& container) {
auto absTimeoutDuration = std::chrono::milliseconds(timeoutMilliseconds_.load(std::memory_order_relaxed));
for (auto& b : container) {
std::unique_lock l(b.mutex);
bool anyToRemove = false;
for (auto& v : b.map) {
if (now - v.second.creationTimestamp >= absTimeoutDuration) {
anyToRemove = true;
}
if (v.second.resend.buffer) {
if (now >= v.second.timeout) {
processTimeout(now, v.second, defer);
}
newTimeout = std::min(newTimeout, v.second.timeout);
}
// constexpr bool isIncoming = std::is_same_v<std::decay_t<decltype(v.second)>, Incoming>;
// float t = std::chrono::duration_cast<std::chrono::duration<float, std::ratio<1, 1>>>(now -
// v.second.creationTimestamp).count(); if constexpr (isIncoming) {
// log("Response %#x age: %g\n", v.second.rid, t);
// } else {
// log("Request %#x age: %g\n", v.second.rid, t);
// }
}
if (anyToRemove) {
std::unique_lock l2(incomingFifoMutex_, std::defer_lock);
constexpr bool isIncoming = std::is_same_v<std::decay_t<decltype(b.map.begin()->second)>, Incoming>;
if (isIncoming) {
if (!l2.try_lock()) {
l.unlock();
l2.lock();
l.lock();
}
}
for (auto i = b.map.begin(); i != b.map.end();) {
auto& v = i->second;
if (now - v.creationTimestamp >= absTimeoutDuration) {
if constexpr (isIncoming) {
if (v.resend.buffer) {
listErase(&v);
}
log("Response %#x timed out for real\n", v.rid);
v.peer->addRecentIncoming(v.rid, now + std::chrono::minutes(1));
} else {
log("Request %#x timed out for real\n", v.rid);
Error err(fmt::sprintf("Call (%s::%s) timed out", v.peer->name, v.peer->functionName(v.fid)));
std::move(v.response)(nullptr, &err);
}
if (v.resend.connection) {
log("sent over %s\n", connectionTypeName.at(v.resend.connectionIndex));
} else {
log("null connection\n");
}
cleanup(v, defer);
i = b.map.erase(i);
} else {
++i;
}
}
}
}
};
process(outgoing_);
process(incoming_);
defer.execute();
timeout = timeout_.load(std::memory_order_relaxed);
while (newTimeout < timeout && !timeout_.compare_exchange_weak(timeout, newTimeout))
;
// log("new timeout is in %d\n", std::chrono::duration_cast<std::chrono::milliseconds>(newTimeout - now).count());
// if (now - lastPrint >= std::chrono::seconds(30)) {
// lastPrint = now;
// std::lock_guard l(peersMutex_);
// for (auto& v : peers_) {
// auto& p = v.second;
// std::lock_guard l(p.idMutex_);
// log("Peer %s (%s)\n", std::string(p.name).c_str(), p.id.toString().c_str());
// for (size_t i = 0; i != p.connections_.size(); ++i) {
// auto& x = p.connections_[i];
// log(" %s x%d latency %g bandit %g\n", connectionTypeName[i],
// x.sendCount.load(std::memory_order_relaxed), x.runningLatency.load(), x.readBanditValue.load());
// }
// }
// }
}
}