in funcframework/events.go [288:398]
func convertBackgroundToCloudEventRequest(r *http.Request) error {
body, err := readHTTPRequestBody(r)
if err != nil {
return err
}
md, d, err := getBackgroundEvent(body, r.URL.Path)
if err != nil {
return fmt.Errorf("parsing background event body %s: %v", string(body), err)
}
if md == nil || d == nil {
return fmt.Errorf("unable to extract background event from %s", string(body))
}
r.Header.Set(contentTypeHeader, jsonContentType)
t, ok := typeBackgroundToCloudEvent[md.EventType]
if !ok {
return fmt.Errorf("unable to find CloudEvent equivalent event type for %s", md.EventType)
}
service := md.Resource.Service
if service == "" {
for bService, ceService := range serviceBackgroundToCloudEvent {
if strings.HasPrefix(md.EventType, bService) {
service = ceService
}
}
// If service is still empty, we didn't find a match in the map. Return the error.
if service == "" {
return fmt.Errorf("unable to find CloudEvent equivalent service for %s", md.EventType)
}
}
resource := md.Resource.Name
if resource == "" {
resource = md.Resource.RawPath
}
var subject string
resource, subject, err = splitResource(service, resource)
if err != nil {
return err
}
ce := map[string]interface{}{
"id": md.EventID,
"time": md.Timestamp,
"specversion": ceSpecVersion,
"datacontenttype": "application/json",
"type": t,
"source": fmt.Sprintf("//%s/%s", service, resource),
"data": d,
}
if subject != "" {
ce["subject"] = subject
}
switch service {
case pubSubCEService:
data, ok := d.(map[string]interface{})
if !ok {
return fmt.Errorf(`invalid "data" field in event payload, "data": %q`, d)
}
data["publishTime"] = md.Timestamp
data["messageId"] = md.EventID
// In a Pub/Sub CloudEvent "data" is wrapped by "message".
ce["data"] = struct {
Message interface{} `json:"message"`
}{
Message: data,
}
case firebaseAuthCEService:
convertBackgroundFirebaseAuthMetadata(d)
if s, err := firebaseAuthSubject(d); err == nil && s != "" {
ce["subject"] = s
}
case firebaseDBCEService:
var dbDomain struct {
Domain string `json:"domain"`
}
if err := json.Unmarshal(body, &dbDomain); err != nil {
return fmt.Errorf("unable to unmarshal %q domain from event payload %q: %v", firebaseDBCEService, string(body), err)
}
location := "us-central1"
if dbDomain.Domain != "firebaseio.com" {
domainSplit := strings.SplitN(dbDomain.Domain, ".", 2)
if len(domainSplit) != 2 {
return fmt.Errorf("invalid %q domain: %q", firebaseDBCEService, dbDomain.Domain)
}
location = domainSplit[0]
}
ce["source"] = fmt.Sprintf("//%s/projects/_/locations/%s/%s", service, location, resource)
}
encoded, err := json.Marshal(ce)
if err != nil {
return fmt.Errorf("unable to marshal CloudEvent %v: %v", ce, err)
}
r.Body = ioutil.NopCloser(bytes.NewReader(encoded))
r.Header.Set(contentLengthHeader, fmt.Sprint(len(encoded)))
return nil
}