in src/main.c [12:186]
int run_ntttcp_sender(struct ntttcp_test_endpoint *tep)
{
int err_code = NO_ERROR;
struct ntttcp_test *test = tep->test;
char *log = NULL;
pthread_attr_t pth_attrs;
uint n, t, threads_created = 0;
uint conns_created = 0, conns_total = 0;
struct ntttcp_stream_client *cs;
int rc, reply_received;
void *p_retval;
struct timeval start_time, now;
long int conns_creation_time_usec = 0;
if (test->no_synch == false) {
/*
* Negotiate with receiver on:
* 1) receiver state: is receiver busy with another test?
* 2) submit sender's test duration time to receiver to negotiate
*/
reply_received = create_sender_sync_socket(tep);
if (reply_received == 0) {
PRINT_ERR("sender: failed to create sync socket");
return ERROR_GENERAL;
}
tep->synch_socket = reply_received;
reply_received = query_receiver_busy_state(tep->synch_socket);
if (reply_received == -1) {
PRINT_ERR("sender: failed to query receiver state");
return ERROR_GENERAL;
}
if (reply_received == 1) {
PRINT_ERR("sender: receiver is busy with another test");
return ERROR_GENERAL;
}
reply_received = negotiate_test_cycle_time(tep->synch_socket,
test->warmup + test->duration + test->cooldown);
if (reply_received == -1) {
PRINT_ERR("sender: failed to negotiate test cycle time with receiver");
return ERROR_GENERAL;
}
if (reply_received != test->duration) {
if (reply_received == 0) {
PRINT_INFO("test is negotiated to run with continuous mode");
set_ntttcp_test_endpoint_test_continuous(tep);
} else {
ASPRINTF(&log, "Test cycle time negotiated is: %d seconds", reply_received);
PRINT_INFO_FREE(log);
}
}
tep->negotiated_test_cycle_time = reply_received;
} else {
PRINT_INFO("Starting sender activity (no sync) ...");
}
/* prepare to create threads */
pthread_attr_init(&pth_attrs);
pthread_attr_setstacksize(&pth_attrs, THREAD_STACK_SIZE);
gettimeofday(&start_time, NULL);
/* create test threads */
for (t = 0; t < test->server_ports; t++) {
for (n = 0; n < test->threads_per_server_port; n++) {
cs = tep->client_streams[t * test->threads_per_server_port + n];
/*
* in client side, multiple connections will (one thread for one connection)
* connect to same port on server
*/
cs->server_port = test->server_base_port + t;
/* If sender side is being asked to pin the client source port */
if (test->client_base_port > 0)
cs->client_port = test->client_base_port
+ (t * test->threads_per_server_port + n) * test->conns_per_thread;
if (test->protocol == TCP) {
rc = pthread_create(&tep->threads[threads_created],
&pth_attrs,
run_ntttcp_sender_tcp_stream,
(void *)cs);
} else {
rc = pthread_create(&tep->threads[threads_created],
&pth_attrs,
run_ntttcp_sender_udp_stream,
(void *)cs);
}
if (rc) {
ASPRINTF(&log, "pthread_create() create thread failed. errno = %d", errno);
PRINT_ERR_FREE(log);
err_code = ERROR_PTHREAD_CREATE;
continue;
} else {
threads_created++;
}
}
}
pthread_attr_destroy(&pth_attrs);
ASPRINTF(&log, "%d threads created", threads_created);
PRINT_INFO_FREE(log);
/* wait for all connections created (timeout: CONNS_ESTAB_TIMEOUT seconds) */
conns_total = test->server_ports * test->threads_per_server_port * test->conns_per_thread;
while (conns_creation_time_usec < CONNS_ESTAB_TIMEOUT * SEC_TO_USEC) {
conns_created = 0;
usleep(TEST_STATUS_POLL_INTERVAL_U_SEC);
gettimeofday(&now, NULL);
conns_creation_time_usec = (now.tv_sec - start_time.tv_sec) * SEC_TO_USEC + now.tv_usec - start_time.tv_usec;
for (t = 0; t < test->server_ports; t++) {
for (n = 0; n < test->threads_per_server_port; n++) {
cs = tep->client_streams[t * test->threads_per_server_port + n];
conns_created += cs->num_conns_created;
}
}
if (conns_created == conns_total) {
ASPRINTF(&log, "%d connections created in %ld microseconds", conns_created, conns_creation_time_usec);
PRINT_INFO_FREE(log);
break;
}
}
if (conns_created != conns_total) {
ASPRINTF(&log,
"in %ld microseconds, only %d connections created (expected: %d)",
conns_creation_time_usec, conns_created, conns_total);
PRINT_ERR_FREE(log);
}
if (test->no_synch == false) {
/* request receiver to start the test */
reply_received = request_to_start(tep->synch_socket,
tep->test->last_client ? (int)'L' : (int)'R');
if (reply_received == -1) {
PRINT_ERR("sender: failed to sync with receiver to start test");
return ERROR_GENERAL;
}
if (reply_received == 0) {
PRINT_ERR("sender: receiver refuse to start test right now");
return ERROR_GENERAL;
}
/* if we go here, the pre-test sync has completed */
PRINT_INFO("Network activity progressing...");
}
turn_on_light();
/* in the case of running in continuous_mode */
if (tep->negotiated_test_cycle_time == 0) {
sleep(UINT_MAX);
/* either sleep has elapsed, or sleep was interrupted by a signal */
return err_code;
}
/*
* manage the test cycle
* will return after light is turned off
* (calling wait_light_off() inside of below
*/
run_ntttcp_throughput_management(tep);
for (n = 0; n < threads_created; n++) {
if (pthread_join(tep->threads[n], &p_retval) != 0) {
PRINT_ERR("sender: error when pthread_join");
continue;
}
}
process_test_results(tep);
print_test_results(tep);
return err_code;
}