in source/code/handlers/task_tracking_handler.py [0:0]
def handle_request(self):
"""
Handles the event triggered by updates to the actions tracking table.
:return: results of handling selected updates
"""
def tasks_items_to_execute():
"""
Generator function that selects all record items from the event that need processing.
:return:
"""
def table_name(rec):
source_arn = rec["eventSourceARN"]
return source_arn.split("/")[1]
def from_tracking_table(rec):
return table_name(rec) == os.getenv(handlers.ENV_ACTION_TRACKING_TABLE)
def from_concurrency_table(rec):
return table_name(rec) == os.getenv(handlers.ENV_CONCURRENCY_TABLE)
def get_old_image(task_record):
return task_record["dynamodb"].get("OldImage", {})
def get_new_image(task_record):
return task_record["dynamodb"].get("NewImage", {})
def get_new_status(task_record):
return get_new_image(task_record).get(handlers.TASK_TR_STATUS, {}).get("S")
def get_old_status(task_record):
return get_new_image(task_record).get(handlers.TASK_TR_STATUS, {}).get("S")
def is_task_tracking_table_update(task_record):
if not from_tracking_table(task_record):
return False
return task_record["eventName"] in ["UPDATE", "MODIFY"]
def is_task_done(task_record):
if not is_task_tracking_table_update(task_record):
return False
new_status = get_new_status(task_record)
old_status = get_old_status(task_record)
if old_status != new_status:
return False
return new_status in handlers.task_tracking_table.NOT_LONGER_ACTIVE_STATUSES
def is_task_with_concurrency(task_record):
return get_new_image(task_record).get(handlers.TASK_TR_CONCURRENCY_KEY, {}).get("S") is not None
def get_old_last_update(task_record):
return get_old_image(task_record).get(handlers.TASK_TR_LAST_WAIT_COMPLETION, {}).get("S")
def get_new_last_update(task_record):
return get_new_image(task_record).get(handlers.TASK_TR_LAST_WAIT_COMPLETION, {}).get("S")
def is_delete_task(task_record):
return from_tracking_table(r) and task_record["eventName"] == "REMOVE"
def is_new_task(task_record):
if from_tracking_table(r) and task_record["eventName"] == "INSERT":
return get_new_status(task_record) == handlers.STATUS_PENDING
return False
def is_completed_with_concurrency(task_record):
return is_task_done(task_record) and is_task_with_concurrency(task_record)
def is_completed_without_concurrency(task_record):
return is_task_done(task_record) and not is_task_with_concurrency(task_record)
def is_wait_for_completion(task_record):
if not is_task_tracking_table_update(task_record):
return False
if get_old_status(task_record) != handlers.STATUS_WAIT_FOR_COMPLETION or \
get_new_status(task_record) != handlers.STATUS_WAIT_FOR_COMPLETION:
return False
return get_old_last_update(task_record) != get_new_last_update(task_record)
def is_concurrency_task_completed(concurrency_record):
if not from_concurrency_table(concurrency_record):
return False
if concurrency_record["eventName"] == "REMOVE":
return False
return concurrency_record["dynamodb"].get("NewImage", {}).get("RunNext", {}).get("BOOL", False)
def get_action_type(rec):
if is_new_task(rec):
return NEW_TASK
if is_completed_without_concurrency(rec):
return FINISHED_TASK
if is_completed_with_concurrency(rec):
return FINISHED_CONCURRENCY_TASK
if is_wait_for_completion(rec):
return CHECK_COMPLETION
if is_delete_task(rec):
return DELETE_ITEM
if is_concurrency_task_completed(rec):
return START_WAITING_ACTION
return None
for r in self._event.get("Records"):
self._logger.debug("Record to process is {}", safe_json(r, indent=2))
if r.get("eventSource") == "aws:dynamodb":
image_used = "NewImage" if "NewImage" in r["dynamodb"] else "OldImage"
if r["dynamodb"].get("NewImage", {}).get(handlers.TASK_TR_ACTION) is None and \
r["dynamodb"].get("OldImage", {}).get(handlers.TASK_TR_ACTION) is not None:
continue
self._logger.debug_enabled = r["dynamodb"][image_used].get(handlers.TASK_TR_DEBUG, {}).get("BOOL", False)
update_to_handle = get_action_type(r)
if update_to_handle is not None:
yield update_to_handle, r
else:
self._logger.debug("No action for record")
try:
start = datetime.now()
task_handlers = [
self._handle_new_task_item,
self._handle_finished_task_without_completion,
self._handle_completed_concurrency_item,
self._handle_check_completion,
self._handle_deleted_item,
self._handle_start_waiting_action
]
for task_tracking_update_type, record in tasks_items_to_execute():
self.done_work = True
used_image = "OldImage" if record["eventName"] == "REMOVE" else "NewImage"
image = record["dynamodb"][used_image]
handled_item = unpack_record(image)
self._logger.debug_enabled = handled_item.get(handlers.TASK_TR_DEBUG, False)
self._logger.debug("Executing handler function {} for type {} ({})",
task_handlers[task_tracking_update_type].__name__, self.task_string(task_tracking_update_type),
task_tracking_update_type)
task_handlers[task_tracking_update_type](handled_item)
if not self.done_work:
self._logger.clear()
running_time = float((datetime.now() - start).total_seconds())
if self.done_work:
self._logger.debug(DEBUG_RESULT, running_time)
return safe_dict({
"datetime": datetime.now().isoformat(),
"waiting-for-execution": self.waiting_for_execution_tasks,
"started-check-for-completion": self.started_completion_checks,
"started-execution": self.started_tasks,
"started-waiting": self.started_waiting_tasks,
"completed-concurrency-tasks": self.finished_concurrency_tasks,
"running-time": running_time
})
finally:
self._logger.flush()