func CloudEventFromTelemetry[T any]()

in go/protocol/telemetry_cloud_event.go [195:317]


func CloudEventFromTelemetry[T any](
	msg *TelemetryMessage[T],
) (*CloudEvent, error) {
	var ok bool
	var err error
	ce := &CloudEvent{}

	ce.SpecVersion, ok = msg.Metadata[ceSpecVersion]
	if !ok {
		return nil, &errors.Client{
			Message: "cloud event missing spec version header",
			Kind: errors.HeaderMissing{
				HeaderName: ceSpecVersion,
			},
		}
	}
	if ce.SpecVersion != "1.0" {
		return nil, &errors.Client{
			Message: "cloud event invalid spec version",
			Kind: errors.HeaderInvalid{
				HeaderName:  ceSpecVersion,
				HeaderValue: ce.SpecVersion,
			},
		}
	}

	ce.ID, ok = msg.Metadata[ceID]
	if !ok {
		return nil, &errors.Client{
			Message: "cloud event missing ID header",
			Kind: errors.HeaderMissing{
				HeaderName: ceID,
			},
		}
	}

	src, ok := msg.Metadata[ceSource]
	if !ok {
		return nil, &errors.Client{
			Message: "cloud event missing source header",
			Kind: errors.HeaderMissing{
				HeaderName: ceSource,
			},
		}
	}
	ce.Source, err = url.Parse(src)
	if err != nil {
		return nil, &errors.Client{
			Message: "cloud event invalid source header",
			Kind: errors.HeaderInvalid{
				HeaderName:  ceSource,
				HeaderValue: src,
			},
			Nested: err,
		}
	}

	ce.Type, ok = msg.Metadata[ceType]
	if !ok {
		return nil, &errors.Client{
			Message: "cloud event missing type header",
			Kind: errors.HeaderMissing{
				HeaderName: ceType,
			},
		}
	}

	// Don't fail for missing optional properties, but do fail for optional
	// properties that don't parse.

	if !contentTypeRegex.MatchString(msg.ContentType) {
		return nil, &errors.Client{
			Message: "cloud event content type invalid",
			Kind: errors.HeaderInvalid{
				HeaderName:  ceDataContentType,
				HeaderValue: msg.ContentType,
			},
		}
	}

	ce.DataContentType = msg.ContentType

	if ds, ok := msg.Metadata[ceDataSchema]; ok {
		ce.DataSchema, err = url.Parse(ds)
		if err != nil {
			return nil, &errors.Client{
				Message: "cloud event invalid data schema header",
				Kind: errors.HeaderInvalid{
					HeaderName:  ceDataSchema,
					HeaderValue: ds,
				},
				Nested: err,
			}
		}
		if ce.DataSchema.Scheme == "" {
			return nil, &errors.Client{
				Message: "cloud event data schema URI not absolute",
				Kind: errors.HeaderInvalid{
					HeaderName:  ceDataSchema,
					HeaderValue: ds,
				},
			}
		}
	}

	ce.Subject = msg.Metadata[ceSubject]

	if t, ok := msg.Metadata[ceTime]; ok {
		ce.Time, err = iso8601.ParseString(t)
		if err != nil {
			return nil, &errors.Client{
				Message: "cloud event invalid time header",
				Kind: errors.HeaderInvalid{
					HeaderName:  ceTime,
					HeaderValue: t,
				},
				Nested: err,
			}
		}
	}

	return ce, nil
}