func buildBatchMessage()

in plugin/protocol/cloudevents/cloudevents.go [139:180]


// buildBatchMessage converts every item of a gRPC batch message into a
// CloudEvents event.
//
// For each item the content is decoded with the codec selected by the item's
// grpc.CONTENT_TYPE property, falling back to
// consts.CONTENT_TYPE_CLOUDEVENTS_JSON when the property is absent. A fresh
// event is then populated with the protocol extensions, each resolved through
// defaultIfEmpty from the batch header / item / batch field and the matching
// extension of the decoded event (presumably the first argument wins and the
// second is the fallback; see defaultIfEmpty). The subject is resolved the
// same way from bm.Topic and the decoded event's subject. The spec version is
// event.CloudEventsVersionV1 when the resolved protocol version equals it and
// event.CloudEventsVersionV03 otherwise.
//
// Events are returned in item order. The first decode error aborts the
// conversion and is returned together with a nil slice. A batch without items
// yields (nil, nil).
//
// NOTE(review): only extensions, spec version and subject are set on the
// returned events; the decoded event's data and remaining attributes are not
// copied over. Confirm this is intended.
func buildBatchMessage(bm *pb.BatchMessage) ([]*v2.Event, error) {
	items := bm.MessageItem
	if len(items) == 0 {
		// Keep the nil result for an empty batch instead of allocating.
		return nil, nil
	}

	hdr := bm.Header
	msgs := make([]*v2.Event, 0, len(items))

	for _, item := range items {
		ct, ok := item.Properties[grpc.CONTENT_TYPE]
		if !ok {
			ct = consts.CONTENT_TYPE_CLOUDEVENTS_JSON
		}

		evt := v2.NewEvent()
		if err := datacodec.Decode(context.TODO(), ct, []byte(item.Content), &evt); err != nil {
			return nil, err
		}

		// Read the decoded extensions once per item rather than once per key.
		exts := evt.Extensions()
		ver := defaultIfEmpty(hdr.ProtocolVersion, exts[grpc.PROTOCOL_VERSION])
		topic := defaultIfEmpty(bm.Topic, evt.Subject())

		result := v2.NewEvent()
		result.SetExtension(grpc.ENV, defaultIfEmpty(hdr.Env, exts[grpc.ENV]))
		result.SetExtension(grpc.IDC, defaultIfEmpty(hdr.Idc, exts[grpc.IDC]))
		result.SetExtension(grpc.IP, defaultIfEmpty(hdr.Ip, exts[grpc.IP]))
		result.SetExtension(grpc.PID, defaultIfEmpty(hdr.Pid, exts[grpc.PID]))
		result.SetExtension(grpc.SYS, defaultIfEmpty(hdr.Sys, exts[grpc.SYS]))
		result.SetExtension(grpc.LANGUAGE, defaultIfEmpty(hdr.Language, exts[grpc.LANGUAGE]))
		result.SetExtension(grpc.PROTOCOL_TYPE, defaultIfEmpty(hdr.ProtocolType, exts[grpc.PROTOCOL_TYPE]))
		result.SetExtension(grpc.PROTOCOL_DESC, defaultIfEmpty(hdr.ProtocolDesc, exts[grpc.PROTOCOL_DESC]))
		// ver was resolved above from the same inputs; reuse it.
		result.SetExtension(grpc.PROTOCOL_VERSION, ver)
		result.SetExtension(grpc.UNIQUE_ID, defaultIfEmpty(item.UniqueId, exts[grpc.UNIQUE_ID]))
		result.SetExtension(grpc.SEQ_NUM, defaultIfEmpty(item.SeqNum, exts[grpc.SEQ_NUM]))
		result.SetExtension(grpc.USERNAME, defaultIfEmpty(hdr.Username, exts[grpc.USERNAME]))
		result.SetExtension(grpc.PASSWD, defaultIfEmpty(hdr.Password, exts[grpc.PASSWD]))
		result.SetExtension(grpc.TTL, defaultIfEmpty(item.Ttl, exts[grpc.TTL]))
		result.SetExtension(grpc.PRODUCERGROUP, defaultIfEmpty(bm.ProducerGroup, exts[grpc.PRODUCERGROUP]))

		if ver == event.CloudEventsVersionV1 {
			result.SetSpecVersion(event.CloudEventsVersionV1)
		} else {
			result.SetSpecVersion(event.CloudEventsVersionV03)
		}
		result.SetSubject(topic)

		msgs = append(msgs, &result)
	}
	return msgs, nil
}