in src/bthread/fd.cpp [158:404]
    int stop_and_join() {
        if (!started()) {
            return 0;
        }
        // No matter what this function returns, _epfd is set to -1
        // (making started() false) so that a later stop_and_join()
        // does not enter again.
        const int saved_epfd = _epfd;
        _epfd = -1;

        // epoll_wait cannot be woken up by closing _epfd. Instead we
        // wake it up by adding an fd that continuously triggers EPOLLOUT.
        // The constant wake-ups also guarantee that the loop eventually
        // observes _stop == true.
        _stop = true;
        int closing_epoll_pipe[2];
        if (pipe(closing_epoll_pipe)) {
            PLOG(FATAL) << "Fail to create closing_epoll_pipe";
            return -1;
        }
#if defined(OS_LINUX)
        epoll_event evt = { EPOLLOUT, { NULL } };
        if (epoll_ctl(saved_epfd, EPOLL_CTL_ADD,
                      closing_epoll_pipe[1], &evt) < 0) {
#elif defined(OS_MACOSX)
        struct kevent kqueue_event;
        EV_SET(&kqueue_event, closing_epoll_pipe[1], EVFILT_WRITE,
               EV_ADD | EV_ENABLE, 0, 0, NULL);
        if (kevent(saved_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
#endif
            PLOG(FATAL) << "Fail to add closing_epoll_pipe into epfd="
                        << saved_epfd;
            return -1;
        }

        const int rc = bthread_join(_tid, NULL);
        if (rc) {
            LOG(FATAL) << "Fail to join EpollThread, " << berror(rc);
            return -1;
        }
        close(closing_epoll_pipe[0]);
        close(closing_epoll_pipe[1]);
        close(saved_epfd);
        return 0;
    }
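
    // A standalone sketch of the wake-up trick above, outside of this
    // class (hypothetical `epfd`, Linux-only): the write end of an empty
    // pipe is always writable, so a level-triggered EPOLLOUT registration
    // makes every subsequent epoll_wait() on epfd return immediately.
    //
    //   int pipefd[2];
    //   if (pipe(pipefd) == 0) {
    //       epoll_event evt = { EPOLLOUT, { NULL } };
    //       if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[1], &evt) == 0) {
    //           // From now on epoll_wait(epfd, ...) never blocks, giving
    //           // the looping thread a chance to re-check its stop flag.
    //       }
    //   }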
    int fd_wait(int fd, unsigned events, const timespec* abstime) {
        butil::atomic<EpollButex*>* p = fd_butexes.get_or_new(fd);
        if (NULL == p) {
            errno = ENOMEM;
            return -1;
        }

        EpollButex* butex = p->load(butil::memory_order_consume);
        if (NULL == butex) {
            // It is rare to wait on one file descriptor from multiple
            // threads simultaneously. Creating the singleton with an
            // optimistic CAS here avoids the cost of a mutex per butex.
            butex = butex_create_checked<EpollButex>();
            butex->store(0, butil::memory_order_relaxed);
            EpollButex* expected = NULL;
            if (!p->compare_exchange_strong(expected, butex,
                                            butil::memory_order_release,
                                            butil::memory_order_consume)) {
                butex_destroy(butex);
                butex = expected;
            }
        }
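
        // The publication pattern above in isolation (a generic sketch
        // with hypothetical names, not brpc API): racing threads each
        // build a candidate, exactly one CAS wins, and the losers adopt
        // the winner's object.
        //
        //   T* obj = slot.load(std::memory_order_consume);
        //   if (obj == NULL) {
        //       T* fresh = new T;
        //       T* expected = NULL;
        //       if (slot.compare_exchange_strong(expected, fresh)) {
        //           obj = fresh;        // this thread published it
        //       } else {
        //           delete fresh;       // another thread won the race
        //           obj = expected;     // adopt the published object
        //       }
        //   }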
        while (butex == CLOSING_GUARD) {  // bthread_close() is running.
            if (sched_yield() < 0) {
                return -1;
            }
            butex = p->load(butil::memory_order_consume);
        }
        // Save the value of the butex before adding fd into epoll,
        // because the butex may be changed before butex_wait. No explicit
        // memory fence is needed here: EPOLL_CTL_MOD and EPOLL_CTL_ADD
        // are syscalls and act as release fences.
        const int expected_val = butex->load(butil::memory_order_relaxed);

#if defined(OS_LINUX)
# ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
        epoll_event evt = { events | EPOLLONESHOT, { butex } };
        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
            if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
                errno != EEXIST) {
                PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd="
                            << _epfd;
                return -1;
            }
        }
# else
        epoll_event evt;
        evt.events = events;
        evt.data.fd = fd;
        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
            errno != EEXIST) {
            PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
            return -1;
        }
# endif
#elif defined(OS_MACOSX)
        struct kevent kqueue_event;
        EV_SET(&kqueue_event, fd, events, EV_ADD | EV_ENABLE | EV_ONESHOT,
               0, 0, butex);
        if (kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
            PLOG(FATAL) << "Fail to add fd=" << fd << " into kqueuefd="
                        << _epfd;
            return -1;
        }
#endif
        while (butex->load(butil::memory_order_relaxed) == expected_val) {
            if (butex_wait(butex, expected_val, abstime) < 0 &&
                errno != EWOULDBLOCK && errno != EINTR) {
                return -1;
            }
        }
        return 0;
    }
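
    // Caller-side sketch (assuming the public wrappers bthread_fd_wait()
    // and bthread_fd_timedwait() from bthread/unstable.h, which forward
    // here): the fd is expected to be non-blocking, and the syscall is
    // retried after every wake-up, since a wake-up only means the fd was
    // reported ready or was closed, not that the read will succeed.
    //
    //   char buf[1024];
    //   ssize_t n;
    //   while ((n = read(fd, buf, sizeof(buf))) < 0 && errno == EAGAIN) {
    //       if (bthread_fd_wait(fd, EPOLLIN) < 0) {
    //           break;  // a real failure (e.g. ENOMEM); errno is set.
    //       }
    //   }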
    int fd_close(int fd) {
        if (fd < 0) {
            // Match the behavior of close(-1).
            errno = EBADF;
            return -1;
        }
        butil::atomic<EpollButex*>* pbutex = bthread::fd_butexes.get(fd);
        if (NULL == pbutex) {
            // No bthread_fd function was ever called on this fd,
            // close it directly.
            return close(fd);
        }
        // Mark the fd as closing so that a concurrent fd_wait() spins
        // instead of re-registering the fd, then wake all waiters.
        EpollButex* butex = pbutex->exchange(
            CLOSING_GUARD, butil::memory_order_relaxed);
        if (butex == CLOSING_GUARD) {
            // Concurrent double close detected.
            errno = EBADF;
            return -1;
        }
        if (butex != NULL) {
            butex->fetch_add(1, butil::memory_order_relaxed);
            butex_wake_all(butex);
        }
#if defined(OS_LINUX)
        epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
#elif defined(OS_MACOSX)
        struct kevent evt;
        EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
        kevent(_epfd, &evt, 1, NULL, 0, NULL);
        EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
        kevent(_epfd, &evt, 1, NULL, 0, NULL);
#endif
        const int rc = close(fd);
        // Restore the butex so that the slot is usable again once the
        // fd number is recycled by the kernel.
        pbutex->exchange(butex, butil::memory_order_relaxed);
        return rc;
    }
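
    // A two-bthread sketch of the handshake above (assuming the public
    // wrappers bthread_fd_wait() and bthread_close() from
    // bthread/unstable.h): closing an fd that another bthread is waiting
    // on wakes the waiter instead of leaving it parked forever.
    //
    //   // bthread A: parks on the fd's butex until it becomes readable
    //   // or is closed.
    //   bthread_fd_wait(fd, EPOLLIN);
    //
    //   // bthread B: bumps the butex and wakes A before close(); A's
    //   // retried read() then fails with EBADF.
    //   bthread_close(fd);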
    bool started() const {
        return _epfd >= 0;
    }

private:
    static void* run_this(void* arg) {
        return static_cast<EpollThread*>(arg)->run();
    }
    void* run() {
        const int initial_epfd = _epfd;
        const size_t MAX_EVENTS = 32;
#if defined(OS_LINUX)
        epoll_event* e = new (std::nothrow) epoll_event[MAX_EVENTS];
#elif defined(OS_MACOSX)
        typedef struct kevent KEVENT;
        struct kevent* e = new (std::nothrow) KEVENT[MAX_EVENTS];
#endif
        if (NULL == e) {
            LOG(FATAL) << "Fail to new epoll_event";
            return NULL;
        }
#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
        DLOG(INFO) << "Use DEL+ADD instead of EPOLLONESHOT+MOD due to "
                      "kernel bug. Performance will be much lower.";
# endif
#endif
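
        // For reference, the two registration strategies this flag picks
        // between (a sketch mirroring fd_wait() above; fd/events/butex
        // are hypothetical in this scope):
        //
        //   // Fixed kernel: register once, re-arm cheaply on each wait.
        //   epoll_event evt = { events | EPOLLONESHOT, { butex } };
        //   epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt);  // ADD on first use
        //
        //   // Buggy kernel: plain registration; the loop below must
        //   // EPOLL_CTL_DEL each fired fd, and the next fd_wait() re-ADDs.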
        while (!_stop) {
            const int epfd = _epfd;
#if defined(OS_LINUX)
            const int n = epoll_wait(epfd, e, MAX_EVENTS, -1);
#elif defined(OS_MACOSX)
            const int n = kevent(epfd, NULL, 0, e, MAX_EVENTS, NULL);
#endif
            if (_stop) {
                break;
            }
            if (n < 0) {
                if (errno == EINTR) {
#ifndef NDEBUG
                    break_nums.fetch_add(1, butil::memory_order_relaxed);
                    int* p = &errno;
                    const char* b = berror();
                    const char* b2 = berror(errno);
                    DLOG(FATAL) << "Fail to epoll epfd=" << epfd << ", "
                                << errno << " " << p << " " << b << " " << b2;
#endif
                    continue;
                }
                PLOG(INFO) << "Fail to epoll epfd=" << epfd;
                break;
            }
#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
            // De-register fired fds; the next fd_wait() will add them back.
            for (int i = 0; i < n; ++i) {
                epoll_ctl(epfd, EPOLL_CTL_DEL, e[i].data.fd, NULL);
            }
# endif
#endif
            for (int i = 0; i < n; ++i) {
#if defined(OS_LINUX)
# ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
                EpollButex* butex = static_cast<EpollButex*>(e[i].data.ptr);
# else
                butil::atomic<EpollButex*>* pbutex = fd_butexes.get(e[i].data.fd);
                EpollButex* butex = pbutex ?
                    pbutex->load(butil::memory_order_consume) : NULL;
# endif
#elif defined(OS_MACOSX)
                EpollButex* butex = static_cast<EpollButex*>(e[i].udata);
#endif
                if (butex != NULL && butex != CLOSING_GUARD) {
                    butex->fetch_add(1, butil::memory_order_relaxed);
                    butex_wake_all(butex);
                }
            }
        }

        delete [] e;
        DLOG(INFO) << "EpollThread=" << _tid << "(epfd="
                   << initial_epfd << ") is about to stop";
        return NULL;
    }
    int _epfd;
    bool _stop;
    bthread_t _tid;
    butil::Mutex _start_mutex;
};