void TNonblockingIOThread::createNotificationPipe()

in lib/cpp/src/thrift/server/TNonblockingServer.cpp [1211:1524]


void TNonblockingIOThread::createNotificationPipe() {
  if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
    throw TException("can't create notification pipe");
  }
  if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
      || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
    throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
  }
  for (auto notificationPipeFD : notificationPipeFDs_) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
    int flags;
    if ((flags = THRIFT_FCNTL(notificationPipeFD, F_GETFD, 0)) < 0
        || THRIFT_FCNTL(notificationPipeFD, F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
    if (evutil_make_socket_closeonexec(notificationPipeFD) < 0) {
#endif
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
      throw TException(
          "TNonblockingServer::createNotificationPipe() "
          "FD_CLOEXEC");
    }
  }
}

/**
 * Register the core libevent events onto the proper base.
 *
 * The event base supplied by the server is used when there is one; otherwise
 * a new base is created and ownEventBase_ is set. The listen event is
 * registered when this thread has a valid listen socket. Afterwards the
 * notification pipe is created and its receive end is registered with
 * notifyHandler as the callback.
 *
 * @throws TException if a new event base cannot be created, if the
 *         notification pipe cannot be set up, or if adding either event
 *         fails.
 */
void TNonblockingIOThread::registerEvents() {
  threadId_ = Thread::get_current();

  assert(eventBase_ == nullptr);
  eventBase_ = getServer()->getUserEventBase();
  if (eventBase_ == nullptr) {
    eventBase_ = event_base_new();
    if (eventBase_ == nullptr) {
      // event_base_new() signals failure by returning NULL. The calls below
      // need a valid base, so report the failure the same way the other
      // registration errors in this function are reported.
      throw TException(
          "TNonblockingServer::serve(): "
          "event_base_new() failed");
    }
    ownEventBase_ = true;
  }

  // Print some libevent stats (only once, from IO thread #0)
  if (number_ == 0) {
    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
                        event_get_version(),
                        event_base_get_method(eventBase_));
  }

  if (listenSocket_ != THRIFT_INVALID_SOCKET) {
    // Register the server event
    event_set(&serverEvent_,
              listenSocket_,
              EV_READ | EV_PERSIST,
              TNonblockingIOThread::listenHandler,
              server_);
    event_base_set(eventBase_, &serverEvent_);

    // Add the event and start up the server
    if (-1 == event_add(&serverEvent_, nullptr)) {
      throw TException(
          "TNonblockingServer::serve(): "
          "event_add() failed on server listen event");
    }
    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
  }

  createNotificationPipe();

  // Create an event to be notified when a task finishes
  event_set(&notificationEvent_,
            getNotificationRecvFD(),
            EV_READ | EV_PERSIST,
            TNonblockingIOThread::notifyHandler,
            this);

  // Attach to the base
  event_base_set(eventBase_, &notificationEvent_);

  // Add the event and start up the server
  if (-1 == event_add(&notificationEvent_, nullptr)) {
    throw TException(
        "TNonblockingServer::serve(): "
        "event_add() failed on task-done notification event");
  }
  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
}

/**
 * Send a connection pointer to this IO thread through the notification pipe.
 *
 * The raw value of the pointer (sizeof(conn) bytes) is written to the send
 * end of the pipe. The call waits for the socket to become writable before
 * each attempt and keeps going until every byte has been sent.
 *
 * @param conn pointer to hand over; breakLoop() passes nullptr to request
 *             that the receiving loop stop.
 * @return true once the whole pointer has been written; false if the send
 *         descriptor is negative or not open, if waiting for writability
 *         fails, or if sending fails with anything other than a would-block
 *         condition.
 */
bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
  auto fd = getNotificationSendFD();
  if (fd < 0) {
    return false;
  }

  int ret = -1;
  long remaining = sizeof(conn);
  const char * pos = (const char *)const_cast_sockopt(&conn);

  // NOTE(review): a wait that fails for any reason, including interruption
  // by a signal, is reported as failure -- confirm callers tolerate that.
