internal/mapping.go (215 lines of code) (raw):
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"fmt"
"strconv"
"strings"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl"
)
// GetSpannerTable maps a source DB table name into a legal Spanner table
// name. Note that source DB column names can be essentially any string, but
// Spanner column names must use a limited character set. This means that
// getSpannerTable may have to change a name to make it legal, we must ensure
// that:
// a) the new table name is legal
// b) the new table name doesn't clash with other Spanner table names
// c) we consistently return the same name for this table.
//
// conv.UsedNames tracks Spanner names that have been used for table names, foreign key constraints
// and indexes. We use this to ensure we generate unique names when
// we map from source dbs to Spanner since Spanner requires all these names to be
// distinct and should not differ only in case.
func GetSpannerTable(conv *Conv, tableId string) (string, error) {
if tableId == "" {
return "", fmt.Errorf("bad parameter: table-id string is empty")
}
if sp, found := conv.SpSchema[tableId]; found {
return sp.Name, nil
}
srcTableName := conv.SrcSchema[tableId].Name
spTableName := GetSpannerValidName(conv, srcTableName)
if spTableName != srcTableName {
VerbosePrintf("Mapping source DB table %s to Spanner table %s\n", srcTableName, spTableName)
logger.Log.Debug(fmt.Sprintf("Mapping source DB table %s to Spanner table %s\n", srcTableName, spTableName))
}
return spTableName, nil
}
// GetSpannerCol maps a source DB table/column into a legal Spanner column
// name. If mustExist is true, we return error if the column is new.
// Note that source DB column names can be essentially any string, but
// Spanner column names must use a limited character set. This means that
// getSpannerCol may have to change a name to make it legal, we must ensure
// that:
// a) the new col name is legal
// b) the new col name doesn't clash with other col names in the same table
// c) we consistently return the same name for the same col.
func GetSpannerCol(conv *Conv, tableId, colId string, spColDef map[string]ddl.ColumnDef) (string, error) {
if tableId == "" {
return "", fmt.Errorf("bad parameter: table id string is empty")
}
if colId == "" {
return "", fmt.Errorf("bad parameter: column id string is empty")
}
if spCol, found := spColDef[colId]; found {
return spCol.Name, nil
}
srcTable := conv.SrcSchema[tableId]
srcColName := srcTable.ColDefs[colId].Name
spColName, _ := FixName(srcColName)
usedColNames := map[string]bool{}
for _, spCol := range spColDef {
usedColNames[spCol.Name] = true
}
if _, found := usedColNames[spColName]; found {
// spColName has been used before i.e. FixName caused a collision.
// Add unique postfix: use number of cols in this table so far.
// However, there is a chance this has already been used,
// so need to iterate
id := len(spColDef)
for {
c := spColName + "_" + strconv.Itoa(id)
if _, found := usedColNames[c]; !found {
spColName = c
break
}
id++
}
}
if spColName != srcColName {
VerbosePrintf("Mapping source DB col %s (table %s) to Spanner col %s\n", srcColName, srcTable.Name, spColName)
logger.Log.Debug(fmt.Sprintf("Mapping source DB col %s (table %s) to Spanner col %s\n", srcColName, srcTable.Name, spColName))
}
return spColName, nil
}
// GetSpannerCols maps a slice of source columns into their corresponding
// Spanner columns using GetSpannerCol.
func GetSpannerCols(conv *Conv, tableId string, srcCols []string) ([]string, error) {
var spCols []string
for _, srcColName := range srcCols {
colId, err := GetColIdFromSrcName(conv.SrcSchema[tableId].ColDefs, srcColName)
if err != nil {
return nil, err
}
spCol, err := GetSpannerCol(conv, tableId, colId, conv.SpSchema[tableId].ColDefs)
if err != nil {
return nil, err
}
spCols = append(spCols, spCol)
}
return spCols, nil
}
// ToSpannerForeignKey maps source foreign key name to
// legal Spanner foreign key name.
// If the srcKeyName is empty string we can just return
// empty string without error.
// If the srcKeyName is not empty we need to make sure
// of the following things:
// a) the new foreign key name is legal
// b) the new foreign key name doesn't clash with other Spanner
//
// foreign key names
//
// Note that foreign key constraint names in Spanner have to be globally unique
// (across the database). But in some source databases, such as PostgreSQL,
// they only have to be unique for a table. Hence we must map each source
// constraint name to a unique spanner constraint name.
func ToSpannerForeignKey(conv *Conv, srcFkName string) string {
if srcFkName == "" {
return ""
}
return GetSpannerValidName(conv, srcFkName)
}
// ToSpannerOnDelete maps the source ON DELETE action
// to the corresponding Spanner compatible action.
// The following mapping is followed:
// a) CASCADE/NO ACTION -> mapped to the same as source action
// b) all others -> NO ACTION (default)
//
// For all source actions converted to a different action,
// an issue is appended to it's TableLevelIssues to
// generate a warning message for the user
//
// Since only MySQL and PostgreSQL have this functionality
// as of yet, for other sources OnDelete fields are
// kept empty i.e. "" is mapped to "" and an issue is appended
// to generate warning message (prints only once for each table with FK Actions)
func ToSpannerOnDelete(conv *Conv, srcTableId string, srcDeleteRule string) string {
srcDeleteRule = strings.ToUpper(srcDeleteRule)
if srcDeleteRule == constants.FK_NO_ACTION || srcDeleteRule == constants.FK_CASCADE {
return srcDeleteRule
}
if conv.SchemaIssues == nil {
conv.SchemaIssues = make(map[string]TableIssues)
}
tableIssues := conv.SchemaIssues[srcTableId]
// srcDeleteRule will only be empty for unsupported sources (Oracle, SQL Server)
if srcDeleteRule == "" {
//add ForeignKeyActionNotSupported issue only if not previously added
if !Contains(tableIssues.TableLevelIssues, ForeignKeyActionNotSupported) {
conv.SchemaIssues[srcTableId] = TableIssues{
TableLevelIssues: append(tableIssues.TableLevelIssues, ForeignKeyActionNotSupported),
ColumnLevelIssues: tableIssues.ColumnLevelIssues,
}
}
return ""
}
conv.SchemaIssues[srcTableId] = TableIssues{
TableLevelIssues: append(tableIssues.TableLevelIssues, ForeignKeyOnDelete),
ColumnLevelIssues: tableIssues.ColumnLevelIssues}
return constants.FK_NO_ACTION
}
// ToSpannerOnUpdate maps the source ON UPDATE action
// to the corresponding Spanner compatible action.
// The following mapping is followed:
// all actions -> NO ACTION (default)
// (Spanner only supports ON UPDATE NO ACTION)
//
// For all source actions converted to a different action,
// an issue is appended to it's TableLevelIssues to
// generate a warning message for the user
//
// Since only MySQL and PostgreSQL have this functionality
// as of yet, for other sources OnUpdate fields are
// kept empty i.e. "" is mapped to "" and an issue is appended
// to generate warning message (prints only once for each table with FK Actions)
func ToSpannerOnUpdate(conv *Conv, srcTableId string, srcUpdateRule string) string {
srcUpdateRule = strings.ToUpper(srcUpdateRule)
if srcUpdateRule == constants.FK_NO_ACTION {
return srcUpdateRule
}
if conv.SchemaIssues == nil {
conv.SchemaIssues = make(map[string]TableIssues)
}
tableIssues := conv.SchemaIssues[srcTableId]
// srcUpdateRule will only be empty for unsupported sources (Oracle, SQL Server)
if srcUpdateRule == "" {
//add ForeignKeyActionNotSupported issue only if not previously added
if !Contains(tableIssues.TableLevelIssues, ForeignKeyActionNotSupported) {
conv.SchemaIssues[srcTableId] = TableIssues{
TableLevelIssues: append(tableIssues.TableLevelIssues, ForeignKeyActionNotSupported),
ColumnLevelIssues: tableIssues.ColumnLevelIssues,
}
}
return ""
}
conv.SchemaIssues[srcTableId] = TableIssues{
TableLevelIssues: append(tableIssues.TableLevelIssues, ForeignKeyOnUpdate),
ColumnLevelIssues: tableIssues.ColumnLevelIssues}
return constants.FK_NO_ACTION
}
// ToSpannerIndexName maps source index name to legal Spanner index name.
// We need to make sure of the following things:
// a) the new index name is legal
// b) the new index name doesn't clash with other Spanner
//
// index names
//
// Note that index key constraint names in Spanner have to be globally unique
// (across the database). But in some source databases, such as MySQL,
// they only have to be unique for a table. Hence we must map each source
// constraint name to a unique spanner constraint name.
func ToSpannerIndexName(conv *Conv, srcIndexName string) string {
return GetSpannerValidName(conv, srcIndexName)
}
// Note that the check constraints names in spanner have to be globally unique
// (across the database). But in some source databases, such as MySQL,
// they only have to be unique for a table. Hence we must map each source
// constraint name to a unique spanner constraint name.
func ToSpannerCheckConstraintName(conv *Conv, srcCheckConstraintName string) string {
return GetSpannerValidName(conv, srcCheckConstraintName)
}
// conv.UsedNames tracks Spanner names that have been used for table names, foreign key constraints
// and indexes. We use this to ensure we generate unique names when
// we map from source dbs to Spanner since Spanner requires all these names to be
// distinct and should not differ only in case.
func GetSpannerValidName(conv *Conv, srcName string) string {
spKeyName, _ := FixName(srcName)
if _, found := conv.UsedNames[strings.ToLower(spKeyName)]; found {
// spKeyName has been used before.
// Add unique postfix: use number of keys so far.
// However, there is a chance this has already been used,
// so need to iterate.
id := len(conv.UsedNames)
for {
c := spKeyName + "_" + strconv.Itoa(id)
if _, found := conv.UsedNames[strings.ToLower(c)]; !found {
spKeyName = c
break
}
id++
}
}
conv.UsedNames[strings.ToLower(spKeyName)] = true
return spKeyName
}
// ResolveRefs resolves all table and column references in foreign key constraints
// in the Spanner Schema. Note: Spanner requires that DDL references match
// the case of the referenced object, but this is not so for many source databases.
//
// TODO: Expand ResolveRefs to primary keys and indexes.
func ResolveRefs(conv *Conv) {
for table, spTable := range conv.SpSchema {
spTable.ForeignKeys = resolveFks(conv, table, spTable.ForeignKeys)
conv.SpSchema[table] = spTable
}
}
// resolveFks returns resolved version of fks.
// Foreign key constraints that can't be resolved are dropped.
func resolveFks(conv *Conv, table string, fks []ddl.Foreignkey) []ddl.Foreignkey {
var resolved []ddl.Foreignkey
for _, fk := range fks {
var err error
if fk.ColIds, err = resolveColRefs(conv, table, fk.ColIds); err != nil {
conv.Unexpected(fmt.Sprintf("Can't resolve Columns in foreign key constraint: %s", err))
delete(conv.UsedNames, strings.ToLower(fk.Name))
continue
}
if fk.ReferTableId, err = resolveTableRef(conv, fk.ReferTableId); err != nil {
conv.Unexpected(fmt.Sprintf("Can't resolve ReferTable in foreign key constraint: %s", err))
delete(conv.UsedNames, strings.ToLower(fk.Name))
continue
}
if fk.ReferColumnIds, err = resolveColRefs(conv, fk.ReferTableId, fk.ReferColumnIds); err != nil {
conv.Unexpected(fmt.Sprintf("Can't resolve ReferColumnIds in foreign key constraint: %s", err))
delete(conv.UsedNames, strings.ToLower(fk.Name))
continue
}
resolved = append(resolved, fk)
}
return resolved
}
func resolveTableRef(conv *Conv, tableRef string) (string, error) {
if _, ok := conv.SpSchema[tableRef]; ok {
return tableRef, nil
}
// Do case-insensitive search for tableRef.
tr := strings.ToLower(tableRef)
for t := range conv.SpSchema {
if strings.ToLower(t) == tr {
return t, nil
}
}
return "", fmt.Errorf("can't resolve table %v", tableRef)
}
func resolveColRefs(conv *Conv, tableRef string, colRefs []string) ([]string, error) {
table, err := resolveTableRef(conv, tableRef)
if err != nil {
return nil, err
}
resolveColRef := func(colRef string) (string, error) {
if _, ok := conv.SpSchema[table].ColDefs[colRef]; ok {
return colRef, nil
}
// Do case-insensitive search for colRef.
cr := strings.ToLower(colRef)
for _, c := range conv.SpSchema[table].ColIds {
if strings.ToLower(c) == cr {
return c, nil
}
}
return "", fmt.Errorf("can't resolve column: table=%v, column=%v", tableRef, colRef)
}
var cols []string
for _, colRef := range colRefs {
c, err := resolveColRef(colRef)
if err != nil {
return nil, err
}
cols = append(cols, c)
}
return cols, nil
}