func()

in statefun-sdk-go/v3/pkg/statefun/egress.go [161:198]


// toEgressMessage validates the builder and converts it into a
// FromFunction_EgressMessage whose argument is a serialized
// KinesisEgressRecord.
//
// Target, Stream, Value and PartitionKey are required; the first one found
// missing (checked in that order) produces an error. ExplicitHashKey is
// copied through as-is and is not validated here.
//
// It returns an error if a required field is missing, if the value cannot be
// encoded by encodeKinesisValue, or if the record cannot be marshaled. The
// latter two errors wrap the underlying cause, so errors.Is / errors.As
// continue to work for callers.
func (k KinesisEgressBuilder) toEgressMessage() (*protocol.FromFunction_EgressMessage, error) {
	// Validate required fields up front so no encoding work is done for a
	// record that can never be sent.
	switch {
	case k.Target == nil:
		return nil, errors.New("an egress record requires a Target")
	case k.Stream == "":
		return nil, errors.New("missing destination Kinesis stream")
	case k.Value == nil:
		return nil, errors.New("missing value for Kinesis egress")
	case k.PartitionKey == "":
		return nil, errors.New("missing partition key for Kinesis egress value")
	}

	b, err := encodeKinesisValue(k)
	if err != nil {
		return nil, fmt.Errorf("failed to encode Kinesis egress value: %w", err)
	}

	kinesis := protocol.KinesisEgressRecord{
		PartitionKey:    k.PartitionKey,
		ValueBytes:      b,
		Stream:          k.Stream,
		ExplicitHashKey: k.ExplicitHashKey,
	}

	value, err := proto.Marshal(&kinesis)
	if err != nil {
		// Wrap with context, matching the handling of the encode error above.
		return nil, fmt.Errorf("failed to marshal Kinesis egress record: %w", err)
	}

	// The serialized record travels as a TypedValue tagged with the Kinesis
	// egress type name.
	return &protocol.FromFunction_EgressMessage{
		EgressNamespace: k.Target.GetNamespace(),
		EgressType:      k.Target.GetType(),
		Argument: &protocol.TypedValue{
			Typename: kinesisTypeName,
			HasValue: true,
			Value:    value,
		},
	}, nil
}