registry/handlers/app.go

package handlers

import (
	"bytes"
	"context"
	cryptorand "crypto/rand"
	"crypto/tls"
	"database/sql"
	"encoding/json"
	"errors"
	"expvar"
	"fmt"
	"io"
	"math/rand/v2"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"regexp"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/docker/distribution"
	"github.com/docker/distribution/configuration"
	dcontext "github.com/docker/distribution/context"
	"github.com/docker/distribution/health"
	"github.com/docker/distribution/health/checks"
	"github.com/docker/distribution/internal/feature"
	dlog "github.com/docker/distribution/log"
	prometheus "github.com/docker/distribution/metrics"
	"github.com/docker/distribution/notifications"
	"github.com/docker/distribution/reference"
	"github.com/docker/distribution/registry/api/errcode"
	v1 "github.com/docker/distribution/registry/api/gitlab/v1"
	"github.com/docker/distribution/registry/api/urls"
	v2 "github.com/docker/distribution/registry/api/v2"
	"github.com/docker/distribution/registry/auth"
	"github.com/docker/distribution/registry/bbm"
	"github.com/docker/distribution/registry/datastore"
	"github.com/docker/distribution/registry/datastore/migrations/premigrations"
	"github.com/docker/distribution/registry/datastore/models"
	"github.com/docker/distribution/registry/gc"
	"github.com/docker/distribution/registry/gc/worker"
	"github.com/docker/distribution/registry/internal"
	redismetrics "github.com/docker/distribution/registry/internal/metrics/redis"
	iredis "github.com/docker/distribution/registry/internal/redis"
	registrymiddleware "github.com/docker/distribution/registry/middleware/registry"
	repositorymiddleware "github.com/docker/distribution/registry/middleware/repository"
	"github.com/docker/distribution/registry/storage"
	memorycache "github.com/docker/distribution/registry/storage/cache/memory"
	rediscache "github.com/docker/distribution/registry/storage/cache/redis"
	storagedriver "github.com/docker/distribution/registry/storage/driver"
	"github.com/docker/distribution/registry/storage/driver/factory"
	storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
	"github.com/docker/distribution/registry/storage/validation"
	"github.com/docker/distribution/testutil"
	"github.com/docker/distribution/version"
	"github.com/getsentry/sentry-go"
	"github.com/gorilla/mux"
	"github.com/hashicorp/go-multierror"
	promclient "github.com/prometheus/client_golang/prometheus"
	"github.com/redis/go-redis/v9"
	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/labkit/correlation"
	"gitlab.com/gitlab-org/labkit/errortracking"
	metricskit "gitlab.com/gitlab-org/labkit/metrics"
)

// randomSecretSize is the number of random bytes to generate if no secret
// was specified.
const randomSecretSize = 32

// defaultCheckInterval is the default time in between health checks.
const defaultCheckInterval = 10 * time.Second

// defaultDBCheckTimeout is the default timeout for DB connection checks. Chosen
// arbitrarily.
const defaultDBCheckTimeout = 5 * time.Second

// redisCacheTTL is the global expiry duration for objects cached in Redis.
const redisCacheTTL = 6 * time.Hour

// redisPingTimeout is the timeout applied to the connection health check performed when creating a Redis client.
// TODO: reduce this as part of https://gitlab.com/gitlab-org/container-registry/-/issues/1530
const redisPingTimeout = 1 * time.Second
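// The two sentinel errors below guard the two startup modes against each
// other. A sketch of the intended behavior (informal, inferred from the
// checks in NewApp and handleFilesystemLockFile):
//
//	filesystem lockfile present + database enabled  -> ErrFilesystemInUse
//	database lockfile present   + database disabled -> ErrDatabaseInUse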
var (
	// ErrFilesystemInUse is returned when the registry attempts to start with an existing filesystem-in-use lockfile
	// and the database is also enabled.
	ErrFilesystemInUse = errors.New(`registry filesystem metadata in use, please import data before enabling the database, see https://docs.gitlab.com/ee/administration/packages/container_registry_metadata_database.html#existing-registries`)

	// ErrDatabaseInUse is returned when the registry attempts to start with an existing database-in-use lockfile
	// and the database is disabled.
	ErrDatabaseInUse = errors.New(`registry metadata database in use, please enable the database https://docs.gitlab.com/ee/administration/packages/container_registry_metadata_database.html`)
)

type shutdownFunc func(app *App, errCh chan error, l dlog.Logger)

// App is a global registry application object. Shared resources can be placed
// on this object that will be accessible from all requests. Any writable
// fields should be protected.
type App struct {
	context.Context

	Config *configuration.Configuration

	router           *metaRouter                    // router dispatcher while we consolidate into the new router
	driver           storagedriver.StorageDriver    // driver maintains the app global storage driver instance.
	db               datastore.LoadBalancer         // db is the global database handle used across the app.
	registry         distribution.Namespace         // registry is the primary registry backend for the app instance.
	repoRemover      distribution.RepositoryRemover // repoRemover provides the ability to delete repos
	accessController auth.AccessController          // main access controller for application

	// httpHost is a parsed representation of the http.host parameter from
	// the configuration. Only the Scheme and Host fields are used.
	httpHost url.URL

	// events contains notification related configuration.
	events struct {
		sink   notifications.Sink
		source notifications.SourceRecord
	}

	redisBlobDesc redis.UniversalClient

	// readOnly is true if the registry is in a read-only maintenance mode.
	readOnly bool

	manifestURLs             validation.ManifestURLs
	manifestRefLimit         int
	manifestPayloadSizeLimit int

	// redisCache is the abstraction for manipulating cached data on Redis.
	redisCache *iredis.Cache

	// redisLBCache is the abstraction for the database load balancing Redis cache.
	redisLBCache *iredis.Cache

	// rateLimiters is a slice of limiters ordered by precedence;
	// see configureRateLimiters for implementation details.
	rateLimiters []RateLimiter

	healthRegistry *health.Registry

	// shutdownFuncs is the slice of functions/code that needs to be called
	// during app object termination in order to gracefully clean up
	// resources/terminate goroutines.
	shutdownFuncs []shutdownFunc
}
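// Usage sketch (hypothetical caller; the actual entrypoint lives outside this
// file). Assuming a resolved *configuration.Configuration, the app is wired up
// roughly as follows:
//
//	app, err := handlers.NewApp(ctx, config)
//	if err != nil {
//		return fmt.Errorf("configuring application: %w", err)
//	}
//	srv := &http.Server{Addr: config.HTTP.Addr, Handler: app}
//	err = srv.ListenAndServe()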
// NewApp takes a configuration and returns a configured app, ready to serve
// requests. The app only implements ServeHTTP and can be wrapped in other
// handlers accordingly.
func NewApp(ctx context.Context, config *configuration.Configuration) (*App, error) {
	app := &App{
		Config:        config,
		Context:       ctx,
		shutdownFuncs: make([]shutdownFunc, 0),
	}

	if err := app.initMetaRouter(); err != nil {
		return nil, fmt.Errorf("initializing metaRouter: %w", err)
	}

	storageParams := config.Storage.Parameters()
	if storageParams == nil {
		storageParams = make(configuration.Parameters)
	}

	log := dcontext.GetLogger(app)
	storageParams[storagedriver.ParamLogger] = log

	var err error
	app.driver, err = factory.Create(config.Storage.Type(), storageParams)
	if err != nil {
		return nil, err
	}

	purgeConfig := uploadPurgeDefaultConfig()
	if mc, ok := config.Storage["maintenance"]; ok {
		if v, ok := mc["uploadpurging"]; ok {
			purgeConfig, ok = v.(map[any]any)
			if !ok {
				return nil, fmt.Errorf("uploadpurging config key must contain additional keys")
			}
		}
		if v, ok := mc["readonly"]; ok {
			readOnly, ok := v.(map[any]any)
			if !ok {
				return nil, fmt.Errorf("readonly config key must contain additional keys")
			}
			// nolint: revive // max-control-nesting
			if readOnlyEnabled, ok := readOnly["enabled"]; ok {
				app.readOnly, ok = readOnlyEnabled.(bool)
				if !ok {
					return nil, fmt.Errorf("readonly's enabled config key must have a boolean value")
				}
				if app.readOnly {
					if enabled, ok := purgeConfig["enabled"].(bool); ok && enabled {
						log.Info("disabled upload purging in readonly mode")
						purgeConfig["enabled"] = false
					}
				}
			}
		}
	}

	if err := startUploadPurger(app, app.driver, log, purgeConfig); err != nil {
		return nil, err
	}
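	// For reference, the maintenance block parsed above typically takes the
	// following YAML shape (keys as consumed by this function; values are
	// illustrative, not defaults):
	//
	//	storage:
	//	  maintenance:
	//	    uploadpurging:
	//	      enabled: true
	//	    readonly:
	//	      enabled: false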
	app.driver, err = applyStorageMiddleware(app.driver, config.Middleware["storage"])
	if err != nil {
		return nil, err
	}

	if err := app.configureSecret(config); err != nil {
		return nil, err
	}

	app.configureEvents(config)

	if err := app.configureRedisBlobDesc(ctx, config); err != nil {
		return nil, err
	}

	if err := app.configureRedisCache(ctx, config); err != nil {
		// Because the Redis cache is not a strictly required dependency (data will be served from the metadata DB if
		// we're unable to serve or find it in cache), we simply log and report the failure and proceed, so as not to
		// prevent the app from starting.
		log.WithError(err).Error("failed configuring Redis cache")
		errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithStackTrace())
	}

	if err := app.configureRedisRateLimiter(ctx, config); err != nil {
		// Unlike the cache, a failure to configure the Redis rate-limiter is treated as fatal: log and report the
		// error, then abort startup.
		log.WithError(err).Error("failed configuring Redis rate-limiter")
		errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithStackTrace())
		return nil, err
	}

	options := registrymiddleware.GetRegistryOptions()

	// TODO: Once schema1 code is removed throughout the registry, we will not
	// need to explicitly configure this.
	options = append(options, storage.DisableSchema1Pulls)

	if config.HTTP.Host != "" {
		u, err := url.Parse(config.HTTP.Host)
		if err != nil {
			return nil, fmt.Errorf(`could not parse http "host" parameter: %w`, err)
		}
		app.httpHost = *u
	}

	// configure deletion
	if d, ok := config.Storage["delete"]; ok {
		e, ok := d["enabled"]
		if ok {
			if deleteEnabled, ok := e.(bool); ok && deleteEnabled {
				options = append(options, storage.EnableDelete)
			}
		}
	}

	// configure redirects
	var redirectDisabled bool
	if redirectConfig, ok := config.Storage["redirect"]; ok {
		v := redirectConfig["disable"]
		switch v := v.(type) {
		case bool:
			redirectDisabled = v
		case nil:
			// disable is not mandatory as we default to false, so do nothing if it doesn't exist
		default:
			return nil, fmt.Errorf("invalid type %T for 'storage.redirect.disable' (boolean)", v)
		}
	}

	if redirectDisabled {
		log.Info("backend redirection disabled")
	} else {
		l := log
		exceptions := config.Storage["redirect"]["exceptions"]
		if exceptions, ok := exceptions.([]any); ok && len(exceptions) > 0 {
			s := make([]string, len(exceptions))
			for i, v := range exceptions {
				s[i] = fmt.Sprint(v)
			}
			l = l.WithField("exceptions", s)
			options = append(options, storage.EnableRedirectWithExceptions(s))
		} else {
			options = append(options, storage.EnableRedirect)
		}

		// expiry delay
		delay := config.Storage["redirect"]["expirydelay"]
		var d time.Duration
		switch v := delay.(type) {
		case time.Duration:
			d = v
		case string:
			if d, err = time.ParseDuration(v); err != nil {
				return nil, fmt.Errorf("%q value for 'storage.redirect.expirydelay' is not a valid duration", v)
			}
		case nil:
			// no expiry delay configured
		default:
			return nil, fmt.Errorf("invalid type %[1]T for 'storage.redirect.expirydelay' (duration)", delay)
		}
		if d > 0 {
			l = l.WithField("expiry_delay_s", d.Seconds())
			options = append(options, storage.WithRedirectExpiryDelay(d))
		}

		l.Info("storage backend redirection enabled")
	}
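	// The validation rules applied below typically come from a YAML section
	// like the following (illustrative; each entry is compiled as a Go regexp
	// and wrapped in a non-capturing group before being OR-ed together):
	//
	//	validation:
	//	  manifests:
	//	    urls:
	//	      allow:
	//	        - ^https?://registry\.example\.com/
	//	      deny:
	//	        - ^http://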
	if !config.Validation.Enabled {
		config.Validation.Enabled = !config.Validation.Disabled
	}

	// configure validation
	if config.Validation.Enabled {
		if len(config.Validation.Manifests.URLs.Allow) == 0 && len(config.Validation.Manifests.URLs.Deny) == 0 {
			// If Allow and Deny are empty, allow nothing.
			app.manifestURLs.Allow = regexp.MustCompile("^$")
			options = append(options, storage.ManifestURLsAllowRegexp(app.manifestURLs.Allow))
		} else {
			// nolint: revive // max-control-nesting
			if len(config.Validation.Manifests.URLs.Allow) > 0 {
				for i, s := range config.Validation.Manifests.URLs.Allow {
					// Validate via compilation.
					if _, err := regexp.Compile(s); err != nil {
						return nil, fmt.Errorf("validation.manifests.urls.allow: %w", err)
					}
					// Wrap with non-capturing group.
					config.Validation.Manifests.URLs.Allow[i] = fmt.Sprintf("(?:%s)", s)
				}
				app.manifestURLs.Allow = regexp.MustCompile(strings.Join(config.Validation.Manifests.URLs.Allow, "|"))
				options = append(options, storage.ManifestURLsAllowRegexp(app.manifestURLs.Allow))
			}
			// nolint: revive // max-control-nesting
			if len(config.Validation.Manifests.URLs.Deny) > 0 {
				for i, s := range config.Validation.Manifests.URLs.Deny {
					// Validate via compilation.
					if _, err := regexp.Compile(s); err != nil {
						return nil, fmt.Errorf("validation.manifests.urls.deny: %w", err)
					}
					// Wrap with non-capturing group.
					config.Validation.Manifests.URLs.Deny[i] = fmt.Sprintf("(?:%s)", s)
				}
				app.manifestURLs.Deny = regexp.MustCompile(strings.Join(config.Validation.Manifests.URLs.Deny, "|"))
				options = append(options, storage.ManifestURLsDenyRegexp(app.manifestURLs.Deny))
			}
		}

		app.manifestRefLimit = config.Validation.Manifests.ReferenceLimit
		options = append(options, storage.ManifestReferenceLimit(app.manifestRefLimit))

		app.manifestPayloadSizeLimit = config.Validation.Manifests.PayloadSizeLimit
		options = append(options, storage.ManifestPayloadSizeLimit(app.manifestPayloadSizeLimit))
	}

	fsLocker := &storage.FilesystemInUseLocker{Driver: app.driver}
	dbLocker := &storage.DatabaseInUseLocker{Driver: app.driver}
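	// The metadata database section consumed below typically looks like this
	// in YAML (illustrative values; keys mirror the datastore.DSN fields):
	//
	//	database:
	//	  enabled: true
	//	  host: localhost
	//	  port: 5432
	//	  user: registry
	//	  dbname: registry_metadata
	//	  sslmode: disable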
	// Connect to the metadata database, if enabled.
	if config.Database.Enabled {
		// Temporary measure to enforce lock files until the implementation is complete.
		// See https://gitlab.com/gitlab-org/container-registry/-/issues/1335
		if feature.EnforceLockfiles.Enabled() {
			fsLocked, err := fsLocker.IsLocked(ctx)
			if err != nil {
				log.WithError(err).Error("could not check if filesystem metadata is locked, see https://docs.gitlab.com/ee/administration/packages/container_registry_metadata_database.html")
				return nil, err
			}
			if fsLocked {
				return nil, ErrFilesystemInUse
			}
		}

		log.Info("using the metadata database")

		if config.GC.Disabled {
			log.Warn("garbage collection is disabled")
		}

		// Do not write or check for repository layer link metadata on the filesystem when the database is enabled.
		options = append(options, storage.UseDatabase)

		dsn := &datastore.DSN{
			Host:           config.Database.Host,
			Port:           config.Database.Port,
			User:           config.Database.User,
			Password:       config.Database.Password,
			DBName:         config.Database.DBName,
			SSLMode:        config.Database.SSLMode,
			SSLCert:        config.Database.SSLCert,
			SSLKey:         config.Database.SSLKey,
			SSLRootCert:    config.Database.SSLRootCert,
			ConnectTimeout: config.Database.ConnectTimeout,
		}

		dbOpts := []datastore.Option{
			datastore.WithLogger(log.WithFields(logrus.Fields{"database": config.Database.DBName})),
			datastore.WithLogLevel(config.Log.Level),
			datastore.WithPreparedStatements(config.Database.PreparedStatements),
			datastore.WithPoolConfig(&datastore.PoolConfig{
				MaxIdle:     config.Database.Pool.MaxIdle,
				MaxOpen:     config.Database.Pool.MaxOpen,
				MaxLifetime: config.Database.Pool.MaxLifetime,
				MaxIdleTime: config.Database.Pool.MaxIdleTime,
			}),
		}

		// nolint: revive // max-control-nesting
		if config.Database.LoadBalancing.Enabled {
			if err := app.configureRedisLoadBalancingCache(ctx, config); err != nil {
				return nil, err
			}
			dbOpts = append(dbOpts, datastore.WithLSNCache(datastore.NewCentralRepositoryCache(app.redisLBCache)))

			// service discovery takes precedence over fixed hosts
			if config.Database.LoadBalancing.Record != "" {
				nameserver := config.Database.LoadBalancing.Nameserver
				port := config.Database.LoadBalancing.Port
				record := config.Database.LoadBalancing.Record
				replicaCheckInterval := config.Database.LoadBalancing.ReplicaCheckInterval

				log.WithFields(dlog.Fields{
					"nameserver":               nameserver,
					"port":                     port,
					"record":                   record,
					"replica_check_interval_s": replicaCheckInterval.Seconds(),
				}).Info("enabling database load balancing with service discovery")

				resolver := datastore.NewDNSResolver(nameserver, port, record)
				dbOpts = append(dbOpts,
					datastore.WithServiceDiscovery(resolver),
					datastore.WithReplicaCheckInterval(replicaCheckInterval),
				)
			} else if len(config.Database.LoadBalancing.Hosts) > 0 {
				hosts := config.Database.LoadBalancing.Hosts
				log.WithField("hosts", hosts).Info("enabling database load balancing with static hosts list")
				dbOpts = append(dbOpts, datastore.WithFixedHosts(hosts))
			}
		}

		if config.HTTP.Debug.Prometheus.Enabled {
			dbOpts = append(dbOpts, datastore.WithMetricsCollection())
		}

		db, err := datastore.NewDBLoadBalancer(ctx, dsn, dbOpts...)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize database connections: %w", err)
		}

		if config.Database.LoadBalancing.Enabled && config.Database.LoadBalancing.ReplicaCheckInterval != 0 {
			startDBPoolRefresh(ctx, db)
			startDBLagCheck(ctx, db)
		}

		inRecovery, err := datastore.IsInRecovery(ctx, db.Primary())
		if err != nil {
			log.WithError(err).Error("could not check database recovery status")
			return nil, err
		}
		if inRecovery {
			err = errors.New("the database is in read-only mode (in recovery)")
			log.WithError(err).Error("could not connect to database")
			log.Warn("if this is a Geo secondary instance, the registry must not point to the default replicated database. Please configure the registry to connect to a separate, writable database on the Geo secondary site.")
			log.Warn("if this is not a Geo secondary instance, the database must not be in read-only mode. Please ensure the database is correctly configured and set to read-write mode.")
			return nil, err
		}

		// Skip post-deployment migrations here so that pending post-deployment
		// migrations do not prevent the registry from starting.
		m := premigrations.NewMigrator(db.Primary(), premigrations.SkipPostDeployment())
		pending, err := m.HasPending()
		if err != nil {
			return nil, fmt.Errorf("failed to check database migrations status: %w", err)
		}
		if pending {
			return nil, fmt.Errorf("there are pending database migrations, use the 'registry database migrate' CLI " +
				"command to check and apply them")
		}

		app.db = db
		app.registerShutdownFunc(
			func(app *App, errCh chan error, l dlog.Logger) {
				l.Info("closing database connections")
				err := app.db.Close()
				if err != nil {
					err = fmt.Errorf("database shutdown: %w", err)
				} else {
					l.Info("database component has been shut down")
				}
				errCh <- err
			},
		)

		options = append(options, storage.Database(app.db))

		// update online GC settings (if needed) in the background to avoid delaying the app start
		go func() {
			if err := updateOnlineGCSettings(app.Context, app.db.Primary(), config); err != nil {
				errortracking.Capture(err, errortracking.WithContext(app.Context), errortracking.WithStackTrace())
				log.WithError(err).Error("failed to update online GC settings")
			}
		}()

		startOnlineGC(app.Context, app.db.Primary(), app.driver, config)

		// Now that we've started the database successfully, lock the filesystem
		// to signal that this object storage needs to be managed by the database.
		if feature.EnforceLockfiles.Enabled() {
			if err := dbLocker.Lock(app.Context); err != nil {
				log.WithError(err).Error("failed to mark filesystem for database only usage, see https://docs.gitlab.com/ee/administration/packages/container_registry_metadata_database.html")
				return nil, err
			}
		}

		if config.Database.BackgroundMigrations.Enabled && feature.BBMProcess.Enabled() {
			startBackgroundMigrations(app.Context, app.db.Primary(), config)
		}
	}
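	// Database load balancing above is configured either via DNS service
	// discovery or a static host list (service discovery wins when both are
	// set), e.g. (illustrative):
	//
	//	database:
	//	  loadbalancing:
	//	    enabled: true
	//	    record: db-replicas.service.consul
	//	    nameserver: localhost
	//	    port: 8600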
	// configure storage caches
	//
	// It's possible that the metadata database will fill the same original need
	// as the blob descriptor cache (avoiding slow and/or expensive calls to
	// external storage) and enable the cache to be removed long term.
	//
	// For now, disabling concurrent use of the metadata database and cache
	// decreases the surface area which we are testing during database development.
	if cc, ok := config.Storage["cache"]; ok && !config.Database.Enabled {
		v, ok := cc["blobdescriptor"]
		if !ok {
			// Backwards compatible: "layerinfo" == "blobdescriptor"
			v = cc["layerinfo"]
		}

		switch v {
		case "redis":
			if app.redisBlobDesc == nil {
				return nil, fmt.Errorf("redis configuration is required to use the redis layerinfo cache")
			}
			cacheProvider := rediscache.NewRedisBlobDescriptorCacheProvider(app.redisBlobDesc)
			options = append(options, storage.BlobDescriptorCacheProvider(cacheProvider))
			log.Info("using redis blob descriptor cache")
		case "inmemory":
			cacheProvider := memorycache.NewInMemoryBlobDescriptorCacheProvider()
			options = append(options, storage.BlobDescriptorCacheProvider(cacheProvider))
			log.Info("using inmemory blob descriptor cache")
		default:
			if v != "" {
				log.WithField("type", config.Storage["cache"]).Warn("unknown cache type, caching disabled")
			}
		}
	} else if ok && config.Database.Enabled {
		log.Warn("blob descriptor cache is not compatible with metadata database, caching disabled")
	}

	if app.registry == nil {
		// Initialize registry once with all options
		app.registry, err = storage.NewRegistry(app.Context, app.driver, options...)
		if err != nil {
			return nil, fmt.Errorf("could not create registry: %w", err)
		}
	}

	// Check filesystem lock file after registry is initialized
	if !config.Database.Enabled {
		if err := app.handleFilesystemLockFile(ctx, dbLocker, fsLocker); err != nil {
			return nil, err
		}
		log.Info("registry filesystem metadata in use")
	}

	app.registry, err = applyRegistryMiddleware(app, app.registry, config.Middleware["registry"])
	if err != nil {
		return nil, err
	}

	authType := config.Auth.Type()
	if authType != "" && !strings.EqualFold(authType, "none") {
		accessController, err := auth.GetAccessController(config.Auth.Type(), config.Auth.Parameters())
		if err != nil {
			return nil, fmt.Errorf("unable to configure authorization (%s): %w", authType, err)
		}
		app.accessController = accessController
		log.WithField("auth_type", authType).Debug("configured access controller")
	}

	var ok bool
	app.repoRemover, ok = app.registry.(distribution.RepositoryRemover)
	if !ok {
		log.Warn("registry does not implement RepositoryRemover. Will not be able to delete repos and tags")
	}

	return app, nil
}
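// A typical auth section enabling token authentication (illustrative;
// parameter names follow the token auth driver):
//
//	auth:
//	  token:
//	    realm: https://gitlab.example.com/jwt/auth
//	    service: container_registry
//	    issuer: gitlab-issuer
//	    rootcertbundle: /etc/docker/registry/tokenbundle.pem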
// handleFilesystemLockFile checks if the database lock file exists.
// If it does not, it checks if the filesystem is in use and locks it
// if there are existing repositories in the filesystem.
func (app *App) handleFilesystemLockFile(ctx context.Context, dbLocker, fsLocker storage.Locker) error {
	if !feature.EnforceLockfiles.Enabled() {
		return nil
	}

	log := dcontext.GetLogger(app)

	dbLocked, err := dbLocker.IsLocked(ctx)
	if err != nil {
		log.WithError(err).Error("could not check if database metadata is locked, see https://docs.gitlab.com/ee/administration/packages/container_registry_metadata_database.html")
		return err
	}
	if dbLocked {
		return ErrDatabaseInUse
	}

	repositoryEnumerator, ok := app.registry.(distribution.RepositoryEnumerator)
	if !ok {
		return errors.New("error building repository enumerator")
	}

	// To prevent new installations from locking the filesystem we
	// try our best to enumerate repositories. Once we have found at
	// least one repository we know that the filesystem is in use.
	// See https://gitlab.com/gitlab-org/container-registry/-/issues/1523.
	err = repositoryEnumerator.Enumerate(
		ctx, func(string) error { return ErrFilesystemInUse },
	)
	if err != nil {
		switch {
		case errors.Is(err, ErrFilesystemInUse):
			// Lock the filesystem metadata to prevent
			// accidental start up of the registry in database mode
			// before an import has been completed.
			if err := fsLocker.Lock(app.Context); err != nil {
				log.WithError(err).Error("failed to mark filesystem for filesystem-only usage")
				return err
			}
			log.Info("there are existing repositories in the filesystem, locking filesystem")
		case errors.As(err, new(storagedriver.PathNotFoundError)):
			log.Debug("no filesystem path found, will not lock filesystem until there are repositories present")
		default:
			return fmt.Errorf("could not enumerate repositories for filesystem lock check: %w", err)
		}
	}

	return nil
}

var (
	onlineGCUpdateJitterMaxSeconds int64 = 60
	onlineGCUpdateTimeout                = 2 * time.Second

	// for testing purposes (mocks)
	systemClock                internal.Clock = clock.New()
	gcSettingsStoreConstructor                = datastore.NewGCSettingsStore
)

func updateOnlineGCSettings(ctx context.Context, db datastore.Queryer, config *configuration.Configuration) error {
	if !config.Database.Enabled || config.GC.Disabled || (config.GC.Blobs.Disabled && config.GC.Manifests.Disabled) {
		return nil
	}
	if config.GC.ReviewAfter == 0 {
		return nil
	}

	d := config.GC.ReviewAfter
	// -1 means no review delay, so set it to 0 here
	if d == -1 {
		d = 0
	}

	log := dcontext.GetLogger(ctx)

	// execute DB update after a randomized jitter of up to 60 seconds to ease concurrency in clustered environments
	// nolint: gosec // G404: used only for jitter calculation
	r := rand.New(rand.NewChaCha8(testutil.SeedFromUnixNano(systemClock.Now().UnixNano())))
	jitter := time.Duration(r.Int64N(onlineGCUpdateJitterMaxSeconds)) * time.Second
	log.WithField("jitter_s", jitter.Seconds()).Info("preparing to update online GC settings")
	systemClock.Sleep(jitter)

	// set a tight timeout to avoid delaying the app start for too long, another instance is likely to succeed
	start := systemClock.Now()
	ctx2, cancel := context.WithDeadline(ctx, start.Add(onlineGCUpdateTimeout))
	defer cancel()

	// for now we use the same value for all events, so we simply update all rows in `gc_review_after_defaults`
	s := gcSettingsStoreConstructor(db)
	updated, err := s.UpdateAllReviewAfterDefaults(ctx2, d)
	if err != nil {
		return err
	}

	elapsed := systemClock.Since(start).Seconds()
	if updated {
		log.WithField("duration_s", elapsed).Info("online GC settings updated successfully")
	} else {
		log.WithField("duration_s", elapsed).Info("online GC settings are up to date")
	}

	return nil
}
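// Jitter sketch (values as used above): each instance sleeps for a uniformly
// distributed duration in [0s, 60s) before writing, so in a clustered
// deployment roughly one instance performs the update first and the rest
// find the settings already up to date:
//
//	r := rand.New(rand.NewChaCha8(seed))
//	jitter := time.Duration(r.Int64N(60)) * time.Second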
func startOnlineGC(ctx context.Context, db *datastore.DB, storageDriver storagedriver.StorageDriver, config *configuration.Configuration) {
	if !config.Database.Enabled || config.GC.Disabled || (config.GC.Blobs.Disabled && config.GC.Manifests.Disabled) {
		return
	}

	l := dlog.GetLogger(dlog.WithContext(ctx))

	aOpts := []gc.AgentOption{
		gc.WithLogger(l),
	}
	if config.GC.NoIdleBackoff {
		aOpts = append(aOpts, gc.WithoutIdleBackoff())
	}
	if config.GC.MaxBackoff > 0 {
		aOpts = append(aOpts, gc.WithMaxBackoff(config.GC.MaxBackoff))
	}
	if config.GC.ErrorCooldownPeriod > 0 {
		aOpts = append(aOpts, gc.WithErrorCooldown(config.GC.ErrorCooldownPeriod))
	}

	var agents []*gc.Agent

	if !config.GC.Blobs.Disabled {
		bwOpts := []worker.BlobWorkerOption{
			worker.WithBlobLogger(l),
		}
		if config.GC.TransactionTimeout > 0 {
			bwOpts = append(bwOpts, worker.WithBlobTxTimeout(config.GC.TransactionTimeout))
		}
		if config.GC.Blobs.StorageTimeout > 0 {
			bwOpts = append(bwOpts, worker.WithBlobStorageTimeout(config.GC.Blobs.StorageTimeout))
		}
		bw := worker.NewBlobWorker(db, storageDriver, bwOpts...)

		baOpts := aOpts
		if config.GC.Blobs.Interval > 0 {
			baOpts = append(baOpts, gc.WithInitialInterval(config.GC.Blobs.Interval))
		}
		ba := gc.NewAgent(bw, baOpts...)
		agents = append(agents, ba)
	}

	if !config.GC.Manifests.Disabled {
		mwOpts := []worker.ManifestWorkerOption{
			worker.WithManifestLogger(l),
		}
		if config.GC.TransactionTimeout > 0 {
			mwOpts = append(mwOpts, worker.WithManifestTxTimeout(config.GC.TransactionTimeout))
		}
		mw := worker.NewManifestWorker(db, mwOpts...)

		maOpts := aOpts
		if config.GC.Manifests.Interval > 0 {
			maOpts = append(maOpts, gc.WithInitialInterval(config.GC.Manifests.Interval))
		}
		ma := gc.NewAgent(mw, maOpts...)
		agents = append(agents, ma)
	}

	for _, a := range agents {
		go func(a *gc.Agent) {
			// This function can only end in two situations: panic or context cancellation. If a panic occurs we should
			// log, report to Sentry and then re-panic, as the instance would be in an inconsistent/unknown state. In
			// case of context cancellation, the app is shutting down, so there is nothing to worry about.
			defer func() {
				if err := recover(); err != nil {
					l.WithFields(dlog.Fields{"error": err}).Error("online GC agent stopped with panic")
					sentry.CurrentHub().Recover(err)
					sentry.Flush(5 * time.Second)
					panic(err)
				}
			}()
			if err := a.Start(ctx); err != nil {
				if errors.Is(err, context.Canceled) {
					// leaving this here for now for additional confidence and improved observability
					l.Warn("shutting down online GC agent due to context cancellation")
				} else {
					// this should never happen, but leaving it here for future proofing against bugs within Agent.Start
					errortracking.Capture(fmt.Errorf("online GC agent stopped with error: %w", err), errortracking.WithStackTrace())
					l.WithError(err).Error("online GC agent stopped")
				}
			}
		}(a)
	}
}
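// The GC agents above are driven by the gc section of the configuration,
// e.g. (illustrative):
//
//	gc:
//	  disabled: false
//	  noidlebackoff: false
//	  blobs:
//	    interval: 5s
//	  manifests:
//	    interval: 10s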
const dlbPeriodicTaskJitterMaxSeconds = 10

// startDBLoadBalancerPeriodicTask starts a goroutine to periodically execute a database load balancer task.
// It handles jittered startup, error recovery, and appropriate logging.
func startDBLoadBalancerPeriodicTask(ctx context.Context, taskName string, taskFn func(context.Context) error) {
	l := dlog.GetLogger(dlog.WithContext(ctx))

	// delay startup using a randomized jitter to ease concurrency in clustered environments
	// nolint: gosec // G404: used only for jitter calculation
	r := rand.New(rand.NewChaCha8(testutil.SeedFromUnixNano(systemClock.Now().UnixNano())))
	jitter := time.Duration(r.Int64N(dlbPeriodicTaskJitterMaxSeconds)) * time.Second
	l.WithFields(dlog.Fields{"jitter_s": jitter.Seconds()}).
		Info(fmt.Sprintf("preparing to start database load balancing %s", taskName))

	go func() {
		systemClock.Sleep(jitter)

		// This function can only end in three situations: 1) the task is disabled and therefore there is
		// nothing left to do (no error) 2) context cancellation 3) panic. If a panic occurs we should log, report to
		// Sentry and then re-panic, as the instance would be in an inconsistent/unknown state. In case of context
		// cancellation, the app is shutting down, so there is nothing to worry about.
		defer func() {
			if err := recover(); err != nil {
				l.WithFields(dlog.Fields{"error": err}).Error(fmt.Sprintf("database load balancing %s stopped with panic", taskName))
				sentry.CurrentHub().Recover(err)
				sentry.Flush(5 * time.Second)
				panic(err)
			}
		}()
		if err := taskFn(ctx); err != nil {
			if errors.Is(err, context.Canceled) {
				// leaving this here for now for additional confidence and improved observability
				l.Warn(fmt.Sprintf("database load balancing %s stopped due to context cancellation", taskName))
			} else {
				// this should never happen, but leaving it here for future proofing against bugs
				e := fmt.Errorf("database load balancing %s stopped with error: %w", taskName, err)
				errortracking.Capture(e, errortracking.WithStackTrace())
				l.WithError(err).Error(fmt.Sprintf("database load balancing %s stopped with error", taskName))
			}
		}
	}()
}

// startDBPoolRefresh starts a goroutine to periodically refresh the database replica pool.
func startDBPoolRefresh(ctx context.Context, lb datastore.LoadBalancer) {
	startDBLoadBalancerPeriodicTask(ctx, "pool refresh", lb.StartPoolRefresh)
}

// startDBLagCheck starts a goroutine to periodically check and track replication lag for all replicas.
func startDBLagCheck(ctx context.Context, lb datastore.LoadBalancer) {
	startDBLoadBalancerPeriodicTask(ctx, "lag check", lb.StartLagCheck)
}
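// Usage sketch (hypothetical caller): the method below is meant to be called
// once from a main function, after NewApp succeeds:
//
//	if err := app.RegisterHealthChecks(); err != nil {
//		return fmt.Errorf("registering health checks: %w", err)
//	}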
// RegisterHealthChecks is an awful hack to defer health check registration
// control to callers. This should only ever be called once per registry
// process, typically in a main function. The correct way would be to register
// health checks outside of app, since multiple apps may exist in the same
// process. Because the configuration and app are tightly coupled,
// implementing this properly will require a refactor. This method may panic
// if called twice in the same process.
func (app *App) RegisterHealthChecks(healthRegistries ...*health.Registry) error {
	logger := dcontext.GetLogger(app)

	if len(healthRegistries) > 1 {
		return fmt.Errorf("RegisterHealthChecks called with more than one registry")
	}

	// Allow for dependency injection:
	if len(healthRegistries) > 0 {
		app.healthRegistry = healthRegistries[0]
	} else {
		app.healthRegistry = health.DefaultRegistry
	}

	app.registerShutdownFunc(
		func(app *App, errCh chan error, l dlog.Logger) {
			l.Info("closing healthchecks registry")
			err := app.healthRegistry.Shutdown()
			if err != nil {
				err = fmt.Errorf("healthchecks registry shutdown: %w", err)
			} else {
				l.Info("healthchecks registry has been shut down")
			}
			errCh <- err
		},
	)

	if app.Config.Health.StorageDriver.Enabled {
		interval := app.Config.Health.StorageDriver.Interval
		if interval == 0 {
			interval = defaultCheckInterval
		}

		storageDriverCheck := func() error {
			_, err := app.driver.Stat(app, "/")
			if errors.As(err, new(storagedriver.PathNotFoundError)) {
				err = nil // pass this through, backend is responding, but this path doesn't exist.
			}
			if err != nil {
				logger.Errorf("storage driver health check: %v", err)
			}
			return err
		}

		logger.WithFields(
			dlog.Fields{
				"threshold":  app.Config.Health.StorageDriver.Threshold,
				"interval_s": interval.Seconds(),
			},
		).Info("configuring storage health check")

		if app.Config.Health.StorageDriver.Threshold != 0 {
			app.healthRegistry.RegisterPeriodicThresholdFunc("storagedriver_"+app.Config.Storage.Type(), interval, app.Config.Health.StorageDriver.Threshold, storageDriverCheck)
		} else {
			app.healthRegistry.RegisterPeriodicFunc("storagedriver_"+app.Config.Storage.Type(), interval, storageDriverCheck)
		}
	}

	if app.Config.Health.Database.Enabled {
		if !app.Config.Database.Enabled {
			logger.Warn("ignoring database health checks settings as metadata database is not enabled")
		} else {
			interval := app.Config.Health.Database.Interval
			if interval == 0 {
				interval = defaultCheckInterval
			}

			timeout := app.Config.Health.Database.Timeout
			if timeout == 0 {
				timeout = defaultDBCheckTimeout
			}

			check := checks.DBChecker(app.Context, timeout, app.db)

			logger.WithFields(
				dlog.Fields{
					"timeout":    timeout.String(),
					"interval_s": interval.Seconds(),
				},
			).Info("configuring database health check")

			if app.Config.Health.Database.Threshold != 0 {
				app.healthRegistry.RegisterPeriodicThresholdFunc("database_connection", interval, app.Config.Health.Database.Threshold, check)
			} else {
				app.healthRegistry.RegisterPeriodicFunc("database_connection", interval, check)
			}
		}
	}

	for _, fileChecker := range app.Config.Health.FileCheckers {
		interval := fileChecker.Interval
		if interval == 0 {
			interval = defaultCheckInterval
		}
		dcontext.GetLogger(app).Infof("configuring file health check path=%s, interval=%d", fileChecker.File, interval/time.Second)
		app.healthRegistry.Register(fileChecker.File, health.PeriodicChecker(checks.FileChecker(fileChecker.File), interval))
	}

	for _, httpChecker := range app.Config.Health.HTTPCheckers {
		interval := httpChecker.Interval
		if interval == 0 {
			interval = defaultCheckInterval
		}
		statusCode := httpChecker.StatusCode
		if statusCode == 0 {
			statusCode = 200
		}
		checker := checks.HTTPChecker(httpChecker.URI, statusCode, httpChecker.Timeout, httpChecker.Headers)

		if httpChecker.Threshold != 0 {
			dcontext.GetLogger(app).Infof("configuring HTTP health check uri=%s, interval=%d, threshold=%d", httpChecker.URI, interval/time.Second, httpChecker.Threshold)
			app.healthRegistry.Register(httpChecker.URI, health.PeriodicThresholdChecker(checker, interval, httpChecker.Threshold))
		} else {
			dcontext.GetLogger(app).Infof("configuring HTTP health check uri=%s, interval=%d", httpChecker.URI, interval/time.Second)
			app.healthRegistry.Register(httpChecker.URI, health.PeriodicChecker(checker, interval))
		}
	}

	for _, tcpChecker := range app.Config.Health.TCPCheckers {
		interval := tcpChecker.Interval
		if interval == 0 {
			interval = defaultCheckInterval
		}
		checker := checks.TCPChecker(tcpChecker.Addr, tcpChecker.Timeout)

		if tcpChecker.Threshold != 0 {
			dcontext.GetLogger(app).Infof("configuring TCP health check addr=%s, interval=%d, threshold=%d", tcpChecker.Addr, interval/time.Second, tcpChecker.Threshold)
			app.healthRegistry.Register(tcpChecker.Addr, health.PeriodicThresholdChecker(checker, interval, tcpChecker.Threshold))
		} else {
			dcontext.GetLogger(app).Infof("configuring TCP health check addr=%s, interval=%d", tcpChecker.Addr, interval/time.Second)
			app.healthRegistry.Register(tcpChecker.Addr, health.PeriodicChecker(checker, interval))
		}
	}

	return nil
}
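// Health checks are driven by the health section of the configuration, e.g.
// (illustrative):
//
//	health:
//	  storagedriver:
//	    enabled: true
//	    interval: 10s
//	    threshold: 3
//	  tcp:
//	    - addr: redis:6379
//	      interval: 10s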
var routeMetricsMiddleware = metricskit.NewHandlerFactory(
	metricskit.WithNamespace(prometheus.NamespacePrefix),
	metricskit.WithLabels("route"),
	// Keeping the same buckets used before LabKit, as defined in
	// https://github.com/docker/go-metrics/blob/b619b3592b65de4f087d9f16863a7e6ff905973c/handler.go#L31:L32
	metricskit.WithRequestDurationBuckets([]float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 60}),
	metricskit.WithByteSizeBuckets(promclient.ExponentialBuckets(1024, 2, 22)), // 1K to 4G
)

// registerDistribution registers a handler with the application, by route
// name. The handler will be passed through the application filters and
// context will be constructed at request time.
func (app *App) registerDistribution(routeName string, dispatch dispatchFunc) {
	handler := app.dispatcher(dispatch)

	// Chain the handler with prometheus instrumented handler
	if app.Config.HTTP.Debug.Prometheus.Enabled {
		handler = routeMetricsMiddleware(
			handler,
			metricskit.WithLabelValues(map[string]string{"route": v2.RoutePath(routeName)}),
		)
	}

	app.router.distribution.GetRoute(routeName).Handler(handler)
}

func (app *App) registerGitlab(route v1.Route, dispatch dispatchFunc) {
	handler := app.dispatcherGitlab(dispatch)

	// Chain the handler with prometheus instrumented handler
	if app.Config.HTTP.Debug.Prometheus.Enabled {
		handler = routeMetricsMiddleware(
			handler,
			metricskit.WithLabelValues(map[string]string{"route": route.ID}),
		)
	}

	app.router.gitlab.GetRoute(route.Name).Handler(handler)
}

// configureEvents prepares the event sink for action.
func (app *App) configureEvents(registryConfig *configuration.Configuration) {
	// Configure all of the endpoint sinks.
	var sinks []notifications.Sink
	for _, endpoint := range registryConfig.Notifications.Endpoints {
		if endpoint.Disabled {
			dcontext.GetLogger(app).Infof("endpoint %s disabled, skipping", endpoint.Name)
			continue
		}

		dcontext.GetLogger(app).Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers)
		endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{
			Timeout: endpoint.Timeout,
			// nolint: staticcheck // needs more thorough investigation and fix
			Threshold:         endpoint.Threshold,
			MaxRetries:        endpoint.MaxRetries,
			Backoff:           endpoint.Backoff,
			Headers:           endpoint.Headers,
			IgnoredMediaTypes: endpoint.IgnoredMediaTypes,
			Ignore:            endpoint.Ignore,
			QueuePurgeTimeout: endpoint.QueuePurgeTimeout,
		})

		sinks = append(sinks, endpoint)
	}

	// TODO: replace broadcaster with a new worker that will consume events from the queue
	// https://gitlab.com/gitlab-org/container-registry/-/issues/765
	app.events.sink = notifications.NewBroadcaster(
		registryConfig.Notifications.FanoutTimeout,
		sinks...,
	)

	app.registerShutdownFunc(
		func(app *App, errCh chan error, l dlog.Logger) {
			l.Info("closing events notification sink")
			err := app.events.sink.Close()
			if err != nil {
				err = fmt.Errorf("events notification sink shutdown: %w", err)
			} else {
				l.Info("events notification sink has been shut down")
			}
			errCh <- err
		},
	)

	// Populate registry event source
	hostname, err := os.Hostname()
	if err != nil {
		hostname = registryConfig.HTTP.Addr
	} else {
		// try to pick the port off the config
		_, port, err := net.SplitHostPort(registryConfig.HTTP.Addr)
		if err == nil {
			hostname = net.JoinHostPort(hostname, port)
		}
	}

	app.events.source = notifications.SourceRecord{
		Addr:       hostname,
		InstanceID: dcontext.GetStringValue(app, "instance.id"),
	}
}
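// Notification endpoints as consumed by configureEvents typically look like
// this in YAML (illustrative):
//
//	notifications:
//	  endpoints:
//	    - name: webhook
//	      url: https://hooks.example.com/registry
//	      timeout: 1s
//	      threshold: 5
//	      backoff: 2s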
func configureRedisClient(ctx context.Context, config configuration.RedisCommon, metricsEnabled bool, instanceName string) (redis.UniversalClient, error) {
	opts := &redis.UniversalOptions{
		Addrs:            strings.Split(config.Addr, ","),
		DB:               config.DB,
		Username:         config.Username,
		Password:         config.Password,
		DialTimeout:      config.DialTimeout,
		ReadTimeout:      config.ReadTimeout,
		WriteTimeout:     config.WriteTimeout,
		PoolSize:         config.Pool.Size,
		ConnMaxLifetime:  config.Pool.MaxLifetime,
		MasterName:       config.MainName,
		SentinelUsername: config.SentinelUsername,
		SentinelPassword: config.SentinelPassword,
	}

	if config.TLS.Enabled {
		opts.TLSConfig = &tls.Config{
			// nolint: gosec // used for development purposes only
			InsecureSkipVerify: config.TLS.Insecure,
		}
	}

	if config.Pool.IdleTimeout > 0 {
		opts.ConnMaxIdleTime = config.Pool.IdleTimeout
	}

	// redis.NewUniversalClient will take care of returning the appropriate client type (single, cluster or sentinel)
	// depending on the configuration options. See https://pkg.go.dev/github.com/go-redis/redis/v9#NewUniversalClient.
	client := redis.NewUniversalClient(opts)

	if metricsEnabled {
		redismetrics.InstrumentClient(
			client,
			redismetrics.WithInstanceName(instanceName),
			redismetrics.WithMaxConns(opts.PoolSize),
		)
	}

	// Ensure the client is correctly configured and the server is reachable. We use a new local context here with a
	// tight timeout to avoid blocking the application start for too long.
	pingCtx, cancel := context.WithTimeout(ctx, redisPingTimeout)
	defer cancel()
	if cmd := client.Ping(pingCtx); cmd.Err() != nil {
		return nil, cmd.Err()
	}

	return client, nil
}

func (app *App) configureRedisLoadBalancingCache(ctx context.Context, config *configuration.Configuration) error {
	if !config.Redis.LoadBalancing.Enabled {
		return nil
	}

	client, err := configureRedisClient(ctx, config.Redis.LoadBalancing, config.HTTP.Debug.Prometheus.Enabled, "loadbalancing")
	if err != nil {
		return fmt.Errorf("failed to configure Redis for load balancing: %w", err)
	}

	app.redisLBCache = iredis.NewCache(client, iredis.WithDefaultTTL(redisCacheTTL))

	dlog.GetLogger(dlog.WithContext(app.Context)).Info("redis configured successfully for load balancing")

	return nil
}
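// Each Redis instance above shares the RedisCommon configuration shape, e.g.
// (illustrative):
//
//	redis:
//	  cache:
//	    enabled: true
//	    addr: localhost:6379
//	  ratelimiter:
//	    enabled: true
//	    addr: localhost:6380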
func (app *App) configureRedisRateLimiter(ctx context.Context, config *configuration.Configuration) error {
	if !config.Redis.RateLimiter.Enabled {
		if config.RateLimiter.Enabled {
			dlog.GetLogger(dlog.WithContext(app.Context)).
				Warn(`Redis is disabled but the rate-limiter is enabled. This will result in a no-op configuration.`)
		}
		return nil
	}

	if !config.RateLimiter.Enabled {
		dlog.GetLogger(dlog.WithContext(app.Context)).
			Warn(`Redis is enabled but the rate-limiter is disabled. This will result in a no-op configuration.`)
		return nil
	}

	redisClient, err := configureRedisClient(ctx, config.Redis.RateLimiter, config.HTTP.Debug.Prometheus.Enabled, "ratelimiting")
	if err != nil {
		return fmt.Errorf("failed to configure Redis for rate limiting: %w", err)
	}

	err = app.configureRateLimiters(redisClient, &config.RateLimiter)
	if err != nil {
		return fmt.Errorf("failed to configure rate limiting: %w", err)
	}

	dlog.GetLogger(dlog.WithContext(app.Context)).Info("redis configured successfully for rate limiting")

	return nil
}

func (app *App) configureRedisCache(ctx context.Context, config *configuration.Configuration) error {
	if !config.Redis.Cache.Enabled {
		return nil
	}

	client, err := configureRedisClient(ctx, config.Redis.Cache, config.HTTP.Debug.Prometheus.Enabled, "cache")
	if err != nil {
		return fmt.Errorf("failed to configure Redis for caching: %w", err)
	}

	app.redisCache = iredis.NewCache(client, iredis.WithDefaultTTL(redisCacheTTL))

	dlog.GetLogger(dlog.WithContext(app.Context)).Info("redis configured successfully for caching")

	return nil
}

func (app *App) configureRedisBlobDesc(ctx context.Context, config *configuration.Configuration) error {
	if config.Redis.Addr == "" {
		return nil
	}

	client, err := configureRedisClient(ctx, config.Redis.RedisCommon, false, "")
	if err != nil {
		return fmt.Errorf("failed to configure Redis for blob descriptor cache: %w", err)
	}

	app.redisBlobDesc = client

	// setup expvar
	registry := expvar.Get("registry")
	if registry == nil {
		registry = expvar.NewMap("registry")
	}
	registry.(*expvar.Map).Set("redis", expvar.Func(func() any {
		poolStats := app.redisBlobDesc.PoolStats()
		return map[string]any{
			"Config": config.Redis,
			"Active": poolStats.TotalConns - poolStats.IdleConns,
		}
	}))

	dlog.GetLogger(dlog.WithContext(app.Context)).Info("redis configured successfully for blob descriptor caching")

	return nil
}

// configureSecret creates a random secret if a secret wasn't included in the
// configuration.
func (app *App) configureSecret(registryConfig *configuration.Configuration) error {
	if registryConfig.HTTP.Secret == "" {
		var secretBytes [randomSecretSize]byte
		if _, err := cryptorand.Read(secretBytes[:]); err != nil {
			return fmt.Errorf("could not generate random bytes for HTTP secret: %w", err)
		}
		registryConfig.HTTP.Secret = string(secretBytes[:])
		dcontext.GetLogger(app).Warn("No HTTP secret provided - generated random secret. This may cause problems with uploads if multiple registries are behind a load-balancer. To provide a shared secret, fill in http.secret in the configuration file or set the REGISTRY_HTTP_SECRET environment variable.")
	}
	return nil
}
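// To pin a shared secret across replicas instead of relying on the generated
// one (illustrative):
//
//	export REGISTRY_HTTP_SECRET="$(openssl rand -base64 32)"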
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close() // ensure that request body is always closed.

	// Prepare the context with our own little decorations.
	ctx := r.Context()
	ctx = dcontext.WithRequest(ctx, r)
	ctx, w = dcontext.WithResponseWriter(ctx, w)
	// NOTE(prozlach): It is very important to pass to
	// dcontext.GetLogger the context of the app and not the
	// request context, as otherwise a new logger will be created instead of
	// chaining the one that was created during application start and according
	// to the configuration passed by the user.
	ctx = dcontext.WithLogger(
		ctx,
		dcontext.GetLogger(app.Context).
			WithField(correlation.FieldName, dcontext.GetRequestCorrelationID(ctx)),
	)
	r = r.WithContext(ctx)

	if !app.Config.Log.AccessLog.Disabled {
		defer func() {
			status, ok := ctx.Value("http.response.status").(int)
			if ok && status >= 200 && status <= 399 {
				dcontext.GetResponseLogger(r.Context()).Infof("response completed")
			}
		}()
	}

	app.router.ServeHTTP(w, r)
}

// metaRouter determines which router is appropriate for a given route. This
// is a temporary measure while we consolidate all routes into a single chi router.
// See: https://gitlab.com/groups/gitlab-org/-/epics/9467
type metaRouter struct {
	distribution *mux.Router // main application router, configured with dispatchers
	gitlab       *mux.Router // gitlab specific router
	v1RouteRegex *regexp.Regexp
}

// initMetaRouter constructs a new metaRouter and attaches it to the app.
func (app *App) initMetaRouter() error {
	app.router = &metaRouter{
		distribution: v2.RouterWithPrefix(app.Config.HTTP.Prefix),
		gitlab:       v1.RouterWithPrefix(app.Config.HTTP.Prefix),
	}

	// Register middleware.
	app.router.distribution.Use(app.gorillaLogMiddleware)
	app.router.distribution.Use(distributionAPIVersionMiddleware)
	app.router.gitlab.Use(app.gorillaLogMiddleware)

	if app.Config.RateLimiter.Enabled && app.Config.Redis.RateLimiter.Enabled {
		app.router.distribution.Use(app.rateLimiterMiddleware)
		app.router.gitlab.Use(app.rateLimiterMiddleware)
	}

	if app.Config.Database.Enabled && app.Config.Database.LoadBalancing.Enabled {
		app.router.distribution.Use(app.recordLSNMiddleware)
		app.router.gitlab.Use(app.recordLSNMiddleware)
	}

	// Register the handler dispatchers.
	app.registerDistribution(v2.RouteNameBase, func(_ *Context, _ *http.Request) http.Handler {
		return distributionAPIBase(app.Config.Database.Enabled)
	})
	app.registerDistribution(v2.RouteNameManifest, manifestDispatcher)
	app.registerDistribution(v2.RouteNameCatalog, catalogDispatcher)
	app.registerDistribution(v2.RouteNameTags, tagsDispatcher)
	app.registerDistribution(v2.RouteNameBlob, blobDispatcher)
	app.registerDistribution(v2.RouteNameBlobUpload, blobUploadDispatcher)
	app.registerDistribution(v2.RouteNameBlobUploadChunk, blobUploadDispatcher)

	// Register Gitlab handlers dispatchers.
	h := dbAssertionhandler{dbEnabled: app.Config.Database.Enabled}
	app.registerGitlab(v1.Base, h.wrap(func(*Context, *http.Request) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { apiBase(w, r) })
	}))
	app.registerGitlab(v1.RepositoryTags, h.wrap(repositoryTagsDispatcher))
	app.registerGitlab(v1.RepositoryTagDetail, h.wrap(repositoryTagDetailsDispatcher))
	app.registerGitlab(v1.Repositories, h.wrap(repositoryDispatcher))
	app.registerGitlab(v1.SubRepositories, h.wrap(subRepositoriesDispatcher))
	app.registerGitlab(v1.Statistics, h.wrap(statisticsDispatcher))

	var err error
	v1PathWithPrefix := fmt.Sprintf("^%s%s.*", strings.TrimSuffix(app.Config.HTTP.Prefix, "/"), v1.Base.Path)
	app.router.v1RouteRegex, err = regexp.Compile(v1PathWithPrefix)
	if err != nil {
		return fmt.Errorf("compiling v1 route prefix: %w", err)
	}

	return nil
}

// ServeHTTP delegates urls to the appropriate router.
func (m *metaRouter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if m.v1RouteRegex.MatchString(r.URL.Path) {
		m.gitlab.ServeHTTP(w, r)
		return
	}
	m.distribution.ServeHTTP(w, r)
}
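// Dispatch sketch: any path matching ^<prefix><v1.Base.Path> (typically
// /gitlab/v1/) is served by the gitlab router; everything else falls through
// to the distribution router.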
// gorillaLogMiddleware is temporary middleware that adds router and http
// configuration information while we switch over to a new router away from
// gorilla/mux. See: https://gitlab.com/groups/gitlab-org/-/epics/9467
func (app *App) gorillaLogMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := app.context(w, r)

		c := app.Config.HTTP
		dlog.GetLogger(dlog.WithContext(ctx)).WithFields(dlog.Fields{
			"router":                    "gorilla/mux",
			"method":                    r.Method,
			"path":                      r.URL.Path,
			"config_http_host":          c.Host,
			"config_http_addr":          c.Addr,
			"config_http_net":           c.Net,
			"config_http_prefix":        c.Prefix,
			"config_http_relative_urls": c.RelativeURLs,
		}).Info("router info")

		next.ServeHTTP(w, r)
	})
}

type dbAssertionhandler struct{ dbEnabled bool }

func (h dbAssertionhandler) wrap(child func(ctx *Context, r *http.Request) http.Handler) func(ctx *Context, r *http.Request) http.Handler {
	// Return a 404, signaling that the database is disabled and GitLab v1 API features are not available.
	if !h.dbEnabled {
		return func(*Context, *http.Request) http.Handler {
			return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
				w.Header().Set("Gitlab-Container-Registry-Version", strings.TrimPrefix(version.Version, "v"))
				w.WriteHeader(http.StatusNotFound)
			})
		}
	}

	return child
}

type statusRecordingResponseWriter struct {
	http.ResponseWriter
	statusCode int
}

func (rw *statusRecordingResponseWriter) WriteHeader(code int) {
	rw.statusCode = code
	rw.ResponseWriter.WriteHeader(code)
}

func newStatusRecordingResponseWriter(w http.ResponseWriter) *statusRecordingResponseWriter {
	// Default to 200 status code (for cases where WriteHeader may not be explicitly called)
	return &statusRecordingResponseWriter{w, http.StatusOK}
}

func (app *App) recordLSNMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Wrap the original ResponseWriter to capture status code
		srw := newStatusRecordingResponseWriter(w)

		// Call the next handler
		next.ServeHTTP(srw, r)

		// Only record primary LSN if 1) the request targets a repository 2) it's a write request 3) it succeeded
		if !app.nameRequired(r) {
			return
		}
		if r.Method != http.MethodPut && r.Method != http.MethodPost && r.Method != http.MethodPatch && r.Method != http.MethodDelete {
			return
		}
		if srw.statusCode < 200 || srw.statusCode >= 400 {
			return
		}

		// Get repository path from request context
		ctx := app.context(w, r)
		repo, err := app.repositoryFromContext(ctx, w)
		if err != nil {
			dcontext.GetLogger(ctx).WithError(err).
				Error("failed to get repository from request context to record primary LSN")
			return
		}

		if err := app.db.RecordLSN(r.Context(), &models.Repository{Path: repo.Named().Name()}); err != nil {
			dcontext.GetLogger(ctx).WithError(err).
				Error("failed to record primary LSN after successful repository write")
		}
	})
}

// distributionAPIVersionMiddleware sets a header with the Docker Distribution
// API Version for distribution API responses.
func distributionAPIVersionMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Add("Docker-Distribution-API-Version", "registry/2.0")

		next.ServeHTTP(w, r)
	})
}
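// Note: recordLSNMiddleware relies on statusRecordingResponseWriter
// defaulting to 200, so handlers that never call WriteHeader explicitly still
// count as successful writes for LSN recording.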
// dispatchFunc takes a context and request and returns a constructed handler
// for the route. The dispatcher will use this to dynamically create request
// specific handlers for each endpoint without creating a new router for each
// request.
type dispatchFunc func(ctx *Context, r *http.Request) http.Handler

// dispatcher returns a handler that constructs a request specific context and
// handler, using the dispatch factory function.
func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		for headerName, headerValues := range app.Config.HTTP.Headers {
			for _, value := range headerValues {
				w.Header().Add(headerName, value)
			}
		}

		ctx := app.context(w, r)

		// attach CF-RayID header to context and pass the key to the logger
		ctx.Context = dcontext.WithCFRayID(ctx.Context, r)
		ctx.Context = dcontext.WithLogger(ctx.Context, dcontext.GetLogger(ctx.Context, dcontext.CFRayIDLogKey))

		if err := app.authorized(w, r, ctx); err != nil {
			var authErr auth.Challenge
			if !errors.As(err, &authErr) {
				dcontext.GetLogger(ctx).WithError(err).Warn("error authorizing context")
			}
			return
		}

		// Add extra context to request logging
		ctx.Context = dcontext.WithLogger(ctx.Context, dcontext.GetLogger(ctx.Context, auth.UserNameKey, auth.UserTypeKey, auth.ResourceProjectPathsKey))

		// sync up context on the request.
		r = r.WithContext(ctx)

		// get all metadata either from the database or from the filesystem
		if app.Config.Database.Enabled {
			ctx.useDatabase = true
		}

		if app.nameRequired(r) {
			bp, ok := app.registry.Blobs().(distribution.BlobProvider)
			if !ok {
				err := fmt.Errorf("unable to convert BlobEnumerator into BlobProvider")
				dcontext.GetLogger(ctx).Error(err)
				ctx.Errors = append(ctx.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
			}
			ctx.blobProvider = bp

			repository, err := app.repositoryFromContext(ctx, w)
			if err != nil {
				return
			}

			ctx.queueBridge = app.queueBridge(ctx, r)

			// assign and decorate the authorized repository with an event bridge.
			ctx.Repository, ctx.RepositoryRemover = notifications.Listen(
				repository, ctx.App.repoRemover, app.eventBridge(ctx, r), ctx.useDatabase)

			ctx.Repository, err = applyRepoMiddleware(app, ctx.Repository, app.Config.Middleware["repository"])
			if err != nil {
				dcontext.GetLogger(ctx).Errorf("error initializing repository middleware: %v", err)
				ctx.Errors = append(ctx.Errors, errcode.ErrorCodeUnknown.WithDetail(err))

				if err := errcode.ServeJSON(w, ctx.Errors); err != nil {
					dcontext.GetLogger(ctx).Errorf("error serving error json: %v (from %v)", err, ctx.Errors)
				}
				return
			}
		}

		if ctx.useDatabase {
			if app.redisCache != nil {
				ctx.repoCache = datastore.NewCentralRepositoryCache(app.redisCache)
			} else {
				ctx.repoCache = datastore.NewSingleRepositoryCache()
			}
		}

		dispatch(ctx, r).ServeHTTP(w, r)

		// Automated error response handling here. Handlers may return their
		// own errors if they need different behavior (such as range errors
		// for layer upload).
		if ctx.Errors.Len() > 0 {
			if err := errcode.ServeJSON(w, ctx.Errors); err != nil {
				dcontext.GetLogger(ctx).Errorf("error serving error json: %v (from %v)", err, ctx.Errors)
			}

			app.logError(ctx, r, ctx.Errors)
		}
	})
}
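// Both dispatchers funnel handler errors through errcode.ServeJSON. An
// illustrative error payload (envelope shape per the errcode package):
//
//	{"errors": [{"code": "MANIFEST_UNKNOWN", "message": "...", "detail": {}}]}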
func (app *App) dispatcherGitlab(dispatch dispatchFunc) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := &Context{
			App:     app,
			Context: dcontext.WithVars(r.Context(), r),
		}

		// attach CF-RayID header to context and pass the key to the logger
		ctx.Context = dcontext.WithCFRayID(ctx.Context, r)
		ctx.Context = dcontext.WithLogger(ctx.Context, dcontext.GetLogger(ctx.Context, dcontext.CFRayIDLogKey))

		if err := app.authorized(w, r, ctx); err != nil {
			var authErr auth.Challenge
			if !errors.As(err, &authErr) {
				dcontext.GetLogger(ctx).WithError(err).Warn("error authorizing context")
			}
			return
		}

		// Add extra context to request logging
		ctx.Context = dcontext.WithLogger(ctx.Context, dcontext.GetLogger(ctx.Context, auth.UserNameKey, auth.UserTypeKey, auth.ResourceProjectPathsKey))

		// sync up context on the request.
		r = r.WithContext(ctx)

		if app.nameRequired(r) {
			repository, err := app.repositoryFromContext(ctx, w)
			if err != nil {
				return
			}

			ctx.Repository = repository
			ctx.queueBridge = app.queueBridge(ctx, r)
		}

		dispatch(ctx, r).ServeHTTP(w, r)

		// Automated error response handling here. Handlers may return their
		// own errors if they need different behavior (such as range errors
		// for layer upload).
		if ctx.Errors.Len() > 0 {
			if err := errcode.ServeJSON(w, ctx.Errors); err != nil {
				dcontext.GetLogger(ctx).Errorf("error serving error json: %v (from %v)", err, ctx.Errors)
			}

			app.logError(ctx, r, ctx.Errors)
		}
	})
}

func (*App) logError(ctx context.Context, r *http.Request, errs errcode.Errors) {
	for _, e := range errs {
		var code errcode.ErrorCode
		var message, detail string

		switch ex := e.(type) {
		case errcode.Error:
			code = ex.Code
			message = ex.Message
			detail = fmt.Sprintf("%+v", ex.Detail)
		case errcode.ErrorCode:
			code = ex
			message = ex.Message()
		default:
			// just normal go 'error'
			code = errcode.ErrorCodeUnknown
			message = ex.Error()
		}

		// inject request specific fields into the error logs
		l := dcontext.GetMappedRequestLogger(ctx).WithField("code", code.String())
		if detail != "" {
			l = l.WithField("detail", detail)
		}

		// HEAD requests check for manifests and blobs that are often not present as
		// part of normal request flow, so logging these errors is superfluous.
		if r.Method == http.MethodHead && (code == v2.ErrorCodeBlobUnknown || code == v2.ErrorCodeManifestUnknown) {
			l.WithError(e).Debug(message)
		} else {
			l.WithError(e).Error(message)
		}

		// only report 500 errors to Sentry
		if code == errcode.ErrorCodeUnknown {
			// Encode detail in error message so that it shows up in Sentry. This is a hack until we refactor error
			// handling across the whole application to enforce consistent behavior and formatting.
			// see https://gitlab.com/gitlab-org/container-registry/-/issues/198
			detailSuffix := ""
			if detail != "" {
				detailSuffix = fmt.Sprintf(": %s", detail)
			}
			err := errcode.ErrorCodeUnknown.WithMessage(fmt.Sprintf("%s%s", message, detailSuffix))
			errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithRequest(r), errortracking.WithStackTrace())
		}
	}
}
func (app *App) context(_ http.ResponseWriter, r *http.Request) *Context {
	ctx := r.Context()
	ctx = dcontext.WithVars(ctx, r)

	name := dcontext.GetStringValue(ctx, "vars.name")
	// nolint: staticcheck,revive // SA1029: should not use built-in type string as key for value
	ctx = context.WithValue(ctx, "root_repo", strings.Split(name, "/")[0])
	ctx = dcontext.WithLogger(ctx, dcontext.GetLogger(ctx,
		"root_repo",
		"vars.name",
		"vars.reference",
		"vars.digest",
		"vars.uuid"))

	appContext := &Context{
		App:     app,
		Context: ctx,
	}

	if app.httpHost.Scheme != "" && app.httpHost.Host != "" {
		// A "host" item in the configuration takes precedence over
		// X-Forwarded-Proto and X-Forwarded-Host headers, and the
		// hostname in the request.
		appContext.urlBuilder = urls.NewBuilder(&app.httpHost, false)
	} else {
		appContext.urlBuilder = urls.NewBuilderFromRequest(r, app.Config.HTTP.RelativeURLs)
	}

	return appContext
}

// authorized checks if the request can proceed with access to the requested
// repository. If it succeeds, the context may access the requested
// repository. An error will be returned if access is not available.
func (app *App) authorized(w http.ResponseWriter, r *http.Request, appContext *Context) error {
	dcontext.GetLogger(appContext).Debug("authorizing request")
	repo := getName(appContext)

	if app.accessController == nil {
		return nil // access controller is not enabled.
	}

	var (
		accessRecords []auth.Access
		err           error
		errCode       error
	)

	if repo != "" {
		accessRecords = appendAccessRecords(accessRecords, r.Method, repo)
		accessRecords = appendRepositoryDetailsAccessRecords(accessRecords, r, repo)
		accessRecords, err, errCode = appendRepositoryNamespaceAccessRecords(accessRecords, r)
		if err != nil {
			if err := errcode.ServeJSON(w, errCode); err != nil {
				dcontext.GetLogger(appContext).Errorf("error serving error json: %v (from %v)", err, appContext.Errors)
			}
			return fmt.Errorf("error creating access records: %w", err)
		}

		if fromRepo := r.FormValue("from"); fromRepo != "" {
			// mounting a blob from one repository to another requires pull (GET)
			// access to the source repository.
			accessRecords = appendAccessRecords(accessRecords, http.MethodGet, fromRepo)
		}
	} else {
		// Only allow the name not to be set on the base route.
		if app.nameRequired(r) {
			// For this to be properly secured, repo must always be set for a
			// resource that may make a modification. The only condition under
			// which name is not set and we still allow access is when the
			// base route is accessed. This section prevents us from making
			// that mistake elsewhere in the code, allowing any operation to
			// proceed.
			if err := errcode.ServeJSON(w, errcode.ErrorCodeUnauthorized); err != nil {
				dcontext.GetLogger(appContext).Errorf("error serving error json: %v (from %v)", err, appContext.Errors)
			}
			return fmt.Errorf("forbidden: no repository name")
		}
		accessRecords = appendCatalogAccessRecord(accessRecords, r)
		accessRecords = appendStatisticsAccessRecord(accessRecords, r)
	}

	ctx, err := app.accessController.Authorized(appContext.Context, accessRecords...)
	if err != nil {
		switch err := err.(type) {
		case auth.Challenge:
			// Add the appropriate WWW-Auth header
			err.SetHeaders(r, w)

			if err := errcode.ServeJSON(w, errcode.ErrorCodeUnauthorized.WithDetail(accessRecords)); err != nil {
				dcontext.GetLogger(appContext).Errorf("error serving error json: %v (from %v)", err, appContext.Errors)
			}
		default:
			// This condition is a potential security problem either in
			// the configuration or whatever is backing the access
			// controller. Just return a bad request with no information
			// to avoid exposure. The request should not proceed.
			dcontext.GetLogger(appContext).Errorf("error checking authorization: %v", err)
			w.WriteHeader(http.StatusBadRequest)
		}

		return err
	}

	dcontext.GetLogger(ctx, auth.UserNameKey, auth.UserTypeKey, auth.ResourceProjectPathsKey).Info("authorized request")

	appContext.Context = ctx
	return nil
}

// eventBridge returns a bridge for the current request, configured with the
// correct actor and source.
func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener {
	actor := notifications.ActorRecord{
		Name:     getUserName(ctx, r),
		UserType: getUserType(ctx),
	}
	request := notifications.NewRequestRecord(dcontext.GetRequestID(ctx), r)

	return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, request, app.events.sink, app.Config.Notifications.EventConfig.IncludeReferences)
}

func (app *App) queueBridge(ctx *Context, r *http.Request) *notifications.QueueBridge {
	actor := notifications.ActorRecord{
		Name:     getUserName(ctx, r),
		UserType: getUserType(ctx),
		User:     getUserJWT(ctx),
	}
	request := notifications.NewRequestRecord(dcontext.GetRequestID(ctx), r)

	return notifications.NewQueueBridge(ctx.urlBuilder, app.events.source, actor, request, app.events.sink, app.Config.Notifications.EventConfig.IncludeReferences)
}

// nameRequired returns true if the route requires a name.
func (*App) nameRequired(r *http.Request) bool {
	route := mux.CurrentRoute(r)
	if route == nil {
		return true
	}
	routeName := route.GetName()
	switch routeName {
	case v2.RouteNameBase, v2.RouteNameCatalog, v1.Base.Name, v1.Statistics.Name:
		return false
	}
	return true
}

// distributionAPIBase provides clients with extra information about extended
// features of the distribution API via the Gitlab-Container-Registry-Features
// header.
func distributionAPIBase(dbEnabled bool) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Gitlab-Container-Registry-Features", version.ExtFeatures)
		w.Header().Set("Gitlab-Container-Registry-Database-Enabled", strconv.FormatBool(dbEnabled))
		apiBase(w, r)
	}
}

// apiBase implements a simple yes-man for doing overall checks against the
// api. This can support auth roundtrips to support docker login.
func apiBase(w http.ResponseWriter, _ *http.Request) {
	const emptyJSON = "{}"

	// Provide a simple /v2/ 200 OK response with empty json response.
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Content-Length", fmt.Sprint(len(emptyJSON)))
	w.Header().Set("Gitlab-Container-Registry-Version", strings.TrimPrefix(version.Version, "v"))

	_, _ = fmt.Fprint(w, emptyJSON)
}

// appendAccessRecords checks the method and adds the appropriate Access records to the records list.
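//
// The method-to-action mapping implemented below is:
//
//	GET, HEAD        -> pull
//	POST, PUT, PATCH -> pull, push
//	DELETE           -> delete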
func appendAccessRecords(records []auth.Access, method, repo string) []auth.Access {
	resource := auth.Resource{
		Type: "repository",
		Name: repo,
	}

	switch method {
	case http.MethodGet, http.MethodHead:
		records = append(records,
			auth.Access{
				Resource: resource,
				Action:   "pull",
			})
	case http.MethodPost, http.MethodPut, http.MethodPatch:
		records = append(records,
			auth.Access{
				Resource: resource,
				Action:   "pull",
			},
			auth.Access{
				Resource: resource,
				Action:   "push",
			})
	case http.MethodDelete:
		records = append(records,
			auth.Access{
				Resource: resource,
				Action:   "delete",
			})
	}
	return records
}

// Add the access record for the catalog if it's our current route
func appendCatalogAccessRecord(accessRecords []auth.Access, r *http.Request) []auth.Access {
	route := mux.CurrentRoute(r)
	routeName := route.GetName()

	if routeName == v2.RouteNameCatalog {
		resource := auth.Resource{
			Type: "registry",
			Name: "catalog",
		}

		accessRecords = append(accessRecords,
			auth.Access{
				Resource: resource,
				Action:   "*",
			})
	}
	return accessRecords
}

func appendRepositoryDetailsAccessRecords(accessRecords []auth.Access, r *http.Request, repo string) []auth.Access {
	route := mux.CurrentRoute(r)
	routeName := route.GetName()

	// For now, we only have three operations requiring a custom access record, which are:
	// 1. for returning the size of a repository including its descendants.
	// 2. for returning all the repositories under a given repository base path (including the base repository)
	// 3. renaming a base repository (name and path) and updating the sub-repositories (path) accordingly
	// These three operations require an access record of type `repository` and name `<name>/*`
	// (to grant access on all descendants), in addition to the standard access record of type `repository` and
	// name `<name>` (to grant read access to the base repository), which was appended in the preceding call to
	// `appendAccessRecords`.
	if routeName == v1.SubRepositories.Name ||
		(routeName == v1.Repositories.Name &&
			(sizeQueryParamValue(r) == sizeQueryParamSelfWithDescendantsValue || r.Method == http.MethodPatch)) {
		accessRecords = append(accessRecords, auth.Access{
			Resource: auth.Resource{
				Type: "repository",
				Name: fmt.Sprintf("%s/*", repo),
			},
			Action: "pull",
		})
	}
	return accessRecords
}

// appendRepositoryNamespaceAccessRecords adds the needed access records for moving a project's repositories
// from one namespace to another as facilitated by the PATCH repository request.
// Once it detects that the current request is a request to move repositories,
// it adds the correct access records to the list of access records that must be present in the token.
func appendRepositoryNamespaceAccessRecords(accessRecords []auth.Access, r *http.Request) ([]auth.Access, error, error) {
	route := mux.CurrentRoute(r)
	routeName := route.GetName()

	if r.Method == http.MethodPatch && routeName == v1.Repositories.Name {
		// Read the request body
		buf := new(bytes.Buffer)
		// Read from r.Body and write to buf simultaneously
		teeReader := io.TeeReader(r.Body, buf)

		// Read the body from the TeeReader
		body, err := io.ReadAll(teeReader)
		if err != nil {
			return accessRecords, err, v1.ErrorCodeInvalidJSONBody.WithDetail("invalid json")
		}

		// Rewind the request body reader to its original position to allow downstream handlers to read it if needed.
		r.Body = io.NopCloser(buf)

		// Parse the request body into a struct.
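		// For illustration, a rename request body is expected to look roughly
		// like the following (the exact field set is defined by
		// RenameRepositoryAPIRequest elsewhere in this package):
		//
		//	{"name": "new-name", "namespace": "new-group/sub-group"}
		//
		// Only a non-empty namespace triggers the extra access record below.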
		var data RenameRepositoryAPIRequest
		if err := json.Unmarshal(body, &data); err != nil {
			return accessRecords, err, v1.ErrorCodeInvalidJSONBody.WithDetail("invalid json")
		}

		if data.Namespace != "" {
			accessRecords = append(accessRecords, auth.Access{
				Resource: auth.Resource{
					Type: "repository",
					Name: fmt.Sprintf("%s/*", data.Namespace),
				},
				Action: "push",
			})
		}
	}

	return accessRecords, nil, nil
}

// appendStatisticsAccessRecord adds the access records for the statistics endpoint.
func appendStatisticsAccessRecord(accessRecords []auth.Access, r *http.Request) []auth.Access {
	route := mux.CurrentRoute(r)
	routeName := route.GetName()

	if routeName != v1.Statistics.Name {
		return accessRecords
	}

	accessRecords = append(accessRecords, auth.Access{
		Resource: auth.Resource{Type: "registry", Name: "statistics"},
		Action:   "*",
	})

	return accessRecords
}

// applyRegistryMiddleware wraps a registry instance with the configured middlewares
func applyRegistryMiddleware(ctx context.Context, registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) {
	for _, mw := range middlewares {
		rmw, err := registrymiddleware.Get(ctx, mw.Name, mw.Options, registry)
		if err != nil {
			return nil, fmt.Errorf("unable to configure registry middleware (%s): %s", mw.Name, err)
		}
		registry = rmw
	}
	return registry, nil
}

// applyRepoMiddleware wraps a repository with the configured middlewares
func applyRepoMiddleware(ctx context.Context, repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) {
	for _, mw := range middlewares {
		rmw, err := repositorymiddleware.Get(ctx, mw.Name, mw.Options, repository)
		if err != nil {
			return nil, err
		}
		repository = rmw
	}
	return repository, nil
}

// applyStorageMiddleware wraps a storage driver with the configured middlewares
func applyStorageMiddleware(driver storagedriver.StorageDriver, middlewares []configuration.Middleware) (storagedriver.StorageDriver, error) {
	for _, mw := range middlewares {
		smw, err := storagemiddleware.Get(mw.Name, mw.Options, driver)
		if err != nil {
			return nil, fmt.Errorf("unable to configure storage middleware (%s): %v", mw.Name, err)
		}
		driver = smw
	}
	return driver, nil
}

// uploadPurgeDefaultConfig provides a default configuration for upload
// purging to be used in the absence of configuration in the
// configuration file
func uploadPurgeDefaultConfig() map[any]any {
	config := make(map[any]any)
	config["enabled"] = true
	config["age"] = "168h"
	config["interval"] = "24h"
	config["dryrun"] = false
	return config
}

func badPurgeUploadConfig(reason string) error {
	return fmt.Errorf("unable to parse upload purge configuration: %s", reason)
}

// startUploadPurger schedules a goroutine which will periodically
// check upload directories for old files and delete them
func startUploadPurger(ctx context.Context, storageDriver storagedriver.StorageDriver, log dcontext.Logger, config map[any]any) error {
	if v, ok := (config["enabled"]).(bool); ok && !v {
		return nil
	}

	var purgeAgeDuration time.Duration
	var err error
	purgeAge, ok := config["age"]
	if ok {
		ageStr, ok := purgeAge.(string)
		if !ok {
			return badPurgeUploadConfig("age is not a string")
		}
		purgeAgeDuration, err = time.ParseDuration(ageStr)
		if err != nil {
			return badPurgeUploadConfig(fmt.Sprintf("Cannot parse duration: %s", err.Error()))
		}
	} else {
		return badPurgeUploadConfig("age missing")
	}

	var intervalDuration time.Duration
	interval, ok := config["interval"]
	if ok {
		intervalStr, ok := interval.(string)
		if !ok {
			return badPurgeUploadConfig("interval is not a string")
		}
		intervalDuration, err = time.ParseDuration(intervalStr)
		if err != nil {
			return badPurgeUploadConfig(fmt.Sprintf("Cannot parse interval: %s", err.Error()))
		}
	} else {
		return badPurgeUploadConfig("interval missing")
	}

	var dryRunBool bool
	dryRun, ok := config["dryrun"]
	if !ok {
		return badPurgeUploadConfig("dryrun missing")
	}
	dryRunBool, ok = dryRun.(bool)
	if !ok {
		return badPurgeUploadConfig("cannot parse dryrun")
	}

	go func() {
		// nolint: gosec // used only for jitter calculation
		jitter := time.Duration(rand.Int()%60) * time.Minute
		log.Infof("Starting upload purge in %s", jitter)
		time.Sleep(jitter)

		for {
			storage.PurgeUploads(ctx, storageDriver, time.Now().Add(-purgeAgeDuration), !dryRunBool)
			log.Infof("Starting upload purge in %s", intervalDuration)
			time.Sleep(intervalDuration)
		}
	}()

	return nil
}

func (app *App) registerShutdownFunc(f shutdownFunc) {
	app.shutdownFuncs = append(app.shutdownFuncs, f)
}

// GracefulShutdown allows the app to free any resources before shutdown.
func (app *App) GracefulShutdown(ctx context.Context) error {
	errs := new(multierror.Error)
	l := dlog.GetLogger(dlog.WithContext(ctx))

	errCh := make(chan error, len(app.shutdownFuncs))

	// NOTE(prozlach): it is important that we are quick during shutdown, as
	// e.g. k8s can forcefully terminate the pod with SIGKILL if the shutdown
	// takes too long.
	for _, f := range app.shutdownFuncs {
		go f(app, errCh, l)
	}

	for i := 0; i < len(app.shutdownFuncs); i++ {
		select {
		case <-ctx.Done():
			return fmt.Errorf("app shutdown failed: %w", ctx.Err())
		case err := <-errCh:
			errs = multierror.Append(errs, err)
		}
	}

	return errs.ErrorOrNil()
}

// DBStats returns the sql.DBStats for the metadata database connection handle.
func (app *App) DBStats() sql.DBStats {
	return app.db.Primary().Stats()
}

func (app *App) repositoryFromContext(ctx *Context, w http.ResponseWriter) (distribution.Repository, error) {
	return repositoryFromContextWithRegistry(ctx, w, app.registry)
}

func repositoryFromContextWithRegistry(ctx *Context, w http.ResponseWriter, registry distribution.Namespace) (distribution.Repository, error) {
	nameRef, err := reference.WithName(getName(ctx))
	if err != nil {
		dcontext.GetLogger(ctx).Errorf("error parsing reference from context: %v", err)
		ctx.Errors = append(ctx.Errors, distribution.ErrRepositoryNameInvalid{
			Name:   getName(ctx),
			Reason: err,
		})
		if err := errcode.ServeJSON(w, ctx.Errors); err != nil {
			dcontext.GetLogger(ctx).Errorf("error serving error json: %v (from %v)", err, ctx.Errors)
		}
		return nil, err
	}

	repository, err := registry.Repository(ctx, nameRef)
	if err != nil {
		dcontext.GetLogger(ctx).Errorf("error resolving repository: %v", err)

		switch err := err.(type) {
		case distribution.ErrRepositoryUnknown:
			ctx.Errors = append(ctx.Errors, v2.ErrorCodeNameUnknown.WithDetail(err))
		case distribution.ErrRepositoryNameInvalid:
			ctx.Errors = append(ctx.Errors, v2.ErrorCodeNameInvalid.WithDetail(err))
		case errcode.Error:
			ctx.Errors = append(ctx.Errors, err)
		}

		if err := errcode.ServeJSON(w, ctx.Errors); err != nil {
			dcontext.GetLogger(ctx).Errorf("error serving error json: %v (from %v)", err, ctx.Errors)
		}
		return nil, err
	}

	return repository, nil
}

func startBackgroundMigrations(ctx context.Context, db *datastore.DB, config *configuration.Configuration) {
	l := dlog.GetLogger(dlog.WithContext(ctx))

	// register all work functions with bbmWorker
	bbmWorker, err := bbm.RegisterWork(bbm.AllWork(),
		bbm.WithDB(db),
		bbm.WithLogger(dlog.GetLogger(dlog.WithContext(ctx))),
		bbm.WithJobInterval(config.Database.BackgroundMigrations.JobInterval),
		bbm.WithMaxJobAttempt(config.Database.BackgroundMigrations.MaxJobRetries),
	)
	if err != nil {
		l.WithError(err).Error("background migration worker could not start")
		return
	}

	doneCh := make(chan struct{})
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)

	// listen for registry process exit signal in the background.
	go bbmListenForShutdown(ctx, sigChan, doneCh)

	// start looking for background migrations
	gracefulFinish, err := bbmWorker.ListenForBackgroundMigration(ctx, doneCh)
	if err != nil {
		l.WithError(err).Error("background migration worker exited abruptly")
	}

	l.Info("background migration worker is running")

	go func() {
		// wait for the worker to acknowledge that it is safe to exit.
		<-gracefulFinish
		l.Info("background migration worker stopped gracefully")
	}()
}

func bbmListenForShutdown(ctx context.Context, c chan os.Signal, doneCh chan struct{}) {
	<-c
	// signal to worker that the program is exiting
	dlog.GetLogger(dlog.WithContext(ctx)).Info("Background migration worker is shutting down")
	close(doneCh)
}
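// Illustrative wiring (an assumption, not a definitive call site): the
// background migration worker above would typically be started during app
// construction once the metadata database is available, roughly as:
//
//	if config.Database.Enabled {
//		go startBackgroundMigrations(ctx, db, config)
//	}
//
// The actual call site and any additional feature gating live elsewhere in
// this file and may differ from this sketch.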