in src/rpc.cc [2523:2601]
// Processes a "recv" notification for request id `rid` arriving from `peer`.
//
// The entry for `rid` is looked up in the bucket that rpc.getBucket() selects
// for `container`. When a usable entry is found, its recv is flagged as done
// and an ack is sent over `conn`. When `allowNew` is set, a missing entry is
// created on the spot, unless `rid` is still listed in the peer's
// recent-incoming map.
//
// Returns true only if this call flagged the entry as done. Returns false when
//   - no entry exists and none is created (an ack is still sent),
//   - the entry is owned by a different peer (rid collision; nothing is sent),
//   - the entry's recv was already done (an ack is sent again), or
//   - (allowNew only) the rid was recently handled (nothing is sent).
bool handleRecv(T& container, PeerImpl& peer, RpcConnectionImpl<API>& conn, uint32_t rid, Deferrer& defer) {
  rpc.log("handleRecv peer %s rid %#x\n", peer.name, rid);

  // Serializes a reqAck for `rid` and hands it to the connection.
  auto sendAck = [&]() {
    BufferHandle ackBuffer;
    serializeToBuffer(ackBuffer, rid, Rpc::reqAck);
    conn.send(std::move(ackBuffer), defer);
  };

  // Resolves `rid` to a pointer to its entry, or nullptr when there is nothing
  // for the caller to mark. `lock` is released here before any ack is sent;
  // on every other path it is left untouched.
  auto find = [&](auto& bucket, auto& lock) {
    // Validates the map position `pos` and returns the entry it refers to,
    // or nullptr if the entry must not be processed.
    auto check = [&](auto pos) -> decltype(&pos->second) {
      if (pos == bucket.map.end()) {
        // Unknown request: acknowledge it anyway, otherwise the remote peer
        // would simply keep re-sending it.
        lock.unlock();
        sendAck();
        return nullptr;
      }

      auto& entry = pos->second;
      if (entry.rid != rid) {
        fatal("handleRecv internal error: rid %#x is not set!\n", rid);
      }

      if (entry.peer != &peer) {
        // Same rid, different peer: logged and ignored, no ack.
        log("peer %p vs %p\n", (void*)entry.peer, (void*)&peer);
        log("rid collision on recv! (not fatal!)\n");
        return nullptr;
      }

      if (entry.recv.done) {
        // Duplicate notification: just acknowledge it again.
        lock.unlock();
        log("recv for rid %#x already done\n", rid);
        sendAck();
        return nullptr;
      }

      return &entry;
    };

    if constexpr (!allowNew) {
      return check(bucket.map.find(rid));
    } else {
      auto existing = bucket.map.find(rid);
      if (existing != bucket.map.end()) {
        return check(existing);
      }

      // Held until this lambda returns, i.e. across the insertion and the
      // check() call below.
      std::lock_guard recentGuard(peer.recentIncomingMutex);
      if (!peer.recentIncomingList.empty()) {
        peer.clearRecentIncomingTimeouts();
        const bool recentlyHandled =
            !peer.recentIncomingList.empty() && peer.recentIncomingMap.find(rid) != peer.recentIncomingMap.end();
        if (recentlyHandled) {
          rpc.log("rid %#x recently handled; ignoring\n", rid);
          return static_cast<Rpc::Impl::Incoming*>(nullptr);
        }
      }

      auto inserted = bucket.map.try_emplace(rid);
      auto& fresh = inserted.first->second;
      if (!inserted.second) {
        // The lookup above missed, so the insertion is expected to succeed.
        fatal("bucket map not found but created!?");
      } else {
        rpc.log("NEW rid %#x\n", rid);
        fresh.peer = &peer;
        fresh.rid = rid;
      }
      return check(inserted.first);
    }
  };

  auto& bucket = rpc.getBucket(container, rid);
  std::unique_lock bucketLock(bucket.mutex);

  auto* entry = find(bucket, bucketLock);
  if (!entry) {
    return false;
  }

  // Flag the entry while the bucket lock is still held, then release the lock
  // before sending the ack.
  entry->recv.done = true;
  bucketLock.unlock();
  sendAck();
  return true;
}