def get_continuing_original_input_type()

in handlers/aws/utils.py [0:0]


def get_continuing_original_input_type(sqs_record: dict[str, Any]) -> Optional[str]:
    """
    Determines the original input type of the continue queue payload

    The type is derived from the ARN stored in the `originalEventSourceARN`
    message attribute of the SQS record.

    Returns:
        "cloudwatch-logs" when the ARN belongs to the `logs` service,
        "kinesis-data-stream" when the ARN belongs to the `kinesis` service and
        contains a `:stream/` resource segment, None when the attribute is
        missing, carries no string value, or matches none of the known prefixes.
    """

    # `or {}` also covers a record whose "messageAttributes" is present but None.
    message_attributes = sqs_record.get("messageAttributes") or {}

    source_attribute = message_attributes.get("originalEventSourceARN")
    if not isinstance(source_attribute, dict):
        return None

    # An attribute without a string payload cannot be matched against an ARN
    # prefix: treat it as "unknown" instead of raising KeyError.
    original_event_source = source_attribute.get("stringValue")
    if not isinstance(original_event_source, str):
        return None

    # Only the `aws` and `aws-us-gov` partitions are recognised.
    if original_event_source.startswith(("arn:aws:logs", "arn:aws-us-gov:logs")):
        return "cloudwatch-logs"

    # The `:stream/` check excludes other kinesis ARNs that share the prefix.
    if (
        original_event_source.startswith(("arn:aws:kinesis", "arn:aws-us-gov:kinesis"))
        and ":stream/" in original_event_source
    ):
        return "kinesis-data-stream"

    return None