testtools/UART_interface/esp_arduino_uart_interface.py (132 lines of code) (raw):

import os
import sys
import time

try:
    from testtools.UART_interface.base_uart_interface import uart_interface
    import testtools.UART_interface.azure_test_firmware_errors as azure_test_firmware_errors
    import testtools.UART_interface.serial_settings as serial_settings
    import testtools.UART_interface.serial_commands_dict as commands_dict
except ImportError:
    # Fall back to flat module names when the package-qualified imports
    # are unavailable.
    import azure_test_firmware_errors
    import serial_settings
    import serial_commands_dict as commands_dict
    from base_uart_interface import uart_interface


# ------- global usecase fcns -------
def device_setup():
    """Perform device setup; this interface has nothing to set up."""
    pass


def check_sdk_errors(line):
    """Count one SDK error if ``line`` reports an error or failure.

    A line counts when its lowercase form contains "error" or "fail",
    unless it also contains "epoch time failed!" (an NTP retry).
    ``None`` and empty input are ignored, so callers may pass the result
    of a failed read directly.
    """
    if not line:
        return

    local_line = line.lower()
    if "error" not in local_line and "fail" not in local_line:
        return

    # don't count NTP retries.
    if "epoch time failed!" not in local_line:
        azure_test_firmware_errors.SDK_ERRORS += 1


# missing test_failures method, may not need due to this being a sample,
# not a series of tests

def check_firmware_errors(line):
    """Report and count firmware-level connection failures found in ``line``.

    The markers looked for are ``azure_test_firmware_errors.iot_init_failure``
    and ``azure_test_firmware_errors.wifi_failure``; at most one of them is
    counted per line.
    """
    if azure_test_firmware_errors.iot_init_failure in line:
        print("Failed to connect to saved IoT Hub!")
        azure_test_firmware_errors.SDK_ERRORS += 1
    elif azure_test_firmware_errors.wifi_failure in line:
        print("Failed to connect to saved WiFi network.")
        azure_test_firmware_errors.SDK_ERRORS += 1


