in tools/hci_throughput/hci_commands.py [0:0]
def process_returned_parameters(self):
    """Decode the reply carried by the most recently received HCI event.

    Reads ``self.hci_recv_ev_packet.current_event``, splits its opcode into
    OGF/OCF with ``hci.get_ogf_ocf`` and, for the commands handled here,
    stores the decoded reply in the matching ``hci`` module attribute
    (``hci.read_local_commands``, ``hci.bdaddr``, ``hci.le_read_buffer_size``,
    ``hci.le_read_local_supported_features``, ``hci.suggested_dflt_data_len``,
    ``hci.max_data_len``, ``hci.phy``, ``hci.static_addr``).

    Returns:
        * the status of the event for a handled command: the first byte of
          ``return_parameters`` for a Command Complete event, or the
          ``status`` attribute for a Command Status event;
        * ``-100`` when the OGF is not handled, or when the event is neither
          a Command Complete nor a Command Status;
        * ``None`` when the OGF is handled but the OCF is not.

    Raises:
        SystemExit: after reading the maximum data length, if
            ``hci.num_of_bytes_to_send`` plus ``hci.L2CAP_HDR_BYTES`` exceeds
            ``supported_max_tx_octets``.
    """
    unknown_status = -100

    current_ev = self.hci_recv_ev_packet.current_event

    # Events are matched by class *name*, as before, but the name is taken
    # from the class itself instead of constructing a throwaway instance
    # for every comparison.
    ev_name = type(current_ev).__name__
    is_cmd_complete = ev_name == hci.HCI_Ev_Cmd_Complete.__name__
    is_cmd_status = ev_name == hci.HCI_Ev_Cmd_Status.__name__

    def status() -> int:
        """Return the status carried by the current event (-100 if unknown)."""
        if is_cmd_complete:
            # The first byte of the return parameters is the status.
            return struct.unpack_from(
                "<B", current_ev.return_parameters, offset=0)[0]
        if is_cmd_status:
            return current_ev.status
        return unknown_status

    ogf, ocf = hci.get_ogf_ocf(current_ev.opcode)

    if ogf == hci.OGF_HOST_CTL:
        if ocf in (hci.OCF_SET_EVENT_MASK, hci.OCF_RESET):
            return status()
        return None  # OCF not handled in this group

    if ogf == hci.OGF_INFO_PARAM:
        if ocf == hci.OCF_READ_LOCAL_COMMANDS:
            hci.read_local_commands = hci.Read_Local_Commands()
            hci.read_local_commands.set(
                bytes(current_ev.return_parameters))
            return status()
        if ocf == hci.OCF_READ_BD_ADDR:
            # Byte 0 is the status; bytes 1..6 are the address.
            hci.bdaddr = hci.ba_addr_to_str(
                bytes(current_ev.return_parameters[1:7]))
            return status()
        return None  # OCF not handled in this group

    if ogf == hci.OGF_LE_CTL:
        # Commands whose reply carries nothing but a status.
        le_status_only = (
            hci.OCF_LE_SET_EVENT_MASK,
            hci.OCF_LE_SET_RANDOM_ADDRESS,
            hci.OCF_LE_SET_ADVERTISING_PARAMETERS,
            hci.OCF_LE_SET_ADVERTISE_ENABLE,
            hci.OCF_LE_SET_SCAN_PARAMETERS,
            hci.OCF_LE_SET_SCAN_ENABLE,
            hci.OCF_LE_CREATE_CONN,
            hci.OCF_LE_SET_DATA_LEN,
            hci.OCF_LE_SET_DFLT_PHY,
            hci.OCF_LE_SET_PHY,
        )

        if ocf == hci.OCF_LE_READ_BUFFER_SIZE_V1:
            hci.le_read_buffer_size = hci.LE_Read_Buffer_Size()
            hci.le_read_buffer_size.set(
                *struct.unpack("<BHB", current_ev.return_parameters))
            logging.info(f"LE Buffer size: {hci.le_read_buffer_size}")
            # Status comes from the parsed structure here, not status().
            return hci.le_read_buffer_size.status

        if ocf == hci.OCF_LE_READ_LOCAL_SUPPORTED_FEATURES:
            hci.le_read_local_supported_features = (
                hci.LE_Read_Local_Supported_Features())
            hci.le_read_local_supported_features.set(
                current_ev.return_parameters)
            return status()

        if ocf == hci.OCF_LE_READ_SUGGESTED_DFLT_DATA_LEN:
            hci.suggested_dflt_data_len = hci.Suggested_Dflt_Data_Length()
            hci.suggested_dflt_data_len.set(
                *struct.unpack("<BHH", current_ev.return_parameters))
            logging.info(
                f"Suggested Deafult Data Len: {hci.suggested_dflt_data_len}")
            return status()

        if ocf == hci.OCF_LE_READ_MAX_DATA_LEN:
            hci.max_data_len = hci.Max_Data_Length()
            hci.max_data_len.set(
                *struct.unpack("<BHHHH", current_ev.return_parameters))
            logging.info(f"Suggested Max Data Len: {hci.max_data_len}")

            max_tx_octets = hci.max_data_len.supported_max_tx_octets
            if hci.num_of_bytes_to_send > max_tx_octets - hci.L2CAP_HDR_BYTES:
                # Build the message once, from the same constant the check
                # uses, so the text cannot drift from the condition.
                message = (
                    f"Number of data bytes to send + {hci.L2CAP_HDR_BYTES} "
                    f"bytes of L2CAP header: "
                    f"{hci.num_of_bytes_to_send + hci.L2CAP_HDR_BYTES} "
                    f"exceeds allowed value of: {max_tx_octets}. Closing.")
                logging.critical(message)
                raise SystemExit(message)
            return status()

        if ocf == hci.OCF_LE_READ_PHY:
            hci.phy = hci.LE_Read_PHY()
            hci.phy.set(
                *struct.unpack('<BHBB', current_ev.return_parameters))
            logging.info(f"Current LE PHY: {hci.phy}")
            return status()

        if ocf in le_status_only:
            return status()
        return None  # OCF not handled in this group

    if ogf == hci.OGF_VENDOR_SPECIFIC:
        if ocf == hci.BLE_HCI_OCF_VS_RD_STATIC_ADDR:
            if is_cmd_complete:
                # Byte 0 is the status; bytes 1..6 are the address.
                hci.static_addr = hci.ba_addr_to_str(
                    bytes(current_ev.return_parameters[1:7]))
                logging.info(f"Received rd static addr: {hci.static_addr}")
            elif is_cmd_status:
                logging.info(f"Rd static addr status: {current_ev.status}")
            return status()
        return None  # OCF not handled in this group

    # OGF not handled at all.
    return unknown_status