void pn_proactor_listen()

in c/src/proactor/epoll.c [1543:1630]


void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog)
{
  // TODO: check listener not already listening for this or another proactor
  lock(&l->task.mutex);
  l->task.proactor = p;

  l->pending_accepteds = (accepted_t*)calloc(backlog, sizeof(accepted_t));
  assert(l->pending_accepteds);
  l->backlog = backlog;

  pni_parse_addr(addr, l->addr_buf, sizeof(l->addr_buf), &l->host, &l->port);

  struct addrinfo *addrinfo = NULL;
  int gai_err = pgetaddrinfo(l->host, l->port, AI_PASSIVE | AI_ALL, &addrinfo);
  if (!gai_err) {
    /* Count addresses, allocate enough space for sockets */
    size_t len = 0;
    for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
      ++len;
    }
    assert(len > 0);            /* guaranteed by getaddrinfo */
    l->acceptors = (acceptor_t*)calloc(len, sizeof(acceptor_t));
    assert(l->acceptors);      /* TODO aconway 2017-05-05: memory safety */
    l->acceptors_size = 0;
    uint16_t dynamic_port = 0;  /* Record dynamic port from first bind(0) */
    /* Find working listen addresses */
    for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
      if (dynamic_port) set_port(ai->ai_addr, dynamic_port);
      int fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
      static int on = 1;
      if (fd >= 0) {
        configure_socket(fd);
        if (!setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) &&
            /* We listen to v4/v6 on separate sockets, don't let v6 listen for v4 */
            (ai->ai_family != AF_INET6 ||
             !setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on))) &&
            !bind(fd, ai->ai_addr, ai->ai_addrlen) &&
            !listen(fd, backlog))
        {
          acceptor_t *acceptor = &l->acceptors[l->acceptors_size++];
          /* Get actual address */
          socklen_t len = pn_netaddr_socklen(&acceptor->addr);
          (void)getsockname(fd, (struct sockaddr*)(&acceptor->addr.ss), &len);
          if (acceptor == l->acceptors) { /* First acceptor, check for dynamic port */
            dynamic_port = check_dynamic_port(ai->ai_addr, pn_netaddr_sockaddr(&acceptor->addr));
          } else {              /* Link addr to previous addr */
            (acceptor-1)->addr.next = &acceptor->addr;
          }

          acceptor->listener = l;
          psocket_t *ps = &acceptor->psocket;
          psocket_init(ps, LISTENER_IO);
          ps->epoll_io.fd = fd;
          ps->epoll_io.wanted = EPOLLIN;
          ps->epoll_io.polling = false;
          start_polling(&ps->epoll_io, l->task.proactor->epollfd);  // TODO: check for error
          l->active_count++;
          acceptor->armed = true;
        } else {
          close(fd);
        }
      }
    }
  }
  if (addrinfo) {
    freeaddrinfo(addrinfo);
  }
  bool notify = schedule(&l->task);

  if (l->acceptors_size == 0) { /* All failed, create dummy socket with an error */
    l->acceptors = (acceptor_t*)realloc(l->acceptors, sizeof(acceptor_t));
    l->acceptors_size = 1;
    memset(l->acceptors, 0, sizeof(acceptor_t));
    psocket_init(&l->acceptors[0].psocket, LISTENER_IO);
    l->acceptors[0].listener = l;
    if (gai_err) {
      psocket_gai_error(&l->acceptors[0].psocket, gai_err, "listen on");
    } else {
      psocket_error(&l->acceptors[0].psocket, errno, "listen on");
    }
  } else {
    pn_collector_put(l->collector, PN_CLASSCLASS(pn_listener), l, PN_LISTENER_OPEN);
  }
  proactor_add(&l->task);
  unlock(&l->task.mutex);
  if (notify) notify_poller(p);
  return;
}