cmd/import_data.go (95 lines of code) (raw):
/* Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.*/
package cmd
import (
"context"
"flag"
"fmt"
"time"
spanneraccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/spanner"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/import_data"
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/spanner"
"github.com/google/subcommands"
"go.uber.org/zap"
)
type ImportDataCmd struct {
instanceId string
databaseName string
tableName string
sourceUri string
sourceFormat string
schemaUri string
csvLineDelimiter string
csvFieldDelimiter string
project string
}
func (cmd *ImportDataCmd) SetFlags(set *flag.FlagSet) {
set.StringVar(&cmd.instanceId, "instance-id", "", "Spanner instance Id")
set.StringVar(&cmd.databaseName, "database-name", "", "Spanner database name")
set.StringVar(&cmd.tableName, "table-name", "", "Spanner table name")
set.StringVar(&cmd.sourceUri, "source-uri", "", "URI of the file to import")
set.StringVar(&cmd.sourceFormat, "source-format", "", "Format of the file to import. Valid values {csv}")
set.StringVar(&cmd.schemaUri, "schema-uri", "", "URI of the file with schema for the csv to import. Only used for csv format.")
set.StringVar(&cmd.csvLineDelimiter, "csv-line-delimiter", "", "Token to be used as line delimiter for csv format. Defaults to '\\n'. Only used for csv format.")
set.StringVar(&cmd.csvFieldDelimiter, "csv-field-delimiter", "", "Token to be used as field delimiter for csv format. Defaults to ','. Only used for csv format.")
set.StringVar(&cmd.project, "project", "", "Project id for all resources related to this import")
}
func (cmd *ImportDataCmd) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
logger.Log.Debug(fmt.Sprintf("instanceId %s, dbName %s, schemaUri %s\n", cmd.instanceId, cmd.databaseName, cmd.schemaUri))
dbURI := fmt.Sprintf("projects/%s/instances/%s/databases/%s", cmd.project, cmd.instanceId, cmd.databaseName)
infoSchema, err := spanner.NewInfoSchemaImplWithSpannerClient(ctx, dbURI, constants.DIALECT_GOOGLESQL)
if err != nil {
logger.Log.Error(fmt.Sprintf("Unable to read Spanner schema %v", err))
return subcommands.ExitFailure
}
switch cmd.sourceFormat {
case constants.CSV:
err := cmd.handleCsv(ctx, infoSchema)
if err != nil {
logger.Log.Error(fmt.Sprintf("Unable to handle Csv %v", err))
return subcommands.ExitFailure
}
return subcommands.ExitSuccess
default:
logger.Log.Warn(fmt.Sprintf("format %s not supported yet", cmd.sourceFormat))
}
return subcommands.ExitFailure
}
func (cmd *ImportDataCmd) handleCsv(ctx context.Context, infoSchema *spanner.InfoSchemaImpl) error {
//TODO: handle POSTGRESQL
dialect := constants.DIALECT_GOOGLESQL
dbURI := fmt.Sprintf("projects/%s/instances/%s/databases/%s", cmd.project, cmd.instanceId, cmd.databaseName)
sp, err := spanneraccessor.NewSpannerAccessorClientImplWithSpannerClient(ctx, dbURI)
if err != nil {
logger.Log.Error(fmt.Sprintf("Unable to instantiate spanner client %v", err))
return err
}
startTime := time.Now()
csvSchema := import_data.CsvSchemaImpl{ProjectId: cmd.project, InstanceId: cmd.instanceId,
TableName: cmd.tableName, DbName: cmd.databaseName, SchemaUri: cmd.schemaUri, CsvFieldDelimiter: cmd.csvFieldDelimiter}
err = csvSchema.CreateSchema(ctx, dialect, sp)
endTime1 := time.Now()
elapsedTime := endTime1.Sub(startTime)
fmt.Println("Schema creation took ", elapsedTime.Seconds(), " secs")
if err != nil {
return err
}
csvData := import_data.CsvDataImpl{ProjectId: cmd.project, InstanceId: cmd.instanceId,
TableName: cmd.tableName, DbName: cmd.databaseName, SourceUri: cmd.sourceUri, CsvFieldDelimiter: cmd.csvFieldDelimiter}
err = csvData.ImportData(ctx, infoSchema, dialect)
endTime2 := time.Now()
elapsedTime = endTime2.Sub(endTime1)
fmt.Println("Data import took ", elapsedTime.Seconds(), " secs")
return err
}
func init() {
logger.Log, _ = zap.NewDevelopment()
}
func (cmd *ImportDataCmd) Name() string {
return "import"
}
// Synopsis returns summary of operation.
func (cmd *ImportDataCmd) Synopsis() string {
return "Import data from supported source files to spanner"
}
// Usage returns usage info of the command.
func (cmd *ImportDataCmd) Usage() string {
//TODO implement me
return fmt.Sprintf("test usage")
}