in lambda/rapid/shutdown.go [215:303]
func (s *shutdownContext) shutdownAgents(execCtx *rapidContext, start time.Time, deadline time.Time, reason string) {
// For each external agent, if agent is launched:
// 1. Send Shutdown event if subscribed for it, else send SIGKILL to process group
// 2. Wait for all Shutdown-subscribed agents to exit with deadline
// 3. Send SIGKILL to process group for Shutdown-subscribed agents on deadline
log.Debug("Shutting down the agents.")
execCtx.renderingService.SetRenderer(
&rendering.ShutdownRenderer{
AgentEvent: model.AgentShutdownEvent{
AgentEvent: &model.AgentEvent{
EventType: "SHUTDOWN",
DeadlineMs: deadline.UnixNano() / (1000 * 1000),
},
ShutdownReason: reason,
},
})
var wg sync.WaitGroup
// clear agentsAwaitingExit from last shutdownAgents
s.agentsAwaitingExit = make(map[string]*core.ExternalAgent)
for _, a := range execCtx.registrationService.GetExternalAgents() {
name := fmt.Sprintf("extension-%s-%d", a.Name, execCtx.runtimeDomainGeneration)
exitedChannel, found := s.getExitedChannel(name)
if !found {
log.Warnf("Agent %s failed to launch, therefore skipping shutting it down.", a)
continue
}
wg.Add(1)
if a.IsSubscribed(core.ShutdownEvent) {
log.Debugf("Agent %s is registered for the shutdown event.", a)
s.agentsAwaitingExit[name] = a
go func(name string, agent *core.ExternalAgent) {
defer wg.Done()
agent.Release()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if execCtx.standaloneMode {
ctx, cancel = context.WithDeadline(ctx, deadline)
defer cancel()
}
select {
case <-ctx.Done():
log.Warnf("Deadline: the agent %s did not exit after deadline %s; Killing it.", name, deadline)
err := execCtx.supervisor.Kill(context.Background(), &supvmodel.KillRequest{
Domain: RuntimeDomain,
Name: name,
Deadline: time.Now().Add(time.Millisecond * supervisorBlockingMaxMillis),
})
if err != nil {
// We are not reporting the error upstream because we will anyway
// shut the domain out at the end of the shutdown sequence
log.WithError(err).Warn("Failed sending Kill signal to agent")
}
case <-exitedChannel:
}
}(name, a)
} else {
log.Debugf("Agent %s is not registered for the shutdown event, so just killing it.", a)
go func(name string) {
defer wg.Done()
err := execCtx.supervisor.Kill(context.Background(), &supvmodel.KillRequest{
Domain: RuntimeDomain,
Name: name,
Deadline: time.Now().Add(time.Millisecond * supervisorBlockingMaxMillis),
})
if err != nil {
log.WithError(err).Warn("Failed sending Kill signal to agent")
}
}(name)
}
}
// Wait on the agents subscribed to the shutdown event to voluntary shutting down after receiving the shutdown event or be sigkilled.
// In addition to waiting on the agents not subscribed to the shutdown event being sigkilled.
wg.Wait()
log.Debug("Shutdown the agents.")
}