lambda/rapid/shutdown.go

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// Package rapid implements the synchronous event dispatch loop.
package rapid

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"go.amzn.com/lambda/appctx"
	"go.amzn.com/lambda/core"
	"go.amzn.com/lambda/metering"
	"go.amzn.com/lambda/rapi/model"
	"go.amzn.com/lambda/rapi/rendering"
	supvmodel "go.amzn.com/lambda/supervisor/model"

	log "github.com/sirupsen/logrus"
)

const (
	// Supervisor shutdown and kill operations block until the exit status of the
	// process of interest has been collected, or until the specified deadline expires.
	// Note that this deadline is mainly relevant when any of the domain
	// processes are in an uninterruptible sleep state (notable example: a syscall
	// reading/writing through a networked driver).
	//
	// We set a non-nil value for these timeouts so that RAPID doesn't block
	// forever in one of the cases above.
	supervisorBlockingMaxMillis = 9000

	runtimeDeadlineShare = 0.3
	maxProcessExitWait   = 2 * time.Second
)

// TODO: aggregate this struct's methods into an interface, so that we can mock it in tests
type shutdownContext struct {
	// shuttingDown is guarded by a mutex because there may be concurrent reads/writes:
	// the code in shutdown() and the separate goroutine created in setupEventsWatcher()
	// can access the field concurrently.
	shuttingDownMutex sync.Mutex
	shuttingDown      bool

	agentsAwaitingExit map[string]*core.ExternalAgent

	// runtimeDomainExited is guarded by a mutex because there may be concurrent reads/writes.
	// First, different goroutines may read/write different keys. Second, the code shutting
	// down the runtime/extensions and handleProcessExit (running in a separate goroutine,
	// triggered by unexpected exits) may read and write the same key.
	runtimeDomainExitedMutex sync.Mutex
	// runtimeDomainExited is used to synchronize on process exits. We create a channel
	// when a process is started and we close it upon the exit notification from
	// supervisor. Closing the channel is effectively a persistent broadcast of the
	// process exit; we never write anything to the channels.
	runtimeDomainExited map[string]chan struct{}
}

func newShutdownContext() *shutdownContext {
	return &shutdownContext{
		shuttingDownMutex:        sync.Mutex{},
		shuttingDown:             false,
		agentsAwaitingExit:       make(map[string]*core.ExternalAgent),
		runtimeDomainExited:      make(map[string]chan struct{}),
		runtimeDomainExitedMutex: sync.Mutex{},
	}
}

func (s *shutdownContext) isShuttingDown() bool {
	s.shuttingDownMutex.Lock()
	defer s.shuttingDownMutex.Unlock()
	return s.shuttingDown
}

func (s *shutdownContext) setShuttingDown(value bool) {
	s.shuttingDownMutex.Lock()
	defer s.shuttingDownMutex.Unlock()
	s.shuttingDown = value
}

func (s *shutdownContext) handleProcessExit(termination supvmodel.ProcessTermination) {
	name := *termination.Name
	agent, found := s.agentsAwaitingExit[name]

	// If it is an agent registered to receive a shutdown event.
	if found {
		log.Debugf("Handling termination for %s", name)
		exitStatus := termination.Exited()
		if exitStatus != nil && *exitStatus == 0 {
			// The agent exited by itself after receiving the shutdown event.
			stateErr := agent.Exited()
			if stateErr != nil {
				log.Warnf("%s failed to transition to EXITED: %s (current state: %s)", agent.String(), stateErr, agent.GetState().Name())
			}
		} else {
			// The agent did not exit by itself and had to be SIGKILLed (only in standalone mode).
			stateErr := agent.ShutdownFailed()
			if stateErr != nil {
				log.Warnf("%s failed to transition to ShutdownFailed: %s (current state: %s)", agent, stateErr, agent.GetState().Name())
			}
		}
	}

	exitedChannel, found := s.getExitedChannel(name)
	if !found {
		log.Panicf("Unable to find an exitedChannel for '%s', it should have been created just after it was execed.", name)
	}

	// We close the channel so that whoever is blocked on it,
	// or will try to block on it in the future, unblocks immediately.
	close(exitedChannel)
}

func (s *shutdownContext) getExitedChannel(name string) (chan struct{}, bool) {
	s.runtimeDomainExitedMutex.Lock()
	defer s.runtimeDomainExitedMutex.Unlock()
	exitedChannel, found := s.runtimeDomainExited[name]
	return exitedChannel, found
}

func (s *shutdownContext) createExitedChannel(name string) {
	s.runtimeDomainExitedMutex.Lock()
	defer s.runtimeDomainExitedMutex.Unlock()
	_, found := s.runtimeDomainExited[name]
	if found {
		log.Panicf("Tried to create an exited channel for '%s' but one already exists.", name)
	}
	s.runtimeDomainExited[name] = make(chan struct{})
}

// clearExitedChannel blocks until all the processes in the runtime domain generation
// have exited. This gives us a clean sync point on shutdown where we know for sure
// that all the processes have exited and the state has been cleared. The exception
// to that rule is that if any of the processes don't exit within maxProcessExitWait
// from the beginning of the waiting period, an error is returned, to avoid waiting
// forever on processes that cannot be killed.
//
// It is OK not to hold the lock while waiting because this is called only during
// shutdown and nobody starts a new process during shutdown.
func (s *shutdownContext) clearExitedChannel() error {
	s.runtimeDomainExitedMutex.Lock()
	mapLen := len(s.runtimeDomainExited)
	channels := make([]chan struct{}, 0, mapLen)
	for _, v := range s.runtimeDomainExited {
		channels = append(channels, v)
	}
	s.runtimeDomainExitedMutex.Unlock()

	exitTimeout := time.After(maxProcessExitWait)
	for _, v := range channels {
		select {
		case <-v:
		case <-exitTimeout:
			return errors.New("timed out waiting for runtime processes to exit")
		}
	}

	s.runtimeDomainExitedMutex.Lock()
	s.runtimeDomainExited = make(map[string]chan struct{}, mapLen)
	s.runtimeDomainExitedMutex.Unlock()
	return nil
}
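// The runtimeDomainExited map above implements a close-as-broadcast pattern: a channel
// is created when a process is execed (createExitedChannel) and closed when supervisor
// reports its termination (handleProcessExit), so any number of readers can block on it
// before or after the exit actually happens. Below is a minimal sketch of a waiter built
// on that pattern; waitForProcessExit is a hypothetical helper added here for
// illustration only and is not part of the original file.
func (s *shutdownContext) waitForProcessExit(name string, timeout time.Duration) error {
	exited, found := s.getExitedChannel(name)
	if !found {
		return fmt.Errorf("no exited channel registered for '%s'", name)
	}
	select {
	case <-exited:
		// Channel closed by handleProcessExit: the process has exited.
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("timed out after %s waiting for '%s' to exit", timeout, name)
	}
}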
func (s *shutdownContext) shutdownRuntime(execCtx *rapidContext, start time.Time, deadline time.Time) {
	// If the runtime was started:
	//  1. SIGTERM and wait until the deadline
	//  2. SIGKILL on the deadline
	log.Debug("Shutting down the runtime.")
	name := fmt.Sprintf("%s-%d", runtimeProcessName, execCtx.runtimeDomainGeneration)
	exitedChannel, found := s.getExitedChannel(name)
	if found {
		err := execCtx.supervisor.Terminate(context.Background(), &supvmodel.TerminateRequest{
			Domain: RuntimeDomain,
			Name:   name,
		})
		if err != nil {
			// We don't report the error upstream because the whole domain is
			// shut down at the end of the shutdown sequence anyway.
			log.WithError(err).Warn("Failed sending Termination signal to runtime")
		}

		ctx, cancel := context.WithDeadline(context.Background(), deadline)
		defer cancel()

		select {
		case <-ctx.Done():
			log.Warnf("Deadline: The runtime did not exit after deadline %s; Killing it.", deadline)
			err = execCtx.supervisor.Kill(context.Background(), &supvmodel.KillRequest{
				Domain:   RuntimeDomain,
				Name:     name,
				Deadline: time.Now().Add(time.Millisecond * supervisorBlockingMaxMillis),
			})
			if err != nil {
				// We don't report the error upstream because the whole domain is
				// shut down at the end of the shutdown sequence anyway.
				log.WithError(err).Warn("Failed sending Kill signal to runtime")
			}
		case <-exitedChannel:
		}
	} else {
		log.Warn("The runtime was not started.")
	}
	log.Debug("Shutdown the runtime.")
}
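// shutdownRuntime above, and shutdownAgents below, both follow the same escalation
// protocol: ask for a graceful exit, wait until either the process's exited channel
// closes or a deadline passes, then escalate to SIGKILL. A compact sketch of that
// protocol for an arbitrary process name follows; terminateThenKill is a hypothetical
// helper added for illustration only and is not part of the original file.
func (s *shutdownContext) terminateThenKill(execCtx *rapidContext, name string, deadline time.Time) {
	exited, found := s.getExitedChannel(name)
	if !found {
		// The process was never started, so there is nothing to terminate.
		log.Warnf("Process %s was never started; nothing to terminate.", name)
		return
	}
	// Ask for a graceful exit first (SIGTERM).
	if err := execCtx.supervisor.Terminate(context.Background(), &supvmodel.TerminateRequest{
		Domain: RuntimeDomain,
		Name:   name,
	}); err != nil {
		log.WithError(err).Warnf("Failed sending Termination signal to %s", name)
	}
	select {
	case <-exited:
		// Graceful exit within the deadline.
	case <-time.After(time.Until(deadline)):
		// Escalate to SIGKILL, bounded by the supervisor blocking deadline.
		if err := execCtx.supervisor.Kill(context.Background(), &supvmodel.KillRequest{
			Domain:   RuntimeDomain,
			Name:     name,
			Deadline: time.Now().Add(time.Millisecond * supervisorBlockingMaxMillis),
		}); err != nil {
			log.WithError(err).Warnf("Failed sending Kill signal to %s", name)
		}
	}
}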
func (s *shutdownContext) shutdownAgents(execCtx *rapidContext, start time.Time, deadline time.Time, reason string) {
	// For each external agent, if the agent was launched:
	//  1. Send the Shutdown event if the agent subscribed to it, else send SIGKILL to its process group
	//  2. Wait for all Shutdown-subscribed agents to exit, with a deadline
	//  3. Send SIGKILL to the process group of Shutdown-subscribed agents on the deadline
	log.Debug("Shutting down the agents.")
	execCtx.renderingService.SetRenderer(
		&rendering.ShutdownRenderer{
			AgentEvent: model.AgentShutdownEvent{
				AgentEvent: &model.AgentEvent{
					EventType:  "SHUTDOWN",
					DeadlineMs: deadline.UnixNano() / (1000 * 1000),
				},
				ShutdownReason: reason,
			},
		})

	var wg sync.WaitGroup

	// Clear agentsAwaitingExit from the previous shutdownAgents call.
	s.agentsAwaitingExit = make(map[string]*core.ExternalAgent)

	for _, a := range execCtx.registrationService.GetExternalAgents() {
		name := fmt.Sprintf("extension-%s-%d", a.Name, execCtx.runtimeDomainGeneration)
		exitedChannel, found := s.getExitedChannel(name)
		if !found {
			log.Warnf("Agent %s failed to launch, therefore skipping shutting it down.", a)
			continue
		}

		wg.Add(1)

		if a.IsSubscribed(core.ShutdownEvent) {
			log.Debugf("Agent %s is registered for the shutdown event.", a)
			s.agentsAwaitingExit[name] = a

			go func(name string, agent *core.ExternalAgent) {
				defer wg.Done()
				agent.Release()

				ctx, cancel := context.WithCancel(context.Background())
				defer cancel()
				if execCtx.standaloneMode {
					ctx, cancel = context.WithDeadline(ctx, deadline)
					defer cancel()
				}

				select {
				case <-ctx.Done():
					log.Warnf("Deadline: the agent %s did not exit after deadline %s; Killing it.", name, deadline)
					err := execCtx.supervisor.Kill(context.Background(), &supvmodel.KillRequest{
						Domain:   RuntimeDomain,
						Name:     name,
						Deadline: time.Now().Add(time.Millisecond * supervisorBlockingMaxMillis),
					})
					if err != nil {
						// We don't report the error upstream because the whole domain is
						// shut down at the end of the shutdown sequence anyway.
						log.WithError(err).Warn("Failed sending Kill signal to agent")
					}
				case <-exitedChannel:
				}
			}(name, a)
		} else {
			log.Debugf("Agent %s is not registered for the shutdown event, so just killing it.", a)

			go func(name string) {
				defer wg.Done()
				err := execCtx.supervisor.Kill(context.Background(), &supvmodel.KillRequest{
					Domain:   RuntimeDomain,
					Name:     name,
					Deadline: time.Now().Add(time.Millisecond * supervisorBlockingMaxMillis),
				})
				if err != nil {
					log.WithError(err).Warn("Failed sending Kill signal to agent")
				}
			}(name)
		}
	}

	// Wait for the agents subscribed to the shutdown event to exit voluntarily after
	// receiving it (or be SIGKILLed on the deadline), and for the agents not subscribed
	// to the shutdown event to be SIGKILLed.
	wg.Wait()
	log.Debug("Shutdown the agents.")
}

func (s *shutdownContext) shutdown(execCtx *rapidContext, deadlineNs int64, reason string) (int64, bool, error) {
	var err error
	s.setShuttingDown(true)
	defer s.setShuttingDown(false)

	// Fatal errors such as Runtime exit and Extension.Crash
	// are ignored by the events watcher when shutting down.
	execCtx.appCtx.Delete(appctx.AppCtxFirstFatalErrorKey)

	runtimeDomainProfiler := &metering.ExtensionsResetDurationProfiler{}

	// We do not spend any compute time on graceful runtime shutdown if there are no agents.
	if execCtx.registrationService.CountAgents() == 0 {
		name := fmt.Sprintf("%s-%d", runtimeProcessName, execCtx.runtimeDomainGeneration)
		_, found := s.getExitedChannel(name)
		if found {
			log.Debug("SIGKILLing the runtime as no agents are registered.")
			err = execCtx.supervisor.Kill(context.Background(), &supvmodel.KillRequest{
				Domain:   RuntimeDomain,
				Name:     name,
				Deadline: time.Now().Add(time.Millisecond * supervisorBlockingMaxMillis),
			})
			if err != nil {
				// We don't report the error upstream because the whole domain is
				// shut down at the end of the shutdown sequence anyway.
				log.WithError(err).Warn("Failed sending Kill signal to runtime")
			}
		} else {
			log.Debugf("Could not find runtime process %s in processes map. Already exited/never started", name)
		}
	} else {
		mono := metering.Monotime()
		availableNs := deadlineNs - mono
		if availableNs < 0 {
			log.Warnf("Deadline is in the past: %v, %v, %v", mono, deadlineNs, availableNs)
			availableNs = 0
		}

		start := time.Now()
		runtimeDeadline := start.Add(time.Duration(float64(availableNs) * runtimeDeadlineShare))
		agentsDeadline := start.Add(time.Duration(availableNs))

		runtimeDomainProfiler.AvailableNs = availableNs
		runtimeDomainProfiler.Start()

		s.shutdownRuntime(execCtx, start, runtimeDeadline)
		s.shutdownAgents(execCtx, start, agentsDeadline, reason)

		runtimeDomainProfiler.NumAgentsRegisteredForShutdown = len(s.agentsAwaitingExit)
	}

	log.Info("Waiting for runtime domain processes termination")
	if err := s.clearExitedChannel(); err != nil {
		log.Error(err)
	}

	runtimeDomainProfiler.Stop()
	extensionsResetMs, timeout := runtimeDomainProfiler.CalculateExtensionsResetMs()
	return extensionsResetMs, timeout, err
}
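// The deadline split in shutdown() gives the runtime runtimeDeadlineShare (30%) of the
// remaining budget for a graceful exit, while shutdown-subscribed agents may use the
// full remainder. Below is a minimal sketch of that arithmetic as a standalone helper;
// splitShutdownDeadlines is hypothetical and not part of the original file. For example,
// with 1s of budget left, the runtime is SIGTERMed and given 300ms before being
// SIGKILLed, and agents have until the full 1s mark before their SIGKILL.
func splitShutdownDeadlines(start time.Time, availableNs int64) (runtimeDeadline, agentsDeadline time.Time) {
	if availableNs < 0 {
		// Deadline already in the past: no graceful-shutdown budget remains.
		availableNs = 0
	}
	runtimeDeadline = start.Add(time.Duration(float64(availableNs) * runtimeDeadlineShare))
	agentsDeadline = start.Add(time.Duration(availableNs))
	return runtimeDeadline, agentsDeadline
}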