sources/dynamodb/data.go (184 lines of code) (raw):
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dynamodb
import (
"encoding/json"
"fmt"
"math/big"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/schema"
"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl"
)
func ProcessDataRow(m map[string]*dynamodb.AttributeValue, conv *internal.Conv, tableId string, srcSchema schema.Table, colIds []string, spSchema ddl.CreateTable) {
spVals, badCols, srcStrVals := cvtRow(m, srcSchema, spSchema, colIds)
srcTableName := srcSchema.Name
spTableName := spSchema.Name
spColNames := []string{}
srcColNames := []string{}
for _, colId := range colIds {
srcColNames = append(srcColNames, srcSchema.ColDefs[colId].Name)
spColNames = append(spColNames, spSchema.ColDefs[colId].Name)
}
if len(badCols) == 0 {
conv.WriteRow(srcTableName, spTableName, spColNames, spVals)
} else {
conv.Unexpected(fmt.Sprintf("Data conversion error for table %s in column(s) %s\n", srcTableName, badCols))
conv.StatsAddBadRow(srcTableName, conv.DataMode())
conv.CollectBadRow(srcTableName, srcColNames, srcStrVals)
}
}
func cvtRow(attrsMap map[string]*dynamodb.AttributeValue, srcSchema schema.Table, spSchema ddl.CreateTable, colIds []string) ([]interface{}, []string, []string) {
var err error
var srcStrVals []string
var spVals []interface{}
var badCols []string
for _, colId := range colIds {
srcColName := srcSchema.ColDefs[colId].Name
var spVal interface{}
var srcStrVal string
if attrsMap[srcColName] == nil {
spVal = nil
srcStrVal = "null"
} else {
// Convert data to the target type.
spColDef := spSchema.ColDefs[colId]
srcColDef := srcSchema.ColDefs[colId]
if spColDef.T.IsArray {
spVal, err = convArray(attrsMap[srcColName], srcColDef.Type.Name, spColDef.T.Name)
} else {
spVal, err = convScalar(attrsMap[srcColName], srcColDef.Type.Name, spColDef.T.Name)
}
if err != nil {
badCols = append(badCols, srcColName)
}
srcStrVal = attrsMap[srcColName].GoString()
}
srcStrVals = append(srcStrVals, srcStrVal)
spVals = append(spVals, spVal)
}
return spVals, badCols, srcStrVals
}
func convArray(attrVal *dynamodb.AttributeValue, srcType string, spType string) (interface{}, error) {
switch spType {
case ddl.Bytes:
switch srcType {
case typeBinarySet:
return attrVal.BS, nil
}
case ddl.String:
switch srcType {
case typeStringSet:
var strArr []string
for _, s := range attrVal.SS {
strArr = append(strArr, *s)
}
return strArr, nil
case typeNumberStringSet:
var strArr []string
for _, s := range attrVal.NS {
strArr = append(strArr, *s)
}
return strArr, nil
}
case ddl.Numeric:
switch srcType {
case typeNumberSet:
var numArr []big.Rat
for _, s := range attrVal.NS {
val, ok := (&big.Rat{}).SetString(*s)
if !ok {
return nil, fmt.Errorf("failed to convert '%v' to an NUMERIC array", attrVal.NS)
}
numArr = append(numArr, *val)
}
return numArr, nil
}
}
return nil, fmt.Errorf("can't convert value of type %s to Spanner type %s", attrVal.GoString(), spType)
}
func convScalar(attrVal *dynamodb.AttributeValue, srcType string, spType string) (interface{}, error) {
switch spType {
case ddl.Bool:
switch srcType {
case typeBool:
return *attrVal.BOOL, nil
}
case ddl.Bytes:
switch srcType {
case typeBinary:
return attrVal.B, nil
}
case ddl.String:
switch srcType {
case typeString:
return *attrVal.S, nil
case typeNumberString:
return *attrVal.N, nil
case typeMap, typeList, typeStringSet, typeNumberStringSet, typeNumberSet, typeBinarySet:
// For typeMap and typeList, attrVal is a very verbose data
// structure that contains null entries for unused type cases. We
// strip these out using stripNull. If it is important that the
// Spanner values can be easily unmarshalled back to
// dynamodb.AttributeValue types, then replace the following five
// lines with just:
// b, err := json.Marshal(attrVal)
// but note that this will consume extra Spanner storage.
val, err := stripNull(attrVal)
if err != nil {
return nil, fmt.Errorf("failed to convert %v to a go struct", attrVal.GoString())
}
b, err := json.Marshal(val)
if err != nil {
return nil, fmt.Errorf("failed to convert %v to a json string", attrVal.GoString())
}
return string(b), nil
}
case ddl.Numeric:
switch srcType {
case typeNumber:
s := *attrVal.N
val, ok := (&big.Rat{}).SetString(s)
if !ok {
return nil, fmt.Errorf("failed to convert '%v' to an NUMERIC type", s)
}
return *val, nil
}
}
return nil, fmt.Errorf("can't convert value of type %s to Spanner type %s", attrVal.GoString(), spType)
}
// stripNull converts a dynamodb.AttributeValue to a Go struct which can
// be easily encoded to a json string. If we use the normal json encoder, it
// will have many null values. The purpose of this function is to remove the
// null values in the json string.
func stripNull(a *dynamodb.AttributeValue) (interface{}, error) {
var err error
switch {
case a.M != nil:
cvtMap := make(map[string]interface{})
for k, v := range a.M {
cvtMap[k], err = stripNull(v)
if err != nil {
return nil, err
}
}
return cvtMap, nil
case a.L != nil:
var cvtList []interface{}
for _, v := range a.L {
c, err := stripNull(v)
if err != nil {
return nil, err
}
cvtList = append(cvtList, c)
}
return cvtList, nil
case a.B != nil:
return string(a.B), nil
case a.BOOL != nil:
return a.BOOL, nil
case a.BS != nil:
var bs []string
for _, b := range a.BS {
bs = append(bs, string(b))
}
return bs, nil
case a.N != nil:
return *a.N, nil
case a.NS != nil:
return a.NS, nil
case a.NULL != nil:
return a.NULL, nil
case a.S != nil:
return *a.S, nil
case a.SS != nil:
return a.SS, nil
default:
return nil, fmt.Errorf("unknown type of AttributeValue: %v", a)
}
}