pkg/component/runtime/command.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package runtime

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"runtime"
	"strings"
	"time"

	"go.uber.org/zap/zapcore"
	"golang.org/x/time/rate"

	"github.com/elastic/elastic-agent-client/v7/pkg/client"
	"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
	"github.com/elastic/elastic-agent/pkg/component"
	"github.com/elastic/elastic-agent/pkg/core/logger"
	"github.com/elastic/elastic-agent/pkg/core/process"
	"github.com/elastic/elastic-agent/pkg/utils"
)

// actionMode is the requested run state of the component subprocess.
type actionMode int

const (
actionTeardown = actionMode(-1)
actionStop = actionMode(0)
actionStart = actionMode(1)
runDirMod = 0770
envAgentComponentID = "AGENT_COMPONENT_ID"
envAgentComponentType = "AGENT_COMPONENT_TYPE"
stateUnknownMessage = "Unknown"
)
func (m actionMode) String() string {
switch m {
case actionTeardown:
return "teardown"
case actionStop:
return "stop"
case actionStart:
return "start"
}
return ""
}

// MonitoringManager manages the resources needed to monitor a running component.
type MonitoringManager interface {
	// EnrichArgs enriches the arguments provided to the application in order
	// to enable monitoring of a component.Component, identified by its ID and
	// the binary that provides it. The binary name is obtained from
	// component.Component.BinaryName.
	EnrichArgs(id, binary string, args []string) []string

	// Prepare and Cleanup set up and release the resources required for
	// monitoring the component.
	Prepare(id string) error
	Cleanup(id string) error
}

// procState pairs a process with its termination status. It is sent on
// procCh by the watching goroutine when the process exits.
type procState struct {
	proc  *process.Info
	state *os.ProcessState
}
// commandRuntime provides the command runtime for running a component as a subprocess.
type commandRuntime struct {
log *logger.Logger
logStd *logWriter
logErr *logWriter
current component.Component
monitor MonitoringManager
// ch is the reporting channel for the current state. When a policy change
// or client checkin affects the component state, its new value is sent
// here and handled by (componentRuntimeState).runLoop.
ch chan ComponentState
// When the managed process closes, its termination status is sent on procCh
// by the watching goroutine, and handled by (*commandRuntime).Run.
procCh chan procState
// compCh forwards new component metadata from the runtime manager to
// the command runtime. It is written by calls to (*commandRuntime).Update
// and read by the run loop in (*commandRuntime).Run.
compCh chan component.Component
// The most recent mode received on actionCh. The mode will be either
// actionStart (indicating the process should be running, and should be
// created if it is not), or actionStop or actionTeardown (indicating that
// it should terminate).
actionState actionMode
// actionState is changed by sending its new value on actionCh, where it is
// handled by (*commandRuntime).Run.
actionCh chan actionMode
proc *process.Info
state ComponentState
lastCheckin time.Time
missedCheckins int
restartBucket *rate.Limiter
}
// newCommandRuntime creates a new command runtime for the provided component.
func newCommandRuntime(comp component.Component, log *logger.Logger, monitor MonitoringManager) (*commandRuntime, error) {
c := &commandRuntime{
log: log,
current: comp,
monitor: monitor,
ch: make(chan ComponentState),
actionCh: make(chan actionMode, 1),
procCh: make(chan procState),
compCh: make(chan component.Component, 1),
actionState: actionStop,
state: newComponentState(&comp),
}
cmdSpec := c.getCommandSpec()
if cmdSpec == nil {
return nil, errors.New("must have command defined in specification")
}
	ll, unitLevels := getLogLevels(comp)
	c.logStd = createLogWriter(c.current, log, cmdSpec, c.getSpecType(), c.getSpecBinaryName(), ll, unitLevels, logSourceStdout)

	// regenerate the levels so the two writers do not share the unit map
	ll, unitLevels = getLogLevels(comp)
	c.logErr = createLogWriter(c.current, log, cmdSpec, c.getSpecType(), c.getSpecBinaryName(), ll, unitLevels, logSourceStderr)
c.restartBucket = newRateLimiter(cmdSpec.RestartMonitoringPeriod, cmdSpec.MaxRestartsPerPeriod)
return c, nil
}

// Run starts the runtime for the component.
//
// Called by Manager inside a goroutine. Run does not return until the
// passed-in context is done. Run is always called before any of the other
// methods in the interface, and once the context is done none of those
// methods should ever be called again.
func (c *commandRuntime) Run(ctx context.Context, comm Communicator) error {
cmdSpec := c.getCommandSpec()
checkinPeriod := cmdSpec.Timeouts.Checkin
restartPeriod := cmdSpec.Timeouts.Restart
c.forceCompState(client.UnitStateStarting, "Starting")
t := time.NewTicker(checkinPeriod)
defer t.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case as := <-c.actionCh:
c.actionState = as
switch as {
case actionStart:
if err := c.start(comm); err != nil {
c.forceCompState(client.UnitStateFailed, fmt.Sprintf("Failed: %s", err))
}
t.Reset(checkinPeriod)
case actionStop, actionTeardown:
if err := c.stop(ctx); err != nil {
c.forceCompState(client.UnitStateFailed, fmt.Sprintf("Failed: %s", err))
}
}
case ps := <-c.procCh:
			// ignore exit notifications from stale processes
if ps.proc == c.proc {
c.proc = nil
if c.handleProc(ps.state) {
// start again after restart period
t.Reset(restartPeriod)
}
}
case newComp := <-c.compCh:
c.current = newComp
c.syncLogLevels()
sendExpected := c.state.syncExpected(&newComp)
changed := c.state.syncUnits(&newComp)
if sendExpected || c.state.unsettled() {
comm.CheckinExpected(c.state.toCheckinExpected(), nil)
}
if changed {
c.sendObserved()
}
case checkin := <-comm.CheckinObserved():
sendExpected := false
changed := false
if c.state.State == client.UnitStateStarting {
				// the first observation after start marks the component healthy
c.state.State = client.UnitStateHealthy
c.state.Message = fmt.Sprintf("Healthy: communicating with pid '%d'", c.proc.PID)
changed = true
}
if c.lastCheckin.IsZero() {
// first check-in
sendExpected = true
}
			// Warning: lastCheckin must contain a monotonic clock
			// reading. Functions like Local(), UTC(), Round(),
			// AddDate(), etc. remove the monotonic clock. See
			// https://pkg.go.dev/time
c.lastCheckin = time.Now()
if c.state.syncCheckin(checkin) {
changed = true
}
if c.state.unsettled() {
sendExpected = true
}
if sendExpected {
checkinExpected := c.state.toCheckinExpected()
comm.CheckinExpected(checkinExpected, checkin)
}
if changed {
c.sendObserved()
}
if c.state.cleanupStopped() {
c.sendObserved()
}
case <-t.C:
t.Reset(checkinPeriod)
if c.actionState == actionStart {
if c.proc == nil {
// not running, but should be running
if err := c.start(comm); err != nil {
c.forceCompState(client.UnitStateFailed, fmt.Sprintf("Failed: %s", err))
}
} else {
// running and should be running
//
				// Warning: now must contain a monotonic clock
				// reading. Functions like Local(), UTC(), Round(),
				// AddDate(), etc. remove the monotonic clock. See
				// https://pkg.go.dev/time
now := time.Now()
if now.Sub(c.lastCheckin) <= checkinPeriod {
c.missedCheckins = 0
} else {
c.missedCheckins++
c.log.Debugf("Last check-in was: %s, now is: %s. The diff %s is higher than allowed %s.", c.lastCheckin.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano), now.Sub(c.lastCheckin), checkinPeriod)
}
if c.missedCheckins == 0 {
c.compState(client.UnitStateHealthy)
} else if c.missedCheckins > 0 && c.missedCheckins < maxCheckinMisses {
c.compState(client.UnitStateDegraded)
} else if c.missedCheckins >= maxCheckinMisses {
// something is wrong; the command should be checking in
//
// at this point it is assumed the sub-process has locked up and will not respond to a nice
// termination signal, so we jump directly to killing the process
msg := fmt.Sprintf("Failed: pid '%d' missed %d check-ins and will be killed", c.proc.PID, maxCheckinMisses)
c.forceCompState(client.UnitStateFailed, msg)
_ = c.proc.Kill() // watcher will handle it from here
}
}
}
}
}
}
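
// The check-in health rule applied on each tick above can be restated as a
// pure function (illustrative sketch only; checkinState is hypothetical and
// not defined in this package):
//
//	func checkinState(missed int) client.UnitState {
//		switch {
//		case missed == 0:
//			return client.UnitStateHealthy
//		case missed < maxCheckinMisses:
//			return client.UnitStateDegraded
//		default:
//			return client.UnitStateFailed // the process is also killed
//		}
//	}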

// Watch returns the channel through which component state is reported.
//
// The channel receives a new state any time the state of a unit or of the
// whole component changes.
func (c *commandRuntime) Watch() <-chan ComponentState {
return c.ch
}
// Start starts the component.
//
// Non-blocking and never returns an error.
func (c *commandRuntime) Start() error {
// clear channel so it's the latest action
select {
case <-c.actionCh:
default:
}
c.actionCh <- actionStart
return nil
}

// Update updates the runtime with a new revision of the component definition.
//
// Non-blocking and never returns an error.
func (c *commandRuntime) Update(comp component.Component) error {
// clear channel so it's the latest component
select {
case <-c.compCh:
default:
}
c.compCh <- comp
return nil
}
// Stop stops the component.
//
// Non-blocking and never returns an error.
func (c *commandRuntime) Stop() error {
// clear channel so it's the latest action
select {
case <-c.actionCh:
default:
}
c.actionCh <- actionStop
return nil
}
// Teardown tears down the component.
//
// Non-blocking and never returns an error.
func (c *commandRuntime) Teardown(_ *component.Signed) error {
// clear channel so it's the latest action
select {
case <-c.actionCh:
default:
}
c.actionCh <- actionTeardown
return nil
}

// forceCompState forcibly updates the state of the entire component, applying
// that state to all of its units.
func (c *commandRuntime) forceCompState(state client.UnitState, msg string) {
if c.state.forceState(state, msg) {
c.sendObserved()
}
}

// compState updates just the component state, not the state of each unit.
func (c *commandRuntime) compState(state client.UnitState) {
msg := stateUnknownMessage
if state == client.UnitStateHealthy {
msg = fmt.Sprintf("Healthy: communicating with pid '%d'", c.proc.PID)
} else if state == client.UnitStateDegraded {
if c.missedCheckins == 1 {
msg = fmt.Sprintf("Degraded: pid '%d' missed 1 check-in", c.proc.PID)
} else {
msg = fmt.Sprintf("Degraded: pid '%d' missed %d check-ins", c.proc.PID, c.missedCheckins)
}
}
if c.state.compState(state, msg) {
c.sendObserved()
}
}

// sendObserved publishes a copy of the current component state on the
// reporting channel.
func (c *commandRuntime) sendObserved() {
c.ch <- c.state.Copy()
}

// start spawns the component subprocess if it is not already running.
func (c *commandRuntime) start(comm Communicator) error {
if c.proc != nil {
// already running
return nil
}
cmdSpec := c.getCommandSpec()
env := make([]string, 0, len(cmdSpec.Env)+2)
for _, e := range cmdSpec.Env {
env = append(env, fmt.Sprintf("%s=%s", e.Name, e.Value))
}
env = append(env, fmt.Sprintf("%s=%s", envAgentComponentID, c.current.ID))
env = append(env, fmt.Sprintf("%s=%s", envAgentComponentType, c.getSpecType()))
uid, gid := os.Geteuid(), os.Getegid()
workDir, err := c.workDir(uid, gid)
if err != nil {
return err
}
path, err := filepath.Abs(c.getSpecBinaryPath())
if err != nil {
return fmt.Errorf("failed to determine absolute path: %w", err)
}
err = utils.HasStrictExecPerms(path, uid)
if err != nil {
return fmt.Errorf("execution of component prevented: %w", err)
}
if err := c.monitor.Prepare(c.current.ID); err != nil {
return err
}
args := c.monitor.EnrichArgs(c.current.ID, c.getSpecBinaryName(), cmdSpec.Args)
	// give each component a distinct data path so components do not collide
dataPath := filepath.Join(paths.Run(), c.current.ID)
_ = os.MkdirAll(dataPath, 0755)
args = append(args, "-E", "path.data="+dataPath)
// reset checkin state before starting the process.
c.lastCheckin = time.Time{}
c.missedCheckins = 0
proc, err := process.Start(path,
process.WithArgs(args),
process.WithEnv(env),
process.WithCmdOptions(attachOutErr(c.logStd, c.logErr), dirPath(workDir)))
if err != nil {
return err
}
c.proc = proc
c.forceCompState(client.UnitStateStarting, fmt.Sprintf("Starting: spawned pid '%d'", c.proc.PID))
c.startWatcher(proc, comm)
return nil
}
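
// From the spawned component's point of view (illustrative, assuming a Go
// binary), the identifiers injected above arrive as ordinary environment
// variables and the data path as a CLI flag:
//
//	id := os.Getenv("AGENT_COMPONENT_ID")    // the component ID
//	typ := os.Getenv("AGENT_COMPONENT_TYPE") // the input type
//	// plus the argument: -E path.data=<paths.Run()>/<component ID>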

// stop signals the subprocess to stop and force-kills it if it has not
// exited within the configured stop timeout.
func (c *commandRuntime) stop(ctx context.Context) error {
	if c.proc == nil {
		// already stopped; ensure the component state reflects that
if c.state.State != client.UnitStateStopped {
if c.state.State == client.UnitStateFailed {
c.forceCompState(client.UnitStateStopped, "Stopped: never started successfully")
} else {
c.forceCompState(client.UnitStateStopped, "Stopped: already stopped")
}
}
return nil
}
// cleanup reserved resources related to monitoring
defer c.monitor.Cleanup(c.current.ID) //nolint:errcheck // this is ok
cmdSpec := c.getCommandSpec()
go func(info *process.Info, timeout time.Duration) {
t := time.NewTimer(timeout)
defer t.Stop()
select {
case <-ctx.Done():
return
case <-t.C:
// kill no matter what (might already be stopped)
_ = info.Kill()
}
}(c.proc, cmdSpec.Timeouts.Stop)
return c.proc.Stop()
}

// startWatcher writes the startup information to the subprocess's stdin and
// then waits for the process to exit, reporting its termination status on
// procCh.
func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) {
go func() {
err := comm.WriteStartUpInfo(info.Stdin)
if err != nil {
_, _ = c.logErr.Write([]byte(fmt.Sprintf("Failed: failed to provide connection information to spawned pid '%d': %s", info.PID, err)))
// kill instantly
_ = info.Kill()
} else {
_ = info.Stdin.Close()
}
ch := info.Wait()
s := <-ch
c.procCh <- procState{
proc: info,
state: s,
}
}()
}

// handleProc processes the termination status of the subprocess. It returns
// true when the process should be started again after the restart period.
func (c *commandRuntime) handleProc(state *os.ProcessState) bool {
switch c.actionState {
case actionStart:
		if c.restartBucket != nil && c.restartBucket.Allow() {
			// the rate limiter still has tokens: this exit is within the
			// allowed restart budget, so restart without reporting failure
			stopMsg := fmt.Sprintf("Suppressing FAILED state due to restart: pid '%d' exited with code '%d'", state.Pid(), state.ExitCode())
			c.forceCompState(client.UnitStateStopped, stopMsg)
		} else {
			// report failure once the restart budget for the monitoring
			// period is exhausted (or when no limiter is configured)
			stopMsg := fmt.Sprintf("Failed: pid '%d' exited with code '%d'", state.Pid(), state.ExitCode())
			c.forceCompState(client.UnitStateFailed, stopMsg)
		}
return true
case actionStop, actionTeardown:
		// stopping (the process was expected to exit)
		if c.actionState == actionTeardown {
			// teardown removes the entire component, so clean up the work directory
			_ = os.RemoveAll(c.workDirPath())
}
stopMsg := fmt.Sprintf("Stopped: pid '%d' exited with code '%d'", state.Pid(), state.ExitCode())
c.forceCompState(client.UnitStateStopped, stopMsg)
}
return false
}

// workDirPath returns the path of the component's working directory.
func (c *commandRuntime) workDirPath() string {
return filepath.Join(paths.Run(), c.current.ID)
}

// workDir ensures the component's working directory exists with the correct
// ownership and permissions, returning its path.
func (c *commandRuntime) workDir(uid int, gid int) (string, error) {
path := c.workDirPath()
err := os.MkdirAll(path, runDirMod)
if err != nil {
return "", fmt.Errorf("failed to create path %q: %w", path, err)
}
if runtime.GOOS == component.Windows {
return path, nil
}
err = os.Chown(path, uid, gid)
if err != nil {
return "", fmt.Errorf("failed to chown %q: %w", path, err)
}
err = os.Chmod(path, runDirMod)
if err != nil {
return "", fmt.Errorf("failed to chmod %q: %w", path, err)
}
return path, nil
}
func (c *commandRuntime) getSpecType() string {
if c.current.InputSpec != nil {
return c.current.InputSpec.InputType
}
return ""
}
func (c *commandRuntime) getSpecBinaryName() string {
return c.current.BinaryName()
}
func (c *commandRuntime) getSpecBinaryPath() string {
if c.current.InputSpec != nil {
return c.current.InputSpec.BinaryPath
}
return ""
}
func (c *commandRuntime) getCommandSpec() *component.CommandSpec {
if c.current.InputSpec != nil {
return c.current.InputSpec.Spec.Command
}
return nil
}

// syncLogLevels refreshes both log writers with the levels from the current
// component definition. The levels are computed twice so the two writers do
// not share the unit map.
func (c *commandRuntime) syncLogLevels() {
	ll, unitLevels := getLogLevels(c.current)
	c.logStd.SetLevels(ll, unitLevels)
	ll, unitLevels = getLogLevels(c.current)
	c.logErr.SetLevels(ll, unitLevels)
}

// attachOutErr returns a CmdOption that redirects the subprocess's stdout and
// stderr to the provided log writers.
func attachOutErr(stdOut *logWriter, stdErr *logWriter) process.CmdOption {
return func(cmd *exec.Cmd) error {
cmd.Stdout = stdOut
cmd.Stderr = stdErr
return nil
}
}

// createLogWriter builds a log writer for one output stream (stdout or
// stderr) of the component's subprocess, tagging entries with the component
// metadata and a dataset derived from the binary name.
func createLogWriter(comp component.Component, baseLog *logger.Logger, cmdSpec *component.CommandSpec, typeStr string, binaryName string, ll zapcore.Level, unitLevels map[string]zapcore.Level, src logSource) *logWriter {
dataset := fmt.Sprintf("elastic_agent.%s", strings.ReplaceAll(strings.ReplaceAll(binaryName, "-", "_"), "/", "_"))
logger := baseLog.With(
"component", map[string]interface{}{
"id": comp.ID,
"type": typeStr,
"binary": binaryName,
"dataset": dataset,
},
"log", map[string]interface{}{
"source": comp.ID,
},
)
return newLogWriter(logger.Core(), cmdSpec.Log, ll, unitLevels, src)
}
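
// The dataset name is derived by replacing "-" and "/" in the binary name
// with "_". For example (illustrative binary names):
//
//	"filebeat"    -> "elastic_agent.filebeat"
//	"apm-server"  -> "elastic_agent.apm_server"
//	"osquery/ext" -> "elastic_agent.osquery_ext"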

// getLogLevels returns the lowest (most verbose) log level across all of the
// component's units and a mapping from each unit ID to its configured level.
func getLogLevels(comp component.Component) (zapcore.Level, map[string]zapcore.Level) {
baseLevel := zapcore.ErrorLevel
unitLevels := make(map[string]zapcore.Level)
for _, unit := range comp.Units {
ll := toZapcoreLevel(unit.LogLevel)
unitLevels[unit.ID] = ll
if ll < baseLevel {
baseLevel = ll
}
}
return baseLevel, unitLevels
}
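
// For example (illustrative unit IDs), a component with one unit at error and
// one at debug resolves to the most verbose level, since zapcore orders
// DebugLevel < InfoLevel < WarnLevel < ErrorLevel:
//
//	baseLevel:  zapcore.DebugLevel
//	unitLevels: {"unit-a": zapcore.ErrorLevel, "unit-b": zapcore.DebugLevel}
//
// A component with no units keeps the zapcore.ErrorLevel default.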

// toZapcoreLevel maps a client.UnitLogLevel to the closest zapcore.Level.
func toZapcoreLevel(unitLevel client.UnitLogLevel) zapcore.Level {
switch unitLevel {
case client.UnitLogLevelError:
return zapcore.ErrorLevel
case client.UnitLogLevelWarn:
return zapcore.WarnLevel
case client.UnitLogLevelInfo:
return zapcore.InfoLevel
case client.UnitLogLevelDebug:
return zapcore.DebugLevel
case client.UnitLogLevelTrace:
// zap doesn't support trace
return zapcore.DebugLevel
}
// unknown level (default to info)
return zapcore.InfoLevel
}

// dirPath returns a CmdOption that sets the working directory of the command.
func dirPath(path string) process.CmdOption {
return func(cmd *exec.Cmd) error {
cmd.Dir = path
return nil
}
}
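
// attachOutErr and dirPath follow the functional-option pattern: any
// func(*exec.Cmd) error can be supplied to process.Start via
// process.WithCmdOptions. An additional option might look like this
// (hypothetical sketch, not used by this runtime):
//
//	func withExtraEnv(kv string) process.CmdOption {
//		return func(cmd *exec.Cmd) error {
//			cmd.Env = append(cmd.Env, kv)
//			return nil
//		}
//	}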

// newRateLimiter builds the token bucket used to budget component restarts,
// or returns nil when restart monitoring is disabled.
func newRateLimiter(restartMonitoringPeriod time.Duration, maxEventsPerPeriod int) *rate.Limiter {
	if restartMonitoringPeriod <= 0 || maxEventsPerPeriod <= 0 {
		return nil
	}
	period := restartMonitoringPeriod.Seconds()
	events := float64(maxEventsPerPeriod)
	// the refill rate is events-per-second; the burst is the maximum number
	// of restarts tolerated within a single monitoring period
	frequency := events / period
	if frequency > 0 {
		return rate.NewLimiter(rate.Limit(frequency), maxEventsPerPeriod)
	}
	return nil
}
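
// Worked example (illustrative numbers): RestartMonitoringPeriod = 10s and
// MaxRestartsPerPeriod = 5 yield rate.NewLimiter(rate.Limit(0.5), 5): a
// bucket holding at most 5 tokens, refilled at one token every 2 seconds.
// handleProc consumes one token per restart via Allow(), so up to 5 rapid
// restarts are tolerated before the component is reported as failed.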