in pkg/component/runtime/service.go [111:291]
func (s *serviceRuntime) Run(ctx context.Context, comm Communicator) (err error) {
// The teardownCheckingTimeout is set to the same amount as the checkin timeout for now
teardownCheckinTimeout := s.checkinPeriod()
teardownCheckinTimer := time.NewTimer(teardownCheckinTimeout)
defer teardownCheckinTimer.Stop()
// Stop teardown checkin timeout timer initially
teardownCheckinTimer.Stop()
checkinTimer := time.NewTimer(s.checkinPeriod())
defer checkinTimer.Stop()
// Stop the check-ins timer initially
checkinTimer.Stop()
var (
cis *connInfoServer
lastCheckin time.Time
missedCheckins int
tearingDown bool
// flag that signals if we are already stopping
stopping bool
ignoreCheckins bool
)
cisStop := func() {
if cis != nil {
_ = cis.stop()
cis = nil
}
}
defer cisStop()
onStop := func(am actionMode) {
if stopping {
s.log.Debugf("service %s is already stopping: skipping...", s.name())
return
}
// the flag is set once and never reset since the serviceRuntime object
// is not supposed to be reused once it's stopping
stopping = true
// Stop check-in timer
s.log.Debugf("stop check-in timer for %s service", s.name())
checkinTimer.Stop()
// Stop connection info
s.log.Debugf("stop connection info for %s service", s.name())
cisStop()
// Stop service
s.stop(ctx, comm, lastCheckin, am == actionTeardown)
// Service is stopped, ignore checkins in order to avoid clogging the watch channel
ignoreCheckins = true
}
processTeardown := func(am actionMode, signed *component.Signed) {
s.log.Debugf("start teardown for %s service", s.name())
// Inject new signed
newComp, err := injectSigned(s.comp, signed)
if err != nil {
s.log.Errorf("failed to inject signed configuration for %s service, err: %v", s.name(), err)
}
s.log.Debugf("set teardown timer %v for %s service", teardownCheckinTimeout, s.name())
// Set teardown timeout timer
teardownCheckinTimer.Reset(teardownCheckinTimeout)
// Process newComp update
// This should send component update that should cause service checkin
s.log.Debugf("process new comp config for %s service", s.name())
s.updateCompConfig(newComp, comm)
}
onTeardown := func(as actionModeSigned) {
tamperProtection := features.TamperProtection()
s.log.Debugf("got teardown for %s service, tearingDown: %v, tamperProtection: %v", s.name(), tearingDown, tamperProtection)
// If tamper protection is disabled do the old behavior
if !tamperProtection {
onStop(as.actionMode)
return
}
if !tearingDown {
tearingDown = true
processTeardown(as.actionMode, as.signed)
}
}
for {
var err error
select {
case <-ctx.Done():
s.log.Debug("context is done. exiting.")
return ctx.Err()
case as := <-s.actionCh:
s.log.Debugf("got action %v for %s service", as.actionMode, s.name())
switch as.actionMode {
case actionStart:
// Initial state on start
lastCheckin = time.Time{}
ignoreCheckins = false
missedCheckins = 0
checkinTimer.Stop()
cisStop()
// Start connection info
if cis == nil {
var address string
// [gRPC:8.15] Uncomment after 8.14 when Endpoint is ready for local gRPC
// isLocal := s.isLocal
// [gRPC:8.15] Set connection info to local socket always for 8.14. Remove when Endpoint is ready for local gRPC
isLocal := true
address, err = getConnInfoServerAddress(runtime.GOOS, isLocal, s.comp.InputSpec.Spec.Service.CPort, s.comp.InputSpec.Spec.Service.CSocket)
if err != nil {
err = fmt.Errorf("failed to create connection info service address for %s: %w", s.name(), err)
break
}
s.log.Infof("Creating connection info server for %s service, address: %v", s.name(), address)
cis, err = newConnInfoServer(s.log, comm, address)
if err != nil {
err = fmt.Errorf("failed to start connection info service %s: %w", s.name(), err)
break
}
}
// Start service
err = s.start(ctx)
if err != nil {
cisStop()
break
}
// Start check-in timer
checkinTimer.Reset(s.checkinPeriod())
case actionStop:
onStop(as.actionMode)
case actionTeardown:
onTeardown(as)
}
if err != nil {
s.forceCompState(client.UnitStateFailed, err.Error())
}
case newComp := <-s.compCh:
s.processNewComp(newComp, comm)
case checkin := <-comm.CheckinObserved():
s.log.Debugf("got check-in for %s service, tearingDown: %v, ignoreCheckins: %v", s.name(), tearingDown, ignoreCheckins)
tamperProtection := features.TamperProtection()
if tamperProtection { // If tamper protection feature flag is enabled, new behavior
// Got check-in upon teardown update
// tearingDown can be set to true only if tamper protection feature is enabled
if tearingDown {
tearingDown = false
teardownCheckinTimer.Stop()
onStop(actionTeardown)
} else {
// Ignore checkins if the service was stopped by the action
if !ignoreCheckins {
// Only process checkin if in "tearing down" sequence
s.processCheckin(checkin, comm, &lastCheckin)
}
}
} else { // If tamper protection feature flag is disabled, old behavior
s.processCheckin(checkin, comm, &lastCheckin)
}
case <-checkinTimer.C:
s.checkStatus(s.checkinPeriod(), &lastCheckin, &missedCheckins)
checkinTimer.Reset(s.checkinPeriod())
case <-teardownCheckinTimer.C:
s.log.Debugf("got tearing down timeout for %s service", s.name())
// Teardown timed out
// tearingDown can be set to true only if tamper protection feature is enabled
if tearingDown {
tearingDown = false
onStop(actionTeardown)
}
}
}
}