def handle_request()

in source/code/handlers/task_tracking_handler.py [0:0]


    def handle_request(self):
        """
        Handles the event triggered by updates to the actions tracking table.
        :return: results of handling selected updates
        """

        def tasks_items_to_execute():
            """
            Generator function that selects all record items from the event that need processing.
            :return:
            """

            def table_name(rec):
                source_arn = rec["eventSourceARN"]
                return source_arn.split("/")[1]

            def from_tracking_table(rec):
                return table_name(rec) == os.getenv(handlers.ENV_ACTION_TRACKING_TABLE)

            def from_concurrency_table(rec):
                return table_name(rec) == os.getenv(handlers.ENV_CONCURRENCY_TABLE)

            def get_old_image(task_record):
                return task_record["dynamodb"].get("OldImage", {})

            def get_new_image(task_record):
                return task_record["dynamodb"].get("NewImage", {})

            def get_new_status(task_record):
                return get_new_image(task_record).get(handlers.TASK_TR_STATUS, {}).get("S")

            def get_old_status(task_record):
                return get_new_image(task_record).get(handlers.TASK_TR_STATUS, {}).get("S")

            def is_task_tracking_table_update(task_record):
                if not from_tracking_table(task_record):
                    return False
                return task_record["eventName"] in ["UPDATE", "MODIFY"]

            def is_task_done(task_record):

                if not is_task_tracking_table_update(task_record):
                    return False

                new_status = get_new_status(task_record)
                old_status = get_old_status(task_record)

                if old_status != new_status:
                    return False
                return new_status in handlers.task_tracking_table.NOT_LONGER_ACTIVE_STATUSES

            def is_task_with_concurrency(task_record):
                return get_new_image(task_record).get(handlers.TASK_TR_CONCURRENCY_KEY, {}).get("S") is not None

            def get_old_last_update(task_record):
                return get_old_image(task_record).get(handlers.TASK_TR_LAST_WAIT_COMPLETION, {}).get("S")

            def get_new_last_update(task_record):
                return get_new_image(task_record).get(handlers.TASK_TR_LAST_WAIT_COMPLETION, {}).get("S")

            def is_delete_task(task_record):
                return from_tracking_table(r) and task_record["eventName"] == "REMOVE"

            def is_new_task(task_record):
                if from_tracking_table(r) and task_record["eventName"] == "INSERT":
                    return get_new_status(task_record) == handlers.STATUS_PENDING
                return False

            def is_completed_with_concurrency(task_record):
                return is_task_done(task_record) and is_task_with_concurrency(task_record)

            def is_completed_without_concurrency(task_record):

                return is_task_done(task_record) and not is_task_with_concurrency(task_record)

            def is_wait_for_completion(task_record):

                if not is_task_tracking_table_update(task_record):
                    return False

                if get_old_status(task_record) != handlers.STATUS_WAIT_FOR_COMPLETION or \
                        get_new_status(task_record) != handlers.STATUS_WAIT_FOR_COMPLETION:
                    return False

                return get_old_last_update(task_record) != get_new_last_update(task_record)

            def is_concurrency_task_completed(concurrency_record):
                if not from_concurrency_table(concurrency_record):
                    return False

                if concurrency_record["eventName"] == "REMOVE":
                    return False

                return concurrency_record["dynamodb"].get("NewImage", {}).get("RunNext", {}).get("BOOL", False)

            def get_action_type(rec):

                if is_new_task(rec):
                    return NEW_TASK

                if is_completed_without_concurrency(rec):
                    return FINISHED_TASK

                if is_completed_with_concurrency(rec):
                    return FINISHED_CONCURRENCY_TASK

                if is_wait_for_completion(rec):
                    return CHECK_COMPLETION

                if is_delete_task(rec):
                    return DELETE_ITEM

                if is_concurrency_task_completed(rec):
                    return START_WAITING_ACTION

                return None

            for r in self._event.get("Records"):

                self._logger.debug("Record to process is {}", safe_json(r, indent=2))

                if r.get("eventSource") == "aws:dynamodb":

                    image_used = "NewImage" if "NewImage" in r["dynamodb"] else "OldImage"

                    if r["dynamodb"].get("NewImage", {}).get(handlers.TASK_TR_ACTION) is None and \
                            r["dynamodb"].get("OldImage", {}).get(handlers.TASK_TR_ACTION) is not None:
                        continue

                    self._logger.debug_enabled = r["dynamodb"][image_used].get(handlers.TASK_TR_DEBUG, {}).get("BOOL", False)

                    update_to_handle = get_action_type(r)

                    if update_to_handle is not None:
                        yield update_to_handle, r
                    else:
                        self._logger.debug("No action for record")

        try:

            start = datetime.now()

            task_handlers = [
                self._handle_new_task_item,
                self._handle_finished_task_without_completion,
                self._handle_completed_concurrency_item,
                self._handle_check_completion,
                self._handle_deleted_item,
                self._handle_start_waiting_action
            ]

            for task_tracking_update_type, record in tasks_items_to_execute():
                self.done_work = True

                used_image = "OldImage" if record["eventName"] == "REMOVE" else "NewImage"
                image = record["dynamodb"][used_image]
                handled_item = unpack_record(image)
                self._logger.debug_enabled = handled_item.get(handlers.TASK_TR_DEBUG, False)

                self._logger.debug("Executing handler function {} for type {} ({})",
                                   task_handlers[task_tracking_update_type].__name__, self.task_string(task_tracking_update_type),
                                   task_tracking_update_type)
                task_handlers[task_tracking_update_type](handled_item)

            if not self.done_work:
                self._logger.clear()

            running_time = float((datetime.now() - start).total_seconds())
            if self.done_work:
                self._logger.debug(DEBUG_RESULT, running_time)

            return safe_dict({
                "datetime": datetime.now().isoformat(),
                "waiting-for-execution": self.waiting_for_execution_tasks,
                "started-check-for-completion": self.started_completion_checks,
                "started-execution": self.started_tasks,
                "started-waiting": self.started_waiting_tasks,
                "completed-concurrency-tasks": self.finished_concurrency_tasks,
                "running-time": running_time
            })

        finally:
            self._logger.flush()