components/google-built-opentelemetry-collector/extension/healthagent/healthagent.go (124 lines of code) (raw):

// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package healthagent import ( "context" "fmt" "net" "sync" "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "google.golang.org/grpc" "github.com/GoogleCloudPlatform/opentelemetry-operations-collector/components/google-built-opentelemetry-collector/extension/healthagent/internal/healthpb" ) type healthAgent struct { config Config logger *zap.SugaredLogger server *grpc.Server set extension.Settings healthMetric metric.Int64ObservableGauge mu sync.Mutex ready bool lastError *componentstatus.Event lastOK *componentstatus.Event } func newHealthAgent(config Config, set extension.Settings) *healthAgent { return &healthAgent{ config: config, set: set, logger: set.Logger.Sugar(), } } // isHealthy returns true iff the agent is HEALTHY. func (hc *healthAgent) isHealthy() bool { hc.mu.Lock() defer hc.mu.Unlock() if !hc.ready { hc.logger.Debug("Pipelines are not ready yet") return false } if hc.lastOK == nil && hc.lastError == nil { // Should not happen, but let's return HEALTHY since we haven't seen any errors. return true } if hc.lastError == nil { // There was never an error => HEALTHY return true } if hc.lastOK == nil { // There was never OK => UNHEALTHY hc.logger.Infof("There was never OK => UNHEALTHY") return false } // If lastError happenned after lastOk => UNHEALTHY // else, if lastError is within (time.Now() - ErrorCheckInterval, time.Now()] => UNHEALTHY // else => HEALTHY if hc.lastError.Timestamp().After(hc.lastOK.Timestamp()) { hc.logger.Infof("lastError happenned after lastOk, hc.lastError: %v, hc.lastOK: %v", hc.lastError.Timestamp(), hc.lastOK.Timestamp()) return false } if hc.lastError.Timestamp().After(time.Now().Add(-hc.config.ErrorCheckInterval)) { hc.logger.Infof("lastError is within (time.Now() - ErrorCheckInterval, time.Now()], hc.lastError: %v, hc.lastOK: %v", hc.lastError.Timestamp(), hc.lastOK.Timestamp()) return false } return true } func (hc *healthAgent) startGRPCServer(host component.Host) error { lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%s", hc.config.Port)) if err != nil { return err } hc.server = grpc.NewServer() healthpb.RegisterHealthAgentServer(hc.server, newServer(&hc.config, hc.logger, hc)) go func() { err := hc.server.Serve(lis) if err != nil { componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) } }() return nil } func updateStatus(st **componentstatus.Event, event *componentstatus.Event) { if *st == nil || (*st).Timestamp().Before(event.Timestamp()) { *st = event } } // healthAgent subscribes to OpenTelemetry updates about components via ComponentStatusChanged function. // Any component (exporter\receiver\processor) that reports its status will be caught here. // Code references: // - https://github.com/open-telemetry/opentelemetry-collector/blob/v0.92.0/extension/extension.go#L62 // - https://github.com/open-telemetry/opentelemetry-collector/blob/v0.92.0/component/status.go#L27-L42 // - https://github.com/open-telemetry/opentelemetry-collector/pull/8169#issuecomment-1670048722 func (hc *healthAgent) ComponentStatusChanged(source *componentstatus.InstanceID, event *componentstatus.Event) { if event.Status() != componentstatus.StatusOK && event.Status() != componentstatus.StatusRecoverableError { return } hc.mu.Lock() defer hc.mu.Unlock() hc.logger.Debugf("Health check status updated to %s, based on signal from component %s", event.Status().String(), source.ComponentID().String()) if event.Status() == componentstatus.StatusOK { updateStatus(&hc.lastOK, event) } else { updateStatus(&hc.lastError, event) } } // Start and Shutdown are defined here: // https://github.com/open-telemetry/opentelemetry-collector/blob/v0.55.0/component/component.go#L45-L72. func (hc *healthAgent) Start(_ context.Context, host component.Host) error { err := hc.startGRPCServer(host) if err != nil { return err } // Check if the host implements componentstatus.Reporter if _, ok := host.(componentstatus.Reporter); ok { hc.logger.Info("Health Agent Host implements componentstatus.Reporter") } else { hc.logger.Info("Health Agent Host does not implement componentstatus.Reporter") } return nil } func (hc *healthAgent) Shutdown(context.Context) error { // If `lis` creation failed in startGRPCServer, then hc.server is nil. if hc.server == nil { return nil } hc.server.GracefulStop() // Calls `Stop()` on `lis` from `startGRPCServer`. return nil } // Ready and NotReady are defined here: // https://github.com/open-telemetry/opentelemetry-collector/blob/v0.55.0/component/extension.go#L30-L45. func (hc *healthAgent) Ready() error { hc.mu.Lock() defer hc.mu.Unlock() hc.ready = true return nil } func (hc *healthAgent) NotReady() error { hc.mu.Lock() defer hc.mu.Unlock() hc.ready = false return nil }