sources/oracle/infoschema.go

// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package oracle

import (
    "context"
    "database/sql"
    "fmt"
    "sort"
    "strings"

    sp "cloud.google.com/go/spanner"

    "github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
    "github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
    "github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
    "github.com/GoogleCloudPlatform/spanner-migration-tool/schema"
    "github.com/GoogleCloudPlatform/spanner-migration-tool/sources/common"
    "github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl"
    "github.com/GoogleCloudPlatform/spanner-migration-tool/streaming"
)

type InfoSchemaImpl struct {
    DbName             string
    Db                 *sql.DB
    MigrationProjectId string
    SourceProfile      profiles.SourceProfile
    TargetProfile      profiles.TargetProfile
}

// GetToDdl and the functions below implement the common.InfoSchema interface.
func (isi InfoSchemaImpl) GetToDdl() common.ToDdl {
    return ToDdlImpl{}
}

// GetTableName returns the table name.
func (isi InfoSchemaImpl) GetTableName(dbName string, tableName string) string {
    return tableName
}

// GetRowsFromTable returns a sql.Rows object for a table.
func (isi InfoSchemaImpl) GetRowsFromTable(conv *internal.Conv, tableId string) (interface{}, error) {
    tbl := conv.SrcSchema[tableId]
    srcCols := tbl.ColIds
    if len(srcCols) == 0 {
        conv.Unexpected(fmt.Sprintf("Couldn't get source columns for table %s", tbl.Name))
        return nil, nil
    }
    q := getSelectQuery(isi.DbName, tbl.Schema, tbl.Name, tbl.ColIds, tbl.ColDefs)
    rows, err := isi.Db.Query(q)
    return rows, err
}

func getSelectQuery(srcDb string, schemaName string, tableName string, colIds []string, colDefs map[string]schema.Column) string {
    var selects = make([]string, len(colIds))
    for i, colId := range colIds {
        cn := colDefs[colId].Name
        var s string
        if TimestampReg.MatchString(colDefs[colId].Type.Name) {
            s = fmt.Sprintf(`SYS_EXTRACT_UTC("%s") AS "%s"`, cn, cn)
        } else if len(colDefs[colId].Type.ArrayBounds) == 1 {
            s = fmt.Sprintf(`(SELECT JSON_ARRAYAGG(COLUMN_VALUE RETURNING VARCHAR2(4000)) FROM TABLE ("%s"."%s")) AS "%s"`, tableName, cn, cn)
        } else {
            switch colDefs[colId].Type.Name {
            case "NUMBER":
                s = fmt.Sprintf(`TO_CHAR("%s") AS "%s"`, cn, cn)
            case "XMLTYPE":
                s = fmt.Sprintf(`CAST(XMLTYPE.getStringVal("%s") AS VARCHAR2(4000)) AS "%s"`, cn, cn)
            case "SDO_GEOMETRY":
                s = fmt.Sprintf(`SDO_UTIL.TO_WKTGEOMETRY("%s") AS "%s"`, cn, cn)
            case "OBJECT":
                s = fmt.Sprintf(`(CASE WHEN "%s" IS NULL THEN '' ELSE XMLTYPE("%s").getStringVal() END) AS "%s"`, cn, cn, cn)
            default:
                s = fmt.Sprintf(`"%s"`, cn)
            }
        }
        selects[i] = s
    }
    return fmt.Sprintf(`SELECT %s FROM "%s"."%s"`, strings.Join(selects, ", "), schemaName, tableName)
}
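
// Illustrative sketch (hypothetical table and column names, not part of the
// original file; assumes TimestampReg matches the TIMESTAMP type name): for a
// table HR.PEOPLE with columns NAME (VARCHAR2), AGE (NUMBER) and CREATED_AT
// (TIMESTAMP), getSelectQuery would build:
//
//	SELECT "NAME", TO_CHAR("AGE") AS "AGE", SYS_EXTRACT_UTC("CREATED_AT") AS "CREATED_AT" FROM "HR"."PEOPLE"
//
// NUMBER columns are stringified with TO_CHAR and timestamps are normalized
// to UTC, so scanning the results into sql.RawBytes yields stable text values.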

// ProcessData performs data conversion for the source database.
func (isi InfoSchemaImpl) ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, commonColIds []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error {
    srcTableName := conv.SrcSchema[tableId].Name
    rowsInterface, err := isi.GetRowsFromTable(conv, tableId)
    if err != nil {
        conv.Unexpected(fmt.Sprintf("Couldn't get data for table %s : err = %s", srcTableName, err))
        return err
    }
    rows := rowsInterface.(*sql.Rows)
    defer rows.Close()
    srcCols, _ := rows.Columns()
    v, scanArgs := buildVals(len(srcCols))
    colNameIdMap := internal.GetSrcColNameIdMap(conv.SrcSchema[tableId])
    for rows.Next() {
        // Get RawBytes from data.
        err := rows.Scan(scanArgs...)
        if err != nil {
            conv.Unexpected(fmt.Sprintf("Couldn't process sql data row: %s", err))
            // Scan failed, so we don't have any data to add to bad rows.
            conv.StatsAddBadRow(srcTableName, conv.DataMode())
            continue
        }
        values := valsToStrings(v)
        newValues, err := common.PrepareValues(conv, tableId, colNameIdMap, commonColIds, srcCols, values)
        if err != nil {
            conv.Unexpected(fmt.Sprintf("Error while converting data: %s\n", err))
            conv.StatsAddBadRow(srcTableName, conv.DataMode())
            conv.CollectBadRow(srcTableName, srcCols, values)
            continue
        }
        ProcessDataRow(conv, tableId, commonColIds, srcSchema, spSchema, newValues)
    }
    return nil
}

// GetRowCount returns the number of rows in the given table.
func (isi InfoSchemaImpl) GetRowCount(table common.SchemaAndName) (int64, error) {
    q := fmt.Sprintf(`SELECT count(*) FROM "%s"`, table.Name)
    rows, err := isi.Db.Query(q)
    if err != nil {
        return 0, err
    }
    defer rows.Close()
    var count int64
    if rows.Next() {
        err := rows.Scan(&count)
        return count, err
    }
    return 0, nil
}

// GetTables returns a list of all tables owned by the source schema user.
func (isi InfoSchemaImpl) GetTables() ([]common.SchemaAndName, error) {
    q := fmt.Sprintf("SELECT table_name FROM all_tables WHERE owner = '%s'", isi.DbName)
    rows, err := isi.Db.Query(q)
    if err != nil {
        return nil, fmt.Errorf("couldn't get tables: %w", err)
    }
    defer rows.Close()
    var tableName string
    var tables []common.SchemaAndName
    for rows.Next() {
        rows.Scan(&tableName)
        tables = append(tables, common.SchemaAndName{Schema: isi.DbName, Name: tableName})
    }
    return tables, nil
}

// GetColumns returns a map of Column objects keyed by column id, along with
// the ordered list of column ids.
func (isi InfoSchemaImpl) GetColumns(conv *internal.Conv, table common.SchemaAndName, constraints map[string][]string, primaryKeys []string) (map[string]schema.Column, []string, error) {
    q := fmt.Sprintf(`
        SELECT 
            column_name, 
            data_type, 
            nullable, 
            data_default, 
            data_length, 
            data_precision, 
            data_scale, 
            at.typecode, 
            act.elem_type_name, 
            act.length, 
            act.precision, 
            act.scale 
        FROM all_tab_columns atc 
        LEFT JOIN all_types at ON atc.data_type = at.type_name AND atc.owner = at.owner 
        LEFT JOIN all_coll_types act ON atc.data_type = act.type_name AND atc.owner = at.owner 
        WHERE atc.owner = '%s' AND atc.table_name = '%s'
    `, table.Schema, table.Name)
    cols, err := isi.Db.Query(q)
    if err != nil {
        return nil, nil, fmt.Errorf("couldn't get schema for table %s.%s: %s", table.Schema, table.Name, err)
    }
    defer cols.Close()
    colDefs := make(map[string]schema.Column)
    var colIds []string
    var colName, dataType string
    var isNullable string
    var colDefault, typecode, elementDataType sql.NullString
    var charMaxLen, numericPrecision, numericScale, elementCharMaxLen, elementNumericPrecision, elementNumericScale sql.NullInt64
    for cols.Next() {
        err := cols.Scan(&colName, &dataType, &isNullable, &colDefault, &charMaxLen, &numericPrecision, &numericScale,
            &typecode, &elementDataType, &elementCharMaxLen, &elementNumericPrecision,
            &elementNumericScale)
        if err != nil {
            conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
            continue
        }
        ignored := schema.Ignored{}
        for _, c := range constraints[colName] {
            // Oracle constraint types:
            //   C (check constraint on a table), P (primary key),
            //   U (unique key), R (referential integrity),
            //   V (with check option, on a view), O (with read only, on a view).
            // We've already filtered out PRIMARY KEY.
            switch c {
            case "C":
                ignored.Check = true
            // Oracle 21c introduced a JSON data type; before that, JSON was
            // stored as VARCHAR2, CLOB or BLOB. If the column has an IS JSON
            // check constraint (marked by "J" in the constraints array, see
            // GetConstraints), update the source data type to JSON so that
            // toSpannerTypeInternal maps it to Spanner's JSON type.
            case "J":
                dataType = "JSON"
                charMaxLen.Valid = false
            }
        }
        if typecode.Valid && typecode.String == "OBJECT" {
            dataType = "OBJECT"
            charMaxLen.Valid = false
        }
        ignored.Default = colDefault.Valid
        colId := internal.GenerateColumnId()
        c := schema.Column{
            Id:      colId,
            Name:    colName,
            Type:    toType(dataType, typecode, elementDataType, charMaxLen, numericPrecision, numericScale, elementCharMaxLen, elementNumericPrecision, elementNumericScale),
            NotNull: strings.ToUpper(isNullable) == "N",
            Ignored: ignored,
        }
        colDefs[colId] = c
        colIds = append(colIds, colId)
    }
    return colDefs, colIds, nil
}

// GetConstraints returns a list of primary keys and a by-column map of
// other constraints. Note: we need to preserve the ordinal order of
// columns in primary key constraints.
// Foreign key constraints are handled in GetForeignKeys.
func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) ([]string, []schema.CheckConstraint, map[string][]string, error) {
    q := fmt.Sprintf(`
        SELECT 
            k.column_name, 
            t.constraint_type, 
            t.search_condition 
        FROM all_constraints t 
        INNER JOIN all_cons_columns k ON (k.constraint_name = t.constraint_name) 
        WHERE t.owner = '%s' AND k.table_name = '%s'
    `, table.Schema, table.Name)
    rows, err := isi.Db.Query(q)
    if err != nil {
        return nil, nil, nil, err
    }
    defer rows.Close()
    var primaryKeys []string
    var col, constraint string
    var condition sql.NullString
    m := make(map[string][]string)
    for rows.Next() {
        err := rows.Scan(&col, &constraint, &condition)
        if err != nil {
            conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
            continue
        }
        if col == "" || constraint == "" {
            conv.Unexpected("Got empty col or constraint")
            continue
        }
        switch constraint {
        // P is the primary key constraint type in Oracle.
        case "P":
            primaryKeys = append(primaryKeys, col)
        case "C":
            // If the column has an IS JSON check constraint, add the extra
            // marker "J" to its constraints array. Example condition value:
            // `column_name IS JSON` (the condition may also be NULL).
            if condition.Valid && strings.Contains(condition.String, "IS JSON") {
                m[col] = append(m[col], "J")
            }
            m[col] = append(m[col], constraint)
        default:
            m[col] = append(m[col], constraint)
        }
    }
    return primaryKeys, nil, m, nil
}
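
// Illustrative sketch (hypothetical column name, not part of the original
// file): for a column PROFILE carrying the check constraint
// `PROFILE IS JSON` plus one ordinary check constraint, GetConstraints
// would produce an entry like
//
//	m["PROFILE"] = []string{"J", "C", "C"}
//
// GetColumns then rewrites the column's data type to JSON when it sees the
// "J" marker, and sets Ignored.Check for the plain "C" entries.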

// GetForeignKeys returns a list of all foreign key constraints.
func (isi InfoSchemaImpl) GetForeignKeys(conv *internal.Conv, table common.SchemaAndName) (foreignKeys []schema.ForeignKey, err error) {
    q := fmt.Sprintf(`
        SELECT 
            B.table_name AS ref_table, 
            A.column_name AS col_name, 
            B.column_name AS ref_col_name, 
            A.constraint_name AS name 
        FROM all_cons_columns A 
        JOIN all_constraints C ON A.owner = C.owner AND A.constraint_name = C.constraint_name 
        JOIN all_cons_columns B ON B.owner = C.owner AND B.constraint_name = C.r_constraint_name 
        WHERE A.table_name='%s' AND A.owner='%s'
    `, table.Name, isi.DbName)
    rows, err := isi.Db.Query(q)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    var col, refCol, refTable, fKeyName string
    fKeys := make(map[string]common.FkConstraint)
    var keyNames []string
    for rows.Next() {
        err := rows.Scan(&refTable, &col, &refCol, &fKeyName)
        if err != nil {
            conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
            continue
        }
        if _, found := fKeys[fKeyName]; found {
            fk := fKeys[fKeyName]
            fk.Cols = append(fk.Cols, col)
            fk.Refcols = append(fk.Refcols, refCol)
            fKeys[fKeyName] = fk
            continue
        }
        fKeys[fKeyName] = common.FkConstraint{Name: fKeyName, Table: refTable, Refcols: []string{refCol}, Cols: []string{col}}
        keyNames = append(keyNames, fKeyName)
    }
    sort.Strings(keyNames)
    for _, k := range keyNames {
        foreignKeys = append(foreignKeys,
            schema.ForeignKey{
                Id:               internal.GenerateForeignkeyId(),
                Name:             fKeys[k].Name,
                ColumnNames:      fKeys[k].Cols,
                ReferTableName:   fKeys[k].Table,
                ReferColumnNames: fKeys[k].Refcols,
            })
    }
    return foreignKeys, nil
}
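
// Illustrative sketch (hypothetical names, not part of the original file):
// a composite foreign key FK_ORDER on (CUSTOMER_ID, REGION_ID) referencing
// CUSTOMERS(ID, REGION) arrives as two query rows and is folded into one
// entry:
//
//	common.FkConstraint{Name: "FK_ORDER", Table: "CUSTOMERS",
//		Cols: []string{"CUSTOMER_ID", "REGION_ID"}, Refcols: []string{"ID", "REGION"}}
//
// Sorting keyNames keeps the emitted schema.ForeignKey list deterministic
// across runs.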

// GetIndexes returns a list of all indexes for the specified table.
// Oracle supports several types of index:
//  1. Normal indexes (by default, Oracle Database creates B-tree indexes)
//  2. Bitmap indexes
//  3. Partitioned indexes
//  4. Function-based indexes
//  5. Domain indexes
// We only consider normal indexes for now.
func (isi InfoSchemaImpl) GetIndexes(conv *internal.Conv, table common.SchemaAndName, colNameIdMap map[string]string) ([]schema.Index, error) {
    q := fmt.Sprintf(`
        SELECT 
            IC.index_name, 
            IC.column_name, 
            IC.column_position, 
            IC.descend, 
            I.uniqueness, 
            IE.column_expression, 
            I.index_type 
        FROM all_ind_columns IC 
        LEFT JOIN all_ind_expressions IE 
            ON IC.index_name = IE.index_name AND IC.column_position = IE.column_position AND IC.index_owner = IE.index_owner 
        LEFT JOIN all_indexes I 
            ON IC.index_name = I.index_name AND I.table_owner = IC.index_owner 
        WHERE IC.index_owner='%s' AND IC.table_name='%s' 
        ORDER BY IC.index_name, IC.column_position
    `, table.Schema, table.Name)
    rows, err := isi.Db.Query(q)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    var name, column, sequence, Unique, indexType string
    var collation, colexpression sql.NullString
    indexMap := make(map[string]schema.Index)
    var indexNames []string
    ignoredIndex := make(map[string]bool)
    var indexes []schema.Index
    for rows.Next() {
        if err := rows.Scan(&name, &column, &sequence, &collation, &Unique, &colexpression, &indexType); err != nil {
            conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
            continue
        }
        // Ignore all indexes except normal ones. A column expression such as
        // UPPER("EMAIL") indicates a function call, detected via "(" and ")".
        if indexType != "NORMAL" && strings.Contains(colexpression.String, "(") && strings.Contains(colexpression.String, ")") {
            ignoredIndex[name] = true
        }
        // Example row: INDEX1_LAST SYS_NC00009$ 1 DESC NONUNIQUE "LAST_NAME" FUNCTION-BASED NORMAL
        // A DESC column makes the index a functional index, but as a special
        // case we include it and replace the column name with the column
        // expression (stripping the surrounding quotes).
        if colexpression.Valid && !strings.Contains(colexpression.String, "(") && !strings.Contains(colexpression.String, ")") {
            column = colexpression.String[1 : len(colexpression.String)-1]
        }
        if _, found := indexMap[name]; !found {
            indexNames = append(indexNames, name)
            indexMap[name] = schema.Index{
                Id:     internal.GenerateIndexesId(),
                Name:   name,
                Unique: (Unique == "UNIQUE"),
            }
        }
        index := indexMap[name]
        index.Keys = append(index.Keys, schema.Key{
            ColId: colNameIdMap[column],
            Desc:  (collation.Valid && collation.String == "DESC"),
        })
        indexMap[name] = index
    }
    for _, k := range indexNames {
        // Only add normal indexes.
        if _, found := ignoredIndex[k]; !found {
            indexes = append(indexes, indexMap[k])
        }
    }
    return indexes, nil
}
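
// Illustrative sketch (hypothetical names, not part of the original file):
// for an index created as CREATE INDEX INDEX1_LAST ON PEOPLE (LAST_NAME DESC),
// all_ind_columns reports a system column such as SYS_NC00009$ with column
// expression "LAST_NAME" and descend=DESC, so the loop above produces:
//
//	schema.Index{Name: "INDEX1_LAST", Unique: false,
//		Keys: []schema.Key{{ColId: colNameIdMap["LAST_NAME"], Desc: true}}}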

// StartChangeDataCapture is used for automatic triggering of the Datastream
// job when performing a streaming migration.
func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error) {
    mp := make(map[string]interface{})
    var (
        schemaDetails map[string]internal.SchemaDetails
        err           error
    )
    commonInfoSchema := common.InfoSchemaImpl{}
    schemaDetails, err = commonInfoSchema.GetIncludedSrcTablesFromConv(conv)
    if err != nil {
        return nil, fmt.Errorf("error getting included source tables: %v", err)
    }
    streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Oracle.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, schemaDetails)
    if err != nil {
        return nil, fmt.Errorf("error reading streaming config: %v", err)
    }
    pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Oracle.User, constants.REGULAR_GCS)
    if err != nil {
        return nil, fmt.Errorf("error creating pubsub resources: %v", err)
    }
    streamingCfg.PubsubCfg = *pubsubCfg
    dlqPubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Oracle.User, constants.DLQ_GCS)
    if err != nil {
        return nil, fmt.Errorf("error creating dlq pubsub resources: %v", err)
    }
    streamingCfg.DlqPubsubCfg = *dlqPubsubCfg
    streamingCfg, err = streaming.StartDatastream(ctx, isi.MigrationProjectId, streamingCfg, isi.SourceProfile, isi.TargetProfile, schemaDetails)
    if err != nil {
        return nil, fmt.Errorf("error starting datastream: %v", err)
    }
    mp["streamingCfg"] = streamingCfg
    return mp, nil
}

// StartStreamingMigration is used for automatic triggering of the Dataflow
// job when performing a streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, migrationProjectId string, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
    streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)
    dfOutput, err := streaming.StartDataflow(ctx, migrationProjectId, isi.TargetProfile, streamingCfg, conv)
    if err != nil {
        return internal.DataflowOutput{}, err
    }
    return dfOutput, nil
}

func toType(dataType string, typecode, elementDataType sql.NullString, charLen sql.NullInt64, numericPrecision, numericScale, elementCharMaxLen, elementNumericPrecision, elementNumericScale sql.NullInt64) schema.Type {
    switch {
    case typecode.Valid && typecode.String == "COLLECTION":
        return modifyType(elementDataType.String, elementCharMaxLen, elementNumericPrecision, elementNumericScale, true)
    default:
        return modifyType(dataType, charLen, numericPrecision, numericScale, false)
    }
}

func modifyType(dataType string, charLen sql.NullInt64, numericPrecision, numericScale sql.NullInt64, isArray bool) schema.Type {
    var t schema.Type
    switch {
    case dataType == "NUMBER" && numericPrecision.Valid && numericScale.Valid && numericScale.Int64 != 0:
        t = schema.Type{Name: dataType, Mods: []int64{numericPrecision.Int64, numericScale.Int64}}
    case dataType == "NUMBER" && numericPrecision.Valid:
        t = schema.Type{Name: dataType, Mods: []int64{numericPrecision.Int64}}
    // The Oracle column query returns a data_length even for NUMBER columns,
    // so we only use charLen as a length mod for non-NUMBER types.
    case dataType != "NUMBER" && charLen.Valid:
        t = schema.Type{Name: dataType, Mods: []int64{charLen.Int64}}
    default:
        t = schema.Type{Name: dataType}
    }
    if isArray {
        t.ArrayBounds = []int64{-1}
    }
    return t
}
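
// Illustrative examples of the mapping above (derived from modifyType, not
// part of the original file):
//
//	NUMBER(10,2)         -> schema.Type{Name: "NUMBER", Mods: []int64{10, 2}}
//	NUMBER(8)            -> schema.Type{Name: "NUMBER", Mods: []int64{8}}
//	VARCHAR2(255)        -> schema.Type{Name: "VARCHAR2", Mods: []int64{255}}
//	VARRAY of VARCHAR2   -> element type as above, with ArrayBounds: []int64{-1}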

// buildVals constructs []sql.RawBytes value containers to scan row
// results into. It returns both the underlying containers (as a slice)
// and a slice of interface{} pointers to the containers to pass to
// rows.Scan.
func buildVals(n int) (v []sql.RawBytes, iv []interface{}) {
    v = make([]sql.RawBytes, n)
    // rows.Scan wants '[]interface{}' as an argument, so we must copy the
    // references into such a slice.
    iv = make([]interface{}, len(v))
    for i := range v {
        iv[i] = &v[i]
    }
    return v, iv
}

func valsToStrings(vals []sql.RawBytes) []string {
    toString := func(val sql.RawBytes) string {
        if val == nil {
            return "NULL"
        }
        return string(val)
    }
    var s []string
    for _, v := range vals {
        s = append(s, toString(v))
    }
    return s
}
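
// Illustrative usage sketch (not part of the original file), mirroring the
// scan loop in ProcessData above:
//
//	cols, _ := rows.Columns()
//	v, scanArgs := buildVals(len(cols))
//	for rows.Next() {
//		if err := rows.Scan(scanArgs...); err != nil {
//			continue // real code reports the error via conv.Unexpected
//		}
//		values := valsToStrings(v) // SQL NULLs become the string "NULL"
//	}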