pkg/component/runtime/manager.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package runtime

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"errors"
	"fmt"
	"net"
	"net/url"
	"path"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/gofrs/uuid/v5"
	"go.elastic.co/apm/module/apmgrpc/v2"
	"go.elastic.co/apm/v2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/status"

	"github.com/elastic/elastic-agent-client/v7/pkg/client"
	"github.com/elastic/elastic-agent-client/v7/pkg/proto"

	"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
	"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
	"github.com/elastic/elastic-agent/internal/pkg/core/authority"
	"github.com/elastic/elastic-agent/pkg/component"
	"github.com/elastic/elastic-agent/pkg/control"
	"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
	"github.com/elastic/elastic-agent/pkg/core/logger"
	"github.com/elastic/elastic-agent/pkg/ipc"
	"github.com/elastic/elastic-agent/pkg/utils"
)

const (
	// initialCheckinTimeout is the maximum amount of time to wait from the start of the initial
	// check-in stream to getting the first check-in observed state.
	initialCheckinTimeout = 5 * time.Second
	// maxCheckinMisses is the maximum number of check-ins a component can miss before it is killed
	// and restarted.
	maxCheckinMisses = 3
	// diagnosticTimeoutCPU is the maximum amount of time to wait for a diagnostic response from a unit while collecting CPU profiles.
	diagnosticTimeoutCPU = time.Minute
	// diagnosticTimeout is the maximum amount of time to wait for a diagnostic response from a unit.
	diagnosticTimeout = time.Second * 20
	// stopCheckRetryPeriod is the idle time between checks for a component's stopped state.
	stopCheckRetryPeriod = 200 * time.Millisecond
)

var (
	// ErrNoUnit is returned when the manager is not controlling this unit.
	ErrNoUnit = errors.New("no unit under control of this manager")
	// ErrNoComponent is returned when the manager is not controlling this component.
	ErrNoComponent = errors.New("no component under control of this manager")
)

// ComponentComponentState provides a structure to map a component to its current component state.
type ComponentComponentState struct {
	Component component.Component `yaml:"component"`
	State     ComponentState      `yaml:"state"`
	LegacyPID string              `yaml:"-"` // To propagate PID for /processes, and yes, it was a string
}

// ComponentUnitDiagnosticRequest is used to request diagnostics from a specific unit.
type ComponentUnitDiagnosticRequest struct {
	Component component.Component
	Unit      component.Unit
}

// ComponentUnitDiagnostic provides a structure to map a component/unit to diagnostic results.
type ComponentUnitDiagnostic struct {
	Component component.Component
	Unit      component.Unit
	Results   []*proto.ActionDiagnosticUnitResult
	Err       error
}

// ComponentDiagnostic provides a structure to map a component to a diagnostic result.
type ComponentDiagnostic struct {
	Component component.Component
	Results   []*proto.ActionDiagnosticUnitResult
	Err       error
}

// Manager for the entire runtime of operating components.
type Manager struct {
	proto.UnimplementedElasticAgentServer

	logger     *logger.Logger
	baseLogger *logger.Logger
	ca         *authority.CertificateAuthority
	listenAddr string
	listenPort int
	isLocal    bool
	agentInfo  info.Agent
	tracer     *apm.Tracer
	monitor    MonitoringManager
	grpcConfig *configuration.GRPCConfig

	// Set when the RPC server is ready to receive requests, for use by tests.
	serverReady chan struct{}

	// updateChan forwards component model updates from the public Update method
	// to the internal run loop.
	updateChan chan component.Model

	// Next component model update that will be applied, in case we get one
	// while a previous update is still in progress. If we get more than one,
	// keep only the most recent.
	// Only access from the main runtime manager goroutine.
	nextUpdate *component.Model

	// currentMx protects access to the current map only
	currentMx sync.RWMutex
	current   map[string]*componentRuntimeState

	subMx         sync.RWMutex
	subscriptions map[string][]*Subscription
	subAllMx      sync.RWMutex
	subscribeAll  []*SubscriptionAll

	errCh chan error

	// doneChan is closed when Manager is shutting down to signal that any
	// pending requests should be canceled.
	doneChan chan struct{}
}

// NewManager creates a new manager.
func NewManager(
	logger,
	baseLogger *logger.Logger,
	agentInfo info.Agent,
	tracer *apm.Tracer,
	monitor MonitoringManager,
	grpcConfig *configuration.GRPCConfig,
) (*Manager, error) {
	ca, err := authority.NewCA()
	if err != nil {
		return nil, err
	}
	if agentInfo == nil {
		return nil, errors.New("agentInfo cannot be nil")
	}

	controlAddress := control.Address()
	// [gRPC:8.15] For 8.14 this always returns local TCP address, until Endpoint is modified to support domain sockets gRPC
	listenAddr, err := deriveCommsAddress(controlAddress, grpcConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to derive comms GRPC: %w", err)
	}
	logger.With("address", listenAddr).Infof("GRPC comms socket listening at %s", listenAddr)

	m := &Manager{
		logger:        logger,
		baseLogger:    baseLogger,
		ca:            ca,
		listenAddr:    listenAddr,
		isLocal:       ipc.IsLocal(listenAddr),
		agentInfo:     agentInfo,
		tracer:        tracer,
		current:       make(map[string]*componentRuntimeState),
		subscriptions: make(map[string][]*Subscription),
		updateChan:    make(chan component.Model),
		errCh:         make(chan error),
		monitor:       monitor,
		grpcConfig:    grpcConfig,
		serverReady:   make(chan struct{}),
		doneChan:      make(chan struct{}),
	}
	return m, nil
}
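// Illustrative wiring sketch (not part of this file): how a caller such as the
// Coordinator, or a test, might construct and drive the Manager using only the
// exported API defined here. The variables log, baseLog, agentInfo, tracer,
// monitorMgr, grpcCfg, and comps are assumed to exist in the caller.
//
//	mgr, err := NewManager(log, baseLog, agentInfo, tracer, monitorMgr, grpcCfg)
//	if err != nil {
//		return err
//	}
//	go func() {
//		_ = mgr.Run(ctx) // blocks until ctx is canceled
//	}()
//	mgr.Update(component.Model{Components: comps})
//	if err := <-mgr.Errors(); err != nil { // read the result so the run loop never blocks
//		log.Errorf("component model update failed: %v", err)
//	}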
// Run runs the manager's grpc server, implementing the
// calls CheckinV2 and Actions (with a legacy handler for Checkin
// that returns an error).
//
// Called on its own goroutine from Coordinator.runner.
//
// Blocks until the context is done.
func (m *Manager) Run(ctx context.Context) error {
	var (
		listener net.Listener
		err      error
		server   *grpc.Server
		wgServer sync.WaitGroup
	)
	if m.isLocal {
		listener, err = ipc.CreateListener(m.logger, m.listenAddr)
	} else {
		listener, err = net.Listen("tcp", m.listenAddr)
	}
	if err != nil {
		return fmt.Errorf("error starting tcp listener for runtime manager: %w", err)
	}

	if m.isLocal {
		defer ipc.CleanupListener(m.logger, m.listenAddr)
	} else {
		m.listenPort = listener.Addr().(*net.TCPAddr).Port
	}

	certPool := x509.NewCertPool()
	if ok := certPool.AppendCertsFromPEM(m.ca.Crt()); !ok {
		return errors.New("failed to append root CA")
	}
	creds := credentials.NewTLS(&tls.Config{
		ClientAuth:     tls.RequireAndVerifyClientCert,
		ClientCAs:      certPool,
		GetCertificate: m.getCertificate,
		MinVersion:     tls.VersionTLS12,
	})
	m.logger.Infof("Starting grpc control protocol listener on port %v with max_message_size %v", m.grpcConfig.Port, m.grpcConfig.MaxMsgSize)
	if m.tracer != nil {
		apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(m.tracer))
		server = grpc.NewServer(
			grpc.UnaryInterceptor(apmInterceptor),
			grpc.Creds(creds),
			grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
		)
	} else {
		server = grpc.NewServer(
			grpc.Creds(creds),
			grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
		)
	}
	proto.RegisterElasticAgentServer(server, m)

	// start serving GRPC connections
	wgServer.Add(1)
	go func() {
		defer wgServer.Done()
		m.serverLoop(ctx, listener, server)
	}()

	// Start the run loop, which continues on the main goroutine
	// until the context is canceled.
	m.runLoop(ctx)

	// Notify components to shutdown and wait for their response
	m.shutdown()

	// Close the rpc listener and wait for serverLoop to return
	listener.Close()
	wgServer.Wait()

	// Cancel any remaining connections
	server.Stop()
	return ctx.Err()
}

// The main run loop for the runtime manager, whose responsibilities are:
//   - Accept component model updates from the Coordinator
//   - Apply those updates safely without ever blocking, because a block here
//     propagates to a block in the Coordinator
//   - Close doneChan when the loop ends, so the Coordinator knows not to send
//     any more updates
func (m *Manager) runLoop(ctx context.Context) {
	var updateInProgress bool
	updateDoneChan := make(chan struct{})

LOOP:
	for ctx.Err() == nil {
		select {
		case <-ctx.Done():
			break LOOP
		case model := <-m.updateChan:
			// We got a new component model from m.Update(), mark it as the
			// next update to apply, overwriting any previous pending value.
			m.nextUpdate = &model
		case <-updateDoneChan:
			// An update call has finished, we can initiate another when available.
			updateInProgress = false
		}

		// After each select call, check if there's a pending update that
		// can be applied.
		if m.nextUpdate != nil && !updateInProgress {
			// There is a component model update available, apply it.
			go func(model component.Model) {
				// Run the update with tearDown set to true since this is coming
				// from a user-initiated policy update
				err := m.update(model, true)

				// When update is done, send its result back to the coordinator,
				// unless we're shutting down.
				select {
				case m.errCh <- err:
				case <-ctx.Done():
				}

				// Signal the runtime manager that we're finished. Note that
				// we don't select on ctx.Done() in this case because the runtime
				// manager always reads the results of an update once initiated,
				// even if it is shutting down.
				updateDoneChan <- struct{}{}
			}(*m.nextUpdate)
			updateInProgress = true
			m.nextUpdate = nil
		}
	}
	// Signal that the run loop is ended to unblock any incoming messages.
	// We need to do this before waiting on the final update result, otherwise
	// it might be stuck trying to send the result to errCh.
	close(m.doneChan)

	if updateInProgress {
		// Wait for the existing update to finish before shutting down,
		// otherwise the new update call closing everything will
		// conflict.
		<-updateDoneChan
	}
}

func (m *Manager) serverLoop(ctx context.Context, listener net.Listener, server *grpc.Server) {
	close(m.serverReady)
	for ctx.Err() == nil {
		err := server.Serve(listener)
		if err != nil && ctx.Err() == nil {
			// Only log an error if we aren't shutting down, otherwise we'll spam
			// the logs with "use of closed network connection" for a connection that
			// was closed on purpose.
			m.logger.Errorf("control protocol listener failed: %s", err)
		}
	}
}

// Errors returns the channel that errors are reported on.
func (m *Manager) Errors() <-chan error {
	return m.errCh
}

// Update forwards a new component model to Manager's run loop.
// When it has been processed, a result will be sent on Manager's
// error channel.
// Called from the main Coordinator goroutine.
//
// If calling from a test, you should read from errCh afterwards to avoid
// blocking Manager's main loop.
func (m *Manager) Update(model component.Model) {
	select {
	case m.updateChan <- model:
	case <-m.doneChan:
		// Manager is shutting down, ignore the update
	}
}

// PerformAction executes an action on a unit.
func (m *Manager) PerformAction(ctx context.Context, comp component.Component, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error) {
	id, err := uuid.NewV4()
	if err != nil {
		return nil, err
	}
	paramBytes := []byte("{}")
	if params != nil {
		paramBytes, err = json.Marshal(params)
		if err != nil {
			return nil, err
		}
	}
	runtime := m.getRuntimeFromUnit(comp, unit)
	if runtime == nil {
		return nil, ErrNoUnit
	}

	req := &proto.ActionRequest{
		Id:       id.String(),
		Name:     name,
		Params:   paramBytes,
		UnitId:   unit.ID,
		UnitType: proto.UnitType(unit.Type),
		Type:     proto.ActionRequest_CUSTOM,
	}
	res, err := runtime.performAction(ctx, req)
	if err != nil {
		return nil, err
	}
	var respBody map[string]interface{}
	if res.Status == proto.ActionResponse_FAILED {
		if res.Result != nil {
			err = json.Unmarshal(res.Result, &respBody)
			if err != nil {
				return nil, err
			}
			errMsgT, ok := respBody["error"]
			if ok {
				errMsg, ok := errMsgT.(string)
				if ok {
					return nil, errors.New(errMsg)
				}
			}
		}
		return nil, errors.New("generic action failure")
	}
	if res.Result != nil {
		err = json.Unmarshal(res.Result, &respBody)
		if err != nil {
			return nil, err
		}
	}
	return respBody, nil
}
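// Illustrative sketch of a PerformAction call (not part of this file): comp and
// unit are assumed to come from the current component model, and the action name
// "example-action" and its params are hypothetical.
//
//	res, err := mgr.PerformAction(ctx, comp, unit, "example-action", map[string]interface{}{
//		"key": "value",
//	})
//	if err != nil {
//		// ErrNoUnit if the unit isn't under management, otherwise the
//		// component-reported error or "generic action failure".
//		return err
//	}
//	_ = res // decoded JSON result from the component; may be nil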
// PerformComponentDiagnostics executes the diagnostic action for the given components. If no components are provided then
// it performs diagnostics for all running components.
func (m *Manager) PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]ComponentDiagnostic, error) {
	if len(req) == 0 {
		m.currentMx.RLock()
		for _, runtime := range m.current {
			currComp := runtime.getCurrent()
			req = append(req, currComp)
		}
		m.currentMx.RUnlock()
	}
	resp := []ComponentDiagnostic{}
	diagnosticCount := len(req)
	respChan := make(chan ComponentDiagnostic, diagnosticCount)
	for diag := 0; diag < diagnosticCount; diag++ {
		// transform the additional metrics field into JSON params
		params := client.DiagnosticParams{}
		if len(additionalMetrics) > 0 {
			for _, param := range additionalMetrics {
				params.AdditionalMetrics = append(params.AdditionalMetrics, param.String())
			}
		}

		// perform diagnostics in parallel; if we have a CPU pprof request, it'll take 30 seconds each.
		go func(iter int) {
			diagResponse, err := m.performDiagAction(ctx, req[iter], component.Unit{}, proto.ActionRequest_COMPONENT, params)
			respStruct := ComponentDiagnostic{
				Component: req[iter],
				Err:       err,
				Results:   diagResponse,
			}
			respChan <- respStruct
		}(diag)
	}

	// performDiagAction will have timeouts at various points,
	// but for the sake of paranoia, create our own timeout
	collectTimeout, cancel := context.WithTimeout(ctx, time.Minute*2)
	defer cancel()

	for res := 0; res < diagnosticCount; res++ {
		select {
		case <-collectTimeout.Done():
			return nil, fmt.Errorf("got context done waiting for diagnostics")
		case data := <-respChan:
			resp = append(resp, data)
		}
	}
	return resp, nil
}

// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then
// it performs diagnostics for all current units.
func (m *Manager) PerformDiagnostics(ctx context.Context, req ...ComponentUnitDiagnosticRequest) []ComponentUnitDiagnostic {
	// build results from units
	var results []ComponentUnitDiagnostic
	if len(req) > 0 {
		for _, q := range req {
			r := m.getRuntimeFromUnit(q.Component, q.Unit)
			if r == nil {
				results = append(results, ComponentUnitDiagnostic{
					Unit: q.Unit,
					Err:  ErrNoUnit,
				})
			} else {
				results = append(results, ComponentUnitDiagnostic{
					Component: r.getCurrent(),
					Unit:      q.Unit,
				})
			}
		}
	} else {
		m.currentMx.RLock()
		for _, r := range m.current {
			currComp := r.getCurrent()
			for _, u := range currComp.Units {
				var err error
				if currComp.Err != nil {
					err = currComp.Err
				} else if u.Err != nil {
					err = u.Err
				}
				if err != nil {
					results = append(results, ComponentUnitDiagnostic{
						Component: currComp,
						Unit:      u,
						Err:       err,
					})
				} else {
					results = append(results, ComponentUnitDiagnostic{
						Component: currComp,
						Unit:      u,
					})
				}
			}
		}
		m.currentMx.RUnlock()
	}

	for i, r := range results {
		if r.Err != nil {
			// already in error, don't perform diagnostics
			continue
		}
		diag, err := m.performDiagAction(ctx, r.Component, r.Unit, proto.ActionRequest_UNIT, client.DiagnosticParams{})
		if err != nil {
			r.Err = err
		} else {
			r.Results = diag
		}
		results[i] = r
	}
	return results
}
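// Illustrative sketch of requesting diagnostics (not part of this file): with no
// component or unit arguments, both calls cover everything currently running. The
// cproto.AdditionalDiagnosticRequest_CPU value is assumed here; any
// AdditionalDiagnosticRequest whose String() is "CPU" makes performDiagAction use
// the longer CPU-profile timeout.
//
//	// component-level diagnostics for all running components, including CPU profiles
//	compDiags, err := mgr.PerformComponentDiagnostics(ctx,
//		[]cproto.AdditionalDiagnosticRequest{cproto.AdditionalDiagnosticRequest_CPU})
//	if err != nil {
//		return err
//	}
//	_ = compDiags
//
//	// unit-level diagnostics for every current unit
//	unitDiags := mgr.PerformDiagnostics(ctx)
//	_ = unitDiags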
// Subscribe to changes in a component.
//
// Allows a component without that ID to exist. Once a component starts matching that ID then changes will start to
// be provided over the channel. Cancelling the context results in the subscription being unsubscribed.
//
// Note: Not reading from a subscription channel will cause the Manager to block.
func (m *Manager) Subscribe(ctx context.Context, componentID string) *Subscription {
	sub := newSubscription(ctx)

	// add latestState to channel
	m.currentMx.RLock()
	comp, ok := m.current[componentID]
	m.currentMx.RUnlock()
	if ok {
		latestState := comp.getLatest()
		go func() {
			select {
			case <-ctx.Done():
			case sub.ch <- latestState:
			}
		}()
	}

	// add subscription for future changes
	m.subMx.Lock()
	m.subscriptions[componentID] = append(m.subscriptions[componentID], sub)
	m.subMx.Unlock()

	go func() {
		<-ctx.Done()
		// unsubscribe
		m.subMx.Lock()
		defer m.subMx.Unlock()
		for key, subs := range m.subscriptions {
			for i, s := range subs {
				if sub == s {
					m.subscriptions[key] = append(m.subscriptions[key][:i], m.subscriptions[key][i+1:]...)
					return
				}
			}
		}
	}()

	return sub
}

// SubscribeAll subscribes to all changes in all components.
//
// This provides the current state for existing components at the time of first subscription. Cancelling the context
// results in the subscription being unsubscribed.
//
// Note: Not reading from a subscription channel will cause the Manager to block.
func (m *Manager) SubscribeAll(ctx context.Context) *SubscriptionAll {
	sub := newSubscriptionAll(ctx)

	// add the latest states
	m.currentMx.RLock()
	latest := make([]ComponentComponentState, 0, len(m.current))
	for _, comp := range m.current {
		latest = append(latest, ComponentComponentState{Component: comp.getCurrent(), State: comp.getLatest()})
	}
	m.currentMx.RUnlock()
	if len(latest) > 0 {
		go func() {
			for _, l := range latest {
				select {
				case <-ctx.Done():
					return
				case sub.ch <- l:
				}
			}
		}()
	}

	// add subscription for future changes
	m.subAllMx.Lock()
	m.subscribeAll = append(m.subscribeAll, sub)
	m.subAllMx.Unlock()

	go func() {
		<-ctx.Done()
		// unsubscribe
		m.subAllMx.Lock()
		defer m.subAllMx.Unlock()
		for i, s := range m.subscribeAll {
			if sub == s {
				m.subscribeAll = append(m.subscribeAll[:i], m.subscribeAll[i+1:]...)
				return
			}
		}
	}()

	return sub
}
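// Illustrative sketch of consuming a subscription (not part of this file): it
// assumes the Subscription type exposes its channel via a Ch() accessor defined
// alongside newSubscription elsewhere in this package; the component ID and the
// handleState helper are hypothetical.
//
//	sub := mgr.Subscribe(ctx, "filestream-default")
//	for {
//		select {
//		case <-ctx.Done():
//			return
//		case state := <-sub.Ch():
//			// Keep draining the channel so the Manager's state broadcasts
//			// in stateChanged never block.
//			handleState(state)
//		}
//	}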
// Checkin is called by v1 sub-processes and has been removed.
func (m *Manager) Checkin(_ proto.ElasticAgent_CheckinServer) error {
	return status.Error(codes.Unavailable, "removed; upgrade to V2")
}

// CheckinV2 is the new v2 communication for components.
func (m *Manager) CheckinV2(server proto.ElasticAgent_CheckinV2Server) error {
	initCheckinChan := make(chan *proto.CheckinObserved)
	go func() {
		// this goroutine will not be leaked, because when the main function
		// returns it will close the connection. that will cause this
		// function to return.
		observed, err := server.Recv()
		if err != nil {
			close(initCheckinChan)
			return
		}
		initCheckinChan <- observed
	}()

	var ok bool
	var initCheckin *proto.CheckinObserved

	t := time.NewTimer(initialCheckinTimeout)
	select {
	case initCheckin, ok = <-initCheckinChan:
		t.Stop()
	case <-t.C:
		// close connection
		return status.Error(codes.DeadlineExceeded, "never sent initial observed message")
	}
	if !ok {
		// close connection
		return nil
	}

	runtime := m.getRuntimeFromToken(initCheckin.Token)
	if runtime == nil {
		// no component runtime with token; close connection
		return status.Error(codes.PermissionDenied, "invalid token")
	}

	// enable chunking with the communicator if the initial checkin
	// states that it supports chunking
	runtime.comm.chunkingAllowed = false
	for _, support := range initCheckin.Supports {
		if support == proto.ConnectionSupports_CheckinChunking {
			runtime.comm.chunkingAllowed = true
			break
		}
	}
	if runtime.comm.chunkingAllowed {
		if m.grpcConfig.CheckinChunkingDisabled {
			// chunking explicitly disabled
			runtime.comm.chunkingAllowed = false
			runtime.logger.Warn("control checkin v2 protocol supports chunking, but chunking was explicitly disabled")
		} else {
			runtime.logger.Info("control checkin v2 protocol has chunking enabled")
		}
	}

	return runtime.comm.checkin(server, initCheckin)
}

// Actions is the actions stream used to broker actions between Elastic Agent and components.
func (m *Manager) Actions(server proto.ElasticAgent_ActionsServer) error {
	initRespChan := make(chan *proto.ActionResponse)
	go func() {
		// go func will not be leaked, because when the main function
		// returns it will close the connection. that will cause this
		// function to return.
		observed, err := server.Recv()
		if err != nil {
			close(initRespChan)
			return
		}
		initRespChan <- observed
	}()

	var ok bool
	var initResp *proto.ActionResponse

	t := time.NewTimer(initialCheckinTimeout)
	select {
	case initResp, ok = <-initRespChan:
		t.Stop()
	case <-t.C:
		// close connection
		m.logger.Debug("actions stream never sent initial response message; closing connection")
		return status.Error(codes.DeadlineExceeded, "never sent initial response message")
	}
	if !ok {
		// close connection
		return nil
	}
	if initResp.Id != client.ActionResponseInitID {
		// close connection
		m.logger.Debug("actions stream first response message must be an init message; closing connection")
		return status.Error(codes.InvalidArgument, "initial response must be an init message")
	}

	runtime := m.getRuntimeFromToken(initResp.Token)
	if runtime == nil {
		// no component runtime with token; close connection
		m.logger.Debug("actions stream sent an invalid token; closing connection")
		return status.Error(codes.PermissionDenied, "invalid token")
	}

	return runtime.comm.actions(server)
}

// update updates the current state of the running components.
// It is only called by the main runtime manager goroutine in Manager.Run.
//
// This returns as soon as possible, work is performed in the background.
func (m *Manager) update(model component.Model, teardown bool) error {
	touched := make(map[string]bool)
	newComponents := make([]component.Component, 0, len(model.Components))
	for _, comp := range model.Components {
		touched[comp.ID] = true
		m.currentMx.RLock()
		existing, ok := m.current[comp.ID]
		m.currentMx.RUnlock()
		if ok {
			// existing component; send runtime updated value
			existing.setCurrent(comp)
			if err := existing.runtime.Update(comp); err != nil {
				return fmt.Errorf("failed to update component %s: %w", comp.ID, err)
			}
			continue
		}
		newComponents = append(newComponents, comp)
	}

	var stop []*componentRuntimeState
	m.currentMx.RLock()
	for id, existing := range m.current {
		// skip if already touched (meaning it still exists)
		if _, done := touched[id]; done {
			continue
		}
		// component was removed (time to clean it up)
		stop = append(stop, existing)
	}
	m.currentMx.RUnlock()

	var stoppedWg sync.WaitGroup
	stoppedWg.Add(len(stop))
	for _, existing := range stop {
		m.logger.Debugf("Stopping component %q", existing.id)
		_ = existing.stop(teardown, model.Signed)
		// stop is async, wait for operation to finish,
		// otherwise new instance may be started and components
		// may fight for resources (e.g. ports, files, locks)
		go func(state *componentRuntimeState) {
			err := m.waitForStopped(state)
			if err != nil {
				m.logger.Errorf("updating components: failed waiting %s stop", state.id)
			}
			stoppedWg.Done()
		}(existing)
	}
	stoppedWg.Wait()

	// start new components
	for _, comp := range newComponents {
		// new component; create its runtime
		logger := m.baseLogger.Named(fmt.Sprintf("component.runtime.%s", comp.ID))
		state, err := newComponentRuntimeState(m, logger, m.monitor, comp, m.isLocal)
		if err != nil {
			return fmt.Errorf("failed to create new component %s: %w", comp.ID, err)
		}
		m.currentMx.Lock()
		m.current[comp.ID] = state
		m.currentMx.Unlock()
		m.logger.Debugf("Starting component %q", comp.ID)
		if err = state.start(); err != nil {
			return fmt.Errorf("failed to start component %s: %w", comp.ID, err)
		}
	}

	return nil
}

func (m *Manager) waitForStopped(comp *componentRuntimeState) error {
	if comp == nil {
		return nil
	}
	currComp := comp.getCurrent()
	compID := currComp.ID
	timeout := defaultStopTimeout
	if currComp.InputSpec != nil &&
		currComp.InputSpec.Spec.Service != nil &&
		currComp.InputSpec.Spec.Service.Operations.Uninstall != nil &&
		currComp.InputSpec.Spec.Service.Operations.Uninstall.Timeout > 0 {
		// if component is a service and timeout is defined, use the one defined
		timeout = currComp.InputSpec.Spec.Service.Operations.Uninstall.Timeout
	}

	timeoutCh := time.After(timeout)
	for {
		latestState := comp.getLatest()
		if latestState.State == client.UnitStateStopped {
			m.logger.Debugf("component %q stopped.", compID)
			return nil
		}

		// it might happen the component stop signal isn't received but the
		// manager detects it stopped running. Then the manager removes it from
		// its list of current components. Therefore, we also need to check if
		// the component was removed; if it was, we consider it stopped.
		m.currentMx.RLock()
		if _, exists := m.current[compID]; !exists {
			m.currentMx.RUnlock()
			return nil
		}
		m.currentMx.RUnlock()

		select {
		case <-timeoutCh:
			return fmt.Errorf("timeout exceeded after %s", timeout)
		case <-time.After(stopCheckRetryPeriod):
		}
	}
}
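// Illustrative sketch of update's diff semantics (not part of this file; the
// component names A, B, B', and C are hypothetical): given a running model
// {A, B} and a new model {B', C}, update() will
//
//	existing.runtime.Update(B')       // B is "touched": updated in place
//	existing.stop(teardown, signed)   // A is absent: stopped, then waited on via waitForStopped
//	state.start()                     // C is new: runtime created and started
//
// The stop-and-wait completes before any new component starts, so old and new
// instances never fight over ports, files, or locks.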
// Called from Manager's Run goroutine.
func (m *Manager) shutdown() {
	// don't tear down as this is just a shutdown, so components most likely will come back
	// on next start of the manager
	_ = m.update(component.Model{Components: []component.Component{}}, false)

	// wait until all components are removed
	for {
		m.currentMx.RLock()
		length := len(m.current)
		m.currentMx.RUnlock()
		if length <= 0 {
			return
		}
		<-time.After(100 * time.Millisecond)
	}
}

// stateChanged notifies of the state change and returns true if the state is final (stopped)
func (m *Manager) stateChanged(state *componentRuntimeState, latest ComponentState) (exit bool) {
	m.subAllMx.RLock()
	for _, sub := range m.subscribeAll {
		select {
		case <-sub.ctx.Done():
		case sub.ch <- ComponentComponentState{
			Component: state.getCurrent(),
			State:     latest,
		}:
		}
	}
	m.subAllMx.RUnlock()

	m.subMx.RLock()
	subs := m.subscriptions[state.id]
	for _, sub := range subs {
		select {
		case <-sub.ctx.Done():
		case sub.ch <- latest:
		}
	}
	m.subMx.RUnlock()

	shutdown := state.shuttingDown.Load()
	if shutdown && latest.State == client.UnitStateStopped {
		// shutdown is complete; remove from currComp
		m.currentMx.Lock()
		delete(m.current, state.id)
		m.currentMx.Unlock()
		exit = true
	}
	return exit
}

func (m *Manager) getCertificate(chi *tls.ClientHelloInfo) (*tls.Certificate, error) {
	var cert *tls.Certificate

	m.currentMx.RLock()
	for _, runtime := range m.current {
		if runtime.comm.name == chi.ServerName {
			cert = runtime.comm.cert.Certificate
			break
		}
	}
	m.currentMx.RUnlock()
	if cert != nil {
		return cert, nil
	}

	return nil, errors.New("no supported TLS certificate")
}

// Called from GRPC listeners
func (m *Manager) getRuntimeFromToken(token string) *componentRuntimeState {
	m.currentMx.RLock()
	defer m.currentMx.RUnlock()

	for _, runtime := range m.current {
		if runtime.comm.token == token {
			return runtime
		}
	}
	return nil
}

func (m *Manager) getRuntimeFromUnit(comp component.Component, unit component.Unit) *componentRuntimeState {
	m.currentMx.RLock()
	defer m.currentMx.RUnlock()
	for _, c := range m.current {
		if c.id == comp.ID {
			currComp := c.getCurrent()
			for _, u := range currComp.Units {
				if u.Type == unit.Type && u.ID == unit.ID {
					return c
				}
			}
		}
	}
	return nil
}

func (m *Manager) getRuntimeFromComponent(comp component.Component) *componentRuntimeState {
	m.currentMx.RLock()
	defer m.currentMx.RUnlock()
	for _, currentComp := range m.current {
		if currentComp.id == comp.ID {
			return currentComp
		}
	}
	return nil
}

func (m *Manager) getListenAddr() string {
	if m.isLocal {
		return m.listenAddr
	}
	host, port, err := net.SplitHostPort(m.listenAddr)
	if err == nil && port == "0" {
		return net.JoinHostPort(host, strconv.Itoa(m.listenPort))
	}
	return m.listenAddr
}

// performDiagAction creates a diagnostic ActionRequest and executes it against the runtime that's mapped to the specified component.
// if the specified actionLevel is ActionRequest_COMPONENT, the unit field is ignored.
func (m *Manager) performDiagAction(ctx context.Context, comp component.Component, unit component.Unit, actionLevel proto.ActionRequest_Level, params client.DiagnosticParams) ([]*proto.ActionDiagnosticUnitResult, error) {
	// if we're gathering CPU diagnostics, request a longer timeout; CPU diag collection requires the diagnostic hook to sit and gather a CPU profile.
	finalDiagnosticTime := diagnosticTimeout
	for _, tag := range params.AdditionalMetrics {
		if tag == "CPU" {
			finalDiagnosticTime = diagnosticTimeoutCPU
			break
		}
	}
	ctx, cancel := context.WithTimeout(ctx, finalDiagnosticTime)
	defer cancel()

	id, err := uuid.NewV4()
	if err != nil {
		return nil, err
	}

	var runtime *componentRuntimeState
	if actionLevel == proto.ActionRequest_UNIT {
		runtime = m.getRuntimeFromUnit(comp, unit)
		if runtime == nil {
			return nil, ErrNoUnit
		}
	} else {
		runtime = m.getRuntimeFromComponent(comp)
		if runtime == nil {
			return nil, ErrNoComponent
		}
	}

	if len(params.AdditionalMetrics) > 0 {
		m.logger.Debugf("Performing diagnostic action with params: %v; will wait %s", params.AdditionalMetrics, finalDiagnosticTime)
	}
	marshalParams, err := json.Marshal(params)
	if err != nil {
		return nil, fmt.Errorf("error marshalling json for params: %w", err)
	}

	req := &proto.ActionRequest{
		Id:     id.String(),
		Type:   proto.ActionRequest_DIAGNOSTICS,
		Level:  actionLevel,
		Params: marshalParams,
	}

	if actionLevel == proto.ActionRequest_UNIT {
		req.UnitId = unit.ID
		req.UnitType = proto.UnitType(unit.Type)
	}

	res, err := runtime.performAction(ctx, req)
	// the only way this can return an error is a context Done(), be sure to make that explicit.
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			return nil, fmt.Errorf("diagnostic action timed out, deadline is %s: %w", finalDiagnosticTime, err)
		}
		return nil, fmt.Errorf("error running performAction: %w", err)
	}
	if res.Status == proto.ActionResponse_FAILED {
		var respBody map[string]interface{}
		if res.Result != nil {
			err = json.Unmarshal(res.Result, &respBody)
			if err != nil {
				return nil, fmt.Errorf("error unmarshaling JSON in FAILED response: %w", err)
			}
			errMsgT, ok := respBody["error"]
			if ok {
				errMsg, ok := errMsgT.(string)
				if ok {
					return nil, errors.New(errMsg)
				}
			}
		}
		return nil, errors.New("unit failed to perform diagnostics, no error could be extracted from response")
	}
	return res.Diagnostic, nil
}

// deriveCommsAddress derives the comms socket/pipe path/name from the given control address and GRPC config.
func deriveCommsAddress(controlAddress string, grpc *configuration.GRPCConfig) (string, error) {
	if grpc.IsLocal() {
		return deriveCommsSocketName(controlAddress)
	}
	return grpc.String(), nil
}

var errInvalidUri = errors.New("invalid uri")

// deriveCommsSocketName derives the agent communication unix/npipe path,
// currently from the control socket path, since that path is already set properly,
// matching the socket path length to the system limits of the platform.
func deriveCommsSocketName(uri string) (string, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", err
	}
	if len(u.Path) == 0 || (u.Scheme != "unix" && u.Scheme != "npipe") {
		return "", fmt.Errorf("%w %s", errInvalidUri, uri)
	}

	// Take the base name without the extension and use it as the id argument for the SocketURLWithFallback call.
	// The idea is to use the same logic for the comms path as for the control socket/pipe path.
	base := strings.TrimSuffix(path.Base(u.Path), path.Ext(u.Path))
	return utils.SocketURLWithFallback(base, path.Dir(u.Path)), nil
}
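// Illustrative sketch of the derivation above (not part of this file; the final
// address is whatever utils.SocketURLWithFallback produces, so only the shape of
// the call is shown, not a guaranteed value):
//
//	addr, err := deriveCommsSocketName("unix:///var/lib/elastic-agent/elastic-agent.sock")
//	// base = "elastic-agent", dir = "/var/lib/elastic-agent"
//	// addr is the unix:// URL returned by utils.SocketURLWithFallback(base, dir)
//
// A URI with an empty path, or a scheme other than unix or npipe, returns errInvalidUri.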