// import_data/csv_data.go
package import_data

import (
	"context"
	"fmt"
	"sync/atomic"

	sp "cloud.google.com/go/spanner"
	spannerclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/spanner/client"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/proto/migration"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/csv"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/spanner"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/writer"
	"go.uber.org/zap"
)
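
// CsvData abstracts a single-table CSV import into an existing Spanner
// database.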
type CsvData interface {
	ImportData(ctx context.Context, infoSchema *spanner.InfoSchemaImpl, dialect string) error
}
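
// CsvDataImpl holds the parameters for a single-table CSV import: the target
// Spanner project, instance, and database, the destination table, the CSV
// source URI, and the field delimiter.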
type CsvDataImpl struct {
	ProjectId         string
	InstanceId        string
	DbName            string
	TableName         string
	SourceUri         string
	CsvFieldDelimiter string
}
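
// ImportData reads the Spanner schema for the target database, resolves the
// destination table and its column list, streams rows from the CSV source
// through a batch writer, and flushes any remaining mutations.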
func (source *CsvDataImpl) ImportData(ctx context.Context, infoSchema *spanner.InfoSchemaImpl, dialect string) error {
	// TODO: start with single table imports
	// TODO: Response code - error/success contract between gcloud and SMT
	// TODO: get CSV locally. Start with unchunked and later figure out chunking for larger sizes.
	if source.CsvFieldDelimiter == "" {
		// Guard against indexing into an empty string below.
		return fmt.Errorf("csv field delimiter must be non-empty")
	}
	conv := getConvObject(source.ProjectId, source.InstanceId, dialect)
	batchWriter := getBatchWriterWithConfig(infoSchema.SpannerClient, conv)

	err := infoSchema.PopulateSpannerSchema(ctx, conv)
	if err != nil {
		logger.Log.Error(fmt.Sprintf("Unable to read Spanner schema %v", err))
		return err
	}

	tableId, err := internal.GetTableIdFromSpName(conv.SpSchema, source.TableName)
	if err != nil {
		logger.Log.Error(fmt.Sprintf("Table %s not found in Spanner", source.TableName))
		return err
	}

	// Column order must match the schema's ColIds ordering.
	columnNames := []string{}
	for _, v := range conv.SpSchema[tableId].ColIds {
		columnNames = append(columnNames, conv.SpSchema[tableId].ColDefs[v].Name)
	}

	csvImpl := csv.CsvImpl{}
	err = csvImpl.ProcessSingleCSV(conv, source.TableName, columnNames,
		conv.SpSchema[tableId].ColDefs, source.SourceUri, "", rune(source.CsvFieldDelimiter[0]))
	if err != nil {
		return err
	}

	// Flush any mutations still buffered in the batch writer.
	batchWriter.Flush()
	return nil
}
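
// getConvObject builds a Conv configured for a data-only migration against
// the given project, instance, and dialect; metrics population is skipped
// and dry-run mode is disabled.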
func getConvObject(projectId, instanceId, dialect string) *internal.Conv {
	conv := internal.MakeConv()
	conv.Audit.MigrationType = migration.MigrationData_DATA_ONLY.Enum()
	conv.Audit.SkipMetricsPopulation = true
	conv.Audit.DryRun = false
	conv.SpDialect = dialect
	conv.SpProjectId = projectId
	conv.SpInstanceId = instanceId
	return conv
}
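
// getBatchWriterWithConfig wires a BatchWriter to the given Spanner client
// and registers it as conv's data sink, so rows emitted by the CSV processor
// are buffered and applied to Spanner in batches.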
func getBatchWriterWithConfig(spannerClient spannerclient.SpannerClient, conv *internal.Conv) *writer.BatchWriter {
	// TODO: review these limits
	config := writer.BatchWriterConfig{
		BytesLimit: 100 * 1000 * 1000,
		WriteLimit: 2000,
		RetryLimit: 1000,
		Verbose:    internal.Verbose(),
	}

	rows := int64(0)
	config.Write = func(m []*sp.Mutation) error {
		ctx := context.Background()
		_, err := spannerClient.Apply(ctx, m)
		if err != nil {
			return err
		}
		// Track the number of mutations applied; the callback may run
		// concurrently, so the counter is updated atomically.
		atomic.AddInt64(&rows, int64(len(m)))
		return nil
	}
	batchWriter := writer.NewBatchWriter(config)

	// Route every row produced during CSV processing into the batch writer.
	conv.SetDataMode()
	conv.SetDataSink(
		func(table string, cols []string, vals []interface{}) {
			batchWriter.AddRow(table, cols, vals)
		})
	conv.DataFlush = func() {
		batchWriter.Flush()
	}
	return batchWriter
}

func init() {
	// Default to a no-op logger so the package is safe to use before a real
	// logger is configured; callers can replace logger.Log.
	logger.Log = zap.NewNop()
}
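
// Example usage (a minimal sketch; the infoSchema value, the ctx value, and
// the dialect string are assumptions about the caller's environment, not
// part of this file):
//
//	source := &CsvDataImpl{
//		ProjectId:         "my-project",
//		InstanceId:        "my-instance",
//		DbName:            "my-db",
//		TableName:         "Singers",
//		SourceUri:         "gs://my-bucket/singers.csv",
//		CsvFieldDelimiter: ",",
//	}
//	// infoSchema is a *spanner.InfoSchemaImpl built for the target database.
//	if err := source.ImportData(ctx, infoSchema, "google_standard_sql"); err != nil {
//		logger.Log.Error(fmt.Sprintf("import failed: %v", err))
//	}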