func()

in go/protocol/telemetry_cloud_event.go [93:190]


func (ce *CloudEvent) toMessage(msg *mqtt.Message) error {
	// Cloud events were not specified; just bail out.
	if ce == nil {
		return nil
	}

	for _, key := range ceReserved {
		if _, ok := msg.UserProperties[key]; ok {
			return &errors.Client{
				Message: "metadata key reserved for cloud event",
				Kind: errors.ConfigurationInvalid{
					PropertyName:  "Metadata",
					PropertyValue: key,
				},
			}
		}
	}

	if ce.ID != "" {
		msg.UserProperties[ceID] = ce.ID
	} else {
		id, err := errutil.NewUUID()
		if err != nil {
			return err
		}
		msg.UserProperties[ceID] = id
	}

	// We have reasonable defaults for all other values; source, however, is
	// both required and something the caller must specify.
	if ce.Source == nil {
		return &errors.Client{
			Message: "source must be defined",
			Kind: errors.ConfigurationInvalid{
				PropertyName: "CloudEvent",
			},
		}
	}
	msg.UserProperties[ceSource] = ce.Source.String()

	if ce.SpecVersion != "" {
		msg.UserProperties[ceSpecVersion] = ce.SpecVersion
	} else {
		msg.UserProperties[ceSpecVersion] = DefaultCloudEventSpecVersion
	}

	if ce.Type != "" {
		msg.UserProperties[ceType] = ce.Type
	} else {
		msg.UserProperties[ceType] = DefaultCloudEventType
	}

	if ce.DataContentType != "" && ce.DataContentType != msg.ContentType {
		return &errors.Client{
			Message: "cloud event content type mismatch",
			Kind: errors.ConfigurationInvalid{
				PropertyName:  "DataContentType",
				PropertyValue: ce.DataContentType,
			},
		}
	}

	if !contentTypeRegex.MatchString(msg.ContentType) {
		return &errors.Client{
			Message: "cloud event invalid content type",
			Kind: errors.ConfigurationInvalid{
				PropertyName:  "DataContentType",
				PropertyValue: msg.ContentType,
			},
		}
	}

	if ce.DataSchema != nil {
		if ce.DataSchema.Scheme == "" {
			return &errors.Client{
				Message: "cloud event data schema URI not absolute",
				Kind: errors.ConfigurationInvalid{
					PropertyName: "CloudEvent",
				},
			}
		}
		msg.UserProperties[ceDataSchema] = ce.DataSchema.String()
	}

	if ce.Subject != "" {
		msg.UserProperties[ceSubject] = ce.Subject
	} else {
		msg.UserProperties[ceSubject] = msg.Topic
	}

	if !ce.Time.IsZero() {
		msg.UserProperties[ceTime] = ce.Time.Format(time.RFC3339)
	} else {
		msg.UserProperties[ceTime] = time.Now().UTC().Format(time.RFC3339)
	}

	return nil
}