lib/store/ca_store.go
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package store

import (
"bytes"
"container/list"
"errors"
"fmt"
"hash"
"io"
"os"
"path"
"sync"
"time"
"github.com/andres-erbsen/clock"
"github.com/docker/distribution/uuid"
"github.com/spaolacci/murmur3"
"github.com/uber-go/tally"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/hrw"
"github.com/uber/kraken/lib/store/base"
"github.com/uber/kraken/lib/store/metadata"
"github.com/uber/kraken/utils/cache"
"github.com/uber/kraken/utils/closers"
"github.com/uber/kraken/utils/log"
)
const _drainDuration = 100 * time.Millisecond
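// drainDurationBuckets covers zero plus an exponential range from 100ms up to
// 100ms * 2^11 (~205s), so both fast and pathologically slow drains are
// captured by the histogram.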
var drainDurationBuckets = append(
tally.DurationBuckets{0},
tally.MustMakeExponentialDurationBuckets(100*time.Millisecond, 2, 12)...,
)
// drainItem represents an item in the drain queue for async disk writing.
type drainItem struct {
entry *cache.MemoryEntry
retries int
errs []error
}
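// drain coordinates the background workers that flush memory cache entries to
// disk. Queued items are retried until DrainMaxRetries is exceeded, after
// which the entry is dropped from the memory cache and the accumulated errors
// are logged.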
type drain struct {
queue *list.List
mu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
histogram tally.Histogram
}
// CAStore allows uploading / caching content-addressable files.
type CAStore struct {
config CAStoreConfig
stats tally.Scope
clk clock.Clock
*uploadStore
*cacheStore
cleanup *cleanupManager
memCache *cache.BlobMemoryCache
drain *drain
ttlStopChan chan struct{}
ttlWg sync.WaitGroup
}
// NewCAStore creates a new CAStore.
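//
// A minimal usage sketch (config and the metrics scope are assumptions):
//
//	s, err := store.NewCAStore(config, tally.NoopScope)
//	if err != nil {
//		panic(err) // handle appropriately
//	}
//	defer s.Close()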
func NewCAStore(config CAStoreConfig, stats tally.Scope) (*CAStore, error) {
return newCAStore(config, stats, clock.New())
}
// newCAStore creates a new CAStore with an injected clock, for testing.
func newCAStore(config CAStoreConfig, stats tally.Scope, clk clock.Clock) (*CAStore, error) {
config = config.applyDefaults()
stats = stats.Tagged(map[string]string{
"module": "castore",
})
uploadStore, err := newUploadStore(config.UploadDir, config.ReadPartSize, config.WritePartSize)
if err != nil {
return nil, fmt.Errorf("new upload store: %s", err)
}
cacheBackend := base.NewCASFileStoreWithLRUMap(config.Capacity, clk)
cacheStore, err := newCacheStore(config.CacheDir, cacheBackend, config.ReadPartSize)
if err != nil {
return nil, fmt.Errorf("new cache store: %s", err)
}
if err := initCASVolumes(config.CacheDir, config.Volumes); err != nil {
return nil, fmt.Errorf("init cas volumes: %s", err)
}
cleanup := newCleanupManager(clk, stats)
cleanup.addJob("upload", config.UploadCleanup, uploadStore.newFileOp())
cleanup.addJob("cache", config.CacheCleanup, cacheStore.newFileOp())
cas := &CAStore{
config: config,
stats: stats,
clk: clk,
uploadStore: uploadStore,
cacheStore: cacheStore,
cleanup: cleanup,
}
if config.MemoryCache.Enabled {
memCache := createMemoryCache(&config, stats)
cas.memCache = memCache
initMemCacheCleanupJob(cas)
startDrainWorkers(cas)
}
return cas, nil
}
func createMemoryCache(config *CAStoreConfig, stats tally.Scope) *cache.BlobMemoryCache {
return cache.NewBlobMemoryCache(cache.BlobMemoryCacheConfig{
MaxSize: config.MemoryCache.MaxSize,
}, stats)
}
func initMemCacheCleanupJob(cas *CAStore) {
cas.ttlStopChan = make(chan struct{})
cas.ttlWg.Add(1)
go cas.memoryCacheCleanupWorker()
}
func startDrainWorkers(cas *CAStore) {
cas.drain = &drain{
histogram: cas.stats.Histogram("drain_duration", drainDurationBuckets),
stopChan: make(chan struct{}),
queue: list.New(),
}
for range cas.config.MemoryCache.DrainWorkers {
cas.drain.wg.Add(1)
go cas.drainWorker()
}
}
// Close terminates any goroutines started by s.
func (s *CAStore) Close() {
if s.drain != nil && s.drain.stopChan != nil {
close(s.drain.stopChan)
s.drain.wg.Wait()
}
if s.ttlStopChan != nil {
close(s.ttlStopChan)
s.ttlWg.Wait()
}
s.cleanup.stop()
}
// MoveUploadFileToCache commits uploadName as cacheName. The upload file's
// content is verified against the cacheName digest unless hash verification
// is disabled in config.
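//
// A sketch of the expected flow (error handling elided; names illustrative):
//
//	_ = s.CreateUploadFile(uploadName, size)
//	w, _ := s.GetUploadFileReadWriter(uploadName)
//	// ... write blob content to w, then close it ...
//	err := s.MoveUploadFileToCache(uploadName, d.Hex())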
func (s *CAStore) MoveUploadFileToCache(uploadName, cacheName string) error {
uploadPath, err := s.uploadStore.newFileOp().GetFilePath(uploadName)
if err != nil {
return err
}
defer s.deferDeleteUploadFile(uploadName)()
f, err := s.uploadStore.newFileOp().GetFileReader(uploadName, s.uploadStore.readPartSize)
if err != nil {
return fmt.Errorf("get file reader %s: %s", uploadName, err)
}
defer closers.Close(f)
if err := s.verify(f, cacheName); err != nil {
return fmt.Errorf("verify digest: %s", err)
}
return s.cacheStore.newFileOp().MoveFileFrom(cacheName, s.cacheStore.state, uploadPath)
}
// CreateCacheFile initializes a cache file for name from r. name should be a
// raw hex SHA256 digest, and the contents of r must hash to name.
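//
// For example, caching a blob held in memory (a sketch; d is its core.Digest):
//
//	err := s.CreateCacheFile(d.Hex(), bytes.NewReader(blob))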
func (s *CAStore) CreateCacheFile(name string, r io.Reader) error {
return s.WriteCacheFile(name, func(w FileReadWriter) error {
_, err := io.Copy(w, r)
return err
})
}
// WriteCacheFile initializes a cache file for name by passing a temporary
// upload file writer to the write function.
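//
// A minimal sketch, assuming blob is a []byte whose hex-encoded SHA256 digest
// is name:
//
//	err := s.WriteCacheFile(name, func(w store.FileReadWriter) error {
//		_, err := w.Write(blob)
//		return err
//	})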
func (s *CAStore) WriteCacheFile(name string, write func(w FileReadWriter) error) error {
return s.writeCacheFile(name, write, false, 0)
}
// writeCacheFile writes a cache file via the write callback, optionally
// generating and persisting torrent metadata alongside it.
func (s *CAStore) writeCacheFile(name string, write func(w FileReadWriter) error, addMetadata bool, pieceLength int64) error {
tmp := fmt.Sprintf("%s.%s", name, uuid.Generate().String())
if err := s.CreateUploadFile(tmp, 0); err != nil {
return fmt.Errorf("create upload file: %s", err)
}
defer s.deferDeleteUploadFile(tmp)()
w, err := s.GetUploadFileReadWriter(tmp)
if err != nil {
return fmt.Errorf("get upload writer: %s", err)
}
defer closers.Close(w)
if err := write(w); err != nil {
return err
}
if err := s.MoveUploadFileToCache(tmp, name); err != nil && !os.IsExist(err) {
return fmt.Errorf("move upload file to cache: %s", err)
}
if addMetadata {
return s.generateMetadataFromFile(name, pieceLength)
}
return nil
}
// WriteBlobToCacheWithMetaInfo writes a blob and its metadata to disk, writing
// through the in-memory cache first when it is enabled and has capacity to
// reserve the blob's size.
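//
// A minimal sketch (blob, size, and pieceLength assumed known up front):
//
//	err := s.WriteBlobToCacheWithMetaInfo(d.Hex(), size, func(w store.FileReadWriter) error {
//		_, err := w.Write(blob)
//		return err
//	}, pieceLength)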
func (s *CAStore) WriteBlobToCacheWithMetaInfo(
name string,
size uint64,
write func(w FileReadWriter) error,
pieceLength int64) error {
if s.config.MemoryCache.Enabled && s.memCache.TryReserve(size) {
log.With("name", name, "size", size).Debug("successfully reserved cache")
err := s.addToMemoryCache(name, write, size, pieceLength)
if err == nil {
return nil
}
log.With("blob", name).Errorf("error while trying to add the blob to memory cache: %w", err)
s.memCache.ReleaseReservation(size)
}
addMetadata := true
return s.writeCacheFile(name, write, addMetadata, pieceLength)
}
// CheckInMemCache returns true if the blob is present in the memory cache.
// It is only used in tests.
func (s *CAStore) CheckInMemCache(name string) bool {
	return s.memCache != nil && s.memCache.Get(name) != nil
}
func (s *CAStore) addToMemoryCache(
name string,
write func(w FileReadWriter) error,
size uint64,
pieceLength int64,
) error {
tmpWriter := base.NewBufferReadWriter(size)
if err := write(tmpWriter); err != nil {
return err
}
data := tmpWriter.Bytes()
metaInfo, err := s.generateMetadataFromBytes(name, data, pieceLength)
if err != nil {
return fmt.Errorf("generating metainfo: %w", err)
}
entry := &cache.MemoryEntry{
Name: name,
Data: data,
MetaInfo: metaInfo,
CreatedAt: s.clk.Now(),
}
if added := s.memCache.Add(entry); !added {
// Multiple goroutines are trying to add the same blob (which should never happen).
return fmt.Errorf("entry already in in-memory cache")
}
log.With("name", name, "size", entry.Size(), "cap", cap(data)).Debug("successfully added to cache")
s.addItemForDiskSync(&drainItem{
entry: entry,
retries: 0,
})
return nil
}
func (s *CAStore) generateMetadataFromBytes(name string, data []byte, pieceLength int64) (*core.MetaInfo, error) {
digest, err := core.NewSHA256DigestFromHex(name)
if err != nil {
return nil, fmt.Errorf("new digest from hex: %s", err)
}
metaInfo, err := core.NewMetaInfo(digest, bytes.NewReader(data), pieceLength)
if err != nil {
return nil, fmt.Errorf("generate metainfo: %w", err)
}
return metaInfo, nil
}
func (s *CAStore) generateMetadataFromFile(name string, pieceLength int64) error {
d, err := core.NewSHA256DigestFromHex(name)
if err != nil {
return fmt.Errorf("get digest from file: %w", err)
}
f, err := s.GetCacheFileReader(name)
if err != nil {
return fmt.Errorf("get cache file: %w", err)
}
mi, err := core.NewMetaInfo(d, f, pieceLength)
if err != nil {
return fmt.Errorf("create metainfo: %w", err)
}
if _, err := s.SetCacheFileMetadata(d.Hex(), metadata.NewTorrentMeta(mi)); err != nil {
return fmt.Errorf("set metainfo: %w", err)
}
return nil
}
func (s *CAStore) addItemForDiskSync(item *drainItem) {
s.drain.mu.Lock()
defer s.drain.mu.Unlock()
s.drain.queue.PushBack(item)
}
// verify verifies that name is a valid SHA256 digest, and checks that the
// given blob content matches the digest unless verification is explicitly
// skipped via config.
func (s *CAStore) verify(r io.Reader, name string) error {
// Verify that expected name is a valid SHA256 digest.
expected, err := core.NewSHA256DigestFromHex(name)
if err != nil {
return fmt.Errorf("new digest from file name: %s", err)
}
if !s.config.SkipHashVerification {
digester := core.NewDigester()
computed, err := digester.FromReader(r)
if err != nil {
return fmt.Errorf("calculate digest: %s", err)
}
if computed != expected {
return fmt.Errorf("computed digest %s doesn't match expected value %s", computed, expected)
}
}
return nil
}
func (s *CAStore) memoryCacheCleanupWorker() {
defer s.ttlWg.Done()
ticker := s.clk.Ticker(s.config.MemoryCache.TTLInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.cleanupMemoryCacheExpiredEntries()
case <-s.ttlStopChan:
return
}
}
}
func (s *CAStore) cleanupMemoryCacheExpiredEntries() {
expiredNames := s.memCache.GetExpiredEntries(s.clk.Now(), s.config.MemoryCache.TTL)
if len(expiredNames) > 0 {
s.memCache.RemoveBatch(expiredNames)
}
}
func (s *CAStore) drainWorker() {
defer s.drain.wg.Done()
ticker := s.clk.Ticker(_drainDuration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.drainNext()
case <-s.drain.stopChan:
return
}
}
}
func (s *CAStore) dequeueNextDrainItem() *drainItem {
s.drain.mu.Lock()
defer s.drain.mu.Unlock()
if s.drain.queue.Len() == 0 {
return nil
}
elem := s.drain.queue.Front()
s.drain.queue.Remove(elem)
item, ok := elem.Value.(*drainItem)
if !ok {
// Shouldn't happen, but good to check
return nil
}
return item
}
func (s *CAStore) drainNext() {
item := s.dequeueNextDrainItem()
if item == nil {
return
}
err := s.writeDrainItemToDisk(item.entry)
if err != nil {
if item.retries < s.config.MemoryCache.DrainMaxRetries {
s.addItemForDiskSync(&drainItem{
entry: item.entry,
retries: item.retries + 1,
errs: append(item.errs, err),
})
return
}
s.memCache.Remove(item.entry.Name)
s.stats.Counter("drain_error").Inc(1)
log.With("name", item.entry.Name, "drain_errors", errors.Join(append(item.errs, err)...)).
Errorf("Failed to drain blob from mem cache to disk after %v retries", s.config.MemoryCache.DrainMaxRetries)
return
}
s.drain.histogram.RecordDuration(s.clk.Now().Sub(item.entry.CreatedAt))
s.memCache.Remove(item.entry.Name)
}
func (s *CAStore) writeDrainItemToDisk(entry *cache.MemoryEntry) error {
if err := s.WriteCacheFile(entry.Name, func(w FileReadWriter) error {
_, err := w.Write(entry.Data)
return err
}); err != nil {
return fmt.Errorf("write blob: %s", err)
}
tm := metadata.NewTorrentMeta(entry.MetaInfo)
if _, err := s.SetCacheFileMetadata(entry.Name, tm); err != nil {
return fmt.Errorf("write metadata: %s", err)
}
return nil
}
// GetCacheFileReader overrides cacheStore.GetCacheFileReader to check
// memory cache first before reading from disk.
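//
// The returned reader must be closed by the caller; for example:
//
//	r, err := s.GetCacheFileReader(name)
//	if err != nil {
//		return err
//	}
//	defer r.Close()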
func (s *CAStore) GetCacheFileReader(name string) (FileReader, error) {
if s.memCache != nil {
if entry := s.memCache.Get(name); entry != nil {
return NewBufferFileReader(entry.Data), nil
}
}
return s.cacheStore.GetCacheFileReader(name)
}
// GetCacheFileMetadata overrides cacheStore.GetCacheFileMetadata to serve
// TorrentMeta from memory cache when available.
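//
// A typical torrent metadata lookup (a sketch):
//
//	var tm metadata.TorrentMeta
//	if err := s.GetCacheFileMetadata(name, &tm); err != nil {
//		return err
//	}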
func (s *CAStore) GetCacheFileMetadata(name string, md metadata.Metadata) error {
if s.memCache != nil && md.GetSuffix() == metadata.GetTorrentMetadataSuffix() {
if entry := s.memCache.Get(name); entry != nil {
if entry.MetaInfo == nil {
// shouldn't happen, but good to check
return fmt.Errorf("entry %s doesn't have any metainfo", entry.Name)
}
// Serialize and deserialize for consistency with disk behavior
b, err := entry.MetaInfo.Serialize()
if err != nil {
return fmt.Errorf("serialize metainfo: %s", err)
}
return md.Deserialize(b)
}
}
// Fallback to disk
return s.cacheStore.GetCacheFileMetadata(name, md)
}
// GetCacheFileStat overrides cacheStore.GetCacheFileStat to check memory cache first.
func (s *CAStore) GetCacheFileStat(name string) (os.FileInfo, error) {
if s.memCache != nil {
if entry := s.memCache.Get(name); entry != nil {
return &memoryFileInfo{
name: name,
size: int64(entry.Size()),
modTime: entry.CreatedAt,
}, nil
}
}
return s.cacheStore.GetCacheFileStat(name)
}
// ListCacheFiles overrides cacheStore.ListCacheFiles to include memory cache entries.
func (s *CAStore) ListCacheFiles() ([]string, error) {
diskFiles, err := s.cacheStore.ListCacheFiles()
if err != nil {
return nil, err
}
if s.memCache == nil {
return diskFiles, nil
}
memNames := s.memCache.ListNames()
allFiles := make(map[string]bool)
for _, name := range diskFiles {
allFiles[name] = true
}
for _, name := range memNames {
allFiles[name] = true
}
result := make([]string, 0, len(allFiles))
for name := range allFiles {
result = append(result, name)
}
return result, nil
}
var _ os.FileInfo = &memoryFileInfo{}
// memoryFileInfo implements os.FileInfo for memory cache entries.
type memoryFileInfo struct {
name string
size int64
modTime time.Time
}
func (f *memoryFileInfo) Name() string { return f.name }
func (f *memoryFileInfo) Size() int64 { return f.size }
func (f *memoryFileInfo) Mode() os.FileMode { return os.ModePerm }
func (f *memoryFileInfo) ModTime() time.Time { return f.modTime }
func (f *memoryFileInfo) IsDir() bool { return false }
func (f *memoryFileInfo) Sys() interface{} { return nil }
func initCASVolumes(dir string, volumes []Volume) error {
if len(volumes) == 0 {
return nil
}
rendezvousHash := hrw.NewRendezvousHash(
func() hash.Hash { return murmur3.New64() },
hrw.UInt64ToFloat64)
for _, v := range volumes {
if _, err := os.Stat(v.Location); err != nil {
return fmt.Errorf("verify volume: %s", err)
}
rendezvousHash.AddNode(v.Location, v.Weight)
}
// Create 256 symlinks under dir.
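	// For illustration (paths hypothetical): with a volume mounted at
	// /mnt/disk1 and dir=/var/cache/kraken/cache, subdir "A1" may end up as
	// /var/cache/kraken/cache/A1 -> /mnt/disk1/cache/A1.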
for subdirIndex := 0; subdirIndex < 256; subdirIndex++ {
subdirName := fmt.Sprintf("%02X", subdirIndex)
nodes := rendezvousHash.GetOrderedNodes(subdirName, 1)
if len(nodes) != 1 {
return fmt.Errorf("calculate volume for subdir: %s", subdirName)
}
sourcePath := path.Join(nodes[0].Label, path.Base(dir), subdirName)
if err := os.MkdirAll(sourcePath, 0775); err != nil {
return fmt.Errorf("volume source path: %s", err)
}
targetPath := path.Join(dir, subdirName)
if err := createOrUpdateSymlink(sourcePath, targetPath); err != nil {
return fmt.Errorf("symlink to volume: %s", err)
}
}
return nil
}