in source/code/handlers/execution_handler.py [0:0]
def _handle_task_execution(self):
    """
    Executes the action of the task and records the outcome in the action tracking store.

    The task is first set to the started state. If a context is available, a timer is started that fires
    EXECUTE_TIME_REMAINING seconds before the remaining execution time runs out. After the action has executed the
    task is either set to completed (the action has no completion method, or this is a dry run) or to waiting
    for completion.
    :return: the result of safe_dict applied to a dict holding the task, action, id, dryrun flag, resulting status,
    action result, timestamp, running time, task group and task id
    :raises Exception: if the timeout timer fired before the action returned
    """

    def execute_timed_out():
        """
        Function is called by the timer when the handling of the request times out. Logs the timeout, sets the
        timeout event that is checked once the action returns, and flushes the logger.
        :return:
        """
        # configured timeout minus the time still left, presumably both in seconds - TODO confirm unit of env var
        time_used = int(int(os.getenv(handlers.ENV_LAMBDA_TIMEOUT)) - self._context.get_remaining_time_in_millis() / 1000)
        self._logger.error(ERR_EXECUTION_NOT_COMPLETED, time_used)

        if self.action_properties.get(actions.ACTION_EXECUTE_SIZE, None) is not None:
            self._logger.error(ERR_TIMEOUT, self.task)

        self._timeout_event.set()
        self._logger.flush()
        self._timer.cancel()

    def handle_metrics(result):
        """
        Sends the metrics data found in the result of the action, if sending metrics is allowed. Failures are
        logged as warnings and never raised.
        :param result: result of the action, either a dict or a JSON string
        :return:
        """
        # evaluate once, so the logged state always matches the decision taken below
        metrics_enabled = allow_send_metrics()
        self._logger.info(INF_SENDING_METRICS_DATA, "enabled" if metrics_enabled else "disabled")
        if not metrics_enabled:
            return

        try:
            result_data = result if isinstance(result, dict) else json.loads(result)
            if actions.METRICS_DATA in result_data:
                send_metrics_data(metrics_data=result_data[actions.METRICS_DATA], logger=self._logger)
        except Exception as ex:
            # sending metrics is best effort, it must not fail the task
            self._logger.warning(WARN_METRICS_DATA, str(ex))

    self._logger.info(INF_ACTION, self.action, self.action_id, self.task, safe_json(self.action_parameters, indent=3))
    if not handlers.running_local(self._context):
        self._logger.info(INF_LAMBDA_MEMORY, self._context.function_name, self._context.memory_limit_in_mb)

    self._logger.debug("Setting task state to {}", handlers.STATUS_STARTED)
    self._action_tracking.update_task(self.action_id, self.task, task_metrics=self.metrics, status=handlers.STATUS_STARTED)

    start = time.time()

    return_data = {
        "task": self.task,
        "action": self.action,
        "id": self.action_id,
        "dryrun": self.dryrun,
    }

    if self._context is not None:
        # let the timer fire EXECUTE_TIME_REMAINING seconds before the remaining execution time is used up
        execution_time_left = (self._context.get_remaining_time_in_millis() / 1000.00) - EXECUTE_TIME_REMAINING
        self._timer = threading.Timer(execution_time_left, execute_timed_out)
        self._timer.start()

    try:
        self._logger.debug("Start executing task")
        action_result = self._action_instance.execute()
        if isinstance(action_result, str):
            action_result = json.loads(action_result)
    finally:
        if self._timer is not None:
            self._timer.cancel()
            # raised from the finally clause, so it takes precedence over an exception raised by the action
            if self._timeout_event.is_set():
                raise Exception("Timeout execution action")

    if not self._action_instance.properties.get(actions.ACTION_INTERNAL, False):
        handle_metrics(action_result)

    execution_time = int(time.time() - start)
    self._logger.debug("Task needs{}completion", " no " if not self.has_completion else " ")

    # single decision used for both the stored task status and the returned result, a dry run never waits for
    # completion even if the action has a completion method
    completed = not self.has_completion or self.dryrun

    if completed:
        self._logger.debug("Setting state of task to {} ", handlers.STATUS_COMPLETED)
        self._action_tracking.update_task(action_id=self.action_id, task=self.task, task_metrics=self.metrics,
                                          status=handlers.STATUS_COMPLETED,
                                          status_data={
                                              handlers.TASK_TR_STARTED_TS: int(start),
                                              handlers.TASK_TR_RESULT: action_result,
                                              handlers.TASK_TR_EXECUTION_TIME: str(execution_time),
                                              handlers.TASK_TR_EXECUTION_LOGSTREAM: self.execution_log_stream
                                          })
        # noinspection PyBroadException
        try:
            self._logger.info(INF_ACTION_RESULT, execution_time, safe_json(action_result, indent=3))
        except Exception:
            # fall back to the plain string representation if the result cannot be rendered as JSON
            self._logger.info(INF_ACTION_RESULT, execution_time, str(action_result))
    else:
        # the action has a method for testing completion of the task, set the status to waiting and store the result
        # of the execution that started the action as start result that will be passed to the completion method
        self._logger.debug("Setting state of task to {} ", handlers.STATUS_WAIT_FOR_COMPLETION)
        self._action_tracking.update_task(action_id=self.action_id,
                                          task=self.task,
                                          task_metrics=self.metrics,
                                          status=handlers.STATUS_WAIT_FOR_COMPLETION,
                                          status_data={
                                              handlers.TASK_TR_LAST_WAIT_COMPLETION: datetime.now().isoformat(),
                                              handlers.TASK_TR_STARTED_TS: int(start),
                                              handlers.TASK_TR_START_RESULT: action_result,
                                              handlers.TASK_TR_START_EXECUTION_TIME: str(execution_time),
                                              handlers.TASK_TR_EXECUTION_LOGSTREAM: self.execution_log_stream
                                          })

        self._logger.info(INF_STARTED_AND_WAITING_FOR_COMPLETION, safe_json(action_result, indent=3))

        if not handlers.running_local(self._context):
            rule = handlers.enable_completion_cloudwatch_rule(self._context)
            self._logger.info(INF_RULE_ENABLED, rule)
        else:
            self._logger.info(INF_SIMULATION_MODE_NO_RULE_ENABLED)

    # no exception from action
    return_data.update({
        "result": handlers.STATUS_COMPLETED if completed else handlers.STATUS_WAIT_FOR_COMPLETION,
        "action-result": str(action_result),
        "datetime": datetime.now().isoformat(),
        "running-time": str(execution_time),
        "task-group": self._event[handlers.TASK_TR_GROUP],
        "task-id": self._event[handlers.TASK_TR_ID]
    })

    return safe_dict(return_data)