in src/rpc.cc [2846:2950]
void onResponse(
PeerImpl& peer, RpcConnectionImpl<API>& conn, uint32_t rid, uint32_t fid, const std::byte* ptr, size_t len,
BufferHandle buffer, Deferrer& defer) noexcept {
log("onResponse peer %s rid %#x fid %#x\n", peer.name, rid, fid);
rid |= 1;
switch (fid) {
case Rpc::reqClose: {
log("got reqClose from %s\n", peer.name);
auto& x = peer.connections_.at(index<API>);
std::lock_guard l(x.mutex);
for (size_t i = 0; i != x.conns.size(); ++i) {
if (&*x.conns[i] == &conn) {
peer.throwAway(x, i);
break;
}
}
break;
}
case Rpc::reqPoke: {
handlePoke(rpc.outgoing_, peer, conn, rid, defer);
break;
}
case Rpc::reqAck: {
handleAck<false>(rpc.outgoing_, peer, rid, defer);
break;
}
case Rpc::reqNack: {
handleNack(rpc.outgoing_, peer, rid, defer);
break;
}
case Rpc::reqPeerFound: {
std::unordered_map<std::string_view, std::vector<ConnectionTypeInfo>> info;
deserializeBuffer(ptr, len, info);
for (auto& [name, vec] : info) {
log("Received some connection info about peer %s\n", name);
PeerImpl& peer = rpc.getPeer(name);
std::lock_guard l(peer.idMutex_);
for (auto& n : vec) {
for (size_t i = 0; i != peer.connections_.size(); ++i) {
if (connectionTypeName[i] == n.name) {
auto& x = peer.connections_[i];
std::lock_guard l(x.mutex);
x.valid = true;
for (auto& v2 : n.addr) {
if (std::find(x.remoteAddresses.begin(), x.remoteAddresses.end(), v2) == x.remoteAddresses.end()) {
log("Adding address %s\n", v2);
x.remoteAddresses.push_back(rpc.persistentString(v2));
if (x.remoteAddresses.size() > 48) {
x.remoteAddresses.erase(
x.remoteAddresses.begin(), x.remoteAddresses.begin() + (x.remoteAddresses.size() - 24));
}
}
}
if (x.remoteAddresses.size() > 24) {
x.remoteAddresses.erase(
x.remoteAddresses.begin(), x.remoteAddresses.begin() + (x.remoteAddresses.size() - 24));
}
}
}
}
}
break;
}
case Rpc::reqFunctionNotFound:
case Rpc::reqError:
case Rpc::reqSuccess: {
bool recvOk = handleRecv<false>(rpc.outgoing_, peer, conn, rid, defer);
if (recvOk) {
Rpc::ResponseCallback response;
uint32_t ofid;
{
auto& oBucket = rpc.getBucket(rpc.outgoing_, rid);
std::lock_guard l(oBucket.mutex);
auto i = oBucket.map.find(rid);
if (i != oBucket.map.end() && i->second.peer == &peer) {
// log("got response for rid %#x from %s\n", rid, peer.name);
response = std::move(i->second.response);
ofid = i->second.fid;
rpc.cleanup(i->second, defer);
oBucket.map.erase(i);
} else {
// log("got response for unknown rid %#x from %s\n", rid, peer.name);
}
}
if (response) {
if (fid == Rpc::reqFunctionNotFound) {
Error err("Remote function not found");
std::move(response)(std::move(buffer), &err);
} else if (fid == Rpc::reqError) {
uint32_t xrid, xfid;
std::string_view str;
deserializeBuffer(std::move(buffer), xrid, xfid, str);
Error err{fmt::sprintf("Remote exception during RPC call (%s): %s", peer.functionName(ofid), str)};
std::move(response)(nullptr, &err);
} else if (fid == Rpc::reqSuccess) {
std::move(response)(std::move(buffer), nullptr);
}
}
}
break;
}
default:
log("onResponse: unknown fid %#x\n", fid);
}
}