internal/gcsx/file_cache_reader.go (147 lines of code) (raw):

// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package gcsx import ( "context" "errors" "fmt" "io" "strconv" "time" "github.com/google/uuid" "github.com/googlecloudplatform/gcsfuse/v2/common" "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/file" "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/lru" cacheUtil "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util" "github.com/googlecloudplatform/gcsfuse/v2/internal/logger" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v2/internal/util" "github.com/jacobsa/fuse/fuseops" ) const ( // ReadOp ("readOp") is the value used in read context to store pointer to the read operation. ReadOp = "readOp" MiB = 1 << 20 ) type FileCacheReader struct { Reader object *gcs.MinObject bucket gcs.Bucket // fileCacheHandler is used to get file cache handle and read happens using that. // This will be nil if the file cache is disabled. fileCacheHandler *file.CacheHandler // cacheFileForRangeRead is also valid for cache workflow, if true, object content // will be downloaded for random reads as well too. cacheFileForRangeRead bool // fileCacheHandle is used to read from the cached location. It is created on the fly // using fileCacheHandler for the given object and bucket. fileCacheHandle *file.CacheHandle metricHandle common.MetricHandle } func NewFileCacheReader(o *gcs.MinObject, bucket gcs.Bucket, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle common.MetricHandle) *FileCacheReader { return &FileCacheReader{ object: o, bucket: bucket, fileCacheHandler: fileCacheHandler, cacheFileForRangeRead: cacheFileForRangeRead, metricHandle: metricHandle, } } // tryReadingFromFileCache creates the cache handle first if it doesn't exist already // and then use that handle to read object's content which is cached in local file. // For the successful read, it returns number of bytes read, and a boolean representing // cacheHit as true. // For unsuccessful read, returns cacheHit as false, in this case content // should be read from GCS. // And it returns non-nil error in case something unexpected happens during the execution. // In this case, we must abort the Read operation. // // Important: What happens if the file in cache deleted externally? // That means, we have fileInfo entry in the fileInfoCache for that deleted file. // (a) If a new fileCacheHandle is created in that case it will return FileNotPresentInCache // error, given by fileCacheHandler.GetCacheHandle(). // (b) If there is already an open fileCacheHandle then it means there is an open // fileHandle to file in cache. So, we will get the correct data from fileHandle // because Linux does not delete a file until open fileHandle count for a file is zero. func (fc *FileCacheReader) tryReadingFromFileCache(ctx context.Context, p []byte, offset int64) (int, bool, error) { if fc.fileCacheHandler == nil { return 0, false, nil } // By default, consider read type random if the offset is non-zero. isSequential := offset == 0 var handleID uint64 if readOp, ok := ctx.Value(ReadOp).(*fuseops.ReadFileOp); ok { handleID = uint64(readOp.Handle) } requestID := uuid.New() logger.Tracef("%.13v <- FileCache(%s:/%s, offset: %d, size: %d, handle: %d)", requestID, fc.bucket.Name(), fc.object.Name, offset, len(p), handleID) startTime := time.Now() var bytesRead int var cacheHit bool var err error defer func() { executionTime := time.Since(startTime) var requestOutput string if err != nil { requestOutput = fmt.Sprintf("err: %v (%v)", err, executionTime) } else { if fc.fileCacheHandle != nil { isSequential = fc.fileCacheHandle.IsSequential(offset) } requestOutput = fmt.Sprintf("OK (isSeq: %t, cacheHit: %t) (%v)", isSequential, cacheHit, executionTime) } logger.Tracef("%.13v -> %s", requestID, requestOutput) readType := util.Random if isSequential { readType = util.Sequential } captureFileCacheMetrics(ctx, fc.metricHandle, readType, bytesRead, cacheHit, executionTime) }() // Create fileCacheHandle if not already. if fc.fileCacheHandle == nil { fc.fileCacheHandle, err = fc.fileCacheHandler.GetCacheHandle(fc.object, fc.bucket, fc.cacheFileForRangeRead, offset) if err != nil { switch { case errors.Is(err, lru.ErrInvalidEntrySize): logger.Warnf("tryReadingFromFileCache: while creating CacheHandle: %v", err) return 0, false, nil case errors.Is(err, cacheUtil.ErrCacheHandleNotRequiredForRandomRead): // Fall back to GCS if it is a random read, cacheFileForRangeRead is // false and there doesn't already exist file in cache. isSequential = false return 0, false, nil default: return 0, false, fmt.Errorf("tryReadingFromFileCache: GetCacheHandle failed: %w", err) } } } bytesRead, cacheHit, err = fc.fileCacheHandle.Read(ctx, fc.bucket, fc.object, offset, p) if err == nil { return bytesRead, cacheHit, nil } bytesRead = 0 cacheHit = false if cacheUtil.IsCacheHandleInvalid(err) { logger.Tracef("Closing cacheHandle:%p for object: %s:/%s", fc.fileCacheHandle, fc.bucket.Name(), fc.object.Name) closeErr := fc.fileCacheHandle.Close() if closeErr != nil { logger.Warnf("tryReadingFromFileCache: close cacheHandle error: %v", closeErr) } fc.fileCacheHandle = nil } else if !errors.Is(err, cacheUtil.ErrFallbackToGCS) { return 0, false, fmt.Errorf("tryReadingFromFileCache: while reading via cache: %w", err) } err = nil return 0, false, nil } func (fc *FileCacheReader) ReadAt(ctx context.Context, p []byte, offset int64) (ReaderResponse, error) { var err error readerResponse := ReaderResponse{ DataBuf: p, Size: 0, } if offset >= int64(fc.object.Size) { err = io.EOF return readerResponse, err } // Note: If we are reading the file for the first time and read type is sequential // then the file cache behavior is write-through i.e. data is first read from // GCS, cached in file and then served from that file. But the cacheHit is // false in that case. bytesRead, cacheHit, err := fc.tryReadingFromFileCache(ctx, p, offset) if err != nil { err = fmt.Errorf("ReadAt: while reading from cache: %w", err) return readerResponse, err } // Data was served from cache. if cacheHit || bytesRead == len(p) || (bytesRead < len(p) && uint64(offset)+uint64(bytesRead) == fc.object.Size) { readerResponse.Size = bytesRead return readerResponse, nil } // The cache is unable to serve data and requires a fallback to another reader. err = FallbackToAnotherReader return readerResponse, err } func captureFileCacheMetrics(ctx context.Context, metricHandle common.MetricHandle, readType string, readDataSize int, cacheHit bool, readLatency time.Duration) { metricHandle.FileCacheReadCount(ctx, 1, []common.MetricAttr{ {Key: common.ReadType, Value: readType}, {Key: common.CacheHit, Value: strconv.FormatBool(cacheHit)}, }) metricHandle.FileCacheReadBytesCount(ctx, int64(readDataSize), []common.MetricAttr{{Key: common.ReadType, Value: readType}}) metricHandle.FileCacheReadLatency(ctx, float64(readLatency.Microseconds()), []common.MetricAttr{{Key: common.CacheHit, Value: strconv.FormatBool(cacheHit)}}) } func (fc *FileCacheReader) Destroy() { if fc.fileCacheHandle != nil { logger.Tracef("Closing cacheHandle:%p for object: %s:/%s", fc.fileCacheHandle, fc.bucket.Name(), fc.object.Name) err := fc.fileCacheHandle.Close() if err != nil { logger.Warnf("fc.Destroy(): while closing cacheFileHandle: %v", err) } fc.fileCacheHandle = nil } }