deviceadvisor/script/DATestRun.py (265 lines of code) (raw):

import boto3 import uuid import json import os import pprint import subprocess import re import random import sys from time import sleep ############################################## # Cleanup Certificates and Things and created certificate and private key file def delete_thing_with_certi(thingName, certiId, certiArn): client.detach_thing_principal( thingName=thingName, principal=certiArn) client.update_certificate( certificateId=certiId, newStatus='INACTIVE') client.delete_certificate(certificateId=certiId, forceDelete=True) client.delete_thing(thingName=thingName) os.remove(os.environ["DA_CERTI"]) os.remove(os.environ["DA_KEY"]) # Export the testing log and upload it to S3 bucket def process_logs(log_group, log_stream, thing_name): logs_client = boto3.client('logs') response = logs_client.get_log_events( logGroupName=log_group, logStreamName=log_stream ) log_file = "DA_Log_Java_" + thing_name + ".log" f = open(log_file, 'w') for event in response["events"]: f.write(event['message']) f.close() try: s3_bucket_name = secrets_client.get_secret_value( SecretId="ci/DeviceAdvisor/s3bucket")["SecretString"] s3.Bucket(s3_bucket_name).upload_file(log_file, log_file) print("[Device Advisor] Device Advisor Log file uploaded to " + log_file, file=sys.stderr) except Exception: print( "[Device Advisor] Error: could not store log in S3 bucket!", file=sys.stderr) os.remove(log_file) # Sleep for a random time def sleep_with_backoff(base, max): sleep(random.randint(base, max)) ############################################## # Initialize variables # create aws clients try: client = boto3.client('iot', region_name=os.environ["AWS_DEFAULT_REGION"]) dataClient = boto3.client( 'iot-data', region_name=os.environ["AWS_DEFAULT_REGION"]) deviceAdvisor = boto3.client( 'iotdeviceadvisor', region_name=os.environ["AWS_DEFAULT_REGION"]) s3 = boto3.resource('s3', region_name=os.environ["AWS_DEFAULT_REGION"]) secrets_client = boto3.client( "secretsmanager", region_name=os.environ["AWS_DEFAULT_REGION"]) except Exception: print("[Device Advisor] Error: could not create boto3 clients.", file=sys.stderr) exit(-1) # const BACKOFF_BASE = 5 BACKOFF_MAX = 10 # 60 minutes divided by the maximum back-off = longest time a DA run can last with this script. MAXIMUM_CYCLE_COUNT = (3600 / BACKOFF_MAX) # Did Device Advisor fail a test? If so, this should be true did_at_least_one_test_fail = False # load test config f = open('deviceadvisor/script/DATestConfig.json') DATestConfig = json.load(f) f.close() # create an temporary certificate/key file path certificate_path = os.path.join(os.getcwd(), 'certificate.pem.crt') key_path = os.path.join(os.getcwd(), 'private.pem.key') # load environment variables requried for testing shadowProperty = os.environ['DA_SHADOW_PROPERTY'] shadowDefault = os.environ['DA_SHADOW_VALUE_DEFAULT'] ############################################## # make sure sdk get installed print("[Device Advisor]Info: Start to build sdk...", file=sys.stderr) subprocess.run("mvn clean install -Dmaven.test.skip=true", shell=True) # Pretty printer for Device Advisor responds pp = pprint.PrettyPrinter(stream=sys.stderr) # test result test_result = {} for test_suite in DATestConfig['test_suites']: test_name = test_suite['test_name'] disabled = test_suite.get('disabled', False) if disabled: print("[Device Advisor] Info: " f"{test_name} test suite is disabled, skipping", file=sys.stderr) continue ############################################## # create a test thing thing_name = "DATest_" + str(uuid.uuid4()) try: thing_group = secrets_client.get_secret_value( SecretId="ci/DeviceAdvisor/thing_group")["SecretString"] # create_thing_response: # { # 'thingName': 'string', # 'thingArn': 'string', # 'thingId': 'string' # } print("[Device Advisor] Info: Started to create thing " f"'{thing_name}'", file=sys.stderr) create_thing_response = client.create_thing( thingName=thing_name ) os.environ["DA_THING_NAME"] = thing_name # Some tests (e.g. Jobs) require the tested things to be a part of the DA group thing. client.add_thing_to_thing_group( thingGroupName=thing_group, thingName=thing_name, ) except Exception as e: print(f"[Device Advisor] Error: Failed to create thing '{thing_name}'; " f"exception: {e}", file=sys.stderr) exit(-1) ############################################## # create certificate and keys used for testing try: print("[Device Advisor] Info: Started to create certificate...", file=sys.stderr) # create_cert_response: # { # 'certificateArn': 'string', # 'certificateId': 'string', # 'certificatePem': 'string', # 'keyPair': # { # 'PublicKey': 'string', # 'PrivateKey': 'string' # } # } create_cert_response = client.create_keys_and_certificate( setAsActive=True ) # write certificate to file f = open(certificate_path, "w") f.write(create_cert_response['certificatePem']) f.close() # write private key to file f = open(key_path, "w") f.write(create_cert_response['keyPair']['PrivateKey']) f.close() # setup environment variable os.environ["DA_CERTI"] = certificate_path os.environ["DA_KEY"] = key_path except Exception: try: client.delete_thing(thingName=thing_name) except Exception: print("[Device Advisor] Error: Could not delete thing.", file=sys.stderr) print("[Device Advisor] Error: Failed to create certificate.", file=sys.stderr) exit(-1) certificate_arn = create_cert_response['certificateArn'] certificate_id = create_cert_response['certificateId'] ############################################## # attach policy to certificate try: policy_name = secrets_client.get_secret_value( SecretId="ci/DeviceAdvisor/policy_name")["SecretString"] client.attach_policy( policyName=policy_name, target=certificate_arn ) except Exception as ex: print(ex, file=sys.stderr) delete_thing_with_certi(thing_name, certificate_id, certificate_arn) print("[Device Advisor] Error: Failed to attach policy.", file=sys.stderr) exit(-1) ############################################## # attach certification to thing try: print( "[Device Advisor] Info: Attach certificate to test thing...", file=sys.stderr) # attache the certificate to thing client.attach_thing_principal( thingName=thing_name, principal=create_cert_response['certificateArn'] ) except Exception: delete_thing_with_certi(thing_name, certificate_id, certificate_arn) print("[Device Advisor] Error: Failed to attach certificate.", file=sys.stderr) exit(-1) ############################################## # Run device advisor try: ###################################### # set default shadow, for shadow update, if the # shadow does not exists, update will fail print("[Device Advisor] Info: About to update shadow.", file=sys.stderr) payload_shadow = json.dumps( { "state": { "desired": { shadowProperty: shadowDefault }, "reported": { shadowProperty: shadowDefault } } }) shadow_response = dataClient.update_thing_shadow( thingName=thing_name, payload=payload_shadow) get_shadow_response = dataClient.get_thing_shadow(thingName=thing_name) # make sure shadow is created before we go to next step print("[Device Advisor] Info: About to wait for shadow update.", file=sys.stderr) while (get_shadow_response is None): get_shadow_response = dataClient.get_thing_shadow( thingName=thing_name) # start device advisor test # test_start_response # { # 'suiteRunId': 'string', # 'suiteRunArn': 'string', # 'createdAt': datetime(2015, 1, 1) # } print("[Device Advisor] Info: Start device advisor test: " + test_name, file=sys.stderr) sleep_with_backoff(BACKOFF_BASE, BACKOFF_MAX) test_start_response = deviceAdvisor.start_suite_run( suiteDefinitionId=test_suite['test_suite_id'], suiteRunConfiguration={ 'primaryDevice': { 'thingArn': create_thing_response['thingArn'], }, 'parallelRun': True }) # get DA endpoint print("[Device Advisor] Info: Getting Device Advisor endpoint.", file=sys.stderr) endpoint_response = deviceAdvisor.get_endpoint( thingArn=create_thing_response['thingArn'] ) os.environ['DA_ENDPOINT'] = endpoint_response['endpoint'] cycle_number = 0 # This flag is needed to handle the case when some problem occurred on Device Advisor side (e.g. connect fails) test_executed = False while True: cycle_number += 1 if (cycle_number >= MAXIMUM_CYCLE_COUNT): print(f"[Device Advisor] Error: {cycle_number} of cycles lasting " f"{BACKOFF_BASE} to {BACKOFF_MAX} seconds have passed.", file=sys.stderr) raise Exception(f"ERROR - {cycle_number} of cycles lasting " f"{BACKOFF_BASE} to {BACKOFF_MAX} seconds have passed.") # Add backoff to avoid TooManyRequestsException sleep_with_backoff(BACKOFF_BASE, BACKOFF_MAX) print( "[Device Advisor] Info: About to get Device Advisor suite run.", file=sys.stderr) test_result_responds = deviceAdvisor.get_suite_run( suiteDefinitionId=test_suite['test_suite_id'], suiteRunId=test_start_response['suiteRunId'] ) print( "[Device Advisor] Debug: deviceAdvisor.get_suite_run respond:", file=sys.stderr) pp.pprint(test_result_responds) # If the status is PENDING or the responds does not loaded, the test suite is still loading if (test_result_responds['status'] == 'PENDING' or # test group has not been loaded len(test_result_responds['testResult']['groups']) == 0 or # test case has not been loaded len(test_result_responds['testResult']['groups'][0]['tests']) == 0 or test_result_responds['testResult']['groups'][0]['tests'][0]['status'] == 'PENDING'): continue # Start to run the test sample after the status turns into RUNNING elif (not test_executed and test_result_responds['status'] == 'RUNNING' and test_result_responds['testResult']['groups'][0]['tests'][0]['status'] == 'RUNNING'): print( "[Device Advisor] Info: About to get start Device Advisor companion test application.", file=sys.stderr) working_dir = os.getcwd() exe_path = os.path.join( "deviceadvisor/tests/", test_suite['test_exe_path']) os.chdir(exe_path) print("[Device Advisor] Debug: CWD: " + os.getcwd(), file=sys.stderr) run_cmd = 'mvn clean compile exec:java -Dexec.mainClass=' + \ test_suite['test_exe_path'] + '.' + \ test_suite['test_exe_path'] if 'cmd_args' in test_suite: run_cmd = run_cmd + ' -Dexec.args="' + \ test_suite['cmd_args'] + '"' print("[Device Advisor] Debug: run_cmd:" + run_cmd, file=sys.stderr) result = subprocess.run(run_cmd, shell=True, timeout=60*2) print("[Device Advisor] Debug: result: ", result, file=sys.stderr) if result.returncode == 0: # Once the SDK test completes successfully, we assume that Device Advisor service received # and processed all requests. test_executed = True os.chdir(working_dir) # If the test finalizing or store the test result elif (test_result_responds['status'] != 'RUNNING'): test_result[test_name] = test_result_responds['status'] # If the test failed, upload the logs to S3 before clean up if (test_result[test_name] != "PASS"): print( "[Device Advisor] Info: About to upload log to S3.", file=sys.stderr) log_url = test_result_responds['testResult']['groups'][0]['tests'][0]['logUrl'] group_string = re.search('group=(.*);', log_url) log_group = group_string.group(1) stream_string = re.search('stream=(.*)', log_url) log_stream = stream_string.group(1) process_logs(log_group, log_stream, thing_name) delete_thing_with_certi( thing_name, certificate_id, certificate_arn) break except Exception as e: delete_thing_with_certi(thing_name, certificate_id, certificate_arn) print(f"[Device Advisor] Error: Failed to test: {test_name}; exception: {e}", file=sys.stderr) did_at_least_one_test_fail = True sleep_with_backoff(BACKOFF_BASE, BACKOFF_MAX) ############################################## # print result and cleanup things print(test_result, file=sys.stderr) failed = False for test in test_result: if (test_result[test] != "PASS" and test_result[test] != "PASS_WITH_WARNINGS"): print("[Device Advisor]Error: Test \"" + test + "\" Failed with status:" + test_result[test], file=sys.stderr) failed = True if failed: # if the test failed, we dont clean the Thing so that we can track the error exit(-1) if (did_at_least_one_test_fail == True): print("[Device Advisor] At least one test failed!", file=sys.stderr) exit(-1) exit(0)