func()

in lambda/rapid/shutdown.go [215:303]


func (s *shutdownContext) shutdownAgents(execCtx *rapidContext, start time.Time, deadline time.Time, reason string) {
	// shutdownAgents stops every external agent that was actually launched:
	//   - agents subscribed to the SHUTDOWN event are released so they can
	//     receive it, then awaited until their exit channel fires; in
	//     standalone mode an agent still alive at `deadline` is killed;
	//   - agents not subscribed to SHUTDOWN are killed immediately.
	// Agents with no exit channel (never launched) are skipped with a warning.
	// The method returns only after every per-agent goroutine has finished.
	// NOTE(review): `start` is not used in this method.

	log.Debug("Shutting down the agents.")

	// Install the renderer that produces the SHUTDOWN event payload; the
	// deadline is expressed in milliseconds since the Unix epoch.
	shutdownEvent := model.AgentShutdownEvent{
		AgentEvent: &model.AgentEvent{
			EventType:  "SHUTDOWN",
			DeadlineMs: deadline.UnixNano() / (1000 * 1000),
		},
		ShutdownReason: reason,
	}
	execCtx.renderingService.SetRenderer(&rendering.ShutdownRenderer{AgentEvent: shutdownEvent})

	// Start from an empty set so entries from a previous shutdownAgents call
	// do not linger.
	s.agentsAwaitingExit = make(map[string]*core.ExternalAgent)

	// killAgent asks the supervisor to kill the named process in the runtime
	// domain. A failure is only logged, not reported upstream, because the
	// domain is shut down at the end of the shutdown sequence anyway.
	killAgent := func(processName string) {
		killErr := execCtx.supervisor.Kill(context.Background(), &supvmodel.KillRequest{
			Domain:   RuntimeDomain,
			Name:     processName,
			Deadline: time.Now().Add(time.Millisecond * supervisorBlockingMaxMillis),
		})
		if killErr != nil {
			log.WithError(killErr).Warn("Failed sending Kill signal to agent")
		}
	}

	var pending sync.WaitGroup

	for _, extAgent := range execCtx.registrationService.GetExternalAgents() {
		processName := fmt.Sprintf("extension-%s-%d", extAgent.Name, execCtx.runtimeDomainGeneration)

		exited, launched := s.getExitedChannel(processName)
		if !launched {
			log.Warnf("Agent %s failed to launch, therefore skipping shutting it down.", extAgent)
			continue
		}

		pending.Add(1)

		// Agents that did not ask for SHUTDOWN get no grace period.
		if !extAgent.IsSubscribed(core.ShutdownEvent) {
			log.Debugf("Agent %s is not registered for the shutdown event, so just killing it.", extAgent)

			go func(processName string) {
				defer pending.Done()
				killAgent(processName)
			}(processName)
			continue
		}

		log.Debugf("Agent %s is registered for the shutdown event.", extAgent)
		s.agentsAwaitingExit[processName] = extAgent

		// extAgent and processName are passed explicitly so each goroutine sees
		// this iteration's values; `exited` is already a per-iteration variable.
		go func(processName string, subscriber *core.ExternalAgent) {
			defer pending.Done()

			subscriber.Release()

			// Outside standalone mode no deadline is enforced here: a nil
			// channel is never ready, so the select waits only for the exit.
			var deadlineReached <-chan struct{}
			if execCtx.standaloneMode {
				deadlineCtx, cancelDeadline := context.WithDeadline(context.Background(), deadline)
				defer cancelDeadline()
				deadlineReached = deadlineCtx.Done()
			}

			select {
			case <-exited:
			case <-deadlineReached:
				log.Warnf("Deadline: the agent %s did not exit after deadline %s; Killing it.", processName, deadline)
				killAgent(processName)
			}
		}(processName, extAgent)
	}

	// Block until subscribed agents have exited (or been killed at the
	// deadline) and every unsubscribed agent has had its kill request sent.
	pending.Wait()
	log.Debug("Shutdown the agents.")
}