#if defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H)
  struct pollfd pfd = {fd, POLLOUT, 0};

  while (remaining > 0) {
    pfd.revents = 0;
    ret = poll(&pfd, 1, -1);
    if (ret < 0) {
      return false;
    } else if (ret == 0) {
      continue;
    }

    if (pfd.revents & POLLNVAL) {
      // The descriptor is not open (for instance an earlier failure in this
      // function already closed it). Without this check the loop would spin
      // forever, because poll() keeps returning immediately and nothing is
      // ever sent. Do not close: the value does not refer to our socket.
      return false;
    }

    if (pfd.revents & (POLLHUP | POLLERR)) {
      ::THRIFT_CLOSESOCKET(fd);
      return false;
    }

    if (pfd.revents & POLLOUT) {
      ret = send(fd, pos, remaining, 0);
      if (ret < 0) {
        // Same would-block test as notifyHandler uses on the receive side.
        const int sendError = THRIFT_GET_SOCKET_ERROR;
        if (sendError == THRIFT_EWOULDBLOCK || sendError == THRIFT_EAGAIN) {
          continue;
        }

        ::THRIFT_CLOSESOCKET(fd);
        return false;
      }

      remaining -= ret;
      pos += ret;
    }
  }
#else
  fd_set wfds, efds;

  while (remaining > 0) {
    FD_ZERO(&wfds);
    FD_ZERO(&efds);
    FD_SET(fd, &wfds);
    FD_SET(fd, &efds);
    ret = select(static_cast<int>(fd + 1), nullptr, &wfds, &efds, nullptr);
    if (ret < 0) {
      return false;
    } else if (ret == 0) {
      continue;
    }

    if (FD_ISSET(fd, &efds)) {
      ::THRIFT_CLOSESOCKET(fd);
      return false;
    }

    if (FD_ISSET(fd, &wfds)) {
      ret = send(fd, pos, remaining, 0);
      if (ret < 0) {
        // Same would-block test as notifyHandler uses on the receive side.
        const int sendError = THRIFT_GET_SOCKET_ERROR;
        if (sendError == THRIFT_EWOULDBLOCK || sendError == THRIFT_EAGAIN) {
          continue;
        }

        ::THRIFT_CLOSESOCKET(fd);
        return false;
      }

      remaining -= ret;
      pos += ret;
    }
  }
#endif

  return true;
}

/**
 * libevent callback for the receive end of the notification pipe.
 *
 * Reads connection pointers from fd until the socket would block. A null
 * pointer is the request to stop the loop; for any other pointer
 * transition() is called on the connection. A partial read or an
 * unexpected receive error is treated as fatal through breakLoop(true).
 *
 * @param fd    receive end of the notification pipe.
 * @param which libevent event flags (unused).
 * @param v     the TNonblockingIOThread that registered this callback.
 */
/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
  auto* ioThread = static_cast<TNonblockingIOThread*>(v);
  assert(ioThread);
  (void)which;

  while (true) {
    TNonblockingServer::TConnection* connection = nullptr;
    const int kSize = sizeof(connection);
    long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
    if (nBytes == kSize) {
      if (connection == nullptr) {
        // this is the command to stop our thread, exit the handler!
        ioThread->breakLoop(false);
        return;
      }
      connection->transition();
    } else if (nBytes > 0) {
      // A partial pointer cannot be used and the stream is now out of sync,
      // so this is handled as a fatal error. nBytes is below kSize here, so
      // narrowing it to int to match the %d conversion is lossless.
      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
                          static_cast<int>(nBytes),
                          kSize);
      ioThread->breakLoop(true);
      return;
    } else if (nBytes == 0) {
      GlobalOutput.printf("notifyHandler: Notify socket closed!");
      ioThread->breakLoop(false);
      return;
    } else { // nBytes < 0
      // Read the error code once, before any other call can overwrite it.
      const int recvError = THRIFT_GET_SOCKET_ERROR;
      if (recvError != THRIFT_EWOULDBLOCK && recvError != THRIFT_EAGAIN) {
        GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", recvError);
        ioThread->breakLoop(true);
      }
      // Either nothing is left to read or the error was reported: done.
      return;
    }
  }
}

