// internal/cli/gitaly/serve.go
package gitaly
import (
"context"
"fmt"
"os"
"runtime/debug"
"time"
"github.com/go-enry/go-license-detector/v4/licensedb"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/urfave/cli/v3"
"gitlab.com/gitlab-org/gitaly/v16"
"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
"gitlab.com/gitlab-org/gitaly/v16/internal/bootstrap"
"gitlab.com/gitlab-org/gitaly/v16/internal/bootstrap/starter"
"gitlab.com/gitlab-org/gitaly/v16/internal/bundleuri"
"gitlab.com/gitlab-org/gitaly/v16/internal/cache"
"gitlab.com/gitlab-org/gitaly/v16/internal/cgroups"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
housekeepingmgr "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/manager"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/sentry"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/maintenance"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mdfile"
nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration/reftable"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitlab"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/env"
"gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
"gitlab.com/gitlab-org/gitaly/v16/internal/limiter/watchers"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/offloading"
"gitlab.com/gitlab-org/gitaly/v16/internal/streamcache"
"gitlab.com/gitlab-org/gitaly/v16/internal/tempdir"
"gitlab.com/gitlab-org/gitaly/v16/internal/tracing"
"gitlab.com/gitlab-org/gitaly/v16/internal/version"
"gitlab.com/gitlab-org/labkit/fips"
"gitlab.com/gitlab-org/labkit/monitoring"
labkittracing "gitlab.com/gitlab-org/labkit/tracing"
"go.uber.org/automaxprocs/maxprocs"
"gocloud.dev/blob"
"google.golang.org/grpc"
// Import to register the proxy codec with gRPC.
_ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
)
// newServeCommand builds the CLI definition of the "serve" subcommand, which
// launches the Gitaly server daemon with the given configuration file.
func newServeCommand() *cli.Command {
	serveCmd := &cli.Command{
		Name:  "serve",
		Usage: "launch the server daemon",
		UsageText: `gitaly serve <gitaly_config_file>
Example: gitaly serve gitaly.config.toml`,
		Description: "Launch the Gitaly server daemon.",
		Action:      serveAction,
	}

	return serveCmd
}
// loadConfig reads the Gitaly configuration from configPath, decodes it and
// runs validation on the result. On any failure — opening the file, parsing,
// or validation — the zero configuration is returned alongside the error.
func loadConfig(configPath string) (config.Cfg, error) {
	f, err := os.Open(configPath)
	if err != nil {
		return config.Cfg{}, err
	}
	// Read-only handle; closing late or twice is harmless.
	defer f.Close()

	loaded, err := config.Load(f)
	if err != nil {
		return config.Cfg{}, err
	}

	if err := loaded.Validate(); err != nil {
		return config.Cfg{}, fmt.Errorf("invalid config: %w", err)
	}

	return loaded, nil
}
// serveAction is the entry point of the "serve" subcommand. It expects exactly
// one positional argument — the path to the Gitaly configuration file — and
// blocks in run() until the server shuts down. Exit code 2 signals bad usage,
// exit code 1 signals a configuration error or an unclean shutdown.
func serveAction(ctx context.Context, cmd *cli.Command) error {
	if cmd.NArg() != 1 || cmd.Args().First() == "" {
		cli.ShowSubcommandHelpAndExit(cmd, 2)
	}

	configPath := cmd.Args().First()

	cfg, logger, err := configure(configPath)
	if err != nil {
		return cli.Exit(err, 1)
	}

	// With transitioning enabled, authentication is only advisory: requests
	// lacking a valid token are still served, so warn loudly about it.
	if cfg.Auth.Transitioning && len(cfg.Auth.Token) > 0 {
		logger.Warn("Authentication is enabled but not enforced because transitioning=true. Gitaly will accept unauthenticated requests.")
	}

	logger.WithField("version", version.GetVersion()).Info("Starting Gitaly")
	fips.Check()

	if err := run(cmd, cfg, logger); err != nil {
		return cli.Exit(fmt.Errorf("unclean Gitaly shutdown: %w", err), 1)
	}

	logger.Info("Gitaly shutdown")

	return nil
}
// configure loads and validates the configuration at configPath and sets up
// the process-wide environment: logging (with URL sanitization), GOMAXPROCS,
// Sentry, Prometheus, distributed tracing and license-database preloading.
// It returns the loaded configuration and the configured logger.
func configure(configPath string) (config.Cfg, log.Logger, error) {
	cfg, err := loadConfig(configPath)
	if err != nil {
		return config.Cfg{}, nil, fmt.Errorf("load config: config_path %q: %w", configPath, err)
	}

	// Strip credentials from URLs of RPCs that may carry them before the
	// URLs end up in log output.
	sanitizer := log.NewURLSanitizerHook()
	sanitizer.AddPossibleGrpcMethod(
		"CreateRepositoryFromURL",
		"FetchRemote",
		"UpdateRemoteMirror",
	)

	logger, err := log.Configure(log.NewSyncWriter(os.Stdout), cfg.Logging.Format, cfg.Logging.Level, sanitizer)
	if err != nil {
		return config.Cfg{}, nil, fmt.Errorf("configuring logger failed: %w", err)
	}

	// The stricter validation is advisory for now: log a warning rather than
	// failing startup.
	if err := cfg.ValidateV2(); err != nil {
		logger.Warn(
			fmt.Sprintf(
				"The current configurations will cause Gitaly to fail to start up in future versions. Please run 'gitaly configuration validate < %s' and fix the errors that are printed.",
				configPath,
			),
		)
	}

	// Align GOMAXPROCS with any container CPU quota; revert the change if
	// setting it failed.
	if undo, err := maxprocs.Set(maxprocs.Logger(func(format string, args ...interface{}) {
		logger.Info(fmt.Sprintf(format, args...))
	})); err != nil {
		logger.WithError(err).Error("failed to set GOMAXPROCS")
		undo()
	}

	sentry.ConfigureSentry(logger, version.GetVersion(), sentry.Config(cfg.Logging.Sentry))
	cfg.Prometheus.Configure(logger)
	labkittracing.Initialize(labkittracing.WithServiceName("gitaly"))
	preloadLicenseDatabase(logger)

	return cfg, logger, nil
}
// preloadLicenseDatabase warms up the license database on a background
// goroutine. The first call to license detection can take a long time
// (https://github.com/go-enry/go-license-detector/issues/13), so paying that
// cost at server startup avoids a slow first gRPC license-detection call.
func preloadLicenseDatabase(logger log.Logger) {
	go func() {
		start := time.Now()
		licensedb.Preload()
		logger.WithField("duration_ms", time.Since(start).Milliseconds()).Info("License database preloaded")
	}()
}
// run wires up every Gitaly subsystem and runs the server until shutdown: it
// prepares the runtime directory and cgroups, unpacks auxiliary binaries,
// constructs the Git command factory, caches, limiters and (optionally) the
// transaction/WAL storage machinery, registers all gRPC services on every
// configured listener, and blocks in the bootstrapper until it is told to
// stop. It returns an error when any part of startup fails or when shutdown
// is unclean. Statement order matters: many components depend on earlier
// ones, and the deferred cleanups run in reverse construction order.
func run(appCtx *cli.Command, cfg config.Cfg, logger log.Logger) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	beganRun := time.Now()

	// Record the whole bootstrap sequence as one tracing span; finished
	// explicitly once startup completes, the defer is a safety net for
	// early returns.
	bootstrapSpan, ctx := tracing.StartSpan(ctx, "gitaly-bootstrap", nil)
	defer bootstrapSpan.Finish()

	// Remove runtime directories left behind by previous Gitaly processes
	// before creating our own below.
	if cfg.RuntimeDir != "" {
		if err := config.PruneOldGitalyProcessDirectories(logger, cfg.RuntimeDir); err != nil {
			return fmt.Errorf("prune runtime directories: %w", err)
		}
	}

	var err error
	cfg, err = config.SetupRuntimeDirectory(cfg, os.Getpid())
	if err != nil {
		return fmt.Errorf("setup runtime directory: %w", err)
	}

	cgroupMgr := cgroups.NewManager(cfg.Cgroups, logger, os.Getpid())

	began := time.Now()
	if err := cgroupMgr.Setup(); err != nil {
		return fmt.Errorf("failed setting up cgroups: %w", err)
	}
	logger.WithField("duration_ms", time.Since(began).Milliseconds()).Info("finished initializing cgroups")

	// Best-effort cleanup of this process's runtime directory on shutdown.
	defer func() {
		if err := os.RemoveAll(cfg.RuntimeDir); err != nil {
			logger.Warn("could not clean up runtime dir")
		}
	}()

	// Extract the helper binaries (hooks etc.) embedded in the gitaly
	// executable into the runtime directory; the filter accepts all of them.
	began = time.Now()
	if err := gitaly.UnpackAuxiliaryBinaries(cfg.RuntimeDir, func(string) bool {
		return true
	}); err != nil {
		return fmt.Errorf("unpack auxiliary binaries: %w", err)
	}
	logger.WithField("duration_ms", time.Since(began).Milliseconds()).Info("finished unpacking auxiliary binaries")

	// The bootstrapper owns listener lifecycle and zero-downtime upgrades.
	began = time.Now()
	b, err := bootstrap.New(logger, promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "gitaly_connections_total",
			Help: "Total number of connections to Gitaly",
		},
		[]string{"type"},
	))
	if err != nil {
		return fmt.Errorf("init bootstrap: %w", err)
	}
	logger.WithField("duration_ms", time.Since(began).Milliseconds()).Info("finished initializing bootstrap")

	began = time.Now()
	gitCmdFactory, cleanup, err := gitcmd.NewExecCommandFactory(cfg, logger, gitcmd.WithCgroupsManager(cgroupMgr))
	if err != nil {
		return fmt.Errorf("creating Git command factory: %w", err)
	}
	defer cleanup()
	logger.WithField("duration_ms", time.Since(began).Milliseconds()).Info("finished initializing command factory")

	logger.WithField("binary_path", gitCmdFactory.GetExecutionEnvironment(ctx).BinaryPath).Info("using Git binary")

	// Refuse to start on a Git version Gitaly does not support.
	began = time.Now()
	gitVersion, err := gitCmdFactory.GitVersion(ctx)
	if err != nil {
		return fmt.Errorf("git version detection: %w", err)
	}
	logger.WithField("duration_ms", time.Since(began).Milliseconds()).Info("finished detecting git version")

	if !gitVersion.IsSupported() {
		return fmt.Errorf("unsupported Git version: %q", gitVersion)
	}
	logger.WithField("version", gitVersion.String()).Info("using Git version")

	// Shared registries, counters and the reference-transaction manager.
	registry := backchannel.NewRegistry()
	transactionManager := transaction.NewManager(cfg, logger, registry)
	prometheus.MustRegister(transactionManager)

	locator := config.NewLocator(cfg)

	repoCounter := counter.NewRepositoryCounter(cfg.Storages)
	prometheus.MustRegister(repoCounter)

	prometheus.MustRegister(gitCmdFactory)

	txRegistry := storagemgr.NewTransactionRegistry()

	// Pool of client connections to other Gitaly nodes, dialed with a
	// health-checking dialer and the standard interceptors.
	conns := client.NewPool(
		client.WithDialer(client.HealthCheckDialer(
			func(ctx context.Context, address string, opts []grpc.DialOption) (*grpc.ClientConn, error) {
				return client.New(ctx, address, client.WithGrpcOptions(opts))
			},
		)),
		client.WithDialOptions(
			client.UnaryInterceptor(),
			client.StreamInterceptor(),
		),
	)
	defer func() {
		_ = conns.Close()
	}()

	catfileCache := catfile.NewCache(cfg)
	defer catfileCache.Stop()
	prometheus.MustRegister(catfileCache)

	localrepoFactory := localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache)

	diskCache := cache.New(cfg, locator, logger)
	prometheus.MustRegister(diskCache)
	if err := diskCache.StartWalkers(); err != nil {
		return fmt.Errorf("disk cache walkers: %w", err)
	}

	// List of tracking adaptive limits. They will be calibrated by the
	// adaptive calculator further down, if any exist.
	adaptiveLimits := []limiter.AdaptiveLimiter{}

	perRPCLimits, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
	for _, concurrency := range cfg.Concurrency {
		// Connect adaptive limits to the adaptive calculator.
		if concurrency.Adaptive {
			adaptiveLimits = append(adaptiveLimits, perRPCLimits[concurrency.RPC])
		}
	}
	perRPCLimitHandler := limithandler.New(
		cfg,
		limithandler.LimitConcurrencyByRepo,
		setupPerRPCConcurrencyLimiters,
	)
	prometheus.MustRegister(perRPCLimitHandler)

	// The pack-objects limit is either adaptive (calibrated between Min and
	// Max) or a fixed concurrency limit.
	var packObjectLimit *limiter.AdaptiveLimit
	if cfg.PackObjectsLimiting.Adaptive {
		packObjectLimit = limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{
			Initial:       cfg.PackObjectsLimiting.InitialLimit,
			Max:           cfg.PackObjectsLimiting.MaxLimit,
			Min:           cfg.PackObjectsLimiting.MinLimit,
			BackoffFactor: limiter.DefaultBackoffFactor,
		})
		adaptiveLimits = append(adaptiveLimits, packObjectLimit)
	} else {
		packObjectLimit = limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{
			Initial: cfg.PackObjectsLimiting.MaxConcurrency,
		})
	}

	packObjectsMonitor := limiter.NewPackObjectsConcurrencyMonitor(
		cfg.Prometheus.GRPCLatencyBuckets,
	)
	packObjectsLimiter := limiter.NewConcurrencyLimiter(
		packObjectLimit,
		cfg.PackObjectsLimiting.MaxQueueLength,
		cfg.PackObjectsLimiting.MaxQueueWait.Duration(),
		packObjectsMonitor,
	)
	prometheus.MustRegister(packObjectsMonitor)

	// Enable the adaptive calculator only if there is any limit needed to be
	// adaptive. It watches cgroup CPU/memory pressure to calibrate limits.
	if len(adaptiveLimits) > 0 {
		adaptiveCalculator := limiter.NewAdaptiveCalculator(
			limiter.DefaultCalibrateFrequency,
			logger,
			adaptiveLimits,
			[]limiter.ResourceWatcher{
				watchers.NewCgroupCPUWatcher(cgroupMgr, cfg.AdaptiveLimiting.CPUThrottledThreshold),
				watchers.NewCgroupMemoryWatcher(cgroupMgr, cfg.AdaptiveLimiting.MemoryThreshold),
			},
		)
		prometheus.MustRegister(adaptiveCalculator)

		// Start errors are non-fatal: the server runs without calibration.
		// NOTE(review): stop is deferred even on the error path — assumes
		// Start returns a callable (non-nil) stop func on error; confirm.
		stop, err := adaptiveCalculator.Start(ctx)
		if err != nil {
			logger.WithError(err).Warn("error starting adaptive limiter calculator")
		}
		defer stop()
	}

	storageMetrics := storagemgr.NewMetrics(cfg.Prometheus)
	housekeepingMetrics := housekeeping.NewMetrics(cfg.Prometheus)
	raftMetrics := raftmgr.NewMetrics()
	partitionMetrics := partition.NewMetrics(housekeepingMetrics)
	migrationMetrics := migration.NewMetrics()
	reftableMigratorMetrics := reftable.NewMetrics()
	prometheus.MustRegister(housekeepingMetrics, storageMetrics, partitionMetrics, migrationMetrics, raftMetrics, reftableMigratorMetrics)

	// No partition migrations are registered here; the slice also feeds the
	// migration state manager handed to the services below.
	migrations := []migration.Migration{}

	// txMiddleware and node stay zero-valued unless one of the branches
	// below populates them.
	var txMiddleware server.TransactionMiddleware
	var node storage.Node
	if cfg.Transactions.Enabled {
		logger.WarnContext(ctx, "Transactions enabled. Transactions are an experimental feature. The feature is not production ready yet and might lead to various issues including data loss.")

		// Per-storage key-value databases backing the WAL machinery.
		dbMgr, err := databasemgr.NewDBManager(
			ctx,
			cfg.Storages,
			keyvalue.NewBadgerStore,
			helper.NewTimerTickerFactory(time.Minute),
			logger,
		)
		if err != nil {
			return fmt.Errorf("new db manager: %w", err)
		}
		defer dbMgr.Close()

		// Optionally archive write-ahead log entries to object storage. The
		// archiver is handed &node because node is only assigned further
		// down; it dereferences the final value at runtime.
		var logConsumer storage.LogConsumer
		if cfg.Backup.WALGoCloudURL != "" {
			walSink, err := backup.ResolveSink(ctx, cfg.Backup.WALGoCloudURL)
			if err != nil {
				return fmt.Errorf("resolving write-ahead log backup sink: %w", err)
			}
			walArchiver := backup.NewLogEntryArchiver(logger, walSink, cfg.Backup.WALWorkerCount, &node)
			prometheus.MustRegister(walArchiver)
			walArchiver.Run()
			defer walArchiver.Close()
			logConsumer = walArchiver
		}

		var raftFactory raftmgr.RaftReplicaFactory
		var raftNode *raftmgr.Node
		if cfg.Raft.Enabled {
			raftNode, err = raftmgr.NewNode(cfg, logger, dbMgr, conns)
			if err != nil {
				return fmt.Errorf("new raft node: %w", err)
			}
			raftFactory = raftmgr.DefaultFactoryWithNode(cfg.Raft, raftNode)
		}

		// Optional offloading of repository data to an object-storage bucket.
		var offloadingSink *offloading.Sink
		if cfg.Offloading.Enabled {
			if cfg.Offloading.GoCloudURL == "" {
				return fmt.Errorf("empty offloading storage URL")
			}
			var bucket *blob.Bucket
			var err error
			if bucket, err = blob.OpenBucket(ctx, cfg.Offloading.GoCloudURL); err != nil {
				return fmt.Errorf("create offloading bucket: %w", err)
			}
			defer func() { _ = bucket.Close() }()
			if offloadingSink, err = offloading.NewSink(bucket); err != nil {
				return fmt.Errorf("create offloading sink: %w", err)
			}
		}

		partitionFactoryOptions := []partition.FactoryOption{
			partition.WithCmdFactory(gitCmdFactory),
			partition.WithRepoFactory(localrepoFactory),
			partition.WithMetrics(partitionMetrics),
			partition.WithLogConsumer(logConsumer),
			partition.WithRaftConfig(cfg.Raft),
			partition.WithRaftFactory(raftFactory),
			partition.WithOffloadingSink(offloadingSink),
		}

		nodeMgr, err := nodeimpl.NewManager(
			cfg.Storages,
			storagemgr.NewFactory(
				logger,
				dbMgr,
				migration.NewFactory(
					partition.NewFactory(partitionFactoryOptions...),
					migrationMetrics,
					migrations,
				),
				2,
				storageMetrics,
			),
		)
		if err != nil {
			return fmt.Errorf("new node manager: %w", err)
		}
		defer nodeMgr.Close()

		if cfg.Raft.Enabled {
			// Raft wraps the plain node manager: give the raft node access
			// to each underlying storage before using it as the node.
			for _, storageCfg := range cfg.Storages {
				baseStorage, err := nodeMgr.GetStorage(storageCfg.Name)
				if err != nil {
					return fmt.Errorf("get base storage %q from node manager: %w", storageCfg.Name, err)
				}
				if err := raftNode.SetBaseStorage(storageCfg.Name, baseStorage); err != nil {
					return fmt.Errorf("set base storage for raft node %q: %w", storageCfg.Name, err)
				}
			}
			node = raftNode
			// Start partition worker synchronously as it is a pre-requisite for Raft.
			if err := storagemgr.AssignmentWorker(ctx, cfg, node, dbMgr, locator); err != nil {
				return fmt.Errorf("partition assignment worker: %w", err)
			}
		} else {
			node = nodeMgr
		}

		// Background migrator that converts repositories to the reftable
		// reference backend; also hooked into the interceptors below.
		reftableMigrator := reftable.NewMigrator(logger, reftableMigratorMetrics, node, localrepoFactory)
		reftableMigrator.Run()
		defer reftableMigrator.Close()

		txMiddleware = server.TransactionMiddleware{
			UnaryInterceptors: []grpc.UnaryServerInterceptor{
				storagemgr.NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, txRegistry, node, locator),
				reftable.NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator),
			},
			StreamInterceptors: []grpc.StreamServerInterceptor{
				storagemgr.NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, txRegistry, node, locator),
				reftable.NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator),
			},
		}
	} else {
		// Transactions disabled: if any storage may still hold pending WAL
		// state from an earlier transactional run, set up a recovery-only
		// storage stack so that state can be applied before serving.
		storagePaths := make([]string, len(cfg.Storages))
		for i := range cfg.Storages {
			storagePaths[i] = cfg.Storages[i].Path
		}

		if mayHaveWAL, err := storagemgr.MayHavePendingWAL(storagePaths); err != nil {
			return fmt.Errorf("may have pending WAL: %w", err)
		} else if mayHaveWAL {
			dbMgr, err := databasemgr.NewDBManager(
				ctx,
				cfg.Storages,
				keyvalue.NewBadgerStore,
				helper.NewTimerTickerFactory(time.Minute),
				logger,
			)
			if err != nil {
				return fmt.Errorf("new db manager: %w", err)
			}
			defer dbMgr.Close()

			partitionFactoryOptions := []partition.FactoryOption{
				partition.WithCmdFactory(gitCmdFactory),
				partition.WithRepoFactory(localrepoFactory),
				partition.WithMetrics(partitionMetrics),
				partition.WithRaftConfig(cfg.Raft),
			}

			nodeMgr, err := nodeimpl.NewManager(
				cfg.Storages,
				storagemgr.NewFactory(
					logger,
					dbMgr,
					partition.NewFactory(partitionFactoryOptions...),
					// In recovery mode we don't want to keep inactive partitions active. The cache
					// however can't be disabled so simply set it to one.
					1,
					storageMetrics,
				),
			)
			if err != nil {
				return fmt.Errorf("new node: %w", err)
			}
			defer nodeMgr.Close()

			// Note: node deliberately stays nil here — only the recovery
			// middleware uses the node manager.
			recoveryMiddleware := storagemgr.NewTransactionRecoveryMiddleware(protoregistry.GitalyProtoPreregistered, nodeMgr)
			txMiddleware = server.TransactionMiddleware{
				UnaryInterceptors: []grpc.UnaryServerInterceptor{
					recoveryMiddleware.UnaryServerInterceptor(),
				},
				StreamInterceptors: []grpc.StreamServerInterceptor{
					recoveryMiddleware.StreamServerInterceptor(),
				},
			}
		}
	}

	housekeepingManager := housekeepingmgr.New(cfg.Prometheus, logger, transactionManager, node)
	prometheus.MustRegister(housekeepingManager)

	gitalyServerFactory := server.NewGitalyServerFactory(
		cfg,
		logger,
		registry,
		diskCache,
		[]*limithandler.LimiterMiddleware{perRPCLimitHandler},
		txMiddleware,
	)
	defer gitalyServerFactory.Stop()

	// Use a stub GitLab client when hooks are bypassed for testing;
	// otherwise talk to the real GitLab internal API.
	gitlabClient := gitlab.NewStubClient()
	if skipHooks, _ := env.GetBool("GITALY_TESTING_NO_GIT_HOOKS", false); skipHooks {
		logger.Warn("skipping GitLab API client creation since hooks are bypassed via GITALY_TESTING_NO_GIT_HOOKS")
	} else {
		httpClient, err := gitlab.NewHTTPClient(logger, cfg.Gitlab, cfg.TLS, cfg.Prometheus)
		if err != nil {
			return fmt.Errorf("could not create GitLab API client: %w", err)
		}
		prometheus.MustRegister(httpClient)
		gitlabClient = httpClient
	}

	hookManager := hook.NewManager(
		cfg,
		locator,
		logger,
		gitCmdFactory,
		transactionManager,
		gitlabClient,
		hook.NewTransactionRegistry(txRegistry),
		hook.NewProcReceiveRegistry(),
		node,
	)

	updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, logger, locator, hookManager, gitCmdFactory, catfileCache)

	// In-memory/disk cache for streamed pack-objects responses.
	streamCache := streamcache.New(cfg.PackObjectsCache, logger)

	// Optional repository-backup sink and locator.
	var backupSink *backup.Sink
	var backupLocator backup.Locator
	if cfg.Backup.GoCloudURL != "" {
		var err error
		backupSink, err = backup.ResolveSink(ctx, cfg.Backup.GoCloudURL, backup.WithBufferSize(cfg.Backup.BufferSize))
		if err != nil {
			return fmt.Errorf("resolve backup sink: %w", err)
		}
		backupLocator, err = backup.ResolveLocator(cfg.Backup.Layout, backupSink)
		if err != nil {
			return fmt.Errorf("resolve backup locator: %w", err)
		}
	}

	var bundleURIManager *bundleuri.GenerationManager
	if cfg.BundleURI.GoCloudURL != "" {
		bundleURISink, err := bundleuri.NewSink(ctx, cfg.BundleURI.GoCloudURL)
		if err != nil {
			return fmt.Errorf("create bundle-URI sink: %w", err)
		}

		// The manager created here merely to have a non-nil manager in order to use
		// the SignedURL() method on it. It is not used, yet, to generate bundles
		// based on this configuration.
		// Further tests and analysis would be required to come up with the
		// appropriate configuration. This will be done once we are ready to use this manager
		// to generate bundles.
		maxBundleAge := time.Hour * 24
		interval := time.Minute
		maxConcurrent := 5
		threshold := 5
		bundleGenerationStrategy, err := bundleuri.NewOccurrenceStrategy(logger, threshold, interval, maxConcurrent, maxBundleAge)
		if err != nil {
			return fmt.Errorf("error creating bundle generation strategy: %w", err)
		}
		prometheus.MustRegister(bundleGenerationStrategy)

		stop := bundleGenerationStrategy.Start(ctx)
		defer stop()

		bundleURIManager, err = bundleuri.NewGenerationManager(ctx, bundleURISink, logger, node, bundleGenerationStrategy)
		if err != nil {
			return fmt.Errorf("error creating bundle manager: %w", err)
		}
		logger.Info(fmt.Sprintf("bundle-uri bucket configured: %s", cfg.BundleURI.GoCloudURL))
		prometheus.MustRegister(bundleURIManager)
	}

	// Register the full service set on one gRPC server per configured
	// listener. Only the internal socket skips handover on upgrades.
	for _, c := range []starter.Config{
		{Name: starter.Unix, Addr: cfg.SocketPath, HandoverOnUpgrade: true},
		{Name: starter.Unix, Addr: cfg.InternalSocketPath(), HandoverOnUpgrade: false},
		{Name: starter.TCP, Addr: cfg.ListenAddr, HandoverOnUpgrade: true},
		{Name: starter.TLS, Addr: cfg.TLSListenAddr, HandoverOnUpgrade: true},
	} {
		// Unconfigured listeners have an empty address.
		if c.Addr == "" {
			continue
		}

		var srv *grpc.Server
		if c.HandoverOnUpgrade {
			srv, err = gitalyServerFactory.CreateExternal(c.IsSecure())
			if err != nil {
				return fmt.Errorf("create external gRPC server: %w", err)
			}
		} else {
			srv, err = gitalyServerFactory.CreateInternal()
			if err != nil {
				return fmt.Errorf("create internal gRPC server: %w", err)
			}
		}

		setup.RegisterAll(srv, &service.Dependencies{
			Logger:                 logger,
			Cfg:                    cfg,
			GitalyHookManager:      hookManager,
			TransactionManager:     transactionManager,
			StorageLocator:         locator,
			ClientPool:             conns,
			GitCmdFactory:          gitCmdFactory,
			CatfileCache:           catfileCache,
			DiskCache:              diskCache,
			PackObjectsCache:       streamCache,
			PackObjectsLimiter:     packObjectsLimiter,
			RepositoryCounter:      repoCounter,
			UpdaterWithHooks:       updaterWithHooks,
			Node:                   node,
			TransactionRegistry:    txRegistry,
			HousekeepingManager:    housekeepingManager,
			BackupSink:             backupSink,
			BackupLocator:          backupLocator,
			LocalRepositoryFactory: localrepoFactory,
			BundleURIManager:       bundleURIManager,
			MigrationStateManager:  migration.NewStateManager(migrations),
		})
		b.RegisterStarter(starter.New(c, srv, logger))
	}

	// Serve Prometheus metrics on a dedicated listener, if configured.
	if addr := cfg.PrometheusListenAddr; addr != "" {
		b.RegisterStarter(func(listen bootstrap.ListenFunc, _ chan<- error, _ *prometheus.CounterVec) error {
			l, err := listen("tcp", addr)
			if err != nil {
				return err
			}

			logger.WithField("address", addr).Info("starting prometheus listener")

			go func() {
				opts := []monitoring.Option{
					monitoring.WithListener(l),
					monitoring.WithBuildExtraLabels(
						map[string]string{"git_version": gitVersion.String()},
					),
				}

				if buildInfo, ok := debug.ReadBuildInfo(); ok {
					opts = append(opts, monitoring.WithGoBuildInformation(buildInfo))
				}

				if err := monitoring.Start(opts...); err != nil {
					logger.WithError(err).Error("Unable to serve prometheus")
				}
			}()

			return nil
		})
	}

	// Write the per-storage metadata file; failures are logged, not fatal.
	for _, shard := range cfg.Storages {
		if err := mdfile.WriteMetadataFile(ctx, shard.Path); err != nil {
			// TODO should this be a return? https://gitlab.com/gitlab-org/gitaly/issues/1893
			logger.WithError(err).Error("Unable to write gitaly metadata file")
		}
	}

	// When cgroups are configured, we create a directory structure each
	// time a gitaly process is spawned. Look through the hierarchy root
	// to find any cgroup directories that belong to old gitaly processes
	// and remove them.
	cgroups.StartPruningOldCgroups(cfg.Cgroups, logger)
	repoCounter.StartCountingRepositories(ctx, locator, logger)
	tempdir.StartCleaning(logger, locator, cfg.Storages, time.Hour)

	if err := b.Start(); err != nil {
		return fmt.Errorf("unable to start the bootstrap: %w", err)
	}
	bootstrapSpan.Finish()

	// There are a few goroutines running async tasks that may still be in progress (i.e. preloading the license
	// database), but this is a close enough indication of startup latency.
	logger.WithField("duration_ms", time.Since(beganRun).Milliseconds()).Info("Started Gitaly")

	// Background workers that periodically optimize repositories.
	if !cfg.DailyMaintenance.IsDisabled() {
		shutdownWorkers, err := maintenance.StartWorkers(
			ctx,
			logger,
			maintenance.DailyOptimizationWorker(cfg, maintenance.OptimizerFunc(func(ctx context.Context, logger log.Logger, repo storage.Repository) error {
				return housekeepingManager.OptimizeRepository(ctx, localrepo.New(logger, locator, gitCmdFactory, catfileCache, repo))
			})),
		)
		if err != nil {
			return fmt.Errorf("initialize auxiliary workers: %w", err)
		}
		defer shutdownWorkers()
	}

	// Block until shutdown; the ticker bounds how long a graceful stop may
	// take before it is forced.
	gracefulStopTicker := helper.NewTimerTicker(cfg.GracefulRestartTimeout.Duration())
	defer gracefulStopTicker.Stop()

	return b.Wait(gracefulStopTicker, gitalyServerFactory.GracefulStop)
}