sources/spanner/infoschema.go (452 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package spanner import ( "context" "fmt" "math" "sort" "strconv" "strings" "cloud.google.com/go/spanner" spannerclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/spanner/client" "github.com/GoogleCloudPlatform/spanner-migration-tool/expressions_api" _ "github.com/lib/pq" // we will use database/sql package instead of using this package directly "google.golang.org/api/iterator" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants" "github.com/GoogleCloudPlatform/spanner-migration-tool/internal" "github.com/GoogleCloudPlatform/spanner-migration-tool/schema" "github.com/GoogleCloudPlatform/spanner-migration-tool/sources/common" "github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl" ) // InfoSchemaImpl postgres specific implementation for InfoSchema. type InfoSchemaImpl struct { Client *spanner.Client SpannerClient spannerclient.SpannerClient Ctx context.Context SpDialect string } func NewInfoSchemaImplWithSpannerClient(ctx context.Context, dbURI string, spDialect string) (*InfoSchemaImpl, error) { spannerClient, err := spannerclient.NewSpannerClientImpl(ctx, dbURI) if err != nil { return nil, err } return &InfoSchemaImpl{SpannerClient: spannerClient, Ctx: ctx, SpDialect: spDialect}, nil } // GetToDdl function below implement the common.InfoSchema interface. 
func (isi InfoSchemaImpl) GetToDdl() common.ToDdl {
	return ToDdlImpl{}
}

// ProcessData is a no-op. It exists only so that InfoSchemaImpl can be passed
// as an infoSchema interface; it is not needed for now.
func (isi InfoSchemaImpl) ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, spCols []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error {
	return nil
}

// GetRowCount returns the row count of the table.
//
// Like the other query methods of InfoSchemaImpl, it uses SpannerClient when
// that is set and falls back to Client otherwise. An empty result set yields
// (0, nil); any iterator or decoding error is returned with a zero count.
func (isi InfoSchemaImpl) GetRowCount(table common.SchemaAndName) (int64, error) {
	// NOTE(review): the table name is concatenated into the SQL text without
	// quoting or validation -- confirm callers only pass trusted names.
	stmt := spanner.Statement{SQL: "SELECT count(*) FROM " + table.Name + ";"}

	var iter spannerclient.RowIterator
	if isi.SpannerClient != nil {
		iter = isi.SpannerClient.Single().Query(isi.Ctx, stmt)
	} else {
		iter = isi.Client.Single().Query(isi.Ctx, stmt)
	}
	defer iter.Stop()

	row, err := iter.Next()
	if err == iterator.Done {
		return 0, nil
	}
	if err != nil {
		return 0, err
	}
	var count int64
	if err := row.Columns(&count); err != nil {
		return 0, fmt.Errorf("couldn't read row count for table %s: %w", table.Name, err)
	}
	return count, nil
}

// GetRowsFromTable is a no-op. It exists only so that InfoSchemaImpl can be
// passed as an infoSchema interface; it is not needed for now.
func (isi InfoSchemaImpl) GetRowsFromTable(conv *internal.Conv, srcTable string) (interface{}, error) {
	return nil, nil
}

// StartChangeDataCapture is a no-op. It exists only so that InfoSchemaImpl
// can be passed as an infoSchema interface; it is not needed for now.
func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error) {
	return nil, nil
}

// StartStreamingMigration is a no-op that returns an empty DataflowOutput. It
// exists only so that InfoSchemaImpl can be passed as an infoSchema
// interface; it is not needed for now.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *spanner.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
	return internal.DataflowOutput{}, nil
}

// GetTableName returns table name.
//
// The bare tableName is returned when schema is the one this file queries by
// default ("public" for the PostgreSQL dialect, "" otherwise); any other
// schema yields "schema.tableName".
func (isi InfoSchemaImpl) GetTableName(schema string, tableName string) string {
	defaultSchema := ""
	if isi.SpDialect == constants.DIALECT_POSTGRESQL {
		// Drop public prefix for pg spanner.
		defaultSchema = "public"
	}
	if schema == defaultSchema {
		return tableName
	}
	return fmt.Sprintf("%s.%s", schema, tableName)
}

