common/metrics/metrics.go (131 lines of code) (raw):

// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package utils contains common helper functions used across multiple other packages. // Utils should not import any Spanner migration tool packages. package metrics import ( "context" "fmt" "math" "cloud.google.com/go/monitoring/dashboard/apiv1/dashboardpb" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants" "github.com/GoogleCloudPlatform/spanner-migration-tool/internal" "github.com/GoogleCloudPlatform/spanner-migration-tool/internal/reports" "github.com/GoogleCloudPlatform/spanner-migration-tool/proto/migration" ) // GetMigrationData returns migration data comprising source schema details, // request id, target dialect, connection mechanism etc based on // the conv object, source driver and target db func GetMigrationData(conv *internal.Conv, driver, typeOfConv string) *migration.MigrationData { migrationData := migration.MigrationData{ MigrationRequestId: &conv.Audit.MigrationRequestId, } if typeOfConv == constants.DataConv { return &migrationData } migrationData.MigrationType = conv.Audit.MigrationType migrationData.SourceConnectionMechanism, migrationData.Source = getMigrationDataSourceDetails(driver, &migrationData) migrationData.SchemaPatterns = getMigrationDataSchemaPatterns(conv, &migrationData) switch conv.SpDialect { case constants.DIALECT_GOOGLESQL: migrationData.TargetDialect = migration.MigrationData_GOOGLE_STANDARD_SQL.Enum() case constants.DIALECT_POSTGRESQL: migrationData.TargetDialect = migration.MigrationData_POSTGRESQL_STANDARD_SQL.Enum() default: migrationData.TargetDialect = migration.MigrationData_TARGET_DIALECT_UNSPECIFIED.Enum() } return &migrationData } // getMigrationDataSchemaPatterns returns schema petterns like number of tables, foreign key, primary key, // indexes, interleaves, max interleave depth in the spanner schema and count of missing primary keys // if any in source schema func getMigrationDataSchemaPatterns(conv *internal.Conv, migrationData *migration.MigrationData) *migration.MigrationData_SchemaPatterns { numTables := int32(len(conv.SrcSchema)) var numForeignKey, numIndexes, numMissingPrimaryKey, numInterleaves, maxInterleaveDepth, numColumns, numWarnings int32 = 0, 0, 0, 0, 0, 0, 0 for srcTableId, srcSchema := range conv.SrcSchema { if len(srcSchema.PrimaryKeys) == 0 { numMissingPrimaryKey++ } _, cols, warnings := reports.AnalyzeCols(conv, srcTableId) numColumns += int32(cols) numWarnings += int32(warnings) } for _, table := range conv.SpSchema { numForeignKey += int32(len(table.ForeignKeys)) numIndexes += int32(len(table.Indexes)) depth := 0 tableId := table.Id for conv.SpSchema[tableId].ParentTable.Id != "" { numInterleaves++ depth++ tableId = conv.SpSchema[tableId].ParentTable.Id } maxInterleaveDepth = int32(math.Max(float64(maxInterleaveDepth), float64(depth))) } return &migration.MigrationData_SchemaPatterns{ NumTables: &numTables, NumForeignKey: &numForeignKey, NumInterleaves: &numInterleaves, MaxInterleaveDepth: &maxInterleaveDepth, NumIndexes: &numIndexes, NumMissingPrimaryKey: &numMissingPrimaryKey, NumColumns: &numColumns, NumWarnings: &numWarnings, } } // getMigrationDataSourceDetails returns source database type and // source connection mechanism in migrationData object func getMigrationDataSourceDetails(driver string, migrationData *migration.MigrationData) (*migration.MigrationData_SourceConnectionMechanism, *migration.MigrationData_Source) { switch driver { case constants.PGDUMP: return migration.MigrationData_DB_DUMP.Enum(), migration.MigrationData_POSTGRESQL.Enum() case constants.MYSQLDUMP: return migration.MigrationData_DB_DUMP.Enum(), migration.MigrationData_MYSQL.Enum() case constants.POSTGRES: return migration.MigrationData_DIRECT_CONNECTION.Enum(), migration.MigrationData_POSTGRESQL.Enum() case constants.MYSQL: return migration.MigrationData_DIRECT_CONNECTION.Enum(), migration.MigrationData_MYSQL.Enum() case constants.DYNAMODB: return migration.MigrationData_DIRECT_CONNECTION.Enum(), migration.MigrationData_DYNAMODB.Enum() case constants.ORACLE: return migration.MigrationData_DIRECT_CONNECTION.Enum(), migration.MigrationData_ORACLE.Enum() case constants.SQLSERVER: return migration.MigrationData_DIRECT_CONNECTION.Enum(), migration.MigrationData_SQL_SERVER.Enum() case constants.CSV: return migration.MigrationData_FILE.Enum(), migration.MigrationData_CSV.Enum() default: return migration.MigrationData_SOURCE_CONNECTION_MECHANISM_UNSPECIFIED.Enum(), migration.MigrationData_SOURCE_UNSPECIFIED.Enum() } } // CreateDataflowShardMonitoringDashboard returns a monitoring dashboard for a single shard func (resourceIds MonitoringMetricsResources) CreateDataflowShardMonitoringDashboard(ctx context.Context) (*dashboardpb.Dashboard, error) { var mosaicGroups = []MosaicGroup{ {groupTitle: fmt.Sprintf("Dataflow Job: %s", resourceIds.DataflowJobId), groupCreateTileFunction: createShardDataflowMetrics}, {groupTitle: fmt.Sprintf("Datastream: %s", resourceIds.DatastreamId), groupCreateTileFunction: createShardDatastreamMetrics}, {groupTitle: fmt.Sprintf("GCS Bucket: %s", resourceIds.JobMetadataGcsBucket), groupCreateTileFunction: createShardGcsMetrics}, {groupTitle: fmt.Sprintf("Pubsub: %s", resourceIds.PubsubSubscriptionId), groupCreateTileFunction: createShardPubsubMetrics}, {groupTitle: fmt.Sprintf("Spanner: instances/%s/databases/%s", resourceIds.SpannerInstanceId, resourceIds.SpannerDatabaseId), groupCreateTileFunction: createSpannerMetrics}, } var dashboardDisplayName string if resourceIds.ShardId != "" { dashboardDisplayName = fmt.Sprintf("Migration Shard %s", resourceIds.ShardId) } else { dashboardDisplayName = fmt.Sprintf("Migration Dashboard %s", resourceIds.MigrationRequestId) } createDashboardReq := getCreateMonitoringDashboardRequest(resourceIds, createShardIndependentTopMetrics, mosaicGroups, nil, dashboardDisplayName) client := getDashboardClient(ctx) if client == nil { return nil, fmt.Errorf("dashboard client could not be created") } resp, err := client.CreateDashboard(ctx, createDashboardReq) if err != nil { return nil, err } return resp, err } // CreateDataflowAggMonitoringDashboard returns a monitoring dashboard for a sharded migration, aggregated across all shards func (resourceIds MonitoringMetricsResources) CreateDataflowAggMonitoringDashboard(ctx context.Context) (*dashboardpb.Dashboard, error) { var mosaicGroups = []MosaicGroup{ {groupTitle: "Summary of Dataflow Jobs", groupCreateTileFunction: createAggDataflowMetrics}, {groupTitle: "Summary of Datastreams", groupCreateTileFunction: createAggDatastreamMetrics}, {groupTitle: "Summary of GCS Buckets", groupCreateTileFunction: createAggGcsMetrics}, {groupTitle: "Summary of Pubsubs", groupCreateTileFunction: createAggPubsubMetrics}, {groupTitle: fmt.Sprintf("Spanner: instances/%s/databases/%s", resourceIds.SpannerInstanceId, resourceIds.SpannerDatabaseId), groupCreateTileFunction: createSpannerMetrics}, } createDashboardReq := getCreateMonitoringDashboardRequest(resourceIds, createAggIndependentTopMetrics, mosaicGroups, createAggIndependentBottomMetrics, fmt.Sprintf("Migration Aggregated %s", resourceIds.MigrationRequestId)) client := getDashboardClient(ctx) if client == nil { return nil, fmt.Errorf("dashboard client could not be created") } resp, err := client.CreateDashboard(ctx, createDashboardReq) if err != nil { return nil, err } return resp, err }