func shutdownAgents()

in lambda/rapid/graceful_shutdown.go [126:198]


// shutdownAgents drives the shutdown sequence for every launched external agent.
//
// It first installs a ShutdownRenderer carrying a "SHUTDOWN" agent event whose
// DeadlineMs is deadline expressed in milliseconds since the Unix epoch and
// whose ShutdownReason is reason. Then, for each external agent with a
// non-zero Pid:
//  1. If the agent is subscribed to the Shutdown event, it is tracked as
//     pending and Release is called on it; otherwise its pid is handed to
//     sigkillProcessGroup (unless processesExited already records it).
//  2. The function waits on execCtx.exitPidChan until every pending agent has
//     reported its exit. When killAgents is true the wait is additionally
//     bounded by a timer of deadline.Sub(start); when false it is unbounded.
//  3. Every agent still pending after the wait is transitioned via
//     ShutdownFailed and its pid is handed to sigkillProcessGroup (unless
//     processesExited already records it).
//
// profiler.NumAgentsRegisteredForShutdown is set to the number of
// Shutdown-subscribed agents found in step 1.
//
// processesExited is updated with every pid received from exitPidChan and
// sigkilledPids is threaded through sigkillProcessGroup; both are returned.
// A nil processesExited is replaced by a fresh map, so callers should use the
// returned value rather than the argument they passed in.
func shutdownAgents(execCtx *rapidContext, start time.Time, profiler *metering.ExtensionsResetDurationProfiler, deadline time.Time, killAgents bool, reason string, processesExited, sigkilledPids map[int]bool) (map[int]bool, map[int]bool) {
	log.Debug("shutdown agents")
	execCtx.renderingService.SetRenderer(
		&rendering.ShutdownRenderer{
			AgentEvent: model.AgentShutdownEvent{
				AgentEvent: &model.AgentEvent{
					EventType:  "SHUTDOWN",
					DeadlineMs: deadline.UnixNano() / (1000 * 1000),
				},
				ShutdownReason: reason,
			},
		})

	// This function writes to processesExited directly; writing to a nil map
	// would panic on the first observed exit.
	if processesExited == nil {
		processesExited = make(map[int]bool)
	}

	// Step 1: release Shutdown-subscribed agents, kill the rest.
	pidsToShutdown := make(map[int]*core.ExternalAgent)
	for _, a := range execCtx.registrationService.GetExternalAgents() {
		switch {
		case a.Pid == 0:
			log.Warnf("Agent %s failed not launched; skipping shutdown", a)
		case a.IsSubscribed(core.ShutdownEvent):
			pidsToShutdown[a.Pid] = a
			a.Release()
		case !processesExited[a.Pid]:
			sigkilledPids = sigkillProcessGroup(a.Pid, sigkilledPids)
		}
	}
	profiler.NumAgentsRegisteredForShutdown = len(pidsToShutdown)

	// A nil channel never becomes ready in a select, so without killAgents the
	// wait below is bounded only by the agents actually exiting.
	var timerChan <-chan time.Time
	if killAgents {
		// NOTE(review): the duration is measured from start, not from now, so
		// the timer fires later than deadline by however long has elapsed
		// since start — confirm this is intended.
		timer := time.NewTimer(deadline.Sub(start))
		// Stop the timer on return so it does not linger until it fires when
		// all agents exit early.
		defer timer.Stop()
		timerChan = timer.C
	}

	// Step 2: wait for pending agents to exit, or for the timeout.
	timeoutExceeded := false
	for !timeoutExceeded && len(pidsToShutdown) != 0 {
		select {
		case pid := <-execCtx.exitPidChan:
			processesExited[pid] = true
			a, found := pidsToShutdown[pid]
			if !found {
				log.Warnf("Process %d exited unexpectedly", pid)
				continue
			}
			if err := a.Exited(); err != nil {
				log.Warnf("%s failed to transition to EXITED: %s (current state: %s)", a.String(), err, a.GetState().Name())
			}
			delete(pidsToShutdown, pid)
		case <-timerChan:
			timeoutExceeded = true
		}
	}

	// Step 3: anything still pending did not exit in time; mark it failed and
	// kill it. Ranging over an empty map is a no-op, so no length check is
	// needed.
	for pid, agent := range pidsToShutdown {
		if err := agent.ShutdownFailed(); err != nil {
			log.Warnf("%s failed to transition to ShutdownFailed: %s (current state: %s)", agent, err, agent.GetState().Name())
		}
		log.Warnf("Killing agent %s which failed to shutdown", agent)
		if !processesExited[pid] {
			sigkilledPids = sigkillProcessGroup(pid, sigkilledPids)
		}
	}

	return processesExited, sigkilledPids
}