tools/hci_throughput/hci.py (565 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from dataclasses import dataclass import struct from binascii import unhexlify import random import ctypes ############ # DEFINES ############ AF_BLUETOOTH = 31 HCI_CHANNEL_USER = 1 HCI_COMMAND_PACKET = 0x01 HCI_ACL_DATA_PACKET = 0x02 HCI_EVENT_PACKET = 0x04 L2CAP_HDR_BYTES = 4 HCI_EV_CODE_DISCONN_CMP = 0x05 HCI_EV_CODE_ENCRYPTION_CHANGE = 0x08 HCI_EV_CODE_CMD_CMP = 0x0e HCI_EV_CODE_CMD_STATUS = 0x0f HCI_EV_CODE_LE_META_EVENT = 0x3e HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP = 0x0a HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE = 0x07 HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP = 0x0c HCI_SUBEV_CODE_LE_CHAN_SEL_ALG = 0x14 HCI_SUBEV_CODE_LE_LONG_TERM_KEY_REQUEST = 0x05 HCI_EV_NUM_COMP_PKTS = 0x13 CONN_FAILED_TO_BE_ESTABLISHED = 0x3e CONN_TIMEOUT = 0x08 OGF_HOST_CTL = 0x03 OCF_SET_EVENT_MASK = 0x0001 OCF_RESET = 0X0003 OGF_INFO_PARAM = 0x04 OCF_READ_LOCAL_COMMANDS = 0x0002 OCF_READ_BD_ADDR = 0x0009 OGF_LE_CTL = 0x08 OCF_LE_SET_EVENT_MASK = 0x0001 OCF_LE_READ_BUFFER_SIZE_V1 = 0x0002 OCF_LE_READ_LOCAL_SUPPORTED_FEATURES = 0x0003 OCF_LE_READ_BUFFER_SIZE_V2 = 0x0060 OCF_LE_SET_RANDOM_ADDRESS = 0x0005 OCF_LE_SET_ADVERTISING_PARAMETERS = 0x0006 OCF_LE_SET_ADVERTISE_ENABLE = 0x000a OCF_LE_SET_SCAN_PARAMETERS = 0x000b OCF_LE_SET_SCAN_ENABLE = 0x000c OCF_LE_CREATE_CONN = 0x000d OCF_LE_ENABLE_ENCRYPTION = 0x0019 OCF_LE_LONG_TERM_KEY_REQUEST_REPLY = 0x001A OCF_LE_SET_DATA_LEN = 0x0022 OCF_LE_READ_SUGGESTED_DFLT_DATA_LEN = 0x0023 OCF_LE_READ_MAX_DATA_LEN = 0x002f OCF_LE_READ_PHY = 0x0030 OCF_LE_SET_DFLT_PHY = 0x0031 OCF_LE_SET_PHY = 0x0032 OGF_VENDOR_SPECIFIC = 0x003f BLE_HCI_OCF_VS_RD_STATIC_ADDR = 0x0001 PUBLIC_ADDRESS_TYPE = 0 STATIC_RANDOM_ADDRESS_TYPE = 1 WAIT_FOR_EVENT_TIMEOUT = 5 WAIT_FOR_EVENT_CONN_TIMEOUT = 25 LE_FEATURE_2M_PHY = ctypes.c_uint64(0x0100).value LE_FEATURE_CODED_PHY = ctypes.c_uint64(0x0800).value ############ # GLOBAL VAR ############ num_of_bytes_to_send = None # based on supported_max_tx_octets num_of_packets_to_send = None events_list = [] bdaddr = '00:00:00:00:00:00' static_addr = '00:00:00:00:00:00' le_read_buffer_size = None conn_handle = 0 requested_tx_octets = 1 requested_tx_time = 1 suggested_dflt_data_len = None max_data_len = None phy = None ev_num_comp_pkts = None num_of_completed_packets_cnt = 0 num_of_completed_packets_time = 0 read_local_commands = None le_read_local_supported_features = None ltk = None ############ # FUNCTIONS ############ def get_opcode(ogf: int, ocf: int): return ((ocf & 0x03ff) | (ogf << 10)) def get_ogf_ocf(opcode: int): ogf = opcode >> 10 ocf = opcode & 0x03ff return ogf, ocf def cmd_addr_to_ba(addr_str: str): return unhexlify("".join(addr_str.split(':')))[::-1] def ba_addr_to_str(addr_ba: bytearray): addr_str = addr_ba.hex().upper() return ':'.join(addr_str[i:i + 2] for i in range(len(addr_str), -2, -2))[1:] def gen_static_rand_addr(): while True: x = [random.randint(0, 1) for _ in range(0, 48)] if 0 in x[:-2] and 1 in x[:-2]: x[0] = 1 x[1] = 1 break addr_int = int("".join([str(x[i]) for i in range(0, len(x))]), 2) addr_hex = "{0:0{1}x}".format(addr_int, 12) addr = ":".join(addr_hex[i:i + 2] for i in range(0, len(addr_hex), 2)) return addr.upper() ############ # GLOBAL VAR CLASSES ############ @dataclass class Suggested_Dflt_Data_Length(): status: int suggested_max_tx_octets: int suggested_max_tx_time: int def __init__(self): self.set() def set( self, status=0, suggested_max_tx_octets=0, suggested_max_tx_time=0): self.status = status self.suggested_max_tx_octets = suggested_max_tx_octets self.suggested_max_tx_time = suggested_max_tx_time @dataclass class Max_Data_Length(): status: int supported_max_tx_octets: int supported_max_tx_time: int supported_max_rx_octets: int supported_max_rx_time: int def __init__(self): self.set() def set(self, status=0, supported_max_tx_octets=0, supported_max_tx_time=0, supported_max_rx_octets=0, supported_max_rx_time=0): self.status = status self.supported_max_tx_octets = supported_max_tx_octets self.supported_max_tx_time = supported_max_tx_time self.supported_max_rx_octets = supported_max_rx_octets self.supported_max_rx_time = supported_max_rx_time @dataclass class LE_Read_Buffer_Size: status: int le_acl_data_packet_length: int total_num_le_acl_data_packets: int iso_data_packet_len: int total_num_iso_data_packets: int def __init__(self): self.set() def set(self, status=0, le_acl_data_packet_length=0, total_num_le_acl_data_packets=0, iso_data_packet_len=0, total_num_iso_data_packets=0): self.status = status self.le_acl_data_packet_length = le_acl_data_packet_length self.total_num_le_acl_data_packets = total_num_le_acl_data_packets self.iso_data_packet_len = iso_data_packet_len self.total_num_iso_data_packets = total_num_iso_data_packets @dataclass class LE_Read_PHY: status: int connection_handle: int tx_phy: int rx_phy: int def __init__(self): self.set() def set(self, status=0, connection_handle=0, tx_phy=0, rx_phy=0): self.status = status self.connection_handle = connection_handle self.tx_phy = tx_phy self.rx_phy = rx_phy @dataclass class Read_Local_Commands: status: int supported_commands: bytes def __init__(self): self.set() def set(self, rcv_bytes=bytes(65)): self.status = int(rcv_bytes[0]) self.supported_commands = rcv_bytes[1:] @dataclass class LE_Read_Local_Supported_Features: status: int le_features: bytes def __init__(self): self.set() def set(self, rcv_bytes=bytes(9)): self.status = int(rcv_bytes[0]) self.le_features = ctypes.c_uint64.from_buffer_copy( rcv_bytes[1:]).value ############ # EVENTS ############ @dataclass class HCI_Ev_Disconn_Complete: status: int connection_handle: int reason: int def __init__(self): self.set() def set(self, status=0, connection_handle=0, reason=0): self.status = status self.connection_handle = connection_handle self.reason = reason @dataclass class HCI_Ev_Cmd_Complete: num_hci_command_packets: int opcode: int return_parameters: int def __init__(self): self.set() def set(self, num_hci_cmd_packets=0, opcode=0, return_parameters=b''): self.num_hci_command_packets = num_hci_cmd_packets self.opcode = opcode self.return_parameters = return_parameters @dataclass class HCI_Ev_Cmd_Status: status: int num_hci_command_packets: int opcode: int def __init__(self): self.set() def set(self, status=0, num_hci_cmd_packets=0, opcode=0): self.status = status self.num_hci_command_packets = num_hci_cmd_packets self.opcode = opcode @dataclass class HCI_Ev_LE_Encryption_Change(): status: int connection_handle: int encryption_enabled: int def __init__(self): self.set() def set(self, status=0, connection_handle=0, encryption_enabled=0): self.status = status self.connection_handle = connection_handle self.encryption_enabled = encryption_enabled @dataclass class HCI_Ev_LE_Meta: subevent_code: int def __init__(self): self.set() def set(self, subevent_code=0): self.subevent_code = subevent_code @dataclass class HCI_Ev_LE_Enhanced_Connection_Complete(HCI_Ev_LE_Meta): status: int connection_handle: int role: int peer_address_type: int peer_address: str local_resolvable_private_address: int peer_resolvable_private_address: int connection_interval: int peripheral_latency: int supervision_timeout: int central_clock_accuracy: int def __init__(self): self.set() def set(self, subevent_code=0, status=0, connection_handle=0, role=0, peer_address_type=0, peer_address='00:00:00:00:00:00', local_resolvable_private_address='00:00:00:00:00:00', peer_resolvable_private_address='00:00:00:00:00:00', connection_interval=0, peripheral_latency=0, supervision_timeout=0, central_clock_accuracy=0): super().set(subevent_code) self.status = status self.connection_handle = connection_handle self.role = role self.peer_address_type = peer_address_type self.peer_address = peer_address self.local_resolvable_private_address = local_resolvable_private_address self.peer_resolvable_private_address = peer_resolvable_private_address self.connection_interval = connection_interval self.peripheral_latency = peripheral_latency self.supervision_timeout = supervision_timeout self.central_clock_accuracy = central_clock_accuracy @dataclass class HCI_Ev_LE_Data_Length_Change(HCI_Ev_LE_Meta): conn_handle: int max_tx_octets: int max_tx_time: int max_rx_octets: int max_rx_time: int triggered: int def __init__(self): self.set() def set(self, subevent_code=0, conn_handle=0, max_tx_octets=0, max_tx_time=0, max_rx_octets=0, max_rx_time=0, triggered=0): super().set(subevent_code) self.conn_handle = conn_handle self.max_tx_octets = max_tx_octets self.max_tx_time = max_tx_time self.max_rx_octets = max_rx_octets self.max_rx_time = max_rx_time self.triggered = triggered @dataclass class HCI_Ev_LE_Long_Term_Key_Request(HCI_Ev_LE_Meta): conn_handle: int random_number: int encrypted_diversifier: int def __init__(self): self.set() def set(self, subevent_code=0, conn_handle=0, random_number=0, encrypted_diversifier=0): super().set(subevent_code) self.conn_handle = conn_handle self.random_number = random_number self.encrypted_diversifier = encrypted_diversifier @dataclass class HCI_Ev_LE_PHY_Update_Complete(HCI_Ev_LE_Meta): status: int connection_handle: int tx_phy: int rx_phy: int def __init__(self): self.set() def set(self, subevent_code=0, status=0, connection_handle=0, tx_phy=0, rx_phy=0): super().set(subevent_code) self.status = status self.connection_handle = connection_handle self.tx_phy = tx_phy self.rx_phy = rx_phy @dataclass class HCI_Number_Of_Completed_Packets: num_handles: int connection_handle: int num_completed_packets: int def __init__(self): self.set() def set(self, num_handles=0, connection_handle=0, num_completed_packets=0): self.num_handles = num_handles self.connection_handle = connection_handle self.num_completed_packets = num_completed_packets class HCI_Ev_LE_Chan_Sel_Alg(HCI_Ev_LE_Meta): connection_handle: int algorithm: int def __init__(self): self.set() def set(self, subevent_code=0, connection_handle=0, algorithm=0): super().set(subevent_code) self.connection_handle = connection_handle self.algorithm = algorithm ############ # PARAMETERS ############ @dataclass class HCI_Advertising: advertising_interval_min: int advertising_interval_max: int advertising_type: int own_address_type: int peer_address_type: int peer_address: str advertising_channel_map: int advertising_filter_policy: int ba_full_message: bytearray def __init__(self): self.set() def set(self, advertising_interval_min=0, advertising_interval_max=0, advertising_type=0, own_address_type=0, peer_address_type=0, peer_address='00:00:00:00:00:00', advertising_channel_map=0, advertising_filter_policy=0): self.advertising_interval_min = advertising_interval_min self.advertising_interval_max = advertising_interval_max self.advertising_type = advertising_type self.own_address_type = own_address_type self.peer_address_type = peer_address_type self.peer_address = peer_address self.advertising_channel_map = advertising_channel_map self.advertising_filter_policy = advertising_filter_policy self.ba_full_message = bytearray( struct.pack( '<HHBBBBB', advertising_interval_min, advertising_interval_max, advertising_type, own_address_type, peer_address_type, advertising_channel_map, advertising_filter_policy)) peer_addr_ba = cmd_addr_to_ba(peer_address) self.ba_full_message[7:7] = peer_addr_ba @dataclass class HCI_Scan: le_scan_type: int le_scan_interval: int le_scan_window: int own_address_type: int scanning_filter_policy: int ba_full_message: bytearray def __init__(self): self.set() def set(self, le_scan_type=0, le_scan_interval=0, le_scan_window=0, own_address_type=0, scanning_filter_policy=0): self.le_scan_type = le_scan_type self.le_scan_interval = le_scan_interval self.le_scan_window = le_scan_window self.own_address_type = own_address_type self.scanning_filter_policy = scanning_filter_policy self.ba_full_message = bytearray( struct.pack( '<BHHBB', le_scan_type, le_scan_interval, le_scan_window, own_address_type, scanning_filter_policy)) @dataclass class HCI_Connect: le_scan_interval: int le_scan_window: int initiator_filter_policy: int peer_address_type: int peer_address: str own_address_type: int connection_interval_min: int connection_interval_max: int max_latency: int supervision_timeout: int min_ce_length: int max_ce_length: int ba_full_message: bytearray def __init__(self): self.set() def set(self, le_scan_interval=0, le_scan_window=0, initiator_filter_policy=0, peer_address_type=0, peer_address='00:00:00:00:00:00', own_address_type=0, connection_interval_min=0, connection_interval_max=0, max_latency=0, supervision_timeout=0, min_ce_length=0, max_ce_length=0): self.le_scan_interval = le_scan_interval self.le_scan_window = le_scan_window self.initiator_filter_policy = initiator_filter_policy self.peer_address_type = peer_address_type self.peer_address = peer_address self.own_address_type = own_address_type self.connection_interval_min = connection_interval_min self.connection_interval_max = connection_interval_max self.max_latency = max_latency self.supervision_timeout = supervision_timeout self.min_ce_length = min_ce_length self.max_ce_length = max_ce_length self.ba_full_message = bytearray( struct.pack( '<HHBBBHHHHHH', le_scan_interval, le_scan_window, initiator_filter_policy, peer_address_type, own_address_type, connection_interval_min, connection_interval_max, max_latency, supervision_timeout, min_ce_length, max_ce_length)) peer_addr_ba = cmd_addr_to_ba(peer_address) self.ba_full_message[6:6] = peer_addr_ba ############ # RX / TX ############ @dataclass class HCI_Receive: packet_type: int def __init__(self): self.set() def set(self, packet_type=0): self.packet_type = packet_type @dataclass class HCI_Recv_Event_Packet(HCI_Receive): ev_code: int packet_len: int recv_data: bytearray current_event: None def __init__(self): self.set() def set(self, packet_type=0, ev_code=0, packet_len=0, recv_data=bytearray(256)): super().set(packet_type) self.ev_code = ev_code self.packet_len = packet_len self.recv_data = recv_data self.recv_data = recv_data[:packet_len] @dataclass class HCI_Recv_ACL_Data_Packet(HCI_Receive): connection_handle: int pb_flag: int bc_flag: int data_total_len: int data: bytearray def __init__(self): self.set() def set(self, packet_type=0, connection_handle=0, pb_flag=0, bc_flag=0, total_data_len=0, data=b''): super().set(packet_type) self.connection_handle = connection_handle self.pb_flag = pb_flag self.bc_flag = bc_flag self.data_total_len = total_data_len self.data = data @dataclass class HCI_Recv_L2CAP_Data: pdu_length: int channel_id: int data: bytearray def __init__(self): self.set() def set(self, pdu_length=0, channel_id=0, data=b''): self.pdu_length = pdu_length self.channel_id = channel_id self.data = data @dataclass class HCI_Cmd_Send: packet_type: int ogf: int ocf: int packet_len: int data: bytearray ba_full_message: bytearray def __init__(self): self.set() def set(self, ogf=0, ocf=0, data=b''): self.packet_type = HCI_COMMAND_PACKET self.ogf = ogf self.ocf = ocf self.opcode = get_opcode(ogf, ocf) self.packet_len = len(data) self.data = data self.ba_full_message = bytearray( struct.pack( '<BHB', self.packet_type, self.opcode, self.packet_len)) self.ba_full_message.extend(self.data) @dataclass class HCI_ACL_Data_Send: packet_type: int connection_handle: int pb_flag: int bc_flag: int data_total_length: int data: bytearray ba_full_message: bytearray def __init__(self): self.set() def set(self, connection_handle=0, pb_flag=0b00, bc_flag=0b00, data=b''): self.packet_type = HCI_ACL_DATA_PACKET self.connection_handle = connection_handle self.pb_flag = pb_flag self.bc_flag = bc_flag self.data_total_length = len(data) self.data = data self.ba_full_message = bytearray( struct.pack( '<BHH', self.packet_type, ((self.connection_handle & 0x0eff) | ( self.pb_flag << 12) | ( self.bc_flag << 14)), self.data_total_length)) self.ba_full_message.extend(self.data) @dataclass class L2CAP_Data_Send: pdu_length: int channel_id: int data: bytearray ba_full_message: bytearray def __init__(self): self.set() def set(self, pdu_length=0, channel_id=0, data=b''): if not pdu_length: self.pdu_length = len(data) else: self.pdu_length = pdu_length self.channel_id = channel_id self.data = data fmt_conf = "<HH" self.ba_full_message = bytearray( struct.pack( fmt_conf, self.pdu_length, self.channel_id)) self.ba_full_message.extend(data)