in source/code/testing/task_test_runner.py [0:0]
def run(self, parameters,
        complete_check_polling_interval=60,
        task_timeout=None,
        task_name=None,
        datetime_delta=None,
        events=None,
        tag_filter=None,
        run_in_regions=None,
        action_select_params=None,
        debug=False,
        run_after_select=None):
    """
    Executes the tested action for every task returned by _get_tasks_to_execute and collects the outcome.

    Each task is executed, and if it is flagged with actions.ACTION_PARAM_HAS_COMPLETION its completion
    method is polled until it returns a non-None result or the timeout expires. The outcome is stored on
    the task object itself through the handlers.TASK_TR_* attributes. Exceptions raised while executing or
    polling a single task are caught, recorded on that task as a failure, and do not stop the remaining tasks.

    :param parameters: Action parameters, stored in self.parameters; actions.ACTION_PARAM_INTERVAL is read
    from it with .get() to set self.interval
    :param complete_check_polling_interval: Seconds to sleep between two completion checks
    :param task_timeout: Fallback timeout, only used when the task's own _timeout_ is None
    :param task_name: Name of the task, defaults to the lower-cased "<action_name>-test"
    :param datetime_delta: If not None a simulated date/time provider is installed for the duration of the run
    :param events: Stored in self._events, defaults to an empty dict
    :param tag_filter: Stored in self._tag_filter
    :param run_in_regions: Regions stored in self.run_in_regions, defaults to [self.test_region]
    :param action_select_params: Stored in self.action_select_parameters, defaults to an empty dict
    :param debug: Debug flag applied to this runner and its logger while the run is in progress
    :param run_after_select: Stored in self.run_after_select; not used further inside this method
    :return: self.results, the list of executed task objects annotated with their status and result or error
    """
    self.results = []
    self.executed_tasks = []
    self.parameters = parameters
    self.action_select_parameters = action_select_params if action_select_params is not None else {}
    self.task_name = task_name if task_name is not None else "{}-test".format(self.action_name).lower()
    self._ensure_action_stack()
    self.run_after_select = run_after_select

    self.context = Context()

    self._events = events if events is not None else {}
    self._tag_filter = tag_filter

    # remember the current debug setting so it can be restored when the run is done
    save_debug = self.debug
    self.debug = debug
    self.logger._debug = self.debug

    self.interval = parameters.get(actions.ACTION_PARAM_INTERVAL, None)

    self.run_in_regions = run_in_regions if run_in_regions is not None else [self.test_region]

    try:
        if datetime_delta is not None:
            set_datetime_delta(datetime_delta)
            actions.set_date_time_provider(DatetimeProvider)
            self.logger.test("Setting simulated test execution date and time to {}", actions.date_time_provider().now())

        for executed_task in self._get_tasks_to_execute():
            try:
                self.executed_tasks.append(executed_task)
                self.logger.test("Start execution of action {} using assumed role {} in region {}", self.action_name,
                                 self._assumed_role, self.tested_region)
                start_result = executed_task.execute()

                if not executed_task.get(actions.ACTION_PARAM_HAS_COMPLETION, False):
                    # no completion handling, the result of the execute step is the final result
                    self.logger.test("Action completed with result {}", safe_json(start_result, indent=3))
                    setattr(executed_task, handlers.TASK_TR_RESULT, start_result)
                    setattr(executed_task, handlers.TASK_TR_STATUS, handlers.STATUS_COMPLETED)
                else:
                    self.logger.test("Waiting for task to complete")
                    setattr(executed_task, handlers.TASK_TR_START_RESULT, start_result)

                    # noinspection PyProtectedMember
                    timeout = executed_task._timeout_
                    if timeout is None:
                        # NOTE(review): together with the multiplication below task_timeout is multiplied
                        # by 60 twice before it is used as seconds - confirm the intended unit of task_timeout
                        timeout = task_timeout * 60 if task_timeout is not None else 60
                    # presumably converts minutes to seconds for the Timer - verify against Timer and _timeout_
                    timeout *= 60

                    # the completion method does not change while polling, so look it up once
                    completion_method = getattr(executed_task, handlers.COMPLETION_METHOD, None)
                    if completion_method is None:
                        # format the message here; passing the name as a second Exception argument
                        # would make str(ex) render as a tuple with a literal "{}" in the text
                        raise Exception("Tested action needs completion but does not implement the required {} method".format(
                            handlers.COMPLETION_METHOD))

                    with Timer(timeout_seconds=timeout) as timer:
                        while True:
                            complete_result = completion_method(start_result)
                            if complete_result is not None:
                                self.logger.test("Action completed with result {}", safe_json(complete_result, indent=3))
                                setattr(executed_task, handlers.TASK_TR_STATUS, handlers.STATUS_COMPLETED)
                                setattr(executed_task, handlers.TASK_TR_RESULT, complete_result)
                                break
                            # timeout is tested after the completion check so a task that completes
                            # on the last poll is still reported as completed
                            if timer.timeout:
                                self.logger.test("Action timed out")
                                setattr(executed_task, handlers.TASK_TR_STATUS, handlers.STATUS_TIMED_OUT)
                                setattr(executed_task, handlers.TASK_TR_ERROR, "Timeout")
                                break
                            self.logger.test("Action not completed yet, waiting {} seconds", complete_check_polling_interval)
                            time.sleep(complete_check_polling_interval)

                self.results.append(executed_task)

            except Exception as ex:
                # a failing task is recorded in the results, remaining tasks are still executed
                self.logger.test("Action failed {}", str(ex))
                setattr(executed_task, handlers.TASK_TR_STATUS, handlers.STATUS_FAILED)
                setattr(executed_task, handlers.TASK_TR_ERROR, str(ex))
                self.results.append(executed_task)

    finally:
        # only undo the simulated date/time provider if this run installed it
        if datetime_delta is not None:
            actions.reset_date_provider()
        # debug settings were changed unconditionally above, so always restore them
        self.debug = save_debug
        self.logger._debug = self.debug

    return self.results