lambda/rapid/handlers.go

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// Package rapid implements the synchronous event dispatch loop.
package rapid

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"go.amzn.com/lambda/agents"
	"go.amzn.com/lambda/appctx"
	"go.amzn.com/lambda/core"
	"go.amzn.com/lambda/extensions"
	"go.amzn.com/lambda/fatalerror"
	"go.amzn.com/lambda/interop"
	"go.amzn.com/lambda/metering"
	"go.amzn.com/lambda/rapi"
	"go.amzn.com/lambda/rapi/rendering"
	"go.amzn.com/lambda/rapidcore/env"
	supvmodel "go.amzn.com/lambda/supervisor/model"
	"go.amzn.com/lambda/telemetry"

	"github.com/google/uuid"
	log "github.com/sirupsen/logrus"
)

const (
	RuntimeDomain        = "runtime"
	OperatorDomain       = "operator"
	defaultAgentLocation = "/opt/extensions"
	runtimeProcessName   = "runtime"
)

const (
	// Same value as defined in LambdaSandbox minus 1.
	maxExtensionNamesLength  = 127
	standaloneShutdownReason = "spindown"
)

var errResetReceived = errors.New("errResetReceived")

type processSupervisor struct {
	supvmodel.ProcessSupervisor
	RootPath string
}

type rapidContext struct {
	interopServer            interop.Server
	server                   *rapi.Server
	appCtx                   appctx.ApplicationContext
	initDone                 bool
	supervisor               processSupervisor
	runtimeDomainGeneration  uint32
	initFlow                 core.InitFlowSynchronization
	invokeFlow               core.InvokeFlowSynchronization
	registrationService      core.RegistrationService
	renderingService         *rendering.EventRenderingService
	telemetryAPIEnabled      bool
	logsSubscriptionAPI      telemetry.SubscriptionAPI
	telemetrySubscriptionAPI telemetry.SubscriptionAPI
	logsEgressAPI            telemetry.StdLogsEgressAPI
	xray                     telemetry.Tracer
	standaloneMode           bool
	eventsAPI                interop.EventsAPI
	initCachingEnabled       bool
	credentialsService       core.CredentialsService
	handlerExecutionMutex    sync.Mutex
	shutdownContext          *shutdownContext
	logStreamName            string

	RuntimeStartedTime         int64
	RuntimeOverheadStartedTime int64
	InvokeResponseMetrics      *interop.InvokeResponseMetrics
}

// Validate interface compliance
var _ interop.RapidContext = (*rapidContext)(nil)

type invokeMetrics struct {
	rendererMetrics rendering.InvokeRendererMetrics

	runtimeReadyTime int64
}

func (c *rapidContext) HasActiveExtensions() bool {
	return extensions.AreEnabled() && c.registrationService.CountAgents() > 0
}

func (c *rapidContext) GetExtensionNames() string {
	var extensionNamesList []string
	for _, agent := range c.registrationService.AgentsInfo() {
		extensionNamesList = append(extensionNamesList, agent.Name)
	}

	extensionNames := strings.Join(extensionNamesList, ";")
	if len(extensionNames) > maxExtensionNamesLength {
		if idx := strings.LastIndex(extensionNames[:maxExtensionNamesLength], ";"); idx != -1 {
			return extensionNames[:idx]
		}
		return ""
	}
	return extensionNames
}

func logAgentsInitStatus(execCtx *rapidContext) {
	for _, agent := range execCtx.registrationService.AgentsInfo() {
		extensionInitData := interop.ExtensionInitData{
			AgentName:     agent.Name,
			State:         agent.State,
			ErrorType:     agent.ErrorType,
			Subscriptions: agent.Subscriptions,
		}
		execCtx.eventsAPI.SendExtensionInit(extensionInitData)
	}
}

func agentLaunchError(agent *core.ExternalAgent, appCtx appctx.ApplicationContext, launchError error) {
	if err := agent.LaunchError(launchError); err != nil {
		log.Warnf("LaunchError transition fail for %s from %s: %s", agent, agent.GetState().Name(), err)
	}
	appctx.StoreFirstFatalError(appCtx, fatalerror.AgentLaunchError)
}

func doInitExtensions(domain string, agentPaths []string, execCtx *rapidContext, env *env.Environment) error {
	initFlow := execCtx.registrationService.InitFlow()
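
	// Illustration (hypothetical layout, not taken from this file): with
	//
	//	/opt/extensions/
	//	├── telemetry-agent
	//	└── secrets-agent
	//
	// agentPaths holds both binaries, the register-count gate below is set to 2 up
	// front, and AwaitExternalAgentsRegistered() blocks until both processes have
	// registered through the extensions API.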
	// We don't bring this into the loop below because we don't want unnecessary broadcasts on the agent gate.
	if err := initFlow.SetExternalAgentsRegisterCount(uint16(len(agentPaths))); err != nil {
		return err
	}

	for _, agentPath := range agentPaths {
		// Using path.Base(agentPath) rather than agentName because the agent name is a contract:
		// standalone can query the internal state by it.
		agent, err := execCtx.registrationService.CreateExternalAgent(path.Base(agentPath))
		if err != nil {
			return err
		}

		if execCtx.registrationService.CountAgents() > core.MaxAgentsAllowed {
			agentLaunchError(agent, execCtx.appCtx, core.ErrTooManyExtensions)
			return core.ErrTooManyExtensions
		}

		env := env.AgentExecEnv()

		agentStdoutWriter, agentStderrWriter, err := execCtx.logsEgressAPI.GetExtensionSockets()
		if err != nil {
			return err
		}
		agentName := fmt.Sprintf("extension-%s-%d", path.Base(agentPath), execCtx.runtimeDomainGeneration)

		err = execCtx.supervisor.Exec(context.Background(), &supvmodel.ExecRequest{
			Domain: domain,
			Name:   agentName,
			Path:   agentPath,
			Env:    &env,
			Logging: supvmodel.Logging{
				Managed: supvmodel.ManagedLogging{
					Topic: supvmodel.RtExtensionManagedLoggingTopic,
					Formats: []supvmodel.ManagedLoggingFormat{
						supvmodel.LineBasedManagedLogging,
					},
				},
			},
			StdoutWriter: agentStdoutWriter,
			StderrWriter: agentStderrWriter,
		})

		if err != nil {
			agentLaunchError(agent, execCtx.appCtx, err)
			return err
		}

		execCtx.shutdownContext.createExitedChannel(agentName)
	}

	if err := initFlow.AwaitExternalAgentsRegistered(); err != nil {
		return err
	}

	return nil
}

func doRuntimeBootstrap(execCtx *rapidContext, sbInfoFromInit interop.SandboxInfoFromInit) ([]string, map[string]string, string, []*os.File, error) {
	env := sbInfoFromInit.EnvironmentVariables
	runtimeBootstrap := sbInfoFromInit.RuntimeBootstrap

	bootstrapCmd, err := runtimeBootstrap.Cmd()
	if err != nil {
		if fatalError, formattedLog, hasError := runtimeBootstrap.CachedFatalError(err); hasError {
			appctx.StoreFirstFatalError(execCtx.appCtx, fatalError)
			execCtx.eventsAPI.SendImageErrorLog(interop.ImageErrorLogData(formattedLog))
		} else {
			appctx.StoreFirstFatalError(execCtx.appCtx, fatalerror.InvalidEntrypoint)
		}
		return []string{}, map[string]string{}, "", []*os.File{}, err
	}

	bootstrapEnv := runtimeBootstrap.Env(env)

	bootstrapCwd, err := runtimeBootstrap.Cwd()
	if err != nil {
		if fatalError, formattedLog, hasError := runtimeBootstrap.CachedFatalError(err); hasError {
			appctx.StoreFirstFatalError(execCtx.appCtx, fatalError)
			execCtx.eventsAPI.SendImageErrorLog(interop.ImageErrorLogData(formattedLog))
		} else {
			appctx.StoreFirstFatalError(execCtx.appCtx, fatalerror.InvalidWorkingDir)
		}
		return []string{}, map[string]string{}, "", []*os.File{}, err
	}

	bootstrapExtraFiles := runtimeBootstrap.ExtraFiles()

	return bootstrapCmd, bootstrapEnv, bootstrapCwd, bootstrapExtraFiles, nil
}

func (c *rapidContext) watchEvents(events <-chan supvmodel.Event) {
	for event := range events {
		var err error
		log.Debugf("The events handler received the event %+v.", event)
		if loss := event.Event.EventLoss(); loss != nil {
			log.Panicf("Lost %d events from supervisor", *loss)
		}
		termination := event.Event.ProcessTerminated()

		// If we are not shutting down, we care about unexpected exits.
		if !c.shutdownContext.isShuttingDown() {
			runtimeProcessName := fmt.Sprintf("%s-%d", runtimeProcessName, c.runtimeDomainGeneration)

			// If the event is from the runtime
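			// (the supervisor process name is runtimeProcessName suffixed with the
			// current domain generation, e.g. "runtime-1" for the first init), record
			// RuntimeExit as the sandbox's first fatal error; any other process is
			// attributed to an extension crash below.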
			if *termination.Name == runtimeProcessName {
				if termination.Success() {
					err = fmt.Errorf("Runtime exited without providing a reason")
				} else {
					err = fmt.Errorf("Runtime exited with error: %s", termination.String())
				}
				appctx.StoreFirstFatalError(c.appCtx, fatalerror.RuntimeExit)
			} else {
				if termination.Success() {
					err = fmt.Errorf("exit code 0")
				} else {
					err = fmt.Errorf("%s", termination.String())
				}
				appctx.StoreFirstFatalError(c.appCtx, fatalerror.AgentCrash)
			}
			log.Warnf("Process %s exited: %+v", *termination.Name, termination)
		}

		// At the moment we only get termination events.
		// When there are other event types we will need to be selective
		// about what we send to handleProcessExit().
		c.shutdownContext.handleProcessExit(*termination)
		c.registrationService.CancelFlows(err)
	}
}

// subscribe to /events for the runtime domain in supervisor
func setupEventsWatcher(execCtx *rapidContext) error {
	eventsRequest := supvmodel.EventsRequest{
		Domain: RuntimeDomain,
	}

	events, err := execCtx.supervisor.Events(context.Background(), &eventsRequest)
	if err != nil {
		log.Errorf("Could not get events stream from supervisor: %s", err)
		return err
	}

	go execCtx.watchEvents(events)

	return nil
}

func doRuntimeDomainInit(execCtx *rapidContext, sbInfoFromInit interop.SandboxInfoFromInit, phase interop.LifecyclePhase) error {
	initStartTime := metering.Monotime()
	sendInitStartLogEvent(execCtx, sbInfoFromInit.SandboxType, phase)
	defer sendInitReportLogEvent(execCtx, sbInfoFromInit.SandboxType, initStartTime, phase)

	execCtx.xray.RecordInitStartTime()
	defer execCtx.xray.RecordInitEndTime()

	defer func() {
		if extensions.AreEnabled() {
			logAgentsInitStatus(execCtx)
		}
	}()

	execCtx.runtimeDomainGeneration++

	if extensions.AreEnabled() {
		runtimeExtensions := agents.ListExternalAgentPaths(defaultAgentLocation, execCtx.supervisor.RootPath)
		if err := doInitExtensions(RuntimeDomain, runtimeExtensions, execCtx, sbInfoFromInit.EnvironmentVariables); err != nil {
			return err
		}
	}

	appctx.StoreSandboxType(execCtx.appCtx, sbInfoFromInit.SandboxType)

	initFlow := execCtx.registrationService.InitFlow()

	// Runtime state machine
	runtime := core.NewRuntime(initFlow, execCtx.invokeFlow)

	// The registration service keeps track of parties registered in the system and the events
	// they are registered for. The runtime is a special case: it does not register itself, so we
	// preregister it in the system; the runtime is implicitly subscribed to certain lifecycle events.
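	//
	// The remainder of this function, in order: preregister the runtime, exec the
	// runtime bootstrap under the supervisor, wait for it to reach restore-ready,
	// then wait for registered extensions to report ready before closing the
	// registration and Logs API subscription windows.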
log.Debug("Preregister runtime") registrationService := execCtx.registrationService err := registrationService.PreregisterRuntime(runtime) if err != nil { return err } bootstrapCmd, bootstrapEnv, bootstrapCwd, bootstrapExtraFiles, err := doRuntimeBootstrap(execCtx, sbInfoFromInit) if err != nil { return err } runtimeStdoutWriter, runtimeStderrWriter, err := execCtx.logsEgressAPI.GetRuntimeSockets() if err != nil { return err } log.Debug("Start runtime") checkCredentials(execCtx, bootstrapEnv) name := fmt.Sprintf("%s-%d", runtimeProcessName, execCtx.runtimeDomainGeneration) err = execCtx.supervisor.Exec(context.Background(), &supvmodel.ExecRequest{ Domain: RuntimeDomain, Name: name, Cwd: &bootstrapCwd, Path: bootstrapCmd[0], Args: bootstrapCmd[1:], Env: &bootstrapEnv, Logging: supvmodel.Logging{ Managed: supvmodel.ManagedLogging{ Topic: supvmodel.RuntimeManagedLoggingTopic, Formats: []supvmodel.ManagedLoggingFormat{ supvmodel.LineBasedManagedLogging, supvmodel.MessageBasedManagedLogging, }, }, }, StdoutWriter: runtimeStdoutWriter, StderrWriter: runtimeStderrWriter, ExtraFiles: &bootstrapExtraFiles, }) runtimeDoneStatus := telemetry.RuntimeDoneSuccess defer func() { sendInitRuntimeDoneLogEvent(execCtx, sbInfoFromInit.SandboxType, runtimeDoneStatus, phase) }() if err != nil { if fatalError, formattedLog, hasError := sbInfoFromInit.RuntimeBootstrap.CachedFatalError(err); hasError { appctx.StoreFirstFatalError(execCtx.appCtx, fatalError) execCtx.eventsAPI.SendImageErrorLog(interop.ImageErrorLogData(formattedLog)) } else { appctx.StoreFirstFatalError(execCtx.appCtx, fatalerror.InvalidEntrypoint) } runtimeDoneStatus = telemetry.RuntimeDoneError return err } execCtx.shutdownContext.createExitedChannel(name) if err := initFlow.AwaitRuntimeRestoreReady(); err != nil { runtimeDoneStatus = telemetry.RuntimeDoneError return err } runtimeDoneStatus = telemetry.RuntimeDoneSuccess // Registration phase finished for agents - no more agents can be registered with the system registrationService.TurnOff() if extensions.AreEnabled() { // Initialize and activate the gate with the number of agent we wait to return ready if err := initFlow.SetAgentsReadyCount(registrationService.GetRegisteredAgentsSize()); err != nil { return err } if err := initFlow.AwaitAgentsReady(); err != nil { runtimeDoneStatus = telemetry.RuntimeDoneError return err } } // Logs API subscription phase finished for agents - no more agents can be subscribed to the Logs API if execCtx.telemetryAPIEnabled { execCtx.logsSubscriptionAPI.TurnOff() execCtx.telemetrySubscriptionAPI.TurnOff() } execCtx.initDone = true return nil } func doInvoke(execCtx *rapidContext, invokeRequest *interop.Invoke, mx *invokeMetrics, sbInfoFromInit interop.SandboxInfoFromInit, requestBuffer *bytes.Buffer) error { execCtx.eventsAPI.SetCurrentRequestID(interop.RequestID(invokeRequest.ID)) appCtx := execCtx.appCtx xray := execCtx.xray xray.Configure(invokeRequest) ctx := context.Background() return xray.CaptureInvokeSegment(ctx, xray.WithErrorCause(ctx, appCtx, func(ctx context.Context) error { telemetryTracingCtx := xray.BuildTracingCtxForStart() if !execCtx.initDone { // do inline init if err := xray.CaptureInitSubsegment(ctx, func(ctx context.Context) error { return doRuntimeDomainInit(execCtx, sbInfoFromInit, interop.LifecyclePhaseInvoke) }); err != nil { sendInvokeStartLogEvent(execCtx, invokeRequest.ID, telemetryTracingCtx) return err } } else if sbInfoFromInit.SandboxType != interop.SandboxPreWarmed && !execCtx.initCachingEnabled { 
			xray.SendInitSubsegmentWithRecordedTimesOnce(ctx)
		}

		xray.SendRestoreSubsegmentWithRecordedTimesOnce(ctx)

		sendInvokeStartLogEvent(execCtx, invokeRequest.ID, telemetryTracingCtx)

		invokeFlow := execCtx.invokeFlow
		log.Debug("Initialize invoke flow barriers")
		err := invokeFlow.InitializeBarriers()
		if err != nil {
			return err
		}

		registrationService := execCtx.registrationService
		runtime := registrationService.GetRuntime()
		var intAgents []*core.InternalAgent
		var extAgents []*core.ExternalAgent

		if extensions.AreEnabled() {
			intAgents = registrationService.GetSubscribedInternalAgents(core.InvokeEvent)
			extAgents = registrationService.GetSubscribedExternalAgents(core.InvokeEvent)
			if err := invokeFlow.SetAgentsReadyCount(uint16(len(intAgents) + len(extAgents))); err != nil {
				return err
			}
		}

		// Invoke
		if err := xray.CaptureInvokeSubsegment(ctx, xray.WithError(ctx, appCtx, func(ctx context.Context) error {
			log.Debug("Set renderer for invoke")
			renderer := rendering.NewInvokeRenderer(ctx, invokeRequest, requestBuffer, xray.BuildTracingHeader())
			defer func() {
				mx.rendererMetrics = renderer.GetMetrics()
			}()
			execCtx.renderingService.SetRenderer(renderer)

			if extensions.AreEnabled() {
				log.Debug("Release agents conditions")
				for _, agent := range extAgents {
					//TODO handle Supervisors listening channel
					agent.Release()
				}
				for _, agent := range intAgents {
					//TODO handle Supervisors listening channel
					agent.Release()
				}
			}

			log.Debug("Release runtime condition")
			//TODO handle Supervisors listening channel
			execCtx.SetRuntimeStartedTime(metering.Monotime())
			runtime.Release()
			log.Debug("Await runtime response")
			//TODO handle Supervisors listening channel
			return invokeFlow.AwaitRuntimeResponse()
		})); err != nil {
			return err
		}

		// Runtime overhead
		if err := xray.CaptureOverheadSubsegment(ctx, func(ctx context.Context) error {
			log.Debug("Await runtime ready")
			execCtx.SetRuntimeOverheadStartedTime(metering.Monotime())
			//TODO handle Supervisors listening channel
			return invokeFlow.AwaitRuntimeReady()
		}); err != nil {
			return err
		}
		mx.runtimeReadyTime = metering.Monotime()

		runtimeDoneEventData := interop.InvokeRuntimeDoneData{
			Status:          telemetry.RuntimeDoneSuccess,
			Metrics:         telemetry.GetRuntimeDoneInvokeMetrics(execCtx.RuntimeStartedTime, invokeRequest.InvokeResponseMetrics, mx.runtimeReadyTime),
			InternalMetrics: invokeRequest.InvokeResponseMetrics,
			Tracing:         xray.BuildTracingCtxAfterInvokeComplete(),
			Spans:           execCtx.eventsAPI.GetRuntimeDoneSpans(execCtx.RuntimeStartedTime, invokeRequest.InvokeResponseMetrics, execCtx.RuntimeOverheadStartedTime, mx.runtimeReadyTime),
		}
		log.Info(runtimeDoneEventData.String())
		if err := execCtx.eventsAPI.SendInvokeRuntimeDone(runtimeDoneEventData); err != nil {
			log.Errorf("Failed to send INVOKE RTDONE: %s", err)
		}

		// Extensions overhead
		if execCtx.HasActiveExtensions() {
			extensionOverheadStartTime := metering.Monotime()
			execCtx.interopServer.SendRuntimeReady()
			log.Debug("Await agents ready")
			//TODO handle Supervisors listening channel
			if err := invokeFlow.AwaitAgentsReady(); err != nil {
				log.Warnf("AwaitAgentsReady() = %s", err)
				return err
			}
			extensionOverheadEndTime := metering.Monotime()
			extensionOverheadMsSpan := interop.Span{
				Name:       "extensionOverhead",
				Start:      telemetry.GetEpochTimeInISO8601FormatFromMonotime(extensionOverheadStartTime),
				DurationMs: telemetry.CalculateDuration(extensionOverheadStartTime, extensionOverheadEndTime),
			}
			if err := execCtx.eventsAPI.SendReportSpan(extensionOverheadMsSpan); err != nil {
				log.WithError(err).Error("Failed to create REPORT Span")
			}
		}

		return nil
	}))
}
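
// Barrier ordering driven by doInvoke above, summarized for reference (names are
// the flow methods already used in this file):
//
//	InitializeBarriers()
//	release extensions and runtime -> AwaitRuntimeResponse()  (invoke subsegment)
//	                               -> AwaitRuntimeReady()     (runtime overhead)
//	SendInvokeRuntimeDone          -> AwaitAgentsReady()      (extensions overhead, active extensions only)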

// acceptInitRequest is a second initialization phase, performed after receiving START.
// Initialized entities: _HANDLER, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN.
func (c *rapidContext) acceptInitRequest(initRequest *interop.Init) *interop.Init {
	initRequest.EnvironmentVariables.StoreEnvironmentVariablesFromInit(
		initRequest.CustomerEnvironmentVariables,
		initRequest.Handler,
		initRequest.AwsKey,
		initRequest.AwsSecret,
		initRequest.AwsSession,
		initRequest.FunctionName,
		initRequest.FunctionVersion)
	c.registrationService.SetFunctionMetadata(core.FunctionMetadata{
		AccountID:         initRequest.AccountID,
		FunctionName:      initRequest.FunctionName,
		FunctionVersion:   initRequest.FunctionVersion,
		InstanceMaxMemory: initRequest.InstanceMaxMemory,
		Handler:           initRequest.Handler,
		RuntimeInfo:       initRequest.RuntimeInfo,
	})
	c.SetLogStreamName(initRequest.LogStreamName)

	return initRequest
}

func (c *rapidContext) acceptInitRequestForInitCaching(initRequest *interop.Init) (*interop.Init, error) {
	log.Info("Configure environment for Init Caching.")
	randomUUID, err := uuid.NewRandom()
	if err != nil {
		return initRequest, err
	}

	initCachingToken := randomUUID.String()

	initRequest.EnvironmentVariables.StoreEnvironmentVariablesFromInitForInitCaching(
		c.server.Host(),
		c.server.Port(),
		initRequest.CustomerEnvironmentVariables,
		initRequest.Handler,
		initRequest.FunctionName,
		initRequest.FunctionVersion,
		initCachingToken)

	c.registrationService.SetFunctionMetadata(core.FunctionMetadata{
		AccountID:         initRequest.AccountID,
		FunctionName:      initRequest.FunctionName,
		FunctionVersion:   initRequest.FunctionVersion,
		InstanceMaxMemory: initRequest.InstanceMaxMemory,
		Handler:           initRequest.Handler,
		RuntimeInfo:       initRequest.RuntimeInfo,
	})
	c.SetLogStreamName(initRequest.LogStreamName)

	c.credentialsService.SetCredentials(initCachingToken, initRequest.AwsKey, initRequest.AwsSecret, initRequest.AwsSession, initRequest.CredentialsExpiry)

	return initRequest, nil
}

func handleInit(execCtx *rapidContext, initRequest *interop.Init, initSuccessResponse chan<- interop.InitSuccess, initFailureResponse chan<- interop.InitFailure) {
	if execCtx.initCachingEnabled {
		var err error
		if initRequest, err = execCtx.acceptInitRequestForInitCaching(initRequest); err != nil {
			// TODO: call handleInitError only after sending the RUNNING, since
			// Slicer will fail receiving DONEFAIL here as it is expecting RUNNING
			handleInitError(execCtx, initRequest.InvokeID, err, initFailureResponse)
			return
		}
	} else {
		initRequest = execCtx.acceptInitRequest(initRequest)
	}

	if err := setupEventsWatcher(execCtx); err != nil {
		handleInitError(execCtx, initRequest.InvokeID, err, initFailureResponse)
		return
	}

	if !initRequest.SuppressInit {
		// doRuntimeDomainInit() is used in both init/invoke, so the signature requires the sbInfo arg
		sbInfo := interop.SandboxInfoFromInit{
			EnvironmentVariables: initRequest.EnvironmentVariables,
			SandboxType:          initRequest.SandboxType,
			RuntimeBootstrap:     initRequest.Bootstrap,
		}
		if err := doRuntimeDomainInit(execCtx, sbInfo, interop.LifecyclePhaseInit); err != nil {
			handleInitError(execCtx, initRequest.InvokeID, err, initFailureResponse)
			return
		}
	}

	initSuccessMsg := interop.InitSuccess{
		RuntimeRelease:      appctx.GetRuntimeRelease(execCtx.appCtx),
		NumActiveExtensions: execCtx.registrationService.CountAgents(),
		ExtensionNames:      execCtx.GetExtensionNames(),
		Ack:                 make(chan struct{}),
	}

	if execCtx.telemetryAPIEnabled {
		initSuccessMsg.LogsAPIMetrics = interop.MergeSubscriptionMetrics(execCtx.logsSubscriptionAPI.FlushMetrics(),
			execCtx.telemetrySubscriptionAPI.FlushMetrics())
	}

	initSuccessResponse <- initSuccessMsg
	<-initSuccessMsg.Ack
}

func handleInvoke(execCtx *rapidContext, invokeRequest *interop.Invoke, sbInfoFromInit interop.SandboxInfoFromInit, requestBuffer *bytes.Buffer, responseSender interop.InvokeResponseSender) (interop.InvokeSuccess, *interop.InvokeFailure) {
	appctx.StoreResponseSender(execCtx.appCtx, responseSender)
	invokeMx := invokeMetrics{}

	if err := doInvoke(execCtx, invokeRequest, &invokeMx, sbInfoFromInit, requestBuffer); err != nil {
		log.WithError(err).WithField("InvokeID", invokeRequest.ID).Error("Invoke failed")
		invokeFailure := handleInvokeError(execCtx, invokeRequest, &invokeMx, err)
		invokeFailure.InvokeResponseMode = invokeRequest.InvokeResponseMode

		if invokeRequest.InvokeResponseMetrics != nil && interop.IsResponseStreamingMetrics(invokeRequest.InvokeResponseMetrics) {
			invokeFailure.ResponseMetrics = interop.ResponseMetrics{
				RuntimeResponseLatencyMs:     telemetry.CalculateDuration(execCtx.RuntimeStartedTime, invokeRequest.InvokeResponseMetrics.StartReadingResponseMonoTimeMs),
				RuntimeTimeThrottledMs:       invokeRequest.InvokeResponseMetrics.TimeShapedNs / int64(time.Millisecond),
				RuntimeProducedBytes:         invokeRequest.InvokeResponseMetrics.ProducedBytes,
				RuntimeOutboundThroughputBps: invokeRequest.InvokeResponseMetrics.OutboundThroughputBps,
			}
		}
		return interop.InvokeSuccess{}, invokeFailure
	}

	var invokeCompletionTimeNs int64
	if responseTimeNs := execCtx.registrationService.GetRuntime().GetRuntimeDescription().State.ResponseTimeNs; responseTimeNs != 0 {
		invokeCompletionTimeNs = time.Now().UnixNano() - responseTimeNs
	}

	invokeSuccessMsg := interop.InvokeSuccess{
		RuntimeRelease:      appctx.GetRuntimeRelease(execCtx.appCtx),
		NumActiveExtensions: execCtx.registrationService.CountAgents(),
		ExtensionNames:      execCtx.GetExtensionNames(),
		InvokeMetrics: interop.InvokeMetrics{
			InvokeRequestReadTimeNs: invokeMx.rendererMetrics.ReadTime.Nanoseconds(),
			InvokeRequestSizeBytes:  int64(invokeMx.rendererMetrics.SizeBytes),
			RuntimeReadyTime:        invokeMx.runtimeReadyTime,
		},
		InvokeCompletionTimeNs: invokeCompletionTimeNs,
		InvokeReceivedTime:     invokeRequest.InvokeReceivedTime,
		InvokeResponseMode:     invokeRequest.InvokeResponseMode,
	}

	if invokeRequest.InvokeResponseMetrics != nil && interop.IsResponseStreamingMetrics(invokeRequest.InvokeResponseMetrics) {
		invokeSuccessMsg.ResponseMetrics = interop.ResponseMetrics{
			RuntimeResponseLatencyMs:     telemetry.CalculateDuration(execCtx.RuntimeStartedTime, invokeRequest.InvokeResponseMetrics.StartReadingResponseMonoTimeMs),
			RuntimeTimeThrottledMs:       invokeRequest.InvokeResponseMetrics.TimeShapedNs / int64(time.Millisecond),
			RuntimeProducedBytes:         invokeRequest.InvokeResponseMetrics.ProducedBytes,
			RuntimeOutboundThroughputBps: invokeRequest.InvokeResponseMetrics.OutboundThroughputBps,
		}
	}

	if execCtx.telemetryAPIEnabled {
		invokeSuccessMsg.LogsAPIMetrics = interop.MergeSubscriptionMetrics(execCtx.logsSubscriptionAPI.FlushMetrics(),
			execCtx.telemetrySubscriptionAPI.FlushMetrics())
	}

	return invokeSuccessMsg, nil
}

func reinitialize(execCtx *rapidContext) {
	execCtx.appCtx.Delete(appctx.AppCtxInvokeErrorTraceDataKey)
	execCtx.appCtx.Delete(appctx.AppCtxRuntimeReleaseKey)
	execCtx.appCtx.Delete(appctx.AppCtxFirstFatalErrorKey)
	execCtx.renderingService.SetRenderer(nil)
	execCtx.initDone = false
	execCtx.registrationService.Clear()
	execCtx.initFlow.Clear()
	execCtx.invokeFlow.Clear()
	if execCtx.telemetryAPIEnabled {
		execCtx.logsSubscriptionAPI.Clear()
		execCtx.telemetrySubscriptionAPI.Clear()
	}
}
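
// Minimal sketch of how a caller might drive handleReset below (hypothetical
// wiring; the real dispatch loop lives outside this file):
//
//	resetEvent := &interop.Reset{Reason: "timeout", DeadlineNs: deadlineNs}
//	success, failure := handleReset(execCtx, resetEvent, execCtx.RuntimeStartedTime, execCtx.InvokeResponseMetrics)
//	if failure != nil {
//		// shutdown of extensions did not complete before the deadline
//	}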

// handle notification of reset
func handleReset(execCtx *rapidContext, resetEvent *interop.Reset, runtimeStartedTime int64, invokeResponseMetrics *interop.InvokeResponseMetrics) (interop.ResetSuccess, *interop.ResetFailure) {
	log.Warnf("Reset initiated: %s", resetEvent.Reason)

	// Only send RuntimeDone event if we get a reset during an Invoke
	if resetEvent.Reason == "failure" || resetEvent.Reason == "timeout" {
		var errorType *string
		if resetEvent.Reason == "failure" {
			firstFatalError, found := appctx.LoadFirstFatalError(execCtx.appCtx)
			if !found {
				firstFatalError = fatalerror.SandboxFailure
			}
			stringifiedError := string(firstFatalError)
			errorType = &stringifiedError
		}

		var status string
		if resetEvent.Reason == "timeout" {
			status = "timeout"
		} else if strings.HasPrefix(*errorType, "Sandbox.") {
			status = "failure"
		} else {
			status = "error"
		}

		var runtimeReadyTime int64 = metering.Monotime()
		runtimeDoneEventData := interop.InvokeRuntimeDoneData{
			Status:          status,
			InternalMetrics: invokeResponseMetrics,
			Metrics:         telemetry.GetRuntimeDoneInvokeMetrics(runtimeStartedTime, invokeResponseMetrics, runtimeReadyTime),
			Tracing:         execCtx.xray.BuildTracingCtxAfterInvokeComplete(),
			Spans:           execCtx.eventsAPI.GetRuntimeDoneSpans(runtimeStartedTime, invokeResponseMetrics, execCtx.RuntimeOverheadStartedTime, runtimeReadyTime),
			ErrorType:       errorType,
		}
		if err := execCtx.eventsAPI.SendInvokeRuntimeDone(runtimeDoneEventData); err != nil {
			log.Errorf("Failed to send INVOKE RTDONE: %s", err)
		}
	}

	extensionsResetMs, resetTimeout, _ := execCtx.shutdownContext.shutdown(execCtx, resetEvent.DeadlineNs, resetEvent.Reason)

	execCtx.runtimeDomainGeneration++

	// Only used by standalone for more in-depth assertions.
	var fatalErrorType fatalerror.ErrorType

	if execCtx.standaloneMode {
		fatalErrorType, _ = appctx.LoadFirstFatalError(execCtx.appCtx)
	}

	// TODO: move interop.ResponseMetrics{} to a factory method and initialize it there.
	// Initialization is very similar in handleInvoke's invokeFailure.ResponseMetrics and
	// invokeSuccessMsg.ResponseMetrics
	var responseMetrics interop.ResponseMetrics
	if resetEvent.InvokeResponseMetrics != nil && interop.IsResponseStreamingMetrics(resetEvent.InvokeResponseMetrics) {
		responseMetrics.RuntimeResponseLatencyMs = telemetry.CalculateDuration(execCtx.RuntimeStartedTime, resetEvent.InvokeResponseMetrics.StartReadingResponseMonoTimeMs)
		responseMetrics.RuntimeTimeThrottledMs = resetEvent.InvokeResponseMetrics.TimeShapedNs / int64(time.Millisecond)
		responseMetrics.RuntimeProducedBytes = resetEvent.InvokeResponseMetrics.ProducedBytes
		responseMetrics.RuntimeOutboundThroughputBps = resetEvent.InvokeResponseMetrics.OutboundThroughputBps
	}

	if resetTimeout {
		return interop.ResetSuccess{}, &interop.ResetFailure{
			ExtensionsResetMs:  extensionsResetMs,
			ErrorType:          fatalErrorType,
			ResponseMetrics:    responseMetrics,
			InvokeResponseMode: resetEvent.InvokeResponseMode,
		}
	}

	return interop.ResetSuccess{
		ExtensionsResetMs:  extensionsResetMs,
		ErrorType:          fatalErrorType,
		ResponseMetrics:    responseMetrics,
		InvokeResponseMode: resetEvent.InvokeResponseMode,
	}, nil
}

// handle notification of shutdown
func handleShutdown(execCtx *rapidContext, shutdownEvent *interop.Shutdown, reason string) interop.ShutdownSuccess {
	log.Warnf("Shutdown initiated: %s", reason)
	// TODO Handle shutdown error
	_, _, _ = execCtx.shutdownContext.shutdown(execCtx, shutdownEvent.DeadlineNs, reason)

	// Only used by standalone for more in-depth assertions.
	var fatalErrorType fatalerror.ErrorType

	if execCtx.standaloneMode {
		fatalErrorType, _ = appctx.LoadFirstFatalError(execCtx.appCtx)
	}

	return interop.ShutdownSuccess{ErrorType: fatalErrorType}
}

func handleRestore(execCtx *rapidContext, restore *interop.Restore) (interop.RestoreResult, error) {
	err := execCtx.credentialsService.UpdateCredentials(restore.AwsKey, restore.AwsSecret, restore.AwsSession, restore.CredentialsExpiry)
	restoreStatus := telemetry.RuntimeDoneSuccess
	restoreResult := interop.RestoreResult{}

	defer func() {
		sendRestoreRuntimeDoneLogEvent(execCtx, restoreStatus)
	}()

	if err != nil {
		log.Infof("error when updating credentials: %s", err)
		return restoreResult, interop.ErrRestoreUpdateCredentials
	}

	renderer := rendering.NewRestoreRenderer()
	execCtx.renderingService.SetRenderer(renderer)

	registrationService := execCtx.registrationService
	runtime := registrationService.GetRuntime()

	execCtx.SetLogStreamName(restore.LogStreamName)

	// If the runtime has not yet called /restore/next, just return instead of releasing it,
	// since there is nothing to release; the runtime will then be released only during Invoke.
	if runtime.GetState() != runtime.RuntimeRestoreReadyState {
		restoreStatus = telemetry.RuntimeDoneSuccess
		log.Infof("Runtime is in state: %s just returning", runtime.GetState().Name())
		return restoreResult, nil
	}

	deadlineNs := time.Now().Add(time.Duration(restore.RestoreHookTimeoutMs) * time.Millisecond).UnixNano()
	ctx, ctxCancel := context.WithDeadline(context.Background(), time.Unix(0, deadlineNs))
	defer ctxCancel()

	startTime := metering.Monotime()
	runtime.Release()

	initFlow := execCtx.initFlow
	err = initFlow.AwaitRuntimeReadyWithDeadline(ctx)

	fatalErrorType, fatalErrorFound := appctx.LoadFirstFatalError(execCtx.appCtx)
	// If an error occurred while waiting for the runtime to complete the restore hook,
	// check whether appctx holds a fatal error to surface the root-cause error type.
	// Runtime.ExitError is an example of such a scenario.
	if fatalErrorFound {
		err = fmt.Errorf("%s", string(fatalErrorType))
	}

	if err != nil {
		restoreStatus = telemetry.RuntimeDoneError
	}

	endTime := metering.Monotime()
	restoreDuration := time.Duration(endTime - startTime)
	restoreResult.RestoreMs = restoreDuration.Milliseconds()

	return restoreResult, err
}

func startRuntimeAPI(ctx context.Context, execCtx *rapidContext) {
	// Start the Runtime API server.
	err := execCtx.server.Listen()
	if err != nil {
		log.WithError(err).Panic("Runtime API Server failed to listen")
	}

	execCtx.server.Serve(ctx) // blocking until server exits

	// Note: most initialization code should run before blocking to receive START;
	// code before START runs in parallel with code downloads.
}

func getFirstFatalError(execCtx *rapidContext, status string) *string {
	if status == telemetry.RuntimeDoneSuccess {
		return nil
	}

	firstFatalError, found := appctx.LoadFirstFatalError(execCtx.appCtx)
	if !found {
		// We will set errorType to "Runtime.Unknown" in case of INIT timeout and RESTORE timeout.
		// This is a trade-off we are willing to make.
		// We will improve this later.
		firstFatalError = fatalerror.RuntimeUnknown
	}

	stringifiedError := string(firstFatalError)
	return &stringifiedError
}

func sendRestoreRuntimeDoneLogEvent(execCtx *rapidContext, status string) {
	firstFatalError := getFirstFatalError(execCtx, status)

	restoreRuntimeDoneData := interop.RestoreRuntimeDoneData{
		Status:    status,
		ErrorType: firstFatalError,
	}

	if err := execCtx.eventsAPI.SendRestoreRuntimeDone(restoreRuntimeDoneData); err != nil {
		log.Errorf("Failed to send RESTORE RTDONE: %s", err)
	}
}

func sendInitStartLogEvent(execCtx *rapidContext, sandboxType interop.SandboxType, phase interop.LifecyclePhase) {
	initPhase, err := telemetry.InitPhaseFromLifecyclePhase(phase)
	if err != nil {
		log.Errorf("failed to convert lifecycle phase into init phase: %s", err)
		return
	}

	functionMetadata := execCtx.registrationService.GetFunctionMetadata()
	initStartData := interop.InitStartData{
		InitializationType: telemetry.InferInitType(execCtx.initCachingEnabled, sandboxType),
		RuntimeVersion:     functionMetadata.RuntimeInfo.Version,
		RuntimeVersionArn:  functionMetadata.RuntimeInfo.Arn,
		FunctionName:       functionMetadata.FunctionName,
		FunctionVersion:    functionMetadata.FunctionVersion,
		// based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/faas.md
		// we're sending the logStream as the instance id
		InstanceID:        execCtx.logStreamName,
		InstanceMaxMemory: functionMetadata.InstanceMaxMemory,
		Phase:             initPhase,
	}
	log.Info(initStartData.String())
	if err := execCtx.eventsAPI.SendInitStart(initStartData); err != nil {
		log.Errorf("Failed to send INIT START: %s", err)
	}
}

func sendInitRuntimeDoneLogEvent(execCtx *rapidContext, sandboxType interop.SandboxType, status string, phase interop.LifecyclePhase) {
	initPhase, err := telemetry.InitPhaseFromLifecyclePhase(phase)
	if err != nil {
		log.Errorf("failed to convert lifecycle phase into init phase: %s", err)
		return
	}

	firstFatalError := getFirstFatalError(execCtx, status)

	initRuntimeDoneData := interop.InitRuntimeDoneData{
		InitializationType: telemetry.InferInitType(execCtx.initCachingEnabled, sandboxType),
		Status:             status,
		Phase:              initPhase,
		ErrorType:          firstFatalError,
	}

	log.Info(initRuntimeDoneData.String())

	if err := execCtx.eventsAPI.SendInitRuntimeDone(initRuntimeDoneData); err != nil {
		log.Errorf("Failed to send INIT RTDONE: %s", err)
	}
}

func sendInitReportLogEvent(
	execCtx *rapidContext,
	sandboxType interop.SandboxType,
	initStartMonotime int64,
	phase interop.LifecyclePhase,
) {
	initPhase, err := telemetry.InitPhaseFromLifecyclePhase(phase)
	if err != nil {
		log.Errorf("failed to convert lifecycle phase into init phase: %s", err)
		return
	}

	initReportData := interop.InitReportData{
		InitializationType: telemetry.InferInitType(execCtx.initCachingEnabled, sandboxType),
		Metrics: interop.InitReportMetrics{
			DurationMs: telemetry.CalculateDuration(initStartMonotime, metering.Monotime()),
		},
		Phase: initPhase,
	}
	log.Info(initReportData.String())

	if err = execCtx.eventsAPI.SendInitReport(initReportData); err != nil {
		log.Errorf("Failed to send INIT REPORT: %s", err)
	}
}

func sendInvokeStartLogEvent(execCtx *rapidContext, invokeRequestID string, tracingCtx *interop.TracingCtx) {
	invokeStartData := interop.InvokeStartData{
		RequestID: invokeRequestID,
		Version:   execCtx.registrationService.GetFunctionMetadata().FunctionVersion,
		Tracing:   tracingCtx,
	}
	log.Info(invokeStartData.String())

	if err := execCtx.eventsAPI.SendInvokeStart(invokeStartData); err != nil {
		log.Errorf("Failed to send INVOKE START: %s",
			err)
	}
}

// checkCredentials logs a line if AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, or AWS_SESSION_TOKEN is missing.
// This is expected to happen when the credentials provider is not needed.
func checkCredentials(execCtx *rapidContext, bootstrapEnv map[string]string) {
	credentialsKeys := []string{"AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN"}
	missingCreds := []string{}

	for _, credEnvVar := range credentialsKeys {
		if val, keyExists := bootstrapEnv[credEnvVar]; !keyExists || val == "" {
			missingCreds = append(missingCreds, credEnvVar)
		}
	}

	if len(missingCreds) > 0 {
		log.Infof("Starting runtime without %s , Expected?: %t", strings.Join(missingCreds[:], ", "), execCtx.initCachingEnabled)
	}
}
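
// For illustration: with init caching enabled and none of the three variables set
// in the bootstrap environment, the Infof call above produces a line of the form
//
//	Starting runtime without AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN , Expected?: true
//
// where the trailing boolean reflects execCtx.initCachingEnabled.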