testtools/UART_interface/rpi_uart_interface.py (138 lines of code) (raw):
import os
import sys
import time
try:
from testtools.UART_interface.base_uart_interface import uart_interface
import testtools.UART_interface.azure_test_firmware_errors as azure_test_firmware_errors
import testtools.UART_interface.serial_settings as serial_settings
import testtools.UART_interface.serial_commands_dict as commands_dict
except:
import azure_test_firmware_errors
import serial_settings
import serial_commands_dict as commands_dict
from base_uart_interface import uart_interface
# ------- global usecase fcns -------
def device_setup():
pass
def check_sdk_errors(line):
if "Error:" in line:
azure_test_firmware_errors.SDK_ERRORS += 1
def check_test_failures(line):
if " tests ran" in line:
result = [int(s) for s in line.split() if s.isdigit()]
print("line is: " + line)
print("result is: " + str(result))
if len(result) == 2:
azure_test_firmware_errors.SDK_ERRORS = result[0]
else:
azure_test_firmware_errors.SDK_ERRORS = result[1]
serial_settings.tests_run = True
return False
return True
def check_firmware_errors(line):
if azure_test_firmware_errors.iot_init_failure in line:
print("Failed to connect to saved IoT Hub!")
azure_test_firmware_errors.SDK_ERRORS += 1
elif azure_test_firmware_errors.sensor_init_failure in line:
print("Failed to init mxchip sensor")
azure_test_firmware_errors.SDK_ERRORS += 1
elif azure_test_firmware_errors.wifi_failure in line:
print("Failed to connect to saved WiFi network.")
azure_test_firmware_errors.SDK_ERRORS += 1
# ------- interface class -------
class rpi_uart_interface(uart_interface):
# If there is a sudden disconnect, program should report line in input script reached, and close files.
# method to write to serial line with connection monitoring
def serial_write(self, ser, message, file=None):
# Check that the device is no longer sending bytes
if ser.in_waiting:
self.serial_read(ser, message, file)
# Check that the serial connection is open
if ser.writable():
# special handling for sending a file
if "sz " in message:
temp_written = ser.write(bytearray("rz\r\n".encode('ascii')))
os.system(message)
# check for completion
output = ser.readline(ser.in_waiting)
output = output.decode(encoding='utf-8', errors='ignore')
# print(output)
start_time = time.time()
while ("Transfer" not in output) and (time.time() - start_time) < serial_settings.wait_for_flash:
time.sleep(.01)
output = ser.readline(ser.in_waiting)
output = output.decode(encoding='utf-8', errors='ignore')
if "incomplete" in output:
serial_settings.tests_run = True
azure_test_firmware_errors.SDK_ERRORS += 1
return 0
if "incomplete" in output:
serial_settings.tests_run = True
azure_test_firmware_errors.SDK_ERRORS += 1
return 0
return temp_written
# special handling for receiving a file, Note: input file should use rz <filepath> when a file from the RPi is desired
elif "rz " in message:
os.system('rz')
message.replace('rz ', 'sz ')
buf = bytearray((message).encode('ascii'))
print(buf)
bytes_written = ser.write(buf)
return bytes_written
else:
try:
time.sleep(2)
ser.open()
self.serial_write(ser, message, file)
except:
return 0
# Read from serial line with connection monitoring
# If there is a sudden disconnect, program should report line in input script reached, and close files.
def serial_read(self, ser, message, file, first_read=False):
if ser.readable():
output = ser.readline(ser.in_waiting)
output = output.decode(encoding='utf-8', errors='ignore').strip()
check_sdk_errors(output)
check_test_failures(output)
print(output)
try:
# File must exist to write to it
file.writelines(output)
except:
pass
return output
else:
try:
#This exists purely if the serial line becomes unreadable during operation, will likely never be hit, if so, check your cable
time.sleep(2)
ser.open()
self.serial_read(ser, message, file, first_read=True)
except:
return False
# Note: the buffer size on the mxchip appears to be 128 Bytes.
def write_read(self, ser, input_file, output_file):
session_start = time.time()
# set bits to cache variable to allow for mxchip to pass bytes into serial buffer
serial_settings.bits_to_cache = 800
if input_file:
# set wait between read/write
wait = (serial_settings.bits_to_cache/serial_settings.baud_rate)
with open(input_file) as fp:
# initialize input/output_file
line = fp.readline()
f = open(output_file, 'w+') # Output file
while line:
# time.sleep(.1)
# Print only instruction, not secret content
if line.split():
print("Sending %s" %line.split()[0])
# Attempt to write to serial port
if not self.serial_write(ser, line, f):
print("Failed to write to serial port, please diagnose connection.")
f.close()
break
time.sleep(1)
# Attempt to read serial port
output = self.serial_read(ser, line, f, first_read=True)
while(output):
time.sleep(wait)
output = self.serial_read(ser, line, f)
line = fp.readline()
# read any trailing output, save to file
if serial_settings.test_timeout:
print("Test input end. Waiting for results. Time Elapsed: ", (time.time() - session_start))
while((time.time() - session_start) < serial_settings.test_timeout):
time.sleep(0)
output = ser.readline(ser.in_waiting)
output = output.decode(encoding='utf-8', errors='ignore')
check_test_failures(output)
if len(output) > 4:
print(output)
#for now we can assume one test suite is run
if " tests run" in output or serial_settings.tests_run:
break
print("Test iteration ended. Time Elapsed: ", (time.time() - session_start))
else:
while (ser.in_waiting):
time.sleep(.2)
output = self.serial_read(ser, line, f)
check_sdk_errors(output)
# reset after every test if setting
if serial_settings.reset_device:
ser.write(bytearray("sudo reboot\n".encode("ascii")))
f.close()