internal/convert.go (526 lines of code) (raw):

// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
	"fmt"
	"sync"
	"time"

	"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/proto/migration"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/schema"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl"
	"go.uber.org/zap"
	"google.golang.org/genproto/googleapis/type/datetime"
)

// Conv contains all schema and data conversion state.
type Conv struct {
	mode mode // Schema mode or data mode.
	SpSchema ddl.Schema // Maps Spanner table name to Spanner schema.
	SyntheticPKeys map[string]SyntheticPKey // Maps Spanner table name to synthetic primary key (if needed).
	SrcSchema map[string]schema.Table // Maps source-DB table name to schema information.
	SchemaIssues map[string]TableIssues // Maps source-DB table/col to list of schema conversion issues.
	InvalidCheckExp map[string][]InvalidCheckExp // List of check constraint expressions and corresponding issues.
	ToSpanner map[string]NameAndCols `json:"-"` // Maps from source-DB table name to Spanner name and column mapping.
	ToSource map[string]NameAndCols `json:"-"` // Maps from Spanner table name to source-DB table name and column mapping.
	UsedNames map[string]bool `json:"-"` // Map storing the names that are already assigned to tables, indices or foreign key constraints.
	dataSink func(table string, cols []string, values []interface{})
	DataFlush func() `json:"-"` // Data flush is used to flush out remaining writes and wait for them to complete.
	Location *time.Location // Timezone (for timestamp conversion).
	sampleBadRows rowSamples // Rows that generated errors during conversion.
	Stats stats `json:"-"`
	TimezoneOffset string // Timezone offset for timestamp conversion.
	SpDialect string // The dialect of the Spanner database to which Spanner migration tool is writing.
	UniquePKey map[string][]string // Maps Spanner table name to unique column name being used as primary key (if needed).
	Audit Audit `json:"-"` // Stores the audit information for the database conversion.
	Rules []Rule // Stores applied rules during schema conversion.
	IsSharded bool // Flag denoting if the migration is sharded or not.
	ConvLock sync.RWMutex `json:"-"` // ConvLock prevents concurrent map read/write operations. This lock will be used in all the APIs that either read or write elements of the conv object.
	SpRegion string // Leader region for the Spanner instance.
	ResourceValidation bool // Flag denoting if validation for resources to be generated is complete.
	UI bool // Flag if the UI interface was used for migration. ToDo: Remove flag after resource generation is introduced to UI.
	SpSequences map[string]ddl.Sequence // Maps Spanner sequences to sequence schema.
	SrcSequences map[string]ddl.Sequence // Maps source-DB sequences to sequence schema information.
	SpProjectId string // Spanner project id.
	SpInstanceId string // Spanner instance id.
	Source string // Source database type being migrated.
}
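
// Illustrative sketch (hypothetical usage, not prescribed by this file): ConvLock is
// intended to guard the maps held by Conv, so a caller reading conv concurrently
// would take the read lock first.
func exampleListSourceTables(conv *Conv) []string {
	conv.ConvLock.RLock()
	defer conv.ConvLock.RUnlock()
	var tables []string
	for t := range conv.SrcSchema {
		tables = append(tables, t)
	}
	return tables
}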

type InvalidCheckExp struct {
	IssueType SchemaIssue
	Expression string
}

type TableIssues struct {
	ColumnLevelIssues map[string][]SchemaIssue
	TableLevelIssues []SchemaIssue
}

type AdditionalSchemaAttributes struct {
	IsSharded bool
}

type AdditionalDataAttributes struct {
	ShardId string
}

type mode int

const (
	schemaOnly mode = iota
	dataOnly
)

// SyntheticPKey specifies a synthetic primary key and current sequence
// count for a table, if needed. We use a synthetic primary key when
// the source DB table has no primary key.
type SyntheticPKey struct {
	ColId string
	Sequence int64
}

// SchemaIssue specifies a schema conversion issue.
type SchemaIssue int

// Defines all of the schema issues we track. Includes issues
// with type mappings, as well as features (such as source
// DB constraints) that aren't supported in Spanner.
const (
	DefaultValue SchemaIssue = iota
	ForeignKey
	MissingPrimaryKey
	UniqueIndexPrimaryKey
	MultiDimensionalArray
	NoGoodType
	Numeric
	NumericThatFits
	Decimal
	DecimalThatFits
	Serial
	AutoIncrement
	Timestamp
	Datetime
	Widened
	Time
	StringOverflow
	HotspotTimestamp
	HotspotAutoIncrement
	RedundantIndex
	AutoIncrementIndex
	InterleaveIndex
	InterleavedNotInOrder
	InterleavedOrder
	InterleavedAddColumn
	IllegalName
	InterleavedRenameColumn
	InterleavedChangeColumnSize
	RowLimitExceeded
	ShardIdColumnAdded
	ShardIdColumnPrimaryKey
	ArrayTypeNotSupported
	ForeignKeyOnDelete
	ForeignKeyOnUpdate
	SequenceCreated
	ForeignKeyActionNotSupported
	NumericPKNotSupported
	TypeMismatch
	TypeMismatchError
	DefaultValueError
	InvalidCondition
	InvalidConditionError
	ColumnNotFound
	ColumnNotFoundError
	CheckConstraintFunctionNotFound
	CheckConstraintFunctionNotFoundError
	GenericError
	GenericWarning
)

const (
	ShardIdColumn       = "migration_shard_id"
	SyntheticPrimaryKey = "synth_id"
)

// NameAndCols contains the name of a table and its columns.
// Used to map between source DB and Spanner table and column names.
type NameAndCols struct {
	Name string
	Cols map[string]string
}

// FkeyAndIdxs contains the name of a table, its foreign keys and indexes.
// Used to map between source DB and Spanner table name, foreign key name and index names.
type FkeyAndIdxs struct {
	Name string
	ForeignKey map[string]string
	Index map[string]string
}

type rowSamples struct {
	rows []*row
	bytes int64 // Bytes consumed by rows.
	bytesLimit int64 // Limit on bytes consumed by rows.
}

// row represents a single data row for a table. Used for tracking bad data rows.
type row struct {
	table string
	cols []string
	vals []string
}

// Note on rows, bad rows and good rows: a data row is either:
// a) not processed (but still shows in rows)
// b) successfully converted and successfully written to Spanner.
// c) successfully converted, but an error occurs when writing the row to Spanner.
// d) unsuccessfully converted (we won't try to write such rows to Spanner).
type stats struct {
	Rows map[string]int64 // Count of rows encountered during processing (a + b + c + d), broken down by source table.
	GoodRows map[string]int64 // Count of rows successfully converted (b + c), broken down by source table.
	BadRows map[string]int64 // Count of rows where conversion failed (d), broken down by source table.
	Statement map[string]*statementStat // Count of processed statements, broken down by statement type.
	Unexpected map[string]int64 // Count of unexpected conditions, broken down by condition description.
	Reparsed int64 // Count of times we re-parse dump data looking for end-of-statement.
}
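
// Illustrative sketch (hypothetical helper): the a/b/c/d categories above map to the
// counters as Rows = a + b + c + d, GoodRows = b + c, BadRows = d. For example, the
// per-table fraction of rows that failed conversion can be derived like this.
func exampleBadRowFraction(s stats, srcTable string) float64 {
	total := s.Rows[srcTable] // a + b + c + d
	if total == 0 {
		return 0
	}
	return float64(s.BadRows[srcTable]) / float64(total) // d as a fraction of all rows seen
}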

type statementStat struct {
	Schema int64
	Data int64
	Skip int64
	Error int64
}

// Audit stores the audit information of the conversion:
// elements that do not affect the migration functionality but are relevant for the migration metadata.
type Audit struct {
	SchemaConversionDuration time.Duration `json:"-"` // Duration of schema conversion.
	DataConversionDuration time.Duration `json:"-"` // Duration of data conversion.
	MigrationRequestId string `json:"-"` // Unique request id generated per migration.
	MigrationType *migration.MigrationData_MigrationType `json:"-"` // Type of migration: schema migration, data migration, or schema-and-data migration.
	DryRun bool `json:"-"` // Flag to identify if the migration is a dry run.
	StreamingStats streamingStats `json:"-"` // Stores information related to the streaming migration process.
	Progress Progress `json:"-"` // Stores information related to the progress of the migration.
	SkipMetricsPopulation bool `json:"-"` // Flag to identify if outgoing metrics metadata needs to be skipped.
}

// Stores information related to generated Dataflow resources.
type DataflowResources struct {
	JobId string `json:"JobId"`
	GcloudCmd string `json:"GcloudCmd"`
	Region string `json:"Region"`
}

// Stores information related to generated GCS resources.
type GcsResources struct {
	BucketName string `json:"BucketName"`
}

// Stores information related to generated Datastream resources.
type DatastreamResources struct {
	DatastreamName string `json:"DatastreamName"`
	Region string `json:"Region"`
}

// Stores information related to generated Pub/Sub resources.
type PubsubResources struct {
	TopicId string
	SubscriptionId string
	NotificationId string
	BucketName string
	Region string
}

// Stores information related to Monitoring resources.
type MonitoringResources struct {
	DashboardName string `json:"DashboardName"`
}

// ShardResources groups the resources generated for a single shard.
type ShardResources struct {
	DatastreamResources DatastreamResources
	PubsubResources PubsubResources
	DlqPubsubResources PubsubResources
	DataflowResources DataflowResources
	GcsResources GcsResources
	MonitoringResources MonitoringResources
}

// Stores information related to the streaming migration process.
type streamingStats struct {
	Streaming bool // Flag for confirmation of streaming migration.
	TotalRecords map[string]map[string]int64 // Tablewise count of records received for processing, broken down by record type i.e. INSERT, MODIFY & REMOVE.
	BadRecords map[string]map[string]int64 // Tablewise count of records not converted successfully, broken down by record type.
	DroppedRecords map[string]map[string]int64 // Tablewise count of records successfully converted but that failed to be written to Spanner, broken down by record type.
	SampleBadRecords []string // Records that generated errors during conversion.
	SampleBadWrites []string // Records that faced errors while writing to Cloud Spanner.
	DatastreamResources DatastreamResources
	DataflowResources DataflowResources
	PubsubResources PubsubResources
	DlqPubsubResources PubsubResources
	GcsResources GcsResources
	MonitoringResources MonitoringResources
	ShardToShardResourcesMap map[string]ShardResources
	AggMonitoringResources MonitoringResources
}
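
// Illustrative sketch (hypothetical usage): for a sharded streaming migration, the
// generated per-shard resources are grouped into ShardResources and stored in
// StreamingStats, keyed by a logical shard id.
func exampleRecordShardResources(conv *Conv, shardId string, res ShardResources) {
	ss := &conv.Audit.StreamingStats
	if ss.ShardToShardResourcesMap == nil {
		ss.ShardToShardResourcesMap = make(map[string]ShardResources)
	}
	ss.ShardToShardResourcesMap[shardId] = res
}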

type PubsubCfg struct {
	TopicId string
	SubscriptionId string
	NotificationId string
	BucketName string
}

type DataflowOutput struct {
	JobID string
	GCloudCmd string
}

// Stores information related to rules applied during schema conversion.
type Rule struct {
	Id string
	Name string
	Type string
	ObjectType string
	AssociatedObjects string
	Enabled bool
	Data interface{}
	AddedOn datetime.DateTime
}

type Tables struct {
	TableList []string `json:"TableList"`
}

type SchemaDetails struct {
	TableDetails []TableDetails `json:"TableDetails"`
}

type TableDetails struct {
	TableName string `json:"TableName"`
}

type VerifyExpressionsInput struct {
	Conv *Conv
	Source string
	ExpressionDetailList []ExpressionDetail
}

type ExpressionDetail struct {
	ReferenceElement ReferenceElement
	ExpressionId string
	Expression string
	Type string
	Metadata map[string]string
}

type ReferenceElement struct {
	Name string
}

type ExpressionVerificationOutput struct {
	ExpressionDetail ExpressionDetail
	Result bool
	Err error
}

type VerifyExpressionsOutput struct {
	ExpressionVerificationOutputList []ExpressionVerificationOutput
	Err error
}

// MakeConv returns a default-configured Conv.
func MakeConv() *Conv {
	return &Conv{
		SpSchema:       ddl.NewSchema(),
		SyntheticPKeys: make(map[string]SyntheticPKey),
		SrcSchema:      make(map[string]schema.Table),
		SchemaIssues:   make(map[string]TableIssues),
		ToSpanner:      make(map[string]NameAndCols),
		ToSource:       make(map[string]NameAndCols),
		UsedNames:      make(map[string]bool),
		Location:       time.Local, // By default, use Go's local time, which uses $TZ (when set).
		sampleBadRows:  rowSamples{bytesLimit: 10 * 1000 * 1000},
		Stats: stats{
			Rows:       make(map[string]int64),
			GoodRows:   make(map[string]int64),
			BadRows:    make(map[string]int64),
			Statement:  make(map[string]*statementStat),
			Unexpected: make(map[string]int64),
		},
		TimezoneOffset: "+00:00", // By default, use the +00:00 offset, which is equivalent to the UTC timezone.
		UniquePKey:     make(map[string][]string),
		Audit: Audit{
			StreamingStats: streamingStats{},
			MigrationType:  migration.MigrationData_SCHEMA_ONLY.Enum(),
		},
		Rules:        []Rule{},
		SpSequences:  make(map[string]ddl.Sequence),
		SrcSequences: make(map[string]ddl.Sequence),
	}
}

// ResetStats clears all row, statement and unexpected-condition stats collected so far.
func (conv *Conv) ResetStats() {
	conv.Stats = stats{
		Rows:       make(map[string]int64),
		GoodRows:   make(map[string]int64),
		BadRows:    make(map[string]int64),
		Statement:  make(map[string]*statementStat),
		Unexpected: make(map[string]int64),
	}
}

// SetDataSink configures conv to use the specified data sink.
func (conv *Conv) SetDataSink(ds func(table string, cols []string, values []interface{})) {
	conv.dataSink = ds
}

// Note on modes.
// We process the dump output twice. In the first pass (schema mode) we
// build the schema, and in the second pass (data mode) we write data to
// Spanner.

// SetSchemaMode configures conv to process schema-related statements and
// build the Spanner schema. In schema mode we also process just enough
// of other statements to get an accurate count of the number of data rows
// (used for tracking progress when writing data to Spanner).
func (conv *Conv) SetSchemaMode() {
	conv.mode = schemaOnly
}
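
// Illustrative sketch (hypothetical flow): the two-pass processing described in the
// note on modes above. processDump stands in for the real statement processing.
func exampleTwoPassConversion(conv *Conv, processDump func(*Conv)) {
	// First pass: build the Spanner schema and count data rows.
	conv.SetSchemaMode()
	processDump(conv)

	// Second pass: reset stats, then convert data and write it via the data sink.
	conv.ResetStats()
	conv.SetDataMode()
	processDump(conv)
}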

// SetDataMode configures conv to convert data and write it to Spanner.
// In this mode, we also do a complete re-processing of all statements
// for stats purposes (it's hard to keep track of which stats are
// collected in each phase, so we simply reset and recollect),
// but we don't modify the schema.
func (conv *Conv) SetDataMode() {
	conv.mode = dataOnly
}

// WriteRow calls dataSink and updates row stats.
func (conv *Conv) WriteRow(srcTable, spTable string, spCols []string, spVals []interface{}) {
	if conv.Audit.DryRun {
		conv.statsAddGoodRow(srcTable, conv.DataMode())
	} else if conv.dataSink == nil {
		msg := "Internal error: ProcessDataRow called but dataSink not configured"
		VerbosePrintf("%s\n", msg)
		logger.Log.Debug("Internal error: ProcessDataRow called but dataSink not configured")
		conv.Unexpected(msg)
		conv.StatsAddBadRow(srcTable, conv.DataMode())
	} else {
		conv.dataSink(spTable, spCols, spVals)
		conv.statsAddGoodRow(srcTable, conv.DataMode())
	}
}

// Rows returns the total count of data rows processed.
func (conv *Conv) Rows() int64 {
	n := int64(0)
	for _, c := range conv.Stats.Rows {
		n += c
	}
	return n
}

// BadRows returns the total count of bad rows encountered during
// data conversion.
func (conv *Conv) BadRows() int64 {
	n := int64(0)
	for _, c := range conv.Stats.BadRows {
		n += c
	}
	return n
}

// Statements returns the total number of statements processed.
func (conv *Conv) Statements() int64 {
	n := int64(0)
	for _, x := range conv.Stats.Statement {
		n += x.Schema + x.Data + x.Skip + x.Error
	}
	return n
}

// StatementErrors returns the number of statement errors encountered.
func (conv *Conv) StatementErrors() int64 {
	n := int64(0)
	for _, x := range conv.Stats.Statement {
		n += x.Error
	}
	return n
}

// Unexpecteds returns the total number of distinct unexpected conditions
// encountered during processing.
func (conv *Conv) Unexpecteds() int64 {
	return int64(len(conv.Stats.Unexpected))
}

// CollectBadRow updates the list of bad rows, while respecting
// the byte limit for bad rows.
func (conv *Conv) CollectBadRow(srcTable string, srcCols, vals []string) {
	r := &row{table: srcTable, cols: srcCols, vals: vals}
	bytes := byteSize(r)
	// Cap storage used by badRows. Keep at least one bad row.
	if len(conv.sampleBadRows.rows) == 0 || bytes+conv.sampleBadRows.bytes < conv.sampleBadRows.bytesLimit {
		conv.sampleBadRows.rows = append(conv.sampleBadRows.rows, r)
		conv.sampleBadRows.bytes += bytes
	}
}

// SampleBadRows returns a string-formatted list of rows that generated errors.
// Returns at most n rows.
func (conv *Conv) SampleBadRows(n int) []string {
	var l []string
	for _, x := range conv.sampleBadRows.rows {
		l = append(l, fmt.Sprintf("table=%s cols=%v data=%v\n", x.table, x.cols, x.vals))
		if len(l) >= n { // Stop once n rows have been collected.
			break
		}
	}
	return l
}

// AddShardIdColumn adds a shard id column (migration_shard_id) to every Spanner
// table that doesn't already have one. Used for sharded migrations.
func (conv *Conv) AddShardIdColumn() {
	for t, ct := range conv.SpSchema {
		if ct.ShardIdColumn == "" {
			colName := conv.buildColumnNameWithBase(t, ShardIdColumn)
			columnId := GenerateColumnId()
			ct.ColIds = append(ct.ColIds, columnId)
			ct.ColDefs[columnId] = ddl.ColumnDef{Name: colName, Id: columnId, T: ddl.Type{Name: ddl.String, Len: 50}, NotNull: false, AutoGen: ddl.AutoGenCol{Name: "", GenerationType: ""}}
			ct.ShardIdColumn = columnId
			conv.SpSchema[t] = ct
			var issues []SchemaIssue
			issues = append(issues, ShardIdColumnAdded, ShardIdColumnPrimaryKey)
			conv.SchemaIssues[ct.Id].ColumnLevelIssues[columnId] = issues
		}
	}
}
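
// Illustrative sketch (hypothetical converter): a typical data-mode code path counts
// every source row, then either writes the converted row or records it as a bad row.
func exampleProcessDataRow(conv *Conv, srcTable, spTable string, srcCols, srcVals []string,
	convertRow func([]string) ([]string, []interface{}, error)) {
	conv.StatsAddRow(srcTable, conv.DataMode())
	spCols, spVals, err := convertRow(srcVals)
	if err != nil {
		conv.Unexpected(fmt.Sprintf("Couldn't convert row: %v", err))
		conv.StatsAddBadRow(srcTable, conv.DataMode())
		conv.CollectBadRow(srcTable, srcCols, srcVals)
		return
	}
	// WriteRow sends the row to the data sink and updates the good-row stats.
	conv.WriteRow(srcTable, spTable, spCols, spVals)
}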

// AddPrimaryKeys analyzes all tables in conv.SpSchema and adds synthetic primary
// keys for any tables that don't have a primary key.
func (conv *Conv) AddPrimaryKeys() {
	for t, ct := range conv.SpSchema {
		if len(ct.PrimaryKeys) == 0 {
			primaryKeyPopulated := false
			// Populate a column with a unique constraint as the primary key in case
			// the table doesn't have a primary key, and remove the unique index.
			if len(ct.Indexes) != 0 {
				for i, index := range ct.Indexes {
					if index.Unique {
						for _, indexKey := range index.Keys {
							ct.PrimaryKeys = append(ct.PrimaryKeys, ddl.IndexKey{ColId: indexKey.ColId, Desc: indexKey.Desc, Order: indexKey.Order})
							conv.UniquePKey[t] = append(conv.UniquePKey[t], indexKey.ColId)
							addMissingPrimaryKeyWarning(ct.Id, indexKey.ColId, conv, UniqueIndexPrimaryKey)
						}
						primaryKeyPopulated = true
						ct.Indexes = append(ct.Indexes[:i], ct.Indexes[i+1:]...)
						break
					}
				}
			}
			if !primaryKeyPopulated {
				k := conv.buildColumnNameWithBase(t, SyntheticPrimaryKey)
				columnId := GenerateColumnId()
				ct.ColIds = append(ct.ColIds, columnId)
				ct.ColDefs[columnId] = ddl.ColumnDef{Name: k, Id: columnId, T: ddl.Type{Name: ddl.String, Len: 50}, AutoGen: ddl.AutoGenCol{Name: "", GenerationType: ""}}
				ct.PrimaryKeys = []ddl.IndexKey{{ColId: columnId, Order: 1}}
				conv.SyntheticPKeys[t] = SyntheticPKey{columnId, 0}
				addMissingPrimaryKeyWarning(ct.Id, columnId, conv, MissingPrimaryKey)
			}
			conv.SpSchema[t] = ct
		}
	}
}

// addMissingPrimaryKeyWarning adds a 'Missing Primary Key' warning to the
// ColumnLevelIssues of the conv object.
func addMissingPrimaryKeyWarning(tableId string, colId string, conv *Conv, schemaIssue SchemaIssue) {
	tableLevelIssues := conv.SchemaIssues[tableId].TableLevelIssues
	var columnLevelIssues map[string][]SchemaIssue
	if tableIssues, ok := conv.SchemaIssues[tableId]; ok {
		columnLevelIssues = tableIssues.ColumnLevelIssues
	} else {
		columnLevelIssues = make(map[string][]SchemaIssue)
	}
	columnLevelIssues[colId] = append(columnLevelIssues[colId], schemaIssue)
	conv.SchemaIssues[tableId] = TableIssues{
		TableLevelIssues:  tableLevelIssues,
		ColumnLevelIssues: columnLevelIssues,
	}
}

// SetLocation configures the timezone for data conversion.
func (conv *Conv) SetLocation(loc *time.Location) {
	conv.Location = loc
}

// buildColumnNameWithBase returns a column name for table 'tableId' derived from
// 'base' that doesn't clash with any existing column name in the table.
func (conv *Conv) buildColumnNameWithBase(tableId, base string) string {
	if _, ok := conv.SpSchema[tableId]; !ok {
		conv.Unexpected(fmt.Sprintf("Table doesn't exist for tableId %s: ", tableId))
		return base
	}
	count := 0
	key := base
	for {
		// Check key isn't already a column in the table.
		ok := true
		for _, column := range conv.SpSchema[tableId].ColDefs {
			if column.Name == key {
				ok = false
				break
			}
		}
		if ok {
			return key
		}
		key = fmt.Sprintf("%s%d", base, count)
		count++
	}
}

// Unexpected records stats about corner-cases and conditions
// that were not expected. Note that the counts may not
// be completely reliable due to potential double-counting
// because we process dump data twice.
func (conv *Conv) Unexpected(u string) {
	VerbosePrintf("Unexpected condition: %s\n", u)
	logger.Log.Debug("Unexpected condition", zap.String("condition", u))
	// Limit the size of the unexpected map. If over the limit, then only
	// update existing entries.
	if _, ok := conv.Stats.Unexpected[u]; ok || len(conv.Stats.Unexpected) < 1000 {
		conv.Stats.Unexpected[u]++
	}
}

// StatsAddRow increments the count of rows for 'srcTable' if b is
// true. The boolean arg 'b' is used to avoid double counting of
// stats. Specifically, some code paths that report row stats run in
// both schema-mode and data-mode e.g. statement.go. To avoid double
// counting, we explicitly choose a mode-for-stats-collection for each
// place where row stats are collected. When specifying this mode take
// care to ensure that the code actually runs in the mode you specify,
// otherwise stats will be dropped.
func (conv *Conv) StatsAddRow(srcTable string, b bool) {
	if b {
		conv.Stats.Rows[srcTable]++
	}
}

// statsAddGoodRow increments the good-row stats for 'srcTable' if b
// is true. See StatsAddRow comments for context.
func (conv *Conv) statsAddGoodRow(srcTable string, b bool) {
	if b {
		conv.Stats.GoodRows[srcTable]++
	}
}

// StatsAddBadRow increments the bad-row stats for 'srcTable' if b is
// true. See StatsAddRow comments for context.
func (conv *Conv) StatsAddBadRow(srcTable string, b bool) {
	if b {
		conv.Stats.BadRows[srcTable]++
	}
}

// getStatementStat returns the statementStat for statement type 's', creating it if needed.
func (conv *Conv) getStatementStat(s string) *statementStat {
	if conv.Stats.Statement[s] == nil {
		conv.Stats.Statement[s] = &statementStat{}
	}
	return conv.Stats.Statement[s]
}

// SkipStatement increments the skip statement stats for 'stmtType'.
func (conv *Conv) SkipStatement(stmtType string) {
	if conv.SchemaMode() { // Record statement stats on the first pass only.
		VerbosePrintf("Skipping statement: %s\n", stmtType)
		logger.Log.Debug("Skipping statement", zap.String("stmtType", stmtType))
		conv.getStatementStat(stmtType).Skip++
	}
}

// ErrorInStatement increments the error statement stats for 'stmtType'.
func (conv *Conv) ErrorInStatement(stmtType string) {
	if conv.SchemaMode() { // Record statement stats on the first pass only.
		VerbosePrintf("Error processing statement: %s\n", stmtType)
		logger.Log.Debug("Error processing statement", zap.String("stmtType", stmtType))
		conv.getStatementStat(stmtType).Error++
	}
}

// SchemaStatement increments the schema statement stats for 'stmtType'.
func (conv *Conv) SchemaStatement(stmtType string) {
	if conv.SchemaMode() { // Record statement stats on the first pass only.
		conv.getStatementStat(stmtType).Schema++
	}
}

// DataStatement increments the data statement stats for 'stmtType'.
func (conv *Conv) DataStatement(stmtType string) {
	if conv.SchemaMode() { // Record statement stats on the first pass only.
		conv.getStatementStat(stmtType).Data++
	}
}

// SchemaMode returns true if conv is configured to schemaOnly.
func (conv *Conv) SchemaMode() bool {
	return conv.mode == schemaOnly
}

// DataMode returns true if conv is configured to dataOnly.
func (conv *Conv) DataMode() bool {
	return conv.mode == dataOnly
}

// byteSize returns the approximate size of r in bytes.
func byteSize(r *row) int64 {
	n := int64(len(r.table))
	for _, c := range r.cols {
		n += int64(len(c))
	}
	for _, v := range r.vals {
		n += int64(len(v))
	}
	return n
}
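
// Illustrative sketch (hypothetical usage): an end-of-run summary built from the
// aggregate stats accessors defined above.
func exampleSummary(conv *Conv) string {
	return fmt.Sprintf("rows=%d badRows=%d statements=%d statementErrors=%d unexpected=%d\nsample bad rows: %v",
		conv.Rows(), conv.BadRows(), conv.Statements(), conv.StatementErrors(), conv.Unexpecteds(),
		conv.SampleBadRows(5))
}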