in plugin/protocol/cloudevents/cloudevents.go [139:180]
func buildBatchMessage(bm *pb.BatchMessage) ([]*v2.Event, error) {
var msgs []*v2.Event
hdr := bm.Header
for _, item := range bm.MessageItem {
content := item.Content
ct, ok := item.Properties[grpc.CONTENT_TYPE]
if !ok {
ct = consts.CONTENT_TYPE_CLOUDEVENTS_JSON
}
evt := v2.NewEvent()
if err := datacodec.Decode(context.TODO(), ct, []byte(content), &evt); err != nil {
return nil, err
}
result := v2.NewEvent()
ver := defaultIfEmpty(hdr.ProtocolVersion, evt.Extensions()[grpc.PROTOCOL_VERSION])
topic := defaultIfEmpty(bm.Topic, evt.Subject())
result.SetExtension(grpc.ENV, defaultIfEmpty(hdr.Env, evt.Extensions()[grpc.ENV]))
result.SetExtension(grpc.IDC, defaultIfEmpty(hdr.Idc, evt.Extensions()[grpc.IDC]))
result.SetExtension(grpc.IP, defaultIfEmpty(hdr.Ip, evt.Extensions()[grpc.IP]))
result.SetExtension(grpc.PID, defaultIfEmpty(hdr.Pid, evt.Extensions()[grpc.PID]))
result.SetExtension(grpc.SYS, defaultIfEmpty(hdr.Sys, evt.Extensions()[grpc.SYS]))
result.SetExtension(grpc.LANGUAGE, defaultIfEmpty(hdr.Language, evt.Extensions()[grpc.LANGUAGE]))
result.SetExtension(grpc.PROTOCOL_TYPE, defaultIfEmpty(hdr.ProtocolType, evt.Extensions()[grpc.PROTOCOL_TYPE]))
result.SetExtension(grpc.PROTOCOL_DESC, defaultIfEmpty(hdr.ProtocolDesc, evt.Extensions()[grpc.PROTOCOL_DESC]))
result.SetExtension(grpc.PROTOCOL_VERSION, defaultIfEmpty(hdr.ProtocolVersion, evt.Extensions()[grpc.PROTOCOL_VERSION]))
result.SetExtension(grpc.UNIQUE_ID, defaultIfEmpty(item.UniqueId, evt.Extensions()[grpc.UNIQUE_ID]))
result.SetExtension(grpc.SEQ_NUM, defaultIfEmpty(item.SeqNum, evt.Extensions()[grpc.SEQ_NUM]))
result.SetExtension(grpc.USERNAME, defaultIfEmpty(hdr.Username, evt.Extensions()[grpc.USERNAME]))
result.SetExtension(grpc.PASSWD, defaultIfEmpty(hdr.Password, evt.Extensions()[grpc.PASSWD]))
result.SetExtension(grpc.TTL, defaultIfEmpty(item.Ttl, evt.Extensions()[grpc.TTL]))
result.SetExtension(grpc.PRODUCERGROUP, defaultIfEmpty(bm.ProducerGroup, evt.Extensions()[grpc.PRODUCERGROUP]))
if ver == event.CloudEventsVersionV1 {
result.SetSpecVersion(event.CloudEventsVersionV1)
} else {
result.SetSpecVersion(event.CloudEventsVersionV03)
}
result.SetSubject(topic)
msgs = append(msgs, &result)
}
return msgs, nil
}