funcframework/events.go (419 lines of code) (raw):
package funcframework
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"reflect"
"regexp"
"strings"
"cloud.google.com/go/functions/metadata"
"github.com/GoogleCloudPlatform/functions-framework-go/internal/events/pubsub"
"github.com/GoogleCloudPlatform/functions-framework-go/internal/fftypes"
)
// Header names, CloudEvent attribute values, and CloudEvent service names used
// when converting requests between background events and CloudEvents.
const (
// ceIDHeader is the header whose presence marks an incoming request as a CloudEvent.
ceIDHeader = "Ce-Id"
contentTypeHeader = "Content-Type"
contentLengthHeader = "Content-Length"
// ceSpecVersion is written to the "specversion" attribute of generated CloudEvents.
ceSpecVersion = "1.0"
// jsonContentType is the Content-Type set on a request after its body has been
// rewritten by one of the conversion functions in this file.
jsonContentType = "application/cloudevents+json"
// CloudEvent service names. They form the host part of a CloudEvent "source"
// and select the service-specific handling in the conversion functions.
firebaseAuthCEService = "firebaseauth.googleapis.com"
firebaseCEService = "firebase.googleapis.com"
firebaseDBCEService = "firebasedatabase.googleapis.com"
firestoreCEService = "firestore.googleapis.com"
pubSubCEService = "pubsub.googleapis.com"
storageCEService = "storage.googleapis.com"
// pubsubMessageType is the resource "type" given to background events that are
// produced from Pub/Sub CloudEvents.
pubsubMessageType = "type.googleapis.com/google.pubsub.v1.PubsubMessage"
)
var (
// typeBackgroundToCloudEvent maps background event type strings to their
// CloudEvent type equivalents. The mapping is many-to-one: legacy and current
// background type names (for example both Pub/Sub publish types, or
// "object.change" and "object.finalize") may share a CloudEvent type.
typeBackgroundToCloudEvent = map[string]string{
"google.pubsub.topic.publish": "google.cloud.pubsub.topic.v1.messagePublished",
"providers/cloud.pubsub/eventTypes/topic.publish": "google.cloud.pubsub.topic.v1.messagePublished",
"google.storage.object.finalize": "google.cloud.storage.object.v1.finalized",
"google.storage.object.delete": "google.cloud.storage.object.v1.deleted",
"google.storage.object.archive": "google.cloud.storage.object.v1.archived",
"google.storage.object.metadataUpdate": "google.cloud.storage.object.v1.metadataUpdated",
"providers/cloud.firestore/eventTypes/document.write": "google.cloud.firestore.document.v1.written",
"providers/cloud.firestore/eventTypes/document.create": "google.cloud.firestore.document.v1.created",
"providers/cloud.firestore/eventTypes/document.update": "google.cloud.firestore.document.v1.updated",
"providers/cloud.firestore/eventTypes/document.delete": "google.cloud.firestore.document.v1.deleted",
"providers/firebase.auth/eventTypes/user.create": "google.firebase.auth.user.v1.created",
"providers/firebase.auth/eventTypes/user.delete": "google.firebase.auth.user.v1.deleted",
"providers/google.firebase.analytics/eventTypes/event.log": "google.firebase.analytics.log.v1.written",
"providers/google.firebase.database/eventTypes/ref.create": "google.firebase.database.ref.v1.created",
"providers/google.firebase.database/eventTypes/ref.write": "google.firebase.database.ref.v1.written",
"providers/google.firebase.database/eventTypes/ref.update": "google.firebase.database.ref.v1.updated",
"providers/google.firebase.database/eventTypes/ref.delete": "google.firebase.database.ref.v1.deleted",
"providers/cloud.storage/eventTypes/object.change": "google.cloud.storage.object.v1.finalized",
}
// typeCloudToBackgroundEvent maps CloudEvent type strings to the background
// event type emitted when a CloudEvent is converted to a background event.
// Each CloudEvent type maps to exactly one background type, so the legacy
// Pub/Sub and Storage "providers/..." names are never produced. A CloudEvent
// whose type is not a key here is not converted.
typeCloudToBackgroundEvent = map[string]string{
"google.cloud.pubsub.topic.v1.messagePublished": "google.pubsub.topic.publish",
"google.cloud.storage.object.v1.finalized": "google.storage.object.finalize",
"google.cloud.storage.object.v1.deleted": "google.storage.object.delete",
"google.cloud.storage.object.v1.archived": "google.storage.object.archive",
"google.cloud.storage.object.v1.metadataUpdated": "google.storage.object.metadataUpdate",
"google.cloud.firestore.document.v1.written": "providers/cloud.firestore/eventTypes/document.write",
"google.cloud.firestore.document.v1.created": "providers/cloud.firestore/eventTypes/document.create",
"google.cloud.firestore.document.v1.updated": "providers/cloud.firestore/eventTypes/document.update",
"google.cloud.firestore.document.v1.deleted": "providers/cloud.firestore/eventTypes/document.delete",
"google.firebase.auth.user.v1.created": "providers/firebase.auth/eventTypes/user.create",
"google.firebase.auth.user.v1.deleted": "providers/firebase.auth/eventTypes/user.delete",
"google.firebase.analytics.log.v1.written": "providers/google.firebase.analytics/eventTypes/event.log",
"google.firebase.database.ref.v1.created": "providers/google.firebase.database/eventTypes/ref.create",
"google.firebase.database.ref.v1.written": "providers/google.firebase.database/eventTypes/ref.write",
"google.firebase.database.ref.v1.updated": "providers/google.firebase.database/eventTypes/ref.update",
"google.firebase.database.ref.v1.deleted": "providers/google.firebase.database/eventTypes/ref.delete",
}
// serviceBackgroundToCloudEvent maps background event type prefixes to
// CloudEvent service names. It is consulted, by prefix match against the
// event type, only when a background event's resource does not name a service.
serviceBackgroundToCloudEvent = map[string]string{
"providers/cloud.firestore/": firestoreCEService,
"providers/google.firebase.analytics/": firebaseCEService,
"providers/firebase.auth/": firebaseAuthCEService,
"providers/google.firebase.database/": firebaseDBCEService,
"providers/cloud.pubsub/": pubSubCEService,
"providers/cloud.storage/": storageCEService,
"google.pubsub": pubSubCEService,
"google.storage": storageCEService,
}
// ceServiceToResourceRe maps CloudEvent service strings to regexps used to split
// a background event resource string into CloudEvent resource and subject strings.
// Each regexp must have exactly two submatches (a.k.a. capture groups): the first
// for the resource and the second for the subject. See splitResource for more info.
// Services without an entry (for example Pub/Sub and Firebase Auth) are not split.
ceServiceToResourceRe = map[string]*regexp.Regexp{
firebaseCEService: regexp.MustCompile("^(projects/[^/]+)/(events/[^/]+)$"),
firebaseDBCEService: regexp.MustCompile("^projects/_/(instances/[^/]+)/(refs/.+)$"),
firestoreCEService: regexp.MustCompile("^(projects/[^/]+/databases/\\(default\\))/(documents/.+)$"),
storageCEService: regexp.MustCompile("^(projects/_/buckets/[^/]+)/(objects/.+)$"),
}
// firebaseAuthMetadataFieldsBackgroundToCloudEvent maps Firebase Auth background event metadata field
// names to their equivalent CloudEvent field names.
firebaseAuthMetadataFieldsBackgroundToCloudEvent = map[string]string{
"createdAt": "createTime",
"lastSignedInAt": "lastSignInTime",
}
)
// getBackgroundEvent tries to interpret body as a background event and returns
// its metadata and data payload.
//
// Two payload shapes are recognised: a regular background event and a legacy
// Pub/Sub push subscription event, which is converted using the topic extracted
// from path. When the payload has data but no "context" object, the metadata is
// decoded from the top level of body instead.
//
// A nil metadata, nil data and nil error together mean that body is valid JSON
// but does not describe a background event. A non-nil error is returned only
// when body cannot be unmarshalled.
func getBackgroundEvent(body []byte, path string) (*metadata.Metadata, interface{}, error) {
	// candidates embeds every payload shape the request may carry; the shapes
	// are expected to be mutually exclusive.
	type candidates struct {
		*pubsub.LegacyPushSubscriptionEvent
		*fftypes.BackgroundEvent
	}

	var parsed candidates
	if err := json.Unmarshal(body, &parsed); err != nil {
		return nil, nil, err
	}

	bgEvent := parsed.BackgroundEvent
	if bgEvent == nil && parsed.LegacyPushSubscriptionEvent != nil {
		// No background payload: fall back to the legacy Pub/Sub shape. A
		// failure to extract the topic is only reported, not treated as fatal.
		topic, err := pubsub.ExtractTopicFromRequestPath(path)
		if err != nil {
			fmt.Printf("WARNING: %s", err)
		}
		bgEvent = parsed.LegacyPushSubscriptionEvent.ToBackgroundEvent(topic)
	}

	switch {
	case bgEvent == nil || bgEvent.Data == nil:
		// Without a "data" payload this is not a background event.
		return nil, nil, nil
	case bgEvent.Metadata != nil:
		// A "context" object was present, so the event is already complete.
		return bgEvent.Metadata, bgEvent.Data, nil
	}

	// Otherwise the metadata fields may sit at the top level of the body.
	topLevel := new(metadata.Metadata)
	if err := json.Unmarshal(body, topLevel); err != nil {
		return nil, nil, err
	}
	if topLevel.EventID == "" {
		// No event ID means this is not a background event after all.
		return nil, nil, nil
	}
	return topLevel, bgEvent.Data, nil
}
// runBackgroundEvent JSON-encodes data and invokes the user function fn with
// the encoded payload, using a context derived from the request that carries
// the event metadata m. If data cannot be encoded, a 400 response is written
// and fn is not invoked.
func runBackgroundEvent(w http.ResponseWriter, r *http.Request, m *metadata.Metadata, data, fn interface{}) {
	payload, encodeErr := encodeData(data)
	if encodeErr != nil {
		msg := fmt.Sprintf("Unable to encode data %v: %s", data, encodeErr.Error())
		writeHTTPErrorResponse(w, http.StatusBadRequest, crashStatus, msg)
		return
	}
	runUserFunctionWithContext(metadata.NewContext(r.Context(), m), w, r, payload, fn)
}
// validateEventFunction reports whether fn has the shape required of an event
// function: a function taking exactly two parameters, the first of which can
// receive a context.Context, and returning a single value assignable to error.
//
// It returns nil when fn is acceptable and a descriptive error otherwise. A nil
// or non-function fn is rejected with an error rather than causing a panic.
func validateEventFunction(fn interface{}) error {
	ft := reflect.TypeOf(fn)
	// reflect.TypeOf returns nil for a nil interface value, and NumIn/NumOut
	// panic for any kind other than Func, so both cases are rejected up front.
	if ft == nil || ft.Kind() != reflect.Func {
		return fmt.Errorf("expected a function, found %T", fn)
	}
	if ft.NumIn() != 2 {
		return fmt.Errorf("expected function to have two parameters, found %d", ft.NumIn())
	}

	errorType := reflect.TypeOf((*error)(nil)).Elem()
	if ft.NumOut() != 1 || !ft.Out(0).AssignableTo(errorType) {
		return fmt.Errorf("expected function to return only an error")
	}

	// The first parameter must be able to hold a context.Context value.
	ctxType := reflect.TypeOf((*context.Context)(nil)).Elem()
	if !ctxType.AssignableTo(ft.In(0)) {
		return fmt.Errorf("expected first parameter to be context.Context")
	}
	return nil
}
// convertBackgroundToCloudEvent wraps ceHandler so that it can also be driven
// by background event requests.
//
// A request that has neither a Ce-Id header nor a Content-Type mentioning
// "cloudevents" is first rewritten into a CloudEvent request; if that fails a
// 400 response is written and ceHandler is not called. The request timeout, if
// one was requested, is applied before delegating to ceHandler.
func convertBackgroundToCloudEvent(ceHandler http.Handler) http.Handler {
	handle := func(w http.ResponseWriter, r *http.Request) {
		hasCEID := r.Header.Get(ceIDHeader) != ""
		hasCEContentType := strings.Contains(r.Header.Get(contentTypeHeader), "cloudevents")
		if !hasCEID && !hasCEContentType {
			// Not a CloudEvent yet: convert the background event in place.
			if convErr := convertBackgroundToCloudEventRequest(r); convErr != nil {
				writeHTTPErrorResponse(w, http.StatusBadRequest, crashStatus, fmt.Sprintf("%v", convErr))
				return
			}
		}

		r, cancel := setContextTimeoutIfRequested(r)
		if cancel != nil {
			defer cancel()
		}
		ceHandler.ServeHTTP(w, r)
	}
	return http.HandlerFunc(handle)
}
// encodeData serialises d as JSON without HTML escaping, so characters such as
// '<', '>' and '&' are emitted verbatim. The output ends with the newline that
// json.Encoder appends to every encoded value.
func encodeData(d interface{}) ([]byte, error) {
	out := new(bytes.Buffer)
	encoder := json.NewEncoder(out)
	encoder.SetEscapeHTML(false)
	if encodeErr := encoder.Encode(d); encodeErr != nil {
		return nil, encodeErr
	}
	return out.Bytes(), nil
}
// splitResource divides a background event resource path into the two parts
// that CloudEvents keep apart: the "resource" (the container, such as a Cloud
// Storage bucket) and the "subject" (the item inside it, such as an object).
//
// The split is driven by the regexp registered for service in
// ceServiceToResourceRe, whose first capture group yields the resource and
// whose second yields the subject. For example, with the Cloud Storage service,
//
//	"projects/_/buckets/some-bucket/objects/folder/test.txt"
//
// becomes "projects/_/buckets/some-bucket" and "objects/folder/test.txt".
//
// It returns the resource, the subject, and an error. When no regexp is
// registered for service, resource is returned unchanged with an empty subject
// and a nil error. When the regexp does not match, or does not produce exactly
// two capture groups, resource is returned unchanged with a non-nil error.
func splitResource(service, resource string) (string, string, error) {
	pattern, found := ceServiceToResourceRe[service]
	if !found {
		return resource, "", nil
	}

	groups := pattern.FindStringSubmatch(resource)
	switch {
	case groups == nil:
		return resource, "", fmt.Errorf("resource regexp did not match")
	case len(groups) != 3:
		// groups[0] is the whole match, so two capture groups give length 3.
		return resource, "", fmt.Errorf("expected 2 match groups, got %v", len(groups)-1)
	}
	return groups[1], groups[2], nil
}
// convertBackgroundFirebaseAuthMetadata renames the Firebase Auth metadata
// fields of a background event payload to their CloudEvent names, as listed in
// firebaseAuthMetadataFieldsBackgroundToCloudEvent.
//
// The renaming happens in place on the map nested under the "metadata" key.
// Nothing is changed unless data is a map containing a "metadata" entry that is
// itself a map.
func convertBackgroundFirebaseAuthMetadata(data interface{}) {
	payload, isMap := data.(map[string]interface{})
	if !isMap {
		return
	}
	// A missing "metadata" key yields a nil value, which fails the assertion
	// just like a value of the wrong type does.
	fields, isMap := payload["metadata"].(map[string]interface{})
	if !isMap {
		return
	}
	for bgName, ceName := range firebaseAuthMetadataFieldsBackgroundToCloudEvent {
		if value, present := fields[bgName]; present {
			fields[ceName] = value
			delete(fields, bgName)
		}
	}
}
// firebaseAuthSubject builds the CloudEvent subject "users/<uid>" from the
// "uid" entry of data. It returns an error when data is not a
// map[string]interface{} or when the map has no "uid" key.
func firebaseAuthSubject(data interface{}) (string, error) {
	fields, isMap := data.(map[string]interface{})
	if !isMap {
		return "", fmt.Errorf("data is not a map from string to interface")
	}
	uid, present := fields["uid"]
	if !present {
		return "", fmt.Errorf("data does not contain field \"uid\"")
	}
	return fmt.Sprintf("users/%v", uid), nil
}
// convertBackgroundToCloudEventRequest rewrites r in place so that it carries a
// JSON CloudEvent equivalent to the background event found in its body.
//
// The body is read and parsed with getBackgroundEvent, then replaced by the
// encoded CloudEvent. The Content-Type and Content-Length headers, and
// r.ContentLength, are updated to describe the new body.
//
// An error is returned when the body cannot be read or parsed, when it does not
// describe a background event, when the event type or service has no CloudEvent
// equivalent, when the event carries no resource, when the resource does not
// match the pattern registered for its service, or when encoding fails.
func convertBackgroundToCloudEventRequest(r *http.Request) error {
	body, err := readHTTPRequestBody(r)
	if err != nil {
		return err
	}

	md, d, err := getBackgroundEvent(body, r.URL.Path)
	if err != nil {
		return fmt.Errorf("parsing background event body %s: %v", string(body), err)
	}
	if md == nil || d == nil {
		return fmt.Errorf("unable to extract background event from %s", string(body))
	}

	r.Header.Set(contentTypeHeader, jsonContentType)

	t, ok := typeBackgroundToCloudEvent[md.EventType]
	if !ok {
		return fmt.Errorf("unable to find CloudEvent equivalent event type for %s", md.EventType)
	}

	// A payload without a "resource" field leaves md.Resource nil; report it
	// instead of dereferencing a nil pointer below.
	if md.Resource == nil {
		return fmt.Errorf("background event of type %s has no resource", md.EventType)
	}

	service := md.Resource.Service
	if service == "" {
		// The resource did not name a service: derive it from the event type
		// prefix. The prefixes are mutually exclusive, so the first hit wins.
		for bService, ceService := range serviceBackgroundToCloudEvent {
			if strings.HasPrefix(md.EventType, bService) {
				service = ceService
				break
			}
		}
		// If service is still empty, we didn't find a match in the map. Return the error.
		if service == "" {
			return fmt.Errorf("unable to find CloudEvent equivalent service for %s", md.EventType)
		}
	}

	// Prefer the structured resource name, falling back to the raw path.
	resource := md.Resource.Name
	if resource == "" {
		resource = md.Resource.RawPath
	}

	var subject string
	resource, subject, err = splitResource(service, resource)
	if err != nil {
		return err
	}

	ce := map[string]interface{}{
		"id":              md.EventID,
		"time":            md.Timestamp,
		"specversion":     ceSpecVersion,
		"datacontenttype": "application/json",
		"type":            t,
		"source":          fmt.Sprintf("//%s/%s", service, resource),
		"data":            d,
	}
	if subject != "" {
		ce["subject"] = subject
	}

	// Apply the service-specific differences between the two formats.
	switch service {
	case pubSubCEService:
		data, ok := d.(map[string]interface{})
		if !ok {
			return fmt.Errorf(`invalid "data" field in event payload, "data": %q`, d)
		}
		data["publishTime"] = md.Timestamp
		data["messageId"] = md.EventID
		// In a Pub/Sub CloudEvent "data" is wrapped by "message".
		ce["data"] = struct {
			Message interface{} `json:"message"`
		}{
			Message: data,
		}
	case firebaseAuthCEService:
		convertBackgroundFirebaseAuthMetadata(d)
		// The subject is best-effort: it is only set when a "uid" is available.
		if s, err := firebaseAuthSubject(d); err == nil && s != "" {
			ce["subject"] = s
		}
	case firebaseDBCEService:
		var dbDomain struct {
			Domain string `json:"domain"`
		}
		if err := json.Unmarshal(body, &dbDomain); err != nil {
			return fmt.Errorf("unable to unmarshal %q domain from event payload %q: %v", firebaseDBCEService, string(body), err)
		}
		// "firebaseio.com" maps to us-central1; any other domain is expected to
		// start with the location as its first dot-separated label.
		location := "us-central1"
		if dbDomain.Domain != "firebaseio.com" {
			domainSplit := strings.SplitN(dbDomain.Domain, ".", 2)
			if len(domainSplit) != 2 {
				return fmt.Errorf("invalid %q domain: %q", firebaseDBCEService, dbDomain.Domain)
			}
			location = domainSplit[0]
		}
		ce["source"] = fmt.Sprintf("//%s/projects/_/locations/%s/%s", service, location, resource)
	}

	encoded, err := json.Marshal(ce)
	if err != nil {
		return fmt.Errorf("unable to marshal CloudEvent %v: %v", ce, err)
	}

	// Swap in the new body and keep every length indicator consistent with it.
	r.Body = ioutil.NopCloser(bytes.NewReader(encoded))
	r.ContentLength = int64(len(encoded))
	r.Header.Set(contentLengthHeader, fmt.Sprint(len(encoded)))
	return nil
}
// shouldConvertCloudEventToBackgroundRequest reports whether r is a CloudEvent
// that can be converted to a background event: its ce-type header must be a key
// of typeCloudToBackgroundEvent, and its ce-source, ce-specversion and ce-id
// headers must all be non-empty.
func shouldConvertCloudEventToBackgroundRequest(r *http.Request) bool {
	if _, known := typeCloudToBackgroundEvent[r.Header.Get("ce-type")]; !known {
		return false
	}
	for _, name := range []string{"ce-source", "ce-specversion", "ce-id"} {
		if r.Header.Get(name) == "" {
			return false
		}
	}
	return true
}
var (
	// cloudEventSourceRe splits a CloudEvent source into its service and
	// resource name, for example:
	//
	//	"//pubsub.googleapis.com/projects/sample-project/topics/gcf-test"
	//
	// yields "pubsub.googleapis.com" and "projects/sample-project/topics/gcf-test".
	cloudEventSourceRe = regexp.MustCompile(`//([^/]+)/(.+)`)

	// cloudEventLocationRe matches a "/locations/<location>" path segment, which
	// is removed from Firebase Realtime Database resources.
	cloudEventLocationRe = regexp.MustCompile(`/locations/[^/]+`)
)

