func()

in plugin/connector/rocketmq/convert/rocketmq_message_reader.go [53:82]


func (r *RocketMQMessageReader) ReadBinary(_ context.Context, encoder binding.BinaryWriter) (err error) {
	subject := r.version.Attribute("subject")
	_ = encoder.SetAttribute(subject, r.topic)

	msgType := r.version.Attribute("type")
	_ = encoder.SetAttribute(msgType, constants.CloudEventMessageType)

	contentType := r.version.Attribute("datacontenttype")
	_ = encoder.SetAttribute(contentType, r.properties[constants.RocketMQMessageContentTypeProperties])

	for k, v := range r.properties {
		attr := r.version.Attribute(k)
		if attr != nil {
			err = encoder.SetAttribute(attr, v)
		} else {
			err = encoder.SetExtension(k, v)
		}
		if err != nil {
			return err
		}
	}

	if r.body != nil {
		err = encoder.SetData(bytes.NewReader(r.body))
		if err != nil {
			return err
		}
	}
	return
}