testtools/UART_interface/serial_connect.py (145 lines of code) (raw):

import getopt
import os
import sys
import time

import serial

try:
    import testtools.UART_interface.azure_test_firmware_errors as azure_test_firmware_errors
    import testtools.UART_interface.serial_settings as serial_settings
    import testtools.UART_interface.serial_commands_dict as commands_dict
    # Bind the short module names explicitly: a plain
    # ``import testtools.UART_interface.x`` only binds ``testtools``, which
    # would leave ``rpi_uart_interface`` etc. undefined for the code below.
    import testtools.UART_interface.rpi_uart_interface as rpi_uart_interface
    import testtools.UART_interface.mxchip_uart_interface as mxchip_uart_interface
    import testtools.UART_interface.esp_arduino_uart_interface as esp_arduino_uart_interface
except ImportError:
    # Fall back to flat imports when the script is run from its own directory.
    import azure_test_firmware_errors
    import serial_settings
    import serial_commands_dict as commands_dict
    import rpi_uart_interface
    import mxchip_uart_interface
    import esp_arduino_uart_interface

# Note: This collection of scripts is designed to be used as a command line script with args (for automation purposes)
# to communicate over serial to Azure IoT C SDK supported devices.

uart = None


def usage():
    """Build the help text for the script's command line options.

    Iterates over ``commands_dict.cmds`` and emits one line per entry using
    the entry's ``'text'`` value.

    Returns:
        str: A header line followed by one ``- <option>: <text>`` line per
        command, each terminated with ``\\r\\n``.
    """
    parts = ["serial_connect.py usage: \r\n"]
    for command in commands_dict.cmds:
        parts.append(" - %s: " % command + commands_dict.cmds[command]['text'] + "\r\n")
    return "".join(parts)


def parse_opts():
    """Parse ``sys.argv`` and store the recognised options on ``serial_settings``.

    Short and long spellings are accepted for every option. Long options that
    carry a value are declared with a trailing ``=`` so that, for example,
    ``--baudrate 115200`` and ``--baudrate=115200`` both deliver the value
    (without the ``=`` getopt treats them as flags and passes an empty string).

    Raises:
        getopt.GetoptError: For an unknown option or a missing option value.
        ValueError: If the baud rate or timeout value is not an integer.
    """
    options, _remainder = getopt.gnu_getopt(
        sys.argv[1:],
        'hsri:o:b:p:m:d:t:',
        ['input=', 'output=', 'help', 'skip', 'baudrate=', 'port=',
         'mxchip_file=', 'device=', 'timeout=', 'reset'])

    for opt, arg in options:
        if opt in ('-h', '--help'):
            # NOTE(review): help is printed but parsing (and the caller)
            # carries on afterwards; confirm whether an exit is wanted here.
            print(usage())
        elif opt in ('-i', '--input'):
            serial_settings.input_file = arg
        elif opt in ('-o', '--output'):
            serial_settings.output_file = arg
        elif opt in ('-b', '--baudrate'):
            serial_settings.baud_rate = int(arg)
        elif opt in ('-p', '--port'):
            serial_settings.port = arg
        elif opt in ('-m', '--mxchip_file'):
            serial_settings.mxchip_file = "/media/newt/" + arg
        elif opt in ('-s', '--skip'):
            serial_settings.skip_setup = True
        elif opt in ('-d', '--device'):
            serial_settings.device_type = arg
        elif opt in ('-t', '--timeout'):
            serial_settings.test_timeout = int(arg)
        elif opt in ('-r', '--reset'):
            serial_settings.reset_device = True


def check_sdk_errors(line):
    """Increment the shared error counter when *line* contains ``"ERROR:"``."""
    if "ERROR:" in line:
        azure_test_firmware_errors.SDK_ERRORS += 1


def check_firmware_errors(line):
    """Report a known firmware failure marker found in *line*.

    The markers are read from ``azure_test_firmware_errors``. Only the first
    matching marker is reported, and the shared ``SDK_ERRORS`` counter is
    incremented once for it.
    """
    if azure_test_firmware_errors.iot_init_failure in line:
        print("Failed to connect to saved IoT Hub!")
        azure_test_firmware_errors.SDK_ERRORS += 1
    elif azure_test_firmware_errors.sensor_init_failure in line:
        print("Failed to init mxchip sensor")
        azure_test_firmware_errors.SDK_ERRORS += 1
    elif azure_test_firmware_errors.wifi_failure in line:
        print("Failed to connect to saved WiFi network.")
        azure_test_firmware_errors.SDK_ERRORS += 1


# Method to write to serial line with connection monitoring
def serial_write(ser, message, file=None):
    """Write *message* (stripped, CRLF-terminated, ASCII) to the serial port.

    Pending input is drained through ``serial_read`` first. The payload is
    sent in slices of at most 128 bytes with a pause of
    ``serial_settings.mxchip_buf_pause`` after each slice, and the loop gives
    up once ``serial_settings.serial_comm_timeout`` seconds have elapsed.

    Args:
        ser: Serial port object providing ``in_waiting``, ``writable()``,
            ``write()`` and ``open()``.
        message: Text to send; must be ASCII-encodable.
        file: Optional output file handed on to ``serial_read``.

    Returns:
        The number of bytes written. If the port was not writable, the port is
        reopened after a 2 second pause and the result of the retried write is
        returned, or ``False`` when reopening or retrying fails.
    """
    # Check that the device is no longer sending bytes
    if ser.in_waiting:
        serial_read(ser, message, file)

    # Check that the serial connection is open
    if ser.writable():
        wait = serial_settings.mxchip_buf_pause  # wait for at least 50ms between 128 byte writes.
        buf = bytearray((message.strip() + '\r\n').encode('ascii'))
        buf_len = len(buf)  # needed for loop as buf is a destructed list
        bytes_written = 0
        start_time = time.time()

        while bytes_written < buf_len:
            written = ser.write(buf[:128])
            buf = buf[written:]
            bytes_written += written
            time.sleep(wait)
            if time.time() - start_time > serial_settings.serial_comm_timeout:
                break
        return bytes_written

    try:
        time.sleep(2)
        ser.open()
        # Propagate the retry's byte count instead of silently returning None.
        return serial_write(ser, message, file)
    except (serial.SerialException, OSError):
        return False


