void onResponse()

in src/rpc.cc [2846:2950]


  void onResponse(
      PeerImpl& peer, RpcConnectionImpl<API>& conn, uint32_t rid, uint32_t fid, const std::byte* ptr, size_t len,
      BufferHandle buffer, Deferrer& defer) noexcept {
    log("onResponse peer %s rid %#x fid %#x\n", peer.name, rid, fid);
    rid |= 1;
    switch (fid) {
    case Rpc::reqClose: {
      log("got reqClose from %s\n", peer.name);
      auto& x = peer.connections_.at(index<API>);
      std::lock_guard l(x.mutex);
      for (size_t i = 0; i != x.conns.size(); ++i) {
        if (&*x.conns[i] == &conn) {
          peer.throwAway(x, i);
          break;
        }
      }
      break;
    }
    case Rpc::reqPoke: {
      handlePoke(rpc.outgoing_, peer, conn, rid, defer);
      break;
    }
    case Rpc::reqAck: {
      handleAck<false>(rpc.outgoing_, peer, rid, defer);
      break;
    }
    case Rpc::reqNack: {
      handleNack(rpc.outgoing_, peer, rid, defer);
      break;
    }
    case Rpc::reqPeerFound: {
      std::unordered_map<std::string_view, std::vector<ConnectionTypeInfo>> info;
      deserializeBuffer(ptr, len, info);
      for (auto& [name, vec] : info) {
        log("Received some connection info about peer %s\n", name);
        PeerImpl& peer = rpc.getPeer(name);
        std::lock_guard l(peer.idMutex_);
        for (auto& n : vec) {
          for (size_t i = 0; i != peer.connections_.size(); ++i) {
            if (connectionTypeName[i] == n.name) {
              auto& x = peer.connections_[i];
              std::lock_guard l(x.mutex);
              x.valid = true;
              for (auto& v2 : n.addr) {
                if (std::find(x.remoteAddresses.begin(), x.remoteAddresses.end(), v2) == x.remoteAddresses.end()) {
                  log("Adding address %s\n", v2);
                  x.remoteAddresses.push_back(rpc.persistentString(v2));
                  if (x.remoteAddresses.size() > 48) {
                    x.remoteAddresses.erase(
                        x.remoteAddresses.begin(), x.remoteAddresses.begin() + (x.remoteAddresses.size() - 24));
                  }
                }
              }
              if (x.remoteAddresses.size() > 24) {
                x.remoteAddresses.erase(
                    x.remoteAddresses.begin(), x.remoteAddresses.begin() + (x.remoteAddresses.size() - 24));
              }
            }
          }
        }
      }
      break;
    }
    case Rpc::reqFunctionNotFound:
    case Rpc::reqError:
    case Rpc::reqSuccess: {
      bool recvOk = handleRecv<false>(rpc.outgoing_, peer, conn, rid, defer);
      if (recvOk) {
        Rpc::ResponseCallback response;
        uint32_t ofid;
        {
          auto& oBucket = rpc.getBucket(rpc.outgoing_, rid);
          std::lock_guard l(oBucket.mutex);
          auto i = oBucket.map.find(rid);
          if (i != oBucket.map.end() && i->second.peer == &peer) {
            // log("got response for rid %#x from %s\n", rid, peer.name);
            response = std::move(i->second.response);
            ofid = i->second.fid;
            rpc.cleanup(i->second, defer);
            oBucket.map.erase(i);
          } else {
            // log("got response for unknown rid %#x from %s\n", rid, peer.name);
          }
        }
        if (response) {
          if (fid == Rpc::reqFunctionNotFound) {
            Error err("Remote function not found");
            std::move(response)(std::move(buffer), &err);
          } else if (fid == Rpc::reqError) {
            uint32_t xrid, xfid;
            std::string_view str;
            deserializeBuffer(std::move(buffer), xrid, xfid, str);
            Error err{fmt::sprintf("Remote exception during RPC call (%s): %s", peer.functionName(ofid), str)};
            std::move(response)(nullptr, &err);
          } else if (fid == Rpc::reqSuccess) {
            std::move(response)(std::move(buffer), nullptr);
          }
        }
      }
      break;
    }
    default:
      log("onResponse: unknown fid %#x\n", fid);
    }
  }