plugin/protocol/cloudevents/cloudevents.go (177 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cloudevents
import (
"context"
"fmt"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/common/protocol/grpc"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/common/protocol/tcp"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/protocol"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/consts"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
"github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/event/datacodec"
)
func init() {
plugin.Register(protocol.CloudEvents, &CloudeventsPlugin{})
}
// CloudeventsPlugin CloudEvents protocol adaptor
// used to transform CloudEvents message to CloudEvents message.
type CloudeventsPlugin struct {
}
func (c *CloudeventsPlugin) Type() string {
return plugin.Protocol
}
func (c *CloudeventsPlugin) Setup(name string, dec plugin.Decoder) error {
return nil
}
func (c *CloudeventsPlugin) ToCloudEvent(msg interface{}) (*v2.Event, error) {
switch msg := msg.(type) {
case *tcp.Package:
pck := msg
return c.deserializeTcpProtocol(pck)
case *grpc.SimpleMessageWrapper:
sw := msg
sm := sw.SimpleMessage
return deserializeGrpcProtocol(sm)
}
panic("implement me")
}
func (c *CloudeventsPlugin) ToCloudEvents(msg interface{}) ([]*v2.Event, error) {
bmw := msg.(*grpc.BatchMessageWrapper)
return buildBatchMessage(bmw.BatchMessage)
}
func (c *CloudeventsPlugin) FromCloudEvent(event *v2.Event) (interface{}, error) {
desc := event.Extensions()[grpc.PROTOCOL_DESC].(string)
if desc == "grpc" {
return buildSimpleMessage(event)
}
return nil, fmt.Errorf("only grpc supported now")
}
func (c *CloudeventsPlugin) ProtocolType() string {
//TODO implement me
panic("implement me")
}
func (c *CloudeventsPlugin) deserializeTcpProtocol(pck *tcp.Package) (*v2.Event, error) {
// TODO add when support tcp procotol
panic("implement me")
}
func defaultIfEmpty(in string, def interface{}) string {
if in == "" {
return def.(string)
}
return in
}
func defaultIfNil(def string, in interface{}) string {
if in == nil {
return def
}
return in.(string)
}
func deserializeGrpcProtocol(sm *pb.SimpleMessage) (*v2.Event, error) {
content := sm.Content
ct, ok := sm.Properties[grpc.CONTENT_TYPE]
if !ok {
ct = consts.CONTENT_TYPE_CLOUDEVENTS_JSON
}
evt := v2.NewEvent()
if err := datacodec.Decode(context.TODO(), ct, []byte(content), &evt); err != nil {
return nil, err
}
hdr := sm.Header
result := v2.NewEvent()
ver := defaultIfEmpty(hdr.ProtocolVersion, evt.Extensions()[grpc.PROTOCOL_VERSION])
topic := defaultIfEmpty(sm.Topic, evt.Subject())
result.SetExtension(grpc.ENV, defaultIfEmpty(hdr.Env, evt.Extensions()[grpc.ENV]))
result.SetExtension(grpc.IDC, defaultIfEmpty(hdr.Idc, evt.Extensions()[grpc.IDC]))
result.SetExtension(grpc.IP, defaultIfEmpty(hdr.Ip, evt.Extensions()[grpc.IP]))
result.SetExtension(grpc.PID, defaultIfEmpty(hdr.Pid, evt.Extensions()[grpc.PID]))
result.SetExtension(grpc.SYS, defaultIfEmpty(hdr.Sys, evt.Extensions()[grpc.SYS]))
result.SetExtension(grpc.LANGUAGE, defaultIfEmpty(hdr.Language, evt.Extensions()[grpc.LANGUAGE]))
result.SetExtension(grpc.PROTOCOL_TYPE, defaultIfEmpty(hdr.ProtocolType, evt.Extensions()[grpc.PROTOCOL_TYPE]))
result.SetExtension(grpc.PROTOCOL_DESC, defaultIfEmpty(hdr.ProtocolDesc, evt.Extensions()[grpc.PROTOCOL_DESC]))
result.SetExtension(grpc.PROTOCOL_VERSION, defaultIfEmpty(hdr.ProtocolVersion, evt.Extensions()[grpc.PROTOCOL_VERSION]))
result.SetExtension(grpc.UNIQUE_ID, defaultIfEmpty(sm.UniqueId, evt.Extensions()[grpc.UNIQUE_ID]))
result.SetExtension(grpc.SEQ_NUM, defaultIfEmpty(sm.SeqNum, evt.Extensions()[grpc.SEQ_NUM]))
result.SetExtension(grpc.USERNAME, defaultIfEmpty(hdr.Username, evt.Extensions()[grpc.USERNAME]))
result.SetExtension(grpc.PASSWD, defaultIfEmpty(hdr.Password, evt.Extensions()[grpc.PASSWD]))
result.SetExtension(grpc.TTL, defaultIfEmpty(sm.Ttl, evt.Extensions()[grpc.TTL]))
result.SetExtension(grpc.PRODUCERGROUP, defaultIfEmpty(sm.ProducerGroup, evt.Extensions()[grpc.PRODUCERGROUP]))
if ver == event.CloudEventsVersionV1 {
result.SetSpecVersion(event.CloudEventsVersionV1)
} else {
result.SetSpecVersion(event.CloudEventsVersionV03)
}
result.SetSubject(topic)
return &result, nil
}
func buildBatchMessage(bm *pb.BatchMessage) ([]*v2.Event, error) {
var msgs []*v2.Event
hdr := bm.Header
for _, item := range bm.MessageItem {
content := item.Content
ct, ok := item.Properties[grpc.CONTENT_TYPE]
if !ok {
ct = consts.CONTENT_TYPE_CLOUDEVENTS_JSON
}
evt := v2.NewEvent()
if err := datacodec.Decode(context.TODO(), ct, []byte(content), &evt); err != nil {
return nil, err
}
result := v2.NewEvent()
ver := defaultIfEmpty(hdr.ProtocolVersion, evt.Extensions()[grpc.PROTOCOL_VERSION])
topic := defaultIfEmpty(bm.Topic, evt.Subject())
result.SetExtension(grpc.ENV, defaultIfEmpty(hdr.Env, evt.Extensions()[grpc.ENV]))
result.SetExtension(grpc.IDC, defaultIfEmpty(hdr.Idc, evt.Extensions()[grpc.IDC]))
result.SetExtension(grpc.IP, defaultIfEmpty(hdr.Ip, evt.Extensions()[grpc.IP]))
result.SetExtension(grpc.PID, defaultIfEmpty(hdr.Pid, evt.Extensions()[grpc.PID]))
result.SetExtension(grpc.SYS, defaultIfEmpty(hdr.Sys, evt.Extensions()[grpc.SYS]))
result.SetExtension(grpc.LANGUAGE, defaultIfEmpty(hdr.Language, evt.Extensions()[grpc.LANGUAGE]))
result.SetExtension(grpc.PROTOCOL_TYPE, defaultIfEmpty(hdr.ProtocolType, evt.Extensions()[grpc.PROTOCOL_TYPE]))
result.SetExtension(grpc.PROTOCOL_DESC, defaultIfEmpty(hdr.ProtocolDesc, evt.Extensions()[grpc.PROTOCOL_DESC]))
result.SetExtension(grpc.PROTOCOL_VERSION, defaultIfEmpty(hdr.ProtocolVersion, evt.Extensions()[grpc.PROTOCOL_VERSION]))
result.SetExtension(grpc.UNIQUE_ID, defaultIfEmpty(item.UniqueId, evt.Extensions()[grpc.UNIQUE_ID]))
result.SetExtension(grpc.SEQ_NUM, defaultIfEmpty(item.SeqNum, evt.Extensions()[grpc.SEQ_NUM]))
result.SetExtension(grpc.USERNAME, defaultIfEmpty(hdr.Username, evt.Extensions()[grpc.USERNAME]))
result.SetExtension(grpc.PASSWD, defaultIfEmpty(hdr.Password, evt.Extensions()[grpc.PASSWD]))
result.SetExtension(grpc.TTL, defaultIfEmpty(item.Ttl, evt.Extensions()[grpc.TTL]))
result.SetExtension(grpc.PRODUCERGROUP, defaultIfEmpty(bm.ProducerGroup, evt.Extensions()[grpc.PRODUCERGROUP]))
if ver == event.CloudEventsVersionV1 {
result.SetSpecVersion(event.CloudEventsVersionV1)
} else {
result.SetSpecVersion(event.CloudEventsVersionV03)
}
result.SetSubject(topic)
msgs = append(msgs, &result)
}
return msgs, nil
}
func buildSimpleMessage(evt *v2.Event) (*grpc.SimpleMessageWrapper, error) {
ct, err := datacodec.Encode(context.TODO(), evt.DataContentType(), evt)
if err != nil {
return nil, err
}
hdr := &pb.RequestHeader{
Env: defaultIfNil("env", evt.Extensions()[grpc.ENV]),
Idc: defaultIfNil("idc", evt.Extensions()[grpc.IDC]),
Ip: defaultIfNil("127.0.0.1", evt.Extensions()[grpc.IP]),
Pid: defaultIfNil("123", evt.Extensions()[grpc.PID]),
Sys: defaultIfNil("sys123", evt.Extensions()[grpc.SYS]),
Username: defaultIfNil("user", evt.Extensions()[grpc.USERNAME]),
Password: defaultIfNil("pass", evt.Extensions()[grpc.PASSWD]),
Language: defaultIfNil("JAVA", evt.Extensions()[grpc.LANGUAGE]),
ProtocolType: defaultIfNil("protocol", evt.Extensions()[grpc.PROTOCOL_TYPE]),
ProtocolDesc: defaultIfNil("protocolDesc", evt.Extensions()[grpc.PROTOCOL_DESC]),
ProtocolVersion: defaultIfNil("1.0", evt.Extensions()[grpc.PROTOCOL_VERSION]),
}
msg := &pb.SimpleMessage{
Header: hdr,
Content: string(ct),
ProducerGroup: defaultIfNil("producerGroup", evt.Extensions()[grpc.PRODUCERGROUP]),
SeqNum: defaultIfNil("", evt.Extensions()[grpc.SEQ_NUM]),
UniqueId: defaultIfNil("", evt.Extensions()[grpc.UNIQUE_ID]),
Topic: evt.Subject(),
Ttl: defaultIfNil("3000", evt.Extensions()[grpc.TTL]),
Properties: map[string]string{grpc.CONTENT_TYPE: evt.DataContentType()},
}
for k, v := range evt.Extensions() {
msg.Properties[k] = defaultIfNil("", v)
}
return &grpc.SimpleMessageWrapper{SimpleMessage: msg}, nil
}