in statefun-sdk-js/src/egress.ts [66:86]
export function kafkaEgressMessage({typename = "", topic = "", key = "", value = null, valueType}: KafkaEgressOpts) {
if (isEmptyOrNull(typename)) {
throw new Error("typename is missing");
}
validateTypeName(typename);
if (isEmptyOrNull(topic)) {
throw new Error("topic is missing")
}
if (value === undefined || value === null) {
throw new Error("value is missing");
}
const pbKafka = new proto.io.statefun.sdk.egress.KafkaProducerRecord();
pbKafka.setTopic(topic);
pbKafka.setValueBytes(trySerializerForEgress(valueType, value));
if (!isEmptyOrNull(key)) {
pbKafka.setKey(key);
}
const bytes = pbKafka.serializeBinary();
const box = TypedValueSupport.toTypedValueRaw("type.googleapis.com/io.statefun.sdk.egress.KafkaProducerRecord", bytes);
return new EgressMessage(typename, box);
}