def kinesis_record_id()

in handlers/aws/utils.py [0:0]


def kinesis_record_id(event_payload: dict[str, Any]) -> str:
    """
    Build the event id for the payload of an event read from a kinesis stream.

    The id has the shape ``<arrival timestamp>-<hex>-<offset>`` where:
      * ``<arrival timestamp>`` is ``meta.approximate_arrival_timestamp``,
      * ``<hex>`` is ``get_hex_prefix`` applied to the dash-joined kinesis
        stream type, stream name, partition key and sequence number,
      * ``<offset>`` is ``fields.log.offset`` zero-padded to 12 digits.

    Every key is looked up directly, so a missing one raises ``KeyError``.
    """
    fields: dict[str, Any] = event_payload["fields"]

    # The offset is read before the kinesis details so that lookups happen in
    # the same order as the id components are documented by the payload.
    log_offset: int = fields["log"]["offset"]

    kinesis: dict[str, Any] = fields["aws"]["kinesis"]
    kind: str = kinesis["type"]
    name: str = kinesis["name"]
    part_key: str = kinesis["partition_key"]
    seq_no: str = kinesis["sequence_number"]

    arrival_ts: int = event_payload["meta"]["approximate_arrival_timestamp"]

    # Everything that identifies the record within its stream is folded into
    # a single hex token.
    stream_token = get_hex_prefix(f"{kind}-{name}-{part_key}-{seq_no}")

    return f"{arrival_ts}-{stream_token}-{log_offset:012d}"