assessment/sources/mysql/infoschema.go (430 lines of code) (raw):

// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package mysql import ( "database/sql" "fmt" "math" "strings" "github.com/GoogleCloudPlatform/spanner-migration-tool/assessment/utils" "github.com/GoogleCloudPlatform/spanner-migration-tool/internal" "github.com/GoogleCloudPlatform/spanner-migration-tool/schema" ) type InfoSchemaImpl struct { Db *sql.DB DbName string } type SourceSpecificComparisonImpl struct{} func (isi InfoSchemaImpl) GetTableInfo(conv *internal.Conv) (map[string]utils.TableAssessmentInfo, error) { tb := make(map[string]utils.TableAssessmentInfo) dbIdentifier := utils.DbIdentifier{ DatabaseName: isi.DbName, } var errString string for _, table := range conv.SrcSchema { columnAssessments := make(map[string]utils.ColumnAssessmentInfo[any]) var collation, charset string q := `SELECT TABLE_COLLATION, SUBSTRING_INDEX(TABLE_COLLATION, '_', 1) as CHARACTER_SET FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?;` err := isi.Db.QueryRow(q, isi.DbName, table.Name).Scan(&collation, &charset) if err != nil { errString = errString + fmt.Sprintf("couldn't get schema for table %s: %s", table.Name, err) } for _, column := range table.ColDefs { q = `SELECT c.column_type, c.extra, c.generation_expression FROM information_schema.COLUMNS c where table_schema = ? and table_name = ? and column_name = ? ORDER BY c.ordinal_position;` var columnType string var colExtra, colGeneratedExp sql.NullString var isOnUpdateTimestampSet, isVirtual, isPresent bool var generatedColumn utils.GeneratedColumnInfo err := isi.Db.QueryRow(q, isi.DbName, table.Name, column.Name).Scan(&columnType, &colExtra, &colGeneratedExp) if err != nil { errString = errString + fmt.Sprintf("couldn't get schema for column %s.%s: %s", table.Name, column.Name, err) } if strings.Contains(colExtra.String, "on update CURRENT_TIMESTAMP") { isOnUpdateTimestampSet = true } else if strings.Contains(colExtra.String, "VIRTUAL GENERATED") { isVirtual = true isPresent = true } else if strings.Contains(colExtra.String, "STORED GENERATED") { isPresent = true } if colGeneratedExp.Valid { generatedColumn = utils.GeneratedColumnInfo{ Statement: colGeneratedExp.String, IsPresent: isPresent, IsVirtual: isVirtual, } } columnAssessments[column.Id] = utils.ColumnAssessmentInfo[any]{ Db: utils.DbIdentifier{ DatabaseName: isi.DbName, }, Name: column.Name, TableName: table.Name, ColumnDef: column, IsUnsigned: strings.Contains(strings.ToLower(columnType), " unsigned"), MaxColumnSize: getColumnMaxSize(column.Type.Name, column.Type.Mods, charset), IsOnUpdateTimestampSet: isOnUpdateTimestampSet, GeneratedColumn: generatedColumn, } } tb[table.Id] = utils.TableAssessmentInfo{Name: table.Name, TableDef: table, ColumnAssessmentInfos: columnAssessments, Db: dbIdentifier, Charset: charset, Collation: collation} } if errString != "" { return tb, fmt.Errorf(errString) } return tb, nil } // GetIndexes return a list of all indexes for the specified table. func (isi InfoSchemaImpl) GetIndexInfo(table string, index schema.Index) (utils.IndexAssessmentInfo, error) { q := `SELECT DISTINCT INDEX_NAME,COLUMN_NAME,SEQ_IN_INDEX,COLLATION,NON_UNIQUE,INDEX_TYPE FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = ? ORDER BY INDEX_NAME, SEQ_IN_INDEX;` var name, column, sequence, nonUnique, indexType string var collation sql.NullString err := isi.Db.QueryRow(q, isi.DbName, table, index.Name).Scan(&name, &column, &sequence, &collation, &nonUnique, &indexType) if err != nil { return utils.IndexAssessmentInfo{}, fmt.Errorf("couldn't get index for index name %s.%s: %s", table, index.Name, err) } return utils.IndexAssessmentInfo{ Ty: indexType, Name: name, Db: utils.DbIdentifier{ DatabaseName: isi.DbName, }, IndexDef: index, }, nil } func (isi InfoSchemaImpl) GetTriggerInfo() ([]utils.TriggerAssessmentInfo, error) { q := `SELECT DISTINCT TRIGGER_NAME,EVENT_OBJECT_TABLE,ACTION_STATEMENT,ACTION_TIMING,EVENT_MANIPULATION FROM INFORMATION_SCHEMA.TRIGGERS WHERE EVENT_OBJECT_SCHEMA = ?` rows, err := isi.Db.Query(q, isi.DbName) if err != nil { return nil, err } defer rows.Close() var name, table, actionStmt, actionTiming, eventManipulation string var triggers []utils.TriggerAssessmentInfo var errString string for rows.Next() { if err := rows.Scan(&name, &table, &actionStmt, &actionTiming, &eventManipulation); err != nil { errString = errString + fmt.Sprintf("Can't scan: %v", err) continue } triggers = append(triggers, utils.TriggerAssessmentInfo{ Name: name, Operation: actionStmt, TargetTable: table, ActionTiming: actionTiming, EventManipulation: eventManipulation, Db: utils.DbIdentifier{ DatabaseName: isi.DbName, }, }) } if errString != "" { return triggers, fmt.Errorf(errString) } return triggers, nil } func (isi InfoSchemaImpl) GetStoredProcedureInfo() ([]utils.StoredProcedureAssessmentInfo, error) { q := `SELECT DISTINCT ROUTINE_NAME,ROUTINE_DEFINITION,IS_DETERMINISTIC FROM INFORMATION_SCHEMA.ROUTINES WHERE ROUTINE_TYPE='PROCEDURE' AND ROUTINE_SCHEMA = ?` rows, err := isi.Db.Query(q, isi.DbName) if err != nil { return nil, err } defer rows.Close() var name, defintion, isDeterministic string var storedProcedures []utils.StoredProcedureAssessmentInfo var errString string for rows.Next() { if err := rows.Scan(&name, &defintion, &isDeterministic); err != nil { errString = errString + fmt.Sprintf("Can't scan: %v", err) continue } storedProcedures = append(storedProcedures, utils.StoredProcedureAssessmentInfo{ Name: name, Definition: defintion, IsDeterministic: isDeterministic == "YES", Db: utils.DbIdentifier{ DatabaseName: isi.DbName, }, }) } if errString != "" { return storedProcedures, fmt.Errorf(errString) } return storedProcedures, nil } func (isi InfoSchemaImpl) GetFunctionInfo() ([]utils.FunctionAssessmentInfo, error) { q := `SELECT DISTINCT ROUTINE_NAME,ROUTINE_DEFINITION,IS_DETERMINISTIC, DTD_IDENTIFIER FROM INFORMATION_SCHEMA.ROUTINES WHERE ROUTINE_TYPE='FUNCTION' AND ROUTINE_SCHEMA = ?` rows, err := isi.Db.Query(q, isi.DbName) if err != nil { return nil, err } defer rows.Close() var name, defintion, isDeterministic, datatype string var functions []utils.FunctionAssessmentInfo var errString string for rows.Next() { if err := rows.Scan(&name, &defintion, &isDeterministic, &datatype); err != nil { errString = errString + fmt.Sprintf("Can't scan: %v", err) continue } functions = append(functions, utils.FunctionAssessmentInfo{ Name: name, Definition: defintion, IsDeterministic: isDeterministic == "YES", Db: utils.DbIdentifier{ DatabaseName: isi.DbName, }, Datatype: datatype, }) } if errString != "" { return functions, fmt.Errorf(errString) } return functions, nil } func (isi InfoSchemaImpl) GetViewInfo() ([]utils.ViewAssessmentInfo, error) { q := `SELECT DISTINCT TABLE_NAME,VIEW_DEFINITION,CHECK_OPTION, IS_UPDATABLE FROM INFORMATION_SCHEMA.VIEWS WHERE TABLE_SCHEMA = ?` rows, err := isi.Db.Query(q, isi.DbName) if err != nil { return nil, err } defer rows.Close() var name, defintion, checkOption, isUpdatable string var views []utils.ViewAssessmentInfo var errString string for rows.Next() { if err := rows.Scan(&name, &defintion, &checkOption, &isUpdatable); err != nil { errString = errString + fmt.Sprintf("Can't scan: %v", err) continue } views = append(views, utils.ViewAssessmentInfo{ Name: name, Definition: defintion, CheckOption: checkOption, IsUpdatable: isUpdatable == "YES", Db: utils.DbIdentifier{ DatabaseName: isi.DbName, }, }) } if errString != "" { return views, fmt.Errorf(errString) } return views, nil } func getColumnMaxSize(dataType string, mods []int64, mysqlCharset string) int64 { dataTypeLower := strings.ToLower(dataType) bytesPerChar := int64(1) // Default for binary types or non-char types switch dataTypeLower { case "char", "varchar", "tinytext", "text", "mediumtext", "longtext": bytesPerChar = getMaxBytesPerChar(mysqlCharset) } switch dataTypeLower { case "date": return 4 case "timestamp", "datetime": return 8 case "bit": if len(mods) > 0 { return int64(math.Ceil(float64(mods[0]) / 8.0)) } return 1 case "tinyint": return 1 case "smallint": return 2 case "mediumint": return 3 case "int", "integer": return 4 case "bigint": return 8 case "float": return 4 case "double", "real": return 8 case "decimal", "numeric": if len(mods) > 0 { precision := mods[0] scale := int64(0) if len(mods) > 1 { scale = mods[1] } // Calculate storage based on precision and scale intDigits := precision - scale intBytes := (intDigits + 8) / 9 fracBytes := (scale + 8) / 9 return intBytes + fracBytes // Total size } return 8 // Default size if no precision/scale provided case "char": maxChars := int64(1) // Default for CHAR is CHAR(1) if len(mods) > 0 { maxChars = mods[0] } return maxChars * bytesPerChar case "varchar": maxChars := int64(0) if len(mods) > 0 { maxChars = mods[0] } else { maxChars = 255 } return maxChars * bytesPerChar case "binary", "varbinary": if len(mods) > 0 { return mods[0] } return 255 case "tinyblob": return 255 case "blob": return 65535 case "mediumblob": return 16777215 case "longblob": return 4294967295 case "tinytext": return 255 * bytesPerChar case "text": return 65535 * bytesPerChar case "mediumtext": return 16777215 * bytesPerChar case "longtext": return 4294967295 * bytesPerChar case "json": return 4294967295 default: return 4 } } func getMaxBytesPerChar(charset string) int64 { charsetLower := strings.ToLower(charset) switch charsetLower { case "binary", "latin1", "ascii", "cp850", "dec8", "hp8", "koi8r", "latin2", "swe7", "armscii8", "cp1250", "cp1251", "cp1256", "cp1257", "cp852", "cp866", "geostd8", "greek", "hebrew", "keybcs2", "koi8u", "latin5", "latin7", "macce", "macroman": return 1 case "big5", "gb2312", "gbk", "sjis", "ujis", "eucjpms", "euckr", "ucs2": return 2 case "utf8", "utf8mb3": return 3 case "utf8mb4", "utf16", "utf32", "gb18030": return 4 default: // Default to a common multi-byte max if unknown. return 4 } } func (ssa SourceSpecificComparisonImpl) IsDataTypeCodeCompatible(srcColumnDef utils.SrcColumnDetails, spColumnDef utils.SpColumnDetails) bool { switch strings.ToUpper(spColumnDef.Datatype) { case "BOOL": switch srcColumnDef.Datatype { case "tinyint": return true case "bit": return true default: return false } case "BYTES": switch srcColumnDef.Datatype { case "binary": return true case "varbinary": return true case "blob": return true default: return false } case "DATE": switch srcColumnDef.Datatype { case "date": return true default: return false } case "FLOAT32": switch srcColumnDef.Datatype { case "float": return true case "double": return true default: return false } case "FLOAT64": switch srcColumnDef.Datatype { case "float": return true case "double": return true default: return false } case "INT64": switch srcColumnDef.Datatype { case "int": return true case "bigint": return true default: return false } case "JSON": switch srcColumnDef.Datatype { case "json": return true case "varchar": return true default: return false } case "NUMERIC": switch srcColumnDef.Datatype { case "float": return true case "double": return true default: return false } case "STRING": switch srcColumnDef.Datatype { case "varchar": return true case "text": return true case "mediumtext": return true case "longtext": return true default: return false } case "TIMESTAMP": switch srcColumnDef.Datatype { case "timestamp": return true case "datetime": return true default: return false } default: return false } }