sources/postgres/data.go (331 lines of code) (raw):
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"encoding/hex"
"fmt"
"math/big"
"math/bits"
"reflect"
"strconv"
"strings"
"time"
"cloud.google.com/go/civil"
"cloud.google.com/go/spanner"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl"
)
// ProcessDataRow converts one row of source data for tableId and writes it
// out to Spanner via conv.WriteRow. colIds names the source columns and vals
// holds their textual values, position for position.
//
// When conversion fails, the row is not written. Instead:
//   - an "unexpected" message is logged on conv;
//   - the bad-row statistic is bumped;
//   - the row is collected under the source table and column names.
//
// ProcessDataRow is only called in DataMode.
func ProcessDataRow(conv *internal.Conv, tableId string, colIds, vals []string) {
	spTableName, spCols, spVals, err := ConvertData(conv, tableId, colIds, vals)
	srcTable := conv.SrcSchema[tableId]

	// Source-side column names, used when reporting a bad row.
	srcCols := make([]string, 0, len(colIds))
	for _, id := range colIds {
		srcCols = append(srcCols, srcTable.ColDefs[id].Name)
	}

	if err == nil {
		conv.WriteRow(srcTable.Name, spTableName, spCols, spVals)
		return
	}
	conv.Unexpected(fmt.Sprintf("Error while converting data: %s\n", err))
	conv.StatsAddBadRow(srcTable.Name, conv.DataMode())
	conv.CollectBadRow(srcTable.Name, srcCols, vals)
}
// ConvertData maps the source DB data in vals into Spanner data, based on the
// Spanner and source DB schemas registered for tableId.
//
// colIds and vals are parallel: vals[i] is the textual value of column
// colIds[i], and the two slices must have the same length. Values equal to
// "\N" (PostgreSQL's NULL marker in COPY-FROM blocks) or "NULL" are treated as
// NULL and dropped. The returned column list may therefore be shorter than
// colIds.
//
// If the table has a synthetic primary key, that column is appended with the
// next sequence value. The value is bit-reversed and formatted as a decimal
// string, and the sequence is advanced in conv.SyntheticPKeys.
//
// It returns the Spanner table name, the Spanner column names and the
// converted values (parallel slices). It returns an error if:
//   - the table or a column is missing from either schema;
//   - the lengths of colIds and vals differ;
//   - a value cannot be converted.
func ConvertData(conv *internal.Conv, tableId string, colIds []string, vals []string) (string, []string, []interface{}, error) {
	// Note: if there are many rows for the same table, the lookups below are
	// (redundantly) repeated for every row converted. If this becomes a
	// performance issue, consider moving them to the callers of ConvertData.
	spSchema, ok1 := conv.SpSchema[tableId]
	srcSchema, ok2 := conv.SrcSchema[tableId]
	if !ok1 || !ok2 {
		// Report the table id: when the table is missing from conv.SpSchema,
		// conv.SpSchema[tableId].Name is "", which made the message useless.
		return "", []string{}, []interface{}{}, fmt.Errorf("can't find table %s in schema", tableId)
	}
	if len(colIds) != len(vals) {
		return "", []string{}, []interface{}{}, fmt.Errorf("ConvertData: colIds and vals don't all have the same lengths: len(colIds)=%d, len(vals)=%d", len(colIds), len(vals))
	}
	var c []string
	var v []interface{}
	for i, colId := range colIds {
		// "\\N" is PostgreSQL's representation of a NULL column in COPY-FROM blocks.
		// TODO: Consider using NullString to differentiate between an actual column
		// having "NULL" as a string and NULL values.
		if vals[i] == "\\N" || vals[i] == "NULL" {
			continue
		}
		spColDef, ok1 := spSchema.ColDefs[colId]
		srcColDef, ok2 := srcSchema.ColDefs[colId]
		if !ok1 || !ok2 {
			return "", []string{}, []interface{}{}, fmt.Errorf("can't find Spanner and source-db schema for colId %s", colId)
		}
		var x interface{}
		var err error
		if spColDef.T.IsArray {
			x, err = convArray(spColDef.T, srcColDef.Type.Name, conv.Location, vals[i])
		} else {
			x, err = convScalar(conv, spColDef.T, srcColDef.Type.Name, conv.Location, vals[i])
		}
		if err != nil {
			return "", []string{}, []interface{}{}, err
		}
		v = append(v, x)
		c = append(c, spColDef.Name)
	}
	if aux, ok := conv.SyntheticPKeys[tableId]; ok {
		c = append(c, spSchema.ColDefs[aux.ColId].Name)
		// Bit-reverse the sequence, presumably so consecutive rows don't land
		// on the same key range (hotspotting) — confirm with the schema code.
		v = append(v, strconv.FormatInt(int64(bits.Reverse64(uint64(aux.Sequence))), 10))
		aux.Sequence++
		conv.SyntheticPKeys[tableId] = aux
	}
	return spSchema.Name, c, v, nil
}
// convScalar converts the textual source value val into the Go value the
// Spanner client expects for the non-array column type spannerType.
//
// Which inputs matter depends on the type:
//   - srcTypeName (the source type name) and location are only used for
//     TIMESTAMP columns;
//   - conv is only used for NUMERIC columns, via convNumeric.
//
// NULL detection is the caller's job. A NULL marker passed here is treated as
// ordinary data: most types fail to parse it, while STRING and JSON columns
// return it verbatim.
//
// Whitespace inside val is part of the value. The strconv-based conversions
// reject leading or trailing whitespace with "invalid syntax"; pg_dump is not
// expected to produce it.
func convScalar(conv *internal.Conv, spannerType ddl.Type, srcTypeName string, location *time.Location, val string) (interface{}, error) {
	switch spannerType.Name {
	case ddl.String, ddl.JSON:
		// Text and JSON are passed through unchanged.
		return val, nil
	case ddl.Int64:
		return convInt64(val)
	case ddl.Float64:
		return convFloat64(val)
	case ddl.Float32:
		return convFloat32(val)
	case ddl.Numeric:
		return convNumeric(conv, val)
	case ddl.Bool:
		return convBool(val)
	case ddl.Bytes:
		return convBytes(val)
	case ddl.Date:
		return convDate(val)
	case ddl.Timestamp:
		return convTimestamp(srcTypeName, location, val)
	}
	return val, fmt.Errorf("data conversion not implemented for type %v", spannerType.Name)
}
// convBool parses val with strconv.ParseBool (accepting forms such as
// "t"/"f", "true"/"false", "1"/"0"). On failure the returned error wraps the
// strconv error.
func convBool(val string) (bool, error) {
	parsed, parseErr := strconv.ParseBool(val)
	if parseErr == nil {
		return parsed, nil
	}
	return parsed, fmt.Errorf("can't convert to bool: %w", parseErr)
}
// convBytes decodes a PostgreSQL bytea value in hex output format, i.e. a
// "\x" prefix followed by hex digits (e.g. `\x0aff`), into raw bytes.
// It returns an error if the prefix is missing or the digits are not valid
// hex; the hex decoding error is wrapped.
func convBytes(val string) ([]byte, error) {
	// HasPrefix rather than val[0:2]: slicing panics on inputs shorter than
	// two bytes (e.g. an empty string or an empty quoted array element).
	if !strings.HasPrefix(val, `\x`) {
		return []byte{}, fmt.Errorf("can't convert to bytes: doesn't start with \\x prefix")
	}
	b, err := hex.DecodeString(val[2:])
	if err != nil {
		return []byte{}, fmt.Errorf("can't convert to bytes: %w", err)
	}
	return b, nil
}
// convDate parses val as a time-zone-free calendar date using
// civil.ParseDate (which expects the RFC 3339 full-date form, e.g.
// "2006-01-02"). On failure the returned error wraps the parse error.
func convDate(val string) (civil.Date, error) {
	date, parseErr := civil.ParseDate(val)
	if parseErr == nil {
		return date, nil
	}
	return date, fmt.Errorf("can't convert to date: %w", parseErr)
}
// convFloat32 parses val as a 32-bit float. On failure the returned error
// wraps the strconv error. For out-of-range input the value is the ±Inf
// reported by strconv.ParseFloat.
func convFloat32(val string) (float32, error) {
	wide, parseErr := strconv.ParseFloat(val, 32)
	narrow := float32(wide)
	if parseErr == nil {
		return narrow, nil
	}
	return narrow, fmt.Errorf("can't convert to float32: %w", parseErr)
}
// convFloat64 parses val as a 64-bit float. On failure the returned error
// wraps the strconv error. For out-of-range input the value is the ±Inf
// reported by strconv.ParseFloat.
func convFloat64(val string) (float64, error) {
	num, parseErr := strconv.ParseFloat(val, 64)
	if parseErr == nil {
		return num, nil
	}
	return num, fmt.Errorf("can't convert to float64: %w", parseErr)
}
// convInt64 parses val as a base-10 signed 64-bit integer. On failure the
// returned error wraps the strconv error. For out-of-range input the value is
// the clamped result reported by strconv.ParseInt.
func convInt64(val string) (int64, error) {
	n, parseErr := strconv.ParseInt(val, 10, 64)
	if parseErr == nil {
		return n, nil
	}
	return n, fmt.Errorf("can't convert to int64: %w", parseErr)
}
// convNumeric maps a source database string value (representing a numeric)
// into the value the Spanner client expects for a NUMERIC column.
//
// The result depends on conv.SpDialect:
//   - PostgreSQL dialect: the text is wrapped unvalidated in a valid
//     spanner.PGNumeric;
//   - otherwise: it is parsed into a *big.Rat, and an error is returned if
//     the text is not a valid rational number.
func convNumeric(conv *internal.Conv, val string) (interface{}, error) {
	if conv.SpDialect != constants.DIALECT_POSTGRESQL {
		rat, ok := new(big.Rat).SetString(val)
		if !ok {
			return "", fmt.Errorf("can't convert %q to big.Rat", val)
		}
		return rat, nil
	}
	return spanner.PGNumeric{Numeric: val, Valid: true}, nil
}
// convTimestamp maps a source DB timestamp into a go Time (which is
// translated to a Spanner timestamp by the go Spanner client library).
// It handles both timestamptz and timestamp conversions.
//
// PostgreSQL supports a wide variety of timestamp formats (see
// https://www.postgresql.org/docs/current/datatype-datetime.html). We don't
// attempt to support all of them; we only need the formats generated by
// pg_dump.
//
// Parameters:
//   - srcTypeName: "timestamptz" / "timestamp with time zone" selects
//     time-zone-aware parsing; any other name is treated as a timestamp
//     without time zone.
//   - location: used only for timestamptz values that carry no offset.
//
// On failure the returned error names the value and source type and wraps
// the last time.Parse error.
func convTimestamp(srcTypeName string, location *time.Location, val string) (t time.Time, err error) {
	// pg_dump outputs timestamps as ISO 8601, except:
	// a) it uses space instead of T
	// b) timezones are abbreviated to just hour (minute is specified only if non-zero).
	// Fractional seconds (e.g. ".123456") after the seconds field are accepted
	// by time.Parse even though the layouts below don't mention them.
	if srcTypeName == "timestamptz" || srcTypeName == "timestamp with time zone" {
		// PostgreSQL abbreviates timezone to just hour where possible.
		t, err = time.Parse("2006-01-02 15:04:05Z07", val)
		if err != nil {
			// Try using hour and min for timezone e.g. PGTZ set to 'Asia/Kolkata'.
			t, err = time.Parse("2006-01-02 15:04:05Z07:00", val)
		}
		if err != nil {
			// Try parsing without timezone. Some pg_dump files generate
			// timestamps without timezone for timestamptz data, e.g. the
			// Pagila port of Sakila. We interpret these timestamps using the
			// given location (default is local time).
			// Note: we might want to look for "SET TIME ZONE" in the pg_dump
			// and interpret wrt that timezone.
			t, err = time.ParseInLocation("2006-01-02 15:04:05", val, location)
		}
	} else {
		// timestamp without time zone: data should just consist of date and
		// time, and conversion should ignore timezone. We mimic this by
		// parsing as UTC, so the value is stored 'as-is' in Spanner.
		t, err = time.Parse("2006-01-02 15:04:05", val)
	}
	if err != nil {
		return t, fmt.Errorf("can't convert %q to timestamp (postgres type: %s): %w", val, srcTypeName, err)
	}
	return t, nil
}
// convArray converts the textual form of a PostgreSQL array (e.g. "{1,NULL,2}")
// into a typed Spanner array whose element type is spannerType.Name.
//
// The go Spanner client does not accept []interface{} for arrays, so each
// supported element type produces its own slice type ([]spanner.NullInt64,
// [][]byte, ...).
//
// NULL handling:
//   - individual unquoted NULL elements become invalid Null* values (nil for
//     bytes), while a quoted "NULL" is the literal string;
//   - a NULL for the whole array is the caller's responsibility, and is
//     rejected here as an unrecognized format.
//
// srcTypeName and location are only used for timestamp elements. An error is
// returned for malformed input, an unconvertible element, or an unsupported
// element type.
func convArray(spannerType ddl.Type, srcTypeName string, location *time.Location, v string) (interface{}, error) {
	v = strings.TrimSpace(v)
	// Handle empty array. Note that we use an empty NullString array
	// for all Spanner array types since this will be converted to the
	// appropriate type by the Spanner client.
	if v == "{}" {
		return []spanner.NullString{}, nil
	}
	// The length guard prevents an index-out-of-range panic on empty input.
	if len(v) < 2 || v[0] != '{' || v[len(v)-1] != '}' {
		return []interface{}{}, fmt.Errorf("unrecognized data format for array: expected {v1, v2, ...}")
	}

	// Split the body on top-level commas. A plain strings.Split would break
	// double-quoted elements that contain commas (e.g. {"a,b",c}). Instead,
	// commas inside quotes are not delimiters, and a backslash-escaped
	// character is never a delimiter or quote. Quotes are left in place for
	// processQuote to remove.
	body := v[1 : len(v)-1]
	var a []string
	start, inQuotes := 0, false
	for i := 0; i < len(body); i++ {
		switch body[i] {
		case '\\':
			i++ // skip the escaped character
		case '"':
			inQuotes = !inQuotes
		case ',':
			if !inQuotes {
				a = append(a, body[start:i])
				start = i + 1
			}
		}
	}
	a = append(a, body[start:])

	// The Spanner client for go does not accept []interface{} for arrays.
	// Instead it only accepts slices of a specific type e.g. []int64, []string.
	// Hence we have to do the following case analysis. On any element error
	// each case returns an empty slice of its type together with the error.
	switch spannerType.Name {
	case ddl.Bool:
		var r []spanner.NullBool
		for _, raw := range a {
			if raw == "NULL" {
				r = append(r, spanner.NullBool{Valid: false})
				continue
			}
			s, err := processQuote(raw)
			if err != nil {
				return []spanner.NullBool{}, err
			}
			b, err := convBool(s)
			if err != nil {
				return []spanner.NullBool{}, err
			}
			r = append(r, spanner.NullBool{Bool: b, Valid: true})
		}
		return r, nil
	case ddl.Bytes:
		var r [][]byte
		for _, raw := range a {
			if raw == "NULL" {
				r = append(r, nil)
				continue
			}
			s, err := processQuote(raw)
			if err != nil {
				return [][]byte{}, err
			}
			b, err := convBytes(s)
			if err != nil {
				return [][]byte{}, err
			}
			r = append(r, b)
		}
		return r, nil
	case ddl.Date:
		var r []spanner.NullDate
		for _, raw := range a {
			if raw == "NULL" {
				r = append(r, spanner.NullDate{Valid: false})
				continue
			}
			s, err := processQuote(raw)
			if err != nil {
				return []spanner.NullDate{}, err
			}
			date, err := convDate(s)
			if err != nil {
				return []spanner.NullDate{}, err
			}
			r = append(r, spanner.NullDate{Date: date, Valid: true})
		}
		return r, nil
	case ddl.Float32:
		var r []spanner.NullFloat32
		for _, raw := range a {
			if raw == "NULL" {
				r = append(r, spanner.NullFloat32{Valid: false})
				continue
			}
			s, err := processQuote(raw)
			if err != nil {
				return []spanner.NullFloat32{}, err
			}
			f, err := convFloat32(s)
			if err != nil {
				return []spanner.NullFloat32{}, err
			}
			r = append(r, spanner.NullFloat32{Float32: f, Valid: true})
		}
		return r, nil
	case ddl.Float64:
		var r []spanner.NullFloat64
		for _, raw := range a {
			if raw == "NULL" {
				r = append(r, spanner.NullFloat64{Valid: false})
				continue
			}
			s, err := processQuote(raw)
			if err != nil {
				return []spanner.NullFloat64{}, err
			}
			f, err := convFloat64(s)
			if err != nil {
				return []spanner.NullFloat64{}, err
			}
			r = append(r, spanner.NullFloat64{Float64: f, Valid: true})
		}
		return r, nil
	case ddl.Int64:
		var r []spanner.NullInt64
		for _, raw := range a {
			if raw == "NULL" {
				r = append(r, spanner.NullInt64{Valid: false})
				continue
			}
			s, err := processQuote(raw)
			if err != nil {
				return []spanner.NullInt64{}, err
			}
			i, err := convInt64(s)
			if err != nil {
				// Don't leak a partially converted slice (consistent with the
				// other cases).
				return []spanner.NullInt64{}, err
			}
			r = append(r, spanner.NullInt64{Int64: i, Valid: true})
		}
		return r, nil
	case ddl.String:
		var r []spanner.NullString
		for _, raw := range a {
			if raw == "NULL" {
				r = append(r, spanner.NullString{Valid: false})
				continue
			}
			s, err := processQuote(raw)
			if err != nil {
				return []spanner.NullString{}, err
			}
			r = append(r, spanner.NullString{StringVal: s, Valid: true})
		}
		return r, nil
	case ddl.Timestamp:
		var r []spanner.NullTime
		for _, raw := range a {
			if raw == "NULL" {
				r = append(r, spanner.NullTime{Valid: false})
				continue
			}
			s, err := processQuote(raw)
			if err != nil {
				return []spanner.NullTime{}, err
			}
			t, err := convTimestamp(srcTypeName, location, s)
			if err != nil {
				return []spanner.NullTime{}, err
			}
			r = append(r, spanner.NullTime{Time: t, Valid: true})
		}
		return r, nil
	}
	return []interface{}{}, fmt.Errorf("array type conversion not implemented for type %v", reflect.TypeOf(spannerType))
}
// processQuote returns the unquoted version of a PostgreSQL array element s.
//
// PostgreSQL's array output routine puts double quotes around element values
// if they:
//   - are empty strings;
//   - contain curly braces, delimiter characters, double quotes, backslashes
//     or white space;
//   - match the word NULL.
//
// Inside the quotes, double quotes and backslashes are backslash-escaped;
// other characters, including newlines, appear raw. See "Array Input and
// Output Syntax" in the PostgreSQL arrays documentation.
//
// Elements that are not wrapped in double quotes are returned unchanged.
// For quoted elements, each backslash makes the following character literal,
// following PostgreSQL rules rather than Go escape rules, so a raw newline is
// accepted. An error is returned for a dangling backslash or an unescaped
// inner quote.
func processQuote(s string) (string, error) {
	if len(s) < 2 || s[0] != '"' || s[len(s)-1] != '"' {
		return s, nil
	}
	inner := s[1 : len(s)-1]
	if !strings.ContainsAny(inner, `\"`) {
		return inner, nil
	}
	var b strings.Builder
	b.Grow(len(inner))
	for i := 0; i < len(inner); i++ {
		c := inner[i]
		switch c {
		case '\\':
			i++
			if i == len(inner) {
				// The closing quote was itself escaped: the element is unterminated.
				return "", fmt.Errorf("invalid quoted array element %q: unterminated escape", s)
			}
			c = inner[i]
		case '"':
			return "", fmt.Errorf("invalid quoted array element %q: unescaped double quote", s)
		}
		b.WriteByte(c)
	}
	return b.String(), nil
}