/**
 * Ask this IO thread's event loop to stop.
 *
 * @param error when true the failure is logged and the whole process is
 *              aborted; when false the loop is asked to exit.
 */
void TNonblockingIOThread::breakLoop(bool error) {
  if (error) {
    GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
    // TODO: figure out something better to do here, but for now kill the
    // whole process.
    GlobalOutput.printf("TNonblockingServer: aborting process.");
    ::abort();
  }

  // Called from the IO thread itself: the thread is not blocked inside the
  // event loop, so break the loop directly -- even if work is still pending.
  if (Thread::is_current(threadId_)) {
    event_base_loopbreak(eventBase_);
    return;
  }

  // Called from another thread: deliver the null "stop" pointer through the
  // notification pipe instead.
  notify(nullptr);
}

/**
 * Switch the calling thread between the default and a high-priority
 * scheduling policy. Does nothing when HAVE_SCHED_H is not defined.
 *
 * @param value true selects SCHED_FIFO with a priority halfway between that
 *              policy's minimum and maximum; false selects SCHED_OTHER with
 *              zeroed scheduling parameters.
 */
void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
  // Start out with a standard, low-priority setup for the sched params.
  struct sched_param sp;
  memset(static_cast<void*>(&sp), 0, sizeof(sp));
  int policy = SCHED_OTHER;

  // If desired, set up high-priority sched params structure.
  if (value) {
    // FIFO scheduler, ranked above default SCHED_OTHER queue
    policy = SCHED_FIFO;
    // The priority only compares us to other SCHED_FIFO threads, so we
    // just pick the midpoint between min & max.
    sp.sched_priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;
  }

  // Actually set the sched params for the current thread.
  // pthread_setschedparam() returns the error number directly and does not
  // set errno, so the return value is what has to be reported on failure.
  const int rc = pthread_setschedparam(pthread_self(), policy, &sp);
  if (rc != 0) {
    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", rc);
  } else if (value) {
    GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
  } else {
    GlobalOutput.printf("TNonblocking: IO Thread #%d using default scheduler.", number_);
  }
#else
  THRIFT_UNUSED_VARIABLE(value);
#endif
}

/**
 * Thread body: register events if that has not happened yet, run the
 * libevent loop until it exits, then remove the registered events.
 */
void TNonblockingIOThread::run() {
  // No event base yet means the events have not been registered.
  if (nullptr == eventBase_) {
    registerEvents();
  }
  if (useHighPriority_) {
    setCurrentThreadHighPriority(true);
  }

  if (nullptr != eventBase_) {
    GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
    // Dispatch events; control comes back here once the loop exits.
    event_base_loop(eventBase_, 0);

    // Undo the priority change made before entering the loop.
    if (useHighPriority_) {
      setCurrentThreadHighPriority(false);
    }

    // Remove the events registered by registerEvents().
    cleanupEvents();
  }

  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}

/**
 * Remove this thread's listen event (if it has a listen socket) and its
 * notification event from libevent.
 */
void TNonblockingIOThread::cleanupEvents() {
  // Only a thread with a valid listen socket has a server event to remove.
  const bool hasListenEvent = (listenSocket_ != THRIFT_INVALID_SOCKET);
  if (hasListenEvent && event_del(&serverEvent_) == -1) {
    GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
  }

  // The result of removing the notification event is not checked.
  event_del(&notificationEvent_);
}

/**
 * Request a normal (non-error) exit of this thread's event loop.
 */
void TNonblockingIOThread::stop() {
  // Non-error break: the loop is asked to exit, the process is not aborted.
  breakLoop(false);
}

/**
 * Wait for the underlying thread to finish, if this IO thread has one.
 * Any exception raised while joining is deliberately ignored.
 */
void TNonblockingIOThread::join() {
  // Only threads created by a factory have a thread object; the thread that
  // called serve() has nothing to join.
  if (!thread_) {
    return;
  }

  try {
    // Joining twice, or joining the current thread, is considered safe here
    // because the pthread implementation checks for deadlock.
    thread_->join();
  } catch (...) {
    // swallow everything
  }
}
}