in src/main.c [188:310]
/*
 * run_ntttcp_receiver() - start the receiver threads and drive the test cycles.
 *
 * Creates one thread per server port (TCP or UDP stream handler, depending on
 * test->protocol) and, unless test->no_synch is set, one extra sync thread that
 * is stored in the slot right after the data threads. It then loops: wait for
 * the light to turn on, discard bytes counted before the test started, run the
 * throughput management, process and print results, and reset the remote
 * endpoint bookkeeping. The loop ends when tep->receiver_exit_after_done is set.
 *
 * Returns:
 *   ERROR_ARGS            if the bind address is not a valid local address;
 *   ERROR_PTHREAD_CREATE  if at least one thread could not be created
 *                         (the remaining threads are still started and used);
 *   NO_ERROR              otherwise.
 *
 * In continuous mode (tep->negotiated_test_cycle_time == 0) the function
 * returns after sleep() ends, without joining the threads.
 */
int run_ntttcp_receiver(struct ntttcp_test_endpoint *tep)
{
	int err_code = NO_ERROR;
	struct ntttcp_test *test = tep->test;
	char *log = NULL;
	uint t, threads_created = 0;
	uint total_slots;
	struct ntttcp_stream_server *ss;
	int rc;
	/*
	 * thread_started[i] != 0 means tep->threads[i] holds a joinable thread.
	 * A plain count is not enough: when pthread_create() fails for one slot,
	 * later slots are still filled, so the started threads are not a dense
	 * prefix of tep->threads. Sized +1 for the optional sync thread slot
	 * (which also keeps the array length non-zero).
	 */
	unsigned char thread_started[test->server_ports + 1];

	if (!check_is_ip_addr_valid_local(test->domain, test->bind_address)) {
		PRINT_ERR("cannot listen on the IP address specified");
		return ERROR_ARGS;
	}

	/* data stream slots, plus one trailing slot when the sync thread is enabled */
	total_slots = test->server_ports + ((test->no_synch == false) ? 1 : 0);
	thread_started[test->server_ports] = 0;

	/* create test threads */
	for (t = 0; t < test->server_ports; t++) {
		ss = tep->server_streams[t];
		ss->server_port = test->server_base_port + t;

		if (test->protocol == TCP) {
			rc = pthread_create(&tep->threads[t], NULL, run_ntttcp_receiver_tcp_stream, (void *)ss);
		} else {
			rc = pthread_create(&tep->threads[t], NULL, run_ntttcp_receiver_udp_stream, (void *)ss);
		}

		if (rc) {
			PRINT_ERR("pthread_create() create thread failed");
			err_code = ERROR_PTHREAD_CREATE;
			thread_started[t] = 0;
			continue;
		}
		thread_started[t] = 1;
		threads_created++;
	}

	/* create synch thread; and put it to the end of the thread array */
	if (test->no_synch == false) {
		/*
		 * ss struct is not used in sync thread, because:
		 * we are only allowed to pass one param to the thread in pthread_create();
		 * but the information stored in ss, is not enough to be used for synch;
		 * so we pass *tep to the pthread_create().
		 * notes:
		 * 1) we will calculate the tcp port for synch stream in create_receiver_sync_socket().
		 *    the synch_port = base_port -1
		 * 2) we will assign the protocol for synch stream to TCP, always, in create_receiver_sync_socket()
		 */
		ss = tep->server_streams[test->server_ports];
		ss->server_port = test->server_base_port - 1; /* just for bookkeeping */
		ss->protocol = TCP; /* just for bookkeeping */
		ss->is_sync_thread = true;

		rc = pthread_create(&tep->threads[test->server_ports], NULL, create_receiver_sync_socket, (void *)tep);
		if (rc) {
			PRINT_ERR("pthread_create() create thread failed");
			err_code = ERROR_PTHREAD_CREATE;
		} else {
			thread_started[test->server_ports] = 1;
			threads_created++;
		}
	}

	/* threads_created is unsigned, so it is printed with %u */
	ASPRINTF(&log, "%u threads created", threads_created);
	PRINT_INFO_FREE(log);

	while (1) {
		/*
		 * for receiver, there are two ways to trigger test start:
		 * a) if synch enabled, then sync thread will trigger turn_on_light() after sync completed;
		 *    see create_receiver_sync_socket()
		 * b) if no synch enabled, then any tcp server accept client connections, the turn_on_light() will be triggered;
		 *    see ntttcp_server_epoll(), or ntttcp_server_select()
		 */
		wait_light_on();

		/*
		 * reset the counter?
		 * yes. we need to reset server side perf counters at the beginning, after light-is-on;
		 * this is to handle these cases when:
		 * a) receiver in sync mode, but sender connected as no_sync mode;
		 *    in this case, before light-is-on, the threads have some data counted already.
		 * b) receiver is running in a loop; the previous test has finished but the sockets are still working and
		 *    receiving data (data arrived with latency); so we need to reset the counter before a new test starting.
		 *
		 * this "reset" is implemented by using __sync_lock_test_and_set().
		 * reference: https://gcc.gnu.org/onlinedocs/gcc-4.4.7/gcc/Atomic-Builtins.html
		 *
		 * every stream slot is reset (not only the first threads_created ones), so a
		 * failed thread creation at a low index cannot leave a running stream un-reset.
		 */
		for (t = 0; t < total_slots; t++) {
			/* discard the bytes received before test starting */
			(void)__sync_lock_test_and_set(&(tep->server_streams[t]->total_bytes_transferred), 0);
		}

		/* in the case of running in continuous_mode */
		if (tep->negotiated_test_cycle_time == 0) {
			sleep(UINT_MAX);
			/* either sleep has elapsed, or sleep was interrupted by a signal */
			return err_code;
		}

		/*
		 * manage the test cycle
		 * will return after light is turned off
		 * (calling wait_light_off() inside of below function)
		 */
		run_ntttcp_throughput_management(tep);
		process_test_results(tep);
		print_test_results(tep);

		/* reset this variable, in case receiver is running as '-H' (receiver is running in loop) */
		tep->num_remote_endpoints = 0;
		for (t = 0; t < MAX_REMOTE_ENDPOINTS; t++)
			tep->remote_endpoints[t] = -1;

		if (tep->receiver_exit_after_done)
			break;
	}

	/* join only the slots whose thread was really created */
	for (t = 0; t < total_slots; t++) {
		if (!thread_started[t])
			continue;

		if (pthread_join(tep->threads[t], NULL) != 0) {
			PRINT_ERR("receiver: error when pthread_join");
			continue;
		}
	}

	return err_code;
}