def get_trigger_type_and_config_source()

in handlers/aws/utils.py [0:0]


def get_trigger_type_and_config_source(event: dict[str, Any]) -> tuple[str, str]:
    """
    Determines the trigger type according to the payload of the trigger event
    and if the config must be read from attributes or from S3 file in env
    """

    if "awslogs" in event and "data" in event["awslogs"]:
        return "cloudwatch-logs", CONFIG_FROM_S3FILE

    if "Records" not in event or len(event["Records"]) < 1:
        raise Exception("Not supported trigger")

    event_source = ""
    first_record = event["Records"][0]
    if "body" in first_record:
        event_body = first_record["body"]
        try:
            body = json_parser(event_body)
            if (
                isinstance(body, dict)
                and "output_destination" in event_body
                and "output_args" in event_body
                and "event_payload" in event_body
            ):
                return "replay-sqs", CONFIG_FROM_S3FILE

            if (
                isinstance(body, dict)
                and "Records" in body
                and len(body["Records"]) > 0
                and "eventSource" in body["Records"][0]
            ):
                event_source = body["Records"][0]["eventSource"]
                if event_source not in _available_triggers:
                    raise Exception("except in the function")
            else:
                raise Exception("except in the function")

        except Exception:
            if "eventSource" not in first_record:
                raise Exception("Not supported trigger")

            event_source = first_record["eventSource"]
    else:
        if "eventSource" not in first_record:
            raise Exception("Not supported trigger")

        event_source = first_record["eventSource"]

    if event_source not in _available_triggers:
        raise Exception("Not supported trigger")

    trigger_type: Optional[str] = _available_triggers[event_source]
    assert trigger_type is not None

    if "messageAttributes" not in first_record:
        return trigger_type, CONFIG_FROM_S3FILE

    if "originalEventSourceARN" not in first_record["messageAttributes"]:
        return trigger_type, CONFIG_FROM_S3FILE

    return trigger_type, CONFIG_FROM_PAYLOAD