in src/butil/endpoint.cpp [466:671]
// Connects `sockfd' to `serv_addr' without blocking inside ::connect().
// The fd is switched to non-blocking mode for the duration of the call when
// it was blocking on entry, and switched back when the function exits.
// `abstime' is forwarded untouched to pthread_fd_wait().
// Returns the result of ::connect() when it succeeds immediately or fails
// with an errno other than EINPROGRESS; otherwise 0 once the fd is reported
// writable and is_connected() returns 0, or -1 if either step fails.
int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
                          socklen_t addrlen, const timespec* abstime) {
    // Remember the mode on entry so that it can be put back on exit.
    const bool restore_blocking = butil::is_blocking(sockfd);
    if (restore_blocking) {
        butil::make_non_blocking(sockfd);
    }
    // Runs on every return path below.
    BRPC_SCOPE_EXIT {
        if (restore_blocking) {
            butil::make_blocking(sockfd);
        }
    };

    const int rc = ::connect(sockfd, serv_addr, addrlen);
    // Only a pending connection (non-zero rc with EINPROGRESS) needs waiting;
    // immediate success and any other failure are handed back as they are.
    const bool in_progress = (rc != 0 && errno == EINPROGRESS);
    if (!in_progress) {
        return rc;
    }

    // Wait for the fd to become writable, using the platform's event constant.
#if defined(OS_LINUX)
    if (pthread_fd_wait(sockfd, EPOLLOUT, abstime) < 0) {
#elif defined(OS_MACOSX)
    if (pthread_fd_wait(sockfd, EVFILT_WRITE, abstime) < 0) {
#endif
        return -1;
    }
    // Any non-zero result from is_connected() is reported as a failure.
    return is_connected(sockfd) != 0 ? -1 : 0;
}
// Convenience overload: forwards to the three-argument tcp_connect() with a
// connect timeout of -1. That overload only sets a deadline for timeouts
// greater than zero.
int tcp_connect(EndPoint server, int* self_port) {
    const int no_timeout_ms = -1;
    return tcp_connect(server, self_port, no_timeout_ms);
}
int tcp_connect(const EndPoint& server, int* self_port, int connect_timeout_ms) {
struct sockaddr_storage serv_addr{};
socklen_t serv_addr_size = 0;
if (endpoint2sockaddr(server, &serv_addr, &serv_addr_size) != 0) {
return -1;
}
fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
if (sockfd < 0) {
return -1;
}
timespec abstime{};
timespec* abstime_ptr = NULL;
if (connect_timeout_ms > 0) {
abstime = butil::milliseconds_from_now(connect_timeout_ms);
abstime_ptr = &abstime;
}
int rc;
if (bthread_timed_connect != NULL) {
rc = bthread_timed_connect(sockfd, (struct sockaddr*)&serv_addr,
serv_addr_size, abstime_ptr);
} else {
rc = pthread_timed_connect(sockfd, (struct sockaddr*) &serv_addr,
serv_addr_size, abstime_ptr);
}
if (rc < 0) {
return -1;
}
if (self_port != NULL) {
EndPoint pt;
if (get_local_side(sockfd, &pt) == 0) {
*self_port = pt.port;
} else {
CHECK(false) << "Fail to get the local port of sockfd=" << sockfd;
}
}
return sockfd.release();
}
// Creates a SOCK_STREAM socket, binds it to `point' and starts listening.
// Socket options are driven by flags:
//   -reuse_addr     : SO_REUSEADDR must be defined and settable, else -1.
//   -reuse_port     : SO_REUSEPORT must be defined; a setsockopt failure is
//                     only logged as a warning.
//   -reuse_uds_path : for AF_UNIX addresses the path is unlinked before bind.
// Returns the value of fd_guard::release() on success, -1 on any failure.
int tcp_listen(EndPoint point) {
    struct sockaddr_storage bind_addr;
    socklen_t bind_addr_len = 0;
    if (endpoint2sockaddr(point, &bind_addr, &bind_addr_len) != 0) {
        return -1;
    }

    fd_guard listen_fd(socket(bind_addr.ss_family, SOCK_STREAM, 0));
    if (listen_fd < 0) {
        return -1;
    }

    if (FLAGS_reuse_addr) {
#if defined(SO_REUSEADDR)
        const int enabled = 1;
        const int opt_rc = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR,
                                      &enabled, sizeof(enabled));
        if (opt_rc != 0) {
            return -1;
        }
#else
        LOG(ERROR) << "Missing def of SO_REUSEADDR while -reuse_addr is on";
        return -1;
#endif
    }

    if (FLAGS_reuse_port) {
#if defined(SO_REUSEPORT)
        const int enabled = 1;
        const int opt_rc = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEPORT,
                                      &enabled, sizeof(enabled));
        // Unlike SO_REUSEADDR, a failure here does not abort the listen.
        if (opt_rc != 0) {
            LOG(WARNING) << "Fail to setsockopt SO_REUSEPORT of sockfd=" << listen_fd;
        }
#else
        LOG(ERROR) << "Missing def of SO_REUSEPORT while -reuse_port is on";
        return -1;
#endif
    }

    // The result of unlink() is deliberately not checked.
    const bool is_unix_socket = (bind_addr.ss_family == AF_UNIX);
    if (FLAGS_reuse_uds_path && is_unix_socket) {
        sockaddr_un* const uds_addr = (sockaddr_un*)&bind_addr;
        ::unlink(uds_addr->sun_path);
    }

    if (bind(listen_fd, (struct sockaddr*)&bind_addr, bind_addr_len) != 0) {
        return -1;
    }

    // The kernel silently truncates the backlog to the value defined in
    // /proc/sys/net/core/somaxconn if that value is less than 65535.
    const int backlog = 65535;
    if (listen(listen_fd, backlog) != 0) {
        return -1;
    }
    return listen_fd.release();
}
// Retrieves the local address bound to `fd' via getsockname().
// When `out' is not NULL the address is converted with sockaddr2endpoint()
// and that function's result is returned; when `out' is NULL only the
// getsockname() call is performed.
// Returns 0 on success, otherwise the non-zero result of getsockname() or
// sockaddr2endpoint().
int get_local_side(int fd, EndPoint *out) {
    // Zero the storage (as get_remote_side() does) so that bytes which
    // getsockname() does not write are never read as indeterminate values
    // by sockaddr2endpoint().
    struct sockaddr_storage addr{};
    socklen_t socklen = sizeof(addr);
    const int rc = getsockname(fd, (struct sockaddr*)&addr, &socklen);
    if (rc != 0) {
        return rc;
    }
    if (out) {
        return sockaddr2endpoint(&addr, socklen, out);
    }
    return 0;
}
// Retrieves the address of the peer connected to `fd' via getpeername().
// When `out' is not NULL the address is converted with sockaddr2endpoint()
// and that function's result is returned; when `out' is NULL only the
// getpeername() call is performed.
// Returns 0 on success, otherwise the non-zero result of getpeername() or
// sockaddr2endpoint().
int get_remote_side(int fd, EndPoint *out) {
    struct sockaddr_storage peer;
    bzero(&peer, sizeof(peer));
    socklen_t peer_len = sizeof(peer);

    const int rc = getpeername(fd, (struct sockaddr*)&peer, &peer_len);
    if (rc != 0) {
        return rc;
    }
    return out ? sockaddr2endpoint(&peer, peer_len, out) : 0;
}
int endpoint2sockaddr(const EndPoint& point, struct sockaddr_storage* ss, socklen_t* size) {
bzero(ss, sizeof(*ss));
if (ExtendedEndPoint::is_extended(point)) {
ExtendedEndPoint* eep = ExtendedEndPoint::address(point);
if (!eep) {
return -1;
}
int ret = eep->to(ss);
if (ret < 0) {
return -1;
}
if (size) {
*size = static_cast<socklen_t>(ret);
}
return 0;
}
struct sockaddr_in* in4 = (struct sockaddr_in*) ss;
in4->sin_family = AF_INET;
in4->sin_addr = point.ip;
in4->sin_port = htons(point.port);
if (size) {
*size = sizeof(*in4);
}
return 0;
}
// Converts the socket address `*ss' (of length `size') into `*point'.
// AF_INET addresses are converted directly from the sockaddr_in; every other
// family is handed to ExtendedEndPoint::create().
// Returns 0 on success, -1 when ExtendedEndPoint::create() yields a false
// value.
int sockaddr2endpoint(struct sockaddr_storage* ss, socklen_t size, EndPoint* point) {
    if (ss->ss_family != AF_INET) {
        return ExtendedEndPoint::create(ss, size, point) ? 0 : -1;
    }
    // `size' is not consulted on the IPv4 path.
    *point = EndPoint(*(sockaddr_in*)ss);
    return 0;
}
sa_family_t get_endpoint_type(const EndPoint& point) {
if (ExtendedEndPoint::is_extended(point)) {
ExtendedEndPoint* eep = ExtendedEndPoint::address(point);
if (eep) {
return eep->family();
}
return AF_UNSPEC;
}
return AF_INET;
}
// Reports whether `point' is an extended endpoint, as determined by
// ExtendedEndPoint::is_extended().
bool is_endpoint_extended(const EndPoint& point) {
    const bool extended = ExtendedEndPoint::is_extended(point);
    return extended;
}
} // namespace butil