in plugin/connector/rocketmq/convert/rocketmq_message_reader.go [53:82]
// ReadBinary writes the message held by r to encoder, attribute by attribute.
//
// It first sets three default attributes, resolved through r.version:
//   - "subject" from r.topic,
//   - "type" from constants.CloudEventMessageType,
//   - "datacontenttype" from the property stored under
//     constants.RocketMQMessageContentTypeProperties.
//
// Afterwards every entry of r.properties is forwarded: a key that r.version
// resolves to an attribute is written with SetAttribute, any other key is
// written with SetExtension. Because this pass runs after the defaults, a
// property whose key names one of the default attributes is written a second
// time. Finally, a non-nil body is handed to the encoder as the event data.
//
// The context argument is unused. The first error reported by the encoder
// stops the conversion and is returned unchanged; nil is returned on success.
func (r *RocketMQMessageReader) ReadBinary(_ context.Context, encoder binding.BinaryWriter) (err error) {
	// Default attributes. Their errors are propagated just like the ones
	// produced in the property loop below, instead of being dropped.
	if err = encoder.SetAttribute(r.version.Attribute("subject"), r.topic); err != nil {
		return err
	}
	if err = encoder.SetAttribute(r.version.Attribute("type"), constants.CloudEventMessageType); err != nil {
		return err
	}
	contentType := r.properties[constants.RocketMQMessageContentTypeProperties]
	if err = encoder.SetAttribute(r.version.Attribute("datacontenttype"), contentType); err != nil {
		return err
	}

	// Message properties: known attribute names go through SetAttribute,
	// everything else becomes an extension.
	for k, v := range r.properties {
		if attr := r.version.Attribute(k); attr != nil {
			err = encoder.SetAttribute(attr, v)
		} else {
			err = encoder.SetExtension(k, v)
		}
		if err != nil {
			return err
		}
	}

	// A nil body means there is no payload to write.
	if r.body != nil {
		return encoder.SetData(bytes.NewReader(r.body))
	}
	return nil
}