in quic/server/QuicServer.cpp [216:307]
/**
 * Gives each worker a listening UDP socket and, when CCP is in use,
 * initializes that worker's CCP reader.
 *
 * The per-worker setup is run on the worker's own EventBase via
 * runImmediatelyOrRunInEventBaseThreadAndWait(), under startMutex_. For the
 * worker at index i:
 *   - if listeningFDs_ has a non-negative fd at index i, the worker's socket
 *     is set up on a dup() of that fd;
 *   - otherwise (or if dup() fails) the worker binds a new socket to
 *     boundAddress_ using bindOptions_.
 * Worker 0 overwrites boundAddress_ with the address its socket reports.
 * The callback for the last index sets initialized_ and notifies startCv_.
 * If any CCP initialization threw, shutdown() is called after the loop.
 *
 * Must be called before the server is initialized (CHECKed).
 *
 * @param address address the workers bind to when no taken-over fd is usable.
 * @param evbs    the workers' event bases; each one must already be a key in
 *                evbToWorkers_ (CHECKed inside the callback).
 */
void QuicServer::bindWorkersToSocket(
    const folly::SocketAddress& address,
    const std::vector<folly::EventBase*>& evbs) {
  auto numWorkers = evbs.size();
  CHECK(!initialized_);
  boundAddress_ = address;
  auto usingCCP = isUsingCCP();
  if (!usingCCP) {
    LOG(INFO) << "NOT using CCP";
  }
  // Written by the callbacks below through a reference capture.
  // NOTE(review): this relies on the ...AndWait() call not returning before
  // the callback has run, so the reference cannot dangle -- confirm against
  // folly::EventBase.
  auto ccpInitFailed = false;
  for (size_t i = 0; i < numWorkers; ++i) {
    auto workerEvb = evbs[i];
    workerEvb->runImmediatelyOrRunInEventBaseThreadAndWait(
        [self = this->shared_from_this(),
         workerEvb,
         numWorkers,
         usingCCP,
         &ccpInitFailed,
         processId = processId_,
         idx = i] {
          std::lock_guard<std::mutex> guard(self->startMutex_);
          if (self->shutdown_) {
            // Server is already shutting down; leave this worker untouched.
            return;
          }
          auto workerSocket =
              self->listenerSocketFactory_->make(workerEvb, -1);
          auto it = self->evbToWorkers_.find(workerEvb);
          CHECK(it != self->evbToWorkers_.end());
          auto worker = it->second;
          // Taken-over fds are matched to workers by index; -1 means this
          // worker has none.
          int takeoverOverFd = -1;
          if (self->listeningFDs_.size() > idx) {
            takeoverOverFd = self->listeningFDs_[idx];
          }
          worker->setSocketOptions(&self->socketOptions_);
          // dup() the taken-over fd (if any) so this worker owns its own
          // descriptor. dup() can fail (e.g. EBADF, EMFILE); never hand the
          // resulting -1 to the socket as an owned fd.
          int dupedFd = -1;
          if (takeoverOverFd >= 0) {
            dupedFd = ::dup(takeoverOverFd);
            if (dupedFd < 0) {
              // PLOG appends the errno description; log before anything else
              // can clobber errno.
              PLOG(ERROR) << "dup() failed for takenover fd=" << takeoverOverFd
                          << " on workerId="
                          << static_cast<int>(worker->getWorkerId())
                          << ", falling back to bind";
            }
          }
          if (dupedFd >= 0) {
            workerSocket->setFD(
                folly::NetworkSocket::fromFd(dupedFd),
                // set ownership to OWNS to allow ::close()'ing of the fd
                // when this server goes away
                folly::AsyncUDPSocket::FDOwnership::OWNS);
            worker->setSocket(std::move(workerSocket));
            if (idx == 0) {
              self->boundAddress_ = worker->getAddress();
            }
            VLOG(4) << "Set up dup()'ed fd for address=" << self->boundAddress_
                    << " on workerId="
                    << static_cast<int>(worker->getWorkerId());
            worker->applyAllSocketOptions();
          } else {
            VLOG(4) << "No valid takenover fd found for address="
                    << self->boundAddress_ << ". binding on worker=" << worker
                    << " workerId=" << static_cast<int>(worker->getWorkerId())
                    << " processId=" << static_cast<int>(processId);
            worker->setSocket(std::move(workerSocket));
            worker->bind(self->boundAddress_, self->bindOptions_);
            if (idx == 0) {
              // Later workers bind to whatever address worker 0 reports.
              // NOTE(review): presumably this captures a kernel-assigned
              // port when binding to port 0 -- confirm.
              self->boundAddress_ = worker->getAddress();
            }
          }
          if (usingCCP) {
            // Server id handed to the CCP reader: IP hash OR'ed with the port.
            auto serverId = self->boundAddress_.getIPAddress().hash() |
                self->boundAddress_.getPort();
            try {
              worker->getCcpReader()->try_initialize(
                  worker->getEventBase(),
                  self->ccpId_,
                  serverId,
                  worker->getWorkerId());
              worker->getCcpReader()->connect();
            } catch (const folly::AsyncSocketException& ex) {
              // probably means the unix socket failed to bind
              LOG(ERROR) << "exception while initializing ccp: " << ex.what()
                         << "\nshutting down...";
              // TODO also update counters
              ccpInitFailed = true;
            } catch (const std::exception& ex) {
              LOG(ERROR) << "exception initializing ccp: " << ex.what()
                         << "\nshutting down...";
              ccpInitFailed = true;
            }
          }
          if (idx == (numWorkers - 1)) {
            // Last worker done: publish initialization and wake any waiters
            // on startCv_.
            VLOG(4) << "Initialized all workers in the eventbase";
            self->initialized_ = true;
            self->startCv_.notify_all();
          }
        });
  }
  // The shutdown is deferred until every worker callback has been issued.
  if (usingCCP && ccpInitFailed) {
    shutdown();
  }
}