plugin/connector/rocketmq/convert/rocketmq_message_reader.go (74 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package convert
import (
"bytes"
"context"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector/rocketmq/constants"
"github.com/apache/rocketmq-client-go/v2/primitive"
ce "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/spec"
"github.com/cloudevents/sdk-go/v2/event"
)
// RocketMQMessageReader cloud event message reader
type RocketMQMessageReader struct {
topic string
properties map[string]string
body []byte
version spec.Version
}
// NewRocketMQMessageReader get RocketMQ message reader
func NewRocketMQMessageReader(message *primitive.Message) *RocketMQMessageReader {
return &RocketMQMessageReader{
topic: message.Topic,
properties: message.GetProperties(),
body: message.Body,
version: spec.VS.Version(event.CloudEventsVersionV1),
}
}
// ReadEncoding the RocketMQMessageReader only supports binary encoding
func (r *RocketMQMessageReader) ReadEncoding() binding.Encoding {
return binding.EncodingBinary
}
// ReadBinary read message and write through encoder
func (r *RocketMQMessageReader) ReadBinary(_ context.Context, encoder binding.BinaryWriter) (err error) {
subject := r.version.Attribute("subject")
_ = encoder.SetAttribute(subject, r.topic)
msgType := r.version.Attribute("type")
_ = encoder.SetAttribute(msgType, constants.CloudEventMessageType)
contentType := r.version.Attribute("datacontenttype")
_ = encoder.SetAttribute(contentType, r.properties[constants.RocketMQMessageContentTypeProperties])
for k, v := range r.properties {
attr := r.version.Attribute(k)
if attr != nil {
err = encoder.SetAttribute(attr, v)
} else {
err = encoder.SetExtension(k, v)
}
if err != nil {
return err
}
}
if r.body != nil {
err = encoder.SetData(bytes.NewReader(r.body))
if err != nil {
return err
}
}
return
}
// GetAttribute get attribute
func (r *RocketMQMessageReader) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) {
attr := r.version.Attribute(k.String())
if attr != nil {
return attr, r.properties[k.String()]
}
return attr, nil
}
// GetExtension get extension
func (r *RocketMQMessageReader) GetExtension(s string) interface{} {
return r.properties[s]
}
// ReadStructured the RocketMQMessageReader only supports binary encoding
func (r *RocketMQMessageReader) ReadStructured(context.Context, binding.StructuredWriter) error {
return binding.ErrNotStructured
}
// ToCloudEvent convert RocketMQ message to cloud event
func (r *RocketMQMessageReader) ToCloudEvent(ctx context.Context) (*ce.Event, error) {
event, err := binding.ToEvent(ctx, r)
if err != nil {
return nil, err
}
return event, nil
}