fc/function.go (190 lines of code) (raw):

// Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. // Copyright 2021 Alibaba Group Holding Limited. All Rights Reserved. package fc import ( "context" "errors" "fmt" "log" "net/http" "time" "github.com/aliyun/fc-runtime-go-sdk/fc/messages" "github.com/aliyun/fc-runtime-go-sdk/fccontext" ) type functionType uint8 const ( // function type, do not modify!! handleFunction functionType = 0 initializerFunction functionType = 1 preStopFunction functionType = 2 preFreezeFunction functionType = 3 // base function type eventFunction functionType = 101 httpFunction functionType = 102 ) // Function struct which wrap the Handler type Function struct { ctx context.Context funcType functionType handler Handler httpHandler HttpHandler // life cycle handler initializeHandler LifeCycleHandler preFreezeHandler LifeCycleHandler preStopHandler LifeCycleHandler } // NewFunction which creates a Function with a given Handler func NewFunction(handler interface{}, funcType functionType) *Function { f := &Function{ funcType: funcType, } if f.funcType == eventFunction { f.handler = NewHandler(handler) } else { f.httpHandler = NewHttpHandler(handler) } return f } // RegistryLifeCycleHandler ... func (fn *Function) RegistryLifeCycleHandler(lifeCycleHandlers []handlerWrapper) { for _, item := range lifeCycleHandlers { switch item.funcType { case initializerFunction: fn.initializeHandler = NewLifeCycleHandler(item.handler) case preFreezeFunction: fn.preFreezeHandler = NewLifeCycleHandler(item.handler) case preStopFunction: fn.preStopHandler = NewLifeCycleHandler(item.handler) default: // TODO ... } } } // Ping method which given a PingRequest and a PingResponse parses the PingResponse func (fn *Function) Ping(req *messages.PingRequest, response *messages.PingResponse) error { *response = messages.PingResponse{} return nil } // Invoke method try to perform a command given an InvokeRequest and an InvokeResponse func (fn *Function) Invoke(req *messages.InvokeRequest, response *messages.InvokeResponse, invokeFuncType functionType) (err error) { defer func() { if e := recover(); e != nil { response.Error = fcPanicResponse(e) fn.printPanicLog(req.RequestId, response.Error.ToJson()) fn.printEndLog(invokeFuncType, req.RequestId, false) err = fmt.Errorf("%v", e) } else if response.Error != nil { fn.printPanicLog(req.RequestId, response.Error.ToJson()) fn.printEndLog(invokeFuncType, req.RequestId, false) } else { fn.printEndLog(invokeFuncType, req.RequestId, true) } }() deadline := time.Unix(req.Deadline.Seconds, req.Deadline.Nanos).UTC() invokeContext, cancel := context.WithDeadline(fn.context(), deadline) defer cancel() lc := &req.Context lc.RequestID = req.RequestId invokeContext = fccontext.NewContext(invokeContext, lc) fn.printStartLog(invokeFuncType, req.RequestId) if invokeFuncType == initializerFunction { if fn.initializeHandler == nil { fn.initializeHandler = errorLifeCycleHandler(errors.New("no initializer handler registered")) } fn.initializeHandler.Invoke(invokeContext) return nil } if invokeFuncType == preFreezeFunction { if fn.preFreezeHandler == nil { fn.preFreezeHandler = errorLifeCycleHandler(errors.New("no prefreeze handler registered")) } fn.preFreezeHandler.Invoke(invokeContext) return nil } if invokeFuncType == preStopFunction { if fn.preStopHandler == nil { fn.preStopHandler = errorLifeCycleHandler(errors.New("no prestop handler registered")) } fn.preStopHandler.Invoke(invokeContext) response.Payload = []byte{} return nil } if fn.funcType == eventFunction { return fn.invokeEventFunc(invokeContext, req.Payload, response) } return fn.invokeHttpFunc(invokeContext, req.HttpParams, req.Payload, response) } func (fn *Function) invokeHttpFunc(invokeContext context.Context, httpParams *string, reqPayload []byte, response *messages.InvokeResponse) error { if httpParams == nil { handler := errorHttpHandler(fmt.Errorf("no httpParams found in request")) err := handler(invokeContext, newFcResponse(&http.Request{}), &http.Request{}) response.Error = fcErrorResponse(err) return nil } req, err := genHttpRequest(*httpParams, reqPayload) if err != nil { response.Error = fcErrorResponse(err) return nil } resp := newFcResponse(req) err = fn.httpHandler.Invoke(invokeContext, resp, req) if err != nil { response.Error = fcErrorResponse(err) return nil } response.Payload = resp.Body() response.HttpParam, err = resp.HttpParam() if err != nil { response.Error = fcErrorResponse(err) return nil } return nil } func (fn *Function) invokeEventFunc(invokeContext context.Context, reqPayload []byte, response *messages.InvokeResponse) error { respPayload, err := fn.handler.Invoke(invokeContext, reqPayload) if err != nil { response.Error = fcErrorResponse(err) return nil } response.Payload = respPayload return nil } // context returns the base context used for the fn. func (fn *Function) context() context.Context { if fn.ctx == nil { return context.Background() } return fn.ctx } // withContext returns a shallow copy of Function with its context changed // to the provided ctx. If the provided ctx is non-nil a Background context is set. func (fn *Function) withContext(ctx context.Context) *Function { if ctx == nil { ctx = context.Background() } fn2 := new(Function) *fn2 = *fn fn2.ctx = ctx return fn2 } func (fn *Function) printPanicLog(requestId, errorMessage string) { if !enableInvokePanicLog { return } log.Printf(" %s [ERROR] %s\n", requestId, errorMessage) } func (fn *Function) printEndLog(funcType functionType, requestId string, isHandled bool) { suffix := "" if !isHandled { suffix = ", Error: Unhandled function error" } switch funcType { case initializerFunction: fmt.Printf("FC Initialize End RequestId: %s%s\n", requestId, suffix) case preStopFunction: fmt.Printf("FC PreStop End RequestId: %s%s\n", requestId, suffix) case preFreezeFunction: fmt.Printf("FC PreFreeze End RequestId: %s%s\n", requestId, suffix) default: fmt.Printf("FC Invoke End RequestId: %s%s\n", requestId, suffix) } } func (fn *Function) printStartLog(funcType functionType, requestId string) { switch funcType { case initializerFunction: fmt.Printf("FC Initialize Start RequestId: %s\n", requestId) case preStopFunction: fmt.Printf("FC PreStop Start RequestId: %s\n", requestId) case preFreezeFunction: fmt.Printf("FC PreFreeze Start RequestId: %s\n", requestId) default: fmt.Printf("FC Invoke Start RequestId: %s\n", requestId) } }