lib/store/cleanup.go (148 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package store import ( "fmt" "os" "sync" "time" "github.com/uber/kraken/lib/store/base" "github.com/uber/kraken/lib/store/metadata" "github.com/uber/kraken/utils/diskspaceutil" "github.com/uber/kraken/utils/log" "github.com/andres-erbsen/clock" "github.com/uber-go/tally" ) // CleanupConfig defines configuration for periodically cleaning up idle files. type CleanupConfig struct { Disabled bool `yaml:"disabled"` Interval time.Duration `yaml:"interval"` // How often cleanup runs. TTI time.Duration `yaml:"tti"` // Time to idle based on last access time. TTL time.Duration `yaml:"ttl"` // Time to live regardless of access. If 0, disables TTL. AggressiveThreshold int `yaml:"aggressive_threshold"` // The disk util threshold to trigger aggressive cleanup. If 0, disables aggressive cleanup. AggressiveTTL time.Duration `yaml:"aggressive_ttL"` // Time to live regardless of access if aggressive cleanup is triggered. } type ( // Define a func type for mocking diskSpaceUtil function. diskSpaceUtilFunc func() (int, error) ) func (c CleanupConfig) applyDefaults() CleanupConfig { if c.Interval == 0 { c.Interval = 30 * time.Minute } if c.TTI == 0 { c.TTI = 6 * time.Hour } if c.AggressiveThreshold != 0 { if c.AggressiveTTL == 0 { c.AggressiveTTL = 1 * time.Hour } } return c } type cleanupManager struct { clk clock.Clock stats tally.Scope stopOnce sync.Once stopc chan struct{} } func newCleanupManager(clk clock.Clock, stats tally.Scope) (*cleanupManager, error) { hostname, err := os.Hostname() if err != nil { return nil, fmt.Errorf("look up hostname: %s", err) } stats = stats.Tagged(map[string]string{ "module": "storecleanup", "hostname": hostname, }) return &cleanupManager{ clk: clk, stats: stats, stopc: make(chan struct{}), }, nil } // addJob starts a background cleanup task which removes idle files from op based // on the settings in config. op must set the desired states to clean before addJob // is called. func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp) { config = config.applyDefaults() if config.Disabled { log.Warnf("Cleanup disabled for %s", op) return } if config.TTL == 0 { log.Warnf("TTL disabled for %s", op) } if config.AggressiveThreshold == 0 { log.Warnf("Aggressive cleanup disabled for %s", op) } ticker := m.clk.Ticker(config.Interval) usageGauge := m.stats.Tagged(map[string]string{"job": tag}).Gauge("disk_usage") go func() { for { select { case <-ticker.C: log.Debugf("Performing cleanup of %s", op) ttl := m.checkAggressiveCleanup(op, config, diskspaceutil.DiskSpaceUtil) usage, err := m.scan(op, config.TTI, ttl) if err != nil { log.Errorf("Error scanning %s: %s", op, err) } usageGauge.Update(float64(usage)) case <-m.stopc: ticker.Stop() return } } }() } func (m *cleanupManager) stop() { m.stopOnce.Do(func() { close(m.stopc) }) } // scan scans the op for idle or expired files. Also returns the total disk usage // of op. func (m *cleanupManager) scan( op base.FileOp, tti time.Duration, ttl time.Duration) (usage int64, err error) { names, err := op.ListNames() if err != nil { return 0, fmt.Errorf("list names: %s", err) } for _, name := range names { info, err := op.GetFileStat(name) if err != nil { log.With("name", name).Errorf("Error getting file stat: %s", err) continue } if ready, err := m.readyForDeletion(op, name, info, tti, ttl); err != nil { log.With("name", name).Errorf("Error checking if file expired: %s", err) } else if ready { if err := op.DeleteFile(name); err != nil && err != base.ErrFilePersisted { log.With("name", name).Errorf("Error deleting expired file: %s", err) } } usage += info.Size() } return usage, nil } func (m *cleanupManager) readyForDeletion( op base.FileOp, name string, info os.FileInfo, tti time.Duration, ttl time.Duration) (bool, error) { if ttl > 0 && m.clk.Now().Sub(info.ModTime()) > ttl { return true, nil } var lat metadata.LastAccessTime if err := op.GetFileMetadata(name, &lat); os.IsNotExist(err) { return false, nil } else if err != nil { return false, fmt.Errorf("get file lat: %s", err) } return m.clk.Now().Sub(lat.Time) > tti, nil } func (m *cleanupManager) checkAggressiveCleanup(op base.FileOp, config CleanupConfig, util diskSpaceUtilFunc) time.Duration { if config.AggressiveThreshold != 0 { diskspaceutil, err := util() if err != nil { log.Errorf("Error checking disk space util %s: %s", op, err) return config.TTL } if diskspaceutil >= config.AggressiveThreshold { log.Debugf("Aggressive cleanup of %s triggers with disk space util %d", op, diskspaceutil) return config.AggressiveTTL } } return config.TTL }