in go/protocol/telemetry_cloud_event.go [195:317]
// CloudEventFromTelemetry builds a CloudEvent from the metadata and content
// type of a telemetry message.
//
// The spec version, ID, source, and type metadata entries are required; if any
// of them is absent, an *errors.Client with an errors.HeaderMissing kind is
// returned. The spec version must be exactly "1.0" and the source must be
// accepted by url.Parse, otherwise an *errors.Client with an
// errors.HeaderInvalid kind is returned.
//
// The content type, data schema, subject, and time are optional. When absent
// they are left at their zero values. When present, the content type must
// match contentTypeRegex, the data schema must parse as a URI with a scheme
// (i.e. be absolute), and the time must be accepted by iso8601.ParseString;
// a value that fails its check yields an *errors.Client with an
// errors.HeaderInvalid kind. The subject is copied without validation.
//
// On any error the returned CloudEvent is nil.
func CloudEventFromTelemetry[T any](
	msg *TelemetryMessage[T],
) (*CloudEvent, error) {
	var ok bool
	var err error
	ce := &CloudEvent{}

	// Required properties: every one of these must be present.
	ce.SpecVersion, ok = msg.Metadata[ceSpecVersion]
	if !ok {
		return nil, &errors.Client{
			Message: "cloud event missing spec version header",
			Kind: errors.HeaderMissing{
				HeaderName: ceSpecVersion,
			},
		}
	}
	// Only version 1.0 is accepted.
	if ce.SpecVersion != "1.0" {
		return nil, &errors.Client{
			Message: "cloud event invalid spec version",
			Kind: errors.HeaderInvalid{
				HeaderName:  ceSpecVersion,
				HeaderValue: ce.SpecVersion,
			},
		}
	}

	ce.ID, ok = msg.Metadata[ceID]
	if !ok {
		return nil, &errors.Client{
			Message: "cloud event missing ID header",
			Kind: errors.HeaderMissing{
				HeaderName: ceID,
			},
		}
	}

	src, ok := msg.Metadata[ceSource]
	if !ok {
		return nil, &errors.Client{
			Message: "cloud event missing source header",
			Kind: errors.HeaderMissing{
				HeaderName: ceSource,
			},
		}
	}
	// The source only has to parse; unlike the data schema below, no scheme is
	// required, so relative references are accepted.
	ce.Source, err = url.Parse(src)
	if err != nil {
		return nil, &errors.Client{
			Message: "cloud event invalid source header",
			Kind: errors.HeaderInvalid{
				HeaderName:  ceSource,
				HeaderValue: src,
			},
			Nested: err,
		}
	}

	ce.Type, ok = msg.Metadata[ceType]
	if !ok {
		return nil, &errors.Client{
			Message: "cloud event missing type header",
			Kind: errors.HeaderMissing{
				HeaderName: ceType,
			},
		}
	}

	// Don't fail for missing optional properties, but do fail for optional
	// properties that don't parse.

	// The content type is optional like the rest of this section, so it is only
	// validated when the message actually carries one; an empty content type
	// leaves DataContentType at its zero value.
	if msg.ContentType != "" {
		if !contentTypeRegex.MatchString(msg.ContentType) {
			return nil, &errors.Client{
				Message: "cloud event content type invalid",
				Kind: errors.HeaderInvalid{
					HeaderName:  ceDataContentType,
					HeaderValue: msg.ContentType,
				},
			}
		}
		ce.DataContentType = msg.ContentType
	}

	if ds, ok := msg.Metadata[ceDataSchema]; ok {
		ce.DataSchema, err = url.Parse(ds)
		if err != nil {
			return nil, &errors.Client{
				Message: "cloud event invalid data schema header",
				Kind: errors.HeaderInvalid{
					HeaderName:  ceDataSchema,
					HeaderValue: ds,
				},
				Nested: err,
			}
		}
		// A URI without a scheme is relative; the data schema must be absolute.
		if ce.DataSchema.Scheme == "" {
			return nil, &errors.Client{
				Message: "cloud event data schema URI not absolute",
				Kind: errors.HeaderInvalid{
					HeaderName:  ceDataSchema,
					HeaderValue: ds,
				},
			}
		}
	}

	// A missing subject simply yields the empty string.
	ce.Subject = msg.Metadata[ceSubject]

	if t, ok := msg.Metadata[ceTime]; ok {
		ce.Time, err = iso8601.ParseString(t)
		if err != nil {
			return nil, &errors.Client{
				Message: "cloud event invalid time header",
				Kind: errors.HeaderInvalid{
					HeaderName:  ceTime,
					HeaderValue: t,
				},
				Nested: err,
			}
		}
	}

	return ce, nil
}