lib/store/base/file_map.go (235 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package base
import (
"container/list"
"os"
"sync"
"time"
"github.com/uber/kraken/lib/store/metadata"
"github.com/uber/kraken/utils/log"
"github.com/andres-erbsen/clock"
)
// FileMap is a thread-safe name -> FileEntry map.
type FileMap interface {
Contains(name string) bool
TryStore(name string, entry FileEntry, f func(string, FileEntry) bool) bool
LoadForWrite(name string, f func(string, FileEntry)) bool
LoadForRead(name string, f func(string, FileEntry)) bool
LoadForPeek(name string, f func(string, FileEntry)) bool
Delete(name string, f func(string, FileEntry) bool) bool
}
var _ FileMap = (*lruFileMap)(nil)
type fileEntryWithAccessTime struct {
sync.RWMutex
fe FileEntry
// The last time that LoadForWrite/LoadForRead is called on the entry.
lastAccessTime time.Time
}
// lruFileMap implements FileMap interface, with an optional max capacity, and
// will evict least recently accessed entry when the capacity is reached, which
// will only be updated by LoadForRead and LoadForWrite.
type lruFileMap struct {
sync.Mutex
// Capacity limit of the LRU map. Set capacity to 0 to disable eviction.
size int
clk clock.Clock
// Min timespan between two updates of LAT for the same file.
timeResolution time.Duration
queue *list.List
elements map[string]*list.Element
}
// NewLRUFileMap creates a new LRU map given capacity.
func NewLRUFileMap(size int, clk clock.Clock) FileMap {
m := &lruFileMap{
size: size,
clk: clk,
timeResolution: time.Minute * 5,
queue: list.New(),
elements: make(map[string]*list.Element),
}
return m
}
// NewLATFileMap creates a new file map that tracks last access time, but no
// auto-eviction.
func NewLATFileMap(clk clock.Clock) FileMap {
m := &lruFileMap{
size: 0, // Disable eviction.
clk: clk,
timeResolution: time.Minute * 5,
queue: list.New(),
elements: make(map[string]*list.Element),
}
return m
}
func (fm *lruFileMap) get(name string) (*fileEntryWithAccessTime, bool) {
if element, ok := fm.elements[name]; ok {
fm.queue.MoveToFront(element)
return element.Value.(*fileEntryWithAccessTime), ok
}
return nil, false
}
func (fm *lruFileMap) syncGet(name string) (*fileEntryWithAccessTime, bool) {
fm.Lock()
defer fm.Unlock()
return fm.get(name)
}
func (fm *lruFileMap) syncGetAndTouch(name string) (*fileEntryWithAccessTime, bool) {
fm.Lock()
defer fm.Unlock()
e, ok := fm.get(name)
if !ok {
return nil, false
}
// Update last access time.
t := fm.clk.Now()
if t.Sub(e.lastAccessTime) >= fm.timeResolution {
// Only update if new timestamp is <timeResolution> newer than previous
// value.
e.lastAccessTime = t
e.fe.SetMetadata(metadata.NewLastAccessTime(t))
}
return e, true
}
func (fm *lruFileMap) add(name string, e *fileEntryWithAccessTime) bool {
if _, ok := fm.elements[name]; !ok {
element := fm.queue.PushFront(e)
fm.elements[name] = element
return true
}
return false
}
func (fm *lruFileMap) getOldest() (*fileEntryWithAccessTime, bool) {
if e := fm.queue.Back(); e != nil {
return e.Value.(*fileEntryWithAccessTime), true
}
return nil, false
}
func (fm *lruFileMap) remove(name string) (*fileEntryWithAccessTime, bool) {
if e, ok := fm.elements[name]; ok {
delete(fm.elements, name)
fm.queue.Remove(e)
return e.Value.(*fileEntryWithAccessTime), ok
}
return nil, false
}
func (fm *lruFileMap) syncRemove(name string) (*fileEntryWithAccessTime, bool) {
fm.Lock()
defer fm.Unlock()
return fm.remove(name)
}
func (fm *lruFileMap) syncRemoveOldestIfNeeded() (e *fileEntryWithAccessTime, ok bool) {
// Verify if size limit was defined and exceeded.
fm.Lock()
if fm.size <= 0 || fm.queue.Len() <= fm.size {
defer fm.Unlock()
return nil, false
}
e, ok = fm.getOldest()
if !ok {
defer fm.Unlock()
return nil, false
}
fm.Unlock()
e.Lock()
defer e.Unlock()
// Now that we have the entry lock, make sure k was not deleted or
// overwritten.
name := e.fe.GetName()
if ne, ok := fm.syncGet(name); !ok {
return nil, false
} else if ne != e {
return nil, false
}
if err := e.fe.Delete(); err != nil {
log.With("name", e.fe.GetName()).Errorf("Error deleting evicted entry: %s", err)
}
// Remove from map while the entry lock is still being held.
fm.syncRemove(name)
return e, true
}
// Contains returns true if the given key is stored in the map.
func (fm *lruFileMap) Contains(name string) bool {
fm.Lock()
defer fm.Unlock()
_, ok := fm.elements[name]
return ok
}
// TryStore tries to stores the given key / value pair into the map.
// If object is successfully stored, execute f under the protection of Lock.
// Returns false if the name is already present.
func (fm *lruFileMap) TryStore(name string, entry FileEntry, f func(string, FileEntry) bool) bool {
// Lock on entry first, in case the lock is taken by other goroutine before f().
e := &fileEntryWithAccessTime{
fe: entry,
}
// After store, make sure size limit wasn't exceeded.
// Also make sure this happens after e.RUnlock(), in case the new entry is to be deleted.
defer fm.syncRemoveOldestIfNeeded()
e.Lock()
defer e.Unlock()
fm.Lock()
// Verify if it's already in the map.
if _, ok := fm.get(name); ok {
defer fm.Unlock()
// Update last access time.
t := fm.clk.Now()
if t.Sub(e.lastAccessTime) >= fm.timeResolution {
// Only update if new timestamp is <timeResolution> newer than
// previous value.
e.lastAccessTime = t
e.fe.SetMetadata(metadata.NewLastAccessTime(t))
}
return false
}
// Add new entry to map.
fm.add(name, e)
lat := metadata.NewLastAccessTime(fm.clk.Now())
if err := e.fe.GetMetadata(lat); err != nil {
// Set LAT if it doesn't exist on disk or cannot be read.
if !os.IsNotExist(err) {
log.With("name", e.fe.GetName()).Errorf("Error reading LAT: %s", err)
}
if _, err := e.fe.SetMetadata(lat); err != nil {
log.With("name", e.fe.GetName()).Errorf("Error setting LAT: %s", err)
}
}
e.lastAccessTime = lat.Time
fm.Unlock()
if !f(name, e.fe) {
// Remove from map while the entry lock is still being held.
fm.syncRemove(name)
return false
}
return true
}
// LoadForWrite looks up the value of key k and executes f under the protection
// of RLock.
// While f executes, it is guaranteed that k will not be deleted from the map.
// Returns false if k was not found.
// It updates last access time and file size.
func (fm *lruFileMap) LoadForWrite(name string, f func(string, FileEntry)) bool {
e, ok := fm.syncGet(name)
if !ok {
return false
}
e.Lock()
defer e.Unlock()
// Now that we have the entry lock, make sure k was not deleted or
// overwritten.
if ne, ok := fm.syncGetAndTouch(name); !ok {
return false
} else if ne != e {
return false
}
f(name, e.fe)
return true
}
// LoadForRead looks up the value of key k and executes f under the protection
// of RLock.
// While f executes, it is guaranteed that k will not be deleted from the map.
// Returns false if k was not found.
// It updates last access time.
func (fm *lruFileMap) LoadForRead(name string, f func(string, FileEntry)) bool {
e, ok := fm.syncGet(name)
if !ok {
return false
}
e.RLock()
defer e.RUnlock()
// Now that we have the entry lock, make sure k was not deleted or
// overwritten.
if ne, ok := fm.syncGetAndTouch(name); !ok {
return false
} else if ne != e {
return false
}
f(name, e.fe)
return true
}
// LoadForPeek looks up the value of key k and executes f under the protection
// of RLock.
// While f executes, it is guaranteed that k will not be deleted from the map.
// Returns false if k was not found.
func (fm *lruFileMap) LoadForPeek(name string, f func(string, FileEntry)) bool {
e, ok := fm.syncGet(name)
if !ok {
return false
}
e.RLock()
defer e.RUnlock()
// Now that we have the entry lock, make sure k was not deleted or
// overwritten.
if ne, ok := fm.syncGet(name); !ok {
return false
} else if ne != e {
return false
}
f(name, e.fe)
return true
}
// Delete deletes the given key from the Map.
// It also executes f under the protection of Lock.
// If f returns false, abort before key deletion.
func (fm *lruFileMap) Delete(name string, f func(string, FileEntry) bool) bool {
e, ok := fm.syncGet(name)
if !ok {
return false
}
e.Lock()
defer e.Unlock()
// Now that we have the entry lock, make sure k was not deleted or
// overwritten.
if ne, ok := fm.syncGet(name); !ok {
return false
} else if ne != e {
return false
}
if !f(name, e.fe) {
return false
}
// Remove from map while the entry lock is still being held.
fm.syncRemove(name)
return true
}