marketplace/deployer_util/run_tester.py (80 lines of code) (raw):

#!/usr/bin/env python3 # # Copyright 2018 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import sys import time import log_util as log from argparse import ArgumentParser from bash_util import Command from bash_util import CommandException from dict_util import deep_get from yaml_util import load_resources_yaml _PROG_HELP = "Deploy and run tester pods and wait for them to finish execution" def main(): parser = ArgumentParser(description=_PROG_HELP) parser.add_argument('--namespace') parser.add_argument('--manifest') parser.add_argument('--timeout', type=int, default=300) args = parser.parse_args() try: Command( ''' kubectl apply --namespace="{}" --filename="{}" '''.format(args.namespace, args.manifest), print_call=True) except CommandException as ex: log.error("Failed to apply tester job. Reason: {}", ex.message) return resources = load_resources_yaml(args.manifest) test_failed = False for resource_def in resources: full_name = "{}/{}".format(resource_def['kind'], deep_get(resource_def, 'metadata', 'name')) if resource_def['kind'] != 'Pod': log.info("Skip '{}'", full_name) continue start_time = time.time() poll_interval = 4 tester_timeout = args.timeout while True: try: resource = Command( ''' kubectl get "{}" --namespace="{}" -o=json '''.format(full_name, args.namespace), print_call=True).json() except CommandException as ex: log.info(str(ex)) log.info("retrying") time.sleep(poll_interval) continue result = deep_get(resource, 'status', 'phase') if result == "Failed": print_tester_logs(full_name, args.namespace) log.error("Tester '{}' failed.", full_name) test_failed = True break if result == "Succeeded": print_tester_logs(full_name, args.namespace) log.info("Tester '{}' succeeded.", full_name) break if time.time() - start_time > tester_timeout: print_tester_logs(full_name, args.namespace) log.error("Tester '{}' timeout.", full_name) test_failed = True break time.sleep(poll_interval) if test_failed: sys.exit("At least 1 test failed or timed out.") def print_tester_logs(full_name, namespace): try: Command( 'kubectl logs {} --namespace="{}"'.format(full_name, namespace), print_call=True, print_result=True) except CommandException as ex: log.error(str(ex)) log.error("Failed to get the tester logs.") if __name__ == "__main__": main()