internal/cli/praefect/serve.go

package praefect

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"math/rand"
	"net/http"
	"os"
	"runtime/debug"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/urfave/cli/v3"
	"gitlab.com/gitlab-org/gitaly/v16/internal/bootstrap"
	"gitlab.com/gitlab-org/gitaly/v16/internal/bootstrap/starter"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/sentry"
	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
	"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/metrics"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/nodes"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/nodes/tracker"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/reconciler"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/repocleaner"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/service"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/service/transaction"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/transactions"
	"gitlab.com/gitlab-org/gitaly/v16/internal/version"
	"gitlab.com/gitlab-org/labkit/monitoring"
	"gitlab.com/gitlab-org/labkit/tracing"
)

func newServeCommand() *cli.Command {
	return &cli.Command{
		Name:  "serve",
		Usage: "launch the server daemon",
		Description: `Launch the Praefect server daemon.

Example: praefect --config praefect.config.toml serve`,
		Action: serveAction,
		Before: func(ctx context.Context, cmd *cli.Command) (context.Context, error) {
			if cmd.Args().Present() {
				return nil, unexpectedPositionalArgsError{Command: cmd.Name}
			}
			return ctx, nil
		},
	}
}

func serveAction(ctx context.Context, cmd *cli.Command) error {
	if cmd.Args().Present() {
		return unexpectedPositionalArgsError{Command: cmd.Name}
	}

	// The cmd.Command.Name can't be used here because if `praefect -config FILE` is used
	// it will be set to 'praefect' instead of 'serve'.
	configPath := mustProvideConfigFlag(ctx, cmd, "serve")

	conf, err := readConfig(configPath)
	if err != nil {
		return err
	}

	logger, err := log.Configure(os.Stdout, conf.Logging.Format, conf.Logging.Level)
	if err != nil {
		return fmt.Errorf("configuring logger: %w", err)
	}

	logger.WithField("version", version.GetVersion()).Info("Starting Praefect")

	if !conf.Failover.Enabled && conf.Failover.ElectionStrategy != "" {
		logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn(
			"ignoring configured election strategy as failover is disabled")
	}

	if err := run(conf, cmd.Name, logger); err != nil {
		logger.WithError(err).Error("Praefect shutdown")
		return cli.Exit("", 1)
	}

	logger.Info("Praefect shutdown")

	return nil
}

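// run configures tracing, Prometheus and Sentry from the configuration, resolves the listening
// addresses, and hands control to server together with a bootstrapper that owns the listeners.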
func run(conf config.Config, appName string, logger log.Logger) error {
	configure(logger, appName, conf)

	starterConfigs, err := getStarterConfigs(conf)
	if err != nil {
		return cli.Exit(err, 1)
	}

	promreg := prometheus.DefaultRegisterer
	b, err := bootstrap.New(logger, promauto.With(promreg).NewCounterVec(
		prometheus.CounterOpts{
			Name: "gitaly_praefect_connections_total",
			Help: "Total number of connections to Praefect",
		},
		[]string{"type"},
	))
	if err != nil {
		return cli.Exit(fmt.Errorf("unable to create a bootstrap: %w", err), 1)
	}

	dbPromRegistry := prometheus.NewRegistry()

	if err := server(starterConfigs, conf, logger, b, promreg, dbPromRegistry); err != nil {
		return cli.Exit(err, 1)
	}

	return nil
}

func readConfig(path string) (config.Config, error) {
	conf, err := config.FromFile(path)
	if err != nil {
		return conf, fmt.Errorf("error reading config file: %w", err)
	}

	if err := conf.Validate(); err != nil {
		return config.Config{}, fmt.Errorf("validating config: %w", err)
	}

	if !conf.AllowLegacyElectors {
		conf.Failover.ElectionStrategy = config.ElectionStrategyPerRepository
	}

	return conf, nil
}

func configure(logger log.Logger, appName string, conf config.Config) {
	tracing.Initialize(tracing.WithServiceName(appName))

	if conf.PrometheusListenAddr != "" {
		conf.Prometheus.Configure(logger)
	}

	sentry.ConfigureSentry(logger, version.GetVersion(), conf.Sentry)
}

func server(
	cfgs []starter.Config,
	conf config.Config,
	logger log.Logger,
	b bootstrap.Listener,
	promreg prometheus.Registerer,
	dbPromRegistry interface {
		prometheus.Registerer
		prometheus.Gatherer
	},
) error {
	nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus, promreg)
	if err != nil {
		return err
	}

	delayMetric, err := metrics.RegisterReplicationDelay(conf.Prometheus, promreg)
	if err != nil {
		return err
	}

	latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus, promreg)
	if err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var db *sql.DB
	if conf.NeedsSQL() {
		logger.WithField("database_address", fmt.Sprintf("%s:%d", conf.DB.Host, conf.DB.Port)).Info("establishing database connection")

		dbConn, closedb, err := initDatabase(ctx, logger, conf)
		if err != nil {
			return err
		}
		defer closedb()

		db = dbConn
		logger.Info("database connection established")
	}

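	// Pick the datastore implementations: with conf.MemoryQueueEnabled the replication queue is
	// kept in memory and a mock repository store is used, otherwise the implementations are
	// backed by the Postgres connection established above.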
	var queue datastore.ReplicationEventQueue
	var rs datastore.RepositoryStore
	var csg datastore.ConsistentStoragesGetter
	var metricsCollectors []prometheus.Collector

	if conf.MemoryQueueEnabled {
		queue = datastore.NewMemoryReplicationEventQueue(conf)
		rs = datastore.MockRepositoryStore{}
		csg = rs
		logger.Info("reads distribution caching is disabled for in memory storage")
	} else {
		queue = datastore.NewPostgresReplicationEventQueue(db)
		rs = datastore.NewPostgresRepositoryStore(db, conf.StorageNames())

		dsn := glsql.DSN(conf.DB, true)
		if dsn == "" {
			csg = rs
			logger.Info("reads distribution caching is disabled because direct connection to Postgres is not set")
		} else {
			storagesCached, err := datastore.NewCachingConsistentStoragesGetter(logger, rs, conf.VirtualStorageNames())
			if err != nil {
				return fmt.Errorf("caching storage provider: %w", err)
			}

			resilientListenerTicker := helper.NewTimerTicker(5 * time.Second)
			notificationsListener := datastore.NewResilientListener(conf.DB, resilientListenerTicker, logger)
			go func() {
				err := notificationsListener.Listen(ctx, storagesCached, datastore.StorageRepositoriesUpdatesChannel, datastore.RepositoriesUpdatesChannel)
				if err != nil && !errors.Is(err, context.Canceled) {
					logger.WithError(err).Error("notifications listener terminated")
				}
			}()

			metricsCollectors = append(metricsCollectors, storagesCached, notificationsListener)
			csg = storagesCached
			logger.Info("reads distribution caching is enabled by configuration")
		}
	}

	var errTracker tracker.ErrorTracker

	if conf.Failover.Enabled {
		thresholdsConfigured, err := conf.Failover.ErrorThresholdsConfigured()
		if err != nil {
			return err
		}

		if thresholdsConfigured {
			errorWindowFunction, err := tracker.NewErrorWindowFunction(conf.Failover)
			if err != nil {
				return err
			}

			errTracker, err = tracker.NewErrors(ctx, errorWindowFunction, conf.Failover.ReadErrorThresholdCount, conf.Failover.WriteErrorThresholdCount)
			if err != nil {
				return err
			}
		}
	}

	transactionManager := transactions.NewManager(conf, logger)
	sidechannelRegistry := sidechannel.NewRegistry()

	backchannelCfg := backchannel.DefaultConfiguration()
	backchannelCfg.AcceptBacklog = int(conf.Yamux.AcceptBacklog)
	backchannelCfg.MaximumStreamWindowSizeBytes = conf.Yamux.MaximumStreamWindowSizeBytes
	clientHandshaker := backchannel.NewClientHandshaker(
		logger,
		praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry),
		backchannelCfg,
	)

	assignmentStore := praefect.NewDisabledAssignmentStore(conf.StorageNames())

	var (
		nodeManager   nodes.Manager
		healthChecker praefect.HealthChecker
		nodeSet       praefect.NodeSet
		router        praefect.Router
		primaryGetter praefect.PrimaryGetter
	)

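	// Node management depends on the configured election strategy: per-repository elections use
	// the Postgres-backed elector and per-repository router, while any other strategy falls back
	// to the deprecated node manager.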
	if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
		nodeSet, err = praefect.DialNodes(
			ctx,
			conf.VirtualStorages,
			protoregistry.GitalyProtoPreregistered,
			errTracker,
			clientHandshaker,
			sidechannelRegistry,
			logger,
		)
		if err != nil {
			return fmt.Errorf("dial nodes: %w", err)
		}
		defer nodeSet.Close()

		healthManager := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients())
		go func() {
			if err := healthManager.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil {
				logger.WithError(err).Error("health manager exited")
			}
		}()

		healthChecker = healthManager

		// Wait for the first health check to complete so the Praefect doesn't start serving RPC
		// before the router is ready with the health status of the nodes.
		<-healthManager.Updated()

		elector := nodes.NewPerRepositoryElector(logger, db)
		primaryGetter = elector

		assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames())

		router = praefect.NewPerRepositoryRouter(
			nodeSet.Connections(),
			elector,
			healthManager,
			praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))),
			csg,
			assignmentStore,
			rs,
			conf.DefaultReplicationFactors(),
		)

		if conf.BackgroundVerification.VerificationInterval > 0 {
			logger.WithField("config", conf.BackgroundVerification).Info("background verifier started")

			verifier := praefect.NewMetadataVerifier(
				logger,
				db,
				nodeSet.Connections(),
				healthManager,
				conf.BackgroundVerification.VerificationInterval.Duration(),
				conf.BackgroundVerification.DeleteInvalidRecords,
			)
			promreg.MustRegister(verifier)

			go func() {
				if err := verifier.Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil {
					logger.WithError(err).Error("metadata verifier finished")
				}
			}()

			go func() {
				if err := verifier.RunExpiredLeaseReleaser(ctx, helper.NewTimerTicker(10*time.Second)); err != nil {
					logger.WithError(err).Error("expired verification lease releaser finished")
				}
			}()
		} else {
			logger.Info("background verifier is disabled")
		}
	} else {
		if conf.Failover.Enabled {
			logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn(
				"Deprecated election strategy in use")
		}

		nodeMgr, err := nodes.NewManager(logger, conf, db, csg, nodeLatencyHistogram, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker, sidechannelRegistry)
		if err != nil {
			return err
		}

		healthChecker = praefect.HealthChecker(nodeMgr)
		nodeSet = praefect.NodeSetFromNodeManager(nodeMgr)
		router = praefect.NewNodeManagerRouter(nodeMgr, rs)
		primaryGetter = nodeMgr
		nodeManager = nodeMgr

		nodeMgr.Start(conf.Failover.BootstrapInterval.Duration(), conf.Failover.MonitorInterval.Duration())
		defer nodeMgr.Stop()
	}

	logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Info("election strategy")
	logger.Info("background started: gitaly nodes health monitoring")

	var (
		// top level server dependencies
		coordinator = praefect.NewCoordinator(
			logger,
			queue,
			rs,
			router,
			transactionManager,
			conf,
			protoregistry.GitalyProtoPreregistered,
		)

		repl = praefect.NewReplMgr(
			logger,
			conf.StorageNames(),
			queue,
			rs,
			healthChecker,
			nodeSet,
			praefect.WithDelayMetric(delayMetric),
			praefect.WithLatencyMetric(latencyMetric),
			praefect.WithDequeueBatchSize(conf.Replication.BatchSize),
			praefect.WithParallelStorageProcessingWorkers(conf.Replication.ParallelStorageProcessingWorkers),
		)

		srvFactory = praefect.NewServerFactory(&praefect.Dependencies{
			Config:          conf,
			Logger:          logger,
			Coordinator:     coordinator,
			Director:        coordinator.StreamDirector,
			NodeMgr:         nodeManager,
			TxMgr:           transactionManager,
			Queue:           queue,
			RepositoryStore: rs,
			AssignmentStore: assignmentStore,
			Router:          router,
			Registry:        protoregistry.GitalyProtoPreregistered,
			Conns:           nodeSet.Connections(),
			PrimaryGetter:   primaryGetter,
			Checks:          service.Checks(),
		}, defaultServerOptions...)
	)

	metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl)
	if db != nil {
		dbMetricCollectors := []prometheus.Collector{
			datastore.NewRepositoryStoreCollector(logger, conf.VirtualStorageNames(), db, conf.Prometheus.ScrapeTimeout.Duration()),
			datastore.NewQueueDepthCollector(logger, db, conf.Prometheus.ScrapeTimeout.Duration()),
		}

		if conf.BackgroundVerification.VerificationInterval > 0 {
			dbMetricCollectors = append(dbMetricCollectors, datastore.NewVerificationQueueDepthCollector(
				logger,
				db,
				conf.Prometheus.ScrapeTimeout.Duration(),
				conf.BackgroundVerification.VerificationInterval.Duration(),
				conf.StorageNames(),
			))
		}

		// Database-related metrics are exported via a separate endpoint such that it's possible
		// to set a different scraping interval and thus to reduce database load.
		dbPromRegistry.MustRegister(dbMetricCollectors...)
	}
	promreg.MustRegister(metricsCollectors...)

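	// Create a gRPC server for every configured listening address and register it with the
	// bootstrapper, which owns the listeners.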
	for _, cfg := range cfgs {
		srv, err := srvFactory.Create(cfg.IsSecure())
		if err != nil {
			return fmt.Errorf("create gRPC server: %w", err)
		}
		defer srv.Stop()

		b.RegisterStarter(starter.New(cfg, srv, logger))
	}

	if conf.PrometheusListenAddr != "" {
		logger.WithField("address", conf.PrometheusListenAddr).Info("Starting prometheus listener")

		b.RegisterStarter(func(listen bootstrap.ListenFunc, _ chan<- error, _ *prometheus.CounterVec) error {
			l, err := listen(starter.TCP, conf.PrometheusListenAddr)
			if err != nil {
				return err
			}

			serveMux := http.NewServeMux()
			serveMux.Handle("/db_metrics", promhttp.HandlerFor(dbPromRegistry, promhttp.HandlerOpts{}))

			go func() {
				opts := []monitoring.Option{
					monitoring.WithListener(l),
					monitoring.WithServeMux(serveMux),
				}

				if buildInfo, ok := debug.ReadBuildInfo(); ok {
					opts = append(opts, monitoring.WithGoBuildInformation(buildInfo))
				}

				if err := monitoring.Start(opts...); err != nil {
					logger.WithError(err).WithField("listen_address", conf.PrometheusListenAddr).Error("unable to start prometheus listener")
				}
			}()

			return nil
		})
	}

	if err := b.Start(); err != nil {
		return fmt.Errorf("unable to start the bootstrap: %w", err)
	}

	for _, cfg := range cfgs {
		logger.WithFields(log.Fields{"schema": cfg.Name, "address": cfg.Addr}).Info("listening")
	}

	go repl.ProcessBacklog(ctx, praefect.ExpBackoffFactory{Start: time.Second, Max: 5 * time.Second})

	staleTicker := helper.NewTimerTicker(30 * time.Second)
	defer staleTicker.Stop()

	logger.Info("background started: processing of the replication events")
	repl.ProcessStale(ctx, staleTicker, time.Minute)
	logger.Info("background started: processing of the stale replication events")

	if interval := conf.Reconciliation.SchedulingInterval.Duration(); interval > 0 {
		if conf.MemoryQueueEnabled {
			logger.Warn("Disabled automatic reconciliation as it is only implemented using SQL queue and in-memory queue is configured.")
		} else {
			r := reconciler.NewReconciler(
				logger,
				db,
				healthChecker,
				conf.StorageNames(),
				conf.Reconciliation.HistogramBuckets,
			)
			promreg.MustRegister(r)
			go func() {
				if err := r.Run(ctx, helper.NewTimerTicker(interval)); err != nil {
					logger.WithError(err).Error("reconciler finished execution")
				}
			}()
		}
	}

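	// Repository cleanup is a background task that requires a database connection; it is skipped
	// entirely when no run interval is configured.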
	if interval := conf.RepositoriesCleanup.RunInterval.Duration(); interval > 0 {
		if db != nil {
			go func() {
				storageSync := datastore.NewStorageCleanup(db)
				cfg := repocleaner.Cfg{
					RunInterval:         conf.RepositoriesCleanup.RunInterval.Duration(),
					LivenessInterval:    30 * time.Second,
					RepositoriesInBatch: int(conf.RepositoriesCleanup.RepositoriesInBatch),
				}
				repoCleaner := repocleaner.NewRunner(cfg, logger, healthChecker, nodeSet.Connections(), storageSync, storageSync, repocleaner.NewLogWarnAction(logger))
				if err := repoCleaner.Run(ctx, helper.NewTimerTicker(conf.RepositoriesCleanup.CheckInterval.Duration())); err != nil && !errors.Is(err, context.Canceled) {
					logger.WithError(err).Error("repository cleaner finished execution")
				} else {
					logger.Info("repository cleaner finished execution")
				}
			}()
		} else {
			logger.Warn("Repository cleanup background task disabled as there is no database connection configured.")
		}
	} else {
		logger.Warn(`Repository cleanup background task disabled as "repositories_cleanup.run_interval" is not set or 0.`)
	}

	gracefulStopTicker := helper.NewTimerTicker(conf.GracefulStopTimeout.Duration())
	defer gracefulStopTicker.Stop()

	return b.Wait(gracefulStopTicker, srvFactory.GracefulStop)
}

func getStarterConfigs(conf config.Config) ([]starter.Config, error) {
	var cfgs []starter.Config
	unique := map[string]struct{}{}
	for schema, addr := range map[string]string{
		starter.TCP:  conf.ListenAddr,
		starter.TLS:  conf.TLSListenAddr,
		starter.Unix: conf.SocketPath,
	} {
		if addr == "" {
			continue
		}

		addrConf, err := starter.ParseEndpoint(addr)
		if err != nil {
			// address doesn't include schema
			if !errors.Is(err, starter.ErrEmptySchema) {
				return nil, err
			}
			addrConf = starter.Config{Name: schema, Addr: addr}
		}
		addrConf.HandoverOnUpgrade = true

		if _, found := unique[addrConf.Addr]; found {
			return nil, fmt.Errorf("same address can't be used for different schemas %q", addr)
		}
		unique[addrConf.Addr] = struct{}{}

		cfgs = append(cfgs, addrConf)
	}

	if len(cfgs) == 0 {
		return nil, errors.New("no listening addresses were provided, unable to start")
	}

	return cfgs, nil
}

func initDatabase(ctx context.Context, logger log.Logger, conf config.Config) (*sql.DB, func(), error) {
	openDBCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	db, err := glsql.OpenDB(openDBCtx, conf.DB)
	if err != nil {
		logger.WithError(err).Error("SQL connection open failed")
		return nil, nil, err
	}

	closedb := func() {
		if err := db.Close(); err != nil {
			logger.WithError(err).Error("SQL connection close failed")
		}
	}

	if err := datastore.CheckPostgresVersion(db); err != nil {
		closedb()
		return nil, nil, err
	}

	return db, closedb, nil
}