lib/store/cleanup.go (257 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package store import ( "fmt" "os" "slices" "sync" "time" "github.com/uber/kraken/lib/store/base" "github.com/uber/kraken/lib/store/metadata" "github.com/uber/kraken/utils/diskspaceutil" "github.com/uber/kraken/utils/log" "github.com/andres-erbsen/clock" "github.com/uber-go/tally" ) // CleanupConfig defines configuration for periodically cleaning up idle files. type CleanupConfig struct { Disabled bool `yaml:"disabled"` Interval time.Duration `yaml:"interval"` // How often cleanup runs. TTI time.Duration `yaml:"tti"` // Time to idle based on last access time. TTL time.Duration `yaml:"ttl"` // Time to live regardless of access. If 0, disables TTL. AggressiveThreshold int `yaml:"aggressive_threshold"` // The disk util threshold to trigger aggressive cleanup. If 0, disables aggressive cleanup. AggressiveTTL time.Duration `yaml:"aggressive_ttL"` // Time to live regardless of access if aggressive cleanup is triggered. AggressiveLowerThreshold int `yaml:"aggressive_lower_threshold"` // The lower disk util threshold in percent, below which aggressive cleanup will stop. If 0, no lower threshold. } type ( // for mocking diskUsageFn func() (diskspaceutil.UsageInfo, error) ) func (c CleanupConfig) applyDefaults() CleanupConfig { if c.Interval == 0 { c.Interval = 30 * time.Minute } if c.TTI == 0 { c.TTI = 6 * time.Hour } if c.AggressiveThreshold != 0 { if c.AggressiveTTL == 0 { c.AggressiveTTL = 1 * time.Hour } } return c } type cleanupManager struct { clk clock.Clock stats tally.Scope stopOnce sync.Once stopc chan struct{} } func newCleanupManager(clk clock.Clock, stats tally.Scope) *cleanupManager { stats = stats.Tagged(map[string]string{ "module": "storecleanup", }) return &cleanupManager{ clk: clk, stats: stats, stopc: make(chan struct{}), } } // addJob starts a background cleanup task which removes idle files from op based // on the settings in config. op must set the desired states to clean before addJob // is called. func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp) { config = config.applyDefaults() if config.Disabled { log.Warnf("Cleanup disabled for %s", op) return } if config.TTL == 0 { log.Warnf("TTL disabled for %s", op) } if config.AggressiveThreshold == 0 { log.Warnf("Aggressive cleanup disabled for %s", op) } ticker := m.clk.Ticker(config.Interval) usageGauge := m.stats.Tagged(map[string]string{"job": tag}).Gauge("disk_usage") go func() { for { select { case <-ticker.C: log.Debugf("Performing cleanup of %s", op) usage, err := m.cleanup(op, config, cachedInAgentPolicy) if err != nil { log.Errorf("Error scanning %s: %s", op, err) } usageGauge.Update(float64(usage)) case <-m.stopc: ticker.Stop() return } } }() } func (m *cleanupManager) stop() { m.stopOnce.Do(func() { close(m.stopc) }) } type fInfo struct { name string accessTime time.Time downloadTime time.Time size int64 } // cachedInAgentPolicy is a custom cleanup policy that prioritizes discarding blobs already distributed to agents. // cachedInAgentPolicy is passed to [slices.SortFunc] func cachedInAgentPolicy(left, right fInfo) int { // For context, the order when downloading a file is: // 1. Upon downloading the last byte of a file, its download time is set. // 2. The application logic sets the access time. // 3. no-op - The file is moved from the upload dir to the upload cache, but this changes neither the access nor download times. // 4. [optional] File is downloaded by a consumer (proxy or agent) and the access time is updated. if isDownloadedByConsumer(left) && !isDownloadedByConsumer(right) { return -1 } if !isDownloadedByConsumer(left) && isDownloadedByConsumer(right) { return 1 } // At this point, both files must be downloaded by a consumer (proxy/agent). // The ifs below check if one file is for sure cached by an agent, while the other isn't. if forSureInAgent(left) && !forSureInAgent(right) { return -1 } if !forSureInAgent(left) && forSureInAgent(right) { return 1 } // If none of the heuristics above work out, we default to a basic LRU cache, keeping the most recently accessed files. return int(left.accessTime.Sub(right.accessTime)) } // A file must be downloaded by a consumer if the diff is > 1s, // as to trigger a download an 202 HTTP is made to the origin, // after which the consumer backs off for at least 1s before trying to download again. // During the 1s+ backoff, the access time and download time are almost the same. func isDownloadedByConsumer(f fInfo) bool { return accessDownloadDiff(f) > 1*time.Second } func accessDownloadDiff(f fInfo) time.Duration { return f.downloadTime.Sub(f.accessTime).Abs() } // If a file is accessed long after it has been downloaded, // it must have gotten prefetched by proxy earlier and now an agent consumed it. func forSureInAgent(f fInfo) bool { return accessDownloadDiff(f) > 45*time.Minute } // cleanup cleans op from idle or expired files and returns its size BEFORE cleanup. // It works in one of two possible modes: // 1. tti + ttl based cleanup - the default. // 2. aggressive cleanup - triggered on high disk usage. By default, it is ttl- and threshold-based. // However, it can also be custom policy- and threshold-based, when a `customPolicy` and a `config.AggressiveLowerThreshold` are provided. // Then the cache is cleaned until the lower threshold is reached, prioritizing blobs for deletion based on the `customPolicy`, which is a fn passed to [slices.SortFunc]. func (m *cleanupManager) cleanup(op base.FileOp, config CleanupConfig, customPolicy func(a, b fInfo) int) (usage int64, err error) { shouldAggro := m.shouldAggro(op, config, diskspaceutil.Usage) customPolicyBasedCleanup := shouldAggro && customPolicy != nil && config.AggressiveLowerThreshold != 0 if customPolicyBasedCleanup { return m.customPolicyBasedCleanup(op, config, customPolicy, diskspaceutil.Usage) } ttl := config.TTL lowerThreshold := 0 if shouldAggro { ttl = config.AggressiveTTL lowerThreshold = config.AggressiveLowerThreshold } return m.ttlBasedCleanup(op, config.TTI, ttl, lowerThreshold, diskspaceutil.Usage) } func (m *cleanupManager) customPolicyBasedCleanup(op base.FileOp, config CleanupConfig, customPolicy func(a, b fInfo) int, diskUsageFn diskUsageFn) (usage int64, err error) { names, err := op.ListNames() if err != nil { return 0, fmt.Errorf("list names: %s", err) } var fInfos []fInfo var totalUsage int64 for _, name := range names { fStat, err := op.GetFileStat(name) if err != nil { log.With("name", name).Errorf("Error getting file stat: %s", err) continue } fInfo := fInfo{ name: name, downloadTime: fStat.ModTime(), size: fStat.Size(), } totalUsage += fStat.Size() var accessTime metadata.LastAccessTime err = op.GetFileMetadata(name, &accessTime) if os.IsNotExist(err) { continue } if err != nil { log.With("name", name).Errorf("Error getting file metadata: %s", err) continue } fInfo.accessTime = accessTime.Time fInfos = append(fInfos, fInfo) } slices.SortFunc(fInfos, customPolicy) dInfo, err := diskUsageFn() if err != nil { return 0, fmt.Errorf("get disk usage info %s: %s", op, err) } minBytes := dInfo.TotalBytes * uint64(config.AggressiveLowerThreshold) / 100 remainDeleteBytes := int64(dInfo.TotalBytes) - int64(minBytes) for _, file := range fInfos { if remainDeleteBytes <= 0 { break } err := op.DeleteFile(file.name) if err != nil && err != base.ErrFilePersisted { log.With("name", file.name).Errorf("Error deleting expired file: %s", err) } if err == nil { remainDeleteBytes -= file.size } } return totalUsage, nil } func (m *cleanupManager) ttlBasedCleanup( op base.FileOp, tti time.Duration, ttl time.Duration, aggroUtilLowerThreshold int, diskUsageFn diskUsageFn) (scannedBytes int64, err error) { var lowThresholdBytes uint64 = 0 respectLowThreshold := false var dInfo diskspaceutil.UsageInfo if aggroUtilLowerThreshold != 0 { dInfo, err = diskUsageFn() if err != nil { log.Errorf("Error getting disk usage info %s: %s", op, err) } else { respectLowThreshold = true lowThresholdBytes = (dInfo.TotalBytes * uint64(aggroUtilLowerThreshold)) / 100 } } names, err := op.ListNames() if err != nil { return 0, fmt.Errorf("list names: %s", err) } for _, name := range names { info, err := op.GetFileStat(name) if err != nil { log.With("name", name).Errorf("Error getting file stat: %s", err) continue } ready, err := m.readyForDeletion(op, name, info, tti, ttl) if err != nil { log.With("name", name).Errorf("Error checking if file expired: %s", err) } lowThresholdBreached := respectLowThreshold && ((dInfo.UsedBytes - uint64(scannedBytes)) <= lowThresholdBytes) if ready && !lowThresholdBreached { if err := op.DeleteFile(name); err != nil && err != base.ErrFilePersisted { log.With("name", name).Errorf("Error deleting expired file: %s", err) } } scannedBytes += info.Size() } return scannedBytes, nil } func (m *cleanupManager) readyForDeletion( op base.FileOp, name string, info os.FileInfo, tti time.Duration, ttl time.Duration) (bool, error) { if ttl > 0 && m.clk.Now().Sub(info.ModTime()) > ttl { return true, nil } var lat metadata.LastAccessTime if err := op.GetFileMetadata(name, &lat); os.IsNotExist(err) { return false, nil } else if err != nil { return false, fmt.Errorf("get file lat: %s", err) } return m.clk.Now().Sub(lat.Time) > tti, nil } func (m *cleanupManager) shouldAggro(op base.FileOp, config CleanupConfig, diskUsageFn diskUsageFn) bool { if config.AggressiveThreshold == 0 { return false } diskUsage, err := diskUsageFn() if err != nil { log.Errorf("Error getting disk usage info %s: %s", op, err) return false } if diskUsage.Util >= config.AggressiveThreshold { log.Warnf("Aggressive cleanup of %s triggers with disk space util %d", op, diskUsage.Util) m.stats.Counter("aggro_gc_runs").Inc(1) return true } return false }