in statefun-sdk-go/v3/pkg/statefun/egress.go [63:100]
// toEgressMessage validates the builder and converts it into the protocol
// egress message that carries a serialized KafkaProducerRecord.
//
// Target, Topic and Value are mandatory; a missing one yields an error and a
// nil message. The value is encoded by encodeKafkaValue, placed in a
// KafkaProducerRecord together with Key and Topic, and the marshaled record
// becomes the payload of a TypedValue tagged with kafkaTypeName. The egress
// namespace and type are taken from Target.
//
// Errors from encoding and from marshaling are wrapped with %w so the
// underlying cause stays reachable through errors.Is / errors.As.
func (k KafkaEgressBuilder) toEgressMessage() (*protocol.FromFunction_EgressMessage, error) {
	// Validate required fields up front so no partial message is ever built.
	if k.Target == nil {
		return nil, errors.New("an egress record requires a Target")
	}
	if k.Topic == "" {
		return nil, errors.New("a Kafka record requires a topic")
	}
	if k.Value == nil {
		return nil, errors.New("a Kafka record requires a value")
	}

	valueBytes, err := encodeKafkaValue(k)
	if err != nil {
		return nil, fmt.Errorf("failed to encode Kafka egress value: %w", err)
	}

	record := &protocol.KafkaProducerRecord{
		Key:        k.Key,
		ValueBytes: valueBytes,
		Topic:      k.Topic,
	}
	payload, err := proto.Marshal(record)
	if err != nil {
		// Wrap like the encode failure above so callers can tell which step failed.
		return nil, fmt.Errorf("failed to marshal Kafka producer record: %w", err)
	}

	return &protocol.FromFunction_EgressMessage{
		EgressNamespace: k.Target.GetNamespace(),
		EgressType:      k.Target.GetType(),
		Argument: &protocol.TypedValue{
			Typename: kafkaTypeName,
			HasValue: true,
			Value:    payload,
		},
	}, nil
}