// convertCloudEventToBackgroundRequest rewrites r in place so that it carries a
// background event equivalent to the binary-mode CloudEvent it holds.
//
// The event attributes are taken from the ce-type, ce-source, ce-subject, ce-id
// and ce-time headers, and the event data from the JSON body. The body is
// replaced by the encoded background event ({"data": ..., "context": ...}); the
// Content-Type and Content-Length headers, and r.ContentLength, are updated to
// describe it.
//
// An error is returned when the body cannot be read or is not a JSON object,
// when the event type has no background equivalent, when the source cannot be
// split into a service and a resource name, or when encoding fails.
func convertCloudEventToBackgroundRequest(r *http.Request) error {
	body, err := readHTTPRequestBody(r)
	if err != nil {
		return err
	}

	var data map[string]interface{}
	if err := json.Unmarshal(body, &data); err != nil {
		return fmt.Errorf("unable to unmarshal CloudEvent data: %s, error: %v", string(body), err)
	}

	ceType := r.Header.Get("ce-type")
	ceSource := r.Header.Get("ce-source")
	ceSubject := r.Header.Get("ce-subject")

	eventType, ok := typeCloudToBackgroundEvent[ceType]
	if !ok {
		return fmt.Errorf("incoming event has unsupported event type: %q", ceType)
	}

	// matches[0] is the whole source, followed by the service and the name.
	matches := cloudEventSourceRe.FindStringSubmatch(ceSource)
	if len(matches) != 3 {
		return fmt.Errorf("unable to parse CloudEvent source into resource service and name: %q", ceSource)
	}
	service, name := matches[1], matches[2]

	// Default resource: the source name joined with the subject. Note that an
	// absent ce-subject leaves a trailing slash; services for which that would
	// matter replace the resource below.
	resource := fmt.Sprintf("%s/%s", name, ceSubject)

	// Use custom metadata struct to control the exact formatting when
	// fields are serialized to JSON.
	type Metadata struct {
		EventID   string `json:"eventId"`
		Timestamp string `json:"timestamp"`
		EventType string `json:"eventType"`
		// Resource can be a single string or a struct, depending on the
		// event type.
		Resource interface{} `json:"resource"`
	}
	type backgroundEvent struct {
		Data     map[string]interface{} `json:"data"`
		Metadata `json:"context"`
	}
	// structuredResource is the struct form of a background event resource.
	type structuredResource struct {
		Name    string      `json:"name"`
		Service string      `json:"service"`
		Type    interface{} `json:"type"`
	}

	be := backgroundEvent{
		Data: data,
		Metadata: Metadata{
			EventID:   r.Header.Get("ce-id"),
			Timestamp: r.Header.Get("ce-time"),
			EventType: eventType,
			Resource:  resource,
		},
	}

	// Apply the service-specific differences between the two formats.
	switch service {
	case pubSubCEService:
		be.Resource = structuredResource{
			Name:    name,
			Service: service,
			Type:    pubsubMessageType,
		}
		// Lift the "message" field into the main "data" field.
		if message, ok := be.Data["message"]; ok {
			if md, ok := message.(map[string]interface{}); ok {
				be.Data = md
			}
		}
		// These two values live in the event context, not in the data.
		delete(be.Data, "messageId")
		delete(be.Data, "publishTime")
	case firebaseAuthCEService:
		be.Metadata.Resource = name
		// Some keys in the metadata are inconsistent between CloudEvents
		// and Background Events.
		if metadata, ok := be.Data["metadata"]; ok {
			if md, ok := metadata.(map[string]interface{}); ok {
				if createTime, ok := md["createTime"]; ok {
					md["createdAt"] = createTime
					delete(md, "createTime")
				}
				if lastSignInTime, ok := md["lastSignInTime"]; ok {
					md["lastSignedInAt"] = lastSignInTime
					delete(md, "lastSignInTime")
				}
			}
		}
	case firebaseDBCEService:
		be.Resource = cloudEventLocationRe.ReplaceAllString(resource, "")
	case storageCEService:
		splitRes := structuredResource{
			Name:    resource,
			Service: service,
		}
		// The resource type mirrors the "kind" of the object, when present.
		if dataKind, ok := be.Data["kind"]; ok {
			splitRes.Type = dataKind
		}
		be.Resource = splitRes
	}

	encoded, err := json.Marshal(be)
	if err != nil {
		return fmt.Errorf("unable to marshal Background event %v: %v", be, err)
	}

	// Swap in the new body and keep every length indicator consistent with it.
	r.Body = ioutil.NopCloser(bytes.NewReader(encoded))
	r.ContentLength = int64(len(encoded))
	r.Header.Set(contentTypeHeader, jsonContentType)
	r.Header.Set(contentLengthHeader, fmt.Sprint(len(encoded)))
	return nil
}