registry/datastore/migrations/premigrations/migrator.go (90 lines of code) (raw):
package premigrations
import (
"context"
"database/sql"
"fmt"
"github.com/docker/distribution/registry/bbm"
"github.com/docker/distribution/registry/datastore"
"github.com/docker/distribution/registry/datastore/migrations"
"github.com/hashicorp/go-multierror"
)
// PreDeployTypeName is the name of the pre-deployment migrator.
const PreDeployTypeName = "pre-deployment"
// MigratorImpl is the implementation of the pre-deployment migrator.
type MigratorImpl struct {
migrations.MigratorImpl // Embed the base MigratorImpl for common functionality.
db *sql.DB
migrations []*migrations.Migration
postMigrator *migrations.DepMigratorImp
bbmWorker *bbm.SyncWorker
skipPostDeployment bool
}
// Name returns the name of the pre-deployment migrator.
func (*MigratorImpl) Name() string {
return PreDeployTypeName
}
// NewMigrator creates a new pre-deployment migrator.
func NewMigrator(dsdb *datastore.DB, opts ...MigratorOption) *MigratorImpl {
var db *sql.DB
if dsdb != nil {
db = dsdb.DB
}
m := &MigratorImpl{
MigratorImpl: *migrations.NewMigrator(dsdb, migrations.WithTable(migrations.PreDeployMigrationTableName), migrations.WithMigrations(migrations.AllPreMigrations())),
db: db,
migrations: migrations.AllPreMigrations(),
bbmWorker: bbm.NewSyncWorker(dsdb),
postMigrator: &migrations.DepMigratorImp{
MigratorImpl: *migrations.NewMigrator(dsdb, migrations.WithTable(migrations.PostDeployMigrationTableName), migrations.WithMigrations(migrations.AllPostMigrations())),
},
}
for _, o := range opts {
o(m)
}
return m
}
// MigratorOption is a functional option for the configuration of the pre-deployment migrator.
type MigratorOption func(m *MigratorImpl)
// SkipPostDeployment configures the pre-deployment migrator to not apply post-deployment migrations.
func SkipPostDeployment() MigratorOption {
return func(m *MigratorImpl) {
m.skipPostDeployment = true
}
}
// Up applies all pending up migrations. Returns the number of applied migrations and background migrations.
func (m *MigratorImpl) Up(_ ...migrations.MigrationDependencyResolver) (migrations.MigrationResult, error) {
return m.MigratorImpl.Up(postDeployCheckFunc(m))
}
// UpN applies up to n pending up migrations. All pending migrations will be applied if n is 0. Returns the number of
// applied migrations and background migrations.
func (m *MigratorImpl) UpN(n int, _ ...migrations.MigrationDependencyResolver) (migrations.MigrationResult, error) {
return m.MigratorImpl.UpN(n, postDeployCheckFunc(m))
}
// postDeployCheckFunc checks if all required post-deployment migrations for a given migration are completed.
func postDeployCheckFunc(m *MigratorImpl) migrations.MigrationDependencyResolver {
spotApplyRequiredMigration := !m.skipPostDeployment
return func(_ context.Context, migration *migrations.Migration) (int, error) {
// If no migrations are required, return early
if len(migration.RequiredPostDeploy) == 0 {
return 0, nil
}
// Check if all required migrations are completed
statuses := m.postMigrator.StatusCache()
var depErrs *multierror.Error
var appliedPostDeployCount int
for _, id := range migration.RequiredPostDeploy {
val, found := statuses[id]
if !found {
return appliedPostDeployCount, fmt.Errorf("post-deploy migration %s does not exist in migration source", id)
}
if val.AppliedAt != nil {
continue
}
if !spotApplyRequiredMigration {
depErrs = multierror.Append(depErrs, fmt.Errorf("required post-deploy schema migration %s not yet applied and can not be skipped", id))
continue
}
// only try to apply a post migration if there are no dependency errors
if depErrs.ErrorOrNil() == nil {
// run post deployment migrations up till the point point of the required migration id (inclusive)
mig := m.postMigrator.FindMigrationByID(id)
src, err := m.postMigrator.EligibleMigrationSource()
if err != nil {
return appliedPostDeployCount, fmt.Errorf("getting eligible migration source: %w", err)
}
n, err := m.postMigrator.ApplyMigration(src, mig.Migration)
if err != nil {
return appliedPostDeployCount, fmt.Errorf("applying post-deploy migration up to migration %s: %w", migration.Id, err)
}
appliedPostDeployCount += n
}
}
return appliedPostDeployCount, depErrs.ErrorOrNil()
}
}