sources/sqlserver/data.go (148 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlserver
import (
"fmt"
"math/big"
"math/bits"
"strconv"
"strings"
"time"
"cloud.google.com/go/civil"
"cloud.google.com/go/spanner"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/schema"
"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl"
)
// ProcessDataRow converts one row of source data and passes the result to
// conv.WriteRow. tableId and colIds identify the source table and its
// columns, srcSchema and spSchema are the corresponding source and Spanner
// table schemas, and vals holds the row's values as strings to be converted
// to the appropriate Spanner types. If conversion fails, the row is reported
// through conv.Unexpected, counted as a bad row and collected instead of
// being written. ProcessDataRow is only called in DataMode.
func ProcessDataRow(conv *internal.Conv, tableId string, colIds []string, srcSchema schema.Table, spSchema ddl.CreateTable, vals []string) {
	spTableName, cvtCols, cvtVals, err := ConvertData(conv, tableId, colIds, srcSchema, spSchema, vals)
	srcTableName := srcSchema.Name
	if err == nil {
		conv.WriteRow(srcTableName, spTableName, cvtCols, cvtVals)
		return
	}

	// Bad rows are recorded under the source column names rather than the
	// column ids, so translate the ids through the source schema.
	srcCols := make([]string, 0, len(colIds))
	for i := range colIds {
		srcCols = append(srcCols, srcSchema.ColDefs[colIds[i]].Name)
	}
	conv.Unexpected(fmt.Sprintf("Error while converting data: %s\n", err))
	conv.StatsAddBadRow(srcTableName, conv.DataMode())
	conv.CollectBadRow(srcTableName, srcCols, vals)
}
// ConvertData maps the source DB values in vals to Spanner values, using the
// source (srcSchema) and Spanner (spSchema) table schemas to pick the
// conversion for each column id in colIds. It returns the Spanner table name
// together with the Spanner column names and the converted values. Entries
// whose value is the literal "NULL" are skipped, so the returned column list
// can be shorter than colIds. When conv.SyntheticPKeys has an entry for
// tableId, the synthetic key column is appended with a bit-reversed sequence
// value and the stored sequence is incremented.
//
// An error is returned if colIds and vals differ in length, if a column id
// is missing from either schema, or if a value cannot be converted.
func ConvertData(conv *internal.Conv, tableId string, colIds []string, srcSchema schema.Table, spSchema ddl.CreateTable, vals []string) (string, []string, []interface{}, error) {
	if len(vals) != len(colIds) {
		return "", []string{}, []interface{}{}, fmt.Errorf("ConvertData: colId and vals don't all have the same lengths: len(colIds)=%d, len(vals)=%d", len(colIds), len(vals))
	}

	var (
		spCols []string
		spVals []interface{}
	)
	for idx, colId := range colIds {
		raw := vals[idx]
		// NULL values are dropped together with their column.
		if raw == "NULL" {
			continue
		}
		spColDef, inSpanner := spSchema.ColDefs[colId]
		srcColDef, inSource := srcSchema.ColDefs[colId]
		if !(inSpanner && inSource) {
			return "", []string{}, []interface{}{}, fmt.Errorf("can't find Spanner and source-db schema for colId %s", colId)
		}
		converted, err := convScalar(conv, spColDef.T, srcColDef.Type.Name, conv.TimezoneOffset, raw)
		if err != nil {
			return "", []string{}, []interface{}{}, err
		}
		spVals = append(spVals, converted)
		spCols = append(spCols, spColDef.Name)
	}

	synth, hasSynth := conv.SyntheticPKeys[tableId]
	if !hasSynth {
		return spSchema.Name, spCols, spVals, nil
	}

	// The sequence number is bit-reversed before being used as the key value.
	reversed := int64(bits.Reverse64(uint64(synth.Sequence)))
	spCols = append(spCols, conv.SpSchema[tableId].ColDefs[synth.ColId].Name)
	spVals = append(spVals, fmt.Sprintf("%d", reversed))
	synth.Sequence++
	conv.SyntheticPKeys[tableId] = synth
	return spSchema.Name, spCols, spVals, nil
}
// convScalar converts a source database string value into a value of the
// Go type used for the given Spanner type, dispatching on spannerType.Name.
// srcTypeName is only consulted for timestamp conversion; timezoneOffset is
// accepted but not read by this function. Spanner types without a case
// below yield an error alongside the unconverted val.
//
// Detecting and handling NULL values is the caller's responsibility.
//
// Whitespace inside val is treated as part of the data: val is handed to the
// converters untouched, and the strconv parsers several of them rely on
// (ParseFloat, ParseInt) report "invalid syntax" for leading or trailing
// whitespace.
func convScalar(conv *internal.Conv, spannerType ddl.Type, srcTypeName string, timezoneOffset string, val string) (interface{}, error) {
	switch name := spannerType.Name; name {
	// Strings need no conversion.
	case ddl.String:
		return val, nil
	case ddl.Bytes:
		return convBytes(val)
	case ddl.Bool:
		return convBool(val)

	// Numeric types.
	case ddl.Int64:
		return convInt64(val)
	case ddl.Float32:
		return convFloat32(val)
	case ddl.Float64:
		return convFloat64(val)
	case ddl.Numeric:
		return convNumeric(conv, val)

	// Date and time types.
	case ddl.Date:
		return convDate(val)
	case ddl.Timestamp:
		return convTimestamp(srcTypeName, val)

	default:
		return val, fmt.Errorf("data conversion not implemented for type %v", name)
	}
}
// convBool parses val with strconv.ParseBool. A parse failure is wrapped in
// a "can't convert to bool" error.
func convBool(val string) (bool, error) {
	parsed, err := strconv.ParseBool(val)
	if err == nil {
		return parsed, nil
	}
	return parsed, fmt.Errorf("can't convert to bool: %w", err)
}
// convBytes returns the bytes of val as a byte slice. It never fails; the
// error result is always nil.
func convBytes(val string) ([]byte, error) {
	return []byte(val), nil
}
// convDate converts a source date string into a civil.Date. Only the first
// whitespace-separated field of val is handed to civil.ParseDate, so anything
// following the date after whitespace is ignored. An empty or
// whitespace-only val produces an error rather than being parsed.
func convDate(val string) (civil.Date, error) {
	fields := strings.Fields(val)
	// strings.Fields returns an empty slice for empty or all-whitespace
	// input; indexing it unguarded would panic.
	if len(fields) == 0 {
		return civil.Date{}, fmt.Errorf("can't convert to date: empty value %q", val)
	}
	d, err := civil.ParseDate(fields[0])
	if err != nil {
		return d, fmt.Errorf("can't convert to date: %w", err)
	}
	return d, nil
}
// convFloat32 parses val with strconv.ParseFloat at 32-bit precision and
// returns the result as a float32. A parse failure is wrapped in a
// "can't convert to float32" error; the narrowed parse result is returned
// alongside it.
func convFloat32(val string) (float32, error) {
	parsed, err := strconv.ParseFloat(val, 32)
	narrowed := float32(parsed)
	if err != nil {
		return narrowed, fmt.Errorf("can't convert to float32: %w", err)
	}
	return narrowed, nil
}
// convFloat64 parses val with strconv.ParseFloat at 64-bit precision. A
// parse failure is wrapped in a "can't convert to float64" error; the parse
// result is returned alongside it.
func convFloat64(val string) (float64, error) {
	parsed, err := strconv.ParseFloat(val, 64)
	if err == nil {
		return parsed, nil
	}
	return parsed, fmt.Errorf("can't convert to float64: %w", err)
}
// convInt64 parses val as a base-10, 64-bit signed integer with
// strconv.ParseInt. A parse failure is wrapped in a "can't convert to int64"
// error; the parse result is returned alongside it.
func convInt64(val string) (int64, error) {
	const (
		base    = 10
		bitSize = 64
	)
	n, err := strconv.ParseInt(val, base, bitSize)
	if err == nil {
		return n, nil
	}
	return n, fmt.Errorf("can't convert to int64: %w", err)
}
// convNumeric maps a source database string value (representing a numeric)
// to a Spanner numeric value. When conv.SpDialect is the PostgreSQL dialect
// the string is wrapped, unparsed, in a valid spanner.PGNumeric; otherwise it
// is parsed into a *big.Rat, and an error is returned if big.Rat cannot
// parse it.
func convNumeric(conv *internal.Conv, val string) (interface{}, error) {
	if conv.SpDialect == constants.DIALECT_POSTGRESQL {
		return spanner.PGNumeric{Numeric: val, Valid: true}, nil
	}
	rat, ok := new(big.Rat).SetString(val)
	if !ok {
		return "", fmt.Errorf("can't convert %q to big.Rat", val)
	}
	return rat, nil
}
// convTimestamp maps a source DB datetime value to a time.Time for a Spanner
// timestamp column. srcTypeName is the SQL Server type name of the source
// column and selects the layout used to parse val.
//
// The query returns datetimes in ISO 8601 form, for example:
//
//	2021-12-15T07:39:52.943             (datetime)
//	2021-12-15T07:39:52.9433333         (datetime2)
//	2021-12-15T07:40:00                 (smalldatetime)
//	2021-12-08T03:00:52.9500000+01:00   (datetimeoffset)
//
// Values of type dateTimeOffsetType are parsed as RFC 3339; every other type
// is parsed with a zone-less layout. That layout omits fractional seconds,
// which time.Parse nevertheless accepts immediately after the seconds field.
// On failure the returned error wraps the underlying time.Parse error.
func convTimestamp(srcTypeName string, val string) (t time.Time, err error) {
	layout := "2006-01-02T15:04:05"
	if srcTypeName == dateTimeOffsetType {
		layout = time.RFC3339
	}
	t, err = time.Parse(layout, val)
	if err != nil {
		// Wrap the parse error so callers can see why the value was rejected,
		// as the other converters in this file do.
		return t, fmt.Errorf("can't convert to timestamp (mssql type: %s): %w", srcTypeName, err)
	}
	return t, nil
}