component/file_cache/lru_policy.go (330 lines of code) (raw):
/*
_____ _____ _____ ____ ______ _____ ------
| | | | | | | | | | | | |
| | | | | | | | | | | | |
| --- | | | | |-----| |---- | | |-----| |----- ------
| | | | | | | | | | | | |
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____
Licensed under the MIT License <http://opensource.org/licenses/MIT>.
Copyright © 2020-2025 Microsoft Corporation. All rights reserved.
Author : <blobfusedev@microsoft.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
*/
package file_cache
import (
"os"
"strings"
"sync"
"time"
"github.com/Azure/azure-storage-fuse/v2/common"
"github.com/Azure/azure-storage-fuse/v2/common/log"
)
// lruNode is a single element of the doubly linked list that the LRU policy
// uses to order cached files from most recently validated (head) to oldest.
type lruNode struct {
	// next points away from the head of the list, prev points towards it.
	next, prev *lruNode
	// usage is bumped each time the node is moved to the head of the list;
	// the two marker nodes are created with usage -1.
	usage int
	// deleted is set once the node has been picked for eviction or removed,
	// and cleared again when the file is re-validated.
	deleted bool
	// name is the key under which the node is stored in the policy's node map.
	name string
}
// lruPolicy is the LRU implementation of cachePolicy. Cached files are tracked
// both in a map (lookup by name) and in a doubly linked list (recency order);
// two marker nodes living in the same list delimit which entries have gone
// unused for a full timeout period.
type lruPolicy struct {
	// The embedded mutex is held whenever the linked list is relinked.
	sync.Mutex
	cachePolicyConfig

	// nodeMap maps a file name to its *lruNode.
	nodeMap sync.Map

	// head is the most recently validated end of the list.
	head *lruNode

	// currMarker and lastMarker are sentinel nodes; entries behind lastMarker
	// are the ones handed to eviction.
	currMarker, lastMarker *lruNode

	// Signals used to stop the eviction loop and the validation loop respectively.
	closeSignal, closeSignalValidate chan int

	// Names of files that must be deleted right away.
	deleteEvent chan string

	// Names of files that were just used and must move to the front of the list.
	validateChan chan string

	// Ticks when disk usage should be compared against the configured thresholds.
	diskUsageMonitor <-chan time.Time

	// Ticks when entries unused for the file-cache timeout should be evicted.
	cacheTimeoutMonitor <-chan time.Time

	// Whether the disk usage probe run at start-up succeeded.
	duPresent bool
}
// CacheTimeoutCheckInterval is the file-expiry check period, in seconds.
const CacheTimeoutCheckInterval = 5

