in src/tcpstream.c [533:661]
int ntttcp_server_select(struct ntttcp_stream_server *ss)
{
int err_code = NO_ERROR;
char *log = NULL;
int n_fds = 0, newfd, current_fd = 0;
char *buffer; /* receive buffer */
uint64_t nbytes; /* bytes read */
int bytes_to_be_read = 0; /* read bytes from socket */
fd_set read_set, write_set;
struct sockaddr_storage peer_addr, local_addr; /* for remote peer, and local address */
socklen_t peer_addr_size, local_addr_size;
char *ip_address_str;
int ip_addr_max_size;
int max_io = 0;
if ((buffer = (char *)malloc(ss->recv_buf_size)) == (char *)NULL) {
PRINT_ERR("cannot allocate memory for receive buffer");
return ERROR_MEMORY_ALLOC;
}
ip_addr_max_size = (ss->domain == AF_INET ? INET_ADDRSTRLEN : INET6_ADDRSTRLEN);
if ((ip_address_str = (char *)malloc(ip_addr_max_size)) == (char *)NULL) {
PRINT_ERR("cannot allocate memory for ip address of peer");
free(buffer);
return ERROR_MEMORY_ALLOC;
}
/* accept new client, receive data from client */
while (1) {
if (ss->endpoint->receiver_exit_after_done &&
ss->endpoint->state == TEST_FINISHED)
break;
memcpy(&read_set, &ss->read_set, sizeof(fd_set));
memcpy(&write_set, &ss->write_set, sizeof(fd_set));
/* we are notified by select() */
n_fds = select(ss->max_fd + 1, &read_set, NULL, NULL, NULL);
if (n_fds < 0 && errno != EINTR) {
PRINT_ERR("error happened when select()");
err_code = ERROR_SELECT;
continue;
}
/*run through the existing connections looking for data to be read*/
for (current_fd = 0; current_fd <= ss->max_fd; current_fd++) {
if (!FD_ISSET(current_fd, &read_set))
continue;
/* then, we got one fd to handle */
/* a NEW connection coming */
if (current_fd == ss->listener) {
/* handle new connections */
peer_addr_size = sizeof(peer_addr);
if ((newfd = accept(ss->listener, (struct sockaddr *)&peer_addr, &peer_addr_size)) < 0) {
err_code = ERROR_ACCEPT;
break;
}
/* then we got a new connection */
if (set_socket_non_blocking(newfd) == -1) {
ASPRINTF(&log, "cannot set the new socket as non-blocking: %d", newfd);
PRINT_DBG_FREE(log);
}
FD_SET(newfd, &ss->read_set); /* add the new one to read_set */
if (newfd > ss->max_fd) {
/* update the maximum */
ss->max_fd = newfd;
}
/* print out new connection info */
local_addr_size = sizeof(local_addr);
if (getsockname(newfd, (struct sockaddr *)&local_addr, &local_addr_size) != 0) {
ASPRINTF(&log, "failed to get local address information for the new socket: %d", newfd);
PRINT_DBG_FREE(log);
} else {
ASPRINTF(&log,
"New connection: %s:%d --> local:%d [socket %d]",
ip_address_str = retrive_ip_address_str(&peer_addr, ip_address_str, ip_addr_max_size),
ntohs(ss->domain == AF_INET ? ((struct sockaddr_in *)&peer_addr)->sin_port : ((struct sockaddr_in6 *)&peer_addr)->sin6_port),
ntohs(ss->domain == AF_INET ? ((struct sockaddr_in *)&local_addr)->sin_port : ((struct sockaddr_in6 *)&local_addr)->sin6_port),
newfd);
PRINT_DBG_FREE(log);
}
/* if there is no synch thread, if any new connection coming, indicates ss started */
if (ss->no_synch)
turn_on_light();
/* else, leave the sync thread to fire the trigger */
}
/* handle data from an EXISTING client */
else {
for (max_io = 0; max_io < MAX_IO_PER_POLL; max_io++) {
bytes_to_be_read = ss->is_sync_thread ? 1 : ss->recv_buf_size;
/* got error or connection closed by client */
errno = 0;
nbytes = n_recv(current_fd, buffer, bytes_to_be_read);
if (nbytes <= 0) {
if (errno != EAGAIN) {
if (nbytes == 0) {
ASPRINTF(&log, "socket closed: %d", current_fd);
PRINT_DBG_FREE(log);
} else {
ASPRINTF(&log, "error: cannot read data from socket: %d", current_fd);
PRINT_INFO_FREE(log);
err_code = ERROR_NETWORK_READ;
/* need to continue test and check other socket, so don't end the test */
}
close(current_fd);
FD_CLR(current_fd, &ss->read_set); /* remove from master set when finished */
}
break;
}
/* report how many bytes received */
else {
__sync_fetch_and_add(&(ss->total_bytes_transferred), nbytes);
}
}
}
}
}
free(buffer);
free(ip_address_str);
close(ss->listener);
return err_code;
}