tools/hci_throughput/hci_socket.py (134 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import hci
import socket
import ctypes
import struct
import asyncio
import logging
import subprocess
import sys
import time
import multiprocessing
SOCKET_RECV_BUFFER_SIZE = 425984
SOCKET_RECV_TIMEOUT = 3
def btmgmt_dev_reset(index):
logging.info(f"Selecting index {index}")
proc = subprocess.Popen(['btmgmt', '-i', str(index), 'power', 'off'],
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.communicate()
class BindingError(Exception):
pass
class HCI_User_Channel_Socket_Error(BaseException):
pass
class HCI_User_Channel_Socket():
def __init__(self, device_index=0, device_mode=None,
asyncio_loop=None):
logging.debug(
"Device index: %s, Device address: %s",
device_index,
device_mode)
self.loop = asyncio_loop
self.libc = ctypes.cdll.LoadLibrary('libc.so.6')
self.rx_buffer_q = multiprocessing.Manager().Queue()
self.counter = 0
self.device_index = device_index
self.device_mode = device_mode
self.hci_socket = self.socket_create()
self.socket_bind(self.device_index)
self.socket_clear()
self.listener_proc = None
self.listener_ev = multiprocessing.Manager().Event()
def socket_create(self):
logging.debug("%s", self.socket_create.__name__)
new_socket = socket.socket(socket.AF_BLUETOOTH,
socket.SOCK_RAW | socket.SOCK_NONBLOCK,
socket.BTPROTO_HCI)
if new_socket is None:
raise HCI_User_Channel_Socket_Error("Socket error. \
Opening socket failed")
new_socket.setblocking(False)
socket_size = new_socket.getsockopt(
socket.SOL_SOCKET, socket.SO_RCVBUF)
logging.info(f"Default socket recv buffer size: {socket_size}")
new_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 500000)
socket_size = new_socket.getsockopt(
socket.SOL_SOCKET, socket.SO_RCVBUF)
logging.info(f"Set socket recv buffer size: {socket_size}")
return new_socket
def socket_bind(self, index):
logging.debug("%s index: %s", self.socket_bind.__name__, index)
# addr: struct sockaddr_hci from /usr/include/bluetooth/hci.h
addr = struct.pack(
'HHH',
hci.AF_BLUETOOTH,
index,
hci.HCI_CHANNEL_USER)
retry_binding = 2
for i in range(retry_binding):
try:
bind = self.libc.bind(self.hci_socket.fileno(),
ctypes.cast(addr,
ctypes.POINTER(ctypes.c_ubyte)),
len(addr))
if bind != 0:
raise BindingError
except BindingError:
logging.warning("Binding error. Trying to reset bluetooth.")
btmgmt_dev_reset(self.device_index)
if i < retry_binding - 1:
continue
else:
self.hci_socket.close()
logging.error("Binding error. Check HCI index present.")
sys.exit()
logging.info("Binding done!")
break
def socket_clear(self):
logging.debug("%s", self.socket_clear.__name__)
try:
logging.info("Clearing the buffer...")
time.sleep(1)
cnt = 0
while True:
buff = self.hci_socket.recv(SOCKET_RECV_BUFFER_SIZE)
cnt += len(buff)
logging.debug(f"Read from buffer {cnt} bytes")
except BlockingIOError:
logging.info("Buffer empty and ready!")
return
async def send(self, ba_message):
await self.loop.sock_sendall(self.hci_socket, ba_message)
def socket_listener(self):
recv_at_once = 0
while True:
try:
if self.listener_ev.is_set():
logging.info("listener_ev set")
break
buffer = self.hci_socket.recv(SOCKET_RECV_BUFFER_SIZE)
logging.info(
f"Socket recv: {self.counter} th packet with len: {len(buffer)}")
self.rx_buffer_q.put((buffer, time.perf_counter()))
recv_at_once += 1
self.counter += 1
except BlockingIOError:
if recv_at_once > 1:
logging.info(f"Socket recv in one loop: {recv_at_once}")
recv_at_once = 0
pass
except BrokenPipeError:
logging.info("BrokenPipeError: Closing...")
print("BrokenPipeError. Press Ctrl-C to exit...")
def close(self):
logging.debug("%s ", self.close.__name__)
return self.hci_socket.close()
def start(self):
self.listener_proc = multiprocessing.Process(
target=self.socket_listener, daemon=True)
self.listener_proc.start()
logging.info(f"start listener_proc pid: {self.listener_proc.pid}")
def stop(self):
logging.info(f"stop listener_proc pid: {self.listener_proc.pid}")
self.listener_ev.set()
self.listener_proc.join()
self.close()