registry/datastore/migrations/migrator.go (319 lines of code) (raw):
//go:generate mockgen -package mocks -destination mocks/migration.go . Migrator
package migrations
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/docker/distribution/registry/bbm"
"github.com/docker/distribution/registry/datastore"
migrate "github.com/rubenv/sql-migrate"
)
const (
PreDeployMigrationTableName = "schema_migrations"
PostDeployMigrationTableName = "post_deploy_schema_migrations"
dialect = "postgres"
)
var ErrBBMNotComplete = errors.New("background migration must be complete before proceeding")
// MigrationDependencyResolver determines whether a migration can be applied by verifying its pre/post deploy dependencies.
// It returns the number of dependencies successfully applied (if any) or an error if the process fails.
type MigrationDependencyResolver func(ctx context.Context, migration *Migration) (int, error)
// MigrationResult holds the outcome of a migration operation.
// It includes the count of applied Batched Background Migrations (BBMs)
// and the total number of applied schema migrations.
type MigrationResult struct {
AppliedBBMCount int
AppliedCount int
AppliedDependencyCount int
}
type Migrator interface {
Up() (MigrationResult, error)
Down() (int, error)
}
type DepMigratorImp struct {
MigratorImpl
}
var (
once sync.Once
StatusCache map[string]*MigrationStatus
)
func (s *DepMigratorImp) StatusCache() map[string]*MigrationStatus {
once.Do(func() {
StatusCache, _ = s.Status()
})
// In tests we may force reset the cache by setting it to nil.
// This allows us to re-trigger getting the latest status in case
// the old status map is stale or was corrupted by another test.
if StatusCache == nil {
StatusCache, _ = s.Status()
}
return StatusCache
}
type MigratorImpl struct {
db *datastore.DB
migrations []*Migration
bbmWorker *bbm.SyncWorker
ms migrate.MigrationSet
}
type PureMigrator interface {
Name() string
Down() (int, error)
DownN(n int) (int, error)
DownNPlan(n int) ([]string, error)
FindMigrationByID(id string) *Migration
HasPending() (bool, error)
LatestVersion() (string, error)
Reconfigure(f MigratorOption)
Status() (map[string]*MigrationStatus, error)
Up(extraCheck ...MigrationDependencyResolver) (MigrationResult, error)
UpN(n int, extraCheck ...MigrationDependencyResolver) (MigrationResult, error)
UpNPlan(n int) ([]string, error)
Version() (string, error)
}
// NewMigrator creates new Migrator.
// NOTE(prozlach): we can not use interface here because in some cases the code
// is accessing private fields of the package/object. This would need a deeper
// cleanup to make the API clean.
func NewMigrator(db *datastore.DB, opts ...MigratorOption) *MigratorImpl {
m := &MigratorImpl{
db: db,
migrations: allMigrations,
bbmWorker: bbm.NewSyncWorker(db),
}
for _, o := range opts {
o(m)
}
return m
}
// MigratorOption enables the creation of functional options for the
// configuration of the migrator.
type MigratorOption func(m *MigratorImpl)
// Source allows the migrator to use an alternative source of migrations, used
// for testing.
func Source(a []*Migration) MigratorOption {
return func(m *MigratorImpl) {
m.migrations = a
}
}
// WithBBMWorker allows the migrator to use an alternative BBM worker, used
// for testing.
func WithBBMWorker(w *bbm.SyncWorker) MigratorOption {
return func(m *MigratorImpl) {
m.bbmWorker = w
}
}
// WithTable allows the migrator to use an alternative table to track migrations.
func WithTable(tableName string) MigratorOption {
return func(m *MigratorImpl) {
m.ms = migrate.MigrationSet{TableName: tableName}
}
}
// WithMigrations allows the migrator to use an alternative set of migrations.
func WithMigrations(all []*Migration) MigratorOption {
return func(m *MigratorImpl) {
m.migrations = all
}
}
// Name of migrator.
func (*MigratorImpl) Name() string {
return ""
}
// Reconfigure is used to change the configuration of an existing Migrator
// using given config option.
func (m *MigratorImpl) Reconfigure(f MigratorOption) {
f(m)
}
// Version returns the current applied migration version (if any).
func (m *MigratorImpl) Version() (string, error) {
records, err := m.ms.GetMigrationRecords(m.db.DB, dialect)
if err != nil {
return "", err
}
if len(records) == 0 {
return "", nil
}
return records[len(records)-1].Id, nil
}
// LatestVersion identifies the version of the most recent migration in the repository (if any).
func (m *MigratorImpl) LatestVersion() (string, error) {
all, err := m.eligibleMigrations()
if err != nil {
return "", err
}
if len(all) == 0 {
return "", nil
}
return all[len(all)-1].Id, nil
}
func (m *MigratorImpl) migrate(direction migrate.MigrationDirection, limit int) (int, error) {
src, err := m.EligibleMigrationSource()
if err != nil {
return 0, err
}
return m.ms.ExecMax(m.db.DB, dialect, src, direction, limit)
}
// Up applies all pending up migrations. Returns the number of applied migrations and background migrations.
func (m *MigratorImpl) Up(extraCheck ...MigrationDependencyResolver) (MigrationResult, error) {
return m.migrateUpWithCheck(0, extraCheck...)
}
// UpN applies up to n pending up migrations. All pending migrations will be applied if n is 0. Returns the number of
// applied migrations and background migrations.
func (m *MigratorImpl) UpN(n int, extraCheck ...MigrationDependencyResolver) (MigrationResult, error) {
return m.migrateUpWithCheck(n, extraCheck...)
}
// UpNPlan plans up to n pending up migrations and returns the ordered list of migration IDs. All pending migrations
// will be planned if n is 0.
func (m *MigratorImpl) UpNPlan(n int) ([]string, error) {
return m.plan(migrate.Up, n)
}
// Down applies all pending down migrations. Returns the number of applied migrations.
func (m *MigratorImpl) Down() (int, error) {
return m.migrate(migrate.Down, 0)
}
// DownN applies up to n pending down migrations. All migrations will be applied if n is 0. Returns the number of
// applied migrations.
func (m *MigratorImpl) DownN(n int) (int, error) {
return m.migrate(migrate.Down, n)
}
// DownNPlan plans up to n pending down migrations and returns the ordered list of migration IDs. All pending migrations
// will be planned if n is 0.
func (m *MigratorImpl) DownNPlan(n int) ([]string, error) {
return m.plan(migrate.Down, n)
}
// MigrationStatus represents the status of a migration. Unknown will be set to true if a migration was applied but is
// not known by the current build.
type MigrationStatus struct {
Unknown bool
AppliedAt *time.Time
}
// Status returns the status of all migrations, indexed by migration ID.
func (m *MigratorImpl) Status() (map[string]*MigrationStatus, error) {
applied, err := m.ms.GetMigrationRecords(m.db.DB, dialect)
if err != nil {
return nil, err
}
known, err := m.allMigrations()
if err != nil {
return nil, err
}
statuses := make(map[string]*MigrationStatus, len(applied))
for _, k := range known {
statuses[k.Id] = &MigrationStatus{}
}
for _, m := range applied {
if _, ok := statuses[m.Id]; !ok {
statuses[m.Id] = &MigrationStatus{Unknown: true}
}
statuses[m.Id].AppliedAt = &m.AppliedAt
}
return statuses, nil
}
// HasPending determines whether all known migrations are applied or not.
func (m *MigratorImpl) HasPending() (bool, error) {
records, err := m.ms.GetMigrationRecords(m.db.DB, dialect)
if err != nil {
return false, err
}
eligible, err := m.eligibleMigrations()
if err != nil {
return false, err
}
for _, k := range eligible {
if !migrationApplied(records, k.Id) {
return true, nil
}
}
return false, nil
}
func (m *MigratorImpl) plan(direction migrate.MigrationDirection, limit int) ([]string, error) {
src, err := m.EligibleMigrationSource()
if err != nil {
return nil, err
}
planned, _, err := m.ms.PlanMigration(m.db.DB, dialect, src, direction, limit)
if err != nil {
return nil, err
}
result := make([]string, 0, len(planned))
for _, m := range planned {
result = append(result, m.Id)
}
return result, nil
}
func (m *MigratorImpl) allMigrations() ([]*migrate.Migration, error) {
return m.allMigrationSource().FindMigrations()
}
func (m *MigratorImpl) allMigrationSource() *migrate.MemoryMigrationSource {
src := &migrate.MemoryMigrationSource{}
for _, migration := range m.migrations {
src.Migrations = append(src.Migrations, migration.Migration)
}
return src
}
func (m *MigratorImpl) eligibleMigrations() ([]*migrate.Migration, error) {
src, err := m.EligibleMigrationSource()
if err != nil {
return nil, err
}
return src.FindMigrations()
}
func (m *MigratorImpl) EligibleMigrationSource() (*migrate.MemoryMigrationSource, error) {
src := &migrate.MemoryMigrationSource{}
for _, migration := range m.migrations {
src.Migrations = append(src.Migrations, migration.Migration)
}
return src, nil
}
func migrationApplied(records []*migrate.MigrationRecord, id string) bool {
for _, r := range records {
if r.Id == id {
return true
}
}
return false
}
func (m *MigratorImpl) FindMigrationByID(id string) *Migration {
for _, mig := range m.migrations {
if mig.Id == id {
return mig
}
}
return nil
}
// migrateUpWithCheck applies up to 'max' database migrations (0 for unlimited).
// It verifies Batched Background Migration (BBM) dependencies before each migration.
// Additionally, it allows for custom checks to be performed on each migration using MigrationDependencyResolver.
// These checks can verify if a migration depends on a pre/post migration and either run it or fail the migration.
// Returns the number of applied (pre/post) schema migrations, background migrations, or an error if any step fails.
func (m *MigratorImpl) migrateUpWithCheck(maximum int, extraCheck ...MigrationDependencyResolver) (MigrationResult, error) {
var mr MigrationResult
// Initialize a new store to manage background migrations
bbmStore := datastore.NewBackgroundMigrationStore(m.db)
// Retrieve the source of eligible migrations
src, err := m.EligibleMigrationSource()
if err != nil {
return mr, fmt.Errorf("getting eligible migration source: %w", err)
}
// Fetch the migration records that have already been applied
migrationRecords, err := m.ms.GetMigrationRecords(m.db.DB, dialect)
if err != nil {
return mr, fmt.Errorf("retrieving migration records: %w", err)
}
// no migrations have been applied means this is an new install
newInstall := len(migrationRecords) == 0
// Create a map to store applied migrations for quick lookup
migrationRecordsMap := make(map[string]struct{}, len(migrationRecords))
for _, record := range migrationRecords {
migrationRecordsMap[record.Id] = struct{}{}
}
// Retrieve and sort all available migrations by ID
sortedMigrations, err := src.FindMigrations()
if err != nil {
return mr, fmt.Errorf("finding migrations: %w", err)
}
// Map to hold all local migrations for reference during the process
localMigrationsMap := make(map[string]*Migration, len(m.migrations))
for _, migration := range m.migrations {
localMigrationsMap[migration.Id] = migration
}
ctx := context.Background()
// Iterate through each migration, applying them if necessary
for _, migration := range sortedMigrations {
// Stop if we reach the specified 'max' number of migrations
if maximum != 0 && mr.AppliedCount == maximum {
break
}
// Skip migrations that have already been applied
if _, applied := migrationRecordsMap[migration.Id]; applied {
continue
}
// Ensure all Batched Background Migrations (BBMs) are completed before applying migration
if err := m.ensureBBMsComplete(ctx, bbmStore, localMigrationsMap[migration.Id]); err != nil {
// Apply incomplete BBMs during new installations
if !errors.Is(err, ErrBBMNotComplete) || !newInstall {
// Return the error if it's not a new installation and there are incomplete BBMs
return mr, err
}
// Run the BBM worker
if err := m.bbmWorker.Run(ctx); err != nil {
return mr, err
}
// Increment the count of finished BBMs
mr.AppliedBBMCount += m.bbmWorker.FinishedMigrationCount()
}
// runs custom checks per migration
for _, check := range extraCheck {
var appliedDependentMigration int
if appliedDependentMigration, err = check(ctx, localMigrationsMap[migration.Id]); err != nil {
return mr, err
}
// Increment the count of applied migrations
mr.AppliedDependencyCount += appliedDependentMigration
}
// Apply the migration
var appliedCount int
if appliedCount, err = m.ApplyMigration(src, migration); err != nil {
return mr, err
}
// Increment the count of applied migrations
mr.AppliedCount += appliedCount
}
// Return the number of applied migrations
return mr, nil
}
// ensureBBMsComplete checks if all required Batched Background Migrations (BBMs) are complete for a schema migration.
func (*MigratorImpl) ensureBBMsComplete(ctx context.Context, bbmStore datastore.BackgroundMigrationStore, migration *Migration) error {
// If no BBMs are required, return early
if len(migration.RequiredBBMs) == 0 {
return nil
}
// Check if all required BBMs are completed
complete, err := bbmStore.AreFinished(ctx, migration.RequiredBBMs)
if err != nil {
return fmt.Errorf("checking BBM completion: %w", err)
}
// If BBMs are not complete, return an error
if !complete {
return fmt.Errorf("schema migration: %s failed on ensuring batched background migration: %v: %w", migration.Id, migration.RequiredBBMs, ErrBBMNotComplete)
}
return nil
}
// ApplyMigration applies all migration in the 'Up' direction up to `migration`.
func (m *MigratorImpl) ApplyMigration(src *migrate.MemoryMigrationSource, migration *migrate.Migration) (int, error) {
// Execute the migration and move the database schema "Up"
n, err := m.ms.ExecVersion(m.db.DB, dialect, src, migrate.Up, migration.VersionInt())
if err != nil {
return 0, fmt.Errorf("applying migration %s: %w", migration.Id, err)
}
return n, nil
}