go/protocol/telemetry_cloud_event.go (264 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. package protocol import ( "log/slog" "net/url" "regexp" "time" "github.com/Azure/iot-operations-sdks/go/internal/mqtt" "github.com/Azure/iot-operations-sdks/go/protocol/errors" "github.com/Azure/iot-operations-sdks/go/protocol/internal/errutil" "github.com/relvacode/iso8601" ) // CloudEvent provides an implementation of the CloudEvents 1.0 spec; see: // https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md type CloudEvent struct { ID string Source *url.URL SpecVersion string Type string DataContentType string DataSchema *url.URL Subject string Time time.Time } const ( DefaultCloudEventSpecVersion = "1.0" DefaultCloudEventType = "ms.aio.telemetry" ceID = "id" ceSource = "source" ceSpecVersion = "specversion" ceType = "type" ceDataContentType = "datacontenttype" ceDataSchema = "dataschema" ceSubject = "subject" ceTime = "time" ) var contentTypeRegex = regexp.MustCompile( `^([-a-z]+)/([-a-z0-9.]+)(?:\+([-a-z0-9.]+))?$`, ) var ceReserved = []string{ ceID, ceSource, ceSpecVersion, ceType, // ceDataContentType - not stored in user properties, so omitted here. ceDataSchema, ceSubject, ceTime, } // Attrs returns additional attributes for slog. func (ce *CloudEvent) Attrs() []slog.Attr { // Cloud events were not specified; just bail out. if ce == nil { return nil } a := make([]slog.Attr, 0, 8) a = append(a, slog.String(ceID, ce.ID), slog.String(ceSource, ce.Source.String()), slog.String(ceSpecVersion, ce.SpecVersion), slog.String(ceType, ce.Type), ) if ce.DataContentType != "" { a = append(a, slog.String(ceDataContentType, ce.DataContentType)) } if ce.DataSchema != nil { a = append(a, slog.String(ceDataSchema, ce.DataSchema.String())) } if ce.Subject != "" { a = append(a, slog.String(ceSubject, ce.Subject)) } if !ce.Time.IsZero() { a = append(a, slog.String(ceTime, ce.Time.Format(time.RFC3339))) } return a } // Initialize default values in the cloud event where possible; error where not. func (ce *CloudEvent) toMessage(msg *mqtt.Message) error { // Cloud events were not specified; just bail out. if ce == nil { return nil } for _, key := range ceReserved { if _, ok := msg.UserProperties[key]; ok { return &errors.Client{ Message: "metadata key reserved for cloud event", Kind: errors.ConfigurationInvalid{ PropertyName: "Metadata", PropertyValue: key, }, } } } if ce.ID != "" { msg.UserProperties[ceID] = ce.ID } else { id, err := errutil.NewUUID() if err != nil { return err } msg.UserProperties[ceID] = id } // We have reasonable defaults for all other values; source, however, is // both required and something the caller must specify. if ce.Source == nil { return &errors.Client{ Message: "source must be defined", Kind: errors.ConfigurationInvalid{ PropertyName: "CloudEvent", }, } } msg.UserProperties[ceSource] = ce.Source.String() if ce.SpecVersion != "" { msg.UserProperties[ceSpecVersion] = ce.SpecVersion } else { msg.UserProperties[ceSpecVersion] = DefaultCloudEventSpecVersion } if ce.Type != "" { msg.UserProperties[ceType] = ce.Type } else { msg.UserProperties[ceType] = DefaultCloudEventType } if ce.DataContentType != "" && ce.DataContentType != msg.ContentType { return &errors.Client{ Message: "cloud event content type mismatch", Kind: errors.ConfigurationInvalid{ PropertyName: "DataContentType", PropertyValue: ce.DataContentType, }, } } if !contentTypeRegex.MatchString(msg.ContentType) { return &errors.Client{ Message: "cloud event invalid content type", Kind: errors.ConfigurationInvalid{ PropertyName: "DataContentType", PropertyValue: msg.ContentType, }, } } if ce.DataSchema != nil { if ce.DataSchema.Scheme == "" { return &errors.Client{ Message: "cloud event data schema URI not absolute", Kind: errors.ConfigurationInvalid{ PropertyName: "CloudEvent", }, } } msg.UserProperties[ceDataSchema] = ce.DataSchema.String() } if ce.Subject != "" { msg.UserProperties[ceSubject] = ce.Subject } else { msg.UserProperties[ceSubject] = msg.Topic } if !ce.Time.IsZero() { msg.UserProperties[ceTime] = ce.Time.Format(time.RFC3339) } else { msg.UserProperties[ceTime] = time.Now().UTC().Format(time.RFC3339) } return nil } // CloudEventFromTelemetry extracts cloud event data from the given telemetry // message. It will return an error if any required properties are missing or // any properties do not match the expected schema. func CloudEventFromTelemetry[T any]( msg *TelemetryMessage[T], ) (*CloudEvent, error) { var ok bool var err error ce := &CloudEvent{} ce.SpecVersion, ok = msg.Metadata[ceSpecVersion] if !ok { return nil, &errors.Client{ Message: "cloud event missing spec version header", Kind: errors.HeaderMissing{ HeaderName: ceSpecVersion, }, } } if ce.SpecVersion != "1.0" { return nil, &errors.Client{ Message: "cloud event invalid spec version", Kind: errors.HeaderInvalid{ HeaderName: ceSpecVersion, HeaderValue: ce.SpecVersion, }, } } ce.ID, ok = msg.Metadata[ceID] if !ok { return nil, &errors.Client{ Message: "cloud event missing ID header", Kind: errors.HeaderMissing{ HeaderName: ceID, }, } } src, ok := msg.Metadata[ceSource] if !ok { return nil, &errors.Client{ Message: "cloud event missing source header", Kind: errors.HeaderMissing{ HeaderName: ceSource, }, } } ce.Source, err = url.Parse(src) if err != nil { return nil, &errors.Client{ Message: "cloud event invalid source header", Kind: errors.HeaderInvalid{ HeaderName: ceSource, HeaderValue: src, }, Nested: err, } } ce.Type, ok = msg.Metadata[ceType] if !ok { return nil, &errors.Client{ Message: "cloud event missing type header", Kind: errors.HeaderMissing{ HeaderName: ceType, }, } } // Don't fail for missing optional properties, but do fail for optional // properties that don't parse. if !contentTypeRegex.MatchString(msg.ContentType) { return nil, &errors.Client{ Message: "cloud event content type invalid", Kind: errors.HeaderInvalid{ HeaderName: ceDataContentType, HeaderValue: msg.ContentType, }, } } ce.DataContentType = msg.ContentType if ds, ok := msg.Metadata[ceDataSchema]; ok { ce.DataSchema, err = url.Parse(ds) if err != nil { return nil, &errors.Client{ Message: "cloud event invalid data schema header", Kind: errors.HeaderInvalid{ HeaderName: ceDataSchema, HeaderValue: ds, }, Nested: err, } } if ce.DataSchema.Scheme == "" { return nil, &errors.Client{ Message: "cloud event data schema URI not absolute", Kind: errors.HeaderInvalid{ HeaderName: ceDataSchema, HeaderValue: ds, }, } } } ce.Subject = msg.Metadata[ceSubject] if t, ok := msg.Metadata[ceTime]; ok { ce.Time, err = iso8601.ParseString(t) if err != nil { return nil, &errors.Client{ Message: "cloud event invalid time header", Kind: errors.HeaderInvalid{ HeaderName: ceTime, HeaderValue: t, }, Nested: err, } } } return ce, nil }