sources/common/toddl.go

// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package common creates an outline for common functionality across the multiple
source databases we support. While adding new methods or code here:
 1. Ensure that the changes do not adversely impact any source that uses the common code.
 2. Test cases might not sufficiently cover all cases, so integration and manual testing
    should be done to ensure no functionality is breaking. Most of the test cases that
    cover the code in this package will lie in the implementing source databases, so it
    might not be required to have unit tests for each method here.
 3. Any functions added here should be used by two or more databases.
 4. If it looks like the code is getting more complex due to refactoring, it is probably
    better off leaving the functionality out of common.
*/
package common

import (
	"context"
	"fmt"
	"reflect"
	"strconv"
	"strings"
	"unicode"

	"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/expressions_api"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/schema"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl"
)

// ToDdl interface is meant to be implemented by all sources. When support for a
// new target database is added, please add a new method here with the output
// type expected. In case a particular source-to-target transformation is not
// supported, an error is to be returned by the corresponding method.
type ToDdl interface {
	ToSpannerType(conv *internal.Conv, spType string, srcType schema.Type, isPk bool) (ddl.Type, []internal.SchemaIssue)
	GetColumnAutoGen(conv *internal.Conv, autoGenCol ddl.AutoGenCol, colId string, tableId string) (*ddl.AutoGenCol, error)
}

type SchemaToSpannerInterface interface {
	SchemaToSpannerDDL(conv *internal.Conv, toddl ToDdl, attributes internal.AdditionalSchemaAttributes) error
	SchemaToSpannerDDLHelper(conv *internal.Conv, toddl ToDdl, srcTable schema.Table, isRestore bool) error
	SchemaToSpannerSequenceHelper(conv *internal.Conv, srcSequence ddl.Sequence) error
}

type SchemaToSpannerImpl struct {
	ExpressionVerificationAccessor expressions_api.ExpressionVerificationAccessor
	DdlV                           expressions_api.DDLVerifier
}

var ErrorTypeMapping = map[string]internal.SchemaIssue{
	"No matching signature for operator": internal.TypeMismatchError,
	"Syntax error":                       internal.InvalidConditionError,
	"Unrecognized name":                  internal.ColumnNotFoundError,
	"Function not found":                 internal.CheckConstraintFunctionNotFoundError,
	"unhandled error":                    internal.GenericError,
}

// SchemaToSpannerDDL performs schema conversion from the source DB schema to
// Spanner. It uses the source schema in conv.SrcSchema, and writes
// the Spanner schema to conv.SpSchema.
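//
// Illustrative usage (a minimal sketch, not taken from the callers in this repo):
// a source package supplies its own ToDdl implementation and drives the conversion
// through SchemaToSpannerImpl. The accessor, verifier, conv and toddl values below
// are placeholders for whatever the calling source package actually constructs.
//
//	ss := &common.SchemaToSpannerImpl{
//		ExpressionVerificationAccessor: accessor, // an expressions_api.ExpressionVerificationAccessor
//		DdlV:                           verifier, // an expressions_api.DDLVerifier
//	}
//	if err := ss.SchemaToSpannerDDL(conv, toddl, internal.AdditionalSchemaAttributes{IsSharded: false}); err != nil {
//		return err
//	}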
func (ss *SchemaToSpannerImpl) SchemaToSpannerDDL(conv *internal.Conv, toddl ToDdl, attributes internal.AdditionalSchemaAttributes) error {
	srcSequences := conv.SrcSequences
	for _, srcSequence := range srcSequences {
		ss.SchemaToSpannerSequenceHelper(conv, srcSequence)
	}
	tableIds := GetSortedTableIdsBySrcName(conv.SrcSchema)
	for _, tableId := range tableIds {
		srcTable := conv.SrcSchema[tableId]
		ss.SchemaToSpannerDDLHelper(conv, toddl, srcTable, false)
	}
	conv.AddPrimaryKeys()
	if attributes.IsSharded {
		conv.AddShardIdColumn()
	}

	if conv.Source == constants.MYSQL && conv.SpProjectId != "" && conv.SpInstanceId != "" {
		// Process and verify Spanner DDL expressions for MySQL.
		expressionDetails := ss.DdlV.GetSourceExpressionDetails(conv, tableIds)
		expressions, err := ss.DdlV.VerifySpannerDDL(conv, expressionDetails)
		if err != nil && !strings.Contains(err.Error(), "expressions either failed verification") {
			return err
		}
		spannerSchemaApplyExpressions(conv, expressions)
	}

	if (conv.Source == constants.MYSQL || conv.Source == constants.MYSQLDUMP) && conv.SpProjectId != "" && conv.SpInstanceId != "" {
		// Process and verify check constraints for the MySQL and MySQLDump flows only.
		err := ss.VerifyExpressions(conv)
		if err != nil {
			return err
		}
	}
	internal.ResolveRefs(conv)
	return nil
}

// GenerateExpressionDetailList generates the expression detail list that is used
// as input to the verify expressions method.
func GenerateExpressionDetailList(spschema ddl.Schema) []internal.ExpressionDetail {
	expressionDetailList := []internal.ExpressionDetail{}
	for _, sp := range spschema {
		for _, cc := range sp.CheckConstraints {
			expressionDetail := internal.ExpressionDetail{
				Expression:       cc.Expr,
				Type:             "CHECK",
				ReferenceElement: internal.ReferenceElement{Name: sp.Name},
				ExpressionId:     cc.ExprId,
				Metadata:         map[string]string{"tableId": sp.Id},
			}
			expressionDetailList = append(expressionDetailList, expressionDetail)
		}
	}
	return expressionDetailList
}

// RemoveError resets the table-level issues before they are re-populated.
func RemoveError(tableIssues map[string]internal.TableIssues) map[string]internal.TableIssues {
	for tableId, TableIssues := range tableIssues {
		for _, issue := range ErrorTypeMapping {
			removedIssue := removeSchemaIssue(TableIssues.TableLevelIssues, issue)
			TableIssues.TableLevelIssues = removedIssue
			tableIssues[tableId] = TableIssues
		}
	}
	return tableIssues
}

// GetIssue collects all the expression verification errors, grouped by table,
// and returns them along with the IDs of the invalid expressions.
func GetIssue(result internal.VerifyExpressionsOutput) (map[string][]internal.InvalidCheckExp, map[string][]string) {
	exprOutputsByTable := make(map[string][]internal.ExpressionVerificationOutput)
	issues := make(map[string][]internal.InvalidCheckExp)
	invalidExpIds := make(map[string][]string)
	for _, ev := range result.ExpressionVerificationOutputList {
		if !ev.Result {
			tableId := ev.ExpressionDetail.Metadata["tableId"]
			exprOutputsByTable[tableId] = append(exprOutputsByTable[tableId], ev)
		}
	}
	for tableId, exprOutputs := range exprOutputsByTable {
		for _, ev := range exprOutputs {
			var issue internal.SchemaIssue
			switch {
			case strings.Contains(ev.Err.Error(), "No matching signature for operator"):
				issue = internal.TypeMismatch
			case strings.Contains(ev.Err.Error(), "Syntax error") || strings.Contains(ev.Err.Error(), "syntax error at or near"):
				issue = internal.InvalidCondition
			case strings.Contains(ev.Err.Error(), "Unrecognized name"):
				issue = internal.ColumnNotFound
			case strings.Contains(ev.Err.Error(), "Function not found"):
				issue = internal.CheckConstraintFunctionNotFound
			default:
				issue = internal.GenericWarning
			}
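			// Record the classified issue alongside the offending expression for this table,
			// and remember the expression ID so the caller can later drop the failing check
			// constraint from the Spanner schema (e.g. an error containing "Unrecognized name"
			// is reported as internal.ColumnNotFound).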
			issues[tableId] = append(issues[tableId], internal.InvalidCheckExp{
				IssueType:  issue,
				Expression: ev.ExpressionDetail.Expression,
			})
			invalidExpIds[tableId] = append(invalidExpIds[tableId], ev.ExpressionDetail.ExpressionId)
		}
	}
	return issues, invalidExpIds
}

// GetErroredIssue collects all the expression verification errors, grouped by
// table, and returns them.
func GetErroredIssue(result internal.VerifyExpressionsOutput) map[string][]internal.InvalidCheckExp {
	exprOutputsByTable := make(map[string][]internal.ExpressionVerificationOutput)
	issues := make(map[string][]internal.InvalidCheckExp)
	for _, ev := range result.ExpressionVerificationOutputList {
		if !ev.Result {
			tableId := ev.ExpressionDetail.Metadata["tableId"]
			exprOutputsByTable[tableId] = append(exprOutputsByTable[tableId], ev)
		}
	}
	for tableId, exprOutputs := range exprOutputsByTable {
		for _, ev := range exprOutputs {
			var issue internal.SchemaIssue
			switch {
			case strings.Contains(ev.Err.Error(), "No matching signature for operator"):
				issue = internal.TypeMismatchError
			case strings.Contains(ev.Err.Error(), "Syntax error") || strings.Contains(ev.Err.Error(), "syntax error at or near"):
				issue = internal.InvalidConditionError
			case strings.Contains(ev.Err.Error(), "Unrecognized name"):
				issue = internal.ColumnNotFoundError
			case strings.Contains(ev.Err.Error(), "Function not found"):
				issue = internal.CheckConstraintFunctionNotFoundError
			default:
				issue = internal.GenericError
			}
			issues[tableId] = append(issues[tableId], internal.InvalidCheckExp{
				IssueType:  issue,
				Expression: ev.ExpressionDetail.Expression,
			})
		}
	}
	return issues
}

// RemoveCheckConstraint removes the check constraint whose expression ID matches expId.
func RemoveCheckConstraint(checkConstraints []ddl.CheckConstraint, expId string) []ddl.CheckConstraint {
	var filteredConstraints []ddl.CheckConstraint
	for _, checkConstraint := range checkConstraints {
		if checkConstraint.ExprId != expId {
			filteredConstraints = append(filteredConstraints, checkConstraint)
		}
	}
	return filteredConstraints
}

// VerifyExpressions uses the expressions API to validate check constraint
// expressions, adds the relevant errors to the suggestion tab, and removes any
// check constraint whose expression fails verification.
func (ss *SchemaToSpannerImpl) VerifyExpressions(conv *internal.Conv) error {
	ctx := context.Background()
	spschema := conv.SpSchema
	verifyExpressionsInput := internal.VerifyExpressionsInput{
		Conv:                 conv,
		Source:               conv.Source,
		ExpressionDetailList: GenerateExpressionDetailList(spschema),
	}
	ss.ExpressionVerificationAccessor.RefreshSpannerClient(ctx, conv.SpProjectId, conv.SpInstanceId)
	if len(verifyExpressionsInput.ExpressionDetailList) != 0 {
		result := ss.ExpressionVerificationAccessor.VerifyExpressions(ctx, verifyExpressionsInput)
		if result.ExpressionVerificationOutputList == nil {
			return result.Err
		}
		issueTypes, invalidExpIds := GetIssue(result)
		if len(issueTypes) > 0 {
			for tableId, issues := range issueTypes {
				if conv.InvalidCheckExp == nil {
					conv.InvalidCheckExp = map[string][]internal.InvalidCheckExp{}
					conv.InvalidCheckExp[tableId] = []internal.InvalidCheckExp{}
				}
				invalidCheckExp := conv.InvalidCheckExp[tableId]
				invalidCheckExp = append(invalidCheckExp, issues...)
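				// Persist the accumulated invalid expressions on the Conv and raise a
				// table-level schema issue for each distinct issue type so it is surfaced
				// in the suggestion tab.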
				conv.InvalidCheckExp[tableId] = invalidCheckExp
				for _, issue := range issues {
					if _, exists := conv.SchemaIssues[tableId]; !exists {
						conv.SchemaIssues[tableId] = internal.TableIssues{
							TableLevelIssues: []internal.SchemaIssue{},
						}
					}
					tableIssue := conv.SchemaIssues[tableId]
					if !IsSchemaIssuePresent(tableIssue.TableLevelIssues, issue.IssueType) {
						tableIssue.TableLevelIssues = append(tableIssue.TableLevelIssues, issue.IssueType)
					}
					conv.SchemaIssues[tableId] = tableIssue
				}
			}
		}
		if len(invalidExpIds) > 0 {
			for tableId, expressionIdList := range invalidExpIds {
				for _, expId := range expressionIdList {
					spschema := conv.SpSchema[tableId]
					spschema.CheckConstraints = RemoveCheckConstraint(spschema.CheckConstraints, expId)
					conv.SpSchema[tableId] = spschema
				}
			}
		}
	}
	return nil
}

// IsSchemaIssuePresent checks if issue is present in the given schemaissue list.
func IsSchemaIssuePresent(schemaissue []internal.SchemaIssue, issue internal.SchemaIssue) bool {
	for _, s := range schemaissue {
		if s == issue {
			return true
		}
	}
	return false
}

func (ss *SchemaToSpannerImpl) SchemaToSpannerDDLHelper(conv *internal.Conv, toddl ToDdl, srcTable schema.Table, isRestore bool) error {
	spTableName, err := internal.GetSpannerTable(conv, srcTable.Id)
	if err != nil {
		conv.Unexpected(fmt.Sprintf("Couldn't map source table %s to Spanner: %s", srcTable.Name, err))
		return err
	}
	var spColIds []string
	spColDef := make(map[string]ddl.ColumnDef)
	var (
		totalNonKeyColumnSize int
		tableLevelIssues      []internal.SchemaIssue
	)
	columnLevelIssues := make(map[string][]internal.SchemaIssue)

	// Iterate over columns using ColIds order.
	for _, srcColId := range srcTable.ColIds {
		srcCol := srcTable.ColDefs[srcColId]
		colName, err := internal.GetSpannerCol(conv, srcTable.Id, srcCol.Id, spColDef)
		if err != nil {
			conv.Unexpected(fmt.Sprintf("Couldn't map source column %s of table %s to Spanner: %s", srcCol.Name, srcTable.Name, err))
			continue
		}
		spColIds = append(spColIds, srcColId)
		isPk := IsPrimaryKey(srcColId, srcTable)
		ty, issues := toddl.ToSpannerType(conv, "", srcCol.Type, isPk)

		// TODO(hengfeng): add issues for all elements of srcCol.Ignored.
		if srcCol.Ignored.ForeignKey {
			issues = append(issues, internal.ForeignKey)
		}
		_, isChanged := internal.FixName(srcCol.Name)
		if isChanged && (srcCol.Name != colName) {
			issues = append(issues, internal.IllegalName)
		}
		if srcCol.Ignored.Default {
			issues = append(issues, internal.DefaultValue)
		}
		if srcCol.Ignored.AutoIncrement {
			// TODO(adibh) - check why this is not there in postgres
			issues = append(issues, internal.AutoIncrement)
		}
		// Set the not null constraint to false for unsupported source datatypes.
		isNotNull := srcCol.NotNull
		if findSchemaIssue(issues, internal.NoGoodType) != -1 {
			isNotNull = false
		}
		// Set the not null constraint to false for array datatypes and add a warning,
		// because Datastream does not support array datatypes.
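		// (ty.IsArray is set by the source-specific ToSpannerType implementation when the
		// source column maps to a Spanner ARRAY type.)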
		if ty.IsArray {
			issues = append(issues, internal.ArrayTypeNotSupported)
			isNotNull = false
		}
		// Set auto generation for column
		srcAutoGen := srcCol.AutoGen
		var autoGenCol *ddl.AutoGenCol = &ddl.AutoGenCol{}
		if srcAutoGen.Name != "" {
			autoGenCol, err = toddl.GetColumnAutoGen(conv, srcAutoGen, srcColId, srcTable.Id)
			if autoGenCol != nil {
				if err != nil {
					srcCol.Ignored.AutoIncrement = true
					issues = append(issues, internal.AutoIncrement)
				} else {
					issues = append(issues, internal.SequenceCreated)
				}
			}
		}
		if len(issues) > 0 {
			columnLevelIssues[srcColId] = issues
		}
		spColDef[srcColId] = ddl.ColumnDef{
			Name:    colName,
			T:       ty,
			NotNull: isNotNull,
			Comment: "From: " + quoteIfNeeded(srcCol.Name) + " " + srcCol.Type.Print(),
			Id:      srcColId,
			AutoGen: *autoGenCol,
		}
		if !checkIfColumnIsPartOfPK(srcColId, srcTable.PrimaryKeys) {
			totalNonKeyColumnSize += getColumnSize(ty.Name, ty.Len)
		}
	}
	if totalNonKeyColumnSize > ddl.MaxNonKeyColumnLength {
		tableLevelIssues = append(tableLevelIssues, internal.RowLimitExceeded)
	}
	conv.SchemaIssues[srcTable.Id] = internal.TableIssues{
		TableLevelIssues:  tableLevelIssues,
		ColumnLevelIssues: columnLevelIssues,
	}
	comment := "Spanner schema for source table " + quoteIfNeeded(srcTable.Name)
	conv.SpSchema[srcTable.Id] = ddl.CreateTable{
		Name:             spTableName,
		ColIds:           spColIds,
		ColDefs:          spColDef,
		PrimaryKeys:      cvtPrimaryKeys(srcTable.PrimaryKeys),
		ForeignKeys:      cvtForeignKeys(conv, spTableName, srcTable.Id, srcTable.ForeignKeys, isRestore),
		CheckConstraints: cvtCheckConstraint(conv, srcTable.CheckConstraints),
		Indexes:          cvtIndexes(conv, srcTable.Id, srcTable.Indexes, spColIds, spColDef),
		Comment:          comment,
		Id:               srcTable.Id,
	}
	return nil
}

func (ss *SchemaToSpannerImpl) SchemaToSpannerSequenceHelper(conv *internal.Conv, srcSequence ddl.Sequence) error {
	switch srcSequence.SequenceKind {
	case constants.AUTO_INCREMENT:
		spSequence := ddl.Sequence{
			Name:             srcSequence.Name,
			Id:               srcSequence.Id,
			SequenceKind:     "BIT REVERSED POSITIVE",
			SkipRangeMin:     srcSequence.SkipRangeMin,
			SkipRangeMax:     srcSequence.SkipRangeMax,
			StartWithCounter: srcSequence.StartWithCounter,
		}
		conv.SpSequences[srcSequence.Id] = spSequence
	default:
		spSequence := ddl.Sequence{
			Name:             srcSequence.Name,
			Id:               srcSequence.Id,
			SequenceKind:     "BIT REVERSED POSITIVE",
			SkipRangeMin:     srcSequence.SkipRangeMin,
			SkipRangeMax:     srcSequence.SkipRangeMax,
			StartWithCounter: srcSequence.StartWithCounter,
		}
		conv.SpSequences[srcSequence.Id] = spSequence
	}
	return nil
}

func quoteIfNeeded(s string) string {
	for _, r := range s {
		if unicode.IsLetter(r) || unicode.IsDigit(r) || unicode.IsPunct(r) {
			continue
		}
		return strconv.Quote(s)
	}
	return s
}

func cvtPrimaryKeys(srcKeys []schema.Key) []ddl.IndexKey {
	var spKeys []ddl.IndexKey
	for _, k := range srcKeys {
		spKeys = append(spKeys, ddl.IndexKey{ColId: k.ColId, Desc: k.Desc, Order: k.Order})
	}
	return spKeys
}

func cvtForeignKeys(conv *internal.Conv, spTableName string, srcTableId string, srcKeys []schema.ForeignKey, isRestore bool) []ddl.Foreignkey {
	var spKeys []ddl.Foreignkey
	for _, key := range srcKeys {
		spKey, err := CvtForeignKeysHelper(conv, spTableName, srcTableId, key, isRestore)
		if err != nil {
			continue
		}
		spKeys = append(spKeys, spKey)
	}
	return spKeys
}

// cvtCheckConstraint converts check constraints from source to Spanner.
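//
// For example (illustrative sample data only), a source constraint such as
//
//	schema.CheckConstraint{Id: "cc1", Name: "chk_age", Expr: "(age > 18)", ExprId: "e1"}
//
// is copied field-for-field into a ddl.CheckConstraint, with the name passed through
// internal.ToSpannerCheckConstraintName so it is legal and unique in the Spanner schema.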
func cvtCheckConstraint(conv *internal.Conv, srcKeys []schema.CheckConstraint) []ddl.CheckConstraint {
	var spcc []ddl.CheckConstraint
	for _, cc := range srcKeys {
		spcc = append(spcc, ddl.CheckConstraint{
			Id:     cc.Id,
			Name:   internal.ToSpannerCheckConstraintName(conv, cc.Name),
			Expr:   cc.Expr,
			ExprId: cc.ExprId,
		})
	}
	return spcc
}

func CvtForeignKeysHelper(conv *internal.Conv, spTableName string, srcTableId string, srcKey schema.ForeignKey, isRestore bool) (ddl.Foreignkey, error) {
	if len(srcKey.ColIds) != len(srcKey.ReferColumnIds) {
		conv.Unexpected(fmt.Sprintf("ConvertForeignKeys: ColIds and referColumns don't have the same lengths: len(columns)=%d, len(referColumns)=%d for source tableId: %s, referenced table: %s", len(srcKey.ColIds), len(srcKey.ReferColumnIds), srcTableId, srcKey.ReferTableId))
		return ddl.Foreignkey{}, fmt.Errorf("ConvertForeignKeys: columns and referColumns don't have the same lengths")
	}
	// Check whether the referenced Spanner table exists.
	_, isPresent := conv.SpSchema[srcKey.ReferTableId]
	if !isPresent && isRestore {
		return ddl.Foreignkey{}, nil
	}
	// Check whether the referenced source table exists.
	_, isPresent = conv.SrcSchema[srcKey.ReferTableId]
	if !isPresent {
		conv.Unexpected(fmt.Sprintf("Can't map foreign key for source tableId: %s, referenced tableId: %s", srcTableId, srcKey.ReferTableId))
		return ddl.Foreignkey{}, fmt.Errorf("reference table not found")
	}
	var spColIds, spReferColIds []string
	for i, colId := range srcKey.ColIds {
		spColIds = append(spColIds, colId)
		spReferColIds = append(spReferColIds, srcKey.ReferColumnIds[i])
	}
	spKeyName := internal.ToSpannerForeignKey(conv, srcKey.Name)
	spDeleteRule := internal.ToSpannerOnDelete(conv, srcTableId, srcKey.OnDelete)
	spUpdateRule := internal.ToSpannerOnUpdate(conv, srcTableId, srcKey.OnUpdate)
	spKey := ddl.Foreignkey{
		Name:           spKeyName,
		ColIds:         spColIds,
		ReferTableId:   srcKey.ReferTableId,
		ReferColumnIds: spReferColIds,
		Id:             srcKey.Id,
		OnDelete:       spDeleteRule,
		OnUpdate:       spUpdateRule,
	}
	return spKey, nil
}

func cvtIndexes(conv *internal.Conv, tableId string, srcIndexes []schema.Index, spColIds []string, spColDef map[string]ddl.ColumnDef) []ddl.CreateIndex {
	var spIndexes []ddl.CreateIndex
	for _, srcIndex := range srcIndexes {
		spIndex := CvtIndexHelper(conv, tableId, srcIndex, spColIds, spColDef)
		if (!reflect.DeepEqual(spIndex, ddl.CreateIndex{})) {
			spIndexes = append(spIndexes, spIndex)
		}
	}
	return spIndexes
}

func SrcTableToSpannerDDL(conv *internal.Conv, toddl ToDdl, srcTable schema.Table, ddlVerifier expressions_api.DDLVerifier) error {
	schemaToSpanner := SchemaToSpannerImpl{
		DdlV: ddlVerifier,
	}
	err := schemaToSpanner.SchemaToSpannerDDLHelper(conv, toddl, srcTable, true)
	if err != nil {
		return err
	}
	for tableId, sourceTable := range conv.SrcSchema {
		if _, isPresent := conv.SpSchema[tableId]; !isPresent {
			continue
		}
		spTable := conv.SpSchema[tableId]
		if tableId != srcTable.Id {
			spTable.ForeignKeys = cvtForeignKeysForAReferenceTable(conv, tableId, srcTable.Id, sourceTable.ForeignKeys, spTable.ForeignKeys)
			conv.SpSchema[tableId] = spTable
		}
	}
	internal.ResolveRefs(conv)
	return nil
}

func cvtForeignKeysForAReferenceTable(conv *internal.Conv, tableId string, referTableId string, srcKeys []schema.ForeignKey, spKeys []ddl.Foreignkey) []ddl.Foreignkey {
	for _, key := range srcKeys {
		if key.ReferTableId == referTableId {
			spKey, err := CvtForeignKeysHelper(conv, conv.SpSchema[tableId].Name, tableId, key, true)
			if err != nil {
				continue
			}
			spKey.Id = key.Id
			spKeys = append(spKeys, spKey)
		}
	}
	return spKeys
}

func CvtIndexHelper(conv *internal.Conv, tableId string, srcIndex schema.Index, spColIds []string, spColDef map[string]ddl.ColumnDef) ddl.CreateIndex {
	var spKeys []ddl.IndexKey
	var spStoredColIds []string

	for _, k := range srcIndex.Keys {
		isPresent := false
		for _, v := range spColIds {
			if v == k.ColId {
				isPresent = true
				if conv.SpDialect == constants.DIALECT_POSTGRESQL {
					if spColDef[v].T.Name == ddl.Numeric {
						// Indexes on NUMERIC columns are not currently supported in the
						// PostgreSQL dialect, so any index that contains a NUMERIC column
						// needs to be skipped.
						return ddl.CreateIndex{}
					}
				}
				break
			}
		}
		if !isPresent {
			conv.Unexpected(fmt.Sprintf("Can't map index key column for tableId %s columnId %s", tableId, k.ColId))
			continue
		}
		spKeys = append(spKeys, ddl.IndexKey{ColId: k.ColId, Desc: k.Desc, Order: k.Order})
	}
	for _, colId := range srcIndex.StoredColumnIds {
		isPresent := false
		for _, v := range spColIds {
			if v == colId {
				isPresent = true
				break
			}
		}
		if !isPresent {
			conv.Unexpected(fmt.Sprintf("Can't map index column for tableId %s columnId %s", tableId, colId))
			continue
		}
		spStoredColIds = append(spStoredColIds, colId)
	}
	if srcIndex.Name == "" {
		// Generate a name if the index name is empty in MySQL.
		// Collision of index names will be handled by ToSpannerIndexName.
		srcIndex.Name = fmt.Sprintf("Index_%s", conv.SrcSchema[tableId].Name)
	}
	spIndexName := internal.ToSpannerIndexName(conv, srcIndex.Name)
	spIndex := ddl.CreateIndex{
		Name:            spIndexName,
		TableId:         tableId,
		Unique:          srcIndex.Unique,
		Keys:            spKeys,
		StoredColumnIds: spStoredColIds,
		Id:              srcIndex.Id,
	}
	return spIndex
}

// spannerSchemaApplyExpressions applies all valid expressions that can be migrated
// to Spanner to the conv object.
func spannerSchemaApplyExpressions(conv *internal.Conv, expressions internal.VerifyExpressionsOutput) {
	for _, expression := range expressions.ExpressionVerificationOutputList {
		switch expression.ExpressionDetail.Type {
		case "DEFAULT":
			{
				tableId := expression.ExpressionDetail.Metadata["TableId"]
				columnId := expression.ExpressionDetail.Metadata["ColId"]

				if expression.Result {
					col := conv.SpSchema[tableId].ColDefs[columnId]
					col.DefaultValue = ddl.DefaultValue{
						IsPresent: true,
						Value: ddl.Expression{
							ExpressionId: expression.ExpressionDetail.ExpressionId,
							Statement:    expression.ExpressionDetail.Expression,
						},
					}
					conv.SpSchema[tableId].ColDefs[columnId] = col
				} else {
					colIssues := conv.SchemaIssues[tableId].ColumnLevelIssues[columnId]
					colIssues = append(colIssues, internal.DefaultValue)
					conv.SchemaIssues[tableId].ColumnLevelIssues[columnId] = colIssues
				}
			}
		}
	}
}
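
// Illustrative sketch (sample data, not taken from a real verification run):
// spannerSchemaApplyExpressions expects each DEFAULT expression's metadata to carry
// the target table and column, so a verified output shaped roughly like the literal
// below would set the default value on ColDefs["c1"] of table "t1", while the same
// output with Result set to false would instead record an internal.DefaultValue
// issue against that column.
//
//	internal.ExpressionVerificationOutput{
//		Result: true,
//		ExpressionDetail: internal.ExpressionDetail{
//			Type:         "DEFAULT",
//			Expression:   "0",
//			ExpressionId: "expr1",
//			Metadata:     map[string]string{"TableId": "t1", "ColId": "c1"},
//		},
//	}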