internal/daemon/oracle/oracle.go (187 lines of code) (raw):

/* Copyright 2024 Google LLC Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ // Package oracle implements the Oracle workload agent service. package oracle import ( "context" "runtime" "sync" "time" "github.com/GoogleCloudPlatform/workloadagent/internal/oraclediscovery" "github.com/GoogleCloudPlatform/workloadagent/internal/oraclemetrics" "github.com/GoogleCloudPlatform/workloadagent/internal/servicecommunication" "github.com/GoogleCloudPlatform/workloadagent/internal/usagemetrics" cpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration" "github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/log" "github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/recovery" ) // Service implements the interfaces for Oracle workload agent service. type Service struct { Config *cpb.Configuration CloudProps *cpb.CloudProperties metricCollectionRoutine *recovery.RecoverableRoutine discoveryRoutine *recovery.RecoverableRoutine currentSIDs []string CommonCh <-chan *servicecommunication.Message isProcessPresent bool processes []servicecommunication.ProcessWrapper processesMutex sync.Mutex } type runDiscoveryArgs struct { s *Service } type runMetricCollectionArgs struct { s *Service } var oraProcessPrefixes = []string{"ora_pmon_", "db_pmon_"} // Start initiates the Oracle workload agent service func (s *Service) Start(ctx context.Context, a any) { go (func() { for { s.checkServiceCommunication(ctx) } })() // Check if the enabled field is unset. If it is, then the service is still enabled if the workload is present. if s.Config.GetOracleConfiguration().Enabled == nil { log.CtxLogger(ctx).Info("Oracle service enabled field is not set, will check for workload presence to determine if service should be enabled.") // If the workload is present, proceed with starting the service even if it is not enabled. for !s.isProcessPresent { time.Sleep(5 * time.Second) } log.CtxLogger(ctx).Info("Oracle workload is present. Starting service.") } else if !s.Config.GetOracleConfiguration().GetEnabled() { log.CtxLogger(ctx).Info("Oracle service is disabled") return } if runtime.GOOS != "linux" { log.CtxLogger(ctx).Error("Oracle service is only supported on Linux") return } if s.Config.GetOracleConfiguration().GetOracleDiscovery().GetEnabled() { dCtx := log.SetCtx(ctx, "context", "OracleDiscovery") s.discoveryRoutine = &recovery.RecoverableRoutine{ Routine: runDiscovery, RoutineArg: runDiscoveryArgs{s}, ErrorCode: usagemetrics.OracleDiscoverDatabaseFailure, UsageLogger: *usagemetrics.UsageLogger, ExpectedMinDuration: 0, } s.discoveryRoutine.StartRoutine(dCtx) } if s.Config.GetOracleConfiguration().GetOracleMetrics().GetEnabled() { mcCtx := log.SetCtx(ctx, "context", "OracleMetricCollection") s.metricCollectionRoutine = &recovery.RecoverableRoutine{ Routine: runMetricCollection, RoutineArg: runMetricCollectionArgs{s}, ErrorCode: usagemetrics.OracleMetricCollectionFailure, UsageLogger: *usagemetrics.UsageLogger, ExpectedMinDuration: 0, } s.metricCollectionRoutine.StartRoutine(mcCtx) } select { case <-ctx.Done(): log.CtxLogger(ctx).Info("Oracle workload agent service cancellation requested") return } } func runDiscovery(ctx context.Context, a any) { log.CtxLogger(ctx).Info("Running Oracle Discovery") var args runDiscoveryArgs var ok bool if args, ok = a.(runDiscoveryArgs); !ok { log.CtxLogger(ctx).Error("args is not of type runDiscoveryArgs") return } s := args.s ticker := time.NewTicker(args.s.Config.GetOracleConfiguration().GetOracleDiscovery().GetUpdateFrequency().AsDuration()) defer ticker.Stop() ds := oraclediscovery.New() for { // Discovery data is not used yet. s.processesMutex.Lock() processes := s.processes s.processesMutex.Unlock() // Don't start discovery until processes are populated. for processes == nil { time.Sleep(5 * time.Second) s.processesMutex.Lock() processes = s.processes s.processesMutex.Unlock() // Respect context cancellation. select { case <-ctx.Done(): log.CtxLogger(ctx).Info("Oracle Discovery cancellation requested") return default: continue } } _, err := ds.Discover(ctx, s.CloudProps, processes) if err != nil { log.CtxLogger(ctx).Errorw("Failed to discover databases", "error", err) return } select { case <-ctx.Done(): log.CtxLogger(ctx).Info("Oracle Discovery cancellation requested") return case <-ticker.C: continue } } } func runMetricCollection(ctx context.Context, a any) { log.CtxLogger(ctx).Info("Running Oracle metric collection") var args runMetricCollectionArgs var ok bool if args, ok = a.(runMetricCollectionArgs); !ok { log.CtxLogger(ctx).Errorw("Failed to parse metric collection args", "args", a) return } ticker := time.NewTicker(args.s.Config.GetOracleConfiguration().GetOracleMetrics().GetCollectionFrequency().AsDuration()) defer ticker.Stop() metricCollector, err := oraclemetrics.New(ctx, args.s.Config) if err != nil { log.CtxLogger(ctx).Errorw("Failed to initialize metric collector", "error", err) return } for { select { case <-ctx.Done(): log.CtxLogger(ctx).Info("Metric Collection cancellation requested") return case <-ticker.C: metricCollector.SendHealthMetricsToCloudMonitoring(ctx) metricCollector.SendDefaultMetricsToCloudMonitoring(ctx) } } } // checkServiceCommunication listens to the common channel for messages and processes them. func (s *Service) checkServiceCommunication(ctx context.Context) { // Effectively give ctx.Done() priority over the channel. if ctx.Err() != nil { return } select { case <-ctx.Done(): return case msg := <-s.CommonCh: log.CtxLogger(ctx).Debugw("Oracle workload agent service received a message on the common channel", "message", msg) switch msg.Origin { case servicecommunication.Discovery: log.CtxLogger(ctx).Debugw("Oracle workload agent service received a discovery message") s.processesMutex.Lock() s.processes = msg.DiscoveryResult.Processes s.processesMutex.Unlock() for _, p := range msg.DiscoveryResult.Processes { name, err := p.Name() if err == nil && servicecommunication.HasAnyPrefix(name, oraProcessPrefixes) { s.isProcessPresent = true break } } case servicecommunication.DWActivation: log.CtxLogger(ctx).Debugw("Oracle workload agent service received a DW activation message") default: log.CtxLogger(ctx).Debugw("Oracle workload agent service received a message with an unexpected origin", "origin", msg.Origin) } } } // String returns the name of the oracle service. func (s *Service) String() string { return "Oracle Service" } // ErrorCode returns the error code for the oracle service. func (s *Service) ErrorCode() int { return usagemetrics.OracleServiceError } // ExpectedMinDuration returns the expected minimum duration for the oracle service. // Used by the recovery handler to determine if the service ran long enough to be considered // successful. func (s *Service) ExpectedMinDuration() time.Duration { return 0 }