registry/root.go (693 lines of code) (raw):

package registry import ( "errors" "fmt" "io" "net/http" "os" "regexp" "sort" "strconv" "strings" "time" "github.com/docker/distribution/configuration" dcontext "github.com/docker/distribution/context" "github.com/docker/distribution/internal/feature" "github.com/docker/distribution/registry/bbm" "github.com/docker/distribution/registry/datastore" "github.com/docker/distribution/registry/datastore/migrations" "github.com/docker/distribution/registry/datastore/migrations/postmigrations" "github.com/docker/distribution/registry/datastore/migrations/premigrations" "github.com/docker/distribution/registry/storage" "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/factory" "github.com/docker/distribution/version" "github.com/docker/libtrust" "github.com/olekukonko/tablewriter" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" ) func init() { RootCmd.AddCommand(ServeCmd) RootCmd.AddCommand(GCCmd) RootCmd.AddCommand(DBCmd) RootCmd.AddCommand(BBMCmd) RootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "show the version and exit") GCCmd.Flags().BoolVarP(&dryRun, "dry-run", "d", false, "do everything except remove the blobs") GCCmd.Flags().BoolVarP(&removeUntagged, "delete-untagged", "m", false, "delete manifests that are not currently referenced via tag") GCCmd.Flags().StringVarP(&debugAddr, "debug-server", "s", "", "run a pprof debug server at <address:port>") MigrateCmd.AddCommand(MigrateVersionCmd) MigrateStatusCmd.Flags().BoolVarP(&upToDateCheck, "up-to-date", "u", false, "check if all known migrations are applied") MigrateStatusCmd.Flags().BoolVarP(&SkipPostDeployment, "skip-post-deployment", "s", false, "ignore post deployment migrations") MigrateStatusCmd.PreRunE = setBoolFlagWithEnv("SKIP_POST_DEPLOYMENT_MIGRATIONS", "skip-post-deployment") MigrateCmd.AddCommand(MigrateStatusCmd) MigrateUpCmd.Flags().BoolVarP(&dryRun, "dry-run", "d", false, "do not commit changes to the database") MigrateUpCmd.Flags().VarP(nullableInt{&MaxNumPreMigrations}, "limit", "n", "limit the number of migrations (all by default)") MigrateUpCmd.Flags().VarP(nullableInt{&MaxNumPostMigrations}, "post-deploy-limit", "p", "limit the number of post-deploy migrations (all by default)") MigrateUpCmd.Flags().BoolVarP(&SkipPostDeployment, "skip-post-deployment", "s", false, "do not apply post deployment migrations") MigrateUpCmd.PreRunE = setBoolFlagWithEnv("SKIP_POST_DEPLOYMENT_MIGRATIONS", "skip-post-deployment") MigrateCmd.AddCommand(MigrateUpCmd) MigrateDownCmd.Flags().BoolVarP(&Force, "force", "f", false, "no confirmation message") MigrateDownCmd.Flags().BoolVarP(&dryRun, "dry-run", "d", false, "do not commit changes to the database") MigrateDownCmd.Flags().VarP(nullableInt{&MaxNumPreMigrations}, "limit", "n", "limit the number of migrations (all by default)") MigrateDownCmd.Flags().VarP(nullableInt{&MaxNumPostMigrations}, "post-deploy-limit", "p", "limit the number of post-deploy migrations (all by default)") MigrateCmd.AddCommand(MigrateDownCmd) DBCmd.AddCommand(MigrateCmd) DBCmd.AddCommand(ImportCmd) ImportCmd.Flags().BoolVarP(&dryRun, "dry-run", "d", false, "do not commit changes to the database") ImportCmd.Flags().BoolVarP(&rowCount, "row-count", "c", false, "count and log number of rows across relevant database tables on (pre)import completion") ImportCmd.Flags().BoolVarP(&preImport, "pre-import", "p", false, "import immutable repository-scoped data to speed up a following import") ImportCmd.Flags().BoolVarP(&preImport, "step-one", "1", false, "perform step one of a multi-step import: alias for `pre-import`") ImportCmd.Flags().BoolVarP(&importAllRepos, "all-repositories", "r", false, "import all repository-scoped data") ImportCmd.Flags().BoolVarP(&importAllRepos, "step-two", "2", false, "perform step two of a multi-step import: alias for `all-repositories`") ImportCmd.Flags().BoolVarP(&importCommonBlobs, "common-blobs", "B", false, "import all blob metadata from common storage") ImportCmd.Flags().BoolVarP(&importCommonBlobs, "step-three", "3", false, "perform step three of a multi-step import: alias for `common-blobs`") ImportCmd.Flags().BoolVarP(&logToSTDOUT, "log-to-stdout", "l", false, "write detailed log to std instead of showing progress bars") ImportCmd.Flags().BoolVarP(&dynamicMediaTypes, "dynamic-media-types", "m", true, "record unknown media types during import") ImportCmd.Flags().StringVarP(&debugAddr, "debug-server", "s", "", "run a pprof debug server at <address:port>") ImportCmd.Flags().VarP(nullableInt{&tagConcurrency}, "tag-concurrency", "t", "limit the number of tags to retrieve concurrently, only applicable on gcs backed storage") BBMCmd.AddCommand(BBMStatusCmd) BBMCmd.AddCommand(BBMPauseCmd) BBMCmd.AddCommand(BBMResumeCmd) BBMCmd.AddCommand(BBMRunCmd) BBMRunCmd.Flags().VarP(nullableInt{&maxBBMJobRetry}, "max-job-retry", "r", "Set the maximum number of job retry attempts (default 2, must be between 1 and 10)") RootCmd.SetFlagErrorFunc(func(c *cobra.Command, err error) error { return fmt.Errorf("%w\n\n%s", err, c.UsageString()) }) viper.AutomaticEnv() } // Command flag vars var ( debugAddr string dryRun bool Force bool MaxNumPreMigrations *int MaxNumPostMigrations *int removeUntagged bool showVersion bool SkipPostDeployment bool upToDateCheck bool preImport bool rowCount bool importCommonBlobs bool importAllRepos bool tagConcurrency *int logToSTDOUT bool dynamicMediaTypes bool maxBBMJobRetry *int ) var parallelwalkKey = "parallelwalk" // nullableInt implements spf13/pflag#Value as a custom nullable integer to capture spf13/cobra command flags. // https://pkg.go.dev/github.com/spf13/pflag?tab=doc#Value type nullableInt struct { ptr **int } // setBoolFlagWithEnv binds a boolean flag to an environment variable and overrides the flag if the env var is set. // It returns an error if the binding or setting fails. func setBoolFlagWithEnv(envVarKey, flagName string) func(cmd *cobra.Command, args []string) error { return func(cmd *cobra.Command, _ []string) error { if err := viper.BindPFlag(envVarKey, cmd.Flags().Lookup(flagName)); err != nil { return fmt.Errorf("error binding env var %q to flag %q: %w", envVarKey, flagName, err) } if !cmd.Flags().Changed(flagName) { if viper.IsSet(envVarKey) { value := viper.GetBool(envVarKey) if err := cmd.Flags().Set(flagName, strconv.FormatBool(value)); err != nil { return fmt.Errorf("error setting flag %q from env var %q: %w", flagName, envVarKey, err) } } } return nil } } func (f nullableInt) String() string { if *f.ptr == nil { return "0" } return strconv.Itoa(**f.ptr) } func (nullableInt) Type() string { return "int" } func (f nullableInt) Set(s string) error { v, err := strconv.Atoi(s) if err != nil { return err } *f.ptr = &v return nil } // RootCmd is the main command for the 'registry' binary. var RootCmd = &cobra.Command{ Use: "registry", Short: "`registry`", Long: "`registry`", SilenceErrors: true, SilenceUsage: true, RunE: func(cmd *cobra.Command, _ []string) error { if showVersion { version.PrintVersion() return nil } return cmd.Usage() }, } // GCCmd is the cobra command that corresponds to the garbage-collect subcommand var GCCmd = &cobra.Command{ Use: "garbage-collect <config>", Short: "`garbage-collect` deletes layers not referenced by any manifests", Long: "`garbage-collect` deletes layers not referenced by any manifests", RunE: func(_ *cobra.Command, args []string) error { config, err := resolveConfiguration(args) if err != nil { return fmt.Errorf("configuration error: %w", err) } if config.Database.Enabled { return errors.New("the garbage-collect command is not compatible with database metadata, please use online garbage collection instead") } ctx := dcontext.Background() ctx, err = configureLogging(ctx, config) if err != nil { return fmt.Errorf("unable to configure logging with config: %w", err) } maxParallelManifestGets := 1 parameters := config.Storage.Parameters() if v, ok := (parameters[parallelwalkKey]).(bool); ok && v { maxParallelManifestGets = 10 } parameters[driver.ParamLogger] = dcontext.GetLogger(ctx) driver, err := factory.Create(config.Storage.Type(), parameters) if err != nil { return fmt.Errorf("failed to construct %s driver: %w", config.Storage.Type(), err) } dcontext.GetLogger(ctx).Debugf("getting a maximum of %d manifests in parallel per repository during the mark phase", maxParallelManifestGets) k, err := libtrust.GenerateECP256PrivateKey() if err != nil { return fmt.Errorf("generating ECP256 private key: %w", err) } registry, err := storage.NewRegistry(ctx, driver, storage.Schema1SigningKey(k)) if err != nil { return fmt.Errorf("failed to construct registry: %w", err) } if debugAddr != "" { go func() { dcontext.GetLoggerWithField(ctx, "address", debugAddr).Info("debug server listening") // nolint: gosec // this is just a debug server if err := http.ListenAndServe(debugAddr, nil); err != nil { dcontext.GetLoggerWithField(ctx, "error", err).Fatal("error listening on debug interface") } }() } err = storage.MarkAndSweep(ctx, driver, registry, storage.GCOpts{ DryRun: dryRun, RemoveUntagged: removeUntagged, MaxParallelManifestGets: maxParallelManifestGets, }) if err != nil { return fmt.Errorf("failed to garbage collect: %w", err) } return nil }, } // DBCmd is the root of the `database` command. var DBCmd = &cobra.Command{ Use: "database", Short: "Manages the registry metadata database", Long: "Manages the registry metadata database", RunE: func(cmd *cobra.Command, _ []string) error { return cmd.Usage() }, } // MigrateCmd is the `migrate` sub-command of `database` that manages database migrations. var MigrateCmd = &cobra.Command{ Use: "migrate", Short: "Manage migrations", Long: "Manage migrations", RunE: func(cmd *cobra.Command, _ []string) error { return cmd.Usage() }, } var MigrateUpCmd = &cobra.Command{ Use: "up", Short: "Apply up migrations", Long: "Apply up migrations", RunE: func(_ *cobra.Command, args []string) error { config, err := resolveConfiguration(args, configuration.WithoutStorageValidation()) if err != nil { return fmt.Errorf("configuration error: %w", err) } // Handle cases where no migration limits are set. var skipNonRequredPreDeployment, skipNonRequredPostDeployment bool switch { case MaxNumPostMigrations == nil && MaxNumPreMigrations == nil: var all int MaxNumPostMigrations = &all MaxNumPreMigrations = &all case MaxNumPostMigrations == nil && MaxNumPreMigrations != nil: skipNonRequredPostDeployment = true case MaxNumPreMigrations == nil && MaxNumPostMigrations != nil: skipNonRequredPreDeployment = true case *MaxNumPreMigrations < 1 || *MaxNumPostMigrations < 1: return errors.New("both pre and post migration limits must be greater than or equal to 1") } db, err := migrationDBFromConfig(config) if err != nil { return fmt.Errorf("failed to construct database connection: %w", err) } // Initialize the list of migrators based on the migration settings. var m []migrations.PureMigrator if SkipPostDeployment && !skipNonRequredPreDeployment { m = append(m, premigrations.NewMigrator(db, premigrations.SkipPostDeployment())) } // Add pre-deployment or post-deployment migrators as needed. if !SkipPostDeployment { // if only one type of limit is specified only do the migration pertaining to the specified limit parameter (and any enforced migrations) if skipNonRequredPreDeployment { m = append(m, postmigrations.NewMigrator(db)) } else if skipNonRequredPostDeployment { m = append(m, premigrations.NewMigrator(db)) } else { m = append(m, premigrations.NewMigrator(db), postmigrations.NewMigrator(db)) } } // Track applied migration counts and execution time. var ( totalPostDeployApllied, totalPreDeployApllied, totalBBMApllied int totalMigrationTime time.Duration ) for _, mig := range m { // Determine the max number of migrations to apply based on type. var maxNumMigrations int if mig.Name() == postmigrations.PostDeployTypeName { maxNumMigrations = *MaxNumPostMigrations } else { maxNumMigrations = *MaxNumPreMigrations } // Generate a plan for migrations to be applied. // Note: Currently, the plan does not reveal dependent migrations (e.g., post-deployment or background migrations). // It only shows direct migrations of the selected type. Adding dependency visibility is complex and currently not implemented. plan, err := mig.UpNPlan(maxNumMigrations) if err != nil { return fmt.Errorf("failed to prepare Up plan: %w", err) } if len(plan) > 0 { fmt.Printf("%s:\n%s\n", mig.Name(), strings.Join(plan, "\n")) } if !dryRun { start := time.Now() mr, err := mig.UpN(maxNumMigrations) if err != nil { return fmt.Errorf("failed to run database migrations: %w", err) } totalMigrationTime += time.Since(start) // Track the number of applied migrations based on type. if mig.Name() == postmigrations.PostDeployTypeName { totalPostDeployApllied += mr.AppliedCount totalPreDeployApllied += mr.AppliedDependencyCount } else { totalPostDeployApllied += mr.AppliedDependencyCount totalPreDeployApllied += mr.AppliedCount } totalBBMApllied += mr.AppliedBBMCount } } if !dryRun { fmt.Printf("OK: applied %d pre-deployment migration(s), %d post-deployment migration(s) and %d background migration(s) in %.3fs\n", totalPreDeployApllied, totalPostDeployApllied, totalBBMApllied, totalMigrationTime.Seconds()) } return nil }, } var MigrateDownCmd = &cobra.Command{ Use: "down", Short: "Apply down migrations", Long: "Apply down migrations", RunE: func(_ *cobra.Command, args []string) error { config, err := resolveConfiguration(args, configuration.WithoutStorageValidation()) if err != nil { return fmt.Errorf("configuration error: %w", err) } // Handle cases where no migration limits are set. var skipNonRequredPreDeployment, skipNonRequredPostDeployment bool switch { case MaxNumPostMigrations == nil && MaxNumPreMigrations == nil: var all int MaxNumPostMigrations = &all MaxNumPreMigrations = &all case MaxNumPostMigrations == nil && MaxNumPreMigrations != nil: skipNonRequredPostDeployment = true case MaxNumPreMigrations == nil && MaxNumPostMigrations != nil: skipNonRequredPreDeployment = true case *MaxNumPreMigrations < 1 || *MaxNumPostMigrations < 1: return errors.New("both pre and post migration limits must be greater than or equal to 1") } db, err := migrationDBFromConfig(config) if err != nil { return fmt.Errorf("failed to construct database connection: %w", err) } // Initialize the list of migrators based on the migration settings. var m []migrations.PureMigrator if SkipPostDeployment && !skipNonRequredPreDeployment { m = append(m, premigrations.NewMigrator(db, premigrations.SkipPostDeployment())) } // Add pre-deployment or post-deployment migrators as needed. if !SkipPostDeployment { if skipNonRequredPreDeployment { m = append(m, postmigrations.NewMigrator(db)) } else if skipNonRequredPostDeployment { m = append(m, premigrations.NewMigrator(db)) } else { m = append(m, postmigrations.NewMigrator(db), premigrations.NewMigrator(db)) } } // Determine the max number of migrations to remove based on type. for _, mig := range m { var maxNumMigrations int if mig.Name() == postmigrations.PostDeployTypeName { maxNumMigrations = *MaxNumPostMigrations } else { maxNumMigrations = *MaxNumPreMigrations } // Generate a plan for migrations to be removed. // Note: Currently, the plan does not reveal dependent migrations (e.g., post-deployment or background migrations). // It only shows direct migrations of the selected type. Adding dependency visibility is complex and currently not implemented. plan, err := mig.DownNPlan(maxNumMigrations) if err != nil { return fmt.Errorf("failed to prepare Down plan: %w", err) } if len(plan) > 0 { fmt.Printf("%s:\n%s\n", mig.Name(), strings.Join(plan, "\n")) } if !dryRun && len(plan) > 0 { if !Force { var response string _, _ = fmt.Print("Preparing to apply the above down migrations. Are you sure? [y/N] ") _, err := fmt.Scanln(&response) if err != nil && errors.Is(err, io.EOF) { return fmt.Errorf("failed to scan user input: %w", err) } if !regexp.MustCompile(`(?i)^y(es)?$`).MatchString(response) { return nil } } start := time.Now() n, err := mig.DownN(maxNumMigrations) if err != nil { return fmt.Errorf("failed to run database migrations: %w", err) } fmt.Printf("OK: applied %d %s migration(s) in %.3fs\n", n, mig.Name(), time.Since(start).Seconds()) } } return nil }, } // MigrateVersionCmd is the `version` sub-command of `database migrate` that shows the current migration version. var MigrateVersionCmd = &cobra.Command{ Use: "version", Short: "Show current migration version", Long: "Show current migration version", RunE: func(_ *cobra.Command, args []string) error { config, err := resolveConfiguration(args, configuration.WithoutStorageValidation()) if err != nil { return fmt.Errorf("configuration error: %w", err) } db, err := migrationDBFromConfig(config) if err != nil { return fmt.Errorf("failed to construct database connection: %w", err) } var m []migrations.PureMigrator if SkipPostDeployment { m = append(m, premigrations.NewMigrator(db)) } if !SkipPostDeployment { m = append(m, premigrations.NewMigrator(db), postmigrations.NewMigrator(db)) } for _, mig := range m { v, err := mig.Version() if err != nil { return fmt.Errorf("failed to detect database version: %w", err) } if v == "" { v = "Unknown" } fmt.Printf("%s:%s\n", mig.Name(), v) } return nil }, } // MigrateStatusCmd is the `status` sub-command of `database migrate` that shows the migrations status. var MigrateStatusCmd = &cobra.Command{ Use: "status", Short: "Show migration status", Long: "Show migration status", RunE: func(_ *cobra.Command, args []string) error { config, err := resolveConfiguration(args, configuration.WithoutStorageValidation()) if err != nil { return fmt.Errorf("configuration error: %w", err) } db, err := migrationDBFromConfig(config) if err != nil { return fmt.Errorf("failed to construct database connection: %w", err) } var m []migrations.PureMigrator if SkipPostDeployment { m = append(m, premigrations.NewMigrator(db)) } if !SkipPostDeployment { m = append(m, premigrations.NewMigrator(db), postmigrations.NewMigrator(db)) } for _, mig := range m { statuses, err := mig.Status() if err != nil { return fmt.Errorf("failed to detect database status: %w", err) } if upToDateCheck { upToDate := true for _, s := range statuses { if s.AppliedAt == nil { if !SkipPostDeployment { upToDate = false break } } } _, err = fmt.Println(upToDate) if err != nil { return fmt.Errorf("printing line: %w", err) } return nil } table := tablewriter.NewWriter(os.Stdout) table.SetHeader([]string{"Migration", "Applied"}) table.SetColWidth(80) // Display table rows sorted by migration ID var ids []string for id := range statuses { ids = append(ids, id) } sort.Strings(ids) for _, id := range ids { name := id if statuses[id].Unknown { name += " (unknown)" } var appliedAt string if statuses[id].AppliedAt != nil { appliedAt = statuses[id].AppliedAt.String() } table.Append([]string{name, appliedAt}) } _, _ = fmt.Println(mig.Name()) table.Render() } return nil }, } // ImportCmd is the `import` sub-command of `database` that imports metadata from the filesystem into the database. var ImportCmd = &cobra.Command{ Use: "import", Short: "Import filesystem metadata into the database", Long: "Import filesystem metadata into the database.\n" + "Untagged manifests are not imported.\n " + "This tool can not be used with the parallelwalk storage configuration enabled.", RunE: func(_ *cobra.Command, args []string) error { // Ensure no more than one step flag is set. if preImport && (importAllRepos || importCommonBlobs) { return errors.New("steps two or three can't be used with step one") } if importAllRepos && importCommonBlobs { return errors.New("step three can't be used with step two") } config, err := resolveConfiguration(args) if err != nil { return fmt.Errorf("configuration error: %w", err) } if tagConcurrency != nil && (*tagConcurrency < 1 || *tagConcurrency > 5) { return errors.New("tag-concurrency must be between 1 and 5") } ctx := dcontext.Background() ctx, err = configureLogging(ctx, config) if err != nil { return fmt.Errorf("unable to configure logging with config: %w", err) } parameters := config.Storage.Parameters() if val, ok := parameters[parallelwalkKey].(bool); ok && val { parameters[parallelwalkKey] = false logrus.Info("the 'parallelwalk' configuration parameter has been disabled") } parameters[driver.ParamLogger] = dcontext.GetLogger(ctx) driver, err := factory.Create(config.Storage.Type(), parameters) if err != nil { return fmt.Errorf("failed to construct %s driver: %w", config.Storage.Type(), err) } k, err := libtrust.GenerateECP256PrivateKey() if err != nil { return fmt.Errorf("generatibng ECP256 private key") } registry, err := storage.NewRegistry(ctx, driver, storage.Schema1SigningKey(k)) if err != nil { return fmt.Errorf("failed to construct registry: %w", err) } db, err := dbFromConfig(config) if err != nil { return fmt.Errorf("failed to construct database connection: %w", err) } m := migrations.NewMigrator(db) pending, err := m.HasPending() if err != nil { return fmt.Errorf("failed to check database migrations status: %w", err) } if pending { return errors.New("there are pending database migrations, use the 'registry database migrate' CLI " + "command to check and apply them") } if debugAddr != "" { go func() { dcontext.GetLoggerWithField(ctx, "address", debugAddr).Info("debug server listening") // nolint: gosec // this is just a debug server if err := http.ListenAndServe(debugAddr, nil); err != nil { dcontext.GetLogger(ctx).WithError(err).Fatal("error listening on debug interface") } }() } var opts []datastore.ImporterOption if dryRun { opts = append(opts, datastore.WithDryRun) } if rowCount { opts = append(opts, datastore.WithRowCount) } if tagConcurrency != nil { if config.Storage.Type() != "gcs" { return errors.New("the tag concurrency option is only compatible with a gcs backed registry storage") } opts = append(opts, datastore.WithTagConcurrency(*tagConcurrency)) } if !logToSTDOUT { opts = append(opts, datastore.WithProgressBar) } err = os.Setenv(feature.DynamicMediaTypes.EnvVariable, strconv.FormatBool(dynamicMediaTypes)) if err != nil { return fmt.Errorf("failed to set environment variable %s: %w", feature.DynamicMediaTypes.EnvVariable, err) } p := datastore.NewImporter(db, registry, opts...) switch { case preImport: err = p.PreImportAll(ctx) case importAllRepos: err = p.ImportAllRepositories(ctx) case importCommonBlobs: err = p.ImportBlobs(ctx) default: err = p.FullImport(ctx) } if err != nil { return fmt.Errorf("failed to import metadata: %w", err) } return nil }, } // BBMCmd is the cobra command that corresponds to the background-migrate subcommand var BBMCmd = &cobra.Command{ Use: "background-migrate <config> {status|pause|run}", Short: "Manage batched background migrations", Long: "Manage batched background migrations", RunE: func(cmd *cobra.Command, _ []string) error { return cmd.Usage() }, } // BBMStatusCmd is the `status` sub-command of `background-migrate` that shows the batched background migrations status. var BBMStatusCmd = &cobra.Command{ Use: "status <config>", Short: "Show the current status of all batched background migrations", Long: "Show the current status of all batched background migrations.", RunE: func(_ *cobra.Command, args []string) error { config, err := resolveConfiguration(args, configuration.WithoutStorageValidation()) if err != nil { return fmt.Errorf("configuration error: %w", err) } db, err := migrationDBFromConfig(config) if err != nil { return fmt.Errorf("failed to construct database connection: %w", err) } bbmw := bbm.NewWorker(nil, bbm.WithDB(db)) bbMigrations, err := bbmw.AllMigrations(dcontext.Background()) if err != nil { return fmt.Errorf("failed to fetch background migrations: %w", err) } table := tablewriter.NewWriter(os.Stdout) table.SetHeader([]string{"Batched Background Migration", "Status"}) table.SetColWidth(80) // Display table rows for _, bbm := range bbMigrations { table.Append([]string{bbm.Name, bbm.Status.String()}) } table.Render() return nil }, } // BBMPauseCmd is the `pause` sub-command of `background-migrate` that pauses a batched background migrations. var BBMPauseCmd = &cobra.Command{ Use: "pause <config>", Short: "Pause all running or active batched background migrations", Long: "Pause all running or active batched background migrations", RunE: func(_ *cobra.Command, args []string) error { config, err := resolveConfiguration(args, configuration.WithoutStorageValidation()) if err != nil { return fmt.Errorf("configuration error: %w", err) } db, err := migrationDBFromConfig(config) if err != nil { return fmt.Errorf("failed to construct database connection: %w", err) } bbmw := bbm.NewWorker(nil, bbm.WithDB(db)) err = bbmw.PauseEligibleMigrations(dcontext.Background()) if err != nil { return fmt.Errorf("failed to pause background migrations: %w", err) } return nil }, } // BBMResumeCmd is the `resume` sub-command of `background-migrate` that resumes all previously paused batched background migrations. var BBMResumeCmd = &cobra.Command{ Use: "resume", Short: "Resume all paused batched background migrations", Long: "Resume all paused batched background migrations", RunE: func(_ *cobra.Command, args []string) error { config, err := resolveConfiguration(args, configuration.WithoutStorageValidation()) if err != nil { return fmt.Errorf("configuration error: %w", err) } db, err := migrationDBFromConfig(config) if err != nil { return fmt.Errorf("failed to construct database connection: %w", err) } bbmw := bbm.NewWorker(nil, bbm.WithDB(db)) err = bbmw.ResumeEligibleMigrations(dcontext.Background()) if err != nil { return fmt.Errorf("failed to resume background migrations: %w", err) } return nil }, } // BBMRunCmd is the `run` sub-command of `background-migrate` that runs unfinished background migration. var BBMRunCmd = &cobra.Command{ Use: "run <config> [--max-job-retry <n>]", Short: "Run all unfinished batched background migrations", Long: "Run all unfinished batched background migrations", RunE: func(_ *cobra.Command, args []string) error { config, err := resolveConfiguration(args, configuration.WithoutStorageValidation()) if err != nil { return fmt.Errorf("configuration error: %w", err) } db, err := migrationDBFromConfig(config) if err != nil { return fmt.Errorf("failed to construct database connection: %w", err) } // Set default max job retry if not set, and validate its range if maxBBMJobRetry == nil { defaultBBMJobRetry := 2 maxBBMJobRetry = &defaultBBMJobRetry } else if *maxBBMJobRetry < 1 || *maxBBMJobRetry > 10 { return errors.New("limit must be greater than 0 and less than 10") } // Create a new sync worker with the database and max job attempt options, and run it wk := bbm.NewSyncWorker(db, bbm.WithSyncMaxJobAttempt(*maxBBMJobRetry)) // Unpause any paused background migrations so they can be processed by the worker in `run` below err = wk.ResumeEligibleMigrations(dcontext.Background()) if err != nil { return fmt.Errorf("failed to resume background migrations: %w", err) } retryRunInterval := 10 * time.Second for { err := wk.Run(dcontext.Background()) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "running background migrations failed: %v\n", err) // keep retrying to run at a fixed interval until user stops command. _, _ = fmt.Fprintf(os.Stdout, "retrying run in %v...\n", retryRunInterval) time.Sleep(retryRunInterval) continue } break } return nil }, }