in statefun-sdk-python/statefun/egress_io.py [0:0]
def kafka_egress_message(typename: str,
                         topic: str,
                         value: typing.Union[str, bytes, bytearray, int, float],
                         value_type: Type = None,
                         key: str = None):
    """
    Create an egress message that carries one record for a Kafka generic egress.

    The record payload is derived from @value. When a value_type is supplied, the
    serializer obtained from that type produces the payload bytes. Otherwise @value
    must be one of the following and is converted as described:

    - str: encoded as utf-8
    - bytes / bytearray: copied into an immutable bytes object
    - int: packed as a 4-byte big-endian signed integer (Kafka's IntegerSerializer layout)
    - float: packed as an 8-byte big-endian double (the layout Kafka's DoubleDeserializer reads)

    :param typename: the target egress to emit to (as defined in the module.yaml)
    :param topic: the Kafka destination topic for the record
    :param value: the value to produce
    :param value_type: an optional type whose serializer is used to serialize @value
    :param key: the utf8 encoded string key to produce (can be empty); omitted from the record when None
    :return: an EgressMessage wrapping the serialized KafkaProducerRecord Protobuf message.
    :raises ValueError: if @topic is empty or None, or if @value is None
    :raises TypeError: if no value_type is given and @value is not one of the supported types
    """
    # Guard clauses: both checks run before any record is constructed.
    if not topic:
        raise ValueError("A destination Kafka topic is missing")
    if value is None:
        raise ValueError("Missing value")

    producer_record = KafkaProducerRecord()
    producer_record.topic = topic

    # Work out the payload bytes first, then assign them to the record in one place.
    if value_type:
        # An explicit type takes precedence over the runtime type of the value.
        payload = value_type.serializer().serialize(value)
    elif isinstance(value, str):
        payload = value.encode('utf-8')
    elif isinstance(value, (bytes, bytearray)):
        payload = bytes(value)
    elif isinstance(value, int):
        # 4-byte big-endian signed integer, see the IntegerSerializer Javadoc:
        # https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerSerializer.html
        payload = struct.pack('>i', value)
    elif isinstance(value, float):
        # 8-byte big-endian double, see the DoubleDeserializer Javadoc:
        # https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleDeserializer.html
        payload = struct.pack('>d', value)
    else:
        raise TypeError("Unable to convert value to bytes.")
    producer_record.value_bytes = payload

    # An empty-string key is still written; only None leaves the key unset.
    if key is not None:
        producer_record.key = key

    # Wrap the serialized record in a TypedValue tagged with the record's type URL.
    wrapped = TypedValue()
    wrapped.typename = "type.googleapis.com/io.statefun.sdk.egress.KafkaProducerRecord"
    wrapped.has_value = True
    wrapped.value = producer_record.SerializeToString()
    return EgressMessage(typename, wrapped)