in handlers/aws/utils.py [0:0]
def get_trigger_type_and_config_source(event: dict[str, Any]) -> tuple[str, str]:
    """
    Determines the trigger type according to the payload of the trigger event
    and if the config must be read from attributes or from S3 file in env

    Resolution order:
      1. An event with `awslogs.data` is a "cloudwatch-logs" trigger.
      2. Otherwise the first entry of `event["Records"]` is inspected:
         - if its `body` parses to a JSON object holding the keys
           `output_destination`, `output_args` and `event_payload`, the
           trigger is "replay-sqs";
         - if its `body` parses to a JSON object that itself carries a
           non-empty `Records` list whose first entry has a known
           `eventSource`, that nested source is used;
         - in every other case the record's own `eventSource` is used.
      3. The resolved event source is mapped through `_available_triggers`.

    The config source is CONFIG_FROM_PAYLOAD only when the first record has
    `messageAttributes` containing `originalEventSourceARN`; otherwise it is
    CONFIG_FROM_S3FILE.

    :param event: the raw trigger event received by the handler
    :return: a `(trigger_type, config_source)` tuple
    :raises Exception: with message "Not supported trigger" when the event has
        no records, or no known event source can be determined
    """
    if "awslogs" in event and "data" in event["awslogs"]:
        return "cloudwatch-logs", CONFIG_FROM_S3FILE

    if "Records" not in event or len(event["Records"]) < 1:
        raise Exception("Not supported trigger")

    first_record = event["Records"][0]

    # Event source found inside the record body, if any; None means "fall back
    # to the eventSource of the record itself".
    nested_source = None
    if "body" in first_record:
        try:
            body = json_parser(first_record["body"])
            if isinstance(body, dict):
                # Check the keys of the *parsed* body. Testing against the raw
                # body string would be a substring match and would also accept
                # any JSON object that merely mentions these words in a value.
                if all(key in body for key in ("output_destination", "output_args", "event_payload")):
                    return "replay-sqs", CONFIG_FROM_S3FILE

                if "Records" in body and len(body["Records"]) > 0 and "eventSource" in body["Records"][0]:
                    candidate = body["Records"][0]["eventSource"]
                    # Only trust the nested source when it is a known trigger.
                    if candidate in _available_triggers:
                        nested_source = candidate
        except Exception:
            # Deliberately broad: a body that cannot be parsed or does not have
            # the expected shape is not an error here, it only means the outer
            # record's own eventSource must be used instead.
            nested_source = None

    if nested_source is not None:
        event_source = nested_source
    else:
        if "eventSource" not in first_record:
            raise Exception("Not supported trigger")
        event_source = first_record["eventSource"]

    if event_source not in _available_triggers:
        raise Exception("Not supported trigger")

    # The assert narrows the Optional annotation for type checkers.
    # NOTE(review): the value type of _available_triggers is not visible from
    # this function - confirm it can never hold None.
    trigger_type: Optional[str] = _available_triggers[event_source]
    assert trigger_type is not None

    if (
        "messageAttributes" not in first_record
        or "originalEventSourceARN" not in first_record["messageAttributes"]
    ):
        return trigger_type, CONFIG_FROM_S3FILE

    return trigger_type, CONFIG_FROM_PAYLOAD