internal/git/housekeeping/manager/optimize_repository.go

package manager

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"os"

	"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd"
	"gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
	"gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/config"
	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
	"gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
	"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
	"gitlab.com/gitlab-org/gitaly/v16/internal/tracing"
	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
	"google.golang.org/grpc/codes"
)

// OptimizeRepositoryConfig is the configuration used by OptimizeRepository that is computed by
// applying all the OptimizeRepositoryOption modifiers.
type OptimizeRepositoryConfig struct {
	StrategyConstructor OptimizationStrategyConstructor
}

// OptimizeRepositoryOption is an option that can be passed to OptimizeRepository.
type OptimizeRepositoryOption func(cfg *OptimizeRepositoryConfig)

// OptimizationStrategyConstructor is a constructor for an OptimizationStrategy that is being
// informed by the passed-in RepositoryInfo.
type OptimizationStrategyConstructor func(stats.RepositoryInfo) housekeeping.OptimizationStrategy

// WithOptimizationStrategyConstructor changes the constructor for the optimization strategy that is
// used to determine which parts of the repository will be optimized. By default, the
// HeuristicalOptimizationStrategy is used.
func WithOptimizationStrategyConstructor(strategyConstructor OptimizationStrategyConstructor) OptimizeRepositoryOption {
	return func(cfg *OptimizeRepositoryConfig) {
		cfg.StrategyConstructor = strategyConstructor
	}
}

// OptimizeRepository performs optimizations on the repository. Whether optimizations are performed
// or not depends on a set of heuristics.
func (m *RepositoryManager) OptimizeRepository(
	ctx context.Context,
	repo *localrepo.Repo,
	opts ...OptimizeRepositoryOption,
) error {
	var cfg OptimizeRepositoryConfig
	for _, opt := range opts {
		opt(&cfg)
	}

	span, ctx := tracing.StartSpanIfHasParent(ctx, "housekeeping.OptimizeRepository", nil)
	defer span.Finish()

	if err := m.maybeStartTransaction(ctx, repo, func(ctx context.Context, tx storage.Transaction, repo *localrepo.Repo) error {
		originalRepo := &gitalypb.Repository{
			StorageName:  repo.GetStorageName(),
			RelativePath: repo.GetRelativePath(),
		}
		if tx != nil {
			originalRepo = tx.OriginalRepository(originalRepo)
		}

		// tryRunningHousekeeping acquires a lock on the repository to prevent other concurrent
		// housekeeping calls on it. As we may be in a transaction, the repository's relative path
		// may have been rewritten. We use the original unrewritten relative path here to ensure we
		// hit the same key regardless of whether we run in different transactions where the
		// snapshot prefixes in the relative paths may differ.
		ok, cleanup := m.repositoryStates.tryRunningHousekeeping(originalRepo)
		// If we didn't succeed in setting the state to "running" because of a concurrent
		// housekeeping run, we exit early.
		if !ok {
			return nil
		}
		defer cleanup()

		if m.optimizeFunc != nil {
			strategy, err := m.validate(ctx, repo, cfg)
			if err != nil {
				return err
			}

			return m.optimizeFunc(ctx, repo, strategy)
		}

		if tx != nil {
			return m.optimizeRepositoryWithTransaction(ctx, repo, cfg)
		}

		return m.optimizeRepository(ctx, repo, cfg)
	}); err != nil {
		return err
	}

	return nil
}
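
// Example (illustrative sketch, not part of the original file): a caller holding an already
// configured *RepositoryManager "mgr", together with "ctx" and "repo", could override the default
// heuristics via WithOptimizationStrategyConstructor. The constructor shown here simply reuses the
// heuristical strategy and stands in for any custom housekeeping.OptimizationStrategy.
//
//	err := mgr.OptimizeRepository(ctx, repo,
//		WithOptimizationStrategyConstructor(func(info stats.RepositoryInfo) housekeeping.OptimizationStrategy {
//			return housekeeping.NewHeuristicalOptimizationStrategy(info)
//		}),
//	)
//	if err != nil {
//		return fmt.Errorf("optimizing repository: %w", err)
//	}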

// maybeStartTransaction runs the given function within a read-only transaction if the manager has
// a storage node configured (i.e. transactions are enabled), and directly otherwise.
func (m *RepositoryManager) maybeStartTransaction(ctx context.Context, repo *localrepo.Repo, run func(context.Context, storage.Transaction, *localrepo.Repo) error) error {
	if m.node == nil {
		return run(ctx, nil, repo)
	}

	return m.runInTransaction(ctx, true, repo, run)
}

// runInTransaction begins a transaction on the repository's storage, runs the given function with
// the rewritten repository, and commits the transaction. The transaction is rolled back if the
// function returns an error.
func (m *RepositoryManager) runInTransaction(ctx context.Context, readOnly bool, repo *localrepo.Repo, run func(context.Context, storage.Transaction, *localrepo.Repo) error) (returnedErr error) {
	originalRepo := &gitalypb.Repository{
		StorageName:  repo.GetStorageName(),
		RelativePath: repo.GetRelativePath(),
	}
	if tx := storage.ExtractTransaction(ctx); tx != nil {
		originalRepo = tx.OriginalRepository(originalRepo)
	}

	storageHandle, err := m.node.GetStorage(repo.GetStorageName())
	if err != nil {
		return fmt.Errorf("get storage: %w", err)
	}

	tx, err := storageHandle.Begin(ctx, storage.TransactionOptions{
		ReadOnly:     readOnly,
		RelativePath: originalRepo.GetRelativePath(),
	})
	if err != nil {
		return fmt.Errorf("begin: %w", err)
	}
	defer func() {
		if returnedErr != nil {
			if err := tx.Rollback(ctx); err != nil {
				returnedErr = errors.Join(returnedErr, fmt.Errorf("rollback: %w", err))
			}
		}
	}()

	if err := run(
		storage.ContextWithTransaction(ctx, tx),
		tx,
		localrepo.NewFrom(repo, tx.RewriteRepository(originalRepo)),
	); err != nil {
		return fmt.Errorf("run: %w", err)
	}

	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("commit: %w", err)
	}

	return nil
}

// validate derives the repository's info, reports it via logs and metrics, and constructs the
// optimization strategy that decides which optimizations to run.
func (m *RepositoryManager) validate(
	ctx context.Context,
	repo *localrepo.Repo,
	cfg OptimizeRepositoryConfig,
) (housekeeping.OptimizationStrategy, error) {
	repositoryInfo, err := stats.RepositoryInfoForRepository(ctx, repo)
	if err != nil {
		return nil, fmt.Errorf("deriving repository info: %w", err)
	}

	repositoryInfo.Log(ctx, m.logger)
	m.metrics.ReportRepositoryInfo(repositoryInfo)

	var strategy housekeeping.OptimizationStrategy
	if cfg.StrategyConstructor == nil {
		strategy = housekeeping.NewHeuristicalOptimizationStrategy(repositoryInfo)
	} else {
		strategy = cfg.StrategyConstructor(repositoryInfo)
	}

	return strategy, nil
}

// optimizeRepository runs the individual housekeeping tasks against the repository without a WAL
// transaction and reports their outcome via logs and metrics.
func (m *RepositoryManager) optimizeRepository(
	ctx context.Context,
	repo *localrepo.Repo,
	cfg OptimizeRepositoryConfig,
) error {
	strategy, err := m.validate(ctx, repo, cfg)
	if err != nil {
		return err
	}

	finishTotalTimer := m.metrics.ReportTaskLatency("total", "apply")
	totalStatus := "failure"

	optimizations := make(map[string]string)
	defer func() {
		finishTotalTimer()
		m.logger.WithField("optimizations", optimizations).Info("optimized repository")

		for task, status := range optimizations {
			m.metrics.TasksTotal.WithLabelValues(task, status).Inc()
		}

		m.metrics.TasksTotal.WithLabelValues("total", totalStatus).Add(1)
	}()

	finishTimer := m.metrics.ReportTaskLatency("clean-stale-data", "apply")
	if err := m.CleanStaleData(ctx, repo, housekeeping.DefaultStaleDataCleanup()); err != nil {
		return fmt.Errorf("could not execute housekeeping: %w", err)
	}
	finishTimer()

	finishTimer = m.metrics.ReportTaskLatency("clean-worktrees", "apply")
	if err := housekeeping.CleanupWorktrees(ctx, repo); err != nil {
		return fmt.Errorf("could not clean up worktrees: %w", err)
	}
	finishTimer()

	finishTimer = m.metrics.ReportTaskLatency("repack", "apply")
	didRepack, repackCfg, err := repackIfNeeded(ctx, repo, strategy)
	if err != nil {
		optimizations["packed_objects_"+string(repackCfg.Strategy)] = "failure"
		if repackCfg.WriteBitmap {
			optimizations["written_bitmap"] = "failure"
		}
		if repackCfg.WriteMultiPackIndex {
			optimizations["written_multi_pack_index"] = "failure"
		}

		return fmt.Errorf("could not repack: %w", err)
	}
	if didRepack {
		optimizations["packed_objects_"+string(repackCfg.Strategy)] = "success"
		if repackCfg.WriteBitmap {
			optimizations["written_bitmap"] = "success"
		}
		if repackCfg.WriteMultiPackIndex {
			optimizations["written_multi_pack_index"] = "success"
		}
	}
	finishTimer()

	finishTimer = m.metrics.ReportTaskLatency("prune", "apply")
	didPrune, err := pruneIfNeeded(ctx, repo, strategy)
	if err != nil {
		optimizations["pruned_objects"] = "failure"
		return fmt.Errorf("could not prune: %w", err)
	} else if didPrune {
		optimizations["pruned_objects"] = "success"
	}
	finishTimer()

	finishTimer = m.metrics.ReportTaskLatency("pack-refs", "apply")
	didPackRefs, err := m.packRefsIfNeeded(ctx, repo, strategy)
	if err != nil {
		optimizations["packed_refs"] = "failure"
		return fmt.Errorf("could not pack refs: %w", err)
	} else if didPackRefs {
		optimizations["packed_refs"] = "success"
	}
	finishTimer()

	finishTimer = m.metrics.ReportTaskLatency("commit-graph", "apply")
	if didWriteCommitGraph, writeCommitGraphCfg, err := writeCommitGraphIfNeeded(ctx, repo, strategy); err != nil {
		optimizations["written_commit_graph_full"] = "failure"
		optimizations["written_commit_graph_incremental"] = "failure"
		return fmt.Errorf("could not write commit-graph: %w", err)
	} else if didWriteCommitGraph {
		if writeCommitGraphCfg.ReplaceChain {
			optimizations["written_commit_graph_full"] = "success"
		} else {
			optimizations["written_commit_graph_incremental"] = "success"
		}
	}
	finishTimer()

	totalStatus = "success"

	return nil
}

// optimizeRepositoryWithTransaction performs optimizations in the context of a WAL transaction.
//
// Reference repacking and object repacking are run in two different transactions. This decreases
// the chance of conflicts as it allows reference repacking to commit faster. Reference repacking
// conflicts with reference deletions but runs relatively fast. Object repacking is slower but is
// conflict-free if no pruning is done.
//
// Note that the strategy is selected in a parent transaction. The repository's state may change in
// the meantime, but this shouldn't really change things too much. RepositoryManager itself prevents
// concurrent housekeeping. Even if there was a housekeeping operation committed in between, we'd
// just do redundant repacks.
func (m *RepositoryManager) optimizeRepositoryWithTransaction(
	ctx context.Context,
	repo *localrepo.Repo,
	cfg OptimizeRepositoryConfig,
) error {
	strategy, err := m.validate(ctx, repo, cfg)
	if err != nil {
		return err
	}

	repackNeeded, repackCfg := strategy.ShouldRepackObjects(ctx)
	packRefsNeeded := strategy.ShouldRepackReferences(ctx)
	writeCommitGraphNeeded, writeCommitGraphCfg, err := strategy.ShouldWriteCommitGraph(ctx)
	if err != nil {
		return fmt.Errorf("checking commit graph writing eligibility: %w", err)
	}

	var errPackReferences error
	if packRefsNeeded {
		if err := m.runInTransaction(ctx, false, repo, func(ctx context.Context, tx storage.Transaction, repo *localrepo.Repo) error {
			tx.PackRefs()
			return nil
		}); err != nil {
			errPackReferences = fmt.Errorf("run reference packing: %w", err)
		}
	}

	var errRepackObjects error
	if repackNeeded || writeCommitGraphNeeded {
		if err := m.runInTransaction(ctx, false, repo, func(ctx context.Context, tx storage.Transaction, repo *localrepo.Repo) error {
			if repackNeeded {
				tx.Repack(repackCfg)
			}
			if writeCommitGraphNeeded {
				tx.WriteCommitGraphs(writeCommitGraphCfg)
			}
			return nil
		}); err != nil {
			errRepackObjects = fmt.Errorf("run object repacking: %w", err)
		}
	}

	getStatus := func(err error) string {
		if err != nil {
			return "failure"
		}
		return "success"
	}

	repackObjectsStatus := getStatus(errRepackObjects)
	optimizations := make(map[string]string)
	if repackNeeded {
		optimizations["packed_objects_"+string(repackCfg.Strategy)] = repackObjectsStatus
		if repackCfg.WriteBitmap {
			optimizations["written_bitmap"] = repackObjectsStatus
		}
		if repackCfg.WriteMultiPackIndex {
			optimizations["written_multi_pack_index"] = repackObjectsStatus
		}
	}
	if packRefsNeeded {
		optimizations["packed_refs"] = getStatus(errPackReferences)
	}
	if writeCommitGraphNeeded {
		if writeCommitGraphCfg.ReplaceChain {
			optimizations["written_commit_graph_full"] = repackObjectsStatus
		} else {
			optimizations["written_commit_graph_incremental"] = repackObjectsStatus
		}
	}

	m.logger.WithField("optimizations", optimizations).Info("optimized repository with WAL")
	for task, status := range optimizations {
		m.metrics.TasksTotal.WithLabelValues(task, status).Inc()
	}

	errCombined := errors.Join(errPackReferences, errRepackObjects)
	m.metrics.TasksTotal.WithLabelValues("total", getStatus(errCombined)).Add(1)

	return errCombined
}

// repackIfNeeded repacks the repository according to the strategy.
func repackIfNeeded(ctx context.Context, repo *localrepo.Repo, strategy housekeeping.OptimizationStrategy) (bool, config.RepackObjectsConfig, error) {
	repackNeeded, cfg := strategy.ShouldRepackObjects(ctx)
	if !repackNeeded {
		return false, config.RepackObjectsConfig{}, nil
	}

	if err := housekeeping.RepackObjects(ctx, repo, cfg); err != nil {
		return false, cfg, err
	}

	return true, cfg, nil
}

// writeCommitGraphIfNeeded writes the commit-graph if required.
func writeCommitGraphIfNeeded(ctx context.Context, repo *localrepo.Repo, strategy housekeeping.OptimizationStrategy) (bool, config.WriteCommitGraphConfig, error) {
	needed, cfg, err := strategy.ShouldWriteCommitGraph(ctx)
	if !needed || err != nil {
		return false, config.WriteCommitGraphConfig{}, err
	}

	if err := housekeeping.WriteCommitGraph(ctx, repo, cfg); err != nil {
		return true, cfg, fmt.Errorf("writing commit-graph: %w", err)
	}

	return true, cfg, nil
}

// pruneIfNeeded removes objects from the repository which are either unreachable or which are
// already part of a packfile. We use a grace period of two weeks.
func pruneIfNeeded(ctx context.Context, repo *localrepo.Repo, strategy housekeeping.OptimizationStrategy) (bool, error) {
	needed, cfg := strategy.ShouldPruneObjects(ctx)
	if !needed {
		return false, nil
	}

	if err := housekeeping.PruneObjects(ctx, repo, cfg); err != nil {
		return true, fmt.Errorf("pruning objects: %w", err)
	}

	return true, nil
}

// packRefsIfNeeded runs git-pack-refs(1) if the strategy determines that references should be
// repacked and no inhibitors are currently registered for the repository.
func (m *RepositoryManager) packRefsIfNeeded(ctx context.Context, repo *localrepo.Repo, strategy housekeeping.OptimizationStrategy) (bool, error) {
	if !strategy.ShouldRepackReferences(ctx) {
		return false, nil
	}

	// If there are any inhibitors, we don't run git-pack-refs(1).
	ok, cleanup := m.repositoryStates.tryRunningPackRefs(repo)
	if !ok {
		return false, nil
	}
	defer cleanup()

	var stderr bytes.Buffer
	if err := repo.ExecAndWait(ctx, gitcmd.Command{
		Name: "pack-refs",
		Flags: []gitcmd.Option{
			gitcmd.Flag{Name: "--all"},
		},
	}, gitcmd.WithStderr(&stderr)); err != nil {
		return false, fmt.Errorf("packing refs: %w, stderr: %q", err, stderr.String())
	}

	return true, nil
}

// CleanStaleData removes any stale data in the repository as per the provided configuration.
func (m *RepositoryManager) CleanStaleData(ctx context.Context, repo *localrepo.Repo, cfg housekeeping.CleanStaleDataConfig) error {
	span, ctx := tracing.StartSpanIfHasParent(ctx, "housekeeping.CleanStaleData", nil)
	defer span.Finish()

	repoPath, err := repo.Path(ctx)
	if err != nil {
		m.logger.WithError(err).WarnContext(ctx, "housekeeping failed to get repo path")
		if structerr.GRPCCode(err) == codes.NotFound {
			return nil
		}
		return fmt.Errorf("housekeeping failed to get repo path: %w", err)
	}

	staleDataByType := map[string]int{}
	defer func() {
		if len(staleDataByType) == 0 {
			return
		}

		logEntry := m.logger
		for staleDataType, count := range staleDataByType {
			logEntry = logEntry.WithField(fmt.Sprintf("stale_data.%s", staleDataType), count)
			m.metrics.PrunedFilesTotal.WithLabelValues(staleDataType).Add(float64(count))
		}
		logEntry.InfoContext(ctx, "removed files")
	}()

	var filesToPrune []string
	for staleFileType, staleFileFinder := range cfg.StaleFileFinders {
		staleFiles, err := staleFileFinder(ctx, repoPath)
		if err != nil {
			return fmt.Errorf("housekeeping failed to find %s: %w", staleFileType, err)
		}

		filesToPrune = append(filesToPrune, staleFiles...)
		staleDataByType[staleFileType] = len(staleFiles)
	}

	for _, path := range filesToPrune {
		if err := os.Remove(path); err != nil {
			if os.IsNotExist(err) {
				continue
			}
			staleDataByType["failures"]++
			m.logger.WithError(err).WithField("path", path).WarnContext(ctx, "unable to remove stale file")
		}
	}

	for repoCleanupName, repoCleanupFn := range cfg.RepoCleanups {
		cleanupCount, err := repoCleanupFn(ctx, repo)
		staleDataByType[repoCleanupName] = cleanupCount
		if err != nil {
			return fmt.Errorf("housekeeping could not perform cleanup %s: %w", repoCleanupName, err)
		}
	}

	for repoCleanupName, repoCleanupFn := range cfg.RepoCleanupWithTxManagers {
		cleanupCount, err := repoCleanupFn(ctx, repo, m.txManager)
		staleDataByType[repoCleanupName] = cleanupCount
		if err != nil {
			return fmt.Errorf("housekeeping could not perform cleanup (with TxManager) %s: %w", repoCleanupName, err)
		}
	}

	return nil
}
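
// Example (illustrative sketch, not part of the original file): CleanStaleData is invoked by the
// non-transactional optimization path above, but a caller holding a *RepositoryManager "mgr" may
// also run it on its own with the default configuration.
//
//	if err := mgr.CleanStaleData(ctx, repo, housekeeping.DefaultStaleDataCleanup()); err != nil {
//		return fmt.Errorf("cleaning stale data: %w", err)
//	}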