int stop_and_join()

in src/bthread/fd.cpp [158:404]


    // Stop the event loop and join the EpollThread bthread.
    // Returns 0 on success (or when never started), -1 on failure with the
    // cause logged. Idempotent: _epfd is cleared up-front so a later call
    // sees started() == false and returns immediately.
    int stop_and_join() {
        if (!started()) {
            return 0;
        }
        // No matter what this function returns, _epfd will be set to -1
        // (making started() false) to avoid latter stop_and_join() to
        // enter again.
        const int saved_epfd = _epfd;
        _epfd = -1;

        // epoll_wait cannot be woken up by closing _epfd. We wake up
        // epoll_wait by inserting a fd continuously triggering EPOLLOUT.
        // Visibility of _stop: constant EPOLLOUT forces epoll_wait to see
        // _stop (to be true) finally.
        _stop = true;
        int closing_epoll_pipe[2];
        if (pipe(closing_epoll_pipe)) {
            PLOG(FATAL) << "Fail to create closing_epoll_pipe";
            return -1;
        }
#if defined(OS_LINUX)
        epoll_event evt = { EPOLLOUT, { NULL } };
        if (epoll_ctl(saved_epfd, EPOLL_CTL_ADD,
                      closing_epoll_pipe[1], &evt) < 0) {
#elif defined(OS_MACOSX)
        struct kevent kqueue_event;
        EV_SET(&kqueue_event, closing_epoll_pipe[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
                0, 0, NULL);
        if (kevent(saved_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
#endif
            PLOG(FATAL) << "Fail to add closing_epoll_pipe into epfd="
                        << saved_epfd;
            // Close both ends so the pipe is not leaked on this error path.
            close(closing_epoll_pipe[0]);
            close(closing_epoll_pipe[1]);
            return -1;
        }

        const int rc = bthread_join(_tid, NULL);
        if (rc) {
            LOG(FATAL) << "Fail to join EpollThread, " << berror(rc);
            // The pipe is no longer needed; close it before bailing out to
            // avoid leaking the two descriptors.
            close(closing_epoll_pipe[0]);
            close(closing_epoll_pipe[1]);
            return -1;
        }
        close(closing_epoll_pipe[0]);
        close(closing_epoll_pipe[1]);
        close(saved_epfd);
        return 0;
    }

    // Block the calling bthread until one of `events` fires on `fd`, the
    // deadline `abstime` passes (NULL = wait forever), or a concurrent
    // fd_close() wakes all waiters. Returns 0 when woken, -1 with errno set
    // on error (ENOMEM when the butex table cannot grow).
    int fd_wait(int fd, unsigned events, const timespec* abstime) {
        butil::atomic<EpollButex*>* p = fd_butexes.get_or_new(fd);
        if (NULL == p) {
            errno = ENOMEM;
            return -1;
        }

        EpollButex* butex = p->load(butil::memory_order_consume);
        if (NULL == butex) {
            // It is rare to wait on one file descriptor from multiple threads
            // simultaneously. Creating singleton by optimistic locking here
            // saves mutexes for each butex.
            butex = butex_create_checked<EpollButex>();
            butex->store(0, butil::memory_order_relaxed);
            EpollButex* expected = NULL;
            if (!p->compare_exchange_strong(expected, butex,
                                            butil::memory_order_release,
                                            butil::memory_order_consume)) {
                // Lost the race: another thread installed a butex first.
                // Destroy ours and use the winner's (now in `expected`).
                butex_destroy(butex);
                butex = expected;
            }
        }
        
        // Spin-yield while fd_close() holds the slot with CLOSING_GUARD; it
        // restores the real butex pointer when done.
        while (butex == CLOSING_GUARD) {  // bthread_close() is running.
            if (sched_yield() < 0) {
                return -1;
            }
            butex = p->load(butil::memory_order_consume);
        }
        // Save value of butex before adding to epoll because the butex may
        // be changed before butex_wait. No memory fence because EPOLL_CTL_MOD
        // and EPOLL_CTL_ADD shall have release fence.
        const int expected_val = butex->load(butil::memory_order_relaxed);

#if defined(OS_LINUX)
# ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
        // One-shot registration carrying the butex pointer directly; try MOD
        // first (fd likely already registered), fall back to ADD.
        epoll_event evt = { events | EPOLLONESHOT, { butex } };
        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
            if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
                    errno != EEXIST) {
                PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
                return -1;
            }
        }
# else
        // Buggy-kernel path: register the fd itself; run() will DEL it after
        // each wakeup and look up the butex through fd_butexes.
        epoll_event evt;
        evt.events = events;
        evt.data.fd = fd;
        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
            errno != EEXIST) {
            PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
            return -1;
        }
# endif
#elif defined(OS_MACOSX)
        // kqueue one-shot registration with the butex as udata.
        struct kevent kqueue_event;
        EV_SET(&kqueue_event, fd, events, EV_ADD | EV_ENABLE | EV_ONESHOT,
                0, 0, butex);
        if (kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
            PLOG(FATAL) << "Fail to add fd=" << fd << " into kqueuefd=" << _epfd;
            return -1;
        }
#endif
        // Wait until the butex value moves past the snapshot taken above;
        // EWOULDBLOCK/EINTR mean "re-check", anything else is a real error
        // (including ETIMEDOUT from an expired abstime).
        while (butex->load(butil::memory_order_relaxed) == expected_val) {
            if (butex_wait(butex, expected_val, abstime) < 0 &&
                errno != EWOULDBLOCK && errno != EINTR) {
                return -1;
            }
        }
        return 0;
    }

    // Close `fd` and wake every bthread blocked in fd_wait() on it.
    // Returns close(2)'s result; -1/EBADF for negative fds or when another
    // fd_close() on the same fd is already in progress.
    int fd_close(int fd) {
        if (fd < 0) {
            // what close(-1) returns
            errno = EBADF;
            return -1;
        }
        butil::atomic<EpollButex*>* pbutex = bthread::fd_butexes.get(fd);
        if (NULL == pbutex) {
            // Did not call bthread_fd functions, close directly.
            return close(fd);
        }
        // Park CLOSING_GUARD in the slot so concurrent fd_wait() spins
        // instead of arming epoll/kqueue while we close.
        EpollButex* butex = pbutex->exchange(
            CLOSING_GUARD, butil::memory_order_relaxed);
        if (butex == CLOSING_GUARD) {
            // concurrent double close detected.
            errno = EBADF;
            return -1;
        }
        if (butex != NULL) {
            // Bump the butex value so waiters' expected_val no longer
            // matches, then wake them all.
            butex->fetch_add(1, butil::memory_order_relaxed);
            butex_wake_all(butex);
        }
#if defined(OS_LINUX)
        epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
#elif defined(OS_MACOSX)
        // kqueue has no single "remove all filters" call; delete both the
        // write and read filters (errors deliberately ignored).
        struct kevent evt;
        EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
        kevent(_epfd, &evt, 1, NULL, 0, NULL);
        EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
        kevent(_epfd, &evt, 1, NULL, 0, NULL);
#endif
        const int rc = close(fd);
        // Restore the original butex pointer, releasing CLOSING_GUARD so the
        // slot is reusable (fd numbers are recycled by the kernel).
        pbutex->exchange(butex, butil::memory_order_relaxed);
        return rc;
    }

    // True while the epoll/kqueue descriptor is live, i.e. after a
    // successful start and before stop_and_join() resets _epfd to -1.
    bool started() const {
        return !(_epfd < 0);
    }

