tools/hci_throughput/hci_device.py (271 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import hci
import hci_commands
import logging
import argparse
import sys
import asyncio
import struct
import throughput as tp
import traceback
import yaml
import util
import transport_factory
import signal
show_tp_plots = False
test_dir = None
transport_directory = None
class ParentCalledException(KeyboardInterrupt):
""" This exception is raised when e.g. parent process sends signal.
This allows to terminate processes correctly. """
pass
def parse_arguments():
parser = argparse.ArgumentParser(
description='HCI device with User Channel Socket. \
Start a device according to predefined mode (receiver/transmitter). \
The initialization of the device is based on received parameters \
or predefined init.yaml and config.yaml files.\
The tx device will try to connect to rx device and send data. \
After completion the throughput plots will pop up. ',
epilog='How to run the python script: \
sudo python hci_device.py -m rx -oa 00:00:00:00:00:00 -oat 0 -di 0 \
-pa 00:00:00:00:00:00 -pat 0 -pdi 0 -cf config.yaml\
or, if present, specifying init.yaml file \
sudo python main.py -m tx -if init.yaml')
parser.add_argument('-m', '--mode', type=str, nargs="?",
help='device mode - receiver, transmitter',
choices=['rx', 'tx'])
parser.add_argument('-if', '--init_file', type=str, nargs="?",
help='yaml init file, e.g.: -f init.yaml',
default="init.yaml")
parser.add_argument('-oa', '--own_addr', type=str, nargs="?",
help='device own address, e.g.: -oa 00:00:00:00:00:00')
parser.add_argument('-oat', '--own_addr_type', type=int, nargs="?",
help='device own address type, public e.g.: -oat 0')
parser.add_argument('-di', '--dev_idx', type=str, nargs="?",
help='device own hci index, hci0 e.g.: -ohi 0')
parser.add_argument(
'-pa', '--peer_addr', type=str, nargs="?",
help='peer device address, e.g.: -pa 00:00:00:00:00:00')
parser.add_argument(
'-pat', '--peer_addr_type', type=int, nargs="?",
help='peer device own address type, public e.g.: -pat 0')
parser.add_argument('-pdi', '--peer_dev_idx', type=str, nargs="?",
help='peer device index, e.g. hci0: -phi 0')
parser.add_argument('-cf', '--config_file', type=str, nargs="?",
help='yaml config file, e.g.: -f config.yaml',
default="config.yaml")
try:
args = parser.parse_args()
return args
except Exception as e:
logging.error(traceback.format_exc())
sys.exit()
async def set_phy(bt_dev: hci_commands.HCI_Commands, conn_handle, cfg_phy,
supported_features):
def error(info):
print("ERROR: Check log files")
raise Exception(info, ": Unsupported PHY. Closing...")
PHY_2M = supported_features & hci.LE_FEATURE_2M_PHY
PHY_CODED = supported_features & hci.LE_FEATURE_CODED_PHY
if (cfg_phy == "1M"):
await bt_dev.cmd_le_set_phy(conn_handle, all_phys=0, tx_phys=1, rx_phys=1, phy_options=0)
logging.info(f"PHY 1M")
elif (cfg_phy == "2M"):
if (PHY_2M):
await bt_dev.cmd_le_set_phy(conn_handle, all_phys=0, tx_phys=2, rx_phys=2, phy_options=0)
logging.info(f"PHY 2M")
else:
error("2M")
elif (cfg_phy == "Coded"):
if (PHY_CODED):
await bt_dev.cmd_le_set_phy(conn_handle, all_phys=0, tx_phys=3, rx_phys=3, phy_options=0)
logging.info(f"PHY Coded")
else:
error("Coded")
else:
error("Possible PHY in config.yaml: 1M, 2M, Coded")
async def init(bt_dev: hci_commands.HCI_Commands, ini: dict, cfg: dict):
""" init: Assumed to be the same for all devices """
asyncio.create_task(bt_dev.rx_buffer_q_wait())
await bt_dev.cmd_reset()
if ini["own_address_type"]:
await bt_dev.cmd_le_set_random_addr(ini["own_address"])
await bt_dev.cmd_set_event_mask(mask=0x200080000204e090)
await bt_dev.cmd_le_set_event_mask(mask=0x00000007FFFFFFFF)
await bt_dev.cmd_le_read_local_supported_features()
await bt_dev.cmd_le_read_buffer_size()
await bt_dev.cmd_le_read_max_data_len()
async def finish(bt_dev: hci_commands.HCI_Commands, cfg: dict):
logging.info("Received %s good packets", bt_dev.valid_recv_data)
if bt_dev.tp:
if show_tp_plots:
bt_dev.tp.plot_tp_from_file(sample_time=cfg["tp"]["sample_time"])
if bt_dev.device_mode == "rx":
bt_dev.tp.save_average()
util.copy_log_files_to_test_directory(test_dir)
logging.info(f"Correctly received: {bt_dev.valid_recv_data}")
logging.info(f"Sent packets: {bt_dev.sent_packets_counter}")
bt_dev.async_ev_rx_wait_finish.set()
# Wait for rx_buffer_q_wait task to finish and socket to close
await asyncio.sleep(1)
async def async_main_rx(bt_dev: hci_commands.HCI_Commands, ini: dict, cfg: dict):
await init(bt_dev, ini, cfg)
bt_dev.tp = tp.Throughput(name="tp_receiver", mode=bt_dev.device_mode,
total_packets_number=hci.num_of_packets_to_send,
bytes_number_in_packet=hci.num_of_bytes_to_send,
throughput_data_type=cfg["tp"]["data_type"],
flag_plot_packets=cfg["tp"]["flag_plot_packets"],
sample_time=cfg["tp"]["sample_time"],
test_directory=test_dir)
############
# ADVERTISE
############
adv_params = hci.HCI_Advertising()
adv_params.set(
advertising_interval_min=cfg["adv"]["advertising_interval_min"],
advertising_interval_max=cfg["adv"]["advertising_interval_max"],
advertising_type=cfg["adv"]["advertising_type"],
own_address_type=ini["own_address_type"],
peer_address_type=ini["peer_address_type"],
peer_address=cfg["adv"]["peer_address"],
advertising_channel_map=cfg["adv"]["advertising_channel_map"],
advertising_filter_policy=cfg["adv"]["advertising_filter_policy"]
)
await bt_dev.cmd_le_set_advertising_params(adv_params)
await bt_dev.cmd_le_set_advertising_enable(1)
await hci_commands.wait_for_event(bt_dev.async_ev_connected, hci.WAIT_FOR_EVENT_CONN_TIMEOUT)
await bt_dev.cmd_le_set_data_len(hci.conn_handle, tx_octets=0, tx_time=0)
await hci_commands.wait_for_event(bt_dev.async_ev_set_data_len, hci.WAIT_FOR_EVENT_TIMEOUT)
logging.debug("Before finish event")
await asyncio.shield(bt_dev.async_ev_recv_data_finish.wait())
logging.debug("after finish event")
bt_dev.async_ev_recv_data_finish.clear()
await bt_dev.cmd_le_set_advertising_enable(0)
await finish(bt_dev, cfg)
async def async_main_tx(bt_dev: hci_commands.HCI_Commands, ini: dict, cfg: dict):
await init(bt_dev, ini, cfg)
conn_params = hci.HCI_Connect()
conn_params.set(
le_scan_interval=cfg["conn"]["le_scan_interval"],
le_scan_window=cfg["conn"]["le_scan_window"],
initiator_filter_policy=cfg["conn"]["initiator_filter_policy"],
peer_address_type=ini['peer_address_type'],
peer_address=ini['peer_address'],
own_address_type=ini['own_address_type'],
connection_interval_min=cfg["conn"]["connection_interval_min"],
connection_interval_max=cfg["conn"]["connection_interval_max"],
max_latency=cfg["conn"]["max_latency"],
supervision_timeout=cfg["conn"]["supervision_timeout"],
min_ce_length=cfg["conn"]["min_ce_length"],
max_ce_length=cfg["conn"]["max_ce_length"]
)
await bt_dev.cmd_le_create_connection(conn_params)
await hci_commands.wait_for_event(bt_dev.async_ev_connected, hci.WAIT_FOR_EVENT_CONN_TIMEOUT)
await bt_dev.cmd_le_set_data_len(hci.conn_handle, tx_octets=0, tx_time=0)
await hci_commands.wait_for_event(bt_dev.async_ev_set_data_len, hci.WAIT_FOR_EVENT_TIMEOUT)
await set_phy(bt_dev, hci.conn_handle, cfg['phy'],
hci.le_read_local_supported_features.le_features)
await hci_commands.wait_for_event(bt_dev.async_ev_update_phy, hci.WAIT_FOR_EVENT_TIMEOUT)
if cfg["enable_encryption"]:
await bt_dev.cmd_le_enable_encryption(hci.conn_handle, random_number=0, ediv=0, ltk=hci.ltk)
await hci_commands.wait_for_event(bt_dev.async_ev_encryption_change, 10)
############
# L2CAP SEND
############
l2cap_data = hci.L2CAP_Data_Send()
acl_data = hci.HCI_ACL_Data_Send()
packets_to_send = hci.num_of_packets_to_send
packet_credits = hci.le_read_buffer_size.total_num_le_acl_data_packets
fmt = "<" + str(hci.num_of_bytes_to_send) + "B"
data = struct.pack(fmt, *([0] * hci.num_of_bytes_to_send))
last_value = 0
sent_packets = 0
tx_sent_timestamps = []
bt_dev.tp = tp.Throughput(name="tp_transmitter", mode=bt_dev.device_mode,
total_packets_number=hci.num_of_packets_to_send,
bytes_number_in_packet=hci.num_of_bytes_to_send,
throughput_data_type=cfg["tp"]["data_type"],
flag_plot_packets=cfg["tp"]["flag_plot_packets"],
sample_time=cfg["tp"]["sample_time"],
test_directory=test_dir)
async with bt_dev.async_lock_packets_cnt:
hci.num_of_completed_packets_cnt = 0
while sent_packets < hci.num_of_packets_to_send:
if packet_credits > 0 and packets_to_send > 0:
data, last_value = tp.gen_data(
hci.num_of_bytes_to_send, last_value)
l2cap_data.set(channel_id=0x0044, data=data)
acl_data.set(connection_handle=hci.conn_handle, pb_flag=0b00,
bc_flag=0b00, data=l2cap_data.ba_full_message)
await bt_dev.acl_data_send(acl_data)
async with bt_dev.async_lock_packets_cnt:
packets_to_send -= 1
packet_credits -= 1
else:
logging.info(f"Waiting for num_of_cmp_packets event")
await bt_dev.async_ev_num_cmp_pckts.wait()
bt_dev.async_ev_num_cmp_pckts.clear()
if hci.num_of_completed_packets_cnt > 0:
async with bt_dev.async_lock_packets_cnt:
sent_packets += hci.num_of_completed_packets_cnt
tx_sent_timestamps.append((hci.num_of_completed_packets_time,
sent_packets))
logging.info(f"Sent : {sent_packets}")
packet_credits += hci.num_of_completed_packets_cnt
hci.num_of_completed_packets_cnt = 0
for timestamp in tx_sent_timestamps:
bt_dev.tp.append_to_csv_file(*timestamp)
await finish(bt_dev, cfg)
def parse_cfg_files(args) -> dict:
if args.init_file is None:
ini = {
"own_address": args.own_addr,
"own_address_type": args.own_addr_type,
"dev_index": args.dev_idx,
"peer_address": args.peer_addr,
"peer_address_type": args.peer_addr_type,
"peer_dev_index": args.peer_dev_idx
}
else:
with open(args.init_file, "r") as file:
init_file = yaml.safe_load(file)
ini = init_file[args.mode]
global test_dir, transport_directory, ltk
test_dir = init_file["test_dir"]
transport_directory = init_file["transport_directory"]
hci.ltk = int(init_file["ltk"], 16)
with open(args.config_file) as f:
cfg = yaml.safe_load(f)
global show_tp_plots
hci.num_of_bytes_to_send = cfg["num_of_bytes_to_send"]
hci.num_of_packets_to_send = cfg["num_of_packets_to_send"]
show_tp_plots = cfg["show_tp_plots"]
return ini, cfg
def signal_handler(signum, frame):
logging.critical(f"Received signal: {signal.Signals(signum).name}")
raise ParentCalledException(
f"Received signal: {signal.Signals(signum).name}")
def main():
args = parse_arguments()
ini, cfg = parse_cfg_files(args)
log_path = f"log/log_{args.mode}.log"
transport = None
try:
util.configure_logging(log_path, clear_log_file=True)
loop = asyncio.get_event_loop()
loop.set_debug(True)
transport = transport_factory.TransportFactory(
device_index=ini['dev_index'],
device_mode=args.mode, asyncio_loop=loop,
transport_directory=transport_directory)
signal.signal(signal.SIGTERM, signal_handler)
bt_dev = hci_commands.HCI_Commands(send=transport.send,
rx_buffer_q=transport.rx_buffer_q,
asyncio_loop=loop,
device_mode=args.mode)
transport.start()
if args.mode == 'rx':
loop.run_until_complete(async_main_rx(bt_dev, ini, cfg))
elif args.mode == 'tx':
loop.run_until_complete(async_main_tx(bt_dev, ini, cfg))
except Exception as e:
logging.error(traceback.format_exc())
except (KeyboardInterrupt or ParentCalledException):
logging.critical("Hard exit triggered.")
logging.error(traceback.format_exc())
finally:
if transport is not None:
transport.stop()
sys.exit()
if __name__ == '__main__':
main()