pkg/component/runtime/service.go (490 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package runtime
import (
"context"
"errors"
"fmt"
"net/url"
"runtime"
"time"
"github.com/kardianos/service"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/features"
)
// actionModeSigned pairs a requested runtime action with the optional signed
// payload that accompanies it. In this file only Teardown supplies a signed
// value; Start and Stop always pass nil.
type actionModeSigned struct {
	actionMode
	signed *component.Signed // payload injected into the component's input units during teardown; may be nil
}
const (
	// defaultCheckServiceStatusInterval is the check-in period used by
	// checkinPeriod when the service specification leaves the check-in timeout at zero.
	defaultCheckServiceStatusInterval = 30 * time.Second // 30 seconds default for now, consistent with the command check-in interval
)
var (
	// ErrOperationSpecUndefined is returned when the service specification does
	// not define the command for the requested operation (check, install or uninstall).
	ErrOperationSpecUndefined = errors.New("operation spec undefined")
	// ErrInvalidServiceSpec is the error for an invalid service specification.
	ErrInvalidServiceSpec = errors.New("invalid service spec")
)
// executeServiceCommandFunc is the signature of the function that runs a single
// service operation command (check, install or uninstall) described by spec for
// the binary at binaryPath. serviceRuntime stores one as a field, so the
// implementation can be replaced (presumably by tests — confirm).
type executeServiceCommandFunc func(ctx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec) error
// serviceRuntime provides the command runtime for running a component as a service.
// an instance of serviceRuntime is not reused: after being stopped, it cannot be started again.
type serviceRuntime struct {
	comp component.Component // component definition this runtime was created for
	log *logger.Logger // logger named "service_runtime" by newServiceRuntime
	ch chan ComponentState // carries state snapshots to Watch consumers; created unbuffered by newServiceRuntime
	actionCh chan actionModeSigned // pending start/stop/teardown request; created with capacity 1 by newServiceRuntime
	compCh chan component.Component // pending component update; created with capacity 1 by newServiceRuntime
	statusCh chan service.Status // NOTE(review): created by newServiceRuntime but not otherwise used in the code visible here
	state ComponentState // current component state; copies of it are published on ch
	executeServiceCommandImpl executeServiceCommandFunc // runs the check/install/uninstall commands; newServiceRuntime sets it to executeServiceCommand
	isLocal bool // true if rpc is domain socket, or named pipe
}
// newServiceRuntime builds the service runtime for comp.
//
// comp must carry an input specification that defines a service; otherwise an
// error is returned. The returned runtime starts out in the STOPPED state and
// does nothing until Run is invoked.
func newServiceRuntime(comp component.Component, logger *logger.Logger, isLocal bool) (*serviceRuntime, error) {
	// Validate the specification up front: every other method dereferences
	// InputSpec.Spec.Service without checking it again.
	switch {
	case comp.InputSpec == nil:
		return nil, errors.New("service runtime requires an input specification to be defined")
	case comp.InputSpec.Spec.Service == nil:
		return nil, errors.New("must have service defined in specification")
	}

	initialState := newComponentState(&comp)
	rt := &serviceRuntime{
		comp:                      comp,
		log:                       logger.Named("service_runtime"),
		ch:                        make(chan ComponentState),
		actionCh:                  make(chan actionModeSigned, 1),
		compCh:                    make(chan component.Component, 1),
		statusCh:                  make(chan service.Status),
		state:                     initialState,
		executeServiceCommandImpl: executeServiceCommand,
		isLocal:                   isLocal,
	}

	// A freshly created runtime reports itself as stopped.
	stoppedMsg := fmt.Sprintf("Stopped: %s service", rt.name())
	rt.state.compState(client.UnitStateStopped, stoppedMsg)

	return rt, nil
}
// Run starts the runtime for the component.
//
// Called by Manager inside a goroutine. Run does not return until the passed in context is done. Run is always
// called before any of the other methods in the interface and once the context is done none of those methods should
// ever be called again.
//
// Run is a single event loop that selects over: context cancellation, queued actions (start/stop/teardown),
// queued component updates, observed check-ins, the periodic check-in timer and the teardown timeout timer.
// It only returns when ctx is done, and then returns ctx.Err().
//
// ==================================================================================================
//
// Teardown sequence:
//
// 1. if the tamper protection feature is disabled, a teardown action stops and uninstalls the service right away
//
// 2. if tamper protection is enabled and a teardown is already in progress (tearingDown == true),
// a further teardown action is ignored
//
// 3. if tamper protection is enabled and no teardown is in progress (tearingDown == false)
// a. set tearingDown=true
// b. inject new signed payload for component (a failure is logged and the sequence continues)
// c. set teardown timeout timer
// d. send component update (with new signed payload)
// e. await check-in after update or teardown timeout
// f. upon check-in or timeout, reset tearingDown and stop/uninstall the service
//
// ==================================================================================================
func (s *serviceRuntime) Run(ctx context.Context, comm Communicator) (err error) {
	// The teardownCheckingTimeout is set to the same amount as the checkin timeout for now
	teardownCheckinTimeout := s.checkinPeriod()
	teardownCheckinTimer := time.NewTimer(teardownCheckinTimeout)
	defer teardownCheckinTimer.Stop()
	// Stop teardown checkin timeout timer initially
	teardownCheckinTimer.Stop()
	checkinTimer := time.NewTimer(s.checkinPeriod())
	defer checkinTimer.Stop()
	// Stop the check-ins timer initially
	checkinTimer.Stop()
	var (
		// connection info server, non-nil only between a successful start and the next stop
		cis *connInfoServer
		// time of the last processed check-in; zero until the first one after a start
		lastCheckin time.Time
		// consecutive missed check-ins as counted by checkStatus
		missedCheckins int
		// true while waiting for the check-in (or timeout) that follows a teardown update
		tearingDown bool
		// flag that signals if we are already stopping
		stopping bool
		// set once the service was stopped; only consulted when tamper protection is enabled
		ignoreCheckins bool
	)
	// cisStop stops the connection info server if one is running; safe to call repeatedly.
	cisStop := func() {
		if cis != nil {
			_ = cis.stop()
			cis = nil
		}
	}
	defer cisStop()
	// onStop stops the service once; am == actionTeardown additionally uninstalls it (see stop).
	onStop := func(am actionMode) {
		if stopping {
			s.log.Debugf("service %s is already stopping: skipping...", s.name())
			return
		}
		// the flag is set once and never reset since the serviceRuntime object
		// is not supposed to be reused once it's stopping
		stopping = true
		// Stop check-in timer
		s.log.Debugf("stop check-in timer for %s service", s.name())
		checkinTimer.Stop()
		// Stop connection info
		s.log.Debugf("stop connection info for %s service", s.name())
		cisStop()
		// Stop service
		s.stop(ctx, comm, lastCheckin, am == actionTeardown)
		// Service is stopped, ignore checkins in order to avoid clogging the watch channel
		ignoreCheckins = true
	}
	// processTeardown arms the teardown timeout and pushes the signed configuration to the service.
	// NOTE(review): the am parameter is not used inside this closure.
	processTeardown := func(am actionMode, signed *component.Signed) {
		s.log.Debugf("start teardown for %s service", s.name())
		// Inject new signed
		newComp, err := injectSigned(s.comp, signed)
		if err != nil {
			// The error is only logged; the update below still goes out with whatever injectSigned returned.
			s.log.Errorf("failed to inject signed configuration for %s service, err: %v", s.name(), err)
		}
		s.log.Debugf("set teardown timer %v for %s service", teardownCheckinTimeout, s.name())
		// Set teardown timeout timer
		teardownCheckinTimer.Reset(teardownCheckinTimeout)
		// Process newComp update
		// This should send component update that should cause service checkin
		s.log.Debugf("process new comp config for %s service", s.name())
		s.updateCompConfig(newComp, comm)
	}
	// onTeardown handles a teardown action according to the tamper protection feature flag.
	onTeardown := func(as actionModeSigned) {
		tamperProtection := features.TamperProtection()
		s.log.Debugf("got teardown for %s service, tearingDown: %v, tamperProtection: %v", s.name(), tearingDown, tamperProtection)
		// If tamper protection is disabled do the old behavior
		if !tamperProtection {
			onStop(as.actionMode)
			return
		}
		if !tearingDown {
			tearingDown = true
			processTeardown(as.actionMode, as.signed)
		}
	}
	for {
		// Per-iteration error; shadows the named result, which is never assigned.
		var err error
		select {
		case <-ctx.Done():
			s.log.Debug("context is done. exiting.")
			return ctx.Err()
		case as := <-s.actionCh:
			s.log.Debugf("got action %v for %s service", as.actionMode, s.name())
			switch as.actionMode {
			case actionStart:
				// Initial state on start
				lastCheckin = time.Time{}
				ignoreCheckins = false
				missedCheckins = 0
				checkinTimer.Stop()
				cisStop()
				// Start connection info
				if cis == nil {
					var address string
					// [gRPC:8.15] Uncomment after 8.14 when Endpoint is ready for local gRPC
					// isLocal := s.isLocal
					// [gRPC:8.15] Set connection info to local socket always for 8.14. Remove when Endpoint is ready for local gRPC
					isLocal := true
					address, err = getConnInfoServerAddress(runtime.GOOS, isLocal, s.comp.InputSpec.Spec.Service.CPort, s.comp.InputSpec.Spec.Service.CSocket)
					if err != nil {
						err = fmt.Errorf("failed to create connection info service address for %s: %w", s.name(), err)
						// leaves the switch; the error is reported by the check after it
						break
					}
					s.log.Infof("Creating connection info server for %s service, address: %v", s.name(), address)
					cis, err = newConnInfoServer(s.log, comm, address)
					if err != nil {
						err = fmt.Errorf("failed to start connection info service %s: %w", s.name(), err)
						break
					}
				}
				// Start service
				err = s.start(ctx)
				if err != nil {
					cisStop()
					break
				}
				// Start check-in timer
				checkinTimer.Reset(s.checkinPeriod())
			case actionStop:
				onStop(as.actionMode)
			case actionTeardown:
				onTeardown(as)
			}
			// Any failure while handling the action is surfaced as a FAILED component state.
			if err != nil {
				s.forceCompState(client.UnitStateFailed, err.Error())
			}
		case newComp := <-s.compCh:
			s.processNewComp(newComp, comm)
		case checkin := <-comm.CheckinObserved():
			s.log.Debugf("got check-in for %s service, tearingDown: %v, ignoreCheckins: %v", s.name(), tearingDown, ignoreCheckins)
			tamperProtection := features.TamperProtection()
			if tamperProtection { // If tamper protection feature flag is enabled, new behavior
				// Got check-in upon teardown update
				// tearingDown can be set to true only if tamper protection feature is enabled
				if tearingDown {
					tearingDown = false
					teardownCheckinTimer.Stop()
					onStop(actionTeardown)
				} else {
					// Ignore checkins if the service was stopped by the action
					if !ignoreCheckins {
						// Not tearing down and not stopped: process the check-in normally
						s.processCheckin(checkin, comm, &lastCheckin)
					}
				}
			} else { // If tamper protection feature flag is disabled, old behavior
				s.processCheckin(checkin, comm, &lastCheckin)
			}
		case <-checkinTimer.C:
			s.checkStatus(s.checkinPeriod(), &lastCheckin, &missedCheckins)
			checkinTimer.Reset(s.checkinPeriod())
		case <-teardownCheckinTimer.C:
			s.log.Debugf("got tearing down timeout for %s service", s.name())
			// Teardown timed out
			// tearingDown can be set to true only if tamper protection feature is enabled
			if tearingDown {
				tearingDown = false
				onStop(actionTeardown)
			}
		}
	}
}
// errEmptySocketValue is returned when a local address is requested without a socket name.
var errEmptySocketValue = errors.New("empty socket value")

// getConnInfoServerAddress builds the address the connection info server listens on.
//
// For non-local connections it is the loopback TCP address "127.0.0.1:<port>".
// For local connections it is a URL with the "npipe" scheme (when os is
// "windows") or the "unix" scheme (any other os) whose path ends in socket;
// on non-Windows systems the socket is placed under paths.Top(). A local
// connection with an empty socket yields errEmptySocketValue.
func getConnInfoServerAddress(os string, isLocal bool, port int, socket string) (string, error) {
	if !isLocal {
		return fmt.Sprintf("127.0.0.1:%d", port), nil
	}

	// The connection info server fails on an empty address, so refuse early.
	if socket == "" {
		return "", errEmptySocketValue
	}

	base := url.URL{Path: "/"}
	switch os {
	case "windows":
		base.Scheme = "npipe"
		return base.JoinPath("/", socket).String(), nil
	default:
		base.Scheme = "unix"
		// Use the path that is relative to path.top which corresponds to the
		// agent binary directory in all installation types.
		return base.JoinPath(paths.Top(), socket).String(), nil
	}
}
// injectSigned returns a copy of comp in which every input unit's configuration
// carries the signed payload under the "signed" key (with "data" and
// "signature" entries).
//
// When signed is nil, comp is returned unchanged. When rebuilding a unit's
// expected configuration fails, the error is returned together with the
// original, unmodified comp.
//
// The caller's component is never modified: comp is received by value, but its
// Units slice shares a backing array with the caller's slice, so the units are
// rewritten in a private copy rather than in place.
func injectSigned(comp component.Component, signed *component.Signed) (component.Component, error) {
	if signed == nil {
		return comp, nil
	}

	const signedKey = "signed"

	// Zero-capacity reslice forces append to allocate a fresh backing array
	// (and keeps a nil Units slice nil).
	units := append(comp.Units[:0:0], comp.Units...)
	for i := range units {
		if units[i].Type != client.UnitTypeInput {
			continue
		}

		unitCfgMap := units[i].Config.Source.AsMap()
		unitCfgMap[signedKey] = map[string]interface{}{
			"data":      signed.Data,
			"signature": signed.Signature,
		}

		unitCfg, err := component.ExpectedConfig(unitCfgMap)
		if err != nil {
			// Hand back the untouched original instead of a half-updated component.
			return comp, err
		}
		units[i].Config = unitCfg
	}

	injected := comp
	injected.Units = units
	return injected, nil
}
// start moves the runtime into the STARTING state and makes sure the service is
// installed: it runs the check command and, only when that fails, the install
// command. An install failure is returned wrapped; otherwise nil is returned.
// The service is expected to come up on its own and begin checking in.
func (s *serviceRuntime) start(ctx context.Context) (err error) {
	svc := s.name()

	s.forceCompState(client.UnitStateStarting, fmt.Sprintf("Starting: %s service runtime", svc))

	s.log.Infof("check if %s service is installed", svc)
	checkErr := s.check(ctx)
	s.log.Infof("after check if %s service is installed, err: %v", svc, checkErr)
	if checkErr == nil {
		// Already installed: nothing more to do but wait for check-ins.
		return nil
	}

	// A failed check is treated as "not installed", so try to install.
	s.log.Infof("failed check %s service: %v, try install", svc, checkErr)
	if installErr := s.install(ctx); installErr != nil {
		return fmt.Errorf("failed install %s service: %w", svc, installErr)
	}
	return nil
}
// stop winds the service runtime down and always ends by forcing the STOPPED state.
//
// A plain stop (teardown == false) only changes the runtime state. A teardown
// additionally sends the STOPPING expected state to a running service that has
// checked in — waiting up to one check-in period for a first check-in when
// lastCheckin is zero — and then runs the uninstall command. An uninstall
// failure is logged, not returned.
func (s *serviceRuntime) stop(ctx context.Context, comm Communicator, lastCheckin time.Time, teardown bool) {
	svc := s.name()
	s.log.Infof("stopping %s service runtime", svc)

	if teardown {
		if s.isRunning() {
			seen := !lastCheckin.IsZero()
			if !seen {
				// Never heard from the service: give it one check-in period to show up.
				wait := s.checkinPeriod()
				s.log.Infof("%s service had never checked in, await for check-in for %v", svc, wait)
				seen = s.awaitCheckin(ctx, comm, wait)
			}

			switch {
			case seen:
				s.log.Infof("%s service has checked in, send stopping state to service", svc)
				s.state.forceExpectedState(client.UnitStateStopping)
				comm.CheckinExpected(s.state.toCheckinExpected(), nil)
			default:
				s.log.Infof("%s service had never checked in, proceed to uninstall", svc)
			}
		}

		s.log.Infof("uninstall %s service", svc)
		if uninstallErr := s.uninstall(ctx); uninstallErr != nil {
			s.log.Errorf("failed %s service uninstall, err: %v", svc, uninstallErr)
		}
	}

	s.log.Debugf("set %s service runtime to stopped state", svc)
	s.forceCompState(client.UnitStateStopped, fmt.Sprintf("Stopped: %s service runtime", svc))
}
// awaitCheckin blocks until the service checks in, ctx is done, or timeout
// elapses, whichever happens first. It reports true only when a check-in was
// observed; the observed check-in itself is discarded.
func (s *serviceRuntime) awaitCheckin(ctx context.Context, comm Communicator, timeout time.Duration) bool {
	svc := s.name()

	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	// Every branch returns, so a single select is enough.
	select {
	case <-ctx.Done():
		s.log.Debugf("stopping %s service, cancelled", svc)
		return false
	case <-deadline.C:
		s.log.Debugf("stopping %s service, timed out", svc)
		return false
	case <-comm.CheckinObserved():
		return true
	}
}
// processNewComp applies a component configuration change.
//
// The expected state and the units are synced from newComp. The expected
// check-in is sent to the service when the expected state changed or the state
// is still unsettled, and the observed state is published when the unit sync
// reported a change.
func (s *serviceRuntime) processNewComp(newComp component.Component, comm Communicator) {
	s.log.Debugf("observed component update for %s service", s.name())

	expectedChanged := s.state.syncExpected(&newComp)
	unitsChanged := s.state.syncUnits(&newComp)

	if needsExpected := expectedChanged || s.state.unsettled(); needsExpected {
		comm.CheckinExpected(s.state.toCheckinExpected(), nil)
	}

	if !unitsChanged {
		return
	}
	s.sendObserved()
}
// updateCompConfig syncs the expected state from newComp and, when that changed
// anything or the state is still unsettled, sends the expected check-in to the
// service. Unlike processNewComp it does not sync the units and does not
// publish the observed state.
func (s *serviceRuntime) updateCompConfig(newComp component.Component, comm Communicator) {
	s.log.Debugf("update component configuration for %s service", s.name())

	expectedChanged := s.state.syncExpected(&newComp)
	if !expectedChanged && !s.state.unsettled() {
		return
	}
	comm.CheckinExpected(s.state.toCheckinExpected(), nil)
}
// processCheckin handles a check-in observed from the service.
//
// The first check-in after a start promotes the component from STARTING to
// HEALTHY. Check-ins that arrive while the runtime is stopping or stopped are
// dropped. Otherwise *lastCheckin is set to the current time, the check-in is
// synced into the state, the expected check-in is sent back when this is the
// first check-in or the state is unsettled, and the observed state is published
// when something changed.
func (s *serviceRuntime) processCheckin(checkin *proto.CheckinObserved, comm Communicator, lastCheckin *time.Time) {
	svc := s.name()
	s.log.Debugf("observed check-in for %s service: %v", svc, checkin)

	// First observation after start: the component becomes healthy.
	stateChanged := s.state.State == client.UnitStateStarting
	if stateChanged {
		s.state.State = client.UnitStateHealthy
		s.state.Message = fmt.Sprintf("Healthy: communicating with %s service", svc)
	}

	if !s.isRunning() {
		return
	}

	firstCheckin := lastCheckin.IsZero()

	// Warning lastCheckin must contain a monotonic clock.
	// Functions like Local(), UTC(), Round(), AddDate(),
	// etc. remove the monotonic clock. See
	// https://pkg.go.dev/time
	*lastCheckin = time.Now()

	if synced := s.state.syncCheckin(checkin); synced {
		stateChanged = true
	}

	unsettled := s.state.unsettled()
	if firstCheckin || unsettled {
		comm.CheckinExpected(s.state.toCheckinExpected(), checkin)
	}

	if stateChanged {
		s.sendObserved()
	}

	if s.state.cleanupStopped() {
		s.sendObserved()
	}
}
// isRunning reports whether the component state is anything other than
// STOPPING or STOPPED.
func (s *serviceRuntime) isRunning() bool {
	switch s.state.State {
	case client.UnitStateStopping, client.UnitStateStopped:
		return false
	default:
		return true
	}
}
// checkStatus evaluates check-in health; it is called each time the check-in
// timer fires and does nothing unless the service is running.
//
// *missedCheckins is incremented when the service never checked in or its last
// check-in is older than checkinPeriod, and reset to zero otherwise. Zero
// misses report HEALTHY, fewer than maxCheckinMisses report DEGRADED, and
// maxCheckinMisses or more force the FAILED state.
func (s *serviceRuntime) checkStatus(checkinPeriod time.Duration, lastCheckin *time.Time, missedCheckins *int) {
	if !s.isRunning() {
		return
	}

	// Warning now must contain a monotonic clock.
	// Functions like Local(), UTC(), Round(), AddDate(),
	// etc. remove the monotonic clock. See
	// https://pkg.go.dev/time
	now := time.Now()

	overdue := lastCheckin.IsZero() || now.Sub(*lastCheckin) > checkinPeriod
	if overdue {
		*missedCheckins++
	} else {
		*missedCheckins = 0
	}

	switch missed := *missedCheckins; {
	case missed == 0:
		s.compState(client.UnitStateHealthy, missed)
	case missed > 0 && missed < maxCheckinMisses:
		s.compState(client.UnitStateDegraded, missed)
	case missed >= maxCheckinMisses:
		// something is wrong; the service should be checking in
		failure := fmt.Sprintf("Failed: %s service missed %d check-ins", s.name(), maxCheckinMisses)
		s.forceCompState(client.UnitStateFailed, failure)
	}
}
// checkinPeriod returns the check-in timeout from the service specification,
// or defaultCheckServiceStatusInterval when the specification leaves it at zero.
func (s *serviceRuntime) checkinPeriod() time.Duration {
	if configured := s.comp.InputSpec.Spec.Service.Timeouts.Checkin; configured != 0 {
		return configured
	}
	return defaultCheckServiceStatusInterval
}
// Watch returns a channel to watch for component state changes.
//
// A new state is sent anytime the state for a unit or the whole component changes.
// The same channel is returned on every call; it is receive-only for the caller.
func (s *serviceRuntime) Watch() <-chan ComponentState {
	return s.ch
}
// Start queues a start request for the Run loop.
//
// Non-blocking and never returns an error. A request that is still queued and
// not yet picked up by Run is discarded first, so only the latest action is kept.
// NOTE(review): the drain-then-send pair is not atomic; it stays non-blocking
// only if actions are not queued concurrently from several goroutines — confirm.
func (s *serviceRuntime) Start() error {
	// Make room in the action queue by dropping any stale pending action.
	select {
	case <-s.actionCh:
	default:
	}

	request := actionModeSigned{actionMode: actionStart}
	s.actionCh <- request
	return nil
}
// Update updates the currComp runtime with a new-revision for the component definition.
//
// Non-blocking and never returns an error. An update that is still queued and
// not yet picked up by Run is discarded, so only the latest component is kept.
// NOTE(review): the drain-then-send pair is not atomic; it stays non-blocking
// only if updates are not queued concurrently from several goroutines — confirm.
func (s *serviceRuntime) Update(comp component.Component) error {
	// clear channel so it's the latest component
	select {
	case <-s.compCh:
	default:
	}
	s.compCh <- comp
	return nil
}
// Stop queues a stop request for the Run loop.
//
// Non-blocking and never returns an error. A request that is still queued and
// not yet picked up by Run is discarded first, so only the latest action is kept.
// NOTE(review): the drain-then-send pair is not atomic; it stays non-blocking
// only if actions are not queued concurrently from several goroutines — confirm.
func (s *serviceRuntime) Stop() error {
	// Make room in the action queue by dropping any stale pending action.
	select {
	case <-s.actionCh:
	default:
	}

	request := actionModeSigned{actionMode: actionStop}
	s.actionCh <- request
	return nil
}
// Teardown queues a request to stop and uninstall the service. signed, which
// may be nil, travels with the request to the Run loop.
//
// Non-blocking and never returns an error. A request that is still queued and
// not yet picked up by Run is discarded first, so only the latest action is kept.
// NOTE(review): the drain-then-send pair is not atomic; it stays non-blocking
// only if actions are not queued concurrently from several goroutines — confirm.
func (s *serviceRuntime) Teardown(signed *component.Signed) error {
	// Make room in the action queue by dropping any stale pending action.
	select {
	case <-s.actionCh:
	default:
	}

	request := actionModeSigned{actionMode: actionTeardown, signed: signed}
	s.actionCh <- request
	return nil
}
// forceCompState forces the component state to state with message msg and
// publishes the observed state when forceState reports a change.
func (s *serviceRuntime) forceCompState(state client.UnitState, msg string) {
	if changed := s.state.forceState(state, msg); !changed {
		return
	}
	s.sendObserved()
}
// sendObserved publishes a copy of the current component state on the watch
// channel. The send blocks until a receiver takes the value.
func (s *serviceRuntime) sendObserved() {
	snapshot := s.state.Copy()
	s.ch <- snapshot
}
// compState sets the component state together with a message derived from it,
// and publishes the observed state when that changed anything.
//
// HEALTHY and DEGRADED get dedicated messages (the latter mentions
// missedCheckins); any other state uses stateUnknownMessage.
func (s *serviceRuntime) compState(state client.UnitState, missedCheckins int) {
	svc := s.name()

	msg := stateUnknownMessage
	switch state {
	case client.UnitStateHealthy:
		msg = fmt.Sprintf("Healthy: communicating with %s service", svc)
	case client.UnitStateDegraded:
		if missedCheckins == 1 {
			msg = fmt.Sprintf("Degraded: %s service missed 1 check-in", svc)
		} else {
			msg = fmt.Sprintf("Degraded: %s missed %d check-ins", svc, missedCheckins)
		}
	}

	if changed := s.state.compState(state, msg); changed {
		s.sendObserved()
	}
}
// name returns the name from the component's input specification; it is used
// as the service name in log and state messages.
func (s *serviceRuntime) name() string {
	return s.comp.InputSpec.Spec.Name
}
// check runs the service check command from the specification.
// It returns ErrOperationSpecUndefined when no check command is defined,
// otherwise whatever the command execution returns.
func (s *serviceRuntime) check(ctx context.Context) error {
	checkSpec := s.comp.InputSpec.Spec.Service.Operations.Check
	if checkSpec == nil {
		s.log.Errorf("missing check spec for %s service", s.comp.BinaryName())
		return ErrOperationSpecUndefined
	}

	s.log.Debugf("check if the %s is installed", s.comp.BinaryName())
	binaryPath := s.comp.InputSpec.BinaryPath
	return s.executeServiceCommandImpl(ctx, s.log, binaryPath, checkSpec)
}
// install runs the service install command from the specification.
// It returns ErrOperationSpecUndefined when no install command is defined,
// otherwise whatever the command execution returns.
func (s *serviceRuntime) install(ctx context.Context) error {
	installSpec := s.comp.InputSpec.Spec.Service.Operations.Install
	if installSpec == nil {
		s.log.Errorf("missing install spec for %s service", s.comp.BinaryName())
		return ErrOperationSpecUndefined
	}

	s.log.Debugf("install %s service", s.comp.BinaryName())
	binaryPath := s.comp.InputSpec.BinaryPath
	return s.executeServiceCommandImpl(ctx, s.log, binaryPath, installSpec)
}
// uninstall runs the service uninstall command for this runtime's component.
//
// Internal uninstalls are an attempt to converge the agent's current state with
// its desired state based on the agent policy; they never carry an uninstall
// token, so an empty one is passed.
// NOTE(review): an earlier comment here said such attempts are always retried;
// no retry logic is visible in this function — confirm where retries happen.
func (s *serviceRuntime) uninstall(ctx context.Context) error {
	const noUninstallToken = ""
	return uninstallService(ctx, s.log, s.comp, noUninstallToken, s.executeServiceCommandImpl)
}
// UninstallService uninstalls the service of comp by running the uninstall
// command from its specification with the default command executor.
//
// uninstallToken is passed along for the --uninstall-token argument of the
// uninstall command; it is ignored when the tamper protection feature is
// disabled. ErrOperationSpecUndefined is returned when the specification
// defines no uninstall command.
func UninstallService(ctx context.Context, log *logger.Logger, comp component.Component, uninstallToken string) error {
	return uninstallService(ctx, log, comp, uninstallToken, executeServiceCommand)
}
// uninstallTokenArg is the uninstall command argument that is followed by the
// uninstall token value.
//
//nolint:gosec // was false flagged as hardcoded credentials by linter. it is not.
const uninstallTokenArg = "--uninstall-token"

// resolveUninstallTokenArg resolves the uninstall token parameter and returns a
// copy of uninstallSpec with the resolved arguments. A nil uninstallSpec yields nil.
//
// If the uninstall spec arguments contain --uninstall-token then either
//  1. the argument is removed, when uninstallToken is empty, or
//  2. the value of uninstallToken is inserted right after it.
//
// Only the first occurrence is handled. If the arguments do not contain
// --uninstall-token (older endpoint spec), they are left as they are.
//
// The arguments of uninstallSpec are never modified: the resolved list is built
// in a freshly allocated slice. Editing in place with append would write into
// the backing array shared with uninstallSpec.Args, which corrupts the original
// spec and, when the flag is not the last argument, overwrites the argument
// that follows it.
func resolveUninstallTokenArg(uninstallSpec *component.ServiceOperationsCommandSpec, uninstallToken string) *component.ServiceOperationsCommandSpec {
	if uninstallSpec == nil {
		return nil
	}

	spec := *uninstallSpec
	for i, arg := range uninstallSpec.Args {
		if arg != uninstallTokenArg {
			continue
		}

		// Room for every original argument plus, at most, the token value.
		resolved := make([]string, 0, len(uninstallSpec.Args)+1)
		if uninstallToken == "" {
			// Remove --uninstall-token argument if the token is empty
			resolved = append(resolved, uninstallSpec.Args[:i]...)
		} else {
			// Inject token value after --uninstall-token argument
			resolved = append(resolved, uninstallSpec.Args[:i+1]...)
			resolved = append(resolved, uninstallToken)
		}
		resolved = append(resolved, uninstallSpec.Args[i+1:]...)

		spec.Args = resolved
		break
	}
	return &spec
}
// uninstallService runs the uninstall command from comp's service specification
// through executeServiceCommandImpl.
//
// It returns ErrOperationSpecUndefined when the specification defines no
// uninstall command. When the tamper protection feature is disabled the
// uninstall token is forced to empty, which removes the --uninstall-token
// argument from the command.
func uninstallService(ctx context.Context, log *logger.Logger, comp component.Component, uninstallToken string, executeServiceCommandImpl executeServiceCommandFunc) error {
	uninstallOp := comp.InputSpec.Spec.Service.Operations.Uninstall
	if uninstallOp == nil {
		log.Errorf("missing uninstall spec for %s service", comp.BinaryName())
		return ErrOperationSpecUndefined
	}

	token := uninstallToken
	if !features.TamperProtection() {
		token = ""
	}
	resolvedSpec := resolveUninstallTokenArg(uninstallOp, token)

	log.Debugf("uninstall %s service", comp.BinaryName())
	return executeServiceCommandImpl(ctx, log, comp.InputSpec.BinaryPath, resolvedSpec)
}