tools/hci_throughput/main.py (201 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import multiprocessing import check_addr import argparse import yaml import sys import subprocess import traceback import matplotlib.pyplot as plt import csv import util import os import math import random PROCESS_TIMEOUT = 500 # seconds, adjust if necessary def parse_arguments(): parser = argparse.ArgumentParser( description='App for measuring BLE throughput over ACL.', epilog='How to run python script for hci0 -> rx and hci1 -> tx: \ sudo python main.py -i 0 1 -m rx tx \ -t path/to/custom_transport_directory -cf config.yaml') parser.add_argument('-i', '--indexes', type=str, nargs='*', help='specify adapters indexes', default=[0, 1]) parser.add_argument('-m', '--modes', type=str, nargs="*", help='devices modes - receiver, transmitter', choices=['rx', 'tx'], default=['rx', 'tx']) parser.add_argument('-t', '--transport_directory', type=str, nargs='*', help='specify hci transport directory path. \ Use for transport other than the default linux socket.', default=["default"]) parser.add_argument('-cf', '--config_file', type=str, nargs="*", help='configuration file for devices', default=["config.yaml"]) try: args = parser.parse_args() except Exception as e: print(traceback.format_exc()) print(f"Indexes: {args.indexes}") print(f"Modes: {args.modes}") print(f"Transport directory: {args.transport_directory}") return args def get_dev_addr_and_type(hci_indexes: list, transport_directory: str): if (len(hci_indexes) != 2): raise Exception("HCI index error.") manager = multiprocessing.Manager() addr_list = manager.list() check_addrs_proc = multiprocessing.Process(target=check_addr.check_addr, name="Check addresses", args=(hci_indexes, addr_list, transport_directory)) check_addrs_proc.start() print("check_addrs_proc pid: ", check_addrs_proc.pid) check_addrs_proc.join() dev_addr_type_list = [] for i in range(0, len(addr_list)): dev_addr_type_list.append((hci_indexes[i],) + addr_list[i]) return dev_addr_type_list def change_config_var(filename: str, group: str, variable: str, new_value: int): with open(filename, "r") as file: cfg = yaml.safe_load(file) if group: cfg[group][variable] = new_value else: cfg[variable] = new_value with open(filename, "w") as file: yaml.safe_dump(cfg, file, indent=1, sort_keys=False, default_style=None, default_flow_style=False) def generate_long_term_key(): rand_val = random.getrandbits(128) return rand_val.to_bytes(16, byteorder='little') def get_init_dict(filename: str, args_list: list, modes: list, dir: str, transport_directory: str): ini = { modes[0]: { "dev_index": args_list[0][0], "own_address_type": args_list[0][1], "own_address": args_list[0][2], "peer_dev_index": args_list[1][0], "peer_address_type": args_list[1][1], "peer_address": args_list[1][2] }, modes[1]: { "dev_index": args_list[1][0], "own_address_type": args_list[1][1], "own_address": args_list[1][2], "peer_dev_index": args_list[0][0], "peer_address_type": args_list[0][1], "peer_address": args_list[0][2] }, "test_dir": dir, "transport_directory": transport_directory, "ltk": hex(random.getrandbits(128)) } with open(filename, 'w') as file: yaml.safe_dump(ini, file, indent=1, sort_keys=False) return ini def run_once(modes: list, cfg_file: str, init_file: str): list_proc = [] for mode in modes: proc = subprocess.Popen(["python", "hci_device.py", "-m", mode, "-if", init_file, "-cf", cfg_file]) print("start subprocess pid: ", proc.pid) list_proc.append(proc) try: for proc in list_proc: proc.wait(PROCESS_TIMEOUT) except subprocess.TimeoutExpired: for proc in list_proc: print("TimeoutExpired subprocess pid: ", proc.pid) proc.terminate() for proc in list_proc: proc.wait() return -1 for proc in list_proc: print("stop subprocess pid: ", proc.pid) proc.terminate() proc.wait() return 0 def testing_variable_influence(cfg: dict, modes: list, cfg_file: str, init_file: str, init_dict: dict, save_to_file: bool): tp_test_counter = 1 changed_params_list = [] averages = [] cfg_group = cfg["test"]["change_param_group"] cfg_variable = cfg["test"]["change_param_variable"] cfg_start_val = cfg["test"]["start_value"] cfg_stop_val = cfg["test"]["stop_value"] cfg_step = cfg["test"]["step"] data_type = cfg["tp"]["data_type"] total_iterations = math.ceil((cfg_stop_val - cfg_start_val) / cfg_step) average_tp_csv_path = init_dict["test_dir"] + "/average_rx_tp.csv" with open(average_tp_csv_path, "w") as file: file.write(f"Average throughput [{data_type}ps]\n") for i in range(cfg_start_val, cfg_stop_val, cfg_step): changed_params_list.append(i) if cfg_group and cfg_variable: print(f"Current param value: {i}") num_of_params_to_change = len(cfg_variable) for j in range(0, num_of_params_to_change): change_config_var(filename=cfg_file, group=cfg_group[j], variable=cfg_variable[j], new_value=i) print(f"Running test: {tp_test_counter}/{total_iterations}...") rc = run_once(modes, cfg_file, init_file) if rc != 0: print(f"Test {i} failed. Closing...") return tp_test_counter += 1 with open(average_tp_csv_path, "r") as file: csv_reader = csv.reader(file) next(csv_reader) for row in csv_reader: averages.append(float(*row)) fig, ax = plt.subplots() ax.plot(changed_params_list[:len(averages)], averages, '-k') ax.set_ylabel(f"Average throughput [{data_type}/s]") ax.set_xlabel("Changed parameter/next iteration") ax.set_title("Average througput") if save_to_file: name = init_dict["test_dir"] + "/average_tps" plt.savefig(fname=name, format='png') plt.show(block=True) def main(): args = parse_arguments() init_file = "init.yaml" cfg_file = args.config_file[0] if (isinstance(args.transport_directory, list)): args.transport_directory = args.transport_directory.pop() else: args.transport_directory = args.transport_directory with open(cfg_file, "r") as file: cfg = yaml.safe_load(file) addr_list = get_dev_addr_and_type(args.indexes, args.transport_directory) if len(addr_list) != len(args.indexes): raise Exception("No device address received. Check HCI indexes.") print(f"Received: {addr_list}") test_dir_path = util.create_test_directory() init_dict = get_init_dict(filename=init_file, args_list=addr_list, modes=args.modes, dir=test_dir_path, transport_directory=args.transport_directory) util.copy_config_files_to_test_directory([init_file, cfg_file], init_dict["test_dir"]) try: if cfg["flag_testing"]: testing_variable_influence(cfg, args.modes, *args.config_file, init_file, init_dict, True) else: print(f"Running test...") rc = run_once(args.modes, cfg_file, init_file) if rc != 0: print("Test failed.") print("Finished. Closing...") except KeyboardInterrupt: pass except Exception as e: print(traceback.format_exc()) finally: # Set default ownership for dirs and files util.set_default_chmod_recurs(os.getcwd() + "/tests") sys.exit() if __name__ == "__main__": main()