internal/file_cleaner/file_cleaner.go (250 lines of code) (raw):

// Package file_cleaner removes files from the indexer's index directory:
// leftover temporary files, shards of repositories that were marked for
// deletion, and (on request) the whole directory content.
package file_cleaner //nolint:staticcheck

import (
	"context"
	"errors"
	"fmt"
	"io/fs"
	"log/slog"
	"os"
	"path/filepath"
	"regexp"
	"strconv"
	"time"

	"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/disk_stats"
	"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/indexing_lock"
	"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/node_uuid"
)

const (
	// tmpFilesGlob matches the temporary files removed by cleanTmpFiles.
	tmpFilesGlob = "*.tmp"
	// repoDeleteDir is the sub-directory of IndexDir that holds the
	// "<repoID>.delete" marker files written by MarkRepoForDeletion.
	repoDeleteDir = "repos_to_delete"
)

var (
	// IDRegex matches the run of decimal digits at the start of a file name.
	// This file interprets that prefix as the repository ID.
	IDRegex = regexp.MustCompile(`^\d+`)
)

// FileCleaner deletes files below IndexDir.
type FileCleaner struct {
	// IndexDir is the directory all clean-up operations work on.
	IndexDir string
	// Ticker is not referenced by any code visible in this file.
	// NOTE(review): confirm whether external callers still use it.
	Ticker *time.Ticker
	// IndexingLock is consulted before a temporary file is deleted so that
	// files of a repository that is currently locked are left alone.
	IndexingLock *indexing_lock.IndexingLock

	// removeFunc deletes a single path. NewFileCleaner sets it to os.Remove.
	removeFunc func(name string) error
}

// NewFileCleaner returns a FileCleaner for indexDir that deletes files with
// os.Remove and uses indexingLock to skip locked repositories.
func NewFileCleaner(indexDir string, indexingLock *indexing_lock.IndexingLock) *FileCleaner {
	return &FileCleaner{
		IndexDir:     indexDir,
		IndexingLock: indexingLock,
		removeFunc:   os.Remove,
	}
}

// StartCleanInterval calls Clean every interval until ctx is done and then
// returns ctx.Err(). Errors returned by Clean are logged, not returned, so a
// failing run does not stop the loop.
//
// interval must be positive: time.NewTicker panics otherwise.
func (c *FileCleaner) StartCleanInterval(ctx context.Context, interval time.Duration) error {
	slog.Info("starting file cleaner", "interval", interval.Seconds())

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			slog.Debug("start Clean()")
			if err := c.Clean(ctx); err != nil {
				slog.Error("error while cleaning", "err", err)
			}
			slog.Debug("done Clean()")
		}
	}
}

// Init creates the marker directory (IndexDir/repos_to_delete) when os.Stat
// reports that it does not exist. Any other Stat outcome, including errors
// other than "not exist", leaves the file system untouched and returns nil.
func (c *FileCleaner) Init() error {
	path := filepath.Join(c.IndexDir, repoDeleteDir)
	if _, err := os.Stat(path); os.IsNotExist(err) {
		if err := os.Mkdir(path, os.ModeDir|0755); err != nil {
			return err
		}
	}
	return nil
}

// Clean removes temporary files and then the shards of repositories marked
// for deletion. Both steps always run; their errors are combined with
// errors.Join, which yields nil when neither step failed.
func (c *FileCleaner) Clean(ctx context.Context) error {
	tmpErr := c.cleanTmpFiles(ctx)
	repoErr := c.removeDeletedRepos(ctx)
	return errors.Join(tmpErr, repoErr)
}

