tools/hci_throughput/hci_commands.py (663 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import logging
import asyncio
import struct
import hci
import sys
import time
async def wait_ev(ev):
    """Poll ``ev`` until it reports set, yielding to the loop between checks.

    Only ``ev.is_set()`` is called, so any event-like object exposing that
    method can be passed.
    """
    while not ev.is_set():
        await asyncio.sleep(0.000001)


async def wait_for_event(ev, timeout):
    """Wait until ``ev`` is set, or log an error and exit after ``timeout``.

    Args:
        ev: event-like object exposing ``is_set()``.
        timeout: maximum time to wait, in seconds (passed to
            ``asyncio.wait_for``).

    Raises:
        SystemExit: when the event is not set within ``timeout``.
    """
    try:
        await asyncio.wait_for(wait_ev(ev), timeout)
    except asyncio.TimeoutError as e:
        # asyncio.wait_for() raises asyncio.TimeoutError. That name is only
        # an alias of the builtin TimeoutError since Python 3.11; on older
        # interpreters catching the builtin silently misses the timeout.
        logging.error(f"Timeout waiting for event: {e}")
        sys.exit()
class HCI_Commands():
def __init__(self, send=None, rx_buffer_q=None,
             asyncio_loop=None, tp=None, device_mode="rx"):
    """Store the collaborators and create the per-instance state.

    Args:
        send: awaitable callable used to transmit a raw message
            (it is awaited by the ``cmd_*`` methods and ``acl_data_send``).
        rx_buffer_q: queue of ``(buffer, timestamp)`` items consumed by
            ``recv_handler``.
        asyncio_loop: loop used to spawn handler tasks (``create_task``).
        tp: helper object used by ``match_recv_l2cap_data``.
        device_mode: stored as-is; defaults to ``"rx"``.
    """
    # Collaborators injected by the caller.
    self.send = send
    self.rx_buffer_q = rx_buffer_q
    self.loop = asyncio_loop
    self.tp = tp
    self.device_mode = device_mode

    # Reusable packet objects.
    self.hci_send_cmd = hci.HCI_Cmd_Send()
    self.hci_send_acl_data = hci.HCI_ACL_Data_Send()
    self.hci_recv_ev_packet = hci.HCI_Recv_Event_Packet()

    # Synchronisation primitives.
    self.async_sem_cmd = asyncio.Semaphore()
    self.async_lock_packets_cnt = asyncio.Lock()
    self.async_ev_cmd_end = asyncio.Event()
    self.async_ev_connected = asyncio.Event()
    self.async_ev_encryption_change = asyncio.Event()
    self.async_ev_set_data_len = asyncio.Event()
    self.async_ev_update_phy = asyncio.Event()
    self.async_ev_num_cmp_pckts = asyncio.Event()
    self.async_ev_recv_data_finish = asyncio.Event()
    self.async_ev_rx_wait_finish = asyncio.Event()

    # Counters used while sending and validating data.
    self.valid_recv_data = 0
    self.expected_recv_data = 0
    self.last_timestamp = 0
    self.sent_packets_counter = 0
async def rx_buffer_q_wait(self):
    """Drain ``rx_buffer_q`` until ``async_ev_rx_wait_finish`` is set.

    While the queue is empty the coroutine yields with a very short sleep;
    otherwise it runs ``recv_handler`` as a task and waits for it. When the
    finish event is observed it is cleared again before returning.
    Cancellation is logged and not re-raised.
    """
    try:
        logging.debug("%s", self.rx_buffer_q_wait.__name__)
        finish = self.async_ev_rx_wait_finish
        while not finish.is_set():
            if not self.rx_buffer_q.empty():
                await self.loop.create_task(self.recv_handler())
            else:
                await asyncio.sleep(0.000000001)
        logging.info("rx_buffer_q_wait finished")
        finish.clear()
    except asyncio.CancelledError:
        logging.critical("rx_buffer_q_wait task canceled")
""" 7.3 Controller & Baseband commands """
async def cmd_set_event_mask(self, mask: int = 0x00001fffffffffff):
    """Send the Set Event Mask command and wait for its completion event.

    Args:
        mask: event mask, packed as a little-endian unsigned 64-bit value.
    """
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_HOST_CTL, hci.OCF_SET_EVENT_MASK,
                struct.pack('<Q', mask))
        logging.debug("%s %s", self.cmd_set_event_mask.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_reset(self):
    """Send the Reset command and wait for its completion event."""
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_HOST_CTL, hci.OCF_RESET)
        logging.debug("%s %s", self.cmd_reset.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
""" 7.4 Informational parameters """
async def cmd_read_local_supported_cmds(self):
    """Send Read Local Supported Commands and wait for its completion."""
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_INFO_PARAM, hci.OCF_READ_LOCAL_COMMANDS)
        logging.debug("%s %s",
                      self.cmd_read_local_supported_cmds.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_read_bd_addr(self):
    """Send Read BD_ADDR and wait for its completion event."""
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_INFO_PARAM, hci.OCF_READ_BD_ADDR)
        logging.debug("%s %s", self.cmd_read_bd_addr.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
""" 7.8 LE Controller Commands """
async def cmd_le_set_event_mask(self, mask: int = 0x000000000000001f):
    """Send LE Set Event Mask and wait for its completion event.

    Args:
        mask: LE event mask, packed as a little-endian unsigned 64-bit value.
    """
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_EVENT_MASK,
                struct.pack('<Q', mask))
        logging.debug("%s %s", self.cmd_le_set_event_mask.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_read_buffer_size(self):
    """Send LE Read Buffer Size (v1) and wait for its completion event."""
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_READ_BUFFER_SIZE_V1)
        logging.debug("%s %s", self.cmd_le_read_buffer_size.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_read_local_supported_features(self):
    """Send LE Read Local Supported Features and wait for its completion."""
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_READ_LOCAL_SUPPORTED_FEATURES)
        logging.debug("%s %s",
                      self.cmd_le_read_local_supported_features.__name__,
                      cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_set_random_addr(self, addr: str):
    """Send LE Set Random Address and wait for its completion event.

    Args:
        addr: address string, converted with ``hci.cmd_addr_to_ba``.
    """
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_RANDOM_ADDRESS,
                hci.cmd_addr_to_ba(addr))
        logging.debug("%s %s", self.cmd_le_set_random_addr.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_set_advertising_params(self, adv_params: hci.HCI_Advertising):
    """Send LE Set Advertising Parameters and wait for its completion.

    Args:
        adv_params: object whose ``ba_full_message`` is used as the
            command parameters.
    """
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_ADVERTISING_PARAMETERS,
                adv_params.ba_full_message)
        logging.debug("%s %s",
                      self.cmd_le_set_advertising_params.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_set_advertising_enable(self, adv_en: int = 0):
    """Send LE Set Advertising Enable and wait for its completion event.

    Args:
        adv_en: enable flag packed as one unsigned byte.
            Default: 0 (disabled).
    """
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_ADVERTISE_ENABLE,
                struct.pack('<B', adv_en))
        logging.debug("%s %s",
                      self.cmd_le_set_advertising_enable.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_set_scan_params(self, scan_params: hci.HCI_Scan):
    """Send LE Set Scan Parameters and wait for its completion event.

    Args:
        scan_params: object whose ``ba_full_message`` is used as the
            command parameters.
    """
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_SCAN_PARAMETERS,
                scan_params.ba_full_message)
        logging.debug("%s %s", self.cmd_le_set_scan_params.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_set_scan_enable(self, scan_en: int = 0, filter_dup: int = 0):
    """Send LE Set Scan Enable and wait for its completion event.

    Args:
        scan_en: scan enable flag, one unsigned byte. Default: 0 (disabled).
        filter_dup: duplicate filtering flag, one unsigned byte.
            Default: 0 (disabled).
    """
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_SCAN_ENABLE,
                struct.pack('<BB', scan_en, filter_dup))
        logging.debug("%s %s", self.cmd_le_set_scan_enable.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_create_connection(self, con_params: hci.HCI_Connect):
    """Send LE Create Connection and wait for the command to be acknowledged.

    Args:
        con_params: object whose ``ba_full_message`` is used as the
            command parameters.
    """
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_CREATE_CONN,
                con_params.ba_full_message)
        logging.debug("%s %s", self.cmd_le_create_connection.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_enable_encryption(self, conn_handle: int, random_number: int, ediv: int, ltk: int):
    """Send LE Enable Encryption and wait for the command to be acknowledged.

    The key is also stored in ``hci.ltk``.

    Args:
        conn_handle: connection handle, packed as little-endian uint16.
        random_number: encoded as 8 little-endian bytes.
        ediv: packed as little-endian uint16.
        ltk: long term key, encoded as 16 little-endian bytes.
    """
    async with self.async_sem_cmd:
        hci.ltk = ltk
        rand_part = random_number.to_bytes(8, byteorder='little')
        key_part = ltk.to_bytes(16, byteorder='little')
        # Parameter layout: handle | random number | EDIV | LTK.
        payload = b"".join((struct.pack("<H", conn_handle), rand_part,
                            struct.pack("<H", ediv), key_part))
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_ENABLE_ENCRYPTION, payload)
        logging.debug("%s %s", self.cmd_le_enable_encryption.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_long_term_key_request_reply(self, conn_handle: int, ltk: int):
    """Send LE Long Term Key Request Reply without waiting for completion.

    Args:
        conn_handle: connection handle, packed as little-endian uint16.
        ltk: long term key, encoded as 16 little-endian bytes.
    """
    async with self.async_sem_cmd:
        key_part = ltk.to_bytes(16, byteorder='little')
        payload = struct.pack('<H', conn_handle) + key_part
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_LONG_TERM_KEY_REQUEST_REPLY,
                payload)
        logging.debug("%s %s",
                      self.cmd_le_long_term_key_request_reply.__name__,
                      cmd)
        await self.send(cmd.ba_full_message)
        # Is run from another command, so unlike the other cmd_* methods
        # it does not wait for (or clear) async_ev_cmd_end.
