def _handle_task_execution()

in source/code/handlers/execution_handler.py [0:0]


    def _handle_task_execution(self):

        def execute_timed_out():
            """
            Function is called when the handling of the request times out
            :return:
            """
            time_used = int(int(os.getenv(handlers.ENV_LAMBDA_TIMEOUT)) - self._context.get_remaining_time_in_millis() / 1000)
            self._logger.error(ERR_EXECUTION_NOT_COMPLETED, time_used)

            if self.action_properties.get(actions.ACTION_EXECUTE_SIZE, None) is not None:
                self._logger.error(ERR_TIMEOUT, self.task)

            self._timeout_event.set()
            self._logger.flush()
            self._timer.cancel()

        def handle_metrics(result):
            self._logger.info(INF_SENDING_METRICS_DATA, "enabled" if allow_send_metrics() else "disabled")
            if allow_send_metrics():
                try:
                    result_data = result if isinstance(result, dict) else json.loads(result)
                    if actions.METRICS_DATA in result_data:
                        send_metrics_data(metrics_data=result_data[actions.METRICS_DATA], logger=self._logger)
                except Exception as ex:
                    self._logger.warning(WARN_METRICS_DATA, str(ex))

        self._logger.info(INF_ACTION, self.action, self.action_id, self.task, safe_json(self.action_parameters, indent=3))
        if not handlers.running_local(self._context):
            self._logger.info(INF_LAMBDA_MEMORY, self._context.function_name, self._context.memory_limit_in_mb)

        self._logger.debug("Setting task state to {}", handlers.STATUS_STARTED)
        self._action_tracking.update_task(self.action_id, self.task, task_metrics=self.metrics, status=handlers.STATUS_STARTED)

        start = time.time()

        return_data = {
            "task": self.task,
            "action": self.action,
            "id": self.action_id,
            "dryrun": self.dryrun,
        }

        if self._context is not None:
            execution_time_left = (self._context.get_remaining_time_in_millis() / 1000.00) - EXECUTE_TIME_REMAINING
            self._timer = threading.Timer(execution_time_left, execute_timed_out)
            self._timer.start()

        try:
            self._logger.debug("Start executing task")
            action_result = self._action_instance.execute()
            if isinstance(action_result, str):
                action_result = json.loads(action_result)
        finally:
            if self._timer is not None:
                self._timer.cancel()
                if self._timeout_event.is_set():
                    raise Exception("Timeout execution action")

        if not self._action_instance.properties.get(actions.ACTION_INTERNAL, False):
            handle_metrics(action_result)

        execution_time = int(time.time() - start)
        self._logger.debug("Task needs{}completion", " no" if not self.has_completion else " ")
        if not self.has_completion or self.dryrun:

            self._logger.debug("Setting state of task to {} ", handlers.STATUS_COMPLETED)
            self._action_tracking.update_task(action_id=self.action_id, task=self.task, task_metrics=self.metrics,
                                              status=handlers.STATUS_COMPLETED,
                                              status_data={
                                                  handlers.TASK_TR_STARTED_TS: int(start),
                                                  handlers.TASK_TR_RESULT: action_result,
                                                  handlers.TASK_TR_EXECUTION_TIME: str(execution_time),
                                                  handlers.TASK_TR_EXECUTION_LOGSTREAM: self.execution_log_stream
                                              })
            # noinspection PyBroadException
            try:
                self._logger.info(INF_ACTION_RESULT, execution_time, safe_json(action_result, indent=3))
            except Exception:
                self._logger.info(INF_ACTION_RESULT, execution_time, str(action_result))

        else:
            # the action has a method for testing completion of the task, set the status to waiting and store the result
            # of the execution that started the action as start result that will be passed to the completion method together
            self._logger.debug("Setting state of task to {} ", handlers.STATUS_WAIT_FOR_COMPLETION)
            self._action_tracking.update_task(action_id=self.action_id,
                                              task=self.task,
                                              task_metrics=self.metrics,
                                              status=handlers.STATUS_WAIT_FOR_COMPLETION,
                                              status_data={
                                                  handlers.TASK_TR_LAST_WAIT_COMPLETION: datetime.now().isoformat(),
                                                  handlers.TASK_TR_STARTED_TS: int(start),
                                                  handlers.TASK_TR_START_RESULT: action_result,
                                                  handlers.TASK_TR_START_EXECUTION_TIME: str(execution_time),
                                                  handlers.TASK_TR_EXECUTION_LOGSTREAM: self.execution_log_stream
                                              })

            self._logger.info(INF_STARTED_AND_WAITING_FOR_COMPLETION, safe_json(action_result, indent=3))

            if not handlers.running_local(self._context):
                rule = handlers.enable_completion_cloudwatch_rule(self._context)
                self._logger.info(INF_RULE_ENABLED, rule)
            else:
                self._logger.info(INF_SIMULATION_MODE_NO_RULE_ENABLED)

        # no exception from action
        return_data.update({
            "result": handlers.STATUS_WAIT_FOR_COMPLETION if self.has_completion else handlers.STATUS_COMPLETED,
            "action-result": str(action_result),
            "datetime": datetime.now().isoformat(),
            "running-time": str(execution_time),
            "task-group": self._event[handlers.TASK_TR_GROUP],
            "task-id": self._event[handlers.TASK_TR_ID]
        })

        return safe_dict(return_data)