# Read from serial line with connection monitoring
# If there is a sudden disconnect, program should report line in input script reached, and close files.
def serial_read(ser, message, file, first_read=False):
    """Read one line from the serial port, check it for errors and echo it.

    Args:
        ser: Serial port object providing ``in_waiting``, ``readable()`` and
            ``readline()``.
        message: The command that was just sent; selects the pre-read delay.
        file: Open text file to append the decoded output to, or ``None``.
        first_read: True for the first read after *message* was sent; enables
            the longer wait that follows an ``exit`` command.
    """
    # Special per opt handling:
    if "send_telemetry" in message or "set_az_iothub" in message:
        time.sleep(.15)
    elif "exit" in message and first_read:
        time.sleep(serial_settings.wait_for_flash)
        # NOTE(review): indentation was reconstructed from a collapsed source;
        # this poll loop is assumed to belong to the post-exit branch. Confirm
        # against the original file. It has no upper bound on the wait.
        pending = ser.in_waiting
        while pending < 4:
            time.sleep(1)
            pending = ser.in_waiting
            print("%d bytes in waiting" % pending)

    if not ser.readable():
        return

    output = ser.readline(ser.in_waiting)
    output = output.decode(encoding='utf-8', errors='ignore')

    check_firmware_errors(output)
    check_sdk_errors(output)
    print(output)

    # File must exist to write to it
    if file is None:
        return
    try:
        file.writelines(output)
    except (OSError, ValueError) as err:
        # Logging to file is best-effort: report the failure and carry on.
        print("Failed to write serial output to file: %s" % err)


def _read_setup_line(ser):
    """Read, decode and error-check one line of device start-up output."""
    time.sleep(.1)
    output = ser.readline(ser.in_waiting)
    output = output.strip().decode(encoding='utf-8', errors='ignore')
    mxchip_uart_interface.check_firmware_errors(output)

    # Do we want to save this output to file if there is an error printed?
    if len(output) > 4:
        print(output)
    return output


def _select_uart_interface(device_type):
    """Instantiate the UART interface whose keyword appears in *device_type*.

    Raises:
        ValueError: If *device_type* matches none of the supported devices.
    """
    if 'rpi' in device_type or 'raspi' in device_type:
        # for rpi, transfer pipeline/artifact files from agent to device
        # get filepath, walk it
        # for each file, send rz, then sz file to rpi port
        return rpi_uart_interface.rpi_uart_interface()
    if 'mxchip' in device_type:
        return mxchip_uart_interface.mxchip_uart_interface()
    if 'esp32' in device_type or 'esp8266' in device_type:
        return esp_arduino_uart_interface.esp_uart_interface()
    raise ValueError("Unsupported device type: %r" % (device_type,))


def run():
    """Open the configured serial port, run the input script and exit.

    Performs mxchip device setup when the device type asks for it, opens the
    port, prints the initial device output, then either drains pending output
    (``skip_setup``) or waits for ``serial_settings.setup_string`` for up to
    ``3 * wait_for_flash + 5`` seconds. The device-specific interface then
    processes the input/output files. The port is always closed, even on
    failure.

    Raises:
        ValueError: If ``serial_settings.device_type`` is not supported.
        SystemExit: Always on success; the exit status is the SDK error count,
            capped at 255 so that a large count cannot wrap around to 0.
    """
    if 'mxchip' in serial_settings.device_type:
        mxchip_uart_interface.device_setup()

    ser = serial.Serial(port=serial_settings.port, baudrate=serial_settings.baud_rate)
    try:
        time.sleep(.5)

        # Print initial message
        output = ser.readline(ser.in_waiting)
        output = output.strip().decode(encoding='utf-8', errors='ignore')
        print(output)

        if serial_settings.skip_setup:
            # skip waiting for WiFi and IoT Hub setup
            while ser.in_waiting:
                output = _read_setup_line(ser)
        else:
            # Wait for WiFi and IoT Hub setup to complete
            start_time = time.time()

            # for mxchip, branch this away
            while (serial_settings.setup_string not in output) and (
                    (time.time() - start_time) < (3 * serial_settings.wait_for_flash + 5)):
                output = _read_setup_line(ser)

        uart = _select_uart_interface(serial_settings.device_type)
        uart.write_read(ser, serial_settings.input_file, serial_settings.output_file)

        ser.reset_input_buffer()
        ser.reset_output_buffer()
    finally:
        # Release the port even when setup or the test script raised.
        ser.close()

    error_count = azure_test_firmware_errors.SDK_ERRORS
    print("Num of Errors: %d" % error_count)
    # POSIX exit statuses are taken modulo 256; cap so failures stay non-zero.
    sys.exit(min(error_count, 255))


if __name__ == '__main__':
    parse_opts()
    run()