def kinesis_egress_message()

in statefun-sdk-python/statefun/egress_io.py [0:0]


def kinesis_egress_message(typename: str,
                           stream: str,
                           value: typing.Union[str, bytes, bytearray],
                           partition_key: str,
                           value_type: typing.Union[None, Type] = None,
                           explicit_hash_key: typing.Optional[str] = None):
    """
    Build a message that can be emitted to a Kinesis generic egress.

    :param typename: the typename as specified in module.yaml
    :param stream: the AWS Kinesis destination stream for that record
    :param value: the value to produce. When no value_type is given, a str is encoded as utf8 and
                  bytes / bytearray are copied as is.
    :param partition_key: the utf8 encoded string partition key to use (must not be empty)
    :param value_type: an optional hint to this value type. When given, its serializer is used to
                       convert the value to bytes.
    :param explicit_hash_key: a utf8 encoded string explicit hash key to use (can be empty)
    :return: A Protobuf message representing the record to be produced to AWS Kinesis via the Kinesis generic egress.
    :raises ValueError: if stream or partition_key is missing or empty, or if value is None
    :raises TypeError: if no value_type is given and value is not a str, bytes or bytearray
    """
    if not stream:
        raise ValueError("Missing destination Kinesis stream")
    if value is None:
        raise ValueError("Missing value")
    # Kinesis requires a partition key of at least one character, so an empty
    # string is rejected here as well instead of failing later at the egress.
    if not partition_key:
        raise ValueError("Missing partition key")

    record = KinesisEgressRecord()
    record.stream = stream
    # Compare against None explicitly: the presence of a type hint, not its
    # truthiness, decides whether the custom serializer is used.
    if value_type is not None:
        ser = value_type.serializer()
        record.value_bytes = ser.serialize(value)
    elif isinstance(value, str):
        record.value_bytes = value.encode('utf-8')
    elif isinstance(value, (bytes, bytearray)):
        # bytes(...) also turns a mutable bytearray into an immutable copy.
        record.value_bytes = bytes(value)
    else:
        raise TypeError("Unable to convert value to bytes.")
    record.partition_key = partition_key
    # An empty explicit hash key is still set; only None leaves the field untouched.
    if explicit_hash_key is not None:
        record.explicit_hash_key = explicit_hash_key

    # Wrap the serialized record in a TypedValue tagged with the Kinesis egress record type.
    typed_value = TypedValue()
    typed_value.typename = "type.googleapis.com/io.statefun.sdk.egress.KinesisEgressRecord"
    typed_value.has_value = True
    typed_value.value = record.SerializeToString()

    return EgressMessage(typename, typed_value)