lambda/interop/model.go (269 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package interop import ( "encoding/json" "errors" "fmt" "io" "strings" "time" "go.amzn.com/lambda/core/statejson" "go.amzn.com/lambda/fatalerror" "go.amzn.com/lambda/supervisor/model" log "github.com/sirupsen/logrus" ) // MaxPayloadSize max event body size declared as LAMBDA_EVENT_BODY_SIZE const ( MaxPayloadSize = 6*1024*1024 + 100 // 6 MiB + 100 bytes ResponseBandwidthRate = 2 * 1024 * 1024 // default average rate of 2 MiB/s ResponseBandwidthBurstSize = 6 * 1024 * 1024 // default burst size of 6 MiB MinResponseBandwidthRate = 32 * 1024 // 32 KiB/s MaxResponseBandwidthRate = 64 * 1024 * 1024 // 64 MiB/s MinResponseBandwidthBurstSize = 32 * 1024 // 32 KiB MaxResponseBandwidthBurstSize = 64 * 1024 * 1024 // 64 MiB ) // ResponseMode are top-level constants used in combination with the various types of // modes we have for responses, such as invoke's response mode and function's response mode. // In the future we might have invoke's request mode or similar, so these help set the ground // for consistency. type ResponseMode string const ResponseModeBuffered = "Buffered" const ResponseModeStreaming = "Streaming" type InvokeResponseMode string const InvokeResponseModeBuffered InvokeResponseMode = ResponseModeBuffered const InvokeResponseModeStreaming InvokeResponseMode = ResponseModeStreaming var AllInvokeResponseModes = []string{ string(InvokeResponseModeBuffered), string(InvokeResponseModeStreaming), } // FunctionResponseMode is passed by Runtime to tell whether the response should be // streamed or not. type FunctionResponseMode string const FunctionResponseModeBuffered FunctionResponseMode = ResponseModeBuffered const FunctionResponseModeStreaming FunctionResponseMode = ResponseModeStreaming var AllFunctionResponseModes = []string{ string(FunctionResponseModeBuffered), string(FunctionResponseModeStreaming), } // TODO: move to directinvoke.go as we're trying to deprecate interop.* package // ConvertToFunctionResponseMode converts the given string to a FunctionResponseMode // It is case insensitive and if there is no match, an error is thrown. func ConvertToFunctionResponseMode(value string) (FunctionResponseMode, error) { // buffered if strings.EqualFold(value, string(FunctionResponseModeBuffered)) { return FunctionResponseModeBuffered, nil } // streaming if strings.EqualFold(value, string(FunctionResponseModeStreaming)) { return FunctionResponseModeStreaming, nil } // unknown allowedValues := strings.Join(AllFunctionResponseModes, ", ") log.Errorf("Unlable to map %s to %s.", value, allowedValues) return "", ErrInvalidFunctionResponseMode } // Message is a generic interop message. type Message interface{} // Invoke is an invocation request received from the slicer. type Invoke struct { // Tracing header. // https://docs.aws.amazon.com/xray/latest/devguide/xray-concepts.html#xray-concepts-tracingheader TraceID string LambdaSegmentID string ID string InvokedFunctionArn string CognitoIdentityID string CognitoIdentityPoolID string DeadlineNs string ClientContext string ContentType string Payload io.Reader NeedDebugLogs bool ReservationToken string VersionID string InvokeReceivedTime int64 InvokeResponseMetrics *InvokeResponseMetrics InvokeResponseMode InvokeResponseMode RestoreDurationNs int64 // equals 0 for non-snapstart functions RestoreStartTimeMonotime int64 // equals 0 for non-snapstart functions } type Token struct { ReservationToken string InvokeID string VersionID string FunctionTimeout time.Duration InvackDeadlineNs int64 TraceID string LambdaSegmentID string InvokeMetadata string NeedDebugLogs bool RestoreDurationNs int64 RestoreStartTimeMonotime int64 } // InvokeErrorTraceData is used by the tracer to mark segments as being invocation error type InvokeErrorTraceData struct { // Attached to invoke segment ErrorCause json.RawMessage `json:"ErrorCause,omitempty"` } func GetErrorResponseWithFormattedErrorMessage(errorType fatalerror.ErrorType, err error, invokeRequestID string) *ErrorInvokeResponse { var errorMessage string if invokeRequestID != "" { errorMessage = fmt.Sprintf("RequestId: %s Error: %v", invokeRequestID, err) } else { errorMessage = fmt.Sprintf("Error: %v", err) } jsonPayload, err := json.Marshal(FunctionError{ Type: errorType, Message: errorMessage, }) if err != nil { return &ErrorInvokeResponse{ Headers: InvokeResponseHeaders{}, FunctionError: FunctionError{ Type: fatalerror.SandboxFailure, Message: errorMessage, }, Payload: []byte{}, } } headers := InvokeResponseHeaders{} functionError := FunctionError{ Type: errorType, Message: errorMessage, } return &ErrorInvokeResponse{Headers: headers, FunctionError: functionError, Payload: jsonPayload} } // SandboxType identifies sandbox type (PreWarmed vs Classic) type SandboxType string const SandboxPreWarmed SandboxType = "PreWarmed" const SandboxClassic SandboxType = "Classic" // RuntimeInfo contains metadata about the runtime used by the Sandbox type RuntimeInfo struct { ImageJSON string // image config, e.g {\"layers\":[]} Arn string // runtime ARN, e.g. arn:awstest:lambda:us-west-2::runtime:python3.8::alpha Version string // human-readable runtime arn equivalent, e.g. python3.8.v999 } // Captures configuration of the operator and runtime domain // that are only known after INIT is received type DynamicDomainConfig struct { // extra hooks to execute at domain start. Currently used for filesystem and network hooks. // It can be empty. AdditionalStartHooks []model.Hook Mounts []model.Mount //TODO: other dynamic configurations for the domain go here } // Reset message is sent to rapid to initiate reset sequence type Reset struct { Reason string DeadlineNs int64 InvokeResponseMetrics *InvokeResponseMetrics TraceID string LambdaSegmentID string InvokeResponseMode InvokeResponseMode } // Restore message is sent to rapid to restore runtime to make it ready for consecutive invokes type Restore struct { AwsKey string AwsSecret string AwsSession string CredentialsExpiry time.Time RestoreHookTimeoutMs int64 LogStreamName string } type Resync struct { } // Shutdown message is sent to rapid to initiate graceful shutdown type Shutdown struct { DeadlineNs int64 } // Metrics for response status of LogsAPI/TelemetryAPI `/subscribe` calls type TelemetrySubscriptionMetrics map[string]int func MergeSubscriptionMetrics(logsAPIMetrics TelemetrySubscriptionMetrics, telemetryAPIMetrics TelemetrySubscriptionMetrics) TelemetrySubscriptionMetrics { metrics := make(map[string]int) for metric, value := range logsAPIMetrics { metrics[metric] = value } for metric, value := range telemetryAPIMetrics { metrics[metric] += value } return metrics } // InvokeResponseMetrics are produced while sending streaming invoke response to WP type InvokeResponseMetrics struct { // FIXME: this assumes a value in nanoseconds, let's rename it // to StartReadingResponseMonoTimeNs StartReadingResponseMonoTimeMs int64 // Same as the one above FinishReadingResponseMonoTimeMs int64 TimeShapedNs int64 ProducedBytes int64 OutboundThroughputBps int64 // in bytes per second FunctionResponseMode FunctionResponseMode RuntimeCalledResponse bool } func IsResponseStreamingMetrics(metrics *InvokeResponseMetrics) bool { if metrics == nil { return false } return metrics.FunctionResponseMode == FunctionResponseModeStreaming } type DoneMetadataMetricsDimensions struct { InvokeResponseMode InvokeResponseMode } func (dimensions DoneMetadataMetricsDimensions) String() string { var stringDimensions []string if dimensions.InvokeResponseMode != "" { dimension := string("invoke_response_mode=" + dimensions.InvokeResponseMode) stringDimensions = append(stringDimensions, dimension) } return strings.ToLower( strings.Join(stringDimensions, ","), ) } type DoneMetadata struct { NumActiveExtensions int ExtensionsResetMs int64 ExtensionNames string RuntimeRelease string // Metrics for response status of LogsAPI `/subscribe` calls LogsAPIMetrics TelemetrySubscriptionMetrics InvokeRequestReadTimeNs int64 InvokeRequestSizeBytes int64 InvokeCompletionTimeNs int64 InvokeReceivedTime int64 RuntimeReadyTime int64 RuntimeResponseLatencyMs float64 RuntimeTimeThrottledMs int64 RuntimeProducedBytes int64 RuntimeOutboundThroughputBps int64 MetricsDimensions DoneMetadataMetricsDimensions } type Done struct { WaitForExit bool ErrorType fatalerror.ErrorType Meta DoneMetadata } type DoneFail struct { ErrorType fatalerror.ErrorType Meta DoneMetadata } // ErrInvalidInvokeID is returned when invokeID provided in Invoke2 does not match one provided in Token var ErrInvalidInvokeID = fmt.Errorf("ErrInvalidInvokeID") // ErrInvalidReservationToken is returned when reservationToken provided in Invoke2 does not match one provided in Token var ErrInvalidReservationToken = fmt.Errorf("ErrInvalidReservationToken") // ErrInvalidFunctionVersion is returned when functionVersion provided in Invoke2 does not match one provided in Token var ErrInvalidFunctionVersion = fmt.Errorf("ErrInvalidFunctionVersion") // ErrInvalidFunctionResponseMode is returned when the value sent by runtime during Invoke2 // is not a constant of type interop.FunctionResponseMode var ErrInvalidFunctionResponseMode = fmt.Errorf("ErrInvalidFunctionResponseMode") // ErrInvalidInvokeResponseMode is returned when optional InvokeResponseMode header provided in Invoke2 is not a constant of type interop.InvokeResponseMode var ErrInvalidInvokeResponseMode = fmt.Errorf("ErrInvalidInvokeResponseMode") // ErrInvalidMaxPayloadSize is returned when optional MaxPayloadSize header provided in Invoke2 is invalid var ErrInvalidMaxPayloadSize = fmt.Errorf("ErrInvalidMaxPayloadSize") // ErrInvalidResponseBandwidthRate is returned when optional ResponseBandwidthRate header provided in Invoke2 is invalid var ErrInvalidResponseBandwidthRate = fmt.Errorf("ErrInvalidResponseBandwidthRate") // ErrInvalidResponseBandwidthBurstSize is returned when optional ResponseBandwidthBurstSize header provided in Invoke2 is invalid var ErrInvalidResponseBandwidthBurstSize = fmt.Errorf("ErrInvalidResponseBandwidthBurstSize") // ErrMalformedCustomerHeaders is returned when customer headers format is invalid var ErrMalformedCustomerHeaders = fmt.Errorf("ErrMalformedCustomerHeaders") // ErrResponseSent is returned when response with given invokeID was already sent. var ErrResponseSent = fmt.Errorf("ErrResponseSent") // ErrReservationExpired is returned when invoke arrived after InvackDeadline var ErrReservationExpired = fmt.Errorf("ErrReservationExpired") // ErrInternalPlatformError is returned when internal platform error occurred type ErrInternalPlatformError struct{} func (s *ErrInternalPlatformError) Error() string { return "ErrInternalPlatformError" } // ErrTruncatedResponse is returned when response is truncated type ErrTruncatedResponse struct{} func (s *ErrTruncatedResponse) Error() string { return "ErrTruncatedResponse" } // ErrorResponseTooLarge is returned when response Payload exceeds shared memory buffer size type ErrorResponseTooLarge struct { MaxResponseSize int ResponseSize int } // ErrorResponseTooLargeDI is used to reproduce ErrorResponseTooLarge behavior for Direct Invoke mode type ErrorResponseTooLargeDI struct { ErrorResponseTooLarge } // ErrorResponseTooLarge is returned when response provided by Runtime does not fit into shared memory buffer func (s *ErrorResponseTooLarge) Error() string { return fmt.Sprintf("Response payload size (%d bytes) exceeded maximum allowed payload size (%d bytes).", s.ResponseSize, s.MaxResponseSize) } // AsErrorResponse generates ErrorInvokeResponse from ErrorResponseTooLarge func (s *ErrorResponseTooLarge) AsErrorResponse() *ErrorInvokeResponse { functionError := FunctionError{ Type: fatalerror.FunctionOversizedResponse, Message: s.Error(), } jsonPayload, err := json.Marshal(functionError) if err != nil { panic("Failed to marshal interop.FunctionError") } headers := InvokeResponseHeaders{ContentType: "application/json"} return &ErrorInvokeResponse{Headers: headers, FunctionError: functionError, Payload: jsonPayload} } // Server used for sending messages and sharing data between the Runtime API handlers and the // internal platform facing servers. For example, // // responseCtx.SendResponse(...) // // will send the response payload and metadata provided by the runtime to the platform, through the internal // protocol used by the specific implementation // TODO: rename this to InvokeResponseContext, used to send responses from handlers to platform-facing server type Server interface { // GetCurrentInvokeID returns current invokeID. // NOTE, in case of INIT, when invokeID is not known in advance (e.g. provisioned concurrency), // returned invokeID will contain empty value. GetCurrentInvokeID() string // SendRuntimeReady sends a message indicating the runtime has called /invocation/next. // The checkpoint allows us to compute the overhead due to Extensions by substracting it // from the time when all extensions have called /next. // TODO: this method is a lifecycle event used only for metrics, and doesn't belong here SendRuntimeReady() error // SendInitErrorResponse does two separate things when init/error is called: // a) sends the init error response if called during invoke, and // b) notifies platform of a user fault if called, during both init or invoke // TODO: // separate the two concerns & unify with SendErrorResponse in response sender SendInitErrorResponse(response *ErrorInvokeResponse) error } type InternalStateGetter func() statejson.InternalStateDescription // ErrRestoreHookTimeout is returned as a response to `RESTORE` message // when function's restore hook takes more time to execute thatn // the timeout value. var ErrRestoreHookTimeout = errors.New("Runtime.RestoreHookUserTimeout") // ErrRestoreHookUserError is returned as a response to `RESTORE` message // when function's restore hook faces with an error on throws an exception. // UserError contains the error type that the runtime encountered. type ErrRestoreHookUserError struct { UserError FunctionError } func (err ErrRestoreHookUserError) Error() string { return "errRestoreHookUserError" } // ErrRestoreUpdateCredentials is returned as a response to `RESTORE` message // if RAPID cannot update the credentials served by credentials API // during the RESTORE phase. var ErrRestoreUpdateCredentials = errors.New("errRestoreUpdateCredentials") var ErrCannotParseCredentialsExpiry = errors.New("errCannotParseCredentialsExpiry") var ErrCannotParseRestoreHookTimeoutMs = errors.New("errCannotParseRestoreHookTimeoutMs") var ErrMissingRestoreCredentials = errors.New("errMissingRestoreCredentials")