in lib/cpp/src/thrift/server/TNonblockingServer.cpp [1211:1524]
void TNonblockingIOThread::createNotificationPipe() {
if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
throw TException("can't create notification pipe");
}
if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
|| evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
}
for (auto notificationPipeFD : notificationPipeFDs_) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
int flags;
if ((flags = THRIFT_FCNTL(notificationPipeFD, F_GETFD, 0)) < 0
|| THRIFT_FCNTL(notificationPipeFD, F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
if (evutil_make_socket_closeonexec(notificationPipeFD) < 0) {
#endif
::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
throw TException(
"TNonblockingServer::createNotificationPipe() "
"FD_CLOEXEC");
}
}
}
/**
* Register the core libevent events onto the proper base.
*/
void TNonblockingIOThread::registerEvents() {
threadId_ = Thread::get_current();
assert(eventBase_ == nullptr);
eventBase_ = getServer()->getUserEventBase();
if (eventBase_ == nullptr) {
eventBase_ = event_base_new();
ownEventBase_ = true;
}
// Print some libevent stats
if (number_ == 0) {
GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
event_get_version(),
event_base_get_method(eventBase_));
}
if (listenSocket_ != THRIFT_INVALID_SOCKET) {
// Register the server event
event_set(&serverEvent_,
listenSocket_,
EV_READ | EV_PERSIST,
TNonblockingIOThread::listenHandler,
server_);
event_base_set(eventBase_, &serverEvent_);
// Add the event and start up the server
if (-1 == event_add(&serverEvent_, nullptr)) {
throw TException(
"TNonblockingServer::serve(): "
"event_add() failed on server listen event");
}
GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
}
createNotificationPipe();
// Create an event to be notified when a task finishes
event_set(¬ificationEvent_,
getNotificationRecvFD(),
EV_READ | EV_PERSIST,
TNonblockingIOThread::notifyHandler,
this);
// Attach to the base
event_base_set(eventBase_, ¬ificationEvent_);
// Add the event and start up the server
if (-1 == event_add(¬ificationEvent_, nullptr)) {
throw TException(
"TNonblockingServer::serve(): "
"event_add() failed on task-done notification event");
}
GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
}
bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
auto fd = getNotificationSendFD();
if (fd < 0) {
return false;
}
int ret = -1;
long kSize = sizeof(conn);
const char * pos = (const char *)const_cast_sockopt(&conn);
#if defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H)
struct pollfd pfd = {fd, POLLOUT, 0};
while (kSize > 0) {
pfd.revents = 0;
ret = poll(&pfd, 1, -1);
if (ret < 0) {
return false;
} else if (ret == 0) {
continue;
}
if (pfd.revents & POLLHUP || pfd.revents & POLLERR) {
::THRIFT_CLOSESOCKET(fd);
return false;
}
if (pfd.revents & POLLOUT) {
ret = send(fd, pos, kSize, 0);
if (ret < 0) {
if (errno == EAGAIN) {
continue;
}
::THRIFT_CLOSESOCKET(fd);
return false;
}
kSize -= ret;
pos += ret;
}
}
#else
fd_set wfds, efds;
while (kSize > 0) {
FD_ZERO(&wfds);
FD_ZERO(&efds);
FD_SET(fd, &wfds);
FD_SET(fd, &efds);
ret = select(static_cast<int>(fd + 1), nullptr, &wfds, &efds, nullptr);
if (ret < 0) {
return false;
} else if (ret == 0) {
continue;
}
if (FD_ISSET(fd, &efds)) {
::THRIFT_CLOSESOCKET(fd);
return false;
}
if (FD_ISSET(fd, &wfds)) {
ret = send(fd, pos, kSize, 0);
if (ret < 0) {
if (errno == EAGAIN) {
continue;
}
::THRIFT_CLOSESOCKET(fd);
return false;
}
kSize -= ret;
pos += ret;
}
}
#endif
return true;
}
/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
auto* ioThread = (TNonblockingIOThread*)v;
assert(ioThread);
(void)which;
while (true) {
TNonblockingServer::TConnection* connection = nullptr;
const int kSize = sizeof(connection);
long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
if (nBytes == kSize) {
if (connection == nullptr) {
// this is the command to stop our thread, exit the handler!
ioThread->breakLoop(false);
return;
}
connection->transition();
} else if (nBytes > 0) {
// throw away these bytes and hope that next time we get a solid read
GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
ioThread->breakLoop(true);
return;
} else if (nBytes == 0) {
GlobalOutput.printf("notifyHandler: Notify socket closed!");
ioThread->breakLoop(false);
// exit the loop
break;
} else { // nBytes < 0
if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
&& THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
ioThread->breakLoop(true);
return;
}
// exit the loop
break;
}
}
}
void TNonblockingIOThread::breakLoop(bool error) {
if (error) {
GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
// TODO: figure out something better to do here, but for now kill the
// whole process.
GlobalOutput.printf("TNonblockingServer: aborting process.");
::abort();
}
// If we're running in the same thread, we can't use the notify(0)
// mechanism to stop the thread, but happily if we're running in the
// same thread, this means the thread can't be blocking in the event
// loop either.
if (!Thread::is_current(threadId_)) {
notify(nullptr);
} else {
// cause the loop to stop ASAP - even if it has things to do in it
event_base_loopbreak(eventBase_);
}
}
void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
// Start out with a standard, low-priority setup for the sched params.
struct sched_param sp;
memset(static_cast<void*>(&sp), 0, sizeof(sp));
int policy = SCHED_OTHER;
// If desired, set up high-priority sched params structure.
if (value) {
// FIFO scheduler, ranked above default SCHED_OTHER queue
policy = SCHED_FIFO;
// The priority only compares us to other SCHED_FIFO threads, so we
// just pick a random priority halfway between min & max.
const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;
sp.sched_priority = priority;
}
// Actually set the sched params for the current thread.
if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
} else {
GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
}
#else
THRIFT_UNUSED_VARIABLE(value);
#endif
}
void TNonblockingIOThread::run() {
if (eventBase_ == nullptr) {
registerEvents();
}
if (useHighPriority_) {
setCurrentThreadHighPriority(true);
}
if (eventBase_ != nullptr)
{
GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
// Run libevent engine, never returns, invokes calls to eventHandler
event_base_loop(eventBase_, 0);
if (useHighPriority_) {
setCurrentThreadHighPriority(false);
}
// cleans up our registered events
cleanupEvents();
}
GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}
void TNonblockingIOThread::cleanupEvents() {
// stop the listen socket, if any
if (listenSocket_ != THRIFT_INVALID_SOCKET) {
if (event_del(&serverEvent_) == -1) {
GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
}
}
event_del(¬ificationEvent_);
}
void TNonblockingIOThread::stop() {
// This should cause the thread to fall out of its event loop ASAP.
breakLoop(false);
}
void TNonblockingIOThread::join() {
// If this was a thread created by a factory (not the thread that called
// serve()), we join() it to make sure we shut down fully.
if (thread_) {
try {
// Note that it is safe to both join() ourselves twice, as well as join
// the current thread as the pthread implementation checks for deadlock.
thread_->join();
} catch (...) {
// swallow everything
}
}
}
}