func encodeKafkaValue()

in statefun-sdk-go/v3/pkg/statefun/egress.go [102:133]


// encodeKafkaValue converts the builder's Value into the raw bytes of a
// Kafka record value.
//
// When k.ValueType is set, encoding is delegated entirely to
// k.ValueType.Serialize and whatever it writes is returned. Otherwise the
// encoding is chosen from the dynamic type of k.Value:
//
//   - string: the string's bytes; it must be valid UTF-8
//   - []byte: a copy of the slice, so the result does not alias the input
//   - int: 8 bytes, big-endian, identical to the int64 encoding
//   - int32, int64, float32, float64: fixed-size big-endian encoding
//
// Any other type (including a nil Value) results in an error.
func encodeKafkaValue(k KafkaEgressBuilder) ([]byte, error) {
	if k.ValueType != nil {
		var buffer bytes.Buffer
		if err := k.ValueType.Serialize(&buffer, k.Value); err != nil {
			return nil, err
		}

		return buffer.Bytes(), nil
	}

	switch value := k.Value.(type) {
	case string:
		if !utf8.ValidString(value) {
			return nil, fmt.Errorf("strings must be valid utf-8")
		}

		return []byte(value), nil
	case []byte:
		b := make([]byte, len(value))
		copy(b, value)
		return b, nil
	case int:
		// binary.Write only accepts fixed-size values and rejects the
		// platform-sized int, so widen it to int64 explicitly. This is
		// lossless on every platform and keeps the wire format the same
		// regardless of the architecture the function runs on.
		b := make([]byte, 8)
		binary.BigEndian.PutUint64(b, uint64(int64(value)))
		return b, nil
	case int32, int64, float32, float64:
		var buffer bytes.Buffer
		if err := binary.Write(&buffer, binary.BigEndian, value); err != nil {
			return nil, err
		}

		return buffer.Bytes(), nil
	default:
		return nil, errors.New("unable to convert value to bytes")
	}
}