in cmd/data.go [97:203]
// Execute runs the data subcommand and reports the outcome as a
// subcommands.ExitStatus.
//
// The steps, in order:
//  1. Remove any leftover smt tmp data directory and initialize the logger.
//  2. Resolve the source profile, target profile and source via
//     PrepareMigrationPrerequisites, and the project id (from cmd.project or,
//     if empty, from GetProject).
//  3. If cmd.validate is set, only check that a session file was supplied and
//     return.
//  4. Unless the source profile uses the target schema, load the session file
//     and check that its dialect matches the target profile's dialect.
//  5. Call MigrateDatabase, or, when cmd.dryRun is set, call DataConv only.
//  6. Call GenerateReport and WriteBadData, then remove the tmp directory.
//
// Any error still held in err when the function returns is logged through
// logger.Log.Fatal by a deferred handler.
func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
	// Best-effort cleanup of the smt tmp data directory in case residuals
	// remain from previous runs; a failure to remove it is deliberately ignored.
	tmpDir := filepath.Join(os.TempDir(), constants.SMT_TMP_DIR)
	os.RemoveAll(tmpDir)

	// Initialize the logger before registering the deferred fatal handler
	// below, so that an initialization failure is reported on stdout only and
	// never routed through a logger that may not have been set up.
	if initErr := logger.InitializeLogger(cmd.logLevel); initErr != nil {
		fmt.Println("Error initialising logger, did you specify a valid log-level? [DEBUG, INFO, WARN, ERROR, FATAL]", initErr)
		return subcommands.ExitFailure
	}

	// Every failure path below stores its error in err before returning; this
	// handler is the single place where that error gets logged.
	var err error
	defer func() {
		if err != nil {
			logger.Log.Fatal("FATAL error", zap.Error(err))
		}
	}()
	// Registered after the handler above, so it runs before it (defers are LIFO).
	defer logger.Log.Sync()

	conv := internal.MakeConv()
	utils.SetDataflowTemplatePath(cmd.dataflowTemplate)

	// Validate and parse source-profile, target-profile and source.
	sourceProfile, targetProfile, ioHelper, dbName, err := PrepareMigrationPrerequisites(cmd.sourceProfile, cmd.targetProfile, cmd.source)
	if err != nil {
		err = fmt.Errorf("error while preparing prerequisites for migration: %w", err)
		return subcommands.ExitUsageError
	}

	if cmd.project == "" {
		getInfo := &utils.GetUtilInfoImpl{}
		cmd.project, err = getInfo.GetProject()
		if err != nil {
			// NOTE(review): err stays set here, so the deferred handler logs
			// this failure a second time at Fatal level.
			logger.Log.Error("Could not get project id from gcloud environment or --project flag. Either pass the projectId in the --project flag or configure in gcloud CLI using gcloud config set", zap.Error(err))
			return subcommands.ExitUsageError
		}
	}

	var (
		bw     *writer.BatchWriter
		banner string
	)

	// Populate migration request id and migration type in conv object.
	// NOTE(review): the error returned by GenerateName is discarded — confirm
	// that an empty/partial request id is acceptable on failure.
	conv.Audit.MigrationRequestId, _ = utils.GenerateName("smt-job")
	conv.Audit.MigrationRequestId = strings.ReplaceAll(conv.Audit.MigrationRequestId, "_", "-")
	conv.Audit.MigrationType = migration.MigrationData_DATA_ONLY.Enum()
	conv.Audit.SkipMetricsPopulation = os.Getenv("SKIP_METRICS_POPULATION") == "true"
	dataConversionStartTime := time.Now()

	// In validate mode nothing is migrated: only the presence of the session
	// flag is checked.
	if cmd.validate {
		if cmd.sessionJSON == "" {
			err = fmt.Errorf("cannot leave --session flag empty, please specify session file path e.g., --session=./session.json etc")
			return subcommands.ExitUsageError
		}
		return subcommands.ExitSuccess
	}

	if !sourceProfile.UseTargetSchema() {
		err = conversion.ReadSessionFile(conv, cmd.sessionJSON)
		if err != nil {
			return subcommands.ExitUsageError
		}
		// The session's dialect must agree with the target profile's dialect
		// whenever the latter is specified.
		if targetProfile.Conn.Sp.Dialect != "" && conv.SpDialect != targetProfile.Conn.Sp.Dialect {
			err = fmt.Errorf("running data migration for Spanner dialect: %v, whereas schema mapping was done for dialect: %v", targetProfile.Conn.Sp.Dialect, conv.SpDialect)
			return subcommands.ExitUsageError
		}
	}

	// NOTE(review): dbURI is never assigned in this function, so the banner of
	// a real (non dry-run) migration is built with an empty string — confirm
	// whether the database URI (or dbName) was intended here.
	var dbURI string

	if !cmd.dryRun {
		now := time.Now()
		bw, err = MigrateDatabase(ctx, cmd.project, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil)
		if err != nil {
			err = fmt.Errorf("can't finish database migration for db %s: %w", dbName, err)
			return subcommands.ExitFailure
		}
		banner = utils.GetBanner(now, dbURI)
	} else {
		conv.Audit.DryRun = true
		// If migration type is Minimal Downtime, validate if required resources can be generated.
		if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION {
			// Assign to the function-level err (no ':=') so that the deferred
			// handler reports this failure like every other error path.
			err = ValidateResourceGenerationHelper(ctx, cmd.project, targetProfile.Conn.Sp.Instance, sourceProfile, conv)
			if err != nil {
				return subcommands.ExitFailure
			}
		}
		convImpl := &conversion.ConvImpl{}
		bw, err = convImpl.DataConv(ctx, cmd.project, sourceProfile, targetProfile, &ioHelper, nil, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{})
		if err != nil {
			err = fmt.Errorf("can't finish data conversion for db %s: %w", dbName, err)
			return subcommands.ExitFailure
		}
		banner = utils.GetBanner(dataConversionStartTime, dbName)
	}

	// The recorded duration spans everything since the request id was
	// populated, including session loading.
	conv.Audit.DataConversionDuration = time.Since(dataConversionStartTime)

	// If filePrefix not explicitly set, use dbName as prefix.
	if cmd.filePrefix == "" {
		cmd.filePrefix = targetProfile.Conn.Sp.Dbname
	}

	reportImpl := conversion.ReportImpl{}
	reportImpl.GenerateReport(sourceProfile.Driver, bw.DroppedRowsByTable(), ioHelper.BytesRead, banner, conv, cmd.filePrefix, dbName, ioHelper.Out)
	conversion.WriteBadData(bw, conv, banner, cmd.filePrefix+badDataFile, ioHelper.Out)

	// Cleanup smt tmp data directory (best effort, as at the start).
	os.RemoveAll(tmpDir)
	return subcommands.ExitSuccess
}