// GetTables returns the list of base tables in the selected database,
// restricted to the '' schema (or 'public' for the PostgreSQL dialect).
func (isi InfoSchemaImpl) GetTables() ([]common.SchemaAndName, error) { q := `SELECT table_schema, table_name FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_schema = ''` if isi.SpDialect == constants.DIALECT_POSTGRESQL { q = `SELECT table_schema, table_name FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_schema = 'public'` } stmt := spanner.Statement{SQL: q} var iter spannerclient.RowIterator if isi.SpannerClient != nil { iter = isi.SpannerClient.Single().Query(isi.Ctx, stmt) } else { iter = isi.Client.Single().Query(isi.Ctx, stmt) } defer iter.Stop() var tableSchema, tableName string var tables []common.SchemaAndName for { row, err := iter.Next() if err == iterator.Done { break } if err != nil { return nil, fmt.Errorf("couldn't get tables: %w", err) } err = row.Columns(&tableSchema, &tableName) if err != nil { return nil, err } tables = append(tables, common.SchemaAndName{Schema: tableSchema, Name: tableName}) } return tables, nil } func (sp *InfoSchemaImpl) PopulateSpannerSchema(ctx context.Context, conv *internal.Conv) error { processSchema := common.ProcessSchemaImpl{} expressionVerificationAccessor, _ := expressions_api.NewExpressionVerificationAccessorImpl(ctx, conv.SpProjectId, conv.SpInstanceId) ddlVerifier, err := expressions_api.NewDDLVerifierImpl(ctx, conv.SpProjectId, conv.SpInstanceId) if err != nil { return fmt.Errorf("error trying create ddl verifier: %v", err) } schemaToSpanner := common.SchemaToSpannerImpl{ DdlV: ddlVerifier, ExpressionVerificationAccessor: expressionVerificationAccessor, } err = processSchema.ProcessSchema(conv, sp, common.DefaultWorkers, internal.AdditionalSchemaAttributes{IsSharded: false}, &schemaToSpanner, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{}) if err != nil { return fmt.Errorf("error trying to read and convert spanner schema: %v", err) } parentTables, err := sp.GetInterleaveTables(conv.SpSchema) if err != nil { // We should ideally throw an error here as it could 
potentially cause a lot of failed writes. // We raise an unexpected error for now to make it compatible with the integration tests. // In the emulator, the interleave_type column in not supported hence the query fails. conv.Unexpected(fmt.Sprintf("error trying to fetch interleave table info from schema: %v", err)) } // Assign parents if any. for tableName, parentTable := range parentTables { tableId, _ := internal.GetTableIdFromSpName(conv.SpSchema, tableName) spTable := conv.SpSchema[tableId] spTable.ParentTable.Id = parentTable.Id spTable.ParentTable.OnDelete = parentTable.OnDelete conv.SpSchema[tableId] = spTable } return nil } // GetColumns returns a list of Column objects and names func (isi InfoSchemaImpl) GetColumns(conv *internal.Conv, table common.SchemaAndName, constraints map[string][]string, primaryKeys []string) (map[string]schema.Column, []string, error) { q := `SELECT column_name, spanner_type, is_nullable FROM information_schema.columns WHERE table_schema = '' AND table_name = @p1 ORDER BY ordinal_position;` if isi.SpDialect == constants.DIALECT_POSTGRESQL { q = `SELECT column_name, spanner_type, is_nullable FROM information_schema.columns WHERE table_schema = 'public' AND table_name = $1 ORDER BY ordinal_position;` } stmt := spanner.Statement{ SQL: q, Params: map[string]interface{}{ "p1": table.Name, }, } var iter spannerclient.RowIterator if isi.SpannerClient != nil { iter = isi.SpannerClient.Single().Query(isi.Ctx, stmt) } else { iter = isi.Client.Single().Query(isi.Ctx, stmt) } defer iter.Stop() colDefs := make(map[string]schema.Column) var colIds []string var colName, spannerType, isNullable string for { row, err := iter.Next() if err == iterator.Done { break } if err != nil { return nil, nil, fmt.Errorf("couldn't get column info for table %s: %s", table.Name, err) } err = row.Columns(&colName, &spannerType, &isNullable) if err != nil { return nil, nil, fmt.Errorf("cannot read row for table %s while reading columns: %s", table.Name, err) } 
ignored := schema.Ignored{} for _, c := range constraints[colName] { switch c { case "CHECK": ignored.Check = true case "FOREIGN KEY", "PRIMARY KEY", "UNIQUE": // Nothing to do here -- these are handled elsewhere. } } colId := internal.GenerateColumnId() c := schema.Column{ Id: colId, Name: colName, Type: toType(spannerType), NotNull: common.ToNotNull(conv, isNullable), } colDefs[colId] = c colIds = append(colIds, colId) } return colDefs, colIds, nil } // GetConstraints returns a list of primary keys and by-column map of // other constraints. Note: we need to preserve ordinal order of // columns in primary key constraints. // Note that foreign key constraints are handled in getForeignKeys. func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) ([]string, []schema.CheckConstraint, map[string][]string, error) { q := `SELECT k.column_name, t.constraint_type FROM information_schema.table_constraints AS t INNER JOIN information_schema.KEY_COLUMN_USAGE AS k ON t.constraint_name = k.constraint_name AND t.constraint_schema = k.constraint_schema WHERE k.table_schema = '' AND k.table_name = @p1 ORDER BY k.ordinal_position;` if isi.SpDialect == constants.DIALECT_POSTGRESQL { q = `SELECT k.column_name, t.constraint_type FROM information_schema.table_constraints AS t INNER JOIN information_schema.KEY_COLUMN_USAGE AS k ON t.constraint_name = k.constraint_name AND t.constraint_schema = k.constraint_schema WHERE k.table_schema = 'public' AND k.table_name = $1 ORDER BY k.ordinal_position;` } stmt := spanner.Statement{ SQL: q, Params: map[string]interface{}{ "p1": table.Name, }, } var iter spannerclient.RowIterator if isi.SpannerClient != nil { iter = isi.SpannerClient.Single().Query(isi.Ctx, stmt) } else { iter = isi.Client.Single().Query(isi.Ctx, stmt) } defer iter.Stop() var primaryKeys []string var col, constraint string m := make(map[string][]string) for { row, err := iter.Next() if err == iterator.Done { break } if err != nil { return nil, 
nil, nil, fmt.Errorf("couldn't get row while reading constraints: %w", err) } err = row.Columns(&col, &constraint) if err != nil { return nil, nil, nil, err } if col == "" || constraint == "" { conv.Unexpected("Got empty col or constraint") continue } switch constraint { case "PRIMARY KEY": primaryKeys = append(primaryKeys, col) default: m[col] = append(m[col], constraint) } } return primaryKeys, nil, m, nil } // GetForeignKeys returns a list of all the foreign key constraints. func (isi InfoSchemaImpl) GetForeignKeys(conv *internal.Conv, table common.SchemaAndName) (foreignKeys []schema.ForeignKey, err error) { q := `SELECT k.constraint_name, k.column_name, c.table_name, c.column_name FROM information_schema.key_column_usage AS k JOIN information_schema.constraint_column_usage AS c ON k.constraint_name = c.constraint_name JOIN information_schema.table_constraints AS t ON k.constraint_name = t.constraint_name WHERE t.constraint_type='FOREIGN KEY' AND t.table_schema = '' AND t.table_name = @p1 ORDER BY k.constraint_name, k.ordinal_position;` if isi.SpDialect == constants.DIALECT_POSTGRESQL { q = `SELECT k.constraint_name, k.column_name, c.table_name, c.column_name FROM information_schema.key_column_usage AS k JOIN information_schema.constraint_column_usage AS c ON k.constraint_name = c.constraint_name JOIN information_schema.table_constraints AS t ON k.constraint_name = t.constraint_name WHERE t.constraint_type='FOREIGN KEY' AND t.table_schema = 'public' AND t.table_name = $1 ORDER BY k.constraint_name, k.ordinal_position;` } stmt := spanner.Statement{ SQL: q, Params: map[string]interface{}{ "p1": table.Name, }, } var iter spannerclient.RowIterator if isi.SpannerClient != nil { iter = isi.SpannerClient.Single().Query(isi.Ctx, stmt) } else { iter = isi.Client.Single().Query(isi.Ctx, stmt) } defer iter.Stop() var col, refCol, fKeyName, refTable string fKeys := make(map[string]common.FkConstraint) var keyNames []string for { row, err := iter.Next() if err == 
iterator.Done { break } if err != nil { return nil, fmt.Errorf("couldn't get row while fetching foreign keys: %w", err) } err = row.Columns(&fKeyName, &col, &refTable, &refCol) if err != nil { return nil, err } if _, found := fKeys[fKeyName]; found { fk := fKeys[fKeyName] fk.Cols = append(fk.Cols, col) fk.Refcols = append(fk.Refcols, refCol) fKeys[fKeyName] = fk continue } fKeys[fKeyName] = common.FkConstraint{Name: fKeyName, Table: isi.GetTableName(table.Schema, refTable), Refcols: []string{refCol}, Cols: []string{col}} keyNames = append(keyNames, fKeyName) } sort.Strings(keyNames) for _, k := range keyNames { // The query returns a crypted result for multi-col FKs. Currently for a FK from (a,b,c) -> (x,y,z), // the returned rows like (a,x), (a,y), (a,z), (b,x), (b,y), (b,z), (c,x), (c,y), (c,z). // Need to reduce it to (a,x), (b,y), (c,z). The logic below does that. n := int(math.Sqrt(float64(len(fKeys[k].Cols)))) cols, refcols := []string{}, []string{} for i := 0; i < n; i++ { cols = append(cols, fKeys[k].Cols[i*n]) refcols = append(refcols, fKeys[k].Refcols[i]) } foreignKeys = append(foreignKeys, schema.ForeignKey{ Id: internal.GenerateForeignkeyId(), Name: fKeys[k].Name, ColumnNames: cols, ReferTableName: fKeys[k].Table, ReferColumnNames: refcols}) } return foreignKeys, nil } // GetIndexes returns a list of Indexes per table. 
func (isi InfoSchemaImpl) GetIndexes(conv *internal.Conv, table common.SchemaAndName, colNameIdMap map[string]string) ([]schema.Index, error) { q := `SELECT distinct c.INDEX_NAME,c.COLUMN_NAME,c.ORDINAL_POSITION,c.COLUMN_ORDERING,i.IS_UNIQUE FROM information_schema.index_columns AS c JOIN information_schema.indexes AS i ON c.INDEX_NAME=i.INDEX_NAME WHERE c.table_schema = '' AND i.INDEX_TYPE='INDEX' AND c.TABLE_NAME = @p1 ORDER BY c.INDEX_NAME, c.ORDINAL_POSITION;` if isi.SpDialect == constants.DIALECT_POSTGRESQL { q = `SELECT distinct c.INDEX_NAME,c.COLUMN_NAME,c.ORDINAL_POSITION,c.COLUMN_ORDERING,i.IS_UNIQUE FROM information_schema.index_columns AS c JOIN information_schema.indexes AS i ON c.INDEX_NAME=i.INDEX_NAME WHERE c.table_schema = 'public' AND i.INDEX_TYPE='INDEX' AND c.TABLE_NAME = $1 ORDER BY c.INDEX_NAME, c.ORDINAL_POSITION;` } stmt := spanner.Statement{ SQL: q, Params: map[string]interface{}{ "p1": table.Name, }, } var iter spannerclient.RowIterator if isi.SpannerClient != nil { iter = isi.SpannerClient.Single().Query(isi.Ctx, stmt) } else { iter = isi.Client.Single().Query(isi.Ctx, stmt) } defer iter.Stop() var name, column, ordering string var isUnique bool var isPgUnique string var sequence int64 indexMap := make(map[string]schema.Index) var indexNames []string var indexes []schema.Index for { row, err := iter.Next() if err == iterator.Done { break } if err != nil { return nil, fmt.Errorf("couldn't read row while fetching interleaved tables: %w", err) } if isi.SpDialect == constants.DIALECT_POSTGRESQL { err = row.Columns(&name, &column, &sequence, &ordering, &isPgUnique) if err != nil { fmt.Println(err) conv.Unexpected(fmt.Sprintf("Can't scan: %v", err)) continue } } else { err = row.Columns(&name, &column, &sequence, &ordering, &isUnique) if err != nil { fmt.Println(err) conv.Unexpected(fmt.Sprintf("Can't scan: %v", err)) continue } } isUnique = isPgUnique == "YES" if _, found := indexMap[name]; !found { indexNames = append(indexNames, name) 
indexMap[name] = schema.Index{ Id: internal.GenerateIndexesId(), Name: name, Unique: isUnique} } index := indexMap[name] index.Keys = append(index.Keys, schema.Key{ ColId: colNameIdMap[column], Desc: (ordering == "DESC")}) indexMap[name] = index } for _, k := range indexNames { indexes = append(indexes, indexMap[k]) } return indexes, nil } func (isi InfoSchemaImpl) GetInterleaveTables(spSchema ddl.Schema) (map[string]ddl.InterleavedParent, error) { q := `SELECT table_name, parent_table_name, on_delete_action FROM information_schema.tables WHERE interleave_type = 'IN PARENT' AND table_type = 'BASE TABLE' AND table_schema = ''` if isi.SpDialect == constants.DIALECT_POSTGRESQL { q = `SELECT table_name, parent_table_name, on_delete_action FROM information_schema.tables WHERE interleave_type = 'IN PARENT' AND table_type = 'BASE TABLE' AND table_schema = 'public'` } stmt := spanner.Statement{SQL: q} var iter spannerclient.RowIterator if isi.SpannerClient != nil { iter = isi.SpannerClient.Single().Query(isi.Ctx, stmt) } else { iter = isi.Client.Single().Query(isi.Ctx, stmt) } defer iter.Stop() var tableName, parentTableName, onDelete string parentTables := map[string]ddl.InterleavedParent{} for { row, err := iter.Next() if err == iterator.Done { break } if err != nil { return nil, fmt.Errorf("couldn't read row while fetching interleaved tables: %w", err) } err = row.Columns(&tableName, &parentTableName, &onDelete) if err != nil { return nil, err } parentTableId, _ := internal.GetTableIdFromSpName(spSchema, parentTableName) parentTables[tableName] = ddl.InterleavedParent{Id: parentTableId, OnDelete: onDelete} } return parentTables, nil } func toType(dataType string) schema.Type { switch { case strings.Contains(dataType, "ARRAY"): typeLenStr := dataType[(strings.Index(dataType, "<") + 1):(len(dataType) - 1)] schemaType := toType(typeLenStr) schemaType.ArrayBounds = []int64{-1} return schemaType case strings.Contains(dataType, "("): idx := strings.Index(dataType, "(") 
typeLenStr := dataType[(idx + 1):(len(dataType) - 1)] var typeLen int64 if typeLenStr == "MAX" { typeLen = ddl.MaxLength } else { typeLen, _ = strconv.ParseInt(typeLenStr, 10, 64) } return schema.Type{Name: dataType[:idx], Mods: []int64{typeLen}} default: return schema.Type{Name: dataType} } }