long run_lagscope_sender()

in src/main.c [38:315]


long run_lagscope_sender(struct lagscope_test_client *client)
{
	char *log = 0;
	bool verbose_log = false;
	struct lagscope_test_runtime *test_runtime;
	int sendbuff, recvbuff = 0;   //send buffer size
	char *buffer; //send buffer
	int msg_actual_size; //the buffer actual size = msg_size * sizeof(char)
	struct lagscope_test *test = client->test;
	int n = 0; //write n bytes to socket

	struct sockaddr_storage local_addr; //for local address
	socklen_t local_addr_size; //local address size, for getsockname(), to get local port
	char *ip_address_str; //used to get remote peer's ip address
	int ip_address_max_size;  //used to get remote peer's ip address
	char *port_str; //to get remote peer's port number for getaddrinfo()
	struct addrinfo hints, *serv_info, *p; //to get remote peer's sockaddr for connect()

	double send_time_us, recv_time_us = 0.0;
	double latency_ms = 0.0;
	int i = 0;
	int ret = 0; //hold function return value

	/* for ping statistics */
	unsigned long n_pings = 0; //number of pings
	double max_latency_ms = 0.0;
	double min_latency_ms = 60000000.0; //60 seconds = 60 micro-seconds * 1000 * 1000
	double sum_latency_ms = 0.0;

	int latencies_stats_err_check = 0;

	INIT_SOCKFD_VAR();

	verbose_log = test->verbose;
	test_runtime = new_test_runtime(test);

	ip_address_max_size = (test->domain == AF_INET? INET_ADDRSTRLEN : INET6_ADDRSTRLEN);
	if ((ip_address_str = (char *)malloc(ip_address_max_size)) == (char *)NULL) {
		PRINT_ERR("cannot allocate memory for ip address string");
		return 0;
	}

	/* get address of remote receiver */
	memset(&hints, 0, sizeof hints);
	hints.ai_family = test->domain;
	hints.ai_socktype = test->protocol;
	ASPRINTF(&port_str, "%d", test->server_port);
	if (getaddrinfo(test->bind_address, port_str, &hints, &serv_info) != 0) {
		PRINT_ERR("cannot get address info for receiver");
		free(ip_address_str);
		WSACLEAN();
		return 0;
	}
	free(port_str);

	/* only get the first entry of remote receiver to connect */
	for (p = serv_info; p != NULL; p = p->ai_next) {
		/* 1. create socket fd */
		if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) < 0) {
			PRINT_ERR("cannot create socket ednpoint");
			freeaddrinfo(serv_info);
			free(ip_address_str);
			WSACLEAN();
			return 0;
		}
		sendbuff = test->send_buf_size;
		if (setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char *) &sendbuff, sizeof(sendbuff)) < 0) {
			ASPRINTF(&log, "cannot set socket send buffer size to: %d", sendbuff);
			PRINT_ERR_FREE(log);
			freeaddrinfo(serv_info);
			free(ip_address_str);
			WSACLEAN();
			return 0;
		}
		recvbuff = test->recv_buf_size;
		if (setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, (char *) &recvbuff, sizeof(recvbuff)) < 0) {
			ASPRINTF(&log, "cannot set socket receive buffer size to: %d", recvbuff);
			PRINT_ERR_FREE(log);
			freeaddrinfo(serv_info);
			free(ip_address_str);
			WSACLEAN();
			return 0;
		}

		/* 2. Set sender port = 0 to get suitable port number from system */
		memset(&local_addr, 0, sizeof(local_addr));
		if (test->domain == AF_INET) {
			(*(struct sockaddr_in*)&local_addr).sin_family = AF_INET;
			(*(struct sockaddr_in*)&local_addr).sin_port = htons(test->client_port);
		}
		else {
			(*(struct sockaddr_in6*)&local_addr).sin6_family = AF_INET6;
			(*(struct sockaddr_in6*)&local_addr).sin6_port = htons(test->client_port);
		}

		local_addr_size = sizeof(local_addr);
		if (( ret = bind(sockfd, (struct sockaddr *)&local_addr, local_addr_size)) < 0 ) {
			ASPRINTF(&log,
				"failed to bind socket[%d] to a local port: [%s:%d]. errno = %d. Ignored",
				sockfd,
				test->domain == AF_INET ? inet_ntoa((*(struct sockaddr_in*)&local_addr).sin_addr)
							: "::", //TODO - get the IPv6 addr string
				test->client_port,
				errno);
			PRINT_INFO_FREE(log);
		}

		/* 3. connect to receiver */
		ip_address_str = retrive_ip_address_str((struct sockaddr_storage *)p->ai_addr, ip_address_str, ip_address_max_size);
		if ((i = connect(sockfd, p->ai_addr, p->ai_addrlen)) < 0) {
			if (i == -1) {
				ASPRINTF(&log, "failed to connect to receiver: %s:%d on socket: %d. errno = %d", ip_address_str, test->server_port, sockfd, errno);
				PRINT_ERR_FREE(log);
			}
			else {
				ASPRINTF(&log, "failed to connect to receiver: %s:%d on socket: %d. error code = %d", ip_address_str, test->server_port, sockfd, i);
				PRINT_ERR_FREE(log);
			}
			freeaddrinfo(serv_info);
			free(ip_address_str);
			CLOSE(sockfd);
			WSACLEAN();
			return 0;
		}
		else {
			break; //connected
		}
	}

	/* get local TCP ephemeral port number assigned, for logging */
	if (getsockname(sockfd, (struct sockaddr *) &local_addr, &local_addr_size) != 0) {
		ASPRINTF(&log, "failed to get local address information for socket: %d", sockfd);
		PRINT_ERR_FREE(log);
	}

	ASPRINTF(&log, "New connection: local:%d [socket:%d] --> %s:%d",
			ntohs(test->domain == AF_INET?
					((struct sockaddr_in *)&local_addr)->sin_port:
					((struct sockaddr_in6 *)&local_addr)->sin6_port),
			sockfd,	ip_address_str, test->server_port);
	PRINT_INFO_FREE(log);

	freeaddrinfo(serv_info);

	msg_actual_size = test->msg_size * sizeof(char);
	if ((buffer = (char *)malloc(msg_actual_size)) == (char *)NULL) {
		PRINT_ERR("cannot allocate memory for send message");
		CLOSE(sockfd);
		WSACLEAN();
		return 0;
	}
	memset(buffer, 'A', msg_actual_size);

	/* Interop with latte.exe:
	 * Start test control byte of latte */
	buffer[0] = 0xd0;
	buffer[1] = 0x14;
	buffer[2] = 0x0;
	buffer[3] = 0x0;

	//begin ping test
	turn_on_light();
	if (test->test_mode == TIME_DURATION)
		run_test_timer(test->duration);

	test_runtime->start_time = time_in_usec();

	/* Interop with latte.exe:
	 * First send control byte */
	if (n_write_read(sockfd, buffer, msg_actual_size) < 0)
		goto finished;

	while (is_light_turned_on()) {
		/* Interop with latte.exe:
		 * latte needs iteration count in data */
		buffer[3] = (char)(n_pings >> 24);
		buffer[2] = (char)(n_pings >> 16);
		buffer[1] = (char)(n_pings >> 8);
		buffer[0] = (char)(n_pings /*>> 0*/);

		send_time_us = time_in_usec();

		if ((n = n_write_read(sockfd, buffer, msg_actual_size)) < 0)
			goto finished;

		recv_time_us = time_in_usec();

		latency_ms = recv_time_us - send_time_us;

		push(latency_ms);		// Push latency onto linked list

		ASPRINTF(&log, "Reply from %s: bytes=%d time=%.3fus",
				ip_address_str,
				n,
				latency_ms);
		PRINT_DBG_FREE(log);

		n_pings++;
		test_runtime->current_time = recv_time_us;
		test_runtime->ping_elapsed = n_pings;

		/* calculate max. avg. min. */
		sum_latency_ms += latency_ms;
		if (max_latency_ms < latency_ms)
			max_latency_ms = latency_ms;
		if (min_latency_ms > latency_ms)
			min_latency_ms = latency_ms;

		if (test->test_mode == PING_ITERATION)
			if (n_pings >= test->iteration)
				break;

		if (verbose_log == false)
			report_progress(test_runtime);

		if (test->interval !=0)
			SLEEP(test->interval); //sleep for ping interval, for example, 1 second
	}
	//SLEEP(60);
