func run()

in internal/cli/gitaly/serve.go [184:766]


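// run boots the Gitaly server: it prepares the runtime directory and cgroups,
// unpacks auxiliary binaries, constructs the Git command factory and the
// supporting caches, limiters and middleware, registers gRPC servers on all
// configured listeners, and then blocks until the process is shut down.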
func run(appCtx *cli.Command, cfg config.Cfg, logger log.Logger) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	beganRun := time.Now()

	bootstrapSpan, ctx := tracing.StartSpan(ctx, "gitaly-bootstrap", nil)
	defer bootstrapSpan.Finish()

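	// Remove runtime directories left behind by previous Gitaly processes before
	// setting up a fresh per-process runtime directory.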
	if cfg.RuntimeDir != "" {
		if err := config.PruneOldGitalyProcessDirectories(logger, cfg.RuntimeDir); err != nil {
			return fmt.Errorf("prune runtime directories: %w", err)
		}
	}

	var err error
	cfg, err = config.SetupRuntimeDirectory(cfg, os.Getpid())
	if err != nil {
		return fmt.Errorf("setup runtime directory: %w", err)
	}

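	// Set up the cgroup hierarchy that Gitaly uses to constrain the Git processes it spawns.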
	cgroupMgr := cgroups.NewManager(cfg.Cgroups, logger, os.Getpid())

	began := time.Now()
	if err := cgroupMgr.Setup(); err != nil {
		return fmt.Errorf("failed setting up cgroups: %w", err)
	}
	logger.WithField("duration_ms", time.Since(began).Milliseconds()).Info("finished initializing cgroups")

	defer func() {
		if err := os.RemoveAll(cfg.RuntimeDir); err != nil {
			logger.Warn("could not clean up runtime dir")
		}
	}()

	began = time.Now()
	if err := gitaly.UnpackAuxiliaryBinaries(cfg.RuntimeDir, func(string) bool {
		return true
	}); err != nil {
		return fmt.Errorf("unpack auxiliary binaries: %w", err)
	}
	logger.WithField("duration_ms", time.Since(began).Milliseconds()).Info("finished unpacking auxiliary binaries")

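	// The bootstrapper manages the listeners and hands sockets over to a new process
	// on upgrades; the counter tracks accepted connections by listener type.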
	began = time.Now()
	b, err := bootstrap.New(logger, promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "gitaly_connections_total",
			Help: "Total number of connections to Gitaly",
		},
		[]string{"type"},
	))
	if err != nil {
		return fmt.Errorf("init bootstrap: %w", err)
	}
	logger.WithField("duration_ms", time.Since(began).Milliseconds()).Info("finished initializing bootstrap")

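	// The Git command factory spawns Git commands. It is wired up with the cgroup
	// manager so that spawned processes are placed into cgroups.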
	began = time.Now()
	gitCmdFactory, cleanup, err := gitcmd.NewExecCommandFactory(cfg, logger, gitcmd.WithCgroupsManager(cgroupMgr))
	if err != nil {
		return fmt.Errorf("creating Git command factory: %w", err)
	}
	defer cleanup()
	logger.WithField("duration_ms", time.Since(began).Milliseconds()).Info("finished initializing command factory")

	logger.WithField("binary_path", gitCmdFactory.GetExecutionEnvironment(ctx).BinaryPath).Info("using Git binary")

	began = time.Now()
	gitVersion, err := gitCmdFactory.GitVersion(ctx)
	if err != nil {
		return fmt.Errorf("git version detection: %w", err)
	}
	logger.WithField("duration_ms", time.Since(began).Milliseconds()).Info("finished detecting git version")

	if !gitVersion.IsSupported() {
		return fmt.Errorf("unsupported Git version: %q", gitVersion)
	}

	logger.WithField("version", gitVersion.String()).Info("using Git version")

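	// Backchannel registry and transaction manager used for transactional voting on
	// reference updates, for example when running behind Praefect.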
	registry := backchannel.NewRegistry()
	transactionManager := transaction.NewManager(cfg, logger, registry)
	prometheus.MustRegister(transactionManager)

	locator := config.NewLocator(cfg)

	repoCounter := counter.NewRepositoryCounter(cfg.Storages)
	prometheus.MustRegister(repoCounter)

	prometheus.MustRegister(gitCmdFactory)

	txRegistry := storagemgr.NewTransactionRegistry()

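	// Connection pool for Gitaly-internal gRPC calls, using a health-check dialer
	// and the default unary/stream interceptors.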
	conns := client.NewPool(
		client.WithDialer(client.HealthCheckDialer(
			func(ctx context.Context, address string, opts []grpc.DialOption) (*grpc.ClientConn, error) {
				return client.New(ctx, address, client.WithGrpcOptions(opts))
			},
		)),
		client.WithDialOptions(
			client.UnaryInterceptor(),
			client.StreamInterceptor(),
		),
	)
	defer func() {
		_ = conns.Close()
	}()

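	// The catfile cache maintains long-lived git-cat-file processes used to read
	// objects from repositories.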
	catfileCache := catfile.NewCache(cfg)
	defer catfileCache.Stop()
	prometheus.MustRegister(catfileCache)

	localrepoFactory := localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache)

	diskCache := cache.New(cfg, locator, logger)
	prometheus.MustRegister(diskCache)
	if err := diskCache.StartWalkers(); err != nil {
		return fmt.Errorf("disk cache walkers: %w", err)
	}

	// List of adaptive limits to track. They will be calibrated by the adaptive calculator.
	adaptiveLimits := []limiter.AdaptiveLimiter{}

	perRPCLimits, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
	for _, concurrency := range cfg.Concurrency {
		// Connect adaptive limits to the adaptive calculator
		if concurrency.Adaptive {
			adaptiveLimits = append(adaptiveLimits, perRPCLimits[concurrency.RPC])
		}
	}
	perRPCLimitHandler := limithandler.New(
		cfg,
		limithandler.LimitConcurrencyByRepo,
		setupPerRPCConcurrencyLimiters,
	)
	prometheus.MustRegister(perRPCLimitHandler)

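	// The pack-objects limit is either adaptive (calibrated between MinLimit and
	// MaxLimit) or fixed at MaxConcurrency, depending on configuration.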
	var packObjectLimit *limiter.AdaptiveLimit
	if cfg.PackObjectsLimiting.Adaptive {
		packObjectLimit = limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{
			Initial:       cfg.PackObjectsLimiting.InitialLimit,
			Max:           cfg.PackObjectsLimiting.MaxLimit,
			Min:           cfg.PackObjectsLimiting.MinLimit,
			BackoffFactor: limiter.DefaultBackoffFactor,
		})
		adaptiveLimits = append(adaptiveLimits, packObjectLimit)
	} else {
		packObjectLimit = limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{
			Initial: cfg.PackObjectsLimiting.MaxConcurrency,
		})
	}

	packObjectsMonitor := limiter.NewPackObjectsConcurrencyMonitor(
		cfg.Prometheus.GRPCLatencyBuckets,
	)
	packObjectsLimiter := limiter.NewConcurrencyLimiter(
		packObjectLimit,
		cfg.PackObjectsLimiting.MaxQueueLength,
		cfg.PackObjectsLimiting.MaxQueueWait.Duration(),
		packObjectsMonitor,
	)
	prometheus.MustRegister(packObjectsMonitor)

	// Enable the adaptive calculator only if at least one limit needs to be adaptive.
	if len(adaptiveLimits) > 0 {
		adaptiveCalculator := limiter.NewAdaptiveCalculator(
			limiter.DefaultCalibrateFrequency,
			logger,
			adaptiveLimits,
			[]limiter.ResourceWatcher{
				watchers.NewCgroupCPUWatcher(cgroupMgr, cfg.AdaptiveLimiting.CPUThrottledThreshold),
				watchers.NewCgroupMemoryWatcher(cgroupMgr, cfg.AdaptiveLimiting.MemoryThreshold),
			},
		)
		prometheus.MustRegister(adaptiveCalculator)

		stop, err := adaptiveCalculator.Start(ctx)
		if err != nil {
			logger.WithError(err).Warn("error starting adaptive limiter calculator")
		}
		defer stop()
	}

	storageMetrics := storagemgr.NewMetrics(cfg.Prometheus)
	housekeepingMetrics := housekeeping.NewMetrics(cfg.Prometheus)
	raftMetrics := raftmgr.NewMetrics()
	partitionMetrics := partition.NewMetrics(housekeepingMetrics)
	migrationMetrics := migration.NewMetrics()
	reftableMigratorMetrics := reftable.NewMetrics()
	prometheus.MustRegister(housekeepingMetrics, storageMetrics, partitionMetrics, migrationMetrics, raftMetrics, reftableMigratorMetrics)

	migrations := []migration.Migration{}

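	// When WAL transactions are enabled, build the full transaction stack: the
	// key-value database, optional WAL archiving and Raft replication, the
	// partition-based storage node, and the gRPC middleware that wraps RPCs in
	// transactions. Otherwise, only set up recovery middleware if a previous run
	// may have left pending write-ahead log entries behind.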
	var txMiddleware server.TransactionMiddleware
	var node storage.Node
	if cfg.Transactions.Enabled {
		logger.WarnContext(ctx, "Transactions enabled. Transactions are an experimental feature. The feature is not production ready yet and might lead to various issues including data loss.")

		dbMgr, err := databasemgr.NewDBManager(
			ctx,
			cfg.Storages,
			keyvalue.NewBadgerStore,
			helper.NewTimerTickerFactory(time.Minute),
			logger,
		)
		if err != nil {
			return fmt.Errorf("new db manager: %w", err)
		}
		defer dbMgr.Close()

		var logConsumer storage.LogConsumer
		if cfg.Backup.WALGoCloudURL != "" {
			walSink, err := backup.ResolveSink(ctx, cfg.Backup.WALGoCloudURL)
			if err != nil {
				return fmt.Errorf("resolving write-ahead log backup sink: %w", err)
			}

			walArchiver := backup.NewLogEntryArchiver(logger, walSink, cfg.Backup.WALWorkerCount, &node)
			prometheus.MustRegister(walArchiver)
			walArchiver.Run()
			defer walArchiver.Close()

			logConsumer = walArchiver
		}

		var raftFactory raftmgr.RaftReplicaFactory
		var raftNode *raftmgr.Node

		if cfg.Raft.Enabled {
			raftNode, err = raftmgr.NewNode(cfg, logger, dbMgr, conns)
			if err != nil {
				return fmt.Errorf("new raft node: %w", err)
			}

			raftFactory = raftmgr.DefaultFactoryWithNode(cfg.Raft, raftNode)
		}

		var offloadingSink *offloading.Sink
		if cfg.Offloading.Enabled {
			if cfg.Offloading.GoCloudURL == "" {
				return fmt.Errorf("empty offloading storage URL")
			}
			var bucket *blob.Bucket
			var err error
			if bucket, err = blob.OpenBucket(ctx, cfg.Offloading.GoCloudURL); err != nil {
				return fmt.Errorf("create offloading bucket: %w", err)
			}
			defer func() { _ = bucket.Close() }()

			if offloadingSink, err = offloading.NewSink(bucket); err != nil {
				return fmt.Errorf("create offloading sink: %w", err)
			}
		}

		partitionFactoryOptions := []partition.FactoryOption{
			partition.WithCmdFactory(gitCmdFactory),
			partition.WithRepoFactory(localrepoFactory),
			partition.WithMetrics(partitionMetrics),
			partition.WithLogConsumer(logConsumer),
			partition.WithRaftConfig(cfg.Raft),
			partition.WithRaftFactory(raftFactory),
			partition.WithOffloadingSink(offloadingSink),
		}

		nodeMgr, err := nodeimpl.NewManager(
			cfg.Storages,
			storagemgr.NewFactory(
				logger,
				dbMgr,
				migration.NewFactory(
					partition.NewFactory(partitionFactoryOptions...),
					migrationMetrics,
					migrations,
				),
				2,
				storageMetrics,
			),
		)
		if err != nil {
			return fmt.Errorf("new node manager: %w", err)
		}
		defer nodeMgr.Close()

		if cfg.Raft.Enabled {
			for _, storageCfg := range cfg.Storages {
				baseStorage, err := nodeMgr.GetStorage(storageCfg.Name)
				if err != nil {
					return fmt.Errorf("get base storage %q from node manager: %w", storageCfg.Name, err)
				}
				if err := raftNode.SetBaseStorage(storageCfg.Name, baseStorage); err != nil {
					return fmt.Errorf("set base storage for raft node %q: %w", storageCfg.Name, err)
				}
			}
			node = raftNode

			// Start the partition assignment worker synchronously as it is a prerequisite for Raft.
			if err := storagemgr.AssignmentWorker(ctx, cfg, node, dbMgr, locator); err != nil {
				return fmt.Errorf("partition assignment worker: %w", err)
			}
		} else {
			node = nodeMgr
		}

		reftableMigrator := reftable.NewMigrator(logger, reftableMigratorMetrics, node, localrepoFactory)
		reftableMigrator.Run()
		defer reftableMigrator.Close()

		txMiddleware = server.TransactionMiddleware{
			UnaryInterceptors: []grpc.UnaryServerInterceptor{
				storagemgr.NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, txRegistry, node, locator),
				reftable.NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator),
			},
			StreamInterceptors: []grpc.StreamServerInterceptor{
				storagemgr.NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, txRegistry, node, locator),
				reftable.NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator),
			},
		}
	} else {
		storagePaths := make([]string, len(cfg.Storages))
		for i := range cfg.Storages {
			storagePaths[i] = cfg.Storages[i].Path
		}

		if mayHaveWAL, err := storagemgr.MayHavePendingWAL(storagePaths); err != nil {
			return fmt.Errorf("may have pending WAL: %w", err)
		} else if mayHaveWAL {
			dbMgr, err := databasemgr.NewDBManager(
				ctx,
				cfg.Storages,
				keyvalue.NewBadgerStore,
				helper.NewTimerTickerFactory(time.Minute),
				logger,
			)
			if err != nil {
				return fmt.Errorf("new db manager: %w", err)
			}
			defer dbMgr.Close()

			partitionFactoryOptions := []partition.FactoryOption{
				partition.WithCmdFactory(gitCmdFactory),
				partition.WithRepoFactory(localrepoFactory),
				partition.WithMetrics(partitionMetrics),
				partition.WithRaftConfig(cfg.Raft),
			}

			nodeMgr, err := nodeimpl.NewManager(
				cfg.Storages,
				storagemgr.NewFactory(
					logger,
					dbMgr,
					partition.NewFactory(partitionFactoryOptions...),
					// In recovery mode we don't want to keep inactive partitions active. The
					// cache, however, can't be disabled, so simply set its size to one.
					1,
					storageMetrics,
				),
			)
			if err != nil {
				return fmt.Errorf("new node: %w", err)
			}
			defer nodeMgr.Close()

			recoveryMiddleware := storagemgr.NewTransactionRecoveryMiddleware(protoregistry.GitalyProtoPreregistered, nodeMgr)
			txMiddleware = server.TransactionMiddleware{
				UnaryInterceptors: []grpc.UnaryServerInterceptor{
					recoveryMiddleware.UnaryServerInterceptor(),
				},
				StreamInterceptors: []grpc.StreamServerInterceptor{
					recoveryMiddleware.StreamServerInterceptor(),
				},
			}
		}
	}

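	// The housekeeping manager performs repository optimization; it is used by the
	// daily maintenance worker registered further below.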
	housekeepingManager := housekeepingmgr.New(cfg.Prometheus, logger, transactionManager, node)
	prometheus.MustRegister(housekeepingManager)

	gitalyServerFactory := server.NewGitalyServerFactory(
		cfg,
		logger,
		registry,
		diskCache,
		[]*limithandler.LimiterMiddleware{perRPCLimitHandler},
		txMiddleware,
	)
	defer gitalyServerFactory.Stop()

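	// The GitLab API client is used by Git hooks to call back into the GitLab
	// internal API; a stub is used when hooks are bypassed for testing.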
	gitlabClient := gitlab.NewStubClient()
	if skipHooks, _ := env.GetBool("GITALY_TESTING_NO_GIT_HOOKS", false); skipHooks {
		logger.Warn("skipping GitLab API client creation since hooks are bypassed via GITALY_TESTING_NO_GIT_HOOKS")
	} else {
		httpClient, err := gitlab.NewHTTPClient(logger, cfg.Gitlab, cfg.TLS, cfg.Prometheus)
		if err != nil {
			return fmt.Errorf("could not create GitLab API client: %w", err)
		}
		prometheus.MustRegister(httpClient)
		gitlabClient = httpClient
	}

	hookManager := hook.NewManager(
		cfg,
		locator,
		logger,
		gitCmdFactory,
		transactionManager,
		gitlabClient,
		hook.NewTransactionRegistry(txRegistry),
		hook.NewProcReceiveRegistry(),
		node,
	)

	updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, logger, locator, hookManager, gitCmdFactory, catfileCache)

	streamCache := streamcache.New(cfg.PackObjectsCache, logger)

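	// Server-side backups: resolve the backup sink and locator from the configured
	// object-storage URL, if any.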
	var backupSink *backup.Sink
	var backupLocator backup.Locator
	if cfg.Backup.GoCloudURL != "" {
		var err error
		backupSink, err = backup.ResolveSink(ctx, cfg.Backup.GoCloudURL, backup.WithBufferSize(cfg.Backup.BufferSize))
		if err != nil {
			return fmt.Errorf("resolve backup sink: %w", err)
		}
		backupLocator, err = backup.ResolveLocator(cfg.Backup.Layout, backupSink)
		if err != nil {
			return fmt.Errorf("resolve backup locator: %w", err)
		}
	}

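	// Bundle-URI support: generated bundles are stored in the configured
	// object-storage bucket and can be served to clients via signed URLs.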
	var bundleURIManager *bundleuri.GenerationManager
	if cfg.BundleURI.GoCloudURL != "" {
		bundleURISink, err := bundleuri.NewSink(ctx, cfg.BundleURI.GoCloudURL)
		if err != nil {
			return fmt.Errorf("create bundle-URI sink: %w", err)
		}

		// The manager is created here merely so that we have a non-nil manager whose
		// SignedURL() method can be used. It is not yet used to generate bundles based
		// on this configuration. Further testing and analysis are required to come up
		// with an appropriate configuration; that will be done once we are ready to use
		// this manager to generate bundles.
		maxBundleAge := time.Hour * 24
		interval := time.Minute
		maxConcurrent := 5
		threshold := 5
		bundleGenerationStrategy, err := bundleuri.NewOccurrenceStrategy(logger, threshold, interval, maxConcurrent, maxBundleAge)
		if err != nil {
			return fmt.Errorf("error creating bundle generation strategy: %w", err)
		}

		prometheus.MustRegister(bundleGenerationStrategy)
		stop := bundleGenerationStrategy.Start(ctx)
		defer stop()

		bundleURIManager, err = bundleuri.NewGenerationManager(ctx, bundleURISink, logger, node, bundleGenerationStrategy)
		if err != nil {
			return fmt.Errorf("error creating bundle manager: %w", err)
		}
		logger.Info(fmt.Sprintf("bundle-uri bucket configured: %s", cfg.BundleURI.GoCloudURL))
		prometheus.MustRegister(bundleURIManager)
	}

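	// Register one gRPC server per configured listener. External listeners are
	// handed over to the new process on upgrades; the internal socket is not.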
	for _, c := range []starter.Config{
		{Name: starter.Unix, Addr: cfg.SocketPath, HandoverOnUpgrade: true},
		{Name: starter.Unix, Addr: cfg.InternalSocketPath(), HandoverOnUpgrade: false},
		{Name: starter.TCP, Addr: cfg.ListenAddr, HandoverOnUpgrade: true},
		{Name: starter.TLS, Addr: cfg.TLSListenAddr, HandoverOnUpgrade: true},
	} {
		if c.Addr == "" {
			continue
		}

		var srv *grpc.Server
		if c.HandoverOnUpgrade {
			srv, err = gitalyServerFactory.CreateExternal(c.IsSecure())
			if err != nil {
				return fmt.Errorf("create external gRPC server: %w", err)
			}
		} else {
			srv, err = gitalyServerFactory.CreateInternal()
			if err != nil {
				return fmt.Errorf("create internal gRPC server: %w", err)
			}
		}

		setup.RegisterAll(srv, &service.Dependencies{
			Logger:                 logger,
			Cfg:                    cfg,
			GitalyHookManager:      hookManager,
			TransactionManager:     transactionManager,
			StorageLocator:         locator,
			ClientPool:             conns,
			GitCmdFactory:          gitCmdFactory,
			CatfileCache:           catfileCache,
			DiskCache:              diskCache,
			PackObjectsCache:       streamCache,
			PackObjectsLimiter:     packObjectsLimiter,
			RepositoryCounter:      repoCounter,
			UpdaterWithHooks:       updaterWithHooks,
			Node:                   node,
			TransactionRegistry:    txRegistry,
			HousekeepingManager:    housekeepingManager,
			BackupSink:             backupSink,
			BackupLocator:          backupLocator,
			LocalRepositoryFactory: localrepoFactory,
			BundleURIManager:       bundleURIManager,
			MigrationStateManager:  migration.NewStateManager(migrations),
		})
		b.RegisterStarter(starter.New(c, srv, logger))
	}

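	// Serve Prometheus metrics and monitoring endpoints on the configured listen address.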
	if addr := cfg.PrometheusListenAddr; addr != "" {
		b.RegisterStarter(func(listen bootstrap.ListenFunc, _ chan<- error, _ *prometheus.CounterVec) error {
			l, err := listen("tcp", addr)
			if err != nil {
				return err
			}

			logger.WithField("address", addr).Info("starting prometheus listener")

			go func() {
				opts := []monitoring.Option{
					monitoring.WithListener(l),
					monitoring.WithBuildExtraLabels(
						map[string]string{"git_version": gitVersion.String()},
					),
				}

				if buildInfo, ok := debug.ReadBuildInfo(); ok {
					opts = append(opts, monitoring.WithGoBuildInformation(buildInfo))
				}

				if err := monitoring.Start(opts...); err != nil {
					logger.WithError(err).Error("Unable to serve prometheus")
				}
			}()

			return nil
		})
	}

	for _, shard := range cfg.Storages {
		if err := mdfile.WriteMetadataFile(ctx, shard.Path); err != nil {
			// TODO should this be a return? https://gitlab.com/gitlab-org/gitaly/issues/1893
			logger.WithError(err).Error("Unable to write gitaly metadata file")
		}
	}

	// When cgroups are configured, we create a directory structure each
	// time a gitaly process is spawned. Look through the hierarchy root
	// to find any cgroup directories that belong to old gitaly processes
	// and remove them.
	cgroups.StartPruningOldCgroups(cfg.Cgroups, logger)
	repoCounter.StartCountingRepositories(ctx, locator, logger)
	tempdir.StartCleaning(logger, locator, cfg.Storages, time.Hour)

	if err := b.Start(); err != nil {
		return fmt.Errorf("unable to start the bootstrap: %w", err)
	}
	bootstrapSpan.Finish()
	// There are a few goroutines running async tasks that may still be in progress (e.g. preloading the license
	// database), but this is a close enough indication of startup latency.
	logger.WithField("duration_ms", time.Since(beganRun).Milliseconds()).Info("Started Gitaly")

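	// Start the daily maintenance workers that optimize repositories on schedule,
	// unless daily maintenance is disabled.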
	if !cfg.DailyMaintenance.IsDisabled() {
		shutdownWorkers, err := maintenance.StartWorkers(
			ctx,
			logger,
			maintenance.DailyOptimizationWorker(cfg, maintenance.OptimizerFunc(func(ctx context.Context, logger log.Logger, repo storage.Repository) error {
				return housekeepingManager.OptimizeRepository(ctx, localrepo.New(logger, locator, gitCmdFactory, catfileCache, repo))
			})),
		)
		if err != nil {
			return fmt.Errorf("initialize auxiliary workers: %w", err)
		}
		defer shutdownWorkers()
	}

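	// Block until shutdown. In-flight requests get up to the configured graceful
	// restart timeout before the servers are stopped forcefully.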
	gracefulStopTicker := helper.NewTimerTicker(cfg.GracefulRestartTimeout.Duration())
	defer gracefulStopTicker.Stop()

	return b.Wait(gracefulStopTicker, gitalyServerFactory.GracefulStop)
}