in handlers/aws/utils.py [0:0]
def get_continuing_original_input_type(sqs_record: dict[str, Any]) -> Optional[str]:
    """
    Determine the original input type of the continue queue payload.

    The type is derived from the ``stringValue`` of the
    ``originalEventSourceARN`` message attribute of the SQS record.

    :param sqs_record: the SQS record to inspect
    :return: ``"cloudwatch-logs"`` when the ARN starts with a logs prefix,
        ``"kinesis-data-stream"`` when it starts with a kinesis prefix and
        contains ``":stream/"``, otherwise ``None`` (including when the
        attribute is missing or is not a string)
    """
    # A single guarded lookup replaces the chained membership checks and also
    # covers an attribute without "stringValue" or a non-mapping intermediate
    # value, which previously escaped as KeyError/TypeError.
    try:
        original_event_source = sqs_record["messageAttributes"]["originalEventSourceARN"]["stringValue"]
    except (KeyError, TypeError):
        return None

    if not isinstance(original_event_source, str):
        return None

    # Only the "aws" and "aws-us-gov" ARN prefixes are recognised.
    if original_event_source.startswith(("arn:aws:logs", "arn:aws-us-gov:logs")):
        return "cloudwatch-logs"

    # A kinesis ARN counts as a data stream only when it contains ":stream/".
    if (
        original_event_source.startswith(("arn:aws:kinesis", "arn:aws-us-gov:kinesis"))
        and ":stream/" in original_event_source
    ):
        return "kinesis-data-stream"

    return None