lambda/supervisor/local_supervisor.go (236 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package supervisor import ( "context" "errors" "fmt" "os/exec" "runtime" "sync" "syscall" "time" log "github.com/sirupsen/logrus" "go.amzn.com/lambda/supervisor/model" ) // typecheck interface compliance var _ model.SupervisorClient = (*LocalSupervisor)(nil) type process struct { // pid of the running process pid int // channel that can be use to block // while waiting on process termination. termination chan struct{} } type LocalSupervisor struct { events chan model.Event processMapLock sync.Mutex processMap map[string]process freezeThawCycleStart time.Time RootPath string } func NewLocalSupervisor() *LocalSupervisor { return &LocalSupervisor{ events: make(chan model.Event), processMap: make(map[string]process), RootPath: "/", } } func (*LocalSupervisor) Start(ctx context.Context, req *model.StartRequest) error { return nil } func (*LocalSupervisor) Configure(ctx context.Context, req *model.ConfigureRequest) error { return nil } func (*LocalSupervisor) Exit(ctx context.Context) {} func (s *LocalSupervisor) Exec(ctx context.Context, req *model.ExecRequest) error { if req.Domain != "runtime" { log.Debug("Exec is a no op if domain != runtime") return nil } command := exec.Command(req.Path, req.Args...) if req.Env != nil { envStrings := make([]string, 0, len(*req.Env)) for key, value := range *req.Env { envStrings = append(envStrings, key+"="+value) } command.Env = envStrings } if req.Cwd != nil && *req.Cwd != "" { command.Dir = *req.Cwd } if req.ExtraFiles != nil { command.ExtraFiles = *req.ExtraFiles } command.Stdout = req.StdoutWriter command.Stderr = req.StderrWriter command.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} err := command.Start() if err != nil { return err // TODO Use supevisor specific error } pid := command.Process.Pid termination := make(chan struct{}) s.processMapLock.Lock() s.processMap[req.Name] = process{ pid: pid, termination: termination, } s.processMapLock.Unlock() // The first freeze thaw cycle starts on Exec() at init time s.freezeThawCycleStart = time.Now() go func() { err = command.Wait() // close the termination channel to unblock whoever's blocked on // it (used to implement kill's blocking behaviour) close(termination) var cell int32 var exitStatus *int32 var signo *int32 var exitErr *exec.ExitError if err == nil { exitStatus = &cell } else if errors.As(err, &exitErr) { if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { if code := status.ExitStatus(); code >= 0 { cell = int32(code) exitStatus = &cell } else { cell = int32(status.Signal()) signo = &cell } } } if signo == nil && exitStatus == nil { log.Error("Cannot convert process exit status to unix WaitStatus. This is unexpected. Assuming ExitStatus 1") cell = 1 exitStatus = &cell } s.events <- model.Event{ Time: uint64(time.Now().UnixMilli()), Event: model.EventData{ Domain: &req.Domain, Name: &req.Name, Signo: signo, ExitStatus: exitStatus, }, } }() return nil } func kill(p process, name string, deadline time.Time) error { // kill should report success if the process terminated by the time //supervisor receives the request. select { // if this case is selected, the channel is closed, // which means the process is terminated case <-p.termination: log.Debugf("Process %s already terminated.", name) return nil default: log.Infof("Sending SIGKILL to %s(%d).", name, p.pid) } if (time.Since(deadline)) > 0 { return fmt.Errorf("invalid timeout while killing %s", name) } pgid, err := syscall.Getpgid(p.pid) if err == nil { // Negative pid sends signal to all in process group syscall.Kill(-pgid, syscall.SIGKILL) } else { syscall.Kill(p.pid, syscall.SIGKILL) } ctx, cancel := context.WithDeadline(context.Background(), deadline) defer cancel() // block until the (main) process exits // or the timeout fires select { case <-p.termination: return nil case <-ctx.Done(): return fmt.Errorf("timed out while trying to SIGKILL %s", name) } } func (s *LocalSupervisor) Kill(ctx context.Context, req *model.KillRequest) error { if req.Domain != "runtime" { log.Debug("Kill is a no op if domain != runtime") return nil } s.processMapLock.Lock() process, ok := s.processMap[req.Name] s.processMapLock.Unlock() if !ok { msg := "Unknown process" return &model.SupervisorError{ Kind: model.NoSuchEntity, Message: &msg, } } return kill(process, req.Name, req.Deadline) } func (s *LocalSupervisor) Terminate(ctx context.Context, req *model.TerminateRequest) error { if req.Domain != "runtime" { log.Debug("Terminate is no op if domain != runtime") return nil } s.processMapLock.Lock() process, ok := s.processMap[req.Name] pid := process.pid s.processMapLock.Unlock() if !ok { msg := "Unknown process" err := &model.SupervisorError{ Kind: model.NoSuchEntity, Message: &msg, } log.WithError(err).Errorf("Process %s not found in local supervisor map", req.Name) return err } pgid, err := syscall.Getpgid(pid) if err == nil { // Negative pid sends signal to all in process group // best effort, ignore errors _ = syscall.Kill(-pgid, syscall.SIGTERM) } else { _ = syscall.Kill(pid, syscall.SIGTERM) } return nil } func (s *LocalSupervisor) Stop(ctx context.Context, req *model.StopRequest) (*model.StopResponse, error) { if req.Domain != "runtime" { log.Debug("Shutdown is no op if domain != runtime") return &model.StopResponse{}, nil } // shut down kills all the processes in the map s.processMapLock.Lock() defer s.processMapLock.Unlock() nprocs := len(s.processMap) successes := make(chan struct{}) errors := make(chan error) for name, proc := range s.processMap { go func(n string, p process) { log.Debugf("Killing %s", n) err := kill(p, n, req.Deadline) if err != nil { errors <- err } else { successes <- struct{}{} } }(name, proc) } var err error for i := 0; i < nprocs; i++ { select { case <-successes: case e := <-errors: if err == nil { err = fmt.Errorf("shutdown failed: %s", e.Error()) } } } s.processMap = make(map[string]process) return nil, err } func (s *LocalSupervisor) Freeze(ctx context.Context, req *model.FreezeRequest) (*model.FreezeResponse, error) { // We return mocked freeze/thaw cycle metrics to mimic usage metrics in standalone mode var m runtime.MemStats runtime.ReadMemStats(&m) return &model.FreezeResponse{ CycleDeltaMetrics: model.CycleDeltaMetrics{ DomainCPURunNs: uint64(time.Since(s.freezeThawCycleStart).Nanoseconds()), DomainRunNs: uint64(time.Since(s.freezeThawCycleStart).Nanoseconds()), DomainMaxMemoryUsageBytes: m.Alloc, MicrovmCPURunNs: uint64(time.Since(s.freezeThawCycleStart).Nanoseconds()), }, }, nil } func (s *LocalSupervisor) Thaw(ctx context.Context, req *model.ThawRequest) error { s.freezeThawCycleStart = time.Now() return nil } func (s *LocalSupervisor) Ping(ctx context.Context) error { return nil } func (s *LocalSupervisor) Events(ctx context.Context, req *model.EventsRequest) (<-chan model.Event, error) { return s.events, nil }