func()

in cmd/schema_and_data.go [93:203]


// Execute runs the schema-and-data subcommand: it converts the source schema,
// writes the schema and session files, then either migrates the data
// (MigrateDatabase) or, when cmd.dryRun is set, only runs the data conversion
// (DataConv), and finally writes the report and bad-data files.
//
// Error-handling contract: once the logger is initialised, a deferred handler
// calls logger.Log.Fatal whenever the function-scoped err is non-nil on
// return. zap's Fatal exits the process by default, so on those paths the
// returned ExitStatus is effectively not observed by the caller. Paths that
// must return normally keep err nil (see the resource validation step in the
// dry-run branch).
//
// When cmd.validate is set, the function returns ExitSuccess right after the
// profiles and the project id have been resolved, without converting anything.
func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
	// Cleanup smt tmp data directory in case residuals remain from prev runs.
	// Best effort: the result of the removal is intentionally not checked.
	os.RemoveAll(filepath.Join(os.TempDir(), constants.SMT_TMP_DIR))

	var err error
	// Initialise the logger before registering the fatal handler below: the
	// handler logs through logger.Log, which must not be used if
	// initialisation failed.
	if err = logger.InitializeLogger(cmd.logLevel); err != nil {
		fmt.Println("Error initialising logger, did you specify a valid log-level? [DEBUG, INFO, WARN, ERROR, FATAL]", err)
		return subcommands.ExitFailure
	}
	// Deferred calls run last-in-first-out, so Sync (registered second) runs
	// before the fatal handler (registered first).
	defer func() {
		if err != nil {
			logger.Log.Fatal("FATAL error", zap.Error(err))
		}
	}()
	defer logger.Log.Sync()

	utils.SetDataflowTemplatePath(cmd.dataflowTemplate)

	// validate and parse source-profile, target-profile and source
	sourceProfile, targetProfile, ioHelper, dbName, err := PrepareMigrationPrerequisites(cmd.sourceProfile, cmd.targetProfile, cmd.source)
	if err != nil {
		err = fmt.Errorf("error while preparing prerequisites for migration: %v", err)
		return subcommands.ExitUsageError
	}

	if cmd.project == "" {
		getInfo := &utils.GetUtilInfoImpl{}
		cmd.project, err = getInfo.GetProject()
		if err != nil {
			logger.Log.Error("Could not get project id from gcloud environment or --project flag. Either pass the projectId in the --project flag or configure in gcloud CLI using gcloud config set", zap.Error(err))
			return subcommands.ExitUsageError
		}
	}

	// Validation-only mode stops once the inputs above have been accepted.
	if cmd.validate {
		return subcommands.ExitSuccess
	}
	schemaConversionStartTime := time.Now()

	// If filePrefix not explicitly set, use dbName as prefix.
	if cmd.filePrefix == "" {
		cmd.filePrefix = dbName
	}

	var (
		conv   *internal.Conv
		bw     *writer.BatchWriter
		banner string
		// NOTE(review): dbURI is never assigned in this function, so the
		// banner of the non-dry-run path is built from an empty string.
		// Confirm whether the database URI resolved during migration should
		// be surfaced here instead.
		dbURI string
	)
	convImpl := &conversion.ConvImpl{}
	ddlVerifier, err := expressions_api.NewDDLVerifierImpl(ctx, "", "")
	if err != nil {
		logger.Log.Error(fmt.Sprintf("error trying create ddl verifier: %v", err))
		return subcommands.ExitFailure
	}
	sfs := &conversion.SchemaFromSourceImpl{
		DdlVerifier: ddlVerifier,
	}
	conv, err = convImpl.SchemaConv(cmd.project, sourceProfile, targetProfile, &ioHelper, sfs)
	if err != nil {
		// Report the failure like every other error path instead of
		// panicking; the deferred handler logs it fatally.
		err = fmt.Errorf("can't finish schema conversion for db %s: %w", dbName, err)
		return subcommands.ExitFailure
	}
	schemaConversionEndTime := time.Now()
	conv.Audit.SchemaConversionDuration = schemaConversionEndTime.Sub(schemaConversionStartTime)

	// Populate migration request id and migration type in conv object.
	// NOTE(review): the second return value of GenerateName is discarded;
	// confirm that continuing with whatever name it returned is intended.
	conv.Audit.MigrationRequestId, _ = utils.GenerateName("smt-job")
	conv.Audit.MigrationRequestId = strings.ReplaceAll(conv.Audit.MigrationRequestId, "_", "-")
	conv.Audit.MigrationType = migration.MigrationData_SCHEMA_AND_DATA.Enum()

	conversion.WriteSchemaFile(conv, schemaConversionStartTime, cmd.filePrefix+schemaFile, ioHelper.Out, sourceProfile.Driver)
	conversion.WriteSessionFile(conv, cmd.filePrefix+sessionFile, ioHelper.Out)
	conv.Audit.SkipMetricsPopulation = os.Getenv("SKIP_METRICS_POPULATION") == "true"
	reportImpl := conversion.ReportImpl{}
	if !cmd.dryRun {
		// A first report is generated before the migration starts (no dropped
		// rows, empty banner); it is generated again after the branch with
		// the final data.
		reportImpl.GenerateReport(sourceProfile.Driver, nil, ioHelper.BytesRead, "", conv, cmd.filePrefix, dbName, ioHelper.Out)
		bw, err = MigrateDatabase(ctx, cmd.project, targetProfile, sourceProfile, dbName, &ioHelper, cmd, conv, nil)
		if err != nil {
			err = fmt.Errorf("can't finish database migration for db %s: %v", dbName, err)
			return subcommands.ExitFailure
		}
		conv.Audit.DataConversionDuration = time.Since(schemaConversionEndTime)
		banner = utils.GetBanner(schemaConversionStartTime, dbURI)
	} else {
		conv.Audit.DryRun = true
		// Re-stamp the end of schema conversion: in dry-run mode the recorded
		// duration also covers writing the schema and session files above.
		schemaConversionEndTime = time.Now()
		conv.Audit.SchemaConversionDuration = schemaConversionEndTime.Sub(schemaConversionStartTime)
		// If migration type is Minimal Downtime, validate if required resources can be generated
		if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION {
			// A separate variable is used on purpose: the function-scoped err
			// stays nil, so this failure is logged at error level and returned
			// without triggering the deferred fatal handler.
			if validateErr := ValidateResourceGenerationHelper(ctx, cmd.project, targetProfile.Conn.Sp.Instance, sourceProfile, conv); validateErr != nil {
				logger.Log.Error(validateErr.Error())
				return subcommands.ExitFailure
			}
		}

		bw, err = convImpl.DataConv(ctx, cmd.project, sourceProfile, targetProfile, &ioHelper, nil, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{})
		if err != nil {
			err = fmt.Errorf("can't finish data conversion for db %s: %v", dbName, err)
			return subcommands.ExitFailure
		}
		conv.Audit.DataConversionDuration = time.Since(schemaConversionEndTime)
		banner = utils.GetBanner(schemaConversionStartTime, dbName)
	}
	reportImpl.GenerateReport(sourceProfile.Driver, bw.DroppedRowsByTable(), ioHelper.BytesRead, banner, conv, cmd.filePrefix, dbName, ioHelper.Out)
	conversion.WriteBadData(bw, conv, banner, cmd.filePrefix+badDataFile, ioHelper.Out)

	// Cleanup smt tmp data directory.
	os.RemoveAll(filepath.Join(os.TempDir(), constants.SMT_TMP_DIR))
	return subcommands.ExitSuccess
}