func()

in statefun-sdk-go/v3/pkg/statefun/egress.go [63:100]


func (k KafkaEgressBuilder) toEgressMessage() (*protocol.FromFunction_EgressMessage, error) {
	if k.Target == nil {
		return nil, errors.New("an egress record requires a Target")
	}
	if k.Topic == "" {
		return nil, errors.New("a Kafka record requires a topic")
	}

	if k.Value == nil {
		return nil, errors.New("a Kafka record requires a value")
	}

	b, err := encodeKafkaValue(k)
	if err != nil {
		return nil, fmt.Errorf("failed to encode Kafka egress value: %w", err)
	}

	kafka := protocol.KafkaProducerRecord{
		Key:        k.Key,
		ValueBytes: b,
		Topic:      k.Topic,
	}

	value, err := proto.Marshal(&kafka)
	if err != nil {
		return nil, err
	}

	return &protocol.FromFunction_EgressMessage{
		EgressNamespace: k.Target.GetNamespace(),
		EgressType:      k.Target.GetType(),
		Argument: &protocol.TypedValue{
			Typename: kafkaTypeName,
			HasValue: true,
			Value:    value,
		},
	}, nil
}