private:
    static void* run_this(void* arg) {
        return static_cast<EpollThread*>(arg)->run();
    }

    // Event-loop body executed by the EpollThread bthread: repeatedly waits
    // on the epoll/kqueue fd and wakes the butex attached to each ready
    // event until _stop becomes true. Always returns NULL.
    void* run() {
        const int initial_epfd = _epfd;
        const size_t MAX_EVENTS = 32;
#if defined(OS_LINUX)
        epoll_event* e = new (std::nothrow) epoll_event[MAX_EVENTS];
#elif defined(OS_MACOSX)
        typedef struct kevent KEVENT;
        struct kevent* e = new (std::nothrow) KEVENT[MAX_EVENTS];
#endif
        if (NULL == e) {
            LOG(FATAL) << "Fail to new epoll_event";
            return NULL;
        }

#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
        DLOG(INFO) << "Use DEL+ADD instead of EPOLLONESHOT+MOD due to kernel bug. Performance will be much lower.";
# endif
#endif
        while (!_stop) {
            // Re-read _epfd each iteration; stop_and_join() sets it to -1.
            const int epfd = _epfd;
#if defined(OS_LINUX)
            const int n = epoll_wait(epfd, e, MAX_EVENTS, -1);
#elif defined(OS_MACOSX)
            const int n = kevent(epfd, NULL, 0, e, MAX_EVENTS, NULL);
#endif
            // stop_and_join() keeps a pipe end triggering writable events,
            // so the wait above returns and we observe _stop here.
            if (_stop) {
                break;
            }

            if (n < 0) {
                if (errno == EINTR) {
#ifndef NDEBUG
                    // Debug-only accounting of EINTR wakeups.
                    break_nums.fetch_add(1, butil::memory_order_relaxed);
                    int* p = &errno;
                    const char* b = berror();
                    const char* b2 = berror(errno);
                    DLOG(FATAL) << "Fail to epoll epfd=" << epfd << ", "
                                << errno << " " << p << " " <<  b << " " <<  b2;
#endif
                    continue;
                }

                PLOG(INFO) << "Fail to epoll epfd=" << epfd;
                break;
            }

#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
            // Buggy-kernel path: emulate one-shot semantics by removing each
            // ready fd before waking its waiters.
            for (int i = 0; i < n; ++i) {
                epoll_ctl(epfd, EPOLL_CTL_DEL, e[i].data.fd, NULL);
            }
# endif
#endif
            for (int i = 0; i < n; ++i) {
#if defined(OS_LINUX)
# ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
                // fd_wait() stored the butex pointer in data.ptr.
                EpollButex* butex = static_cast<EpollButex*>(e[i].data.ptr);
# else
                // fd carried in data.fd; look the butex up in the table.
                butil::atomic<EpollButex*>* pbutex = fd_butexes.get(e[i].data.fd);
                EpollButex* butex = pbutex ?
                    pbutex->load(butil::memory_order_consume) : NULL;
# endif
#elif defined(OS_MACOSX)
                // fd_wait() stored the butex pointer in udata.
                EpollButex* butex = static_cast<EpollButex*>(e[i].udata);
#endif
                // Skip slots with no butex (e.g. the stop pipe, whose event
                // carries NULL) or ones currently held by fd_close().
                if (butex != NULL && butex != CLOSING_GUARD) {
                    butex->fetch_add(1, butil::memory_order_relaxed);
                    butex_wake_all(butex);
                }
            }
        }

        delete [] e;
        DLOG(INFO) << "EpollThread=" << _tid << "(epfd="
                   << initial_epfd << ") is about to stop";
        return NULL;
    }

    int _epfd;                  // epoll (Linux) / kqueue (macOS) fd; -1 when not started
    bool _stop;                 // set by stop_and_join() to ask run() to exit
    bthread_t _tid;             // id of the bthread executing run()
    butil::Mutex _start_mutex;  // presumably serializes start(); start() is not visible in this chunk — TODO confirm
};