bool handleRecv()

in src/rpc.cc [2523:2601]


  // Handles a receive notification for request `rid` coming from `peer`.
  //
  // The entry for `rid` is looked up in the bucket that rpc.getBucket() selects
  // for `container`; when `allowNew` is set, a missing entry is created unless
  // the rid is present in the peer's recent-incoming map. On success the entry's
  // recv.done flag is set and a reqAck is sent through `conn`.
  //
  // Returns true when an entry was found (or created) and has now been marked
  // done; false when the notification was ignored (unknown rid, rid recently
  // handled, rid owned by a different peer, or recv already done).
  bool handleRecv(T& container, PeerImpl& peer, RpcConnectionImpl<API>& conn, uint32_t rid, Deferrer& defer) {
    rpc.log("handleRecv peer %s rid %#x\n", peer.name, rid);

    // Builds a reqAck message for this rid and passes it to conn.send.
    auto sendAck = [&]() {
      BufferHandle ackBuffer;
      serializeToBuffer(ackBuffer, rid, Rpc::reqAck);
      conn.send(std::move(ackBuffer), defer);
    };

    // Resolves the map entry for `rid`. Yields a pointer to the entry when it
    // should be processed, or nullptr when this notification must be dropped.
    // `bucketLock` is the caller's lock on the bucket; it is released before
    // any ack is sent from here.
    auto find = [&](auto& b, auto& bucketLock) {
      auto validate = [&](auto it) -> decltype(&it->second) {
        if (it == b.map.end()) {
          // Unknown request: still acknowledge it, since the remote peer
          // would otherwise just keep sending it.
          bucketLock.unlock();
          sendAck();
          return nullptr;
        }
        auto& entry = it->second;
        if (entry.rid != rid) {
          fatal("handleRecv internal error: rid %#x is not set!\n", rid);
        }
        if (entry.peer != &peer) {
          // Same rid registered for another peer; dropped without an ack.
          log("peer %p vs %p\n", (void*)entry.peer, (void*)&peer);
          log("rid collision on recv! (not fatal!)\n");
          return nullptr;
        }
        if (entry.recv.done) {
          // Duplicate delivery: acknowledge again so the sender can stop.
          bucketLock.unlock();
          log("recv for rid %#x already done\n", rid);
          sendAck();
          return nullptr;
        }
        return &entry;
      };

      if constexpr (allowNew) {
        auto existing = b.map.find(rid);
        if (existing != b.map.end()) {
          return validate(existing);
        }

        // Not present yet. Before creating it, make sure this rid was not
        // handled a moment ago (after expiring timed-out records).
        std::lock_guard recentGuard(peer.recentIncomingMutex);
        if (!peer.recentIncomingList.empty()) {
          peer.clearRecentIncomingTimeouts();
          if (!peer.recentIncomingList.empty() &&
              peer.recentIncomingMap.find(rid) != peer.recentIncomingMap.end()) {
            rpc.log("rid %#x recently handled; ignoring\n", rid);
            return static_cast<Rpc::Impl::Incoming*>(nullptr);
          }
        }

        auto inserted = b.map.try_emplace(rid);
        auto& fresh = inserted.first->second;
        if (!inserted.second) {
          // The lookup above missed, so insertion is expected to succeed.
          fatal("bucket map not found but created!?");
        } else {
          rpc.log("NEW rid %#x\n", rid);
          fresh.peer = &peer;
          fresh.rid = rid;
        }
        return validate(inserted.first);
      } else {
        return validate(b.map.find(rid));
      }
    };

    auto& bucket = rpc.getBucket(container, rid);
    std::unique_lock l(bucket.mutex);
    auto entryPtr = find(bucket, l);
    if (!entryPtr) {
      return false;
    }

    // Mark completion while still holding the bucket lock, then release it
    // before sending the acknowledgement.
    entryPtr->recv.done = true;
    l.unlock();
    sendAck();
    return true;
  }