int ntttcp_server_epoll()

in src/tcpstream.c [373:531]


int ntttcp_server_epoll(struct ntttcp_stream_server *ss)
{
	int err_code = NO_ERROR;
	char *log = NULL;

	int efd = 0, n_fds = 0, newfd = 0, current_fd = 0;
	char *buffer; /* receive buffer */
	uint64_t nbytes; /* bytes read */
	int bytes_to_be_read = 0; /* read bytes from socket */
	struct epoll_event event, *events;

	struct sockaddr_storage peer_addr, local_addr; /* for remote peer, and local address */
	socklen_t peer_addr_size, local_addr_size;
	char *ip_address_str;
	int ip_addr_max_size;
	int i = 0;
	int max_io = 0;

	if ((buffer = (char *)malloc(ss->recv_buf_size)) == (char *)NULL) {
		PRINT_ERR("cannot allocate memory for receive buffer");
		return ERROR_MEMORY_ALLOC;
	}
	ip_addr_max_size = (ss->domain == AF_INET ? INET_ADDRSTRLEN : INET6_ADDRSTRLEN);
	if ((ip_address_str = (char *)malloc(ip_addr_max_size)) == (char *)NULL) {
		PRINT_ERR("cannot allocate memory for ip address of peer");
		free(buffer);
		return ERROR_MEMORY_ALLOC;
	}

	efd = epoll_create1(0);
	if (efd == -1) {
		PRINT_ERR("epoll_create1 failed");
		free(buffer);
		free(ip_address_str);
		return ERROR_EPOLL;
	}

	event.data.fd = ss->listener;
	event.events = EPOLLIN;
	if (epoll_ctl(efd, EPOLL_CTL_ADD, ss->listener, &event) != 0) {
		PRINT_ERR("epoll_ctl failed");
		free(buffer);
		free(ip_address_str);
		close(efd);
		return ERROR_EPOLL;
	}

	/* Buffer where events are returned */
	events = calloc(MAX_EPOLL_EVENTS, sizeof event);

	while (1) {
		if (ss->endpoint->receiver_exit_after_done &&
		    ss->endpoint->state == TEST_FINISHED)
			break;

		n_fds = epoll_wait(efd, events, MAX_EPOLL_EVENTS, -1);
		for (i = 0; i < n_fds; i++) {
			current_fd = events[i].data.fd;

			if ((events[i].events & EPOLLERR) ||
			    (events[i].events & EPOLLHUP) ||
			    (!(events[i].events & EPOLLIN))) {
				/* An error has occurred on this fd, or the socket is not ready for reading */
				PRINT_ERR("error happened on the associated connection");
				close(current_fd);
				continue;
			}

			/* then, we got one fd to handle */
			/* a NEW connection coming */
			if (current_fd == ss->listener) {
				/* We have a notification on the listening socket, which means one or more incoming connections. */
				while (1) {
					peer_addr_size = sizeof(peer_addr);
					newfd = accept(ss->listener, (struct sockaddr *)&peer_addr, &peer_addr_size);
					if (newfd == -1) {
						if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
							/* We have processed all incoming connections. */
							break;
						} else {
							ASPRINTF(&log, "error to accept new connections. errno = %d", errno)
							PRINT_ERR_FREE(log);
							break;
						}
					}

					if (set_socket_non_blocking(newfd) == -1) {
						ASPRINTF(&log, "cannot set the new socket as non-blocking: %d", newfd);
						PRINT_DBG_FREE(log);
					}

					local_addr_size = sizeof(local_addr);
					if (getsockname(newfd, (struct sockaddr *)&local_addr, &local_addr_size) != 0) {
						ASPRINTF(&log, "failed to get local address information for the new socket: %d", newfd);
						PRINT_DBG_FREE(log);
					} else {
						ASPRINTF(&log,
							"New connection: %s:%d --> local:%d [socket %d]",
							ip_address_str = retrive_ip_address_str(&peer_addr, ip_address_str, ip_addr_max_size),
							ntohs(ss->domain == AF_INET ?
							     ((struct sockaddr_in *)&peer_addr)->sin_port :
							     ((struct sockaddr_in6 *)&peer_addr)->sin6_port),
							ntohs(ss->domain == AF_INET ?
							     ((struct sockaddr_in *)&local_addr)->sin_port :
							     ((struct sockaddr_in6 *)&local_addr)->sin6_port),
							newfd);
						PRINT_DBG_FREE(log);
					}

					event.data.fd = newfd;
					event.events = EPOLLIN;
					if (epoll_ctl(efd, EPOLL_CTL_ADD, newfd, &event) != 0)
						PRINT_ERR("epoll_ctl failed");

					/* if there is no synch thread, if any new connection coming, indicates ss started */
					if (ss->no_synch)
						turn_on_light();
					/* else, leave the sync thread to fire the trigger */
				}
			}
			/* handle data from an EXISTING client */
			else {
				for (max_io = 0; max_io < MAX_IO_PER_POLL; max_io++) {
					bytes_to_be_read = ss->is_sync_thread ? 1 : ss->recv_buf_size;

					/* got error or connection closed by client */
					errno = 0;
					nbytes = n_recv(current_fd, buffer, bytes_to_be_read);
					if (nbytes <= 0) {
						if (errno != EAGAIN) {
							if (nbytes == 0) {
								ASPRINTF(&log, "socket closed: %d", i);
								PRINT_DBG_FREE(log);
							} else {
								ASPRINTF(&log, "error: cannot read data from socket: %d", i);
								PRINT_INFO_FREE(log);
								err_code = ERROR_NETWORK_READ;
								/* need to continue ss and check other socket, so don't end the ss */
							}
							close(current_fd);
						}
						break;
					}
					/* report how many bytes received */
					else {
						__sync_fetch_and_add(&(ss->total_bytes_transferred), nbytes);
					}
				}
			}
		}
	}

	free(buffer);
	free(ip_address_str);
	free(events);
	close(efd);
	close(ss->listener);
	return err_code;
}