sources/mysql/data.go (209 lines of code) (raw):

// Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package mysql import ( "fmt" "math/big" "math/bits" "strconv" "strings" "time" "cloud.google.com/go/civil" "cloud.google.com/go/spanner" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants" "github.com/GoogleCloudPlatform/spanner-migration-tool/internal" "github.com/GoogleCloudPlatform/spanner-migration-tool/schema" "github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl" ) // ProcessDataRow converts a row of data and writes it out to Spanner. // tableId and colIds identify the source table and its columns, // srcSchema and spSchema are the source and Spanner schemas for that table, // and vals contains string data to be converted to appropriate types // to send to Spanner. ProcessDataRow is only called in DataMode. 
func ProcessDataRow(conv *internal.Conv, tableId string, colIds []string, srcSchema schema.Table, spSchema ddl.CreateTable, vals []string, additionalAttributes internal.AdditionalDataAttributes) { srcTableName := srcSchema.Name srcCols := []string{} for _, colId := range colIds { srcCols = append(srcCols, srcSchema.ColDefs[colId].Name) } spTableName, cvtCols, cvtVals, err := ConvertData(conv, tableId, colIds, srcSchema, spSchema, vals, additionalAttributes) if err != nil { conv.Unexpected(fmt.Sprintf("Error while converting data: %s\n", err)) conv.StatsAddBadRow(srcTableName, conv.DataMode()) conv.CollectBadRow(srcTableName, srcCols, vals) } else { conv.WriteRow(srcTableName, spTableName, cvtCols, cvtVals) } } // ConvertData maps the source DB data in vals into Spanner data, // based on the Spanner and source DB schemas. Note that since entries // in vals may be empty, we also return the list of columns (empty // cols are dropped). func ConvertData(conv *internal.Conv, tableId string, colIds []string, srcSchema schema.Table, spSchema ddl.CreateTable, vals []string, additionalAttributes internal.AdditionalDataAttributes) (string, []string, []interface{}, error) { var c []string var v []interface{} if len(colIds) != len(vals) { return "", []string{}, []interface{}{}, fmt.Errorf("ConvertData: colIds and vals don't all have the same lengths: len(colIds)=%d, len(vals)=%d", len(colIds), len(vals)) } for i, colId := range colIds { // Skip columns with 'NULL' values. When processing data rows from mysqldump, these values // are represented as nil (by pingcap/tidb/types/parser_driver's ValueExpr), which is // converted to the string '<nil>'. When processing data rows obtained from the MySQL driver, // 'NULL' values are represented as "NULL" (because we retrieve the values as strings). 
if vals[i] == "<nil>" || vals[i] == "NULL" { continue } spColDef, ok1 := spSchema.ColDefs[colId] srcColDef, ok2 := srcSchema.ColDefs[colId] if !ok1 || !ok2 { return "", []string{}, []interface{}{}, fmt.Errorf("can't find Spanner and source-db schema for colId %s", colId) } spCol := spColDef.Name var x interface{} var err error if spColDef.T.IsArray { x, err = convArray(spColDef.T, srcColDef.Type.Name, vals[i]) } else { x, err = convScalar(conv, spColDef.T, srcColDef.Type.Name, conv.TimezoneOffset, vals[i]) } if err != nil { return "", []string{}, []interface{}{}, err } v = append(v, x) c = append(c, spCol) } if aux, ok := conv.SyntheticPKeys[tableId]; ok { c = append(c, conv.SpSchema[tableId].ColDefs[aux.ColId].Name) v = append(v, fmt.Sprintf("%d", int64(bits.Reverse64(uint64(aux.Sequence))))) aux.Sequence++ conv.SyntheticPKeys[tableId] = aux } colId := conv.SpSchema[tableId].ShardIdColumn if colId != "" { c = append(c, conv.SpSchema[tableId].ColDefs[colId].Name) v = append(v, additionalAttributes.ShardId) } return conv.SpSchema[tableId].Name, c, v, nil } // convScalar converts a source database string value to an // appropriate Spanner value. It is the caller's responsibility to // detect and handle NULL values: convScalar will return error if a // NULL value is passed. func convScalar(conv *internal.Conv, spannerType ddl.Type, srcTypeName string, TimezoneOffset string, val string) (interface{}, error) { // Whitespace within the val string is considered part of the data value. // Note that many of the underlying conversions functions we use (like // strconv.ParseFloat and strconv.ParseInt) return "invalid syntax" // errors if whitespace were to appear at the start or end of a string. // We do not expect mysqldump to generate such output. 
switch spannerType.Name { case ddl.Bool: return convBool(conv, spannerType, srcTypeName, val) case ddl.Bytes: return convBytes(val) case ddl.Date: return convDate(val) case ddl.Float32: return convFloat32(val) case ddl.Float64: return convFloat64(val) case ddl.Int64: return convInt64(val) case ddl.Numeric: return convNumeric(conv, val) case ddl.String: return val, nil case ddl.Timestamp: return convTimestamp(srcTypeName, TimezoneOffset, val) case ddl.JSON: return val, nil default: return val, fmt.Errorf("data conversion not implemented for type %v", spannerType.Name) } } func convBool(conv *internal.Conv, spannerType ddl.Type, srcTypeName string, val string) (bool, error) { b, err := strconv.ParseBool(val) if err != nil { if srcTypeName == "bit" { // To handle scenarios where bit is used to store boolean // zero treated as false // one treated as true switch val { case "\x00": return false, nil case "\x01": return true, nil } } // MySQL uses TINYINT(1) to implement BOOL/BOOLEAN, and does not // enforce/validate boolean values i.e. any value that can be stored // in a TINYINT (-128 to 127) can be stored in BOOL/BOOLEAN. // If ParseBool(val) fails, this is very likely the cause. // To handle this, re-parse as INT64 and treat as true if value is non-zero. // Note: if ParseBool(val) fails, then val is probably a non-zero number. i, err2 := convInt64(val) if err2 == nil && i >= -128 && i <= 127 { b = i != 0 conv.Unexpected(fmt.Sprintf("Expected boolean value, but found integer value %v; mapping it to %v\n", val, b)) return b, err2 } return b, fmt.Errorf("can't convert to bool: %w", err) } return b, err } func convBytes(val string) ([]byte, error) { // convert a string to a byte slice. 
b := []byte(val) return b, nil } func convDate(val string) (civil.Date, error) { d, err := civil.ParseDate(val) if err != nil { return d, fmt.Errorf("can't convert to date: %w", err) } return d, err } func convFloat32(val string) (float32, error) { f, err := strconv.ParseFloat(val, 32) if err != nil { return float32(f), fmt.Errorf("can't convert to float32: %w", err) } return float32(f), err } func convFloat64(val string) (float64, error) { f, err := strconv.ParseFloat(val, 64) if err != nil { return f, fmt.Errorf("can't convert to float64: %w", err) } return f, err } func convInt64(val string) (int64, error) { i, err := strconv.ParseInt(val, 10, 64) if err != nil { return i, fmt.Errorf("can't convert to int64: %w", err) } return i, err } // convNumeric maps a source database string value (representing a numeric) // into a string representing a valid Spanner numeric. func convNumeric(conv *internal.Conv, val string) (interface{}, error) { if conv.SpDialect == constants.DIALECT_POSTGRESQL { return spanner.PGNumeric{Numeric: val, Valid: true}, nil } else { r := new(big.Rat) if _, ok := r.SetString(val); !ok { return "", fmt.Errorf("can't convert %q to big.Rat", val) } return r, nil } } // convTimestamp maps a source DB timestamp into a go Time Spanner timestamp // It handles both datetime and timestamp conversions. func convTimestamp(srcTypeName string, TimezoneOffset string, val string) (t time.Time, err error) { // mysqldump outputs timestamps as ISO 8601, except // it uses space instead of T. if srcTypeName == "timestamp" { // We consider timezone for timestamp datatype. // If timezone is not specified in mysqldump, we consider UTC time. if TimezoneOffset == "" { TimezoneOffset = "+00:00" } // convert timestamp from format "2006-01-02 15:04:05" to // "2006-01-02T15:04:05+00:00". 
timeNew := strings.Split(val, " ") timeJoined := strings.Join(timeNew, "T") timeJoined = timeJoined + TimezoneOffset t, err = time.Parse(time.RFC3339, timeJoined) } else { // datetime: data should just consist of date and time. // timestamp conversion should ignore timezone. We mimic this using Parse // i.e. treat it as UTC, so it will be stored 'as-is' in Spanner. t, err = time.Parse("2006-01-02 15:04:05", val) } if err != nil { return t, fmt.Errorf("can't convert to timestamp (mysql type: %s)", srcTypeName) } return t, err } // convArray converts a source database string value (representing an // array) to an appropriate Spanner array value. It is the caller's // responsibility to detect and handle the case where the entire array // is NULL. However, convArray does handle the case where individual // array elements are NULL. In other words, convArray handles "{1, // NULL, 2}", but it does not handle "NULL" (it returns error). // NOTE : convArray would only be called when MySQL 'SET' datatype is encountered. func convArray(spannerType ddl.Type, srcTypeName string, v string) (interface{}, error) { v = strings.TrimSpace(v) // Handle empty array. Note that we use an empty NullString array // for all Spanner array types since this will be converted to the // appropriate type by the Spanner client. if v == "" { return []spanner.NullString{}, nil } a := strings.Split(v, ",") // The Spanner client for go does not accept []interface{} for arrays. // Instead it only accepts slices of a specific type eg: []string // Hence we have to do the following case analysis. // NOTE: MySQL only supports SET of string which will be translated // to spanner array<string>. 
switch spannerType.Name { case ddl.String: var r []spanner.NullString for _, s := range a { if s == "NULL" { r = append(r, spanner.NullString{Valid: false}) continue } s, err := processQuote(s) if err != nil { return []spanner.NullString{}, err } r = append(r, spanner.NullString{StringVal: s, Valid: true}) } return r, nil } return []interface{}{}, fmt.Errorf("array type conversion not implemented for type %v", spannerType.Name) } // processQuote returns the unquoted version of s. // Note: The element values of a MySQL array ('SET' datatype) may have double // quotes around them. The array output routine will put double // quotes around element values if they are empty strings, contain // curly braces, delimiter characters, double quotes, backslashes, or // white space, or match the word NULL. Double quotes and backslashes // embedded in element values will be backslash-escaped. func processQuote(s string) (string, error) { if len(s) >= 2 && s[0] == '"' && s[len(s)-1] == '"' { return strconv.Unquote(s) } return s, nil }