in erts/epmd/src/epmd_srv.c [232:565]
/*
 * Main entry point of the epmd server.
 *
 * Prepares the listening sockets and then serves requests forever:
 *
 *  - When built with HAVE_SYSTEMD_DAEMON and g->is_systemd is set, the
 *    listening descriptors are taken over from systemd.
 *  - Otherwise one address is prepared per entry in g->addresses (plus
 *    the loopback address(es)), or the "any" address(es) when no address
 *    is configured; each one is then created, configured, bound and put
 *    in listen mode on port g->port.
 *
 * Every listening descriptor is stored in g->listenfd[] and registered
 * with select_fd_set(). The function then loops in select(), accepting
 * new connections, reading from connections that have data, and closing
 * non-"keep" connections whose last activity is older than
 * g->packet_timeout.
 *
 * The select loop has no exit; fatal errors are handed to
 * epmd_cleanup_exit().
 */
void run(EpmdVars *g)
{
    struct EPMD_SOCKADDR_IN iserv_addr[MAX_LISTEN_SOCKETS];
    int listensock[MAX_LISTEN_SOCKETS];
#if defined(EPMD6)
    char socknamebuf[INET6_ADDRSTRLEN];
#else
    char socknamebuf[INET_ADDRSTRLEN];
#endif
    int num_sockets = 0;
    int i;
    int opt;
    unsigned short sport = g->port;
    int bound = 0;

    node_init(g);
    g->conn = conn_init(g);

#ifdef HAVE_SYSTEMD_DAEMON
    if (g->is_systemd)
    {
        int n;

        dbg_printf(g,2,"try to obtain sockets from systemd");

        n = sd_listen_fds(0);
        if (n < 0)
        {
            dbg_perror(g,"cannot obtain sockets from systemd");
            epmd_cleanup_exit(g,1);
        }
        else if (n == 0)
        {
            dbg_tty_printf(g,0,"systemd provides no sockets");
            epmd_cleanup_exit(g,1);
        }
        else if (n > MAX_LISTEN_SOCKETS)
        {
            dbg_tty_printf(g,0,"cannot listen on more than %d IP addresses", MAX_LISTEN_SOCKETS);
            epmd_cleanup_exit(g,1);
        }

        /* The inherited descriptors are numbered consecutively starting
           at SD_LISTEN_FDS_START. */
        num_sockets = n;
        for (i = 0; i < num_sockets; i++)
        {
            g->listenfd[i] = listensock[i] = SD_LISTEN_FDS_START + i;
        }
    }
    else
    {
#endif /* HAVE_SYSTEMD_DAEMON */

    dbg_printf(g,2,"try to initiate listening port %d", g->port);

    if (g->addresses != NULL && /* String contains non-separator characters if: */
        g->addresses[strspn(g->addresses," ,")] != '\000')
    {
        char *tmp = NULL;
        char *token = NULL;

        /* Always listen on the loopback. */
        SET_ADDR(iserv_addr[num_sockets],htonl(INADDR_LOOPBACK),sport);
        num_sockets++;
#if defined(EPMD6)
        SET_ADDR6(iserv_addr[num_sockets],in6addr_loopback,sport);
        num_sockets++;
#endif

        /* strtok() modifies its argument, so tokenize a private copy. */
        if ((tmp = strdup(g->addresses)) == NULL)
        {
            dbg_perror(g,"cannot allocate memory");
            epmd_cleanup_exit(g,1);
        }

        for (token = strtok(tmp,", ");
             token != NULL;
             token = strtok(NULL,", "))
        {
            struct in_addr addr;
#if defined(EPMD6)
            struct in6_addr addr6;
            struct sockaddr_storage *sa = &iserv_addr[num_sockets];

            /* Try the token as an IPv6 address first, then as IPv4. */
            if (inet_pton(AF_INET6,token,&addr6) == 1)
            {
                SET_ADDR6(iserv_addr[num_sockets],addr6,sport);
            }
            else if (inet_pton(AF_INET,token,&addr) == 1)
            {
                SET_ADDR(iserv_addr[num_sockets],addr.s_addr,sport);
            }
            else
#else
            if ((addr.s_addr = inet_addr(token)) != INADDR_NONE)
            {
                SET_ADDR(iserv_addr[num_sockets],addr.s_addr,sport);
            }
            else
#endif
            {
                dbg_tty_printf(g,0,"cannot parse IP address \"%s\"",token);
                epmd_cleanup_exit(g,1);
            }

            /* Loopback addresses were already added above; skipping here
               leaves num_sockets unchanged so the slot just written is
               reused by the next token. */
#if defined(EPMD6)
            if (sa->ss_family == AF_INET6 && IN6_IS_ADDR_LOOPBACK(&addr6))
                continue;

            if (sa->ss_family == AF_INET)
#endif
            if (IS_ADDR_LOOPBACK(addr))
                continue;

            num_sockets++;

            /* NOTE(review): the check runs after the increment, so the
               server gives up as soon as the last slot of iserv_addr[]
               has been filled, even if no further token follows. */
            if (num_sockets >= MAX_LISTEN_SOCKETS)
            {
                dbg_tty_printf(g,0,"cannot listen on more than %d IP addresses",
                               MAX_LISTEN_SOCKETS);
                epmd_cleanup_exit(g,1);
            }
        }

        free(tmp);
    }
    else
    {
        /* No address configured: listen on the "any" address(es). */
        SET_ADDR(iserv_addr[num_sockets],htonl(INADDR_ANY),sport);
        num_sockets++;
#if defined(EPMD6)
        SET_ADDR6(iserv_addr[num_sockets],in6addr_any,sport);
        num_sockets++;
#endif
    }
#ifdef HAVE_SYSTEMD_DAEMON
    }
#endif /* HAVE_SYSTEMD_DAEMON */

#if !defined(__WIN32__)
    /* We ignore the SIGPIPE signal that is raised when we call write
       twice on a socket closed by the other end. */
    signal(SIGPIPE, SIG_IGN);
#endif

    /*
     * Initialize number of active file descriptors.
     * Stdin, stdout, and stderr are still open.
     *
     * NOTE(review): these figures use the number of configured addresses;
     * sockets skipped below because their address family is unsupported
     * are still counted here.
     */
    g->active_conn = 3 + num_sockets;
    g->max_conn -= num_sockets;

    FD_ZERO(&g->orig_read_mask);
    g->select_fd_top = 0;

#ifdef HAVE_SYSTEMD_DAEMON
    if (g->is_systemd)
        for (i = 0; i < num_sockets; i++)
            select_fd_set(g, listensock[i]);
    else
    {
#endif /* HAVE_SYSTEMD_DAEMON */
    for (i = 0; i < num_sockets; i++)
    {
        struct sockaddr *sa = (struct sockaddr *)&iserv_addr[i];
#if defined(EPMD6)
        size_t salen = (sa->sa_family == AF_INET6 ?
                        sizeof(struct sockaddr_in6) :
                        sizeof(struct sockaddr_in));
#else
        size_t salen = sizeof(struct sockaddr_in);
#endif

        if ((listensock[i] = socket(sa->sa_family,SOCK_STREAM,0)) < 0)
        {
            switch (errno) {
            case EAFNOSUPPORT:
            case EPROTONOSUPPORT:
                /* Address family not available on this host: skip this
                   address and try the next one. */
                continue;
            default:
                dbg_perror(g,"error opening stream socket");
                epmd_cleanup_exit(g,1);
            }
        }
        /* g->listenfd[] is filled densely; 'bound' counts the sockets
           that were actually created. */
        g->listenfd[bound++] = listensock[i];

#if HAVE_DECL_IPV6_V6ONLY
        opt = 1;
        if (sa->sa_family == AF_INET6 &&
            setsockopt(listensock[i],IPPROTO_IPV6,IPV6_V6ONLY,&opt,
                       sizeof(opt)) <0)
        {
            dbg_perror(g,"can't set IPv6 only socket option");
            epmd_cleanup_exit(g,1);
        }
#endif

        /*
         * Note that we must not enable the SO_REUSEADDR on Windows,
         * because addresses will be reused even if they are still in use.
         */

#if !defined(__WIN32__)
        opt = 1;
        if (setsockopt(listensock[i],SOL_SOCKET,SO_REUSEADDR,(char* ) &opt,
                       sizeof(opt)) <0)
        {
            dbg_perror(g,"can't set sockopt");
            epmd_cleanup_exit(g,1);
        }
#endif

        /* In rare cases select returns because there is someone
           to accept but the request is withdrawn before the
           accept function is called. We set the listen socket
           to be non blocking to prevent us from being hanging
           in accept() waiting for the next request. */
#if (defined(__WIN32__) || defined(NO_FCNTL))
        opt = 1;
        if (ioctl(listensock[i], FIONBIO, &opt) != 0)
#else
        opt = fcntl(listensock[i], F_GETFL, 0);
        if (fcntl(listensock[i], F_SETFL, opt | O_NONBLOCK) == -1)
#endif /* __WIN32__ */
            /* Report the address of the socket being configured
               (index i); failure here is logged but not fatal. */
            dbg_perror(g,"failed to set non-blocking mode of listening socket %d on ipaddr %s",
                       listensock[i], epmd_ntop(&iserv_addr[i],
                                                socknamebuf, sizeof(socknamebuf)));

        if (bind(listensock[i], sa, salen) < 0)
        {
            if (errno == EADDRINUSE)
            {
                /* Another epmd already owns the port: exit with status 0. */
                dbg_tty_printf(g,1,"there is already a epmd running at port %d on ipaddr %s",
                               g->port, epmd_ntop(&iserv_addr[i],
                                                  socknamebuf, sizeof(socknamebuf)));
                epmd_cleanup_exit(g,0);
            }
            else
            {
                dbg_perror(g,"failed to bind on ipaddr %s",
                           epmd_ntop(&iserv_addr[i],
                                     socknamebuf, sizeof(socknamebuf)));
                epmd_cleanup_exit(g,1);
            }
        }

        if (listen(listensock[i], SOMAXCONN) < 0) {
            dbg_perror(g,"failed to listen on ipaddr %s",
                       epmd_ntop(&iserv_addr[i],
                                 socknamebuf, sizeof(socknamebuf)));
            epmd_cleanup_exit(g,1);
        }
        select_fd_set(g, listensock[i]);
    }
    if (bound == 0) {
        dbg_perror(g,"unable to bind any address");
        epmd_cleanup_exit(g,1);
    }
    /* From here on only the sockets that were really opened count. */
    num_sockets = bound;
#ifdef HAVE_SYSTEMD_DAEMON
    }
    if (g->is_systemd) {
        sd_notifyf(0, "READY=1\n"
                      "STATUS=Processing port mapping requests...\n"
                      "MAINPID=%lu", (unsigned long) getpid());
    }
#endif /* HAVE_SYSTEMD_DAEMON */

    dbg_tty_printf(g,2,"entering the main select() loop");

 select_again:
    while (1)
    {
        fd_set read_mask = g->orig_read_mask;
        struct timeval timeout;
        int ret;

        /* If we are idle we time out now and then to enable the code
           below to close connections that are old and probably
           hanging. Make sure that select will return often enough. */

        timeout.tv_sec = (g->packet_timeout < IDLE_TIMEOUT) ? 1 : IDLE_TIMEOUT;
        timeout.tv_usec = 0;

        if ((ret = select(g->select_fd_top,
                          &read_mask, (fd_set *)0,(fd_set *)0,&timeout)) < 0) {
            /* Capture errno before logging: dbg_perror() may change it,
               and the decision below must be based on select()'s error. */
            int select_errno = errno;

            dbg_perror(g,"error in select ");
            switch (select_errno) {
            case EAGAIN:
            case EINTR:
                /* Transient failure: go around and select() again. */
                break;
            default:
                epmd_cleanup_exit(g,1);
            }
        }
        else {
            time_t now;

            if (ret == 0) {
                /* Timeout: make sure no descriptor looks ready below. */
                FD_ZERO(&read_mask);
            }

            if (g->delay_accept) {      /* Test of busy server */
                sleep(g->delay_accept);
            }

            for (i = 0; i < num_sockets; i++)
                if (FD_ISSET(g->listenfd[i],&read_mask)) {
                    if (do_accept(g, g->listenfd[i]) && g->active_conn < g->max_conn) {
                        /*
                         * The accept() succeeded, and we have at least one file
                         * descriptor still free, which means that another accept()
                         * could succeed. Go do another select(), in case there
                         * are more incoming connections waiting to be accepted.
                         */
                        goto select_again;
                    }
                }

            /* Check all open streams marked by select for data or a
               close.  We also close all open sockets except ALIVE
               with no activity for a long period */

            now = current_time(g);
            for (i = 0; i < g->max_conn; i++) {
                if (g->conn[i].open == EPMD_TRUE) {
                    if (FD_ISSET(g->conn[i].fd,&read_mask))
                        do_read(g,&g->conn[i]);
                    else if ((g->conn[i].keep == EPMD_FALSE) &&
                             ((g->conn[i].mod_time + g->packet_timeout) < now)) {
                        dbg_tty_printf(g,1,"closing because timed out on receive");
                        epmd_conn_close(g,&g->conn[i]);
                    }
                }
            }
        }
    }
}