finished:
	PRINT_INFO("TEST COMPLETED.");

	/* print ping statistics */
	ASPRINTF(&log, "Ping statistics for %s:", ip_address_str);
	PRINT_INFO_FREE(log);
	ASPRINTF(&log, "\t  Number of successful Pings: %ld", n_pings);
	PRINT_INFO_FREE(log);
	if (n_pings > 0) {
		ASPRINTF(&log, "\t  Minimum = %.3fus, Maximum = %.3fus, Average = %.3fus",
			min_latency_ms,
			max_latency_ms,
			sum_latency_ms / n_pings);
		PRINT_INFO_FREE(log);
	}

	/* function call to dump latencies into a csv file */
	if(test->raw_dump) {
		ASPRINTF(&log, "Dumping all latencies into csv file: %s", test->csv_file_name);
		PRINT_INFO_FREE(log);
		create_latencies_csv(test->csv_file_name);
	}

	if (test->perc || test->hist) {
		latencies_stats_err_check = process_latencies(max_latency_ms);

		if (latencies_stats_err_check == NO_ERR) {
			/* function call to show percentiles */
			if(test->perc) {
				if(test->freq_table_dump) {
					ASPRINTF(&log, "Dumping latency frequency table into json file: %s", test->json_file_name);
					PRINT_INFO_FREE(log);
					create_freq_table_json((unsigned long) max_latency_ms, test->json_file_name);
				}

				show_percentile((unsigned long) max_latency_ms, n_pings);
			}

			/* function call to show histogram */
			if(test->hist) {
				show_histogram(test->hist_start, test->hist_len, test->hist_count, (unsigned long) max_latency_ms);
			}
		} else if (latencies_stats_err_check == ERROR_MEMORY_ALLOC) {
			PRINT_ERR("Memory allocation failed, aborting...");
		} else if (latencies_stats_err_check == ERROR_GENERAL) {
			PRINT_ERR("Interanl Error, aborting...");
		} else {
			PRINT_ERR("Unknown Error, aborting...");
		}
	}

	/* free resource */
	free(ip_address_str);
	free(buffer);
	latencies_stats_cleanup();
	CLOSE(sockfd);
	WSACLEAN();
	return n_pings;
}