in src/rpc.cc [2670:2844]
// Dispatches one inbound frame from an established peer connection.
// fid selects either an internal control message (ack/poke/nack/peer-lookup)
// or, for fid >= reqCallOffset, an application RPC call registered in
// rpc.funcs_.  rid identifies the call instance on the wire.
//
// Parameters:
//   peer   - the peer the frame arrived from
//   conn   - the concrete connection it arrived on (used for replies)
//   rid    - request id as received (low bit is masked off below)
//   fid    - function/message id
//   ptr/len- payload bytes (already past the rid/fid header)
//   buffer - the full frame buffer; ownership is passed to f->call for RPCs
//   defer  - deferred-work collector; sends are queued through it
void onRequest(
    PeerImpl& peer, RpcConnectionImpl<API>& conn, uint32_t rid, uint32_t fid, const std::byte* ptr, size_t len,
    BufferHandle buffer, Deferrer& defer) {
  // Drop everything once shutdown has begun.
  if (rpc.terminate_.load(std::memory_order_relaxed)) {
    return;
  }
  log("onRequest peer %s rid %#x (%#x) fid %#x\n", peer.name, rid, rid & ~(uint32_t)1, fid);
  // NOTE(review): the low bit of rid appears to encode call direction
  // (request vs response side); it is cleared so both sides key their
  // bookkeeping on the same id -- confirm against the sender's encoding.
  rid &= ~(uint32_t)1;
  switch (fid) {
  case Rpc::reqAck: {
    // Peer acknowledged that it has received the response
    // (return value of an RPC call)
    handleAck<true>(rpc.incoming_, peer, rid, defer);
    break;
  }
  case Rpc::reqPoke: {
    // Peer is poking us to check the status of an RPC call
    // log("got poke for %#x\n", rid);
    handlePoke(rpc.incoming_, peer, conn, rid, defer);
    break;
  }
  case Rpc::reqNack: {
    // Peer nacked a poke from us; this means we may need to
    // resend a rpc response
    handleNack(rpc.incoming_, peer, rid, defer);
    break;
  }
  case Rpc::reqLookingForPeer: {
    // Peer is looking for some other peer(s).  Collect the peers we know
    // under peersMutex_, then (outside that lock) gather their reachable
    // address info and reply with reqPeerFound.
    std::vector<std::string_view> names;
    deserializeBuffer(ptr, len, names);
    std::vector<PeerImpl*> foundPeers;
    std::unordered_map<std::string_view, std::vector<ConnectionTypeInfo>> info;
    {
      std::lock_guard l(rpc.peersMutex_);
      for (auto name : names) {
        auto i = rpc.peers_.find(name);
        // Only report peers whose identity has been established.
        if (i != rpc.peers_.end() && i->second.hasId.load(std::memory_order_relaxed)) {
          log("Peer '%s' is looking for '%s', and we know them!\n", peer.name, name);
          foundPeers.push_back(&i->second);
        } else {
          log("Peer '%s' is looking for '%s', but we don't know them :/\n", peer.name, name);
        }
      }
    }
    if (!foundPeers.empty()) {
      for (auto* ptr : foundPeers) {
        PeerImpl& p = *ptr;
        // idMutex_ guards the per-peer connections_ table.
        std::lock_guard l(p.idMutex_);
        if (!p.connections_.empty()) {
          std::vector<ConnectionTypeInfo> vec;
          for (auto& x : p.connections_) {
            if (!x.remoteAddresses.empty()) {
              vec.emplace_back();
              // connections_ is indexed by connection type; recover the
              // type index from the element's position to name it.
              vec.back().name = connectionTypeName.at(&x - p.connections_.data());
              vec.back().addr = x.remoteAddresses;
            }
          }
          if (!vec.empty()) {
            info[p.name] = std::move(vec);
          }
        }
      }
      if (!info.empty()) {
        BufferHandle buffer;
        serializeToBuffer(buffer, rid, Rpc::reqPeerFound, info);
        log("send reqPeerFound! rid %#x\n", rid);
        conn.send(std::move(buffer), defer);
      } else {
        log("no info :(\n");
      }
    }
    break;
  }
  default:
    // Anything below reqCallOffset that we did not handle above is an
    // unknown control message; ignore it.
    if (fid < (uint32_t)Rpc::reqCallOffset) {
      return;
    }
    // RPC call
    Rpc::FBase* f = nullptr;
    {
      std::lock_guard l(rpc.mutex_);
      auto i = rpc.funcs_.find(fid);
      if (i != rpc.funcs_.end()) {
        f = &*i->second;
      }
    }
    // Reverse lookup of the function name by id; linear scan is fine here
    // since it is only used for the log line below.
    auto getFuncName = [&]() {
      std::lock_guard l(rpc.mutex_);
      for (auto& v : rpc.funcIds_) {
        if (v.second == fid) {
          return v.first;
        }
      }
      return (std::string_view) "NOT-FOUND";
    };
    if (!f) {
      // No such function registered locally; tell the caller.
      BufferHandle buffer;
      serializeToBuffer(buffer, rid, Rpc::reqFunctionNotFound);
      conn.send(std::move(buffer), defer);
    } else {
      // handleRecv de-duplicates: returns false if this rid was already
      // received (retransmit), in which case we must not invoke f again.
      bool recvOk = handleRecv<true>(rpc.incoming_, peer, conn, rid, defer);
      if (recvOk) {
        log("got request rid %#x (%s) from %s\n", rid, getFuncName(), peer.name);
        // Completion callback: may run later on another thread, so it holds
        // only a weak reference to this RpcImpl (weakLock) and re-locks it;
        // if the Rpc object is gone the response is silently dropped.
        f->call(std::move(buffer), [weak = weakLock, peer = &peer, rid](BufferHandle outbuffer) {
          auto me = weak->template lock<RpcImpl>();
          if (!me) {
            return;
          }
          Deferrer defer;
          auto& rpc = me->rpc;
          {
            // The serialized response reserves its leading 4 bytes for the
            // rid; stamp the (masked) rid of this call into it.
            auto* ptr = dataptr<std::byte>(&*outbuffer);
            std::memcpy(ptr, &rid, sizeof(rid));
            ptr += sizeof(rid);
            // NOTE(review): outFid is read but never used below -- looks
            // like leftover debugging/validation code.
            uint32_t outFid;
            std::memcpy(&outFid, ptr, sizeof(outFid));
            ptr += sizeof(outFid);
            // Promote to a shared handle: the resend machinery keeps a
            // reference until the peer acks.
            SharedBufferHandle shared(outbuffer.release());
            me->log("sending response for rid %#x of %d bytes to %s\n", rid, shared->size, peer->name);
            auto now = std::chrono::steady_clock::now();
            Rpc::Impl::IncomingBucket& bucket = rpc.getBucket(rpc.incoming_, rid);
            // Lock order: incomingFifoMutex_ before the bucket mutex.
            // The FIFO lock is held for the whole eviction pass below.
            std::lock_guard l2(rpc.incomingFifoMutex_);
            std::unique_lock l(bucket.mutex);
            size_t totalResponseSize;
            auto i = bucket.map.find(rid);
            if (i != bucket.map.end()) {
              // Call is still tracked: record the response, arm the resend
              // timer, and queue the entry on the age-ordered FIFO.
              // NOTE(review): bucket.map[rid] repeats the lookup done by
              // find() above; i->second would do.
              auto& x = bucket.map[rid];
              x.responseTimestamp = now;
              totalResponseSize = rpc.totalResponseSize_ += shared->size;
              x.timeout = now + std::chrono::milliseconds(250);
              x.resend.buffer = std::move(shared);
              // log("x is %p, resend.buffer is %p\n", (void*)&x, (void*)&*x.resend.buffer);
              rpc.resend(*peer, x.resend, defer);
              listInsert(rpc.incomingFifo_.prev, &x);
              rpc.updateTimeout(now + std::chrono::seconds(1));
            } else {
              // Entry already gone (e.g. acked/expired meanwhile); just
              // sample the current total for the eviction heuristic.
              totalResponseSize = rpc.totalResponseSize_;
            }
            l.unlock();
            // Erase outgoing data if it has not been acknowledged within a
            // certain time period. This prevents us from using resources
            // for peers that are permanently gone.
            auto timeout = std::chrono::seconds(300);
            // Only bother scanning when the oldest entry is at least 5s old.
            if (now - rpc.incomingFifo_.next->responseTimestamp >= std::chrono::seconds(5)) {
              // Adapt the retention window to memory pressure: keep
              // responses longer when usage is low, expire fast when high.
              if (totalResponseSize < 1024 * 1024 && rpc.incoming_.size() < 1024) {
                timeout = std::chrono::seconds(1800);
              } else if (totalResponseSize >= 1024 * 1024 * 1024 || rpc.incoming_.size() >= 1024 * 1024) {
                timeout = std::chrono::seconds(60);
              }
              // Walk the age-ordered FIFO from the oldest end (next) and
              // drop everything past the retention window.  incomingFifo_
              // itself is the list sentinel.
              while (rpc.incomingFifo_.next != &rpc.incomingFifo_ &&
                     now - rpc.incomingFifo_.next->responseTimestamp >= timeout) {
                Rpc::Impl::Incoming* i = rpc.incomingFifo_.next;
                listErase(i);
                auto& iBucket = rpc.getBucket(rpc.incoming_, i->rid);
                std::lock_guard l3(iBucket.mutex);
                if (i->resend.buffer) {
                  rpc.totalResponseSize_ -= i->resend.buffer->size;
                }
                // Remember the rid briefly so a late retransmit of the same
                // request is recognized instead of re-executed.
                i->peer->addRecentIncoming(i->rid, now + std::chrono::minutes(1));
                me->rpc.cleanup(*i, defer);
                iBucket.map.erase(i->rid);
                me->log("permanent timeout of response for peer %s rid %#x!?\n", i->peer->name, i->rid);
              }
            }
          }
        });
      }
    }
  }
}