in c/src/proactor/epoll.c [1543:1630]
void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog)
{
// TODO: check listener not already listening for this or another proactor
lock(&l->task.mutex);
l->task.proactor = p;
l->pending_accepteds = (accepted_t*)calloc(backlog, sizeof(accepted_t));
assert(l->pending_accepteds);
l->backlog = backlog;
pni_parse_addr(addr, l->addr_buf, sizeof(l->addr_buf), &l->host, &l->port);
struct addrinfo *addrinfo = NULL;
int gai_err = pgetaddrinfo(l->host, l->port, AI_PASSIVE | AI_ALL, &addrinfo);
if (!gai_err) {
/* Count addresses, allocate enough space for sockets */
size_t len = 0;
for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
++len;
}
assert(len > 0); /* guaranteed by getaddrinfo */
l->acceptors = (acceptor_t*)calloc(len, sizeof(acceptor_t));
assert(l->acceptors); /* TODO aconway 2017-05-05: memory safety */
l->acceptors_size = 0;
uint16_t dynamic_port = 0; /* Record dynamic port from first bind(0) */
/* Find working listen addresses */
for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
if (dynamic_port) set_port(ai->ai_addr, dynamic_port);
int fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
static int on = 1;
if (fd >= 0) {
configure_socket(fd);
if (!setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) &&
/* We listen to v4/v6 on separate sockets, don't let v6 listen for v4 */
(ai->ai_family != AF_INET6 ||
!setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on))) &&
!bind(fd, ai->ai_addr, ai->ai_addrlen) &&
!listen(fd, backlog))
{
acceptor_t *acceptor = &l->acceptors[l->acceptors_size++];
/* Get actual address */
socklen_t len = pn_netaddr_socklen(&acceptor->addr);
(void)getsockname(fd, (struct sockaddr*)(&acceptor->addr.ss), &len);
if (acceptor == l->acceptors) { /* First acceptor, check for dynamic port */
dynamic_port = check_dynamic_port(ai->ai_addr, pn_netaddr_sockaddr(&acceptor->addr));
} else { /* Link addr to previous addr */
(acceptor-1)->addr.next = &acceptor->addr;
}
acceptor->listener = l;
psocket_t *ps = &acceptor->psocket;
psocket_init(ps, LISTENER_IO);
ps->epoll_io.fd = fd;
ps->epoll_io.wanted = EPOLLIN;
ps->epoll_io.polling = false;
start_polling(&ps->epoll_io, l->task.proactor->epollfd); // TODO: check for error
l->active_count++;
acceptor->armed = true;
} else {
close(fd);
}
}
}
}
if (addrinfo) {
freeaddrinfo(addrinfo);
}
bool notify = schedule(&l->task);
if (l->acceptors_size == 0) { /* All failed, create dummy socket with an error */
l->acceptors = (acceptor_t*)realloc(l->acceptors, sizeof(acceptor_t));
l->acceptors_size = 1;
memset(l->acceptors, 0, sizeof(acceptor_t));
psocket_init(&l->acceptors[0].psocket, LISTENER_IO);
l->acceptors[0].listener = l;
if (gai_err) {
psocket_gai_error(&l->acceptors[0].psocket, gai_err, "listen on");
} else {
psocket_error(&l->acceptors[0].psocket, errno, "listen on");
}
} else {
pn_collector_put(l->collector, PN_CLASSCLASS(pn_listener), l, PN_LISTENER_OPEN);
}
proactor_add(&l->task);
unlock(&l->task.mutex);
if (notify) notify_poller(p);
return;
}