// removeDeletedRepos reads the marker files in IndexDir/repos_to_delete,
// removes the files of every referenced repository from IndexDir, and finally
// deletes the "<repoID>.delete" markers themselves.
//
// It stops at the first error. A marker file whose name does not start with a
// valid uint32 aborts the whole run before anything is deleted.
func (c *FileCleaner) removeDeletedRepos(ctx context.Context) error {
	path := filepath.Join(c.IndexDir, repoDeleteDir)
	if err := c.ensurePathExists(path); err != nil {
		return err
	}

	files, err := disk_stats.GetIndexFiles(path, "*")
	if err != nil {
		return err
	}

	repoIDs := make([]uint32, 0, len(files))
	for _, file := range files {
		fileName := filepath.Base(file)
		repoIDStr := IDRegex.FindString(fileName)
		if repoIDStr == "" {
			// Report the file name: repoIDStr is always empty on this branch.
			return fmt.Errorf("failed to parse repo ID from %s", fileName)
		}
		rID, err := strconv.ParseUint(repoIDStr, 10, 32)
		if err != nil {
			return err
		}
		repoIDs = append(repoIDs, uint32(rID))
	}

	if err := c.removeShards(ctx, repoIDs...); err != nil {
		return err
	}

	// Markers are removed only after the shards are gone, so a failed run is
	// retried on the next Clean.
	for _, repoID := range repoIDs {
		filePath := filepath.Join(path, fmt.Sprintf("%d.delete", repoID))
		if err := c.removeFunc(filePath); err != nil {
			return err
		}
	}
	return nil
}

// Truncate removes every file that disk_stats.GetIndexFiles returns for the
// "*" glob in IndexDir.
func (c *FileCleaner) Truncate() error {
	return c.removeFilesWithGlob("*")
}

// TruncateExceptUUID removes every file that disk_stats.GetIndexFiles returns
// for the "*" glob in IndexDir, except the one named node_uuid.FileName.
// Removal continues after a failure; all failures are returned joined.
func (c *FileCleaner) TruncateExceptUUID() error {
	slog.Info("truncating the node except UUID")

	files, err := disk_stats.GetIndexFiles(c.IndexDir, "*")
	if err != nil {
		return err
	}

	var errs []error
	for _, file := range files {
		if filepath.Base(file) == node_uuid.FileName {
			continue
		}
		if err := c.removeFunc(file); err != nil {
			errs = append(errs, err)
		}
	}
	// errors.Join returns nil when errs is empty.
	return errors.Join(errs...)
}

// MarkRepoForDeletion writes an empty "<repoID>.delete" marker file into
// IndexDir/repos_to_delete, creating that directory if necessary. The marker
// is picked up by a later Clean.
func (c *FileCleaner) MarkRepoForDeletion(repoID uint32) error {
	path := filepath.Join(c.IndexDir, repoDeleteDir)
	if err := c.ensurePathExists(path); err != nil {
		return err
	}
	marker := filepath.Join(path, fmt.Sprintf("%d.delete", repoID))
	return os.WriteFile(marker, []byte{}, 0600)
}

// RemoveShardsFor removes the files in IndexDir matching "<repoID>_*.zoekt"
// and "<repoID>_*.zoekt.meta".
func (c *FileCleaner) RemoveShardsFor(repoID uint32) error {
	return c.removeFilesWithGlob(
		fmt.Sprintf("%d_*.zoekt", repoID),
		fmt.Sprintf("%d_*.zoekt.meta", repoID),
	)
}

// RemoveNodeUUID deletes the node UUID file from IndexDir. It calls os.Remove
// directly rather than the configurable removeFunc.
func (c *FileCleaner) RemoveNodeUUID() error {
	filePath := filepath.Join(c.IndexDir, node_uuid.FileName)
	return os.Remove(filePath)
}

