fc/invoke_loop.go (168 lines of code) (raw):
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved
// Copyright 2021 Alibaba Group Holding Limited. All Rights Reserved.
package fc
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/aliyun/fc-runtime-go-sdk/fc/messages"
"github.com/aliyun/fc-runtime-go-sdk/fccontext"
)
const (
msPerS = int64(time.Second / time.Millisecond)
nsPerMS = int64(time.Millisecond / time.Nanosecond)
)
type handlerWrapper struct {
handler interface{}
funcType functionType
}
// startRuntimeAPILoop will return an error if handling a particular invoke resulted in a non-recoverable error
// func startRuntimeAPILoop(ctx context.Context, api string, handler interface{}, funcType functionType) error {
func startRuntimeAPILoop(ctx context.Context, api string, baseHandler handlerWrapper, lifeCycleHandlers []handlerWrapper) (e error) {
defer func() {
if r := recover(); r != nil {
e = fmt.Errorf("%v", r)
}
}()
client := newRuntimeAPIClient(api)
function := NewFunction(baseHandler.handler, baseHandler.funcType).withContext(ctx)
function.RegistryLifeCycleHandler(lifeCycleHandlers)
for {
req, err := client.next()
if err != nil {
logPrintf("failed to get invoke request due to %v", err)
continue
}
go func(req *invoke, f *Function) {
err = handleInvoke(req, f)
if err != nil {
logPrintf("failed to invoke function due to %v", err)
}
}(req, function)
}
}
// handleInvoke returns an error if the function panics, or some other non-recoverable error occurred
func handleInvoke(invokeInstance *invoke, function *Function) error {
functionRequest, err := convertInvokeRequest(invokeInstance)
if err != nil {
return fmt.Errorf("unexpected error occurred when parsing the invoke: %v", err)
}
functionResponse := &messages.InvokeResponse{}
ivkErr := function.Invoke(functionRequest, functionResponse, convertInvokeFunctionType(invokeInstance))
if functionResponse.Error != nil {
payload := safeMarshal(functionResponse.Error)
if err := invokeInstance.failure(payload, contentTypeJSON); err != nil {
return fmt.Errorf("unexpected error occurred when sending the function error to the API: %v", err)
}
if functionResponse.Error.ShouldExit {
return fmt.Errorf("calling the handler function resulted in a panic")
}
return ivkErr
}
if ivkErr != nil {
return ivkErr
}
if err := invokeInstance.success(functionResponse.Payload, contentTypeJSON, functionResponse.HttpParam); err != nil {
return fmt.Errorf("unexpected error occurred when sending the function functionResponse to the API: %v", err)
}
return nil
}
func convertInvokeFunctionType(invokeInstance *invoke) functionType {
funcType, err := strconv.ParseInt(invokeInstance.headers.Get(headerFunctionType), 10, 64)
if err != nil {
return handleFunction
}
switch funcType {
case int64(initializerFunction):
return initializerFunction
case int64(preFreezeFunction):
return preFreezeFunction
case int64(preStopFunction):
return preStopFunction
default:
return handleFunction
}
}
// convertInvokeRequest converts an invoke from the Runtime API, and unpacks it to be compatible with the shape of a `lambda.Function` InvokeRequest.
func convertInvokeRequest(invokeInstance *invoke) (*messages.InvokeRequest, error) {
deadlineEpochMS, err := strconv.ParseInt(invokeInstance.headers.Get(headerDeadlineMS), 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse contents of header: %s", headerDeadlineMS)
}
deadlineS := deadlineEpochMS / msPerS
deadlineNS := (deadlineEpochMS % msPerS) * nsPerMS
functionTimeoutSec, err := strconv.Atoi(invokeInstance.headers.Get(headerFunctionTimeout))
if err != nil {
return nil, fmt.Errorf("failed to parse contents of header: %s", headerFunctionTimeout)
}
retryCount := 0
if retryCountStr := invokeInstance.headers.Get(headerRetryCount); retryCountStr != "" {
retryCount, err = strconv.Atoi(retryCountStr)
if err != nil {
return nil, fmt.Errorf("failed to parse contents of header: %s", headerFunctionTimeout)
}
}
spanBaggages := make(map[string]string)
if base64SpanBaggages := invokeInstance.headers.Get(headerOpenTracingSpanBaggages); base64SpanBaggages != "" {
spanBaggagesByte, err := base64.StdEncoding.DecodeString(base64SpanBaggages)
if err != nil {
return nil, fmt.Errorf("failed to parse contents of header %s: %s", headerOpenTracingSpanContext, base64SpanBaggages)
}
if err := json.Unmarshal(spanBaggagesByte, &spanBaggages); err != nil {
return nil, fmt.Errorf("failed to parse contents of header %s: %s", headerOpenTracingSpanContext, base64SpanBaggages)
}
}
res := &messages.InvokeRequest{
RequestId: invokeInstance.id,
Deadline: messages.InvokeRequest_Timestamp{
Seconds: deadlineS,
Nanos: deadlineNS,
},
Payload: invokeInstance.payload,
Context: fccontext.FcContext{
RequestID: invokeInstance.id,
Credentials: fccontext.Credentials{
AccessKeyId: invokeInstance.headers.Get(headerAccessKeyId),
AccessKeySecret: invokeInstance.headers.Get(headerAccessKeySecret),
SecurityToken: invokeInstance.headers.Get(headerSecurityToken),
},
Function: fccontext.Function{
Name: invokeInstance.headers.Get(headerFunctionName),
Handler: invokeInstance.headers.Get(headerFunctionHandler),
Memory: invokeInstance.headers.Get(headerFunctionMemory),
Timeout: functionTimeoutSec,
},
Service: fccontext.Service{
Name: invokeInstance.headers.Get(headerServiceName),
LogProject: invokeInstance.headers.Get(headerServiceLogproject),
LogStore: invokeInstance.headers.Get(headerServiceLogstore),
Qualifier: invokeInstance.headers.Get(headerQualifier),
VersionId: invokeInstance.headers.Get(headerVersionId),
},
Tracing: fccontext.Tracing{
OpenTracingSpanContext: invokeInstance.headers.Get(headerOpenTracingSpanContext),
OpenTracingSpanBaggages: spanBaggages,
JaegerEndpoint: invokeInstance.headers.Get(headerJaegerEndpoint),
},
Region: invokeInstance.headers.Get(headerRegion),
AccountId: invokeInstance.headers.Get(headerAccountId),
RetryCount: retryCount,
},
}
if httpParams := invokeInstance.headers.Get(headerHttpParams); httpParams != "" {
res.HttpParams = &httpParams
}
return res, nil
}
func safeMarshal(v interface{}) []byte {
payload, err := json.Marshal(v)
if err != nil {
v := &messages.InvokeResponse_Error{
Type: "Runtime.SerializationError",
Message: err.Error(),
}
payload, err := json.Marshal(v)
if err != nil {
panic(err) // never reach
}
return payload
}
return payload
}