async def cmd_le_set_data_len(self, conn_handle: int, tx_octets: int, tx_time: int):
    """Send LE Set Data Length and wait for its completion event.

    When either ``tx_octets`` or ``tx_time`` is 0, both are replaced by the
    maxima stored in ``hci.max_data_len``. The values actually requested are
    recorded in ``hci.requested_tx_octets`` / ``hci.requested_tx_time``.
    The command is only sent once ``hci.conn_handle`` equals ``conn_handle``.

    Args:
        conn_handle: Range 0x0000 to 0x0EFF
        tx_octets: Range 0x001B to 0x00FB
        tx_time: Range 0x0148 to 0x4290
    """
    logging.debug("%s", self.cmd_le_set_data_len.__name__)
    async with self.async_sem_cmd:
        use_supported_max = tx_octets == 0 or tx_time == 0
        if use_supported_max:
            tx_octets = hci.max_data_len.supported_max_tx_octets
            tx_time = hci.max_data_len.supported_max_tx_time
        hci.requested_tx_octets = tx_octets
        hci.requested_tx_time = tx_time

        # Poll until the connection with this handle has been reported.
        while conn_handle != hci.conn_handle:
            await asyncio.sleep(0.001)

        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_DATA_LEN,
                struct.pack('<HHH', conn_handle, tx_octets, tx_time))
        logging.debug("%s %s", self.cmd_le_set_data_len.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_read_suggested_dflt_data_len(self):
    """Send LE Read Suggested Default Data Length and wait for completion."""
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_READ_SUGGESTED_DFLT_DATA_LEN)
        logging.debug("%s %s",
                      self.cmd_le_read_suggested_dflt_data_len.__name__,
                      cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_read_max_data_len(self):
    """Send LE Read Maximum Data Length and wait for its completion event."""
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_READ_MAX_DATA_LEN)
        logging.debug("%s %s", self.cmd_le_read_max_data_len.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_read_phy(self, conn_handle: int):
    """Send LE Read PHY and wait for its completion event.

    The command is only sent once ``hci.conn_handle`` equals ``conn_handle``.

    Args:
        conn_handle: Range 0x0000 to 0x0EFF
    """
    async with self.async_sem_cmd:
        # Poll until the connection with this handle has been reported.
        while conn_handle != hci.conn_handle:
            await asyncio.sleep(0.001)

        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_READ_PHY,
                struct.pack('<H', conn_handle))
        logging.debug("%s %s", self.cmd_le_read_phy.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_set_dflt_phy(self, all_phys: int = 0, tx_phys: int = 0, rx_phys: int = 0):
    """Send LE Set Default PHY and wait for its completion event.

    Defaults:
        all_phys: 0 - The Host has no preference among the transmitter PHYs
                      supported by the Controller
        tx_phys: 0 - The Host prefers to use the LE 1M transmitter PHY
                     (possibly among others)
        rx_phys: 0 - The Host prefers to use the LE 1M receiver PHY
                     (possibly among others)
    """
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_DFLT_PHY,
                struct.pack('<BBB', all_phys, tx_phys, rx_phys))
        logging.debug("%s %s", self.cmd_le_set_dflt_phy.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_le_set_phy(self, conn_handle: int, all_phys: int = 0,
                         tx_phys: int = 0, rx_phys: int = 0,
                         phy_options: int = 0):
    """Send LE Set PHY and wait for the command to be acknowledged.

    The command is only sent once ``hci.conn_handle`` equals ``conn_handle``.

    Args / defaults:
        conn_handle: Range 0x0000 to 0x0EFF
        all_phys: The Host has no preference among the transmitter PHYs
                  supported by the Controller
        tx_phys: 0 - The Host prefers to use the LE 1M transmitter PHY
                     (possibly among others)
        rx_phys: 0 - The Host prefers to use the LE 1M receiver PHY
                     (possibly among others)
        phy_options: 0 - the Host has no preferred coding when
                         transmitting on the LE Coded PHY
    """
    async with self.async_sem_cmd:
        # Poll until the connection with this handle has been reported.
        while conn_handle != hci.conn_handle:
            await asyncio.sleep(0.001)

        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_PHY,
                struct.pack('<HBBBH', conn_handle, all_phys,
                            tx_phys, rx_phys, phy_options))
        logging.debug("%s %s", self.cmd_le_set_phy.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
async def cmd_vs_read_static_addr(self):
    """Send the vendor-specific Read Static Address command and wait for it."""
    async with self.async_sem_cmd:
        cmd = self.hci_send_cmd
        cmd.set(hci.OGF_VENDOR_SPECIFIC, hci.BLE_HCI_OCF_VS_RD_STATIC_ADDR)
        logging.debug("%s %s", self.cmd_vs_read_static_addr.__name__, cmd)
        await self.send(cmd.ba_full_message)
        # Block until the command is acknowledged, then re-arm the event.
        done = self.async_ev_cmd_end
        await done.wait()
        done.clear()
""" Send data """
async def acl_data_send(self, acl_data: hci.HCI_ACL_Data_Send):
    """Transmit one ACL data packet on the current connection.

    The packet's ``connection_handle`` is overwritten with
    ``hci.conn_handle``, the packet is remembered in
    ``self.hci_send_acl_data`` and ``sent_packets_counter`` is incremented
    after the send completes.
    """
    async with self.async_sem_cmd:
        acl_data.connection_handle = hci.conn_handle
        self.hci_send_acl_data = acl_data
        outgoing = self.hci_send_acl_data.ba_full_message
        await self.send(outgoing)
        self.sent_packets_counter += 1
""" Parse and process received data"""
def parse_ev_disconn_cmp(self, data: bytes):
    """Build an ``hci.HCI_Ev_Disconn_Complete`` from the first 4 bytes.

    The bytes are decoded as little-endian uint8, uint16, uint8 and passed
    positionally to the event's ``set``.
    """
    event = hci.HCI_Ev_Disconn_Complete()
    fields = struct.unpack('<BHB', bytes(data[:4]))
    event.set(*fields)
    return event
def parse_ev_cmd_cmp(self, data: bytes):
    """Build an ``hci.HCI_Ev_Cmd_Complete`` from an event payload.

    The first 3 bytes are decoded as little-endian uint8 + uint16; the
    remaining bytes are passed on unparsed as the last ``set`` argument.
    """
    event = hci.HCI_Ev_Cmd_Complete()
    header = struct.unpack('<BH', bytes(data[:3]))
    remainder = data[3:]
    event.set(*header, remainder)
    return event
def parse_ev_cmd_stat(self, data: bytes):
    """Build an ``hci.HCI_Ev_Cmd_Status`` from the first 4 bytes.

    The bytes are decoded as little-endian uint8, uint8, uint16 and passed
    positionally to the event's ``set``.
    """
    event = hci.HCI_Ev_Cmd_Status()
    fields = struct.unpack('<BBH', bytes(data[:4]))
    event.set(*fields)
    return event
def parse_ev_encryption_change(self, data: bytes):
    """Build an ``hci.HCI_Ev_LE_Encryption_Change`` from the first 4 bytes.

    The bytes are decoded as little-endian uint8, uint16, uint8 and passed
    positionally to the event's ``set``.
    """
    event = hci.HCI_Ev_LE_Encryption_Change()
    fields = struct.unpack('<BHB', bytes(data[:4]))
    event.set(*fields)
    return event
def parse_ev_le_meta(self, data: bytes):
    """Build an ``hci.HCI_Ev_LE_Meta`` from the first byte of ``data``."""
    event = hci.HCI_Ev_LE_Meta()
    first_byte = data[0]
    event.set(first_byte)
    return event
def parse_subev_le_enhcd_conn_cmp(self, data: bytes):
    """Build an ``hci.HCI_Ev_LE_Enhanced_Connection_Complete`` from ``data``.

    Layout consumed: 6 header bytes (``<BBHBB``), three 6-byte addresses
    converted with ``hci.ba_addr_to_str``, then the rest of the buffer,
    which must be exactly 7 bytes (``<HHHB``).
    """
    event = hci.HCI_Ev_LE_Enhanced_Connection_Complete()
    header = struct.unpack('<BBHBB', bytes(data[:6]))
    addresses = [hci.ba_addr_to_str(bytes(data[start:start + 6]))
                 for start in (6, 12, 18)]
    trailer = struct.unpack('<HHHB', bytes(data[24:]))
    event.set(*header, *addresses, *trailer)
    return event
def parse_subev_le_data_len_change(self, data: bytes):
    """Build an ``hci.HCI_Ev_LE_Data_Length_Change`` from the first 11 bytes.

    The bytes are decoded as one uint8 followed by five little-endian
    uint16 values and passed positionally to the event's ``set``.
    """
    event = hci.HCI_Ev_LE_Data_Length_Change()
    fields = struct.unpack('<BHHHHH', bytes(data[:11]))
    event.set(*fields)
    return event
def parse_subev_le_long_term_key_request(self, data: bytes):
    """Build an ``hci.HCI_Ev_LE_Long_Term_Key_Request`` from 13 bytes.

    The bytes are decoded as little-endian uint8, uint16, uint64, uint16
    and passed positionally to the event's ``set``.
    """
    event = hci.HCI_Ev_LE_Long_Term_Key_Request()
    fields = struct.unpack('<BHQH', bytes(data[:13]))
    event.set(*fields)
    return event
def parse_subev_le_phy_update_cmp(self, data: bytes):
    """Build an ``hci.HCI_Ev_LE_PHY_Update_Complete`` from ``data``.

    ``data`` is unpacked whole as ``<BBHBB``, so it must be exactly
    6 bytes long.
    """
    event = hci.HCI_Ev_LE_PHY_Update_Complete()
    fields = struct.unpack('<BBHBB', data)
    event.set(*fields)
    return event
def parse_subev_le_chan_sel_alg(self, data: bytes):
    """Build an ``hci.HCI_Ev_LE_Chan_Sel_Alg`` from ``data``.

    ``data`` is unpacked whole as ``<BHB``, so it must be exactly
    4 bytes long.
    """
    event = hci.HCI_Ev_LE_Chan_Sel_Alg()
    fields = struct.unpack('<BHB', data)
    event.set(*fields)
    return event
def parse_num_comp_pkts(self, data: bytes):
    """Parse the first 5 bytes into ``hci.ev_num_comp_pkts`` and return it.

    A fresh ``hci.HCI_Number_Of_Completed_Packets`` replaces the module-level
    ``hci.ev_num_comp_pkts`` before the bytes (``<BHH``) are decoded.
    """
    hci.ev_num_comp_pkts = hci.HCI_Number_Of_Completed_Packets()
    fields = struct.unpack('<BHH', bytes(data[:5]))
    hci.ev_num_comp_pkts.set(*fields)
    return hci.ev_num_comp_pkts
def process_returned_parameters(self):
    """Interpret the current Command Complete / Command Status event.

    Depending on the opcode of ``self.hci_recv_ev_packet.current_event``,
    the returned parameters are decoded and stored in module-level ``hci``
    attributes (``read_local_commands``, ``bdaddr``, ``le_read_buffer_size``,
    ``le_read_local_supported_features``, ``suggested_dflt_data_len``,
    ``max_data_len``, ``phy``, ``static_addr``).

    Returns:
        int: the status reported by the event, or -100 when the event type
        or opcode group is not recognised. Commands of a handled group that
        need no extra decoding return the event's own status, instead of
        falling through and returning ``None``.

    Raises:
        SystemExit: when ``hci.num_of_bytes_to_send`` does not fit into the
            maximum data length reported by the controller.
    """
    current_ev = self.hci_recv_ev_packet.current_event
    # Events are told apart by class name, without instantiating throwaway
    # event objects just to read their type.
    ev_type_name = type(current_ev).__name__
    is_cmd_complete = ev_type_name == hci.HCI_Ev_Cmd_Complete.__name__
    is_cmd_status = ev_type_name == hci.HCI_Ev_Cmd_Status.__name__

    def status() -> int:
        if is_cmd_complete:
            # The status is read from the first returned-parameter byte.
            return struct.unpack_from(
                "<B", current_ev.return_parameters, offset=0)[0]
        if is_cmd_status:
            return current_ev.status
        return -100

    ogf, ocf = hci.get_ogf_ocf(current_ev.opcode)

    if ogf == hci.OGF_HOST_CTL:
        return status()

    if ogf == hci.OGF_INFO_PARAM:
        if ocf == hci.OCF_READ_LOCAL_COMMANDS:
            hci.read_local_commands = hci.Read_Local_Commands()
            hci.read_local_commands.set(
                bytes(current_ev.return_parameters))
        elif ocf == hci.OCF_READ_BD_ADDR:
            hci.bdaddr = hci.ba_addr_to_str(
                bytes(current_ev.return_parameters[1:7]))
        return status()

    if ogf == hci.OGF_LE_CTL:
        if ocf == hci.OCF_LE_READ_BUFFER_SIZE_V1:
            hci.le_read_buffer_size = hci.LE_Read_Buffer_Size()
            hci.le_read_buffer_size.set(
                *struct.unpack("<BHB", current_ev.return_parameters))
            logging.info(f"LE Buffer size: {hci.le_read_buffer_size}")
            return hci.le_read_buffer_size.status
        if ocf == hci.OCF_LE_READ_LOCAL_SUPPORTED_FEATURES:
            hci.le_read_local_supported_features = \
                hci.LE_Read_Local_Supported_Features()
            hci.le_read_local_supported_features.set(
                current_ev.return_parameters)
        elif ocf == hci.OCF_LE_READ_SUGGESTED_DFLT_DATA_LEN:
            hci.suggested_dflt_data_len = hci.Suggested_Dflt_Data_Length()
            hci.suggested_dflt_data_len.set(
                *struct.unpack("<BHH", current_ev.return_parameters))
            logging.info(
                f"Suggested Deafult Data Len: {hci.suggested_dflt_data_len}")
        elif ocf == hci.OCF_LE_READ_MAX_DATA_LEN:
            hci.max_data_len = hci.Max_Data_Length()
            hci.max_data_len.set(
                *struct.unpack("<BHHHH", current_ev.return_parameters))
            logging.info(f"Suggested Max Data Len: {hci.max_data_len}")
            if (hci.num_of_bytes_to_send >
                    hci.max_data_len.supported_max_tx_octets - hci.L2CAP_HDR_BYTES):
                too_long = (
                    f"Number of data bytes to send + 4 bytes of L2CAP header: {hci.num_of_bytes_to_send + 4} "
                    f"exceeds allowed value of: {hci.max_data_len.supported_max_tx_octets}. Closing.")
                logging.critical(too_long)
                raise SystemExit(too_long)
        elif ocf == hci.OCF_LE_READ_PHY:
            hci.phy = hci.LE_Read_PHY()
            hci.phy.set(
                *struct.unpack('<BHBB', current_ev.return_parameters))
            logging.info(f"Current LE PHY: {hci.phy}")
        # Every other LE command (set event mask, advertising, scanning,
        # create connection, data length, PHY, enable encryption, ...)
        # only needs its status reported.
        return status()

    if ogf == hci.OGF_VENDOR_SPECIFIC:
        if ocf == hci.BLE_HCI_OCF_VS_RD_STATIC_ADDR:
            if is_cmd_complete:
                hci.static_addr = hci.ba_addr_to_str(
                    bytes(current_ev.return_parameters[1:7]))
                logging.info(f"Received rd static addr: {hci.static_addr}")
            elif is_cmd_status:
                logging.info(f"Rd static addr status: {current_ev.status}")
            return status()
        # Unknown vendor command: its parameter layout is not known here.
        return -100

    return -100
def parse_acl_data(self, buffer: bytes):
    """Decode a received HCI ACL data packet.

    The 5-byte header (``<BHH``) carries the packet type, a 16-bit field
    combining the connection handle with the PB and BC flags, and the data
    length. When the PB flag equals ``0b10`` the payload is further decoded
    into an ``hci.HCI_Recv_L2CAP_Data`` (two little-endian uint16 values
    followed by the remaining bytes); otherwise the raw payload bytes are
    kept.

    Returns:
        hci.HCI_Recv_ACL_Data_Packet: the populated packet object.
    """
    packet_type, handle_pb_bc_flags, data_len = struct.unpack('<BHH',
                                                              buffer[:5])
    # The handle occupies the low 12 bits. Masking with 0x0EFF (the largest
    # valid handle value) would wrongly clear bit 8 of the handle.
    handle = handle_pb_bc_flags & 0x0FFF
    pb_flag = (handle_pb_bc_flags & 0x3000) >> 12
    bc_flag = (handle_pb_bc_flags & 0xC000) >> 14

    payload = buffer[5:]
    if pb_flag == 0b10:
        l2cap_data = hci.HCI_Recv_L2CAP_Data()
        l2cap_data.set(*struct.unpack("<HH", payload[:4]), payload[4:])
    else:
        l2cap_data = payload

    hci_recv_acl_data_packet = hci.HCI_Recv_ACL_Data_Packet()
    hci_recv_acl_data_packet.set(
        packet_type=packet_type,
        connection_handle=handle,
        pb_flag=pb_flag,
        bc_flag=bc_flag,
        total_data_len=data_len,
        data=l2cap_data)
    return hci_recv_acl_data_packet
def parse_subevent(self, subev_code: int):
    """Parse the pending LE Meta payload according to ``subev_code``.

    For a recognised subevent the parsed object becomes
    ``self.hci_recv_ev_packet.current_event``, a ``(code, event)`` tuple is
    appended to ``hci.events_list`` and the subevent code is returned.
    Unrecognised codes leave everything untouched and return -1.
    """
    # (subevent code, parser) pairs, checked in order.
    parsers = (
        (hci.HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP,
         self.parse_subev_le_enhcd_conn_cmp),
        (hci.HCI_SUBEV_CODE_LE_LONG_TERM_KEY_REQUEST,
         self.parse_subev_le_long_term_key_request),
        (hci.HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE,
         self.parse_subev_le_data_len_change),
        (hci.HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP,
         self.parse_subev_le_phy_update_cmp),
        (hci.HCI_SUBEV_CODE_LE_CHAN_SEL_ALG,
         self.parse_subev_le_chan_sel_alg),
    )
    for known_code, parser in parsers:
        if subev_code == known_code:
            packet = self.hci_recv_ev_packet
            packet.current_event = parser(packet.recv_data)
            hci.events_list.append((known_code, packet.current_event))
            return known_code
    return -1
def parse_event(self, buffer: bytes):
    """Decode an HCI event packet and parse its payload by event code.

    The first 3 bytes (three uint8 values) and the remaining payload are
    stored in ``self.hci_recv_ev_packet``. For a recognised event code the
    parsed event becomes ``current_event`` and the code is returned; every
    recognised event except the LE Meta event is also appended to
    ``hci.events_list`` as ``(code, event)``. Unrecognised codes return -1
    and leave ``current_event`` as it was.
    """
    packet = self.hci_recv_ev_packet
    packet.set(*struct.unpack('<BBB', bytes(buffer[:3])), buffer[3:])

    # (event code, parser, record in hci.events_list) checked in order.
    handlers = (
        (hci.HCI_EV_CODE_DISCONN_CMP, self.parse_ev_disconn_cmp, True),
        (hci.HCI_EV_CODE_CMD_CMP, self.parse_ev_cmd_cmp, True),
        (hci.HCI_EV_CODE_CMD_STATUS, self.parse_ev_cmd_stat, True),
        (hci.HCI_EV_CODE_ENCRYPTION_CHANGE,
         self.parse_ev_encryption_change, True),
        # LE Meta events are recorded later, once the subevent is parsed.
        (hci.HCI_EV_CODE_LE_META_EVENT, self.parse_ev_le_meta, False),
        (hci.HCI_EV_NUM_COMP_PKTS, self.parse_num_comp_pkts, True),
    )
    for known_code, parser, record in handlers:
        if packet.ev_code == known_code:
            packet.current_event = parser(packet.recv_data)
            if record:
                hci.events_list.append((known_code, packet.current_event))
            return known_code
    return -1
async def handle_event(self, buffer: bytes):
    """Parse one received HCI event packet and react to it.

    Reactions, by event code:
      * Disconnection Complete: raises when the reason is
        ``hci.CONN_FAILED_TO_BE_ESTABLISHED`` or ``hci.CONN_TIMEOUT``.
      * Command Complete / Command Status: when the opcode matches the last
        sent command, the returned parameters are processed and
        ``async_ev_cmd_end`` is set.
      * Encryption Change: sets ``async_ev_encryption_change`` on success,
        raises otherwise.
      * LE Meta: dispatches on the subevent (connection complete, data
        length change, PHY update, channel selection algorithm, LTK
        request) and sets the corresponding event / sends the LTK reply.
      * Number Of Completed Packets: updates the counters kept in ``hci``
        and sets ``async_ev_num_cmp_pckts``.

    Raises:
        Exception: on connection failure, connection timeout or failed
            encryption.
    """
    event_code = self.parse_event(buffer)
    curr_ev = self.hci_recv_ev_packet.current_event

    if event_code == hci.HCI_EV_CODE_DISCONN_CMP:
        logging.debug("Received code: %s - HCI_EV_CODE_DISCONN_CMP",
                      event_code)
        logging.debug(
            "Status: %s for event: %s - HCI_EV_CODE_DISCONN_CMP",
            curr_ev.status,
            self.hci_recv_ev_packet.current_event)
        if curr_ev.reason == hci.CONN_FAILED_TO_BE_ESTABLISHED:
            logging.error(
                "Connection failed to be established. Exiting...")
            raise Exception(
                "Connection failed to be established. Exiting...")
        if curr_ev.reason == hci.CONN_TIMEOUT:
            logging.error("Connection timeout. Exiting...")
            raise Exception("Connection timeout. Exiting...")

    elif event_code == hci.HCI_EV_CODE_CMD_CMP:
        logging.debug("Received code: %s - HCI_EV_CODE_CMD_CMP",
                      event_code)
        # Only events answering the most recently sent command count.
        if self.hci_send_cmd.opcode == curr_ev.opcode:
            status = self.process_returned_parameters()
            if status != 0:
                logging.error(
                    "Status: %s for event: %s - HCI_EV_CODE_CMD_CMP",
                    status,
                    curr_ev)
            self.async_ev_cmd_end.set()

    elif event_code == hci.HCI_EV_CODE_CMD_STATUS:
        logging.debug(
            "Received code: %s - HCI_EV_CODE_CMD_STATUS", event_code)
        if self.hci_send_cmd.opcode == curr_ev.opcode:
            status = self.process_returned_parameters()
            if status != 0:
                logging.error("Status: %s for event: %s", status, curr_ev)
            self.async_ev_cmd_end.set()

    elif event_code == hci.HCI_EV_CODE_ENCRYPTION_CHANGE:
        logging.debug(
            "Received code: %s - HCI_EV_CODE_ENCRYPTION_CHANGE",
            event_code)
        status = curr_ev.status
        encryption_enabled = curr_ev.encryption_enabled
        if status == 0 and encryption_enabled != 0:
            self.async_ev_encryption_change.set()
        else:
            # Exception() does not apply %-formatting to its arguments, so
            # the message has to be formatted before it is raised.
            raise Exception(
                f"Encryption failed. Status: {status}, "
                f"encryption enabled: {encryption_enabled}")

    elif event_code == hci.HCI_EV_CODE_LE_META_EVENT:
        logging.debug(
            "Received code: %s - HCI_EV_CODE_LE_META_EVENT", event_code)
        subev_code = self.parse_subevent(curr_ev.subevent_code)
        if subev_code == hci.HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP:
            logging.debug(
                "Received subev code: %s - HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP",
                subev_code)
            # parse_subevent() replaced current_event with the parsed
            # subevent, so it is re-read from the packet here.
            hci.conn_handle = \
                self.hci_recv_ev_packet.current_event.connection_handle
            if not self.async_ev_connected.is_set():
                logging.info("Connection established. Event received.")
                self.async_ev_connected.set()
        elif subev_code == hci.HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE:
            logging.debug(
                "Received subev code: %s - HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE",
                subev_code)
            self.async_ev_set_data_len.set()
        elif subev_code == hci.HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP:
            logging.debug(
                "Received subev code: %s - HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP",
                subev_code)
            self.async_ev_update_phy.set()
        elif subev_code == hci.HCI_SUBEV_CODE_LE_CHAN_SEL_ALG:
            logging.debug(
                "Received subev code: %s - HCI_SUBEV_CODE_LE_CHAN_SEL_ALG",
                subev_code)
        elif subev_code == hci.HCI_SUBEV_CODE_LE_LONG_TERM_KEY_REQUEST:
            logging.debug(
                "Received subev code: %s - HCI_SUBEV_CODE_LE_LONG_TERM_KEY_REQUEST",
                subev_code)
            await self.cmd_le_long_term_key_request_reply(
                hci.conn_handle, hci.ltk)
        elif subev_code < 0:
            logging.warning(f"Unknown received subevent: {buffer}\n")

    elif event_code == hci.HCI_EV_NUM_COMP_PKTS:
        logging.debug(
            "Received code: %s - HCI_EV_NUM_COMP_PKTS", event_code)
        async with self.async_lock_packets_cnt:
            hci.num_of_completed_packets_cnt += curr_ev.num_completed_packets
            hci.num_of_completed_packets_time = time.perf_counter()
            self.async_ev_num_cmp_pckts.set()

    if event_code < 0:
        logging.warning(f"Unknown received event: {buffer}\n")
    else:
        logging.debug("%s \t%s ", self.handle_event.__name__,
                      self.hci_recv_ev_packet)
def match_recv_l2cap_data(self, buffer: bytes, timestamp: int):
    """Check a received packet against the expected packet key sequence.

    The expected key grows by ``self.tp.predef_packet_key`` per packet and
    is compared with the little-endian uint32 stored in the last 4 bytes of
    ``buffer``. Matching packets increment ``valid_recv_data``. Every packet
    is reported through ``self.tp.append_to_csv_file``; once the packet
    number reaches ``self.tp.total_packets_number - 1`` the
    ``async_ev_recv_data_finish`` event is set.

    Args:
        buffer: raw received packet.
        timestamp: reception timestamp forwarded to the CSV writer.
    """
    self.expected_recv_data += self.tp.predef_packet_key
    packet_key = struct.unpack("<I", buffer[-4:])[0]
    result = self.expected_recv_data == packet_key
    if result:
        self.valid_recv_data += 1
    # Lazy logging arguments: the previous f-string used a backslash line
    # continuation, which embeds the source indentation in the message.
    logging.info(
        "L2CAP packet number - Received: %s, Expected: %s, Result: %s",
        packet_key, self.expected_recv_data, result)

    # True division: the packet number is handed on as a float.
    packet_number = (packet_key / self.tp.predef_packet_key) - 1
    self.tp.append_to_csv_file(timestamp, packet_number)
    # NOTE: periodic sampling via tp.record_throughput() is disabled; each
    # packet is only appended to the CSV file.
    if packet_number >= self.tp.total_packets_number - 1:
        self.async_ev_recv_data_finish.set()
def handle_acl_data(self, buffer: bytes, timestamp: int):
    """Parse a received ACL packet and validate it when it carries L2CAP data.

    Packets whose parsed ``data`` is not an ``HCI_Recv_L2CAP_Data`` are only
    logged; for the others the whole ``buffer`` is handed to
    ``match_recv_l2cap_data``.
    """
    packet = self.parse_acl_data(buffer)
    logging.debug("%s", packet)
    if type(packet.data).__name__ != 'HCI_Recv_L2CAP_Data':
        return
    self.match_recv_l2cap_data(buffer, timestamp)
async def recv_handler(self):
    """Process every item currently queued in ``rx_buffer_q``.

    Each item is a ``(buffer, timestamp)`` pair. The first byte of the
    buffer selects the handler: ACL data packets are handled synchronously,
    event packets are handled in a task that is awaited before the next
    item is taken. Other packet types are dropped.
    """
    while not self.rx_buffer_q.empty():
        raw_packet, received_at = self.rx_buffer_q.get()
        (packet_type,) = struct.unpack('<B', bytes(raw_packet[:1]))
        if packet_type == hci.HCI_ACL_DATA_PACKET:
            self.handle_acl_data(raw_packet, received_at)
        elif packet_type == hci.HCI_EVENT_PACKET:
            event_task = self.loop.create_task(
                self.handle_event(raw_packet))
            await event_task