export function kafkaEgressMessage()

in statefun-sdk-js/src/egress.ts [66:86]


export function kafkaEgressMessage({typename = "", topic = "", key = "", value = null, valueType}: KafkaEgressOpts) {
    if (isEmptyOrNull(typename)) {
        throw new Error("typename is missing");
    }
    validateTypeName(typename);
    if (isEmptyOrNull(topic)) {
        throw new Error("topic is missing")
    }
    if (value === undefined || value === null) {
        throw new Error("value is missing");
    }
    const pbKafka = new proto.io.statefun.sdk.egress.KafkaProducerRecord();
    pbKafka.setTopic(topic);
    pbKafka.setValueBytes(trySerializerForEgress(valueType, value));
    if (!isEmptyOrNull(key)) {
        pbKafka.setKey(key);
    }
    const bytes = pbKafka.serializeBinary();
    const box = TypedValueSupport.toTypedValueRaw("type.googleapis.com/io.statefun.sdk.egress.KafkaProducerRecord", bytes);
    return new EgressMessage(typename, box);
}