func()

in pkg/component/runtime/service.go [111:291]


func (s *serviceRuntime) Run(ctx context.Context, comm Communicator) (err error) {
	// The teardownCheckingTimeout is set to the same amount as the checkin timeout for now
	teardownCheckinTimeout := s.checkinPeriod()
	teardownCheckinTimer := time.NewTimer(teardownCheckinTimeout)
	defer teardownCheckinTimer.Stop()

	// Stop teardown checkin timeout timer initially
	teardownCheckinTimer.Stop()

	checkinTimer := time.NewTimer(s.checkinPeriod())
	defer checkinTimer.Stop()

	// Stop the check-ins timer initially
	checkinTimer.Stop()

	var (
		cis            *connInfoServer
		lastCheckin    time.Time
		missedCheckins int
		tearingDown    bool
		// flag that signals if we are already stopping
		stopping       bool
		ignoreCheckins bool
	)

	cisStop := func() {
		if cis != nil {
			_ = cis.stop()
			cis = nil
		}
	}
	defer cisStop()

	onStop := func(am actionMode) {
		if stopping {
			s.log.Debugf("service %s is already stopping: skipping...", s.name())
			return
		}
		// the flag is set once and never reset since the serviceRuntime object
		// is not supposed to be reused once it's stopping
		stopping = true
		// Stop check-in timer
		s.log.Debugf("stop check-in timer for %s service", s.name())
		checkinTimer.Stop()

		// Stop connection info
		s.log.Debugf("stop connection info for %s service", s.name())
		cisStop()

		// Stop service
		s.stop(ctx, comm, lastCheckin, am == actionTeardown)

		// Service is stopped, ignore checkins in order to avoid clogging the watch channel
		ignoreCheckins = true
	}

	processTeardown := func(am actionMode, signed *component.Signed) {
		s.log.Debugf("start teardown for %s service", s.name())
		// Inject new signed
		newComp, err := injectSigned(s.comp, signed)
		if err != nil {
			s.log.Errorf("failed to inject signed configuration for %s service, err: %v", s.name(), err)
		}

		s.log.Debugf("set teardown timer %v for %s service", teardownCheckinTimeout, s.name())
		// Set teardown timeout timer
		teardownCheckinTimer.Reset(teardownCheckinTimeout)

		// Process newComp update
		// This should send component update that should cause service checkin
		s.log.Debugf("process new comp config for %s service", s.name())
		s.updateCompConfig(newComp, comm)
	}

	onTeardown := func(as actionModeSigned) {
		tamperProtection := features.TamperProtection()
		s.log.Debugf("got teardown for %s service, tearingDown: %v, tamperProtection: %v", s.name(), tearingDown, tamperProtection)

		// If tamper protection is disabled do the old behavior
		if !tamperProtection {
			onStop(as.actionMode)
			return
		}

		if !tearingDown {
			tearingDown = true
			processTeardown(as.actionMode, as.signed)
		}
	}

	for {
		var err error
		select {
		case <-ctx.Done():
			s.log.Debug("context is done. exiting.")
			return ctx.Err()
		case as := <-s.actionCh:
			s.log.Debugf("got action %v for %s service", as.actionMode, s.name())
			switch as.actionMode {
			case actionStart:
				// Initial state on start
				lastCheckin = time.Time{}
				ignoreCheckins = false
				missedCheckins = 0
				checkinTimer.Stop()
				cisStop()

				// Start connection info
				if cis == nil {
					var address string
					// [gRPC:8.15] Uncomment after 8.14 when Endpoint is ready for local gRPC
					// isLocal := s.isLocal

					// [gRPC:8.15] Set connection info to local socket always for 8.14. Remove when Endpoint is ready for local gRPC
					isLocal := true
					address, err = getConnInfoServerAddress(runtime.GOOS, isLocal, s.comp.InputSpec.Spec.Service.CPort, s.comp.InputSpec.Spec.Service.CSocket)
					if err != nil {
						err = fmt.Errorf("failed to create connection info service address for %s: %w", s.name(), err)
						break
					}
					s.log.Infof("Creating connection info server for %s service, address: %v", s.name(), address)
					cis, err = newConnInfoServer(s.log, comm, address)
					if err != nil {
						err = fmt.Errorf("failed to start connection info service %s: %w", s.name(), err)
						break
					}
				}

				// Start service
				err = s.start(ctx)
				if err != nil {
					cisStop()
					break
				}

				// Start check-in timer
				checkinTimer.Reset(s.checkinPeriod())
			case actionStop:
				onStop(as.actionMode)
			case actionTeardown:
				onTeardown(as)
			}
			if err != nil {
				s.forceCompState(client.UnitStateFailed, err.Error())
			}
		case newComp := <-s.compCh:
			s.processNewComp(newComp, comm)
		case checkin := <-comm.CheckinObserved():
			s.log.Debugf("got check-in for %s service, tearingDown: %v, ignoreCheckins: %v", s.name(), tearingDown, ignoreCheckins)
			tamperProtection := features.TamperProtection()
			if tamperProtection { // If tamper protection feature flag is enabled, new behavior
				// Got check-in upon teardown update
				// tearingDown can be set to true only if tamper protection feature is enabled
				if tearingDown {
					tearingDown = false
					teardownCheckinTimer.Stop()
					onStop(actionTeardown)
				} else {
					// Ignore checkins if the service was stopped by the action
					if !ignoreCheckins {
						// Only process checkin if in "tearing down" sequence
						s.processCheckin(checkin, comm, &lastCheckin)
					}
				}
			} else { // If tamper protection feature flag is disabled, old behavior
				s.processCheckin(checkin, comm, &lastCheckin)
			}
		case <-checkinTimer.C:
			s.checkStatus(s.checkinPeriod(), &lastCheckin, &missedCheckins)
			checkinTimer.Reset(s.checkinPeriod())
		case <-teardownCheckinTimer.C:
			s.log.Debugf("got tearing down timeout for %s service", s.name())
			// Teardown timed out
			// tearingDown can be set to true only if tamper protection feature is enabled
			if tearingDown {
				tearingDown = false
				onStop(actionTeardown)
			}
		}
	}
}