internal/daemon/mysql/mysql.go (192 lines of code) (raw):

/* Copyright 2024 Google LLC Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ // Package mysql implements the MySQL workload agent service. package mysql import ( "context" "time" "go.uber.org/zap/zapcore" "github.com/GoogleCloudPlatform/workloadagent/internal/mysqldiscovery" "github.com/GoogleCloudPlatform/workloadagent/internal/mysqlmetrics" "github.com/GoogleCloudPlatform/workloadagent/internal/servicecommunication" "github.com/GoogleCloudPlatform/workloadagent/internal/usagemetrics" "github.com/GoogleCloudPlatform/workloadagent/internal/workloadmanager" configpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration" "github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/gce" "github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/log" "github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/recovery" ) const ( discoveryFrequency = 10 * time.Minute wlmCollectionFrequency = 5 * time.Minute ) // Service implements the interfaces for MySQL workload agent service. type Service struct { Config *configpb.Configuration CloudProps *configpb.CloudProperties CommonCh <-chan *servicecommunication.Message processes servicecommunication.DiscoveryResult mySQLProcesses []servicecommunication.ProcessWrapper dwActivated bool WLMClient workloadmanager.WLMWriter } type runDiscoveryArgs struct { s *Service } type runMetricCollectionArgs struct { s *Service } // Start initiates the MySQL workload agent service func (s *Service) Start(ctx context.Context, a any) { if s.Config.GetMysqlConfiguration() != nil && !s.Config.GetMysqlConfiguration().GetEnabled() { // If MySQL workload agent service is explicitly disabled in the configuration, then return. log.CtxLogger(ctx).Info("MySQL workload agent service is disabled in the configuration") return } go (func() { for { s.checkServiceCommunication(ctx) } })() ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() enabled := s.Config.GetMysqlConfiguration().GetEnabled() EnableCheck: for { select { case <-ctx.Done(): log.CtxLogger(ctx).Info("MySQL workload agent service cancellation requested") return case <-ticker.C: // Once DW is enabled and the workload is present/enabled, start discovery and metric collection. if s.dwActivated && (s.isWorkloadPresent() || enabled) { log.CtxLogger(ctx).Info("MySQL workload agent service is enabled and DW is activated. Starting discovery and metric collection") break EnableCheck } } } // Start MySQL Discovery dCtx := log.SetCtx(ctx, "context", "MySQLDiscovery") discoveryRoutine := &recovery.RecoverableRoutine{ Routine: runDiscovery, RoutineArg: runDiscoveryArgs{s}, ErrorCode: usagemetrics.MySQLDiscoveryFailure, UsageLogger: *usagemetrics.UsageLogger, ExpectedMinDuration: 0, } discoveryRoutine.StartRoutine(dCtx) // Start MySQL Metric Collection mcCtx := log.SetCtx(ctx, "context", "MySQLMetricCollection") metricCollectionRoutine := &recovery.RecoverableRoutine{ Routine: runMetricCollection, RoutineArg: runMetricCollectionArgs{s}, ErrorCode: usagemetrics.MySQLMetricCollectionFailure, UsageLogger: *usagemetrics.UsageLogger, ExpectedMinDuration: 0, } metricCollectionRoutine.StartRoutine(mcCtx) select { case <-ctx.Done(): log.CtxLogger(ctx).Info("MySQL workload agent service cancellation requested") return } } func runDiscovery(ctx context.Context, a any) { log.CtxLogger(ctx).Info("Starting MySQL Discovery") var args runDiscoveryArgs var ok bool if args, ok = a.(runDiscoveryArgs); !ok { log.CtxLogger(ctx).Errorw("failed to parse discovery args", "args", a) return } log.CtxLogger(ctx).Debugw("MySQL discovery args", "args", args) ticker := time.NewTicker(discoveryFrequency) defer ticker.Stop() for { select { case <-ctx.Done(): log.CtxLogger(ctx).Info("MySQL discovery cancellation requested") return case <-ticker.C: mysqldiscovery.Discover(ctx) } } } func runMetricCollection(ctx context.Context, a any) { log.CtxLogger(ctx).Info("Starting MySQL Metric Collection") var args runMetricCollectionArgs var ok bool if args, ok = a.(runMetricCollectionArgs); !ok { log.CtxLogger(ctx).Errorf("failed to parse metric collection args", "args", a) return } log.CtxLogger(ctx).Debugw("MySQL metric collection args", "args", args) ticker := time.NewTicker(wlmCollectionFrequency) defer ticker.Stop() gceService, err := gce.NewGCEClient(ctx) if err != nil { usagemetrics.Error(usagemetrics.GCEServiceCreationFailure) log.CtxLogger(ctx).Errorf("initializing GCE services: %w", err) return } m := mysqlmetrics.New(ctx, args.s.Config, args.s.WLMClient) err = m.InitDB(ctx, gceService) if err != nil { log.CtxLogger(ctx).Errorf("failed to initialize MySQL DB: %v", err) return } for { m.CollectMetricsOnce(ctx) select { case <-ctx.Done(): log.CtxLogger(ctx).Info("MySQL metric collection cancellation requested") return case <-ticker.C: continue } } } // checkServiceCommunication listens to the common channel for messages and processes them. func (s *Service) checkServiceCommunication(ctx context.Context) { // Effectively give ctx.Done() priority over the channel. if ctx.Err() != nil { return } select { case <-ctx.Done(): return case msg := <-s.CommonCh: log.CtxLogger(ctx).Debugw("MySQL workload agent service received a message on the common channel", "message", msg) switch msg.Origin { case servicecommunication.Discovery: s.processes = msg.DiscoveryResult s.identifyMySQLProcesses(ctx) case servicecommunication.DWActivation: s.dwActivated = msg.DWActivationResult.Activated default: log.CtxLogger(ctx).Debugw("MySQL workload agent service received a message with an unexpected origin", "origin", msg.Origin) } } } func (s *Service) identifyMySQLProcesses(ctx context.Context) { s.mySQLProcesses = []servicecommunication.ProcessWrapper{} for _, process := range s.processes.Processes { name, err := process.Name() if err == nil && name == "mysqld" { s.mySQLProcesses = append(s.mySQLProcesses, process) } } s.logMySQLProcesses(ctx, zapcore.DebugLevel) } func (s *Service) isWorkloadPresent() bool { return len(s.mySQLProcesses) > 0 } func (s *Service) logMySQLProcesses(ctx context.Context, loglevel zapcore.Level) { log.CtxLogger(ctx).Logf(loglevel, "Number of processes found: %v", len(s.processes.Processes)) log.CtxLogger(ctx).Logf(loglevel, "Number of MySQL processes found: %v", len(s.mySQLProcesses)) for _, process := range s.mySQLProcesses { name, _ := process.Name() username, _ := process.Username() cmdline, _ := process.CmdlineSlice() env, _ := process.Environ() log.CtxLogger(ctx).Logw(loglevel, "MySQL process", "name", name, "username", username, "cmdline", cmdline, "env", env, "pid", process.Pid()) } } // String returns the name of the MySQL service. func (s *Service) String() string { return "MySQL Service" } // ErrorCode returns the error code for the MySQL service. func (s *Service) ErrorCode() int { return usagemetrics.MySQLServiceError } // ExpectedMinDuration returns the expected minimum duration for the MySQL service. // Used by the recovery handler to determine if the service ran long enough to be considered // successful. func (s *Service) ExpectedMinDuration() time.Duration { return 0 }