func convertBackgroundToCloudEventRequest()

in funcframework/events.go [288:398]


// convertBackgroundToCloudEventRequest rewrites r in place: it reads the
// background event carried in the request body, builds the equivalent
// CloudEvent as a JSON object, and installs that JSON as the new request body.
//
// On success the Content-Type header is set to jsonContentType, and both the
// Content-Length header and r.ContentLength describe the new body.
//
// An error is returned when the body cannot be read or parsed as a background
// event, when the event type or service has no CloudEvent equivalent, when the
// resource cannot be split, when a Pub/Sub payload is not a JSON object, when
// the Firebase Realtime Database domain is missing or malformed, or when the
// resulting CloudEvent cannot be marshaled.
func convertBackgroundToCloudEventRequest(r *http.Request) error {
	body, err := readHTTPRequestBody(r)
	if err != nil {
		return err
	}

	md, d, err := getBackgroundEvent(body, r.URL.Path)
	if err != nil {
		return fmt.Errorf("parsing background event body %s: %v", string(body), err)
	}

	if md == nil || d == nil {
		return fmt.Errorf("unable to extract background event from %s", string(body))
	}

	// NOTE: the header is updated before the remaining validation, so it is
	// changed even when one of the later steps returns an error.
	r.Header.Set(contentTypeHeader, jsonContentType)

	t, ok := typeBackgroundToCloudEvent[md.EventType]
	if !ok {
		return fmt.Errorf("unable to find CloudEvent equivalent event type for %s", md.EventType)
	}

	service := md.Resource.Service
	if service == "" {
		// Derive the service from the event type prefix. Map iteration order is
		// randomized, so keep the longest (most specific) matching prefix to
		// make the result independent of that order.
		longest := -1
		for bService, ceService := range serviceBackgroundToCloudEvent {
			if len(bService) > longest && strings.HasPrefix(md.EventType, bService) {
				service = ceService
				longest = len(bService)
			}
		}
		// If service is still empty, we didn't find a match in the map. Return the error.
		if service == "" {
			return fmt.Errorf("unable to find CloudEvent equivalent service for %s", md.EventType)
		}
	}

	// Prefer the resource name; fall back to the raw path when it is empty.
	resource := md.Resource.Name
	if resource == "" {
		resource = md.Resource.RawPath
	}

	var subject string
	resource, subject, err = splitResource(service, resource)
	if err != nil {
		return err
	}

	ce := map[string]interface{}{
		"id":              md.EventID,
		"time":            md.Timestamp,
		"specversion":     ceSpecVersion,
		"datacontenttype": "application/json",
		"type":            t,
		"source":          fmt.Sprintf("//%s/%s", service, resource),
		"data":            d,
	}

	if subject != "" {
		ce["subject"] = subject
	}

	// Service-specific adjustments to the generic CloudEvent built above.
	switch service {
	case pubSubCEService:
		data, ok := d.(map[string]interface{})
		if !ok {
			return fmt.Errorf(`invalid "data" field in event payload, "data": %q`, d)
		}

		// Copy the event metadata into the message payload itself.
		data["publishTime"] = md.Timestamp
		data["messageId"] = md.EventID

		// In a Pub/Sub CloudEvent "data" is wrapped by "message".
		ce["data"] = struct {
			Message interface{} `json:"message"`
		}{
			Message: data,
		}
	case firebaseAuthCEService:
		convertBackgroundFirebaseAuthMetadata(d)

		// Best effort: the subject is only overridden when one can be derived
		// from the payload; a failure here leaves the subject as it was.
		if s, err := firebaseAuthSubject(d); err == nil && s != "" {
			ce["subject"] = s
		}
	case firebaseDBCEService:
		// The database domain is read from the top level of the raw payload.
		var dbDomain struct {
			Domain string `json:"domain"`
		}
		if err := json.Unmarshal(body, &dbDomain); err != nil {
			return fmt.Errorf("unable to unmarshal %q domain from event payload %q: %v", firebaseDBCEService, string(body), err)
		}

		// "firebaseio.com" maps to us-central1; for any other domain the
		// location is the first dot-separated label of the domain.
		location := "us-central1"
		if dbDomain.Domain != "firebaseio.com" {
			domainSplit := strings.SplitN(dbDomain.Domain, ".", 2)
			if len(domainSplit) != 2 {
				return fmt.Errorf("invalid %q domain: %q", firebaseDBCEService, dbDomain.Domain)
			}
			location = domainSplit[0]
		}

		ce["source"] = fmt.Sprintf("//%s/projects/_/locations/%s/%s", service, location, resource)
	}

	encoded, err := json.Marshal(ce)
	if err != nil {
		return fmt.Errorf("unable to marshal CloudEvent %v: %v", ce, err)
	}

	// Replace the body and keep both length indicators in sync with it; leaving
	// r.ContentLength at the original payload's length would misreport the size
	// of the new body to anything that consults the field instead of the header.
	r.Body = ioutil.NopCloser(bytes.NewReader(encoded))
	r.ContentLength = int64(len(encoded))
	r.Header.Set(contentLengthHeader, fmt.Sprint(len(encoded)))
	return nil
}