in statefun-sdk-python/statefun/egress_io.py [0:0]
def kinesis_egress_message(typename: str,
stream: str,
value: typing.Union[str, bytes, bytearray],
partition_key: str,
value_type: typing.Union[None, Type] = None,
explicit_hash_key: str = None):
"""
Build a message that can be emitted to a Kinesis generic egress.
:param typename: the typename as specified in module.yaml
:param stream: The AWS Kinesis destination stream for that record
:param partition_key: the utf8 encoded string partition key to use
:param value: the value to produce
:param explicit_hash_key: a utf8 encoded string explicit hash key to use (can be empty)
:param value_type: an optional hint to this value type
:return: A Protobuf message representing the record to be produced to AWS Kinesis via the Kinesis generic egress.
"""
if not stream:
raise ValueError("Missing destination Kinesis stream")
if value is None:
raise ValueError("Missing value")
if partition_key is None:
raise ValueError("Missing partition key")
record = KinesisEgressRecord()
record.stream = stream
if value_type:
ser = value_type.serializer()
record.value_bytes = ser.serialize(value)
elif isinstance(value, str):
record.value_bytes = bytes(value, 'utf-8')
elif isinstance(value, (bytes, bytearray)):
record.value_bytes = bytes(value)
else:
raise TypeError("Unable to convert value to bytes.")
record.partition_key = partition_key
if explicit_hash_key is not None:
record.explicit_hash_key = explicit_hash_key
typed_value = TypedValue()
typed_value.typename = "type.googleapis.com/io.statefun.sdk.egress.KinesisEgressRecord"
typed_value.has_value = True
typed_value.value = record.SerializeToString()
return EgressMessage(typename, typed_value)