# ------- interface class -------
class esp_uart_interface(uart_interface):
    """Serial driver that feeds an input script to a device and logs its output."""

    # Number of "Confirmation callback" lines seen in the device output.
    message_callbacks = 0
    # Number of confirmations expected; write_read adds the shortfall to
    # SDK_ERRORS.
    messages_sent = 5

    def check_sample_errors(self, line):
        """Count ``line`` as a confirmed message if it reports a confirmation callback."""
        if "Confirmation callback" in line:
            self.message_callbacks += 1

    # If there is a sudden disconnect, program should report line in input script reached, and close files.
    # method to write to serial line with connection monitoring
    def serial_write(self, ser, message, file=None):
        """Write ``message`` (stripped, CRLF-terminated, ASCII) to ``ser``.

        Pending input is drained with one ``serial_read`` first. The payload
        is sent in chunks of at most 128 bytes with a pause between chunks,
        and sending stops early once ``serial_settings.serial_comm_timeout``
        seconds have elapsed.

        If the port is not writable it is reopened after a short delay and
        the write is retried.

        Returns the number of bytes written, or 0 if reopening or the retry
        raised.
        """
        # Check that the device is no longer sending bytes
        if ser.in_waiting:
            self.serial_read(ser, message, file)

        # Check that the serial connection is open
        if not ser.writable():
            try:
                time.sleep(2)
                ser.open()
                # Propagate the retry's byte count; dropping it would make a
                # successful retry look like a failed write to the caller.
                return self.serial_write(ser, message, file)
            except Exception:
                return 0

        # wait for at least 50ms between 128 byte writes.
        wait = serial_settings.mxchip_buf_pause
        buf = bytearray((message.strip() + '\r\n').encode('ascii'))
        buf_len = len(buf)  # captured up front because buf shrinks as it is sent
        bytes_written = 0
        started = time.time()

        while bytes_written < buf_len:
            temp_written = ser.write(buf[:128])
            buf = buf[temp_written:]
            bytes_written += temp_written
            time.sleep(wait)

            if time.time() - started > serial_settings.serial_comm_timeout:
                break

        return bytes_written

    # Read from serial line with connection monitoring
    # If there is a sudden disconnect, program should report line in input script reached, and close files.
    def serial_read(self, ser, message, file, first_read=False):
        """Read one line from ``ser``, scan it for errors, echo it and log it.

        When ``message`` contains "exit" and ``first_read`` is true, the call
        first sleeps for ``serial_settings.wait_for_flash`` seconds and then
        polls once per second until at least 4 bytes are waiting.

        If the port is not readable it is reopened after a short delay and
        the read is retried.

        Returns the decoded text (possibly empty), or ``None`` if reopening
        or the retry raised.
        """
        # Special per opt handling:
        if "exit" in message and first_read:
            time.sleep(serial_settings.wait_for_flash)
            output = ser.in_waiting
            while output < 4:
                time.sleep(1)
                output = ser.in_waiting
                print("%d bytes in waiting" % output)

        if not ser.readable():
            try:
                time.sleep(2)
                ser.open()
                # Propagate what the retry read instead of discarding it.
                return self.serial_read(ser, message, file, first_read=True)
            except Exception:
                return None

        # Limiting the read to the bytes already waiting keeps this call
        # from blocking for data that has not arrived yet.
        output = ser.readline(ser.in_waiting)
        output = output.decode(encoding='utf-8', errors='ignore')

        check_firmware_errors(output)
        check_sdk_errors(output)
        self.check_sample_errors(output)
        print(output)

        if file is not None:
            try:
                file.write(output)
            except (ValueError, OSError):
                # Logging is best-effort: a closed or unwritable log file
                # must not interrupt the serial session.
                pass

        return output

    def write_read(self, ser, input_file, output_file):
        """Send each line of ``input_file`` to the device and log responses.

        Sets ``serial_settings.bits_to_cache`` and ``serial_settings.baud_rate``
        according to whether ``serial_settings.device_type`` contains 'esp32'.
        Does nothing further when ``input_file`` is falsy.

        After the script has been sent, trailing output is collected: until
        ``serial_settings.test_timeout`` elapses or the "done with sending"
        marker has been seen when a timeout is configured, otherwise while
        bytes are waiting. Finally the number of missing confirmation
        callbacks is added to ``SDK_ERRORS`` and the total is written to
        ``exitcode.txt``.
        """
        if 'esp32' in serial_settings.device_type:
            serial_settings.bits_to_cache = 800
            serial_settings.baud_rate = 1000000
        else:
            serial_settings.bits_to_cache = 1600
            serial_settings.baud_rate = 115200

        session_start = time.time()
        if not input_file:
            return

        # set wait between read/write
        wait = serial_settings.bits_to_cache / serial_settings.baud_rate

        # Both files are context-managed so they are closed on every exit
        # path, including exceptions raised while talking to the device.
        with open(input_file) as input_file_obj, \
                open(output_file, 'w+') as output_file_obj:
            line = input_file_obj.readline()

            while line:
                # Print only instruction, not secret content
                tokens = line.split()
                if tokens:
                    print("Sending %s" % tokens[0])

                # Attempt to write to serial port
                if not self.serial_write(ser, line, output_file_obj):
                    print("Failed to write to serial port, please diagnose connection.")
                    break

                time.sleep(1)

                # Attempt to read serial port; keep reading until the device
                # goes quiet (empty string) or the read fails (None).
                output = self.serial_read(ser, line, output_file_obj, first_read=True)
                while output:
                    if "done with sending" in output:
                        serial_settings.tests_run = True
                    time.sleep(wait)
                    output = self.serial_read(ser, line, output_file_obj)

                line = input_file_obj.readline()

            # NOTE(review): serial_read already passes every line it reads to
            # check_sdk_errors, so the extra calls in the two loops below
            # count error lines twice. Kept to preserve the exit code value;
            # confirm whether that is intended.
            if serial_settings.test_timeout:
                print("Test input end. Waiting for results. Time Elapsed: ", (time.time() - session_start))
                while (time.time() - session_start) < serial_settings.test_timeout:
                    time.sleep(.2)
                    output = self.serial_read(ser, line, output_file_obj)
                    check_sdk_errors(output)
                    # for now we can assume one test suite is run
                    finished = bool(output) and "done with sending" in output
                    if finished or serial_settings.tests_run:
                        break
                print("Test iteration ended. Time Elapsed: ", (time.time() - session_start))
            else:
                # read any trailing output, save to file
                while ser.in_waiting:
                    time.sleep(.2)
                    output = self.serial_read(ser, line, output_file_obj)
                    check_sdk_errors(output)

            # forward failed callbacks to SDK_ERRORS
            azure_test_firmware_errors.SDK_ERRORS += self.messages_sent - self.message_callbacks

            with open('exitcode.txt', 'w') as fexit:
                fexit.write('%d' % azure_test_firmware_errors.SDK_ERRORS)