in go/protocol/telemetry_cloud_event.go [93:190]
func (ce *CloudEvent) toMessage(msg *mqtt.Message) error {
// Cloud events were not specified; just bail out.
if ce == nil {
return nil
}
for _, key := range ceReserved {
if _, ok := msg.UserProperties[key]; ok {
return &errors.Client{
Message: "metadata key reserved for cloud event",
Kind: errors.ConfigurationInvalid{
PropertyName: "Metadata",
PropertyValue: key,
},
}
}
}
if ce.ID != "" {
msg.UserProperties[ceID] = ce.ID
} else {
id, err := errutil.NewUUID()
if err != nil {
return err
}
msg.UserProperties[ceID] = id
}
// We have reasonable defaults for all other values; source, however, is
// both required and something the caller must specify.
if ce.Source == nil {
return &errors.Client{
Message: "source must be defined",
Kind: errors.ConfigurationInvalid{
PropertyName: "CloudEvent",
},
}
}
msg.UserProperties[ceSource] = ce.Source.String()
if ce.SpecVersion != "" {
msg.UserProperties[ceSpecVersion] = ce.SpecVersion
} else {
msg.UserProperties[ceSpecVersion] = DefaultCloudEventSpecVersion
}
if ce.Type != "" {
msg.UserProperties[ceType] = ce.Type
} else {
msg.UserProperties[ceType] = DefaultCloudEventType
}
if ce.DataContentType != "" && ce.DataContentType != msg.ContentType {
return &errors.Client{
Message: "cloud event content type mismatch",
Kind: errors.ConfigurationInvalid{
PropertyName: "DataContentType",
PropertyValue: ce.DataContentType,
},
}
}
if !contentTypeRegex.MatchString(msg.ContentType) {
return &errors.Client{
Message: "cloud event invalid content type",
Kind: errors.ConfigurationInvalid{
PropertyName: "DataContentType",
PropertyValue: msg.ContentType,
},
}
}
if ce.DataSchema != nil {
if ce.DataSchema.Scheme == "" {
return &errors.Client{
Message: "cloud event data schema URI not absolute",
Kind: errors.ConfigurationInvalid{
PropertyName: "CloudEvent",
},
}
}
msg.UserProperties[ceDataSchema] = ce.DataSchema.String()
}
if ce.Subject != "" {
msg.UserProperties[ceSubject] = ce.Subject
} else {
msg.UserProperties[ceSubject] = msg.Topic
}
if !ce.Time.IsZero() {
msg.UserProperties[ceTime] = ce.Time.Format(time.RFC3339)
} else {
msg.UserProperties[ceTime] = time.Now().UTC().Format(time.RFC3339)
}
return nil
}