def kafka_egress_message()

in statefun-sdk-python/statefun/egress_io.py [0:0]


def kafka_egress_message(typename: str,
                         topic: str,
                         value: typing.Union[str, bytes, bytearray, int, float],
                         value_type: Type = None,
                         key: str = None):
    """
    Create an egress message that carries one record for a Kafka generic egress.

    The record payload is derived from @value as follows:

    - when a (truthy) value_type is supplied, its serializer produces the bytes;
    - a str is encoded as utf-8;
    - bytes / bytearray are copied into an immutable bytes object;
    - an int is packed as a big-endian signed 4-byte integer
      (struct format '>i'; struct.error is raised if it does not fit);
    - a float is packed as a big-endian 8-byte double (struct format '>d').

    Note that the checks run in the order listed above, and that bool is a
    subclass of int, so a bool @value without a value_type takes the int path.

    :param typename: the target egress to emit to (as defined in the module.yaml)
    :param topic: the destination Kafka topic of the record; must be non-empty
    :param value: the value to produce; must not be None
    :param value_type: optional type whose serializer is used to turn @value into bytes
    :param key: optional record key; it is set on the record only when it is not None
                (so an empty string is still set)
    :return: an EgressMessage wrapping the serialized KafkaProducerRecord.
    :raises ValueError: if @topic is empty/None or @value is None
    :raises TypeError: if no value_type is given and @value has an unsupported type
    """
    # Reject unusable arguments before building anything.
    if not topic:
        raise ValueError("A destination Kafka topic is missing")
    if value is None:
        raise ValueError("Missing value")

    producer_record = KafkaProducerRecord()
    producer_record.topic = topic

    # Work out the raw payload; an explicit value_type always takes precedence
    # over the built-in conversions.
    if value_type:
        payload = value_type.serializer().serialize(value)
    elif isinstance(value, str):
        payload = value.encode('utf-8')
    elif isinstance(value, (bytes, bytearray)):
        payload = bytes(value)
    elif isinstance(value, int):
        # Same layout as org.apache.kafka.common.serialization.IntegerSerializer:
        # 4 bytes, big-endian, signed.
        payload = struct.pack('>i', value)
    elif isinstance(value, float):
        # Same layout as org.apache.kafka.common.serialization.DoubleSerializer:
        # 8 bytes, big-endian.
        payload = struct.pack('>d', value)
    else:
        raise TypeError("Unable to convert value to bytes.")
    producer_record.value_bytes = payload

    if key is not None:
        producer_record.key = key

    # Wrap the serialized record in a TypedValue tagged with the record's type URL.
    envelope = TypedValue()
    envelope.typename = "type.googleapis.com/io.statefun.sdk.egress.KafkaProducerRecord"
    envelope.has_value = True
    envelope.value = producer_record.SerializeToString()

    return EgressMessage(typename, envelope)