internal/file_cleaner/file_cleaner.go (250 lines of code) (raw):
package file_cleaner //nolint:staticcheck
import (
"context"
"errors"
"fmt"
"io/fs"
"log/slog"
"os"
"path/filepath"
"regexp"
"strconv"
"time"
"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/disk_stats"
"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/indexing_lock"
"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/node_uuid"
)
// File-name patterns and directory names used by FileCleaner, interpreted
// relative to its IndexDir.
const (
// tmpFilesGlob is the pattern (joined with IndexDir) used by getTmpFiles to
// find temporary files for cleanTmpFiles.
tmpFilesGlob = "*.tmp"
// repoDeleteDir is the subdirectory of IndexDir that holds the
// "<repoID>.delete" marker files written by MarkRepoForDeletion.
repoDeleteDir = "repos_to_delete"
)
// IDRegex extracts a repository ID from a file name: the run of ASCII digits
// at the very start of the name (e.g. "123" from "123_v16.00000.zoekt").
// An empty match means the name does not start with a digit.
var IDRegex = regexp.MustCompile(`^[0-9]+`)
// FileCleaner removes temporary files and the shards of repositories marked
// for deletion from an index directory, and can truncate that directory.
type FileCleaner struct {
// IndexDir is the directory whose files are inspected and removed.
IndexDir string
// NOTE(review): Ticker is not read or written anywhere in this file;
// StartCleanInterval creates its own local ticker. Possibly unused — confirm
// against other packages before removing.
Ticker *time.Ticker
// IndexingLock is consulted by tryToDeleteTmpFile: a repo's tmp file is only
// removed when TryLock succeeds for that repo ID.
IndexingLock *indexing_lock.IndexingLock
// removeFunc deletes a single path. NewFileCleaner sets it to os.Remove;
// presumably it is swapped out in tests.
removeFunc func(name string) error
}
// NewFileCleaner returns a FileCleaner operating on indexDir that checks
// indexingLock before touching a repository's temporary files. Files are
// deleted with os.Remove.
func NewFileCleaner(indexDir string, indexingLock *indexing_lock.IndexingLock) *FileCleaner {
	fc := new(FileCleaner)
	fc.IndexDir = indexDir
	fc.IndexingLock = indexingLock
	fc.removeFunc = os.Remove
	return fc
}
// StartCleanInterval runs Clean once per interval until ctx is done.
//
// Errors from individual Clean runs are logged, not returned. The function
// blocks until ctx is done and then returns ctx.Err().
//
// A non-positive interval is rejected with an error up front, because
// time.NewTicker would panic on it.
func (c *FileCleaner) StartCleanInterval(ctx context.Context, interval time.Duration) error {
	if interval <= 0 {
		return fmt.Errorf("file cleaner interval must be positive, got %s", interval)
	}

	slog.Info("starting file cleaner", "interval", interval.Seconds())
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			slog.Debug("start Clean()")
			if err := c.Clean(ctx); err != nil {
				slog.Error("error while cleaning", "err", err)
			}
			slog.Debug("done Clean()")
		}
	}
}
// Init makes sure the repos_to_delete directory exists inside IndexDir,
// creating it (but not IndexDir itself) when it is missing.
//
// It returns an error when the path cannot be inspected (e.g. permission
// denied) or exists but is not a directory; previously both cases were
// silently reported as success.
func (c *FileCleaner) Init() error {
	path := filepath.Join(c.IndexDir, repoDeleteDir)

	info, err := os.Stat(path)
	switch {
	case errors.Is(err, fs.ErrNotExist):
		return os.Mkdir(path, os.ModeDir|0755)
	case err != nil:
		return err
	case !info.IsDir():
		return fmt.Errorf("%s exists but is not a directory", path)
	default:
		return nil
	}
}
// Clean performs one cleanup pass: it removes stale temporary files and then
// the shards of repositories marked for deletion.
//
// Both steps always run. Their errors are combined with errors.Join, which
// yields nil when neither step failed.
func (c *FileCleaner) Clean(ctx context.Context) error {
	tmpErr := c.cleanTmpFiles(ctx)
	deleteErr := c.removeDeletedRepos(ctx)
	return errors.Join(tmpErr, deleteErr)
}
// removeDeletedRepos deletes the shards of every repository that has a marker
// file in the repos_to_delete directory, then removes those marker files.
//
// Each marker's repository ID is the leading run of digits in its file name
// (IDRegex). A marker name without a parseable ID aborts the pass with an
// error. Markers are removed only after removeShards has succeeded, so a
// failed pass is retried on the next call.
func (c *FileCleaner) removeDeletedRepos(ctx context.Context) error {
	path := filepath.Join(c.IndexDir, repoDeleteDir)
	if err := c.ensurePathExists(path); err != nil {
		return err
	}

	files, err := disk_stats.GetIndexFiles(path, "*")
	if err != nil {
		return err
	}
	if len(files) == 0 {
		// Nothing is marked for deletion: don't walk the whole index directory.
		return nil
	}

	repoIDs := make([]uint32, 0, len(files))
	for _, file := range files {
		fileName := filepath.Base(file)
		repoIDStr := IDRegex.FindString(fileName)
		if repoIDStr == "" {
			// Report the offending file name; repoIDStr is always empty here.
			return fmt.Errorf("failed to parse repo ID from %s", fileName)
		}
		rID, err := strconv.ParseUint(repoIDStr, 10, 32)
		if err != nil {
			return err
		}
		repoIDs = append(repoIDs, uint32(rID))
	}

	if err := c.removeShards(ctx, repoIDs...); err != nil {
		return err
	}

	// Remove the marker files that were actually listed. Rebuilding
	// "<id>.delete" names from the parsed IDs would break in two cases: a
	// differently named marker, or two markers sharing an ID. Either would
	// leave a stale file behind and make every later pass fail. A marker that
	// is already gone is fine, since the goal is only that it no longer exists.
	for _, file := range files {
		if err := c.removeFunc(file); err != nil && !errors.Is(err, fs.ErrNotExist) {
			return err
		}
	}
	return nil
}
// Truncate removes every file that disk_stats.GetIndexFiles reports in
// IndexDir for the "*" pattern, including the node UUID file.
func (c *FileCleaner) Truncate() error {
	const everything = "*"
	return c.removeFilesWithGlob(everything)
}
// TruncateExceptUUID removes every file reported for IndexDir except the node
// UUID file (node_uuid.FileName).
//
// Removal continues past individual failures. All removal errors are returned
// together via errors.Join, which is nil when every removal succeeded.
func (c *FileCleaner) TruncateExceptUUID() error {
	slog.Info("truncating the node except UUID")

	candidates, err := disk_stats.GetIndexFiles(c.IndexDir, "*")
	if err != nil {
		return err
	}

	var failures []error
	for _, candidate := range candidates {
		if filepath.Base(candidate) != node_uuid.FileName {
			if rmErr := c.removeFunc(candidate); rmErr != nil {
				failures = append(failures, rmErr)
			}
		}
	}
	return errors.Join(failures...)
}
// MarkRepoForDeletion records that repoID's shards should be removed by
// writing an empty "<repoID>.delete" file (mode 0600) into the
// repos_to_delete directory. The directory is created first if missing.
func (c *FileCleaner) MarkRepoForDeletion(repoID uint32) error {
	dir := filepath.Join(c.IndexDir, repoDeleteDir)
	if err := c.ensurePathExists(dir); err != nil {
		return err
	}

	marker := filepath.Join(dir, fmt.Sprintf("%d.delete", repoID))
	return os.WriteFile(marker, []byte{}, 0600)
}
// RemoveShardsFor removes repoID's shard files ("<id>_*.zoekt") and their
// metadata files ("<id>_*.zoekt.meta") from IndexDir.
func (c *FileCleaner) RemoveShardsFor(repoID uint32) error {
	shardGlob := fmt.Sprintf("%d_*.zoekt", repoID)
	metaGlob := shardGlob + ".meta"
	return c.removeFilesWithGlob(shardGlob, metaGlob)
}
// RemoveNodeUUID deletes the node UUID file from IndexDir with os.Remove
// (not via removeFunc).
func (c *FileCleaner) RemoveNodeUUID() error {
	return os.Remove(filepath.Join(c.IndexDir, node_uuid.FileName))
}
// removeShards walks IndexDir and removes every path whose base name starts
// with one of repoIDs.
//
// The deletion marker "<id>.delete" itself is always kept; removeDeletedRepos
// removes markers only after this call succeeds.
//
// The IDs are collected into a set once, so each visited path costs O(1)
// instead of a scan over all repoIDs.
func (c *FileCleaner) removeShards(ctx context.Context, repoIDs ...uint32) error {
	wanted := make(map[uint32]struct{}, len(repoIDs))
	for _, id := range repoIDs {
		wanted[id] = struct{}{}
	}

	shouldRemove := func(path string) bool {
		fileName := filepath.Base(path)
		repoIDStr := IDRegex.FindString(fileName)
		if repoIDStr == "" {
			return false
		}
		rID, err := strconv.ParseUint(repoIDStr, 10, 32)
		if err != nil {
			return false
		}
		// Cheap set lookup first; most visited files belong to other repos.
		if _, ok := wanted[uint32(rID)]; !ok {
			return false
		}
		// Never delete the deletion marker while removing the shards.
		return fmt.Sprintf("%d.delete", rID) != fileName
	}
	return c.removeFiles(ctx, shouldRemove)
}
// removeFiles walks IndexDir recursively and removes (via removeFunc) every
// path for which toRemoveFunc returns true.
//
// The walk stops with ctx.Err() once ctx is done. It also stops with the error
// WalkDir reports for an unreadable directory or a missing root; those errors
// used to be ignored, silently skipping part of the tree.
//
// A path that no longer exists when it is removed is not an error. WalkDir
// lists each directory up front, so a concurrent removal can race with this
// walk.
func (c *FileCleaner) removeFiles(ctx context.Context, toRemoveFunc func(string) bool) error {
	walkFunc := func(path string, _ fs.DirEntry, walkErr error) error {
		if err := ctx.Err(); err != nil {
			return err
		}
		if walkErr != nil {
			return walkErr
		}
		if !toRemoveFunc(path) {
			return nil
		}
		if err := c.removeFunc(path); err != nil && !errors.Is(err, fs.ErrNotExist) {
			return err
		}
		return nil
	}
	return filepath.WalkDir(c.IndexDir, walkFunc)
}
// removeFilesWithGlob removes every file disk_stats.GetIndexFiles reports in
// IndexDir for the given glob patterns.
//
// Every match is attempted. The failures are returned together via
// errors.Join, which drops nil entries and returns nil if all removals
// succeeded.
func (c *FileCleaner) removeFilesWithGlob(globs ...string) error {
	matches, err := disk_stats.GetIndexFiles(c.IndexDir, globs...)
	if err != nil {
		return err
	}

	results := make([]error, 0, len(matches))
	for i := range matches {
		results = append(results, c.removeFunc(matches[i]))
	}
	return errors.Join(results...)
}
// cleanTmpFiles tries to delete every "*.tmp" file directly inside IndexDir.
//
// Individual failures are collected and returned via errors.Join. If ctx is
// done before a file is processed, ctx.Err() is returned immediately and the
// collected failures are dropped.
func (c *FileCleaner) cleanTmpFiles(ctx context.Context) error {
	tmpFiles, err := c.getTmpFiles()
	if err != nil {
		return err
	}

	var failures []error
	for _, tmp := range tmpFiles {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		if delErr := c.tryToDeleteTmpFile(tmp); delErr != nil {
			failures = append(failures, delErr)
		}
	}
	return errors.Join(failures...)
}
// getTmpFiles lists the paths matching tmpFilesGlob directly inside IndexDir
// (non-recursive, as filepath.Glob does).
func (c *FileCleaner) getTmpFiles() ([]string, error) {
	matches, err := filepath.Glob(filepath.Join(c.IndexDir, tmpFilesGlob))
	return matches, err
}
// tryToDeleteTmpFile removes a single tmp file unless its repository is
// currently locked in IndexingLock.
//
// The repository ID is the leading run of digits in the file name:
//   - a name with no such prefix is logged and skipped (nil error);
//   - if TryLock fails for the ID, the file is left in place and nil is
//     returned;
//   - a file that has already disappeared by the time it is removed is not
//     treated as an error.
func (c *FileCleaner) tryToDeleteTmpFile(file string) error {
	fileName := filepath.Base(file)
	repoIDStr := IDRegex.FindString(fileName)
	if repoIDStr == "" {
		slog.Warn("failed to find repoID for fileName", "fileName", fileName)
		return nil
	}
	rID, err := strconv.ParseUint(repoIDStr, 10, 32)
	if err != nil {
		return err
	}
	repoID := uint32(rID)

	if !c.IndexingLock.TryLock(repoID) {
		// repo locked, skip the file
		return nil
	}
	defer c.IndexingLock.Unlock(repoID)

	// The file may have been removed by something else between the Glob in
	// getTmpFiles and now; its absence is the desired end state, not a failure.
	if err := c.removeFunc(file); err != nil && !errors.Is(err, fs.ErrNotExist) {
		return err
	}
	return nil
}
// ensurePathExists creates path and any missing parents with permission 0755.
// It succeeds without changes if path is already a directory.
func (c *FileCleaner) ensurePathExists(path string) error {
	const dirMode = os.ModeDir | 0755
	return os.MkdirAll(path, dirMode)
}