void QuicServer::bindWorkersToSocket()

in quic/server/QuicServer.cpp [216:307]


void QuicServer::bindWorkersToSocket(
    const folly::SocketAddress& address,
    const std::vector<folly::EventBase*>& evbs) {
  auto numWorkers = evbs.size();
  CHECK(!initialized_);
  boundAddress_ = address;
  auto usingCCP = isUsingCCP();
  if (!usingCCP) {
    LOG(INFO) << "NOT using CCP";
  }
  auto ccpInitFailed = false;
  for (size_t i = 0; i < numWorkers; ++i) {
    auto workerEvb = evbs[i];
    workerEvb->runImmediatelyOrRunInEventBaseThreadAndWait(
        [self = this->shared_from_this(),
         workerEvb,
         numWorkers,
         usingCCP,
         &ccpInitFailed,
         processId = processId_,
         idx = i] {
          std::lock_guard<std::mutex> guard(self->startMutex_);
          if (self->shutdown_) {
            return;
          }
          auto workerSocket = self->listenerSocketFactory_->make(workerEvb, -1);
          auto it = self->evbToWorkers_.find(workerEvb);
          CHECK(it != self->evbToWorkers_.end());
          auto worker = it->second;
          int takeoverOverFd = -1;
          if (self->listeningFDs_.size() > idx) {
            takeoverOverFd = self->listeningFDs_[idx];
          }
          worker->setSocketOptions(&self->socketOptions_);
          // dup the takenover socket on only one worker and bind the rest
          if (takeoverOverFd >= 0) {
            workerSocket->setFD(
                folly::NetworkSocket::fromFd(::dup(takeoverOverFd)),
                // set ownership to OWNS to allow ::close()'ing of of the fd
                // when this server goes away
                folly::AsyncUDPSocket::FDOwnership::OWNS);
            worker->setSocket(std::move(workerSocket));
            if (idx == 0) {
              self->boundAddress_ = worker->getAddress();
            }
            VLOG(4) << "Set up dup()'ed fd for address=" << self->boundAddress_
                    << " on workerId=" << (int)worker->getWorkerId();
            worker->applyAllSocketOptions();
          } else {
            VLOG(4) << "No valid takenover fd found for address="
                    << self->boundAddress_ << ". binding on worker=" << worker
                    << " workerId=" << (int)worker->getWorkerId()
                    << " processId=" << (int)processId;
            worker->setSocket(std::move(workerSocket));
            worker->bind(self->boundAddress_, self->bindOptions_);
            if (idx == 0) {
              self->boundAddress_ = worker->getAddress();
            }
          }
          if (usingCCP) {
            auto serverId = self->boundAddress_.getIPAddress().hash() |
                self->boundAddress_.getPort();
            try {
              worker->getCcpReader()->try_initialize(
                  worker->getEventBase(),
                  self->ccpId_,
                  serverId,
                  worker->getWorkerId());
              worker->getCcpReader()->connect();
            } catch (const folly::AsyncSocketException& ex) {
              // probably means the unix socket failed to bind
              LOG(ERROR) << "exception while initializing ccp: " << ex.what()
                         << "\nshutting down...";
              // TODO also update counters
              ccpInitFailed = true;
            } catch (const std::exception& ex) {
              LOG(ERROR) << "exception initializing ccp: " << ex.what()
                         << "\nshutting down...";
              ccpInitFailed = true;
            }
          }
          if (idx == (numWorkers - 1)) {
            VLOG(4) << "Initialized all workers in the eventbase";
            self->initialized_ = true;
            self->startCv_.notify_all();
          }
        });
  }
  if (usingCCP && ccpInitFailed) {
    shutdown();
  }
}