internal/mysqlmetrics/mysqlmetrics.go (276 lines of code) (raw):

/* Copyright 2024 Google LLC Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ // Package mysqlmetrics implements metric collection for the MySQL workload agent service. package mysqlmetrics import ( "context" "database/sql" "errors" "fmt" "runtime" "strconv" "strings" "github.com/go-sql-driver/mysql" "github.com/GoogleCloudPlatform/workloadagent/internal/workloadmanager" configpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration" "github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/commandlineexecutor" "github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/log" "github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/secret" ) const ( innoDBKey = "is_inno_db_default" bufferPoolKey = "buffer_pool_size" totalRAMKey = "total_ram" ) type gceInterface interface { GetSecret(ctx context.Context, projectID, secretName string) (string, error) } type rowsInterface interface { Next() bool Close() error Scan(dest ...any) error } type dbWrapper struct { db *sql.DB dbInterface } type dbInterface interface { QueryContext(ctx context.Context, query string, args ...any) (rowsInterface, error) Ping() error } func (d dbWrapper) QueryContext(ctx context.Context, query string, args ...any) (rowsInterface, error) { return d.db.QueryContext(ctx, query, args...) } func (d dbWrapper) Ping() error { return d.db.Ping() } // MySQLMetrics contains variables and methods to collect metrics for MySQL databases running on the current host. type MySQLMetrics struct { execute commandlineexecutor.Execute Config *configpb.Configuration db dbInterface connect func(ctx context.Context, dataSource string) (dbInterface, error) WLMClient workloadmanager.WLMWriter } type engineResult struct { // these are the fields in the table output from SHOW ENGINES engine string support string comment string transactions string xa string savepoints string } // password gets the password for the MySQL database. // If the password is set in the configuration, it is used directly (not recommended). // Otherwise, if the secret configuration is set, the secret is fetched from GCE. // Without either, the password is not set and requests to the MySQL database will fail. func (m *MySQLMetrics) password(ctx context.Context, gceService gceInterface) (secret.String, error) { pw := "" if m.Config.GetMysqlConfiguration().GetConnectionParameters().GetPassword() != "" { return secret.String(m.Config.GetMysqlConfiguration().GetConnectionParameters().GetPassword()), nil } secretCfg := m.Config.GetMysqlConfiguration().GetConnectionParameters().GetSecret() if secretCfg.GetSecretName() != "" && secretCfg.GetProjectId() != "" { var err error pw, err = gceService.GetSecret(ctx, secretCfg.GetProjectId(), secretCfg.GetSecretName()) if err != nil { return secret.String(""), fmt.Errorf("failed to get secret: %v", err) } } return secret.String(pw), nil } // defaultConnect connects to the MySQL database and pings it. func defaultConnect(ctx context.Context, dataSource string) (dbInterface, error) { d, err := sql.Open("mysql", dataSource) if err != nil { return nil, fmt.Errorf("failed to open MySQL connection: %v", err) } return dbWrapper{db: d}, nil } func (m *MySQLMetrics) dbDSN(ctx context.Context, gceService gceInterface) (string, error) { pw, err := m.password(ctx, gceService) if err != nil { return "", fmt.Errorf("initializing password: %w", err) } cfg := mysql.Config{ User: m.Config.GetMysqlConfiguration().GetConnectionParameters().GetUsername(), Passwd: pw.SecretValue(), Addr: "localhost:3306", // using localhost because the agent is running on the same machine as the MySQL server DBName: "mysql", } return cfg.FormatDSN(), nil } // New creates a new MySQLMetrics object initialized with default values. func New(ctx context.Context, config *configpb.Configuration, wlmClient workloadmanager.WLMWriter) *MySQLMetrics { return &MySQLMetrics{ execute: commandlineexecutor.ExecuteCommand, Config: config, connect: defaultConnect, WLMClient: wlmClient, } } // InitDB initializes the MySQL database connection. func (m *MySQLMetrics) InitDB(ctx context.Context, gceService gceInterface) error { dbDSN, err := m.dbDSN(ctx, gceService) if err != nil { return fmt.Errorf("getting dbDSN: %w", err) } db, err := m.connect(ctx, dbDSN) if err != nil { return fmt.Errorf("connecting to MySQL: %w", err) } m.db = db err = m.db.Ping() if err != nil { return fmt.Errorf("failed to ping MySQL connection: %v", err) } log.CtxLogger(ctx).Debugw("MySQL connection ping success") return nil } func executeQuery(ctx context.Context, db dbInterface, query string) (rowsInterface, error) { return db.QueryContext(ctx, query) } func readEngine(ctx context.Context, rows rowsInterface) (engineResult, error) { // These are the fields in the table output from SHOW ENGINES. var engine sql.NullString var support sql.NullString var comment sql.NullString var transactions sql.NullString var xa sql.NullString var savepoints sql.NullString if err := rows.Scan(&engine, &support, &comment, &transactions, &xa, &savepoints); err != nil { return engineResult{}, err } log.CtxLogger(ctx).Debugw("MySQL table name", "engine", engine, "support", support, "comment", comment, "transactions", transactions, "xa", xa, "savepoints", savepoints) return engineResult{ engine: engine.String, support: support.String, comment: comment.String, transactions: transactions.String, xa: xa.String, savepoints: savepoints.String, }, nil } func (m *MySQLMetrics) isInnoDBStorageEngine(ctx context.Context) (bool, error) { rows, err := executeQuery(ctx, m.db, "SHOW ENGINES") if err != nil { return false, fmt.Errorf("issue trying to show engines: %v", err) } log.CtxLogger(ctx).Debugw("MySQL show engines result", "rows", rows) engineResults := []engineResult{} defer rows.Close() for rows.Next() { engineResult, err := readEngine(ctx, rows) if err != nil { log.CtxLogger(ctx).Debugw("MySQL read engine error", "err", err) continue } engineResults = append(engineResults, engineResult) } isInnoDBDefault := false for _, engineResult := range engineResults { if engineResult.engine == "InnoDB" && engineResult.support == "DEFAULT" { isInnoDBDefault = true } } log.CtxLogger(ctx).Debugw("MySQL isInnoDBDefault", "isInnoDBDefault", isInnoDBDefault) return isInnoDBDefault, nil } func (m *MySQLMetrics) bufferPoolSize(ctx context.Context) (int64, error) { rows, err := executeQuery(ctx, m.db, "SELECT @@innodb_buffer_pool_size") if err != nil { log.CtxLogger(ctx).Debugw("MySQL buffer pool size error", "err", err) return 0, fmt.Errorf("can't get buffer pool size in test MySQL connection: %v", err) } log.CtxLogger(ctx).Debugw("MySQL buffer pool size result", "rows", rows) defer rows.Close() var bufferPoolSize int64 if !rows.Next() { return 0, errors.New("no rows returned from buffer pool size query") } if err := rows.Scan(&bufferPoolSize); err != nil { return 0, err } log.CtxLogger(ctx).Debugw("MySQL buffer pool size", "bufferPoolSize", bufferPoolSize) return bufferPoolSize, nil } func windowsTotalRAM(ctx context.Context, output string) (int, error) { // Expected to be something like "TotalPhysicalMemory\n134876032413" lines := strings.Split(output, "\n") if len(lines) < 2 { return 0, fmt.Errorf("not enough lines found in output for windows total RAM: %d", len(lines)) } ramString := strings.TrimSpace(lines[1]) ram, err := strconv.Atoi(ramString) if err != nil { return 0, fmt.Errorf("failed to convert total RAM to integer: %v", err) } return ram, nil } func (m *MySQLMetrics) totalRAM(ctx context.Context, isWindowsOS bool) (int, error) { cmd := commandlineexecutor.Params{ Executable: "grep", Args: []string{"MemTotal", "/proc/meminfo"}, } if isWindowsOS { cmd = commandlineexecutor.Params{ Executable: "cmd", Args: []string{"/C", "wmic", "computersystem", "get", "totalphysicalmemory"}, } } log.CtxLogger(ctx).Debugw("getTotalRAM command", "command", cmd) res := m.execute(ctx, cmd) log.CtxLogger(ctx).Debugw("getTotalRAM result", "result", res) if res.Error != nil { return 0, fmt.Errorf("failed to execute command: %v", res.Error) } if isWindowsOS { return windowsTotalRAM(ctx, res.StdOut) } // Expected to be something like "MemTotal: 1348760 kB" lines := strings.Split(res.StdOut, "\n") fields := strings.Fields(lines[0]) if len(fields) != 3 { return 0, fmt.Errorf("found wrong number of fields in total RAM: %d", len(fields)) } ram, err := strconv.Atoi(fields[1]) if err != nil { return 0, fmt.Errorf("failed to convert total RAM to integer: %v", err) } units := fields[2] if strings.ToUpper(units) == "KB" { ram = ram * 1024 } return ram, nil } // CollectMetricsOnce collects metrics for MySQL databases running on the host. func (m *MySQLMetrics) CollectMetricsOnce(ctx context.Context) (*workloadmanager.WorkloadMetrics, error) { bufferPoolSize, err := m.bufferPoolSize(ctx) if err != nil { log.CtxLogger(ctx).Warnf("Failed to get buffer pool size: %v", err) return nil, err } isWindowsOS := runtime.GOOS == "windows" totalRAM, err := m.totalRAM(ctx, isWindowsOS) if err != nil { log.CtxLogger(ctx).Warnf("Failed to get total RAM: %v", err) return nil, err } isInnoDBDefault, err := m.isInnoDBStorageEngine(ctx) if err != nil { log.CtxLogger(ctx).Warnf("Failed to get InnoDB default status: %v", err) return nil, err } log.CtxLogger(ctx).Debugw("Finished collecting MySQL metrics once. Next step is to send to WLM (DW).", bufferPoolKey, bufferPoolSize, totalRAMKey, totalRAM, innoDBKey, isInnoDBDefault, ) metrics := workloadmanager.WorkloadMetrics{ WorkloadType: workloadmanager.MYSQL, Metrics: map[string]string{ bufferPoolKey: strconv.FormatInt(bufferPoolSize, 10), totalRAMKey: strconv.Itoa(totalRAM), innoDBKey: strconv.FormatBool(isInnoDBDefault), }, } res, err := workloadmanager.SendDataInsight(ctx, workloadmanager.SendDataInsightParams{ WLMetrics: metrics, CloudProps: m.Config.GetCloudProperties(), WLMService: m.WLMClient, }) if err != nil { return nil, err } if res == nil { log.CtxLogger(ctx).Warn("SendDataInsight did not return an error but the WriteInsight response is nil") return &metrics, nil } log.CtxLogger(ctx).Debugw("WriteInsight response", "StatusCode", res.HTTPStatusCode) return &metrics, nil }