int run_ntttcp_sender()

in src/main.c [12:186]


int run_ntttcp_sender(struct ntttcp_test_endpoint *tep)
{
	int err_code = NO_ERROR;
	struct ntttcp_test *test = tep->test;
	char *log = NULL;

	pthread_attr_t pth_attrs;
	uint n, t, threads_created = 0;
	uint conns_created = 0, conns_total = 0;
	struct ntttcp_stream_client *cs;
	int rc, reply_received;
	void *p_retval;
	struct timeval start_time, now;
	long int conns_creation_time_usec = 0;

	if (test->no_synch == false) {
		/*
		 * Negotiate with receiver on:
		 * 1) receiver state: is receiver busy with another test?
		 * 2) submit sender's test duration time to receiver to negotiate
		 */
		reply_received = create_sender_sync_socket(tep);
		if (reply_received == 0) {
			PRINT_ERR("sender: failed to create sync socket");
			return ERROR_GENERAL;
		}
		tep->synch_socket = reply_received;
		reply_received = query_receiver_busy_state(tep->synch_socket);
		if (reply_received == -1) {
			PRINT_ERR("sender: failed to query receiver state");
			return ERROR_GENERAL;
		}
		if (reply_received == 1) {
			PRINT_ERR("sender: receiver is busy with another test");
			return ERROR_GENERAL;
		}

		reply_received = negotiate_test_cycle_time(tep->synch_socket,
							   test->warmup + test->duration + test->cooldown);
		if (reply_received == -1) {
			PRINT_ERR("sender: failed to negotiate test cycle time with receiver");
			return ERROR_GENERAL;
		}
		if (reply_received != test->duration) {
			if (reply_received == 0) {
				PRINT_INFO("test is negotiated to run with continuous mode");
				set_ntttcp_test_endpoint_test_continuous(tep);
			} else {
				ASPRINTF(&log, "Test cycle time negotiated is: %d seconds", reply_received);
				PRINT_INFO_FREE(log);
			}
		}
		tep->negotiated_test_cycle_time = reply_received;
	} else {
		PRINT_INFO("Starting sender activity (no sync) ...");
	}

	/* prepare to create threads */
	pthread_attr_init(&pth_attrs);
	pthread_attr_setstacksize(&pth_attrs, THREAD_STACK_SIZE);
	gettimeofday(&start_time, NULL);
	/* create test threads */
	for (t = 0; t < test->server_ports; t++) {
		for (n = 0; n < test->threads_per_server_port; n++) {
			cs = tep->client_streams[t * test->threads_per_server_port + n];
			/*
			 * in client side, multiple connections will (one thread for one connection)
			 * connect to same port on server
			 */
			cs->server_port = test->server_base_port + t;

			/* If sender side is being asked to pin the client source port */
			if (test->client_base_port > 0)
				cs->client_port = test->client_base_port
						  + (t * test->threads_per_server_port + n) * test->conns_per_thread;

			if (test->protocol == TCP) {
				rc = pthread_create(&tep->threads[threads_created],
							&pth_attrs,
							run_ntttcp_sender_tcp_stream,
							(void *)cs);
			} else {
				rc = pthread_create(&tep->threads[threads_created],
							&pth_attrs,
							run_ntttcp_sender_udp_stream,
							(void *)cs);
			}

			if (rc) {
				ASPRINTF(&log, "pthread_create() create thread failed. errno = %d", errno);
				PRINT_ERR_FREE(log);
				err_code = ERROR_PTHREAD_CREATE;
				continue;
			} else {
				threads_created++;
			}
		}
	}
	pthread_attr_destroy(&pth_attrs);

	ASPRINTF(&log, "%d threads created", threads_created);
	PRINT_INFO_FREE(log);

	/* wait for all connections created (timeout: CONNS_ESTAB_TIMEOUT seconds) */
	conns_total = test->server_ports * test->threads_per_server_port * test->conns_per_thread;
	while (conns_creation_time_usec < CONNS_ESTAB_TIMEOUT * SEC_TO_USEC) {
		conns_created = 0;
		usleep(TEST_STATUS_POLL_INTERVAL_U_SEC);

		gettimeofday(&now, NULL);
		conns_creation_time_usec = (now.tv_sec - start_time.tv_sec) * SEC_TO_USEC + now.tv_usec - start_time.tv_usec;
		for (t = 0; t < test->server_ports; t++) {
			for (n = 0; n < test->threads_per_server_port; n++) {
				cs = tep->client_streams[t * test->threads_per_server_port + n];
				conns_created += cs->num_conns_created;
			}
		}
		if (conns_created == conns_total) {
			ASPRINTF(&log, "%d connections created in %ld microseconds", conns_created, conns_creation_time_usec);
			PRINT_INFO_FREE(log);
			break;
		}
	}
	if (conns_created != conns_total) {
		ASPRINTF(&log,
			"in %ld microseconds, only %d connections created (expected: %d)",
			conns_creation_time_usec, conns_created, conns_total);
		PRINT_ERR_FREE(log);
	}

	if (test->no_synch == false) {
		/* request receiver to start the test */
		reply_received = request_to_start(tep->synch_socket,
						  tep->test->last_client ? (int)'L' : (int)'R');
		if (reply_received == -1) {
			PRINT_ERR("sender: failed to sync with receiver to start test");
			return ERROR_GENERAL;
		}
		if (reply_received == 0) {
			PRINT_ERR("sender: receiver refuse to start test right now");
			return ERROR_GENERAL;
		}

		/* if we go here, the pre-test sync has completed */
		PRINT_INFO("Network activity progressing...");
	}

	turn_on_light();

	/* in the case of running in continuous_mode */
	if (tep->negotiated_test_cycle_time == 0) {
		sleep(UINT_MAX);
		/* either sleep has elapsed, or sleep was interrupted by a signal */
		return err_code;
	}

	/*
	 * manage the test cycle
	 * will return after light is turned off
	 * (calling wait_light_off() inside of below
	 */
	run_ntttcp_throughput_management(tep);

	for (n = 0; n < threads_created; n++) {
		if (pthread_join(tep->threads[n], &p_retval) != 0) {
			PRINT_ERR("sender: error when pthread_join");
			continue;
		}
	}

	process_test_results(tep);
	print_test_results(tep);

	return err_code;
}