func()

in pkg/component/runtime/command.go [148:269]


func (c *commandRuntime) Run(ctx context.Context, comm Communicator) error {
	cmdSpec := c.getCommandSpec()
	checkinPeriod := cmdSpec.Timeouts.Checkin
	restartPeriod := cmdSpec.Timeouts.Restart
	c.forceCompState(client.UnitStateStarting, "Starting")
	t := time.NewTicker(checkinPeriod)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case as := <-c.actionCh:
			c.actionState = as
			switch as {
			case actionStart:
				if err := c.start(comm); err != nil {
					c.forceCompState(client.UnitStateFailed, fmt.Sprintf("Failed: %s", err))
				}
				t.Reset(checkinPeriod)
			case actionStop, actionTeardown:
				if err := c.stop(ctx); err != nil {
					c.forceCompState(client.UnitStateFailed, fmt.Sprintf("Failed: %s", err))
				}
			}
		case ps := <-c.procCh:
			// ignores old processes
			if ps.proc == c.proc {
				c.proc = nil
				if c.handleProc(ps.state) {
					// start again after restart period
					t.Reset(restartPeriod)
				}
			}
		case newComp := <-c.compCh:
			c.current = newComp
			c.syncLogLevels()

			sendExpected := c.state.syncExpected(&newComp)
			changed := c.state.syncUnits(&newComp)
			if sendExpected || c.state.unsettled() {
				comm.CheckinExpected(c.state.toCheckinExpected(), nil)
			}

			if changed {
				c.sendObserved()
			}
		case checkin := <-comm.CheckinObserved():
			sendExpected := false
			changed := false
			if c.state.State == client.UnitStateStarting {
				// first observation after start set component to healthy
				c.state.State = client.UnitStateHealthy
				c.state.Message = fmt.Sprintf("Healthy: communicating with pid '%d'", c.proc.PID)
				changed = true
			}
			if c.lastCheckin.IsZero() {
				// first check-in
				sendExpected = true
			}
			// Warning lastCheckin must contain a
			// monotonic clock.  Functions like Local(),
			// UTC(), Round(), AddDate(), etc. remove the
			// monotonic clock.  See
			// https://pkg.go.dev/time
			c.lastCheckin = time.Now()
			if c.state.syncCheckin(checkin) {
				changed = true
			}
			if c.state.unsettled() {
				sendExpected = true
			}
			if sendExpected {
				checkinExpected := c.state.toCheckinExpected()
				comm.CheckinExpected(checkinExpected, checkin)
			}
			if changed {
				c.sendObserved()
			}
			if c.state.cleanupStopped() {
				c.sendObserved()
			}
		case <-t.C:
			t.Reset(checkinPeriod)
			if c.actionState == actionStart {
				if c.proc == nil {
					// not running, but should be running
					if err := c.start(comm); err != nil {
						c.forceCompState(client.UnitStateFailed, fmt.Sprintf("Failed: %s", err))
					}
				} else {
					// running and should be running
					//
					// Warning now must contain a
					// monotonic clock.  Functions like Local(),
					// UTC(), Round(), AddDate(), etc. remove the
					// monotonic clock.  See
					// https://pkg.go.dev/time
					now := time.Now()
					if now.Sub(c.lastCheckin) <= checkinPeriod {
						c.missedCheckins = 0
					} else {
						c.missedCheckins++
						c.log.Debugf("Last check-in was: %s, now is: %s. The diff %s is higher than allowed %s.", c.lastCheckin.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano), now.Sub(c.lastCheckin), checkinPeriod)
					}
					if c.missedCheckins == 0 {
						c.compState(client.UnitStateHealthy)
					} else if c.missedCheckins > 0 && c.missedCheckins < maxCheckinMisses {
						c.compState(client.UnitStateDegraded)
					} else if c.missedCheckins >= maxCheckinMisses {
						// something is wrong; the command should be checking in
						//
						// at this point it is assumed the sub-process has locked up and will not respond to a nice
						// termination signal, so we jump directly to killing the process
						msg := fmt.Sprintf("Failed: pid '%d' missed %d check-ins and will be killed", c.proc.PID, maxCheckinMisses)
						c.forceCompState(client.UnitStateFailed, msg)
						_ = c.proc.Kill() // watcher will handle it from here
					}
				}
			}
		}
	}
}