// removeShards removes every entry below IndexDir whose base name starts with
// one of repoIDs, except the "<repoID>.delete" marker files.
//
// With no repo IDs nothing can match, so the directory walk is skipped and
// only the context state is reported.
func (c *FileCleaner) removeShards(ctx context.Context, repoIDs ...uint32) error {
	if len(repoIDs) == 0 {
		return ctx.Err()
	}

	wanted := make(map[uint32]struct{}, len(repoIDs))
	for _, repoID := range repoIDs {
		wanted[repoID] = struct{}{}
	}

	shouldRemove := func(path string) bool {
		fileName := filepath.Base(path)
		repoIDStr := IDRegex.FindString(fileName)
		if repoIDStr == "" {
			return false
		}
		rID, err := strconv.ParseUint(repoIDStr, 10, 32)
		if err != nil {
			return false
		}
		// The walk is recursive and therefore also visits repos_to_delete;
		// the markers there are removed separately by removeDeletedRepos.
		if fmt.Sprintf("%d.delete", rID) == fileName {
			return false
		}
		_, ok := wanted[uint32(rID)]
		return ok
	}

	return c.removeFiles(ctx, shouldRemove)
}

// removeFiles walks IndexDir recursively and deletes every path for which
// toRemoveFunc returns true. The walk stops at the first failed removal or as
// soon as ctx is done, in which case ctx.Err() is returned.
func (c *FileCleaner) removeFiles(ctx context.Context, toRemoveFunc func(string) bool) error {
	// NOTE(review): the error WalkDir passes to the callback is ignored, so
	// unreadable entries are skipped silently - confirm this is intended.
	walkFunc := func(path string, _ fs.DirEntry, _ error) error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if !toRemoveFunc(path) {
			return nil
		}
		return c.removeFunc(path)
	}

	return filepath.WalkDir(c.IndexDir, walkFunc)
}

// removeFilesWithGlob removes the files disk_stats.GetIndexFiles returns for
// globs in IndexDir. Removal continues after a failure; all failures are
// returned joined.
func (c *FileCleaner) removeFilesWithGlob(globs ...string) error {
	files, err := disk_stats.GetIndexFiles(c.IndexDir, globs...)
	if err != nil {
		return err
	}

	var errs []error
	for _, file := range files {
		if err := c.removeFunc(file); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

// cleanTmpFiles tries to delete every "*.tmp" file directly inside IndexDir.
// Per-file failures are collected and returned joined. If ctx is done the
// loop stops and ctx.Err() alone is returned.
func (c *FileCleaner) cleanTmpFiles(ctx context.Context) error {
	files, err := c.getTmpFiles()
	if err != nil {
		return err
	}

	var errs []error
	for _, file := range files {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if err := c.tryToDeleteTmpFile(file); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

// getTmpFiles returns the paths in IndexDir that match tmpFilesGlob.
func (c *FileCleaner) getTmpFiles() ([]string, error) {
	path := filepath.Join(c.IndexDir, tmpFilesGlob)
	return filepath.Glob(path)
}

// tryToDeleteTmpFile deletes file unless its repository is currently locked.
//
// The repository ID is the leading digits of the base name. Files without
// such a prefix are logged and skipped. If IndexingLock.TryLock fails the
// file is skipped without error; otherwise the lock is held for the duration
// of the removal. A file that no longer exists counts as success.
func (c *FileCleaner) tryToDeleteTmpFile(file string) error {
	fileName := filepath.Base(file)
	repoIDStr := IDRegex.FindString(fileName)
	if repoIDStr == "" {
		slog.Warn("failed to find repoID for fileName", "fileName", fileName)
		return nil
	}

	rID, err := strconv.ParseUint(repoIDStr, 10, 32)
	if err != nil {
		return err
	}
	repoID := uint32(rID)

	if !c.IndexingLock.TryLock(repoID) {
		// repo locked, skip the file
		return nil
	}
	defer c.IndexingLock.Unlock(repoID)

	// The file may have been deleted concurrently between globbing and now;
	// the desired end state is reached either way.
	if err := c.removeFunc(file); err != nil && !errors.Is(err, fs.ErrNotExist) {
		return err
	}
	return nil
}

// ensurePathExists creates path and any missing parents. It returns nil if
// path is already a directory.
func (c *FileCleaner) ensurePathExists(path string) error {
	return os.MkdirAll(path, os.ModeDir|0755)
}