funcframework/framework.go (322 lines of code) (raw):
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package funcframework is a Functions Framework implementation for Go. It allows you to register
// HTTP and event functions, then start an HTTP server serving those functions.
package funcframework
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"reflect"
"runtime/debug"
"strconv"
"strings"
"time"
"github.com/GoogleCloudPlatform/functions-framework-go/internal/registry"
cloudevents "github.com/cloudevents/sdk-go/v2"
)
const (
functionStatusHeader = "X-Google-Status"
crashStatus = "crash"
errorStatus = "error"
panicMessageTmpl = "A panic occurred during %s. Please see logs for more details."
fnErrorMessageStderrTmpl = "Function error: %v"
)
var errorType = reflect.TypeOf((*error)(nil)).Elem()
// recoverPanic recovers from a panic in a consistent manner. panicSrc should
// describe what was happening when the panic was encountered, for example
// "user function execution". w is an http.ResponseWriter to write a generic
// response body to that does not expose the details of the panic; w can be
// nil to skip this. If panic needs to be recovered by different caller
// set shouldPanic to true.
func recoverPanic(w http.ResponseWriter, panicSrc string, shouldPanic bool) {
if r := recover(); r != nil {
genericMsg := fmt.Sprintf(panicMessageTmpl, panicSrc)
fmt.Fprintf(os.Stderr, "%s\npanic message: %v\nstack trace: %v\n%s", genericMsg, r, r, debug.Stack())
if w != nil {
writeHTTPErrorResponse(w, http.StatusInternalServerError, crashStatus, genericMsg)
}
if shouldPanic {
panic(r)
}
}
}
// RegisterHTTPFunction registers fn as an HTTP function.
// Maintained for backward compatibility. Please use RegisterHTTPFunctionContext instead.
func RegisterHTTPFunction(path string, fn interface{}) {
defer recoverPanic(nil, "function registration", false)
fnHTTP, ok := fn.(func(http.ResponseWriter, *http.Request))
if !ok {
panic("expected function to have signature func(http.ResponseWriter, *http.Request)")
}
ctx := context.Background()
if err := RegisterHTTPFunctionContext(ctx, path, fnHTTP); err != nil {
panic(fmt.Sprintf("unexpected error in RegisterEventFunctionContext: %v", err))
}
}
// RegisterEventFunction registers fn as an event function.
// Maintained for backward compatibility. Please use RegisterEventFunctionContext instead.
func RegisterEventFunction(path string, fn interface{}) {
ctx := context.Background()
defer recoverPanic(nil, "function registration", false)
if err := RegisterEventFunctionContext(ctx, path, fn); err != nil {
panic(fmt.Sprintf("unexpected error in RegisterEventFunctionContext: %v", err))
}
}
// RegisterHTTPFunctionContext registers fn as an HTTP function.
func RegisterHTTPFunctionContext(ctx context.Context, path string, fn func(http.ResponseWriter, *http.Request)) error {
return registry.Default().RegisterHTTP(fn, registry.WithPath(path))
}
// RegisterEventFunctionContext registers fn as an event function. The function must have two arguments, a
// context.Context and a struct type depending on the event, and return an error. If fn has the
// wrong signature, RegisterEventFunction returns an error.
func RegisterEventFunctionContext(ctx context.Context, path string, fn interface{}) error {
return registry.Default().RegisterEvent(fn, registry.WithPath(path))
}
// RegisterCloudEventFunctionContext registers fn as an cloudevent function.
func RegisterCloudEventFunctionContext(ctx context.Context, path string, fn func(context.Context, cloudevents.Event) error) error {
return registry.Default().RegisterCloudEvent(fn, registry.WithPath(path))
}
// Start serves an HTTP server with registered function(s).
func Start(port string) error {
return StartHostPort("", port)
}
// StartHostPort serves an HTTP server with registered function(s) on the given host and port.
func StartHostPort(hostname, port string) error {
server, err := initServer()
if err != nil {
return err
}
return http.ListenAndServe(fmt.Sprintf("%s:%s", hostname, port), server)
}
func initServer() (*http.ServeMux, error) {
server := http.NewServeMux()
// If FUNCTION_TARGET is set, only serve this target function at path "/".
// If not set, serve all functions at the registered paths.
if target := os.Getenv("FUNCTION_TARGET"); len(target) > 0 {
var targetFn *registry.RegisteredFunction
fn, ok := registry.Default().GetRegisteredFunction(target)
if ok {
targetFn = fn
} else if lastFnWithoutName := registry.Default().GetLastFunctionWithoutName(); lastFnWithoutName != nil {
// If no function was found with the target name, assume the last function that's not registered declaratively
// should be served at '/'.
targetFn = lastFnWithoutName
} else {
return nil, fmt.Errorf("no matching function found with name: %q", target)
}
h, err := wrapFunction(targetFn)
if err != nil {
return nil, fmt.Errorf("failed to serve function %q: %v", target, err)
}
server.Handle("/", h)
return server, nil
}
fns := registry.Default().GetAllFunctions()
for _, fn := range fns {
h, err := wrapFunction(fn)
if err != nil {
return nil, fmt.Errorf("failed to serve function at path %q: %v", fn.Path, err)
}
server.Handle(fn.Path, h)
}
return server, nil
}
func wrapFunction(fn *registry.RegisteredFunction) (http.Handler, error) {
// Check if we have a function resource set, and if so, log progress.
if os.Getenv("FUNCTION_TARGET") == "" {
fmt.Printf("Serving function: %q\n", fn.Name)
}
if fn.HTTPFn != nil {
handler, err := wrapHTTPFunction(fn.HTTPFn)
if err != nil {
return nil, fmt.Errorf("unexpected error in wrapHTTPFunction: %v", err)
}
return handler, nil
} else if fn.CloudEventFn != nil {
handler, err := wrapCloudEventFunction(context.Background(), fn.CloudEventFn)
if err != nil {
return nil, fmt.Errorf("unexpected error in wrapCloudEventFunction: %v", err)
}
return handler, nil
} else if fn.EventFn != nil {
handler, err := wrapEventFunction(fn.EventFn)
if err != nil {
return nil, fmt.Errorf("unexpected error in wrapEventFunction: %v", err)
}
return handler, nil
} else if fn.TypedFn != nil {
handler, err := wrapTypedFunction(fn.TypedFn)
if err != nil {
return nil, fmt.Errorf("unexpected error in wrapTypedFunction: %v", err)
}
return handler, nil
}
return nil, fmt.Errorf("missing function entry in %v", fn)
}
func wrapHTTPFunction(fn func(http.ResponseWriter, *http.Request)) (http.Handler, error) {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if os.Getenv("K_SERVICE") != "" {
// Force flush of logs after every function trigger when running on GCF.
defer fmt.Println()
defer fmt.Fprintln(os.Stderr)
}
r, cancel := setupRequestContext(r)
if cancel != nil {
defer cancel()
}
defer recoverPanic(w, "user function execution", false)
fn(w, r)
}), nil
}
func wrapEventFunction(fn interface{}) (http.Handler, error) {
err := validateEventFunction(fn)
if err != nil {
return nil, err
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if os.Getenv("K_SERVICE") != "" {
// Force flush of logs after every function trigger when running on GCF.
defer fmt.Println()
defer fmt.Fprintln(os.Stderr)
}
r, cancel := setupRequestContext(r)
if cancel != nil {
defer cancel()
}
if shouldConvertCloudEventToBackgroundRequest(r) {
if err := convertCloudEventToBackgroundRequest(r); err != nil {
writeHTTPErrorResponse(w, http.StatusBadRequest, crashStatus, fmt.Sprintf("error converting CloudEvent to Background Event: %v", err))
}
}
handleEventFunction(w, r, fn)
}), nil
}
func wrapTypedFunction(fn interface{}) (http.Handler, error) {
inputType, err := validateTypedFunction(fn)
if err != nil {
return nil, err
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := readHTTPRequestBody(r)
if err != nil {
writeHTTPErrorResponse(w, http.StatusBadRequest, crashStatus, fmt.Sprintf("%v", err))
return
}
argVal := inputType
if err := json.Unmarshal(body, argVal.Interface()); err != nil {
writeHTTPErrorResponse(w, http.StatusBadRequest, crashStatus, fmt.Sprintf("Error while converting input data. %s", err.Error()))
return
}
defer recoverPanic(w, "user function execution", false)
funcReturn := reflect.ValueOf(fn).Call([]reflect.Value{
argVal.Elem(),
})
handleTypedReturn(w, funcReturn)
}), nil
}
func handleTypedReturn(w http.ResponseWriter, funcReturn []reflect.Value) {
if len(funcReturn) == 0 {
return
}
errorVal := funcReturn[len(funcReturn)-1].Interface() // last return must be of type error
if errorVal != nil && reflect.TypeOf(errorVal).AssignableTo(errorType) {
writeHTTPErrorResponse(w, http.StatusInternalServerError, errorStatus, fmtFunctionError(errorVal))
return
}
firstVal := funcReturn[0].Interface()
if !reflect.TypeOf(firstVal).AssignableTo(errorType) {
returnVal, _ := json.Marshal(firstVal)
fmt.Fprintf(w, string(returnVal))
}
}
func validateTypedFunction(fn interface{}) (*reflect.Value, error) {
ft := reflect.TypeOf(fn)
if ft.NumIn() != 1 {
return nil, fmt.Errorf("expected function to have one parameters, found %d", ft.NumIn())
}
if ft.NumOut() > 2 {
return nil, fmt.Errorf("expected function to have maximum two return values")
}
if ft.NumOut() > 0 && !ft.Out(ft.NumOut()-1).AssignableTo(errorType) {
return nil, fmt.Errorf("expected last return type to be of error")
}
var inputType = reflect.New(ft.In(0))
return &inputType, nil
}
func wrapCloudEventFunction(ctx context.Context, fn func(context.Context, cloudevents.Event) error) (http.Handler, error) {
p, err := cloudevents.NewHTTP()
if err != nil {
return nil, fmt.Errorf("failed to create protocol: %v", err)
}
// Always log errors returned by the function to stderr
logErrFn := func(ctx context.Context, ce cloudevents.Event) error {
defer recoverPanic(nil, "user function execution", true)
err := fn(ctx, ce)
if err != nil {
fmt.Fprintf(os.Stderr, fmtFunctionError(err))
}
return err
}
h, err := cloudevents.NewHTTPReceiveHandler(ctx, p, logErrFn)
if err != nil {
return nil, fmt.Errorf("failed to create handler: %v", err)
}
return convertBackgroundToCloudEvent(h), nil
}
func handleEventFunction(w http.ResponseWriter, r *http.Request, fn interface{}) {
body, err := readHTTPRequestBody(r)
if err != nil {
writeHTTPErrorResponse(w, http.StatusBadRequest, crashStatus, fmt.Sprintf("%v", err))
return
}
// Background events have data and an associated metadata, so parse those and run if present.
if metadata, data, err := getBackgroundEvent(body, r.URL.Path); err != nil {
writeHTTPErrorResponse(w, http.StatusBadRequest, crashStatus, fmt.Sprintf("Error: %s, parsing background event: %s", err.Error(), string(body)))
return
} else if data != nil && metadata != nil {
runBackgroundEvent(w, r, metadata, data, fn)
return
}
// Otherwise, we assume the body is a JSON blob containing the user-specified data structure.
runUserFunction(w, r, body, fn)
}
func readHTTPRequestBody(r *http.Request) ([]byte, error) {
if r.Body == nil {
return nil, fmt.Errorf("request body not found")
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, fmt.Errorf("could not read request body %s: %v", r.Body, err)
}
return body, nil
}
func runUserFunction(w http.ResponseWriter, r *http.Request, data []byte, fn interface{}) {
runUserFunctionWithContext(r.Context(), w, r, data, fn)
}
func runUserFunctionWithContext(ctx context.Context, w http.ResponseWriter, r *http.Request, data []byte, fn interface{}) {
argVal := reflect.New(reflect.TypeOf(fn).In(1))
if err := json.Unmarshal(data, argVal.Interface()); err != nil {
writeHTTPErrorResponse(w, http.StatusBadRequest, crashStatus, fmt.Sprintf("Error: %s, while converting event data: %s", err.Error(), string(data)))
return
}
defer recoverPanic(w, "user function execution", false)
userFunErr := reflect.ValueOf(fn).Call([]reflect.Value{
reflect.ValueOf(ctx),
argVal.Elem(),
})
if userFunErr[0].Interface() != nil {
writeHTTPErrorResponse(w, http.StatusInternalServerError, errorStatus, fmtFunctionError(userFunErr[0].Interface()))
return
}
}
func fmtFunctionError(err interface{}) string {
formatted := fmt.Sprintf(fnErrorMessageStderrTmpl, err)
if !strings.HasSuffix(formatted, "\n") {
formatted += "\n"
}
return formatted
}
func writeHTTPErrorResponse(w http.ResponseWriter, statusCode int, status, msg string) {
// Ensure logs end with a newline otherwise they are grouped incorrectly in SD.
if !strings.HasSuffix(msg, "\n") {
msg += "\n"
}
fmt.Fprint(os.Stderr, msg)
// Flush stdout and stderr when running on GCF. This must be done before writing
// the HTTP response in order for all logs to appear in GCF.
if os.Getenv("K_SERVICE") != "" {
fmt.Println()
fmt.Fprintln(os.Stderr)
}
w.Header().Set(functionStatusHeader, status)
w.WriteHeader(statusCode)
fmt.Fprint(w, msg)
}
func setupRequestContext(r *http.Request) (*http.Request, func()) {
r, cancel := setContextTimeoutIfRequested(r)
r = addLoggingIDsToRequest(r)
return r, cancel
}
// setContextTimeoutIfRequested replaces the request's context with a cancellation if requested
func setContextTimeoutIfRequested(r *http.Request) (*http.Request, func()) {
timeoutStr := os.Getenv("CLOUD_RUN_TIMEOUT_SECONDS")
if timeoutStr == "" {
return r, nil
}
timeoutSecs, err := strconv.Atoi(timeoutStr)
if err != nil {
fmt.Fprintf(os.Stderr, "Could not parse CLOUD_RUN_TIMEOUT_SECONDS as an integer value in seconds: %v\n", err)
return r, nil
}
ctx, cancel := context.WithTimeout(r.Context(), time.Duration(timeoutSecs)*time.Second)
return r.WithContext(ctx), cancel
}