in internal/gcsx/random_reader.go [210:291]
func (rr *randomReader) tryReadingFromFileCache(ctx context.Context,
p []byte,
offset int64) (n int, cacheHit bool, err error) {
if rr.fileCacheHandler == nil {
return
}
// By default, consider read type random if the offset is non-zero.
isSeq := offset == 0
// Request log and start the execution timer.
requestId := uuid.New()
readOp := ctx.Value(ReadOp).(*fuseops.ReadFileOp)
logger.Tracef("%.13v <- FileCache(%s:/%s, offset: %d, size: %d handle: %d)", requestId, rr.bucket.Name(), rr.object.Name, offset, len(p), readOp.Handle)
startTime := time.Now()
// Response log
defer func() {
executionTime := time.Since(startTime)
var requestOutput string
if err != nil {
requestOutput = fmt.Sprintf("err: %v (%v)", err, executionTime)
} else {
if rr.fileCacheHandle != nil {
isSeq = rr.fileCacheHandle.IsSequential(offset)
}
requestOutput = fmt.Sprintf("OK (isSeq: %t, hit: %t) (%v)", isSeq, cacheHit, executionTime)
}
// Here rr.fileCacheHandle will not be nil since we return from the above in those cases.
logger.Tracef("%.13v -> %s", requestId, requestOutput)
readType := util.Random
if isSeq {
readType = util.Sequential
}
captureFileCacheMetrics(ctx, rr.metricHandle, readType, n, cacheHit, executionTime)
}()
// Create fileCacheHandle if not already.
if rr.fileCacheHandle == nil {
rr.fileCacheHandle, err = rr.fileCacheHandler.GetCacheHandle(rr.object, rr.bucket, rr.cacheFileForRangeRead, offset)
if err != nil {
// We fall back to GCS if file size is greater than the cache size
if errors.Is(err, lru.ErrInvalidEntrySize) {
logger.Warnf("tryReadingFromFileCache: while creating CacheHandle: %v", err)
return 0, false, nil
} else if errors.Is(err, cacheutil.ErrCacheHandleNotRequiredForRandomRead) {
// Fall back to GCS if it is a random read, cacheFileForRangeRead is
// False and there doesn't already exist file in cache.
isSeq = false
return 0, false, nil
}
return 0, false, fmt.Errorf("tryReadingFromFileCache: while creating CacheHandle instance: %w", err)
}
}
n, cacheHit, err = rr.fileCacheHandle.Read(ctx, rr.bucket, rr.object, offset, p)
if err == nil {
return
}
cacheHit = false
n = 0
if cacheutil.IsCacheHandleInvalid(err) {
logger.Tracef("Closing cacheHandle:%p for object: %s:/%s", rr.fileCacheHandle, rr.bucket.Name(), rr.object.Name)
err = rr.fileCacheHandle.Close()
if err != nil {
logger.Warnf("tryReadingFromFileCache: while closing fileCacheHandle: %v", err)
}
rr.fileCacheHandle = nil
} else if !errors.Is(err, cacheutil.ErrFallbackToGCS) {
err = fmt.Errorf("tryReadingFromFileCache: while reading via cache: %w", err)
return
}
err = nil
return
}