func()

in internal/gcsx/random_reader.go [210:291]


func (rr *randomReader) tryReadingFromFileCache(ctx context.Context,
	p []byte,
	offset int64) (n int, cacheHit bool, err error) {

	if rr.fileCacheHandler == nil {
		return
	}

	// By default, consider read type random if the offset is non-zero.
	isSeq := offset == 0

	// Request log and start the execution timer.
	requestId := uuid.New()
	readOp := ctx.Value(ReadOp).(*fuseops.ReadFileOp)
	logger.Tracef("%.13v <- FileCache(%s:/%s, offset: %d, size: %d handle: %d)", requestId, rr.bucket.Name(), rr.object.Name, offset, len(p), readOp.Handle)
	startTime := time.Now()

	// Response log
	defer func() {
		executionTime := time.Since(startTime)
		var requestOutput string
		if err != nil {
			requestOutput = fmt.Sprintf("err: %v (%v)", err, executionTime)
		} else {
			if rr.fileCacheHandle != nil {
				isSeq = rr.fileCacheHandle.IsSequential(offset)
			}
			requestOutput = fmt.Sprintf("OK (isSeq: %t, hit: %t) (%v)", isSeq, cacheHit, executionTime)
		}

		// Here rr.fileCacheHandle will not be nil since we return from the above in those cases.
		logger.Tracef("%.13v -> %s", requestId, requestOutput)

		readType := util.Random
		if isSeq {
			readType = util.Sequential
		}
		captureFileCacheMetrics(ctx, rr.metricHandle, readType, n, cacheHit, executionTime)
	}()

	// Create fileCacheHandle if not already.
	if rr.fileCacheHandle == nil {
		rr.fileCacheHandle, err = rr.fileCacheHandler.GetCacheHandle(rr.object, rr.bucket, rr.cacheFileForRangeRead, offset)
		if err != nil {
			// We fall back to GCS if file size is greater than the cache size
			if errors.Is(err, lru.ErrInvalidEntrySize) {
				logger.Warnf("tryReadingFromFileCache: while creating CacheHandle: %v", err)
				return 0, false, nil
			} else if errors.Is(err, cacheutil.ErrCacheHandleNotRequiredForRandomRead) {
				// Fall back to GCS if it is a random read, cacheFileForRangeRead is
				// False and there doesn't already exist file in cache.
				isSeq = false
				return 0, false, nil
			}

			return 0, false, fmt.Errorf("tryReadingFromFileCache: while creating CacheHandle instance: %w", err)
		}
	}

	n, cacheHit, err = rr.fileCacheHandle.Read(ctx, rr.bucket, rr.object, offset, p)
	if err == nil {
		return
	}

	cacheHit = false
	n = 0

	if cacheutil.IsCacheHandleInvalid(err) {
		logger.Tracef("Closing cacheHandle:%p for object: %s:/%s", rr.fileCacheHandle, rr.bucket.Name(), rr.object.Name)
		err = rr.fileCacheHandle.Close()
		if err != nil {
			logger.Warnf("tryReadingFromFileCache: while closing fileCacheHandle: %v", err)
		}
		rr.fileCacheHandle = nil
	} else if !errors.Is(err, cacheutil.ErrFallbackToGCS) {
		err = fmt.Errorf("tryReadingFromFileCache: while reading via cache: %w", err)
		return
	}
	err = nil

	return
}