testtools/UART_interface/mxchip_uart_interface.py (115 lines of code) (raw):

import os import sys import time try: from testtools.UART_interface.base_uart_interface import uart_interface except: from base_uart_interface import uart_interface try: import testtools.UART_interface.azure_test_firmware_errors as azure_test_firmware_errors import testtools.UART_interface.serial_settings as serial_settings import testtools.UART_interface.serial_commands_dict as commands_dict except: import azure_test_firmware_errors import serial_settings import serial_commands_dict as commands_dict # ------- global usecase fcns ------- def device_setup(): if 'posix' in os.name: # check to see that mxchip firmware drop path resolves for x in range(serial_settings.wait_for_flash): exists = os.path.exists(serial_settings.mxchip_file) if exists: break time.sleep(1) if x >= (serial_settings.wait_for_flash - 1): # firmware flash path did not resolve print("MXCHIP drop path did not resolve, chip did not flash successfully.") raise FileExistsError def check_sdk_errors(line): if "ERROR:" in line: azure_test_firmware_errors.SDK_ERRORS += 1 def check_firmware_errors(line): if azure_test_firmware_errors.iot_init_failure in line: print("Failed to connect to saved IoT Hub!") azure_test_firmware_errors.SDK_ERRORS += 1 elif azure_test_firmware_errors.sensor_init_failure in line: print("Failed to init mxchip sensor") azure_test_firmware_errors.SDK_ERRORS += 1 elif azure_test_firmware_errors.wifi_failure in line: print("Failed to connect to saved WiFi network.") azure_test_firmware_errors.SDK_ERRORS += 1 # ------- interface class ------- class mxchip_uart_interface(uart_interface): # Note: commands on MXCHIP have line endings with \r AND \n # If there is a sudden disconnect, program should report line in input script reached, and close files. # method to write to serial line with connection monitoring def serial_write(self, ser, message, file=None): # Check that the device is no longer sending bytes if ser.in_waiting: self.serial_read(ser, message, file) # Check that the serial connection is open if ser.writable(): wait = serial_settings.mxchip_buf_pause # wait for at least 50ms between 128 byte writes. buf = bytearray((message.strip() + '\r\n').encode('ascii')) buf_len = len(buf) # needed for loop as buf is a destructed list bytes_written = 0 timeout = time.time() while bytes_written < buf_len: temp_written = ser.write(buf[:128]) buf = buf[temp_written:] bytes_written += temp_written # print("bytes written: %d" %bytes_written) time.sleep(wait) if (time.time() - timeout > serial_settings.serial_comm_timeout): break # print("final written: %d" %bytes_written) return bytes_written else: try: time.sleep(2) ser.open() self.serial_write(ser, message, file) except: return False # Read from serial line with connection monitoring # If there is a sudden disconnect, program should report line in input script reached, and close files. def serial_read(self, ser, message, file, first_read=False): # Special per opt handling: if "send_telemetry" in message or "set_az_iothub" in message: time.sleep(.15) elif "exit" in message and first_read: time.sleep(serial_settings.wait_for_flash) output = ser.in_waiting while output < 4: time.sleep(1) output = ser.in_waiting print("%d bytes in waiting" % output) if ser.readable(): output = ser.readline(ser.in_waiting) output = output.decode(encoding='utf-8', errors='ignore') check_firmware_errors(output) check_sdk_errors(output) print(output) try: # File must exist to write to it file.writelines(output) except: pass return output else: try: time.sleep(2) ser.open() self.serial_read(ser, message, file, first_read=True) except: return False # Note: the buffer size on the mxchip appears to be 128 Bytes. def write_read(self, ser, input_file, output_file): serial_settings.bits_to_cache = 1600 if input_file: # set wait between read/write wait = (serial_settings.bits_to_cache/serial_settings.baud_rate) with open(input_file) as fp: # initialize input/output_file line = fp.readline() f = open(output_file, 'w+') # Output file while line: # time.sleep(.1) # Print only instruction, not secret content if line.split(): print("Sending %s" %line.split()[0]) # Attempt to write to serial port if not self.serial_write(ser, line, f): print("Failed to write to serial port, please diagnose connection.") f.close() break time.sleep(1) # Attempt to read serial port output = self.serial_read(ser, line, f, first_read=True) while(output): time.sleep(wait) output = self.serial_read(ser, line, f) line = fp.readline() # read any trailing output, save to file while (ser.in_waiting): time.sleep(.2) output = self.serial_read(ser, line, f) f.close()