in src/rpc.cc [2188:2325]
// Handles the greeting received on a newly established connection.
//
// The greeting is ignored when terminate_ is set or when `conn` is not present in
// floatingConnectionsMap_. Otherwise the floating entry is removed and then either
//   - discarded (its connections are moved to garbageConnections_) when the remote
//     side announced our own id (myId) or our own name (myName), or
//   - attached to the PeerImpl returned by getPeer(peerName):
//       1. the announced id is stored on the peer,
//       2. the entry's connections are moved into peer.connections_[index<API>],
//       3. the addresses announced in `info` are merged into the matching
//          per-connection-type remoteAddresses lists,
//       4. a Rpc::reqPoke is sent over `conn` for every entry in outgoing_ that
//          targets this peer and has no resend.connection set.
//
// conn      connection the greeting arrived on.
// peerName  name announced by the remote side.
// peerId    id announced by the remote side.
// info      connection types (name plus address list) announced by the remote side.
void onGreeting(
    RpcConnectionImpl<API>& conn, std::string_view peerName, PeerId peerId, std::vector<ConnectionTypeInfo>&& info) {
  log("%s::%s::onGreeting!(\"%s\", %s)\n", std::string(myName).c_str(), connectionTypeName[index<API>],
      std::string(peerName).c_str(), peerId.toString().c_str());
  for (auto& typeInfo : info) {
    log(" %s\n", std::string(typeInfo.name).c_str());
    for (auto& address : typeInfo.addr) {
      log(" @ %s\n", std::string(address).c_str());
    }
  }

  // Declared before any lock is taken, so it is destroyed only after every lock
  // acquired below has been released.
  // NOTE(review): presumably Deferrer runs the deferred work on destruction - confirm.
  Deferrer defer;

  std::unique_lock listenersLock(listenersMutex_);
  if (terminate_.load(std::memory_order_relaxed)) {
    return;
  }
  auto floatingIt = floatingConnectionsMap_.find(&conn);
  if (floatingIt == floatingConnectionsMap_.end()) {
    // Not a floating connection: nothing to do.
    return;
  }

  // Take ownership of the floating entry and drop it from both containers.
  auto listIt = floatingIt->second;
  floatingConnectionsMap_.erase(floatingIt);
  auto cptr = std::move(*listIt);
  floatingConnections_.erase(listIt);
  // listenersMutex_ is released before any other mutex is acquired.
  listenersLock.unlock();

  // Refuse connections to ourselves and to peers that claim our name. The id check
  // is evaluated first, so it also decides which message is logged.
  const bool connectedToSelf = peerId == myId;
  if (connectedToSelf || peerName == myName) {
    std::lock_guard garbageLock(garbageMutex_);
    for (auto& c : cptr->conns) {
      garbageConnections_.push_back(std::move(c));
    }
    if (connectedToSelf) {
      log("I connected to myself! oops!\n");
    } else {
      log("Peer with same name as me! Refusing connection!\n");
    }
    return;
  }

  PeerImpl& peer = getPeer(peerName);
  {
    std::lock_guard idLock(peer.idMutex_);
    peer.id = peerId;
    peer.hasId = true;
    peer.findThisPeerIncrementingTimeoutMilliseconds.store(250, std::memory_order_relaxed);
  }

  // `conn` is expected to be the last connection stored in the floating entry.
  if (&conn != &*cptr->conns.back()) {
    fatal("onGreeting internal error conns mismatch");
  }
  conn.peer = &peer;

  // Hand the connections of the floating entry over to the peer's slot for this API.
  {
    auto& slot = peer.connections_[index<API>];
    std::lock_guard slotLock(slot.mutex);
    slot.isExplicit = cptr->isExplicit;
    slot.outgoing = cptr->outgoing;
    slot.addr = std::move(cptr->addr);
    for (auto& c : cptr->conns) {
      slot.conns.push_back(std::move(c));
    }
    slot.hasConn = true;
    slot.valid = true;
  }

  // Merge the announced addresses into the per-connection-type address lists.
  {
    // Number of addresses kept per connection type after trimming.
    constexpr size_t keptAddresses = 24;
    // While inserting, the list may grow to this size before it is cut back.
    constexpr size_t trimThreshold = 48;

    std::lock_guard idLock(peer.idMutex_);
    for (auto& typeInfo : info) {
      for (size_t typeIndex = 0; typeIndex != connectionTypeName.size(); ++typeIndex) {
        if (typeInfo.name == connectionTypeName[typeIndex]) {
          auto& slot = peer.connections_.at(typeIndex);
          // Once the list holds more than `limit` entries, erase from the front
          // (the earliest inserted ones) until keptAddresses entries remain.
          auto trimAddresses = [&](size_t limit) {
            auto& known = slot.remoteAddresses;
            if (known.size() > limit) {
              known.erase(known.begin(), known.begin() + (known.size() - keptAddresses));
            }
          };
          std::lock_guard slotLock(slot.mutex);
          slot.valid = true;

          // The observed remote address is only looked up when both this API and
          // the announced connection type are IP based.
          std::string addr;
          if (API::addressIsIp && addressIsIp(static_cast<ConnectionType>(typeIndex))) {
            addr = conn.remoteAddr();
          }

          if (!addr.empty()) {
            // Combine the host we observe for the peer with the second component
            // (presumably the port) of each announced address of the same IP family.
            auto remote = decodeIpAddress(addr);
            bool remoteIpv6 = remote.first.find(':') != std::string_view::npos;
            for (auto& announced : typeInfo.addr) {
              auto decoded = decodeIpAddress(announced);
              bool ipv6 = decoded.first.find(':') != std::string_view::npos;
              if (ipv6 != remoteIpv6) {
                continue;
              }
              std::string newAddr = (ipv6 ? "[" + std::string(remote.first) + "]" : std::string(remote.first)) +
                                    ":" + std::to_string(decoded.second);
              if (std::find(slot.remoteAddresses.begin(), slot.remoteAddresses.end(), newAddr) ==
                  slot.remoteAddresses.end()) {
                slot.remoteAddresses.push_back(persistentString(newAddr));
                trimAddresses(trimThreshold);
              }
            }
          } else if (!addressIsIp(static_cast<ConnectionType>(typeIndex))) {
            // Non-IP connection types: announced addresses are stored unchanged.
            for (auto& announced : typeInfo.addr) {
              if (std::find(slot.remoteAddresses.begin(), slot.remoteAddresses.end(), announced) ==
                  slot.remoteAddresses.end()) {
                slot.remoteAddresses.push_back(persistentString(announced));
                trimAddresses(trimThreshold);
              }
            }
          }

          // Logging happens before the final trim, so it may list more than
          // keptAddresses entries.
          for (auto& known : slot.remoteAddresses) {
            log(" -- %s -- has a remote address %s\n", std::string(peer.name).c_str(), std::string(known).c_str());
          }
          trimAddresses(keptAddresses);
        }
      }
    }
  }

  // Poke every outgoing entry that targets this peer and has no resend connection.
  for (auto& bucket : outgoing_) {
    std::lock_guard bucketLock(bucket.mutex);
    for (auto& entry : bucket.map) {
      auto& out = entry.second;
      if (out.peer == &peer && !out.resend.connection) {
        // log("poking on newly established connection\n");
        BufferHandle buffer;
        serializeToBuffer(buffer, out.rid, Rpc::reqPoke);
        conn.send(std::move(buffer), defer);
      }
    }
  }
}