int run_ntttcp_receiver()

in src/main.c [188:310]


int run_ntttcp_receiver(struct ntttcp_test_endpoint *tep)
{
	int err_code = NO_ERROR;
	struct ntttcp_test *test = tep->test;
	char *log = NULL;

	uint t, threads_created = 0;
	struct ntttcp_stream_server *ss;
	int rc;

	if (!check_is_ip_addr_valid_local(test->domain, test->bind_address)) {
		PRINT_ERR("cannot listen on the IP address specified");
		return ERROR_ARGS;
	}

	/* create test threads */
	for (t = 0; t < test->server_ports; t++) {
		ss = tep->server_streams[t];
		ss->server_port = test->server_base_port + t;

		if (test->protocol == TCP) {
			rc = pthread_create(&tep->threads[t], NULL, run_ntttcp_receiver_tcp_stream, (void *)ss);
		} else {
			rc = pthread_create(&tep->threads[t], NULL, run_ntttcp_receiver_udp_stream, (void *)ss);
		}

		if (rc) {
			PRINT_ERR("pthread_create() create thread failed");
			err_code = ERROR_PTHREAD_CREATE;
			continue;
		}
		threads_created++;
	}

	/* create synch thread; and put it to the end of the thread array */
	if (test->no_synch == false) {
		/*
		 * ss struct is not used in sync thread, because:
		 * we are only allowed to pass one param to the thread in pthread_create();
		 * but the information stored in ss, is not enough to be used for synch;
		 * so we pass *tep to the pthread_create().
		 * notes:
		 * 1) we will calculate the tcp port for synch stream in create_receiver_sync_socket().
		 *   the synch_port = base_port -1
		 * 2) we will assign the protocol for synch stream to TCP, always, in create_receiver_sync_socket()
		 */
		ss = tep->server_streams[test->server_ports];
		ss->server_port = test->server_base_port - 1;	 /* just for bookkeeping */
		ss->protocol = TCP;	 /* just for bookkeeping */
		ss->is_sync_thread = true;

		rc = pthread_create(&tep->threads[t], NULL, create_receiver_sync_socket, (void *)tep);
		if (rc) {
			PRINT_ERR("pthread_create() create thread failed");
			err_code = ERROR_PTHREAD_CREATE;
		} else {
			threads_created++;
		}
	}

	ASPRINTF(&log, "%d threads created", threads_created);
	PRINT_INFO_FREE(log);

	while (1) {
		/*
		 * for receiver, there are two ways to trigger test start:
		 * a) if synch enabled, then sync thread will trigger turn_on_light() after sync completed;
		 *	see create_receiver_sync_socket()
		 * b) if no synch enabled, then any tcp server accept client connections, the turn_on_light() will be triggered;
		 *	see ntttcp_server_epoll(), or ntttcp_server_select()
		 */
		wait_light_on();

		/*
		 * reset the counter?
		 * yes. we need to reset server side perf counters at the beginning, after light-is-on;
		 * this is to handle these cases when:
		 * a) receiver in sync mode, but sender connected as no_sync mode;
		 * in this case, before light-is-on, the threads have some data counted already.
		 * b) receiver is running in a loop; the previous test has finished but the sockets are still working and
		 * receiving data (data arrived with latency); so we need to reset the counter before a new test starting.
		 *
		 * this "reset" is implemented by using __sync_lock_test_and_set().
		 * reference: https://gcc.gnu.org/onlinedocs/gcc-4.4.7/gcc/Atomic-Builtins.html
		 */
		for (t = 0; t < threads_created; t++)
			/* discard the bytes received before test starting */
			(uint64_t) __sync_lock_test_and_set(&(tep->server_streams[t]->total_bytes_transferred), 0);

		/* in the case of running in continuous_mode */
		if (tep->negotiated_test_cycle_time == 0) {
			sleep(UINT_MAX);
			/* either sleep has elapsed, or sleep was interrupted by a signal */
			return err_code;
		}

		/*
		 * manage the test cycle
		 * will return after light is turned off
		 * (calling wait_light_off() inside of below function)
		 */
		run_ntttcp_throughput_management(tep);
		process_test_results(tep);
		print_test_results(tep);

		/* reset this variable, in case receiver is running as '-H' (receiver is running in loop) */
		tep->num_remote_endpoints = 0;
		for (t = 0; t < MAX_REMOTE_ENDPOINTS; t++)
			tep->remote_endpoints[t] = -1;

		if (tep->receiver_exit_after_done)
			break;
	}

	for (t = 0; t < threads_created; t++) {
		if (pthread_join(tep->threads[t], NULL) != 0) {
			PRINT_ERR("receiver: error when pthread_join");
			continue;
		}
	}

	return err_code;
}