tools/hci_throughput/throughput.py (166 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import time
import matplotlib.pyplot as plt
import csv
import struct
import argparse
import traceback
data_types = ['kb', 'kB']
def parse_arguments():
parser = argparse.ArgumentParser(
description='Plot throughput from the csv file.',
epilog='How to run script: \
python throughput.py -f tests/Wed_Apr_13_08:36:29_2022/tp_receiver.csv -s 0.1')
parser.add_argument('-f', '--file', type=str, nargs='*',
help='csv file path', default=["tp_receiver"])
parser.add_argument('-s', '--samp_t', type=float, nargs='*',
help='specify throughput sample time', default=1.0)
try:
args = parser.parse_args()
except Exception as e:
print(traceback.format_exc())
return args
def gen_data(num_of_bytes_in_packet: int,
last_number_from_previous_data_packet: int):
counter = last_number_from_previous_data_packet + 1
rem = num_of_bytes_in_packet % 4
valid_data_len = int((num_of_bytes_in_packet - rem) / 4)
total_data_len = valid_data_len + rem
data = [0] * total_data_len
for i in range(rem, total_data_len):
data[i] = counter
counter += 1
last_value = data[len(data) - 1]
if rem:
fmt = "<" + str(rem) + "B" + str(valid_data_len) + "I"
else:
fmt = "<" + str(valid_data_len) + "I"
data_ba = struct.pack(fmt, *data)
return data_ba, last_value
class Throughput():
def __init__(
self,
name="tp_chart",
mode="rx",
total_packets_number=0,
bytes_number_in_packet=0,
throughput_data_type='kb',
flag_plot_packets=True,
sample_time=1,
test_directory=None):
self.name = name
self.mode = mode
self.total_packets_number = total_packets_number
self.bytes_number_in_packet = bytes_number_in_packet
self.predef_packet_key = int(
(bytes_number_in_packet - (bytes_number_in_packet % 4)) / 4)
self.total_bits_number = bytes_number_in_packet * 8
assert throughput_data_type in data_types
self.throughput_data_type = throughput_data_type
self.flag_plot_packets = flag_plot_packets
self.sample_time = sample_time
self.test_directory = test_directory
if self.test_directory is not None:
self.csv_file_name = self.test_directory + "/" + \
time.strftime("%Y_%m_%d_%H_%M_%S_") + self.name + ".csv"
else:
self.csv_file_name = time.strftime(
"%Y_%m_%d_%H_%M_%S_") + self.name + ".csv"
self.clean_csv_file()
def calc_throughput(self, current_num, last_num, current_time, last_time):
if self.throughput_data_type == 'kb':
return float(
(((current_num - last_num) * self.total_bits_number) /
(current_time - last_time)) / 1000)
elif self.throughput_data_type == 'kB':
return float(
(((current_num - last_num) * self.bytes_number_in_packet) /
(current_time - last_time)) / 1000)
def clean_csv_file(self):
file = open(self.csv_file_name, 'w')
file.write("Time,Packet\n")
def append_to_csv_file(
self,
timestamp: float = 0.0,
packet_number: int = 0):
with open(self.csv_file_name, "a") as file:
csv_writer = csv.writer(file)
csv_writer.writerow([timestamp, packet_number])
def get_average(self, packet_numbers, timestamps):
if self.throughput_data_type == 'kb':
average_tp = ((packet_numbers * self.total_bits_number)
/ (timestamps[-1] - timestamps[0])) / 1000
elif self.throughput_data_type == 'kB':
average_tp = ((packet_numbers * self.bytes_number_in_packet)
/ (timestamps[-1] - timestamps[0])) / 1000
return average_tp
def save_average(self, tp_csv_filename=None):
if self.mode == "rx":
timestamps = []
packet_numbers = []
if tp_csv_filename is None:
tp_csv_filename = self.csv_file_name
else:
tp_csv_filename += ".csv"
with open(tp_csv_filename, "r") as file:
csv_reader = csv.reader(file)
next(csv_reader)
for row in csv_reader:
timestamps.append(float(row[0]))
packet_numbers.append(float(row[1]))
average_tp = self.get_average(packet_numbers[-1], timestamps)
print(
f"Average rx throughput: {round(average_tp, 3)} {self.throughput_data_type}ps")
with open(self.test_directory + "/average_rx_tp.csv", "a") as file:
csv_writer = csv.writer(file)
csv_writer.writerow([average_tp])
def plot_tp_from_file(self, filename: str = None, sample_time: float = 1,
save_to_file: bool = True):
timestamps = []
packet_numbers = []
if filename is None:
filename = self.csv_file_name
print("Results:", filename)
with open(filename, "r") as file:
csv_reader = csv.reader(file)
next(csv_reader)
for row in csv_reader:
timestamps.append(float(row[0]))
packet_numbers.append(float(row[1]))
last_time = 0
last_number = packet_numbers[0]
throughput = []
offset = timestamps[0]
for i in range(0, len(timestamps)):
timestamps[i] -= offset
if timestamps[i] - last_time > sample_time:
throughput.append((timestamps[i],
self.calc_throughput(packet_numbers[i],
last_number,
timestamps[i],
last_time)))
last_time = timestamps[i]
last_number = packet_numbers[i]
average_tp = self.get_average(packet_numbers[-1], timestamps)
fig, ax = plt.subplots()
if self.flag_plot_packets:
ax2 = ax.twinx()
ax.plot(*zip(*throughput), 'k-')
if self.flag_plot_packets:
ax2.plot(timestamps, packet_numbers, 'b-')
ax.set_title(self.name)
ax.set_ylabel(f"Throughput [{self.throughput_data_type}/s]")
ax.set_xlabel("Time [s]")
ax.text(0.9, 1.02, f"Average: {round(average_tp, 3)}"
f"{self.throughput_data_type}ps", transform=ax.transAxes,
color='k')
if self.flag_plot_packets:
ax2 = ax2.set_ylabel(f"Packets [Max:{len(packet_numbers)}]",
color='b')
if save_to_file:
path = filename.replace(".csv", ".png")
plt.savefig(path)
plt.show(block=True)
if __name__ == "__main__":
args = parse_arguments()
tp = Throughput(bytes_number_in_packet=247)
tp.plot_tp_from_file(*args.file, args.samp_t[0], save_to_file=False)