void timeoutThreadEntry()

in src/rpc.cc [1672:1781]


  // Body of the thread that names itself "timeout". Loops until terminate_ is set.
  //
  // Each iteration:
  //   1. At most once per 250 ms (tracked through lastRanMisc) runs the housekeeping
  //      calls collectFloatingConnections, collectGarbage and findPeersImpl.
  //   2. Waits on timeoutSem_ until the earlier of timeout_ and one second from now.
  //   3. Walks every bucket of outgoing_ and then incoming_:
  //        - entries that have a resend buffer and whose per-entry timeout has
  //          passed are handed to processTimeout;
  //        - entries older than timeoutMilliseconds_ are logged, finalised
  //          (outgoing: response callback invoked with an Error; incoming:
  //          recorded via addRecentIncoming), cleaned up and erased.
  //   4. Lowers timeout_ to the earliest per-entry timeout seen during the scan.
  void timeoutThreadEntry() {
    async::setCurrentThreadName("timeout");
    while (!terminate_.load(std::memory_order_relaxed)) {
      // Handed to findPeersImpl / processTimeout / cleanup and flushed with
      // defer.execute() below. Presumably collects work that must run outside
      // the bucket locks - TODO confirm against Deferrer's definition.
      Deferrer defer;
      auto now = std::chrono::steady_clock::now();
      // Housekeeping, rate-limited to once per 250 ms.
      if (lastRanMisc.load() + std::chrono::milliseconds(250) <= now) {
        lastRanMisc.store(now);
        collectFloatingConnections(now);
        collectGarbage();
        findPeersImpl(defer);
        defer.execute();
        // Re-read the clock after the housekeeping calls have run.
        now = std::chrono::steady_clock::now();
      }

      // Sleep until the published deadline, but never longer than one second
      // from now, so the loop re-evaluates terminate_ and the housekeeping
      // check at least that often.
      auto timeout = timeout_.load(std::memory_order_relaxed);
      timeout = std::min(timeout, now + std::chrono::seconds(1));
      while (now < timeout && !terminate_.load(std::memory_order_relaxed)) {
        // NOTE(review): `timeout` is a local snapshot; if wait_for returns
        // early (presumably when the semaphore is posted) the loop still
        // waits for the snapshot deadline, not a re-read timeout_ - confirm
        // this is intended.
        timeoutSem_.wait_for(timeout - now);
        now = std::chrono::steady_clock::now();
        // Redundant: this is already the last statement of the loop body.
        continue;
      }
      // Default next deadline is five seconds out. It is stored immediately and
      // only lowered (never raised) by the compare-exchange loop after the scan.
      auto newTimeout = now + std::chrono::seconds(5);
      timeout_.store(newTimeout);
      // Generic over the container type so the same scan serves outgoing_ and
      // incoming_. Each element `b` of the container exposes `mutex` and `map`.
      auto process = [&](auto& container) {
        // Maximum age of an entry before it is dropped unconditionally.
        auto absTimeoutDuration = std::chrono::milliseconds(timeoutMilliseconds_.load(std::memory_order_relaxed));
        for (auto& b : container) {
          // Bucket lock; released at the end of each bucket iteration.
          std::unique_lock l(b.mutex);
          bool anyToRemove = false;
          // First pass: handle per-entry timeouts and note whether any entry has
          // exceeded the absolute age limit.
          for (auto& v : b.map) {
            if (now - v.second.creationTimestamp >= absTimeoutDuration) {
              anyToRemove = true;
            }
            // Only entries that still hold a resend buffer take part in
            // per-entry timeout handling.
            if (v.second.resend.buffer) {
              if (now >= v.second.timeout) {
                processTimeout(now, v.second, defer);
              }
              // Read after processTimeout, so an updated v.second.timeout is what
              // gets folded in (presumably processTimeout advances it - TODO confirm).
              newTimeout = std::min(newTimeout, v.second.timeout);
            }

            // Disabled debug logging of entry ages:
            //            constexpr bool isIncoming = std::is_same_v<std::decay_t<decltype(v.second)>, Incoming>;
            //            float t = std::chrono::duration_cast<std::chrono::duration<float, std::ratio<1, 1>>>(now -
            //            v.second.creationTimestamp).count(); if constexpr (isIncoming) {
            //              log("Response %#x age: %g\n", v.second.rid, t);
            //            } else {
            //              log("Request %#x age: %g\n", v.second.rid, t);
            //            }
          }
          if (anyToRemove) {
            // For Incoming entries the removal pass also holds incomingFifoMutex_
            // (listErase below presumably requires it - TODO confirm). l2 starts
            // unlocked and stays unlocked for the outgoing container.
            std::unique_lock l2(incomingFifoMutex_, std::defer_lock);
            constexpr bool isIncoming = std::is_same_v<std::decay_t<decltype(b.map.begin()->second)>, Incoming>;
            if (isIncoming) {
              // Try to take the fifo mutex without blocking while the bucket lock
              // is held. If that fails, drop the bucket lock, block on the fifo
              // mutex, then re-take the bucket lock, so that blocking acquisition
              // happens in the order fifo -> bucket (presumably the lock order
              // used elsewhere - verify).
              if (!l2.try_lock()) {
                l.unlock();
                l2.lock();
                l.lock();
              }
            }
            // Second pass: re-tests every entry's age instead of trusting the first
            // pass, since the bucket lock may have been released above.
            for (auto i = b.map.begin(); i != b.map.end();) {
              auto& v = i->second;
              if (now - v.creationTimestamp >= absTimeoutDuration) {
                if constexpr (isIncoming) {
                  if (v.resend.buffer) {
                    listErase(&v);
                  }
                  log("Response %#x timed out for real\n", v.rid);

                  // Register the request id with the peer together with an expiry
                  // one minute from now.
                  v.peer->addRecentIncoming(v.rid, now + std::chrono::minutes(1));
                } else {
                  log("Request %#x timed out for real\n", v.rid);
                  Error err(fmt::sprintf("Call (%s::%s) timed out", v.peer->name, v.peer->functionName(v.fid)));
                  // Invoke the response callback with a null first argument and a
                  // pointer to the error.
                  // NOTE(review): this runs while the bucket mutex is still held.
                  std::move(v.response)(nullptr, &err);
                }
                if (v.resend.connection) {
                  log("sent over %s\n", connectionTypeName.at(v.resend.connectionIndex));
                } else {
                  log("null connection\n");
                }
                cleanup(v, defer);
                // erase returns the iterator to the next element.
                i = b.map.erase(i);
              } else {
                ++i;
              }
            }
          }
        }
      };
      process(outgoing_);
      process(incoming_);
      // Flush deferred work once both scans are done and no bucket lock is held.
      defer.execute();
      // Lower timeout_ to newTimeout if newTimeout is earlier. On a failed
      // compare-exchange `timeout` is refreshed with the current value and the
      // comparison is re-evaluated, so an earlier value stored by someone else
      // is never overwritten.
      timeout = timeout_.load(std::memory_order_relaxed);
      while (newTimeout < timeout && !timeout_.compare_exchange_weak(timeout, newTimeout))
        ;
      // log("new timeout is in %d\n", std::chrono::duration_cast<std::chrono::milliseconds>(newTimeout - now).count());

      // Disabled periodic dump of per-peer connection statistics:
      //      if (now - lastPrint >= std::chrono::seconds(30)) {
      //        lastPrint = now;
      //        std::lock_guard l(peersMutex_);
      //        for (auto& v : peers_) {
      //          auto& p = v.second;
      //          std::lock_guard l(p.idMutex_);
      //          log("Peer %s (%s)\n", std::string(p.name).c_str(), p.id.toString().c_str());
      //          for (size_t i = 0; i != p.connections_.size(); ++i) {
      //            auto& x = p.connections_[i];
      //            log(" %s x%d  latency %g bandit %g\n", connectionTypeName[i],
      //            x.sendCount.load(std::memory_order_relaxed), x.runningLatency.load(), x.readBanditValue.load());
      //          }
      //        }
      //      }
    }
  }