int pthread_timed_connect()

in src/butil/endpoint.cpp [466:671]


int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
                          socklen_t addrlen, const timespec* abstime) {
    bool is_blocking = butil::is_blocking(sockfd);
    if (is_blocking) {
        butil::make_non_blocking(sockfd);
    }
    // Scoped non-blocking.
    BRPC_SCOPE_EXIT {
        if (is_blocking) {
            butil::make_blocking(sockfd);
        }
    };

    const int rc = ::connect(sockfd, serv_addr, addrlen);
    if (rc == 0 || errno != EINPROGRESS) {
        return rc;
    }
#if defined(OS_LINUX)
    if (pthread_fd_wait(sockfd, EPOLLOUT, abstime) < 0) {
#elif defined(OS_MACOSX)
    if (pthread_fd_wait(sockfd, EVFILT_WRITE, abstime) < 0) {
#endif
        return -1;
    }

    if (is_connected(sockfd) != 0) {
        return -1;
    }
    return 0;
}

int tcp_connect(EndPoint server, int* self_port) {
    return tcp_connect(server, self_port, -1);
}

int tcp_connect(const EndPoint& server, int* self_port, int connect_timeout_ms) {
    struct sockaddr_storage serv_addr{};
    socklen_t serv_addr_size = 0;
    if (endpoint2sockaddr(server, &serv_addr, &serv_addr_size) != 0) {
        return -1;
    }
    fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
    if (sockfd < 0) {
        return -1;
    }
    timespec abstime{};
    timespec* abstime_ptr = NULL;
    if (connect_timeout_ms > 0) {
        abstime = butil::milliseconds_from_now(connect_timeout_ms);
        abstime_ptr = &abstime;
    }
    int rc;
    if (bthread_timed_connect != NULL) {
        rc = bthread_timed_connect(sockfd, (struct sockaddr*)&serv_addr,
                                   serv_addr_size, abstime_ptr);
    } else {
        rc = pthread_timed_connect(sockfd, (struct sockaddr*) &serv_addr,
                                   serv_addr_size, abstime_ptr);
    }
    if (rc < 0) {
        return -1;
    }
    if (self_port != NULL) {
        EndPoint pt;
        if (get_local_side(sockfd, &pt) == 0) {
            *self_port = pt.port;
        } else {
            CHECK(false) << "Fail to get the local port of sockfd=" << sockfd;
        }
    }
    return sockfd.release();
}

int tcp_listen(EndPoint point) {
    struct sockaddr_storage serv_addr;
    socklen_t serv_addr_size = 0;
    if (endpoint2sockaddr(point, &serv_addr, &serv_addr_size) != 0) {
        return -1;
    }
    fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
    if (sockfd < 0) {
        return -1;
    }

    if (FLAGS_reuse_addr) {
#if defined(SO_REUSEADDR)
        const int on = 1;
        if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
                       &on, sizeof(on)) != 0) {
            return -1;
        }
#else
        LOG(ERROR) << "Missing def of SO_REUSEADDR while -reuse_addr is on";
        return -1;
#endif
    }

    if (FLAGS_reuse_port) {
#if defined(SO_REUSEPORT)
        const int on = 1;
        if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT,
                       &on, sizeof(on)) != 0) {
            LOG(WARNING) << "Fail to setsockopt SO_REUSEPORT of sockfd=" << sockfd;
        }
#else
        LOG(ERROR) << "Missing def of SO_REUSEPORT while -reuse_port is on";
        return -1;
#endif
    }

    if (FLAGS_reuse_uds_path && serv_addr.ss_family == AF_UNIX) {
        ::unlink(((sockaddr_un*) &serv_addr)->sun_path);
    }

    if (bind(sockfd, (struct sockaddr*)& serv_addr, serv_addr_size) != 0) {
        return -1;
    }
    if (listen(sockfd, 65535) != 0) {
        //             ^^^ kernel would silently truncate backlog to the value
        //             defined in /proc/sys/net/core/somaxconn if it is less
        //             than 65535
        return -1;
    }
    return sockfd.release();
}

int get_local_side(int fd, EndPoint *out) {
    struct sockaddr_storage addr;
    socklen_t socklen = sizeof(addr);
    const int rc = getsockname(fd, (struct sockaddr*)&addr, &socklen);
    if (rc != 0) {
        return rc;
    }
    if (out) {
        return sockaddr2endpoint(&addr, socklen, out);
    }
    return 0;
}

int get_remote_side(int fd, EndPoint *out) {
    struct sockaddr_storage addr;
    bzero(&addr, sizeof(addr));
    socklen_t socklen = sizeof(addr);
    const int rc = getpeername(fd, (struct sockaddr*)&addr, &socklen);
    if (rc != 0) {
        return rc;
    }
    if (out) {
        return sockaddr2endpoint(&addr, socklen, out);
    }
    return 0;
}

int endpoint2sockaddr(const EndPoint& point, struct sockaddr_storage* ss, socklen_t* size) {
    bzero(ss, sizeof(*ss));
    if (ExtendedEndPoint::is_extended(point)) {
        ExtendedEndPoint* eep = ExtendedEndPoint::address(point);
        if (!eep) {
            return -1;
        }
        int ret = eep->to(ss);
        if (ret < 0) {
            return -1;
        }
        if (size) {
            *size = static_cast<socklen_t>(ret);
        }
        return 0;
    }
    struct sockaddr_in* in4 = (struct sockaddr_in*) ss;
    in4->sin_family = AF_INET;
    in4->sin_addr = point.ip;
    in4->sin_port = htons(point.port);
    if (size) {
        *size = sizeof(*in4);
    }
    return 0;
}

int sockaddr2endpoint(struct sockaddr_storage* ss, socklen_t size, EndPoint* point) {
    if (ss->ss_family == AF_INET) {
        *point = EndPoint(*(sockaddr_in*)ss);
        return 0;
    }
    if (ExtendedEndPoint::create(ss, size, point)) {
        return 0;
    }
    return -1;
}

sa_family_t get_endpoint_type(const EndPoint& point) {
    if (ExtendedEndPoint::is_extended(point)) {
        ExtendedEndPoint* eep = ExtendedEndPoint::address(point);
        if (eep) {
            return eep->family();
        }
        return AF_UNSPEC;
    }
    return AF_INET;
}

bool is_endpoint_extended(const EndPoint& point) {
    return ExtendedEndPoint::is_extended(point);
}

}  // namespace butil