internal/contentcache/contentcache.go (199 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package contentcache stores GCS object contents locally.
// Note: Map operations (AddOrReplace/Get/Remove/Size) are guarded by an internal mutex,
// but RecoverCache is not concurrent safe and callers should ensure it does not run
// concurrently with other cache operations.
package contentcache
import (
"encoding/json"
"fmt"
"io"
"io/fs"
"os"
"path"
"regexp"
"sync"
"github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/jacobsa/timeutil"
)
// CacheFilePrefix is the file-name prefix for every cache file created in the
// temp dir; the matching metadata checkpoint adds a ".json" suffix.
const CacheFilePrefix = "gcsfusecache"

// CacheObjectKey uniquely identifies GCS objects by bucket name and object name
type CacheObjectKey struct {
	BucketName string // name of the GCS bucket containing the object
	ObjectName string // name of the object within the bucket
}
// ContentCache is a directory on local disk to store the object content
// Map operations (AddOrReplace, Get, Remove, Size) are guarded by mu and are
// thread-safe; RecoverCache is the exception and must not run concurrently.
// fileMap is an in memory map to represent cache contents on disk
type ContentCache struct {
	mu         sync.Mutex // guards fileMap
	tempDir    string     // directory where cache and metadata files live
	fileMap    map[CacheObjectKey]*CacheObject
	mtimeClock timeutil.Clock // clock passed to the gcsx temp-file constructors
}
// CacheFileObjectMetadata is the checkpoint record persisted as JSON next to
// each cache file so entries can be recovered after a restart.
type CacheFileObjectMetadata struct {
	CacheFileNameOnDisk string // path of the cache file this record describes
	BucketName          string // GCS bucket of the cached object
	ObjectName          string // GCS object name of the cached object
	Generation          int64  // object generation recorded when cached
	MetaGeneration      int64  // object metageneration recorded when cached
}
// CacheObject is a wrapper struct for a cache file and its associated metadata
type CacheObject struct {
	MetadataFileName        string                   // path of the JSON metadata checkpoint on disk
	CacheFileObjectMetadata *CacheFileObjectMetadata // recovered or freshly written metadata; may be nil
	CacheFile               gcsx.TempFile            // handle to the cached content on disk
}
// ValidateGeneration reports whether the cached object's recorded generation
// and metageneration both match the fresh values fetched from GCS. A missing
// metadata record always fails validation.
func (c *CacheObject) ValidateGeneration(generation int64, metaGeneration int64) bool {
	md := c.CacheFileObjectMetadata
	if md == nil {
		return false
	}
	if md.Generation != generation {
		return false
	}
	return md.MetaGeneration == metaGeneration
}
// WriteMetadataCheckpointFile writes the metadata struct to a json file so
// cache files can be recovered on startup. The checkpoint lives next to the
// cache file as "<cacheFileName>.json"; its name is returned along with any
// marshal or write error.
func (c *ContentCache) WriteMetadataCheckpointFile(cacheFileName string, cacheFileObjectMetadata *CacheFileObjectMetadata) (metadataFileName string, err error) {
	contents, err := json.MarshalIndent(cacheFileObjectMetadata, "", " ")
	if err != nil {
		return "", fmt.Errorf("json.MarshalIndent failed for object metadata: %w", err)
	}
	metadataFileName = fmt.Sprintf("%s.json", cacheFileName)
	if err = os.WriteFile(metadataFileName, contents, 0644); err != nil {
		return metadataFileName, fmt.Errorf("WriteFile for JSON metadata: %w", err)
	}
	return metadataFileName, nil
}
// Destroy performs disk clean up of cache files and metadata files
func (c *CacheObject) Destroy() {
	if c.CacheFile != nil {
		// Best-effort removal of the cache file; the error is deliberately ignored.
		os.Remove(c.CacheFile.Name())
		// Release the underlying temp-file resources held by gcsx.
		c.CacheFile.Destroy()
	}
	// Best-effort removal of the JSON metadata checkpoint; error ignored.
	os.Remove(c.MetadataFileName)
}
// recoverFileFromCache recovers a single file from the cache via its metadata
// checkpoint. Entries that are unreadable, corrupt, or whose cache file cannot
// be reopened are skipped with a log message so that one bad entry does not
// abort recovery of the rest.
func (c *ContentCache) recoverFileFromCache(metadataFile fs.FileInfo) {
	// validate not a directory and matches gcsfuse pattern
	if metadataFile.IsDir() {
		return
	}
	if !matchPattern(metadataFile.Name()) {
		return
	}
	var metadata CacheFileObjectMetadata
	metadataAbsolutePath := path.Join(c.tempDir, metadataFile.Name())
	contents, err := os.ReadFile(metadataAbsolutePath)
	if err != nil {
		logger.Errorf("content cache: Skip metadata file %v due to read error: %s", metadataFile.Name(), err)
		return
	}
	err = json.Unmarshal(contents, &metadata)
	if err != nil {
		logger.Errorf("content cache: Skip metadata file %v due to file corruption: %s", metadataFile.Name(), err)
		return
	}
	cacheObjectKey := &CacheObjectKey{
		BucketName: metadata.BucketName,
		ObjectName: metadata.ObjectName,
	}
	fileName := metadata.CacheFileNameOnDisk
	// TODO (#641) linux fs limits single process to open max of 1024 file descriptors
	// so this is probably not scalable, we should figure out if this is an actual issue or not
	file, err := os.Open(fileName)
	if err != nil {
		logger.Errorf("content cache: Skip cache file %v due to error: %v", fileName, err)
		return
	}
	cacheFile, err := c.recoverCacheFile(file)
	if err != nil {
		logger.Errorf("content cache: Skip cache file %v due to error: %v", fileName, err)
		// Bug fix: previously execution fell through here and inserted an
		// entry with a nil CacheFile into the map. Release the fd and skip.
		// (Double-close is harmless if gcsx already closed it.)
		file.Close()
		return
	}
	cacheObject := &CacheObject{
		MetadataFileName:        metadataAbsolutePath,
		CacheFileObjectMetadata: &metadata,
		CacheFile:               cacheFile,
	}
	c.fileMap[*cacheObjectKey] = cacheObject
}
// RecoverCache repopulates the cache from files persisted in the temp dir
// when gcsfuse starts. It must not be called concurrently with other cache
// operations.
func (c *ContentCache) RecoverCache() error {
	if c.tempDir == "" {
		c.tempDir = "/tmp"
	}
	logger.Infof("Recovering cache:\n")
	dirEntries, err := os.ReadDir(c.tempDir)
	if err != nil {
		// Listing the temp directory failed; recovery cannot proceed.
		return fmt.Errorf("recover cache: %w", err)
	}
	// Resolve every entry to a FileInfo up front so that a stat failure
	// aborts recovery before any entry has been processed.
	infos := make([]os.FileInfo, 0, len(dirEntries))
	for _, entry := range dirEntries {
		info, infoErr := entry.Info()
		if infoErr != nil {
			return fmt.Errorf("recover cache: %w", infoErr)
		}
		infos = append(infos, info)
	}
	for _, info := range infos {
		c.recoverFileFromCache(info)
	}
	return nil
}
// matchPattern matches the filename format of a gcsfuse file via regex
func matchPattern(fileName string) bool {
match, err := regexp.MatchString(fmt.Sprintf("%v[0-9]+[.]json", CacheFilePrefix), fileName)
if err != nil {
return false
}
return match
}
// New creates a ContentCache rooted at tempDir, using mtimeClock for the
// timestamps of files it manages.
func New(tempDir string, mtimeClock timeutil.Clock) *ContentCache {
	cache := &ContentCache{
		tempDir:    tempDir,
		mtimeClock: mtimeClock,
	}
	cache.fileMap = make(map[CacheObjectKey]*CacheObject)
	return cache
}
// NewTempFile returns a handle for a temporary file on the disk. The caller
// must call Destroy on the TempFile before releasing it.
// The file is created in c.tempDir and populated from rc via gcsx.NewTempFile.
func (c *ContentCache) NewTempFile(rc io.ReadCloser) (gcsx.TempFile, error) {
	return gcsx.NewTempFile(rc, c.tempDir, c.mtimeClock)
}
// AddOrReplace creates a new cache file or updates an existing cache file.
// The object content is streamed from rc into a fresh temp file under
// c.tempDir, and a JSON metadata checkpoint is written beside it so the
// entry can be recovered on restart. Returns the resulting CacheObject,
// which is also stored in the in-memory map.
// AddOrReplace is thread-safe.
func (c *ContentCache) AddOrReplace(cacheObjectKey *CacheObjectKey, generation int64, metaGeneration int64, rc io.ReadCloser) (*CacheObject, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cacheObject, exists := c.fileMap[*cacheObjectKey]; exists {
		cacheObject.Destroy()
		// Drop the stale entry immediately so an error below cannot leave a
		// destroyed object reachable in the map.
		delete(c.fileMap, *cacheObjectKey)
	}
	// Create a temporary cache file on disk
	f, err := os.CreateTemp(c.tempDir, CacheFilePrefix)
	if err != nil {
		return nil, fmt.Errorf("TempFile: %w", err)
	}
	file := c.NewCacheFile(rc, f)
	metadata := &CacheFileObjectMetadata{
		CacheFileNameOnDisk: file.Name(),
		BucketName:          cacheObjectKey.BucketName,
		ObjectName:          cacheObjectKey.ObjectName,
		Generation:          generation,
		MetaGeneration:      metaGeneration,
	}
	metadataFileName, err := c.WriteMetadataCheckpointFile(file.Name(), metadata)
	if err != nil {
		// Don't leak the freshly created cache file when the metadata
		// checkpoint cannot be written; best-effort cleanup.
		os.Remove(file.Name())
		file.Destroy()
		return nil, fmt.Errorf("WriteMetadataCheckpointFile: %w", err)
	}
	cacheObject := &CacheObject{
		MetadataFileName:        metadataFileName,
		CacheFileObjectMetadata: metadata,
		CacheFile:               file,
	}
	c.fileMap[*cacheObjectKey] = cacheObject
	return cacheObject, nil
}
// Get retrieves a file from the cache given the GCS object name and bucket
// name; the second result reports whether an entry was found.
// Get is thread-safe.
func (c *ContentCache) Get(cacheObjectKey *CacheObjectKey) (*CacheObject, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cacheObject, ok := c.fileMap[*cacheObjectKey]; ok {
		return cacheObject, true
	}
	return nil, false
}
// Remove destroys the specified cache file and metadata on disk and drops
// the in-memory entry. A miss is a no-op.
// Remove is thread-safe.
func (c *ContentCache) Remove(cacheObjectKey *CacheObjectKey) {
	c.mu.Lock()
	defer c.mu.Unlock()
	cacheObject, exists := c.fileMap[*cacheObjectKey]
	if !exists {
		return
	}
	cacheObject.Destroy()
	delete(c.fileMap, *cacheObjectKey)
}
// NewCacheFile returns a cache tempfile wrapper around the source reader and file.
// Delegates to gcsx.NewCacheFile with the cache's temp dir and mtime clock.
func (c *ContentCache) NewCacheFile(rc io.ReadCloser, f *os.File) gcsx.TempFile {
	return gcsx.NewCacheFile(rc, f, c.tempDir, c.mtimeClock)
}
// recoverCacheFile returns a tempfile wrapper around a prepopulated cache file
// from disk. Delegates to gcsx.RecoverCacheFile with the cache's temp dir and
// mtime clock.
func (c *ContentCache) recoverCacheFile(f *os.File) (gcsx.TempFile, error) {
	return gcsx.RecoverCacheFile(f, c.tempDir, c.mtimeClock)
}
// Size returns the number of entries in the in-memory map of cache files.
// Size is thread-safe.
func (c *ContentCache) Size() int {
	c.mu.Lock()
	size := len(c.fileMap)
	c.mu.Unlock()
	return size
}