func (rr *randomReader) ReadAt()

in internal/gcsx/random_reader.go [293:379]


func (rr *randomReader) ReadAt(
	ctx context.Context,
	p []byte,
	offset int64) (objectData ObjectData, err error) {
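	// Default result: no bytes read and no cache hit; DataBuf wraps the caller's buffer.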
	objectData = ObjectData{
		DataBuf:  p,
		CacheHit: false,
		Size:     0,
	}

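	// There is no data to serve at or beyond the end of the object.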
	if offset >= int64(rr.object.Size) {
		err = io.EOF
		return
	}

	// Note: If we are reading the file for the first time and the read type is
	// sequential, the file cache behaves as write-through, i.e. data is first read
	// from GCS, cached in a file, and then served from that file. cacheHit is
	// still false in that case.
	n, cacheHit, err := rr.tryReadingFromFileCache(ctx, p, offset)
	if err != nil {
		err = fmt.Errorf("ReadAt: while reading from cache: %w", err)
		return
	}
	// Return early if the cache served the data, the buffer is full, or we have
	// read up to the end of the object.
	if cacheHit || n == len(p) || (n < len(p) && uint64(offset)+uint64(n) == rr.object.Size) {
		objectData.CacheHit = cacheHit
		objectData.Size = n
		return
	}

	// First check whether we can read using the existing reader. If not, determine
	// which API to use and call GCS accordingly.

	// When the offset is AFTER the reader position, try to seek forward, within reason.
	// This happens when the kernel page cache serves some data. It's very common for
	// concurrent reads, where the gap is often only a few 128 KiB FUSE read requests.
	// The aim is to reuse the GCS connection and avoid throwing away already-read data.
	// For parallel sequential reads to a single file, not throwing away the connections
	// is a 15-20x improvement in throughput: 150-200 MiB/s instead of 10 MiB/s.
	if rr.reader != nil && rr.start < offset && offset-rr.start < maxReadSize {
		bytesToSkip := offset - rr.start
		discardedBytes, copyError := io.CopyN(io.Discard, rr.reader, bytesToSkip)
		// io.EOF is expected if the reader ends before the offset we want to reach.
		if copyError != nil && !errors.Is(copyError, io.EOF) {
			logger.Warnf("Error while skipping reader bytes: %v", copyError)
		}
		}
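		// Advance our record of the reader's position by the bytes actually
		// discarded, which may be fewer than requested on EOF.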
		rr.start += discardedBytes
	}

	// If we have an existing reader, but it's positioned at the wrong place,
	// clean it up and throw it away.
	// We will also clean up the existing reader if it can't serve the entire request.
	dataToRead := min(offset+int64(len(p)), int64(rr.object.Size))
	if rr.reader != nil && (rr.start != offset || dataToRead > rr.limit) {
		rr.closeReader()
		rr.reader = nil
		rr.cancel = nil
		if rr.start != offset {
			// We should only increase the seek count when discarding a reader that is
			// positioned at the wrong place. Counting a reader discarded merely because
			// it can't serve the entire request would keep the reader size from growing
			// in the random-read scenario.
			rr.seeks++
		}
	}

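	// Any reader that survived the checks above is positioned exactly at offset
	// and can serve the entire request, so read from it directly.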
	if rr.reader != nil {
		objectData.Size, err = rr.readFromRangeReader(ctx, p, offset, -1, rr.readType)
		return
	}

	// If we don't have a reader, determine whether to read from NewReader or MRR.
	end, err := rr.getReadInfo(offset, int64(len(p)))
	if err != nil {
		err = fmt.Errorf("ReadAt: getReaderInfo: %w", err)
		return
	}

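	// Choose between the range reader and the multi-range reader based on the read
	// pattern, the requested range, and the bucket type.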
	readerType := readerType(rr.readType, offset, end, rr.bucket.BucketType())
	if readerType == RangeReader {
		objectData.Size, err = rr.readFromRangeReader(ctx, p, offset, end, rr.readType)
		return
	}

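	// Otherwise serve the request through the multi-range reader with a bounded timeout.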
	objectData.Size, err = rr.readFromMultiRangeReader(ctx, p, offset, end, TimeoutForMultiRangeRead)
	return
}
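
The forward-seek above relies on a small, general pattern: instead of dropping a reader whose position lags the requested offset, discard the gap bytes by copying them into io.Discard with io.CopyN. A minimal standalone sketch of that pattern (the skipForward helper is hypothetical, not gcsfuse code):

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// skipForward advances r by up to n bytes by copying them into io.Discard and
// returns how many bytes were actually skipped. io.EOF is tolerated: it just
// means the stream was shorter than the gap.
func skipForward(r io.Reader, n int64) (int64, error) {
	skipped, err := io.CopyN(io.Discard, r, n)
	if err != nil && !errors.Is(err, io.EOF) {
		return skipped, err
	}
	return skipped, nil
}

func main() {
	r := bytes.NewReader([]byte("0123456789"))
	// Pretend the kernel page cache already served bytes 0-3: skip to offset 4.
	skipped, err := skipForward(r, 4)
	if err != nil {
		panic(err)
	}
	rest, _ := io.ReadAll(r)
	fmt.Printf("skipped %d bytes, remaining %q\n", skipped, rest)
}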