// DiskUsageCheckInterval is the disk-usage check period, in minutes.
const DiskUsageCheckInterval = 1
// Compile-time check that *lruPolicy satisfies the cachePolicy interface.
var _ cachePolicy = (*lruPolicy)(nil)
// NewLRUPolicy returns an LRU cache policy configured with cfg.
//
// Only the two sentinel marker nodes are allocated here; the list head stays
// nil and the disk-usage flag stays false until StartPolicy is called.
func NewLRUPolicy(cfg cachePolicyConfig) cachePolicy {
	policy := new(lruPolicy)
	policy.cachePolicyConfig = cfg

	// Markers carry reserved names and a negative usage count.
	policy.currMarker = &lruNode{name: "__", usage: -1}
	policy.lastMarker = &lruNode{name: "##", usage: -1}

	return policy
}
// StartPolicy initialises the list and channels and launches the two
// background goroutines (eviction loop and validation loop).
//
// It always returns nil.
func (p *lruPolicy) StartPolicy() error {
	log.Trace("lruPolicy::StartPolicy")

	// Seed the list with just the two markers: currMarker <-> lastMarker.
	p.currMarker.prev, p.currMarker.next = nil, p.lastMarker
	p.lastMarker.prev, p.lastMarker.next = p.currMarker, nil
	p.head = p.currMarker

	p.closeSignal = make(chan int)
	p.closeSignalValidate = make(chan int)

	p.deleteEvent = make(chan string, 1000)
	p.validateChan = make(chan string, 10000)

	// Probe disk usage once; a failure is reported as 'du' being unavailable
	// and leaves the disk usage monitor disabled.
	if _, err := common.GetUsage(p.tmpPath); err != nil {
		log.Err("lruPolicy::StartPolicy : 'du' command not found, disabling disk usage checks")
	} else {
		p.duPresent = true
	}

	if p.duPresent {
		p.diskUsageMonitor = time.Tick(DiskUsageCheckInterval * time.Minute)
	}

	// The timeout monitor is only needed for a non-zero timeout: with a zero
	// timeout files are purged on invalidate, so no periodic signal is required.
	log.Info("lruPolicy::StartPolicy : Policy set with %v timeout", p.cacheTimeout)

	if p.cacheTimeout != 0 {
		p.cacheTimeoutMonitor = time.Tick(time.Duration(p.cacheTimeout) * time.Second)
	}

	go p.clearCache()
	go p.asyncCacheValid()

	return nil
}
// ShutdownPolicy asks both background loops to exit by sending on their close
// channels, first the eviction loop and then the validation loop. Each send
// blocks until the corresponding loop receives it. It always returns nil.
func (p *lruPolicy) ShutdownPolicy() error {
	log.Trace("lruPolicy::ShutdownPolicy")

	for _, stop := range []chan int{p.closeSignal, p.closeSignalValidate} {
		stop <- 1
	}

	return nil
}
// UpdateConfig copies the tunable limits from c into the running policy:
// maximum cache size, high/low usage thresholds, maximum evictions per pass
// and the policy trace flag. No other configuration field is copied and no
// lock is taken. It always returns nil.
func (p *lruPolicy) UpdateConfig(c cachePolicyConfig) error {
	log.Trace("lruPolicy::UpdateConfig")

	p.maxSizeMB, p.maxEviction = c.maxSizeMB, c.maxEviction
	p.highThreshold, p.lowThreshold = c.highThreshold, c.lowThreshold
	p.policyTrace = c.policyTrace

	return nil
}
// CacheValid records that the file called name has just been used.
//
// A file already present in the node map is queued on validateChan so that the
// list update happens asynchronously; a file seen for the first time is added
// to the map and list synchronously.
func (p *lruPolicy) CacheValid(name string) {
	if _, known := p.nodeMap.Load(name); known {
		p.validateChan <- name
		return
	}

	p.cacheValidate(name)
}
// CacheInvalidate purges name immediately when the cache timeout is zero or
// when the file is no longer tracked in the node map; otherwise it does
// nothing and leaves eviction to the timeout monitor.
func (p *lruPolicy) CacheInvalidate(name string) {
	log.Trace("lruPolicy::CacheInvalidate : %s", name)

	// The "not tracked" case covers a zero timeout with several open handles:
	// the first close removes the map entry and tries to delete the file, which
	// is skipped because other handles are still open. By the time the last
	// close arrives the map is already clean, so the deletion has to be
	// attempted again.
	_, tracked := p.nodeMap.Load(name)
	if tracked && p.cacheTimeout != 0 {
		return
	}

	p.CachePurge(name)
}
// CachePurge drops name from the map and list right away and then queues the
// file on deleteEvent so the eviction loop deletes it from local storage.
func (p *lruPolicy) CachePurge(name string) {
	log.Trace("lruPolicy::CachePurge : %s", name)

	// Stop tracking first, then hand the actual deletion to the eviction loop.
	p.removeNode(name)
	p.deleteEvent <- name
}
// IsCached reports whether name is tracked by the policy and has not been
// flagged as deleted.
func (p *lruPolicy) IsCached(name string) bool {
	log.Trace("lruPolicy::IsCached : %s", name)

	entry, found := p.nodeMap.Load(name)
	if found {
		deleted := entry.(*lruNode).deleted
		log.Debug("lruPolicy::IsCached : %s, deleted:%t", name, deleted)
		if !deleted {
			return true
		}
	}

	// Either unknown to the policy or already marked for deletion.
	log.Trace("lruPolicy::IsCached : %s, found %t", name, found)
	return false
}
// Name returns the identifier of this cache policy.
func (p *lruPolicy) Name() string {
	const policyName = "lru"
	return policyName
}
// asyncCacheValid drains validateChan: every file name pushed there on
// validate is moved to the front of the LRU list. The loop ends when a value
// arrives on closeSignalValidate.
func (p *lruPolicy) asyncCacheValid() {
	for {
		select {
		case <-p.closeSignalValidate:
			return

		case fileName := <-p.validateChan:
			p.cacheValidate(fileName)
		}
	}
}
// cacheValidate marks name as freshly used: its node is created if needed,
// its deleted flag is cleared, and it is moved to the head of the LRU list.
//
// It is called both synchronously from CacheValid (first sighting of a file)
// and from the asyncCacheValid goroutine, so several goroutines may validate
// the same new name at once.
func (p *lruPolicy) cacheValidate(name string) {
	val, found := p.nodeMap.Load(name)
	if !found {
		// LoadOrStore rather than Store: if another goroutine registered the
		// same name between the Load above and this point, reuse its node.
		// A plain Store would overwrite the map entry while both nodes get
		// linked into the list, leaving one list node unreachable via the map.
		val, _ = p.nodeMap.LoadOrStore(name, &lruNode{name: name})
	}
	node := val.(*lruNode)

	p.Lock()
	defer p.Unlock()

	// A node picked for eviction but validated before removal is revived.
	node.deleted = false

	// Already the most recent entry, nothing to relink.
	if node == p.head {
		return
	}

	// Unlink from the current position (no-op for a brand new node).
	if node.next != nil {
		node.next.prev = node.prev
	}

	if node.prev != nil {
		node.prev.next = node.next
	}

	// Push to the front of the list.
	node.prev = nil
	node.next = p.head

	p.head.prev = node
	p.head = node

	node.usage++
}
// clearCache is the background eviction loop. It reacts to four events:
//   - deleteEvent: delete the named file immediately;
//   - cacheTimeoutMonitor: advance the markers and evict everything that went
//     unused for a full timeout period;
//   - diskUsageMonitor: if usage exceeds highThreshold, run up to three
//     eviction passes, stopping early once usage falls below lowThreshold;
//   - closeSignal: exit the loop.
func (p *lruPolicy) clearCache() {
	log.Trace("lruPolicy::ClearCache")

	for {
		select {
		case name := <-p.deleteEvent:
			log.Trace("lruPolicy::Clear-delete")
			// we are asked to delete file explicitly
			p.deleteItem(name)

		case <-p.cacheTimeoutMonitor:
			log.Trace("lruPolicy::Clear-timeout monitor")
			// File cache timeout has hit so delete all unused files for past N seconds
			p.updateMarker()
			p.printNodes()
			p.deleteExpiredNodes()

		case <-p.diskUsageMonitor:
			// File cache timeout has not occurred so just monitor the cache usage
			pUsage := getUsagePercentage(p.tmpPath, p.maxSizeMB)
			if pUsage > p.highThreshold {
				log.Info("lruPolicy::ClearCache : High threshold reached %f > %f", pUsage, p.highThreshold)

				for cleanupCount := 1; ; cleanupCount++ {
					p.updateMarker()
					p.printNodes()
					p.deleteExpiredNodes()

					// Assign (not :=) so every log below reports the fresh
					// reading instead of a shadowed, stale one.
					pUsage = getUsagePercentage(p.tmpPath, p.maxSizeMB)
					if pUsage < p.lowThreshold {
						log.Info("lruPolicy::ClearCache : Threshold stabilized %f < %f", pUsage, p.lowThreshold)
						break
					}

					// Cap the work done per tick at three eviction passes.
					if cleanupCount >= 3 {
						log.Info("lruPolicy::ClearCache : Cleanup attempts exhausted, usage %f not below low threshold %f", pUsage, p.lowThreshold)
						break
					}

					log.Info("lruPolicy::ClearCache : Usage %f not below low threshold %f, evicting again", pUsage, p.lowThreshold)
				}
			}

		case <-p.closeSignal:
			return
		}
	}
}
// removeNode stops tracking name: the entry is taken out of the node map,
// flagged as deleted and unlinked from the LRU list. Unknown names are ignored.
func (p *lruPolicy) removeNode(name string) {
	log.Trace("lruPolicy::removeNode : %s", name)

	// LoadAndDelete makes lookup and removal one atomic step. With a separate
	// Load and Delete, a slow caller could delete a fresh node that a
	// concurrent cacheValidate stored under the same name in between.
	val, found := p.nodeMap.LoadAndDelete(name)
	if !found || val == nil {
		return
	}

	p.Lock()
	defer p.Unlock()

	node := val.(*lruNode)
	node.deleted = true

	if node == p.head {
		// The marker nodes are always in the list, so a file node sitting at
		// the head is expected to have a successor.
		p.head = node.next
		p.head.prev = nil
		node.next = nil
		return
	}

	// Splice the node out of the middle or tail of the list.
	if node.next != nil {
		node.next.prev = node.prev
	}

	if node.prev != nil {
		node.prev.next = node.next
	}

	node.prev = nil
	node.next = nil
}
// updateMarker moves lastMarker to the head of the list and swaps the roles of
// the two markers. Afterwards everything behind the new lastMarker (the old
// currMarker) has not been validated since the previous call.
func (p *lruPolicy) updateMarker() {
	log.Trace("lruPolicy::updateMarker")

	p.Lock()
	defer p.Unlock()

	marker := p.lastMarker

	// Unlink the marker from its current position.
	if succ := marker.next; succ != nil {
		succ.prev = marker.prev
	}
	if pred := marker.prev; pred != nil {
		pred.next = marker.next
	}

	// Reinsert it at the front of the list.
	marker.prev = nil
	marker.next = p.head
	p.head.prev = marker
	p.head = marker

	// The old current marker becomes the eviction boundary.
	p.lastMarker, p.currMarker = p.currMarker, marker
}
// deleteExpiredNodes evicts the entries that sit behind lastMarker, at most
// maxEviction of them per call.
//
// Under the lock the candidates are cut out of the list as a batch and flagged
// as deleted; entries beyond the maxEviction limit are re-attached behind
// lastMarker for a later pass. After the lock is released, every candidate
// still flagged as deleted (i.e. not re-validated in the meantime) is removed
// from the map and its file is deleted.
func (p *lruPolicy) deleteExpiredNodes() {
	log.Debug("lruPolicy::deleteExpiredNodes : Starts")

	p.Lock()

	// Read the list under the lock; other goroutines relink it concurrently.
	node := p.lastMarker.next
	if node == nil {
		p.Unlock()
		return
	}

	// Detach everything behind lastMarker.
	p.lastMarker.next = nil
	node.prev = nil

	delItems := make([]*lruNode, 0)
	count := uint32(0)

	var tail *lruNode // last node collected into delItems
	for ; node != nil && count < p.maxEviction; node = node.next {
		delItems = append(delItems, node)
		node.deleted = true
		tail = node
		count++
	}

	if count >= p.maxEviction {
		log.Debug("lruPolicy::DeleteExpiredNodes : Max deletion count hit")
	}

	// Sever the batch from the nodes that stay in the list. Without this the
	// last collected node keeps pointing at the first survivor, and unlinking
	// or re-validating it later would overwrite the survivor's prev pointer
	// and corrupt the live list.
	if tail != nil {
		tail.next = nil
	}

	// Re-attach whatever was not collected behind lastMarker.
	p.lastMarker.next = node
	if node != nil {
		node.prev = p.lastMarker
	}
	p.Unlock()

	log.Debug("lruPolicy::deleteExpiredNodes : List generated %d items", count)

	for _, item := range delItems {
		// Skip entries that were validated again after being collected.
		if item.deleted {
			p.removeNode(item.name)
			p.deleteItem(item.name)
		}
	}

	log.Debug("lruPolicy::deleteExpiredNodes : Ends")
}
// deleteItem removes the local cache file called name, unless the file is
// currently locked or still has open handles; in those cases the file is
// re-validated instead so that it stays in the cache.
func (p *lruPolicy) deleteItem(name string) {
	log.Trace("lruPolicy::deleteItem : Deleting %s", name)

	// Derive the lock key by stripping the cache directory prefix.
	azPath := strings.TrimPrefix(name, p.tmpPath)
	if len(azPath) == 0 {
		log.Err("lruPolicy::DeleteItem : Empty file name formed name : %s, tmpPath : %s", name, p.tmpPath)
		return
	}

	// Drop a single leading path separator, if any.
	azPath = strings.TrimPrefix(azPath, "/")

	flock := p.fileLocks.Get(azPath)
	if p.fileLocks.Locked(azPath) {
		// The lock is already held; this is reported as a download in progress.
		log.Warn("lruPolicy::DeleteItem : File in under download %s", azPath)
		p.CacheValid(name)
		return
	}

	flock.Lock()
	defer flock.Unlock()

	// A non-zero count means the file still has open handles.
	if flock.Count() > 0 {
		log.Warn("lruPolicy::DeleteItem : File in use %s", name)
		p.CacheValid(name)
		return
	}

	// No open handles remain, so the file can be removed; a file that is
	// already gone is not treated as an error.
	if err := deleteFile(name); err != nil && !os.IsNotExist(err) {
		log.Err("lruPolicy::DeleteItem : failed to delete local file %s [%s]", name, err.Error())
	}

	// TODO: Delete directories up the path recursively that are "safe to delete". Ensure there is no race between this code and code that creates directories (like OpenFile)
	// This might require something like hierarchical locking.
}
// printNodes dumps the LRU list from head to tail to the debug log. It does
// nothing unless policy tracing is enabled.
func (p *lruPolicy) printNodes() {
	if !p.policyTrace {
		return
	}

	cur := p.head

	log.Debug("lruPolicy::printNodes : Starts")

	for idx := 0; cur != nil; idx, cur = idx+1, cur.next {
		log.Debug(" ==> (%d) %s", idx, cur.name)
	}

	log.Debug("lruPolicy::printNodes : Ends")
}