sources/oracle/data.go (288 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package oracle import ( "encoding/json" "fmt" "math/big" "math/bits" "strconv" "strings" "time" "cloud.google.com/go/civil" "cloud.google.com/go/spanner" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants" "github.com/GoogleCloudPlatform/spanner-migration-tool/internal" "github.com/GoogleCloudPlatform/spanner-migration-tool/schema" "github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl" xj "github.com/basgys/goxml2json" ) func ProcessDataRow(conv *internal.Conv, tableId string, colIds []string, srcSchema schema.Table, spSchema ddl.CreateTable, vals []string) { spTableName, cvtCols, cvtVals, err := convertData(conv, tableId, colIds, srcSchema, spSchema, vals) srcTableName := srcSchema.Name srcCols := []string{} for _, colId := range colIds { srcCols = append(srcCols, srcSchema.ColDefs[colId].Name) } if err != nil { conv.Unexpected(fmt.Sprintf("Error while converting data: %s\n", err)) conv.StatsAddBadRow(srcTableName, conv.DataMode()) conv.CollectBadRow(srcTableName, srcCols, vals) } else { conv.WriteRow(srcTableName, spTableName, cvtCols, cvtVals) } } // ConvertData maps the source DB data in vals into Spanner data, // based on the Spanner and source DB schemas. Note that since entries // in vals may be empty, we also return the list of columns (empty // cols are dropped). func convertData(conv *internal.Conv, tableId string, colIds []string, srcSchema schema.Table, spSchema ddl.CreateTable, vals []string) (string, []string, []interface{}, error) { var c []string var v []interface{} if len(colIds) != len(vals) { return "", []string{}, []interface{}{}, fmt.Errorf("ConvertData: colIds and vals don't all have the same lengths: len(colIds)=%d, len(vals)=%d", len(colIds), len(vals)) } for i, colId := range colIds { // Skip columns with 'NULL' values., these values // 'NULL' values are represented as "NULL" (because we retrieve the values as strings). if vals[i] == "NULL" { continue } spColDef, ok1 := spSchema.ColDefs[colId] srcColDef, ok2 := srcSchema.ColDefs[colId] if !ok1 || !ok2 { return "", []string{}, []interface{}{}, fmt.Errorf("can't find Spanner and source-db schema for column id %s", colId) } var x interface{} var err error if spColDef.T.IsArray { x, err = convArray(spColDef.T, srcColDef.Type.Name, vals[i]) } else { x, err = convScalar(conv, spColDef.T, srcColDef.Type.Name, conv.TimezoneOffset, vals[i]) } if err != nil { return "", []string{}, []interface{}{}, err } v = append(v, x) c = append(c, spColDef.Name) } if aux, ok := conv.SyntheticPKeys[tableId]; ok { c = append(c, conv.SpSchema[tableId].ColDefs[aux.ColId].Name) v = append(v, fmt.Sprintf("%d", int64(bits.Reverse64(uint64(aux.Sequence))))) aux.Sequence++ conv.SyntheticPKeys[tableId] = aux } return spSchema.Name, c, v, nil } // convScalar converts a source database string value to an // appropriate Spanner value. It is the caller's responsibility to // detect and handle NULL values: convScalar will return error if a // NULL value is passed. func convScalar(conv *internal.Conv, spannerType ddl.Type, srcTypeName string, TimezoneOffset string, val string) (interface{}, error) { // Whitespace within the val string is considered part of the data value. // Note that many of the underlying conversions functions we use (like // strconv.ParseFloat and strconv.ParseInt) return "invalid syntax" // errors if whitespace were to appear at the start or end of a string. switch spannerType.Name { case ddl.Bool: return convBool(conv, val) case ddl.Bytes: return convBytes(val) case ddl.Date: return convDate(val) case ddl.Float32: return convFloat32(val) case ddl.Float64: return convFloat64(val) case ddl.Int64: return convInt64(val) case ddl.Numeric: return convNumeric(conv, val) case ddl.String: return val, nil case ddl.Timestamp: return convTimestamp(srcTypeName, val) case ddl.JSON: if srcTypeName == "OBJECT" { return convertXmlToJson(val) } return val, nil default: return val, fmt.Errorf("data conversion not implemented for type %v", spannerType.Name) } } func convBool(conv *internal.Conv, val string) (bool, error) { b, err := strconv.ParseBool(val) if err != nil { i, err2 := convInt64(val) if err2 == nil && i >= -128 && i <= 127 { b = i != 0 conv.Unexpected(fmt.Sprintf("Expected boolean value, but found integer value %v; mapping it to %v\n", val, b)) return b, err2 } return b, fmt.Errorf("can't convert to bool: %w", err) } return b, err } func convBytes(val string) ([]byte, error) { // convert a string to a byte slice. b := []byte(val) return b, nil } func convDate(val string) (civil.Date, error) { date := strings.Split(val, "T")[0] d, err := civil.ParseDate(date) if err != nil { return d, fmt.Errorf("can't convert to date: %w", err) } return d, err } func convFloat32(val string) (float32, error) { f, err := strconv.ParseFloat(val, 32) if err != nil { return float32(f), fmt.Errorf("can't convert to float32: %w", err) } return float32(f), err } func convFloat64(val string) (float64, error) { f, err := strconv.ParseFloat(val, 64) if err != nil { return f, fmt.Errorf("can't convert to float64: %w", err) } return f, err } func convInt64(val string) (int64, error) { i, err := strconv.ParseInt(val, 10, 64) if err != nil { return i, fmt.Errorf("can't convert to int64: %w", err) } return i, err } // convNumeric maps a source database string value (representing a numeric) // into a string representing a valid Spanner numeric. func convNumeric(conv *internal.Conv, val string) (interface{}, error) { if conv.SpDialect == constants.DIALECT_POSTGRESQL { return spanner.PGNumeric{Numeric: val, Valid: true}, nil } else { r := new(big.Rat) if _, ok := r.SetString(val); !ok { return "", fmt.Errorf("can't convert %q to big.Rat", val) } return r, nil } } // convTimestamp maps a source DB timestamp into a go Time Spanner timestamp func convTimestamp(srcTypeName string, val string) (t time.Time, err error) { // we are getting all timestamp value in UTC from the oracle. // e.g. 2022-02-01T08:14:36.254Z (timestamp) // e.g. 2022-02-01T12:14:36.254Z (timestamp with timezone) // e.g. 2022-02-01T06:14:36.254Z (timestamp with local timezone) t, err = time.Parse(time.RFC3339, val) if err != nil { return t, fmt.Errorf("can't convert to timestamp (type: %s)", srcTypeName) } return t, err } func convArray(spannerType ddl.Type, srcTypeName string, v string) (interface{}, error) { v = strings.TrimSpace(v) // Handle empty array. Note that we use an empty NullString array // for all Spanner array types since this will be converted to the // appropriate type by the Spanner client. if v == "" { return []spanner.NullString{}, nil } // The Spanner client for go does not accept []interface{} for arrays. // Instead it only accepts slices of a specific type eg: []string // Hence we have to do the following case analysis. switch spannerType.Name { case ddl.String: var a []string var r []spanner.NullString err := json.Unmarshal([]byte(v), &a) if err != nil { return []spanner.NullString{}, err } for _, s := range a { if s == "NULL" { r = append(r, spanner.NullString{Valid: false}) continue } r = append(r, spanner.NullString{StringVal: s, Valid: true}) } return r, nil case ddl.Numeric: var a []interface{} var r []spanner.NullNumeric err := json.Unmarshal([]byte(v), &a) if err != nil { return []spanner.NullNumeric{}, err } for _, s := range a { if s == "NULL" { r = append(r, spanner.NullNumeric{Valid: false}) continue } val := new(big.Rat) if _, ok := val.SetString(fmt.Sprint(s)); !ok { return []spanner.NullNumeric{}, fmt.Errorf("can't convert %q to big.Rat", s) } r = append(r, spanner.NullNumeric{Numeric: *val, Valid: true}) } return r, nil case ddl.Int64: var a []interface{} var r []spanner.NullInt64 err := json.Unmarshal([]byte(v), &a) if err != nil { return []spanner.NullInt64{}, err } for _, s := range a { if s == "NULL" { r = append(r, spanner.NullInt64{Valid: false}) continue } val, err := convInt64(fmt.Sprint(s)) if err != nil { return []spanner.NullInt64{}, err } r = append(r, spanner.NullInt64{Int64: val, Valid: true}) } return r, nil case ddl.Float32: var a []interface{} var r []spanner.NullFloat32 err := json.Unmarshal([]byte(v), &a) if err != nil { return []spanner.NullFloat32{}, err } for _, s := range a { if s == "NULL" { r = append(r, spanner.NullFloat32{Valid: false}) continue } val, err := convFloat32(fmt.Sprint(s)) if err != nil { return []spanner.NullFloat32{}, err } r = append(r, spanner.NullFloat32{Float32: val, Valid: true}) } return r, nil case ddl.Float64: var a []interface{} var r []spanner.NullFloat64 err := json.Unmarshal([]byte(v), &a) if err != nil { return []spanner.NullFloat64{}, err } for _, s := range a { if s == "NULL" { r = append(r, spanner.NullFloat64{Valid: false}) continue } val, err := convFloat64(fmt.Sprint(s)) if err != nil { return []spanner.NullFloat64{}, err } r = append(r, spanner.NullFloat64{Float64: val, Valid: true}) } return r, nil case ddl.Date: var a []interface{} err := json.Unmarshal([]byte(v), &a) if err != nil { return []spanner.NullDate{}, err } var r []spanner.NullDate for _, s := range a { if s == "NULL" { r = append(r, spanner.NullDate{Valid: false}) continue } val, err := convDate(fmt.Sprint(s)) if err != nil { return []spanner.NullDate{}, err } r = append(r, spanner.NullDate{Date: val, Valid: true}) } return r, nil } return []interface{}{}, fmt.Errorf("array type conversion not implemented for type %v", spannerType.Name) } func convertXmlToJson(v string) (ans string, err error) { xml := strings.NewReader(v) j, err := xj.Convert(xml) if err != nil { return "", fmt.Errorf("not able to convert object to JSON: %v ", v) } return j.String(), nil }