in pkg/component/runtime/command.go [148:269]
// Run is the event loop of the command runtime. It blocks until ctx is
// cancelled and then returns ctx.Err().
//
// The loop multiplexes:
//   - c.actionCh: start, stop and teardown requests (the last one is kept in
//     c.actionState);
//   - c.procCh: state changes of spawned processes; notifications about a
//     process other than c.proc are ignored;
//   - c.compCh: updated component definitions, which are synced into c.state;
//   - comm.CheckinObserved(): check-ins coming from the process;
//   - a ticker, normally firing every check-in period, used to (re)start the
//     process when it should be running and to detect missed check-ins.
//
// When the process misses maxCheckinMisses consecutive check-in periods it
// is assumed to be locked up and is killed directly (no graceful stop); its
// exit is then expected to be reported through c.procCh.
func (c *commandRuntime) Run(ctx context.Context, comm Communicator) error {
	cmdSpec := c.getCommandSpec()
	checkinPeriod := cmdSpec.Timeouts.Checkin
	restartPeriod := cmdSpec.Timeouts.Restart
	c.forceCompState(client.UnitStateStarting, "Starting")
	t := time.NewTicker(checkinPeriod)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case as := <-c.actionCh:
			c.actionState = as
			switch as {
			case actionStart:
				if err := c.start(comm); err != nil {
					c.forceCompState(client.UnitStateFailed, fmt.Sprintf("Failed: %s", err))
				}
				t.Reset(checkinPeriod)
			case actionStop, actionTeardown:
				if err := c.stop(ctx); err != nil {
					c.forceCompState(client.UnitStateFailed, fmt.Sprintf("Failed: %s", err))
				}
			}
		case ps := <-c.procCh:
			// ignores old processes
			if ps.proc == c.proc {
				c.proc = nil
				if c.handleProc(ps.state) {
					// start again after restart period
					t.Reset(restartPeriod)
				}
			}
		case newComp := <-c.compCh:
			c.current = newComp
			c.syncLogLevels()
			sendExpected := c.state.syncExpected(&newComp)
			changed := c.state.syncUnits(&newComp)
			if sendExpected || c.state.unsettled() {
				comm.CheckinExpected(c.state.toCheckinExpected(), nil)
			}
			if changed {
				c.sendObserved()
			}
		case checkin := <-comm.CheckinObserved():
			sendExpected := false
			changed := false
			// The first observation after a start sets the component to healthy.
			// c.proc is nil before the first start (state is already Starting)
			// and after the procCh case clears it, so guard the dereference
			// instead of panicking on a check-in observed in that window.
			if c.state.State == client.UnitStateStarting && c.proc != nil {
				c.state.State = client.UnitStateHealthy
				c.state.Message = fmt.Sprintf("Healthy: communicating with pid '%d'", c.proc.PID)
				changed = true
			}
			if c.lastCheckin.IsZero() {
				// first check-in
				sendExpected = true
			}
			// Warning lastCheckin must contain a
			// monotonic clock. Functions like Local(),
			// UTC(), Round(), AddDate(), etc. remove the
			// monotonic clock. See
			// https://pkg.go.dev/time
			c.lastCheckin = time.Now()
			if c.state.syncCheckin(checkin) {
				changed = true
			}
			if c.state.unsettled() {
				sendExpected = true
			}
			if sendExpected {
				checkinExpected := c.state.toCheckinExpected()
				comm.CheckinExpected(checkinExpected, checkin)
			}
			if changed {
				c.sendObserved()
			}
			// Observed state is sent again after stopped units are cleaned up
			// so observers see both the stopped units and their removal.
			if c.state.cleanupStopped() {
				c.sendObserved()
			}
		case <-t.C:
			t.Reset(checkinPeriod)
			if c.actionState != actionStart {
				continue
			}
			if c.proc == nil {
				// not running, but should be running
				if err := c.start(comm); err != nil {
					c.forceCompState(client.UnitStateFailed, fmt.Sprintf("Failed: %s", err))
				}
				continue
			}
			// running and should be running
			//
			// Warning now must contain a
			// monotonic clock. Functions like Local(),
			// UTC(), Round(), AddDate(), etc. remove the
			// monotonic clock. See
			// https://pkg.go.dev/time
			now := time.Now()
			sinceLast := now.Sub(c.lastCheckin)
			if sinceLast <= checkinPeriod {
				c.missedCheckins = 0
			} else {
				c.missedCheckins++
				c.log.Debugf("Last check-in was: %s, now is: %s. The diff %s is higher than allowed %s.", c.lastCheckin.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano), sinceLast, checkinPeriod)
			}
			switch {
			case c.missedCheckins == 0:
				c.compState(client.UnitStateHealthy)
			case c.missedCheckins < maxCheckinMisses:
				c.compState(client.UnitStateDegraded)
			default:
				// something is wrong; the command should be checking in
				//
				// at this point it is assumed the sub-process has locked up and will not respond to a nice
				// termination signal, so we jump directly to killing the process
				msg := fmt.Sprintf("Failed: pid '%d' missed %d check-ins and will be killed", c.proc.PID, maxCheckinMisses)
				c.forceCompState(client.UnitStateFailed, msg)
				_ = c.proc.Kill() // watcher will handle it from here
			}
		}
	}
}