void onRequest()

in src/rpc.cc [2670:2844]


  void onRequest(
      PeerImpl& peer, RpcConnectionImpl<API>& conn, uint32_t rid, uint32_t fid, const std::byte* ptr, size_t len,
      BufferHandle buffer, Deferrer& defer) {
    if (rpc.terminate_.load(std::memory_order_relaxed)) {
      return;
    }
    log("onRequest peer %s rid %#x (%#x) fid %#x\n", peer.name, rid, rid & ~(uint32_t)1, fid);
    rid &= ~(uint32_t)1;
    switch (fid) {
    case Rpc::reqAck: {
      // Peer acknowledged that it has received the response
      // (return value of an RPC call)
      handleAck<true>(rpc.incoming_, peer, rid, defer);
      break;
    }
    case Rpc::reqPoke: {
      // Peer is poking us to check the status of an RPC call
      // log("got poke for %#x\n", rid);
      handlePoke(rpc.incoming_, peer, conn, rid, defer);
      break;
    }
    case Rpc::reqNack: {
      // Peer nacked a poke from us; this means we may need to
      // resend a rpc response
      handleNack(rpc.incoming_, peer, rid, defer);
      break;
    }
    case Rpc::reqLookingForPeer: {
      // Peer is looking for some other peer(s)
      std::vector<std::string_view> names;
      deserializeBuffer(ptr, len, names);
      std::vector<PeerImpl*> foundPeers;
      std::unordered_map<std::string_view, std::vector<ConnectionTypeInfo>> info;
      {
        std::lock_guard l(rpc.peersMutex_);
        for (auto name : names) {
          auto i = rpc.peers_.find(name);
          if (i != rpc.peers_.end() && i->second.hasId.load(std::memory_order_relaxed)) {
            log("Peer '%s' is looking for '%s', and we know them!\n", peer.name, name);
            foundPeers.push_back(&i->second);
          } else {
            log("Peer '%s' is looking for '%s', but we don't know them :/\n", peer.name, name);
          }
        }
      }
      if (!foundPeers.empty()) {
        for (auto* ptr : foundPeers) {
          PeerImpl& p = *ptr;
          std::lock_guard l(p.idMutex_);
          if (!p.connections_.empty()) {
            std::vector<ConnectionTypeInfo> vec;
            for (auto& x : p.connections_) {
              if (!x.remoteAddresses.empty()) {
                vec.emplace_back();
                vec.back().name = connectionTypeName.at(&x - p.connections_.data());
                vec.back().addr = x.remoteAddresses;
              }
            }
            if (!vec.empty()) {
              info[p.name] = std::move(vec);
            }
          }
        }
        if (!info.empty()) {
          BufferHandle buffer;
          serializeToBuffer(buffer, rid, Rpc::reqPeerFound, info);
          log("send reqPeerFound! rid %#x\n", rid);
          conn.send(std::move(buffer), defer);
        } else {
          log("no info :(\n");
        }
      }
      break;
    }
    default:
      if (fid < (uint32_t)Rpc::reqCallOffset) {
        return;
      }
      // RPC call
      Rpc::FBase* f = nullptr;
      {
        std::lock_guard l(rpc.mutex_);
        auto i = rpc.funcs_.find(fid);
        if (i != rpc.funcs_.end()) {
          f = &*i->second;
        }
      }
      auto getFuncName = [&]() {
        std::lock_guard l(rpc.mutex_);
        for (auto& v : rpc.funcIds_) {
          if (v.second == fid) {
            return v.first;
          }
        }
        return (std::string_view) "NOT-FOUND";
      };
      if (!f) {
        BufferHandle buffer;
        serializeToBuffer(buffer, rid, Rpc::reqFunctionNotFound);
        conn.send(std::move(buffer), defer);
      } else {
        bool recvOk = handleRecv<true>(rpc.incoming_, peer, conn, rid, defer);
        if (recvOk) {
          log("got request rid %#x (%s) from %s\n", rid, getFuncName(), peer.name);
          f->call(std::move(buffer), [weak = weakLock, peer = &peer, rid](BufferHandle outbuffer) {
            auto me = weak->template lock<RpcImpl>();
            if (!me) {
              return;
            }
            Deferrer defer;
            auto& rpc = me->rpc;
            {
              auto* ptr = dataptr<std::byte>(&*outbuffer);
              std::memcpy(ptr, &rid, sizeof(rid));
              ptr += sizeof(rid);
              uint32_t outFid;
              std::memcpy(&outFid, ptr, sizeof(outFid));
              ptr += sizeof(outFid);

              SharedBufferHandle shared(outbuffer.release());
              me->log("sending response for rid %#x of %d bytes to %s\n", rid, shared->size, peer->name);

              auto now = std::chrono::steady_clock::now();
              Rpc::Impl::IncomingBucket& bucket = rpc.getBucket(rpc.incoming_, rid);
              std::lock_guard l2(rpc.incomingFifoMutex_);
              std::unique_lock l(bucket.mutex);
              size_t totalResponseSize;
              auto i = bucket.map.find(rid);
              if (i != bucket.map.end()) {
                auto& x = bucket.map[rid];
                x.responseTimestamp = now;
                totalResponseSize = rpc.totalResponseSize_ += shared->size;
                x.timeout = now + std::chrono::milliseconds(250);
                x.resend.buffer = std::move(shared);
                // log("x is %p, resend.buffer is %p\n", (void*)&x, (void*)&*x.resend.buffer);
                rpc.resend(*peer, x.resend, defer);
                listInsert(rpc.incomingFifo_.prev, &x);

                rpc.updateTimeout(now + std::chrono::seconds(1));
              } else {
                totalResponseSize = rpc.totalResponseSize_;
              }
              l.unlock();

              // Erase outgoing data if it has not been acknowledged within a
              // certain time period. This prevents us from using resources
              // for peers that are permanently gone.
              auto timeout = std::chrono::seconds(300);
              if (now - rpc.incomingFifo_.next->responseTimestamp >= std::chrono::seconds(5)) {
                if (totalResponseSize < 1024 * 1024 && rpc.incoming_.size() < 1024) {
                  timeout = std::chrono::seconds(1800);
                } else if (totalResponseSize >= 1024 * 1024 * 1024 || rpc.incoming_.size() >= 1024 * 1024) {
                  timeout = std::chrono::seconds(60);
                }
                while (rpc.incomingFifo_.next != &rpc.incomingFifo_ &&
                       now - rpc.incomingFifo_.next->responseTimestamp >= timeout) {
                  Rpc::Impl::Incoming* i = rpc.incomingFifo_.next;
                  listErase(i);
                  auto& iBucket = rpc.getBucket(rpc.incoming_, i->rid);
                  std::lock_guard l3(iBucket.mutex);
                  if (i->resend.buffer) {
                    rpc.totalResponseSize_ -= i->resend.buffer->size;
                  }
                  i->peer->addRecentIncoming(i->rid, now + std::chrono::minutes(1));
                  me->rpc.cleanup(*i, defer);
                  iBucket.map.erase(i->rid);
                  me->log("permanent timeout of response for peer %s rid %#x!?\n", i->peer->name, i->rid);
                }
              }
            }
          });
        }
      }
    }
  }