in internal/gcsx/random_reader.go [293:379]
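// ReadAt serves a read of len(p) bytes at the given offset, trying the file
// cache first and falling back to a GCS reader. The returned ObjectData
// reports how many bytes were read and whether the cache served them.
// (This doc comment is an added summary of the logic below, not original to
// the file.)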
func (rr *randomReader) ReadAt(
	ctx context.Context,
	p []byte,
	offset int64) (objectData ObjectData, err error) {
	objectData = ObjectData{
		DataBuf:  p,
		CacheHit: false,
		Size:     0,
	}
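	// An offset at or past the end of the object cannot yield any data;
	// report EOF immediately.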
	if offset >= int64(rr.object.Size) {
		err = io.EOF
		return
	}
	// Note: If we are reading the file for the first time and the read type is
	// sequential, then the file cache behavior is write-through, i.e. data is
	// first read from GCS, cached in the file, and then served from that file.
	// However, cacheHit is false in that case.
	n, cacheHit, err := rr.tryReadingFromFileCache(ctx, p, offset)
	if err != nil {
		err = fmt.Errorf("ReadAt: while reading from cache: %w", err)
		return
	}
	// The request is complete if the cache served it (a hit), if the buffer
	// was filled entirely, or if a short read reached the end of the object.
	if cacheHit || n == len(p) || (n < len(p) && uint64(offset)+uint64(n) == rr.object.Size) {
		objectData.CacheHit = cacheHit
		objectData.Size = n
		return
	}
	// Check first whether we can serve the read from the existing reader. If
	// not, determine which API to use and call GCS accordingly.
	//
	// When the offset is AFTER the reader position, try to seek forward, within
	// reason. This happens when the kernel page cache serves some data. It's
	// very common for concurrent reads, where readers are often just a few
	// 128 KiB fuse read requests apart. The aim is to re-use the GCS connection
	// and avoid throwing away already-read data. For parallel sequential reads
	// to a single file, not throwing away the connections is a 15-20x
	// improvement in throughput: 150-200 MiB/s instead of 10 MiB/s.
	if rr.reader != nil && rr.start < offset && offset-rr.start < maxReadSize {
		bytesToSkip := offset - rr.start
		discardedBytes, copyError := io.CopyN(io.Discard, rr.reader, bytesToSkip)
		// io.EOF is expected if the reader is shorter than the requested offset.
		if copyError != nil && !errors.Is(copyError, io.EOF) {
			logger.Warnf("Error while skipping reader bytes: %v", copyError)
		}
		rr.start += discardedBytes
	}
	// If we have an existing reader, but it's positioned at the wrong place or
	// can't serve the entire request, clean it up and throw it away.
	// dataToRead is the exclusive end of the requested range, clamped to the
	// object size; integer min avoids the precision loss of a float conversion.
	dataToRead := min(offset+int64(len(p)), int64(rr.object.Size))
	if rr.reader != nil && (rr.start != offset || dataToRead > rr.limit) {
		rr.closeReader()
		rr.reader = nil
		rr.cancel = nil
		if rr.start != offset {
			// Only count a seek when the reader is discarded for being positioned
			// at the wrong place. Also counting the can't-serve-the-entire-request
			// case would keep the reader size from growing in the random-read
			// scenario.
			rr.seeks++
		}
	}
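	// Any reader that survived the checks above is positioned exactly at
	// offset and has enough capacity, so serve the read from it directly.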
	if rr.reader != nil {
		objectData.Size, err = rr.readFromRangeReader(ctx, p, offset, -1, rr.readType)
		return
	}
	// If we don't have a reader, determine whether to read from NewReader or
	// the MultiRangeReader (MRR).
	end, err := rr.getReadInfo(offset, int64(len(p)))
	if err != nil {
		err = fmt.Errorf("ReadAt: getReadInfo: %w", err)
		return
	}
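	// readerType picks between the range reader and the multi-range reader
	// based on the read pattern observed so far (rr.readType), the requested
	// range, and the bucket type.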
	readerType := readerType(rr.readType, offset, end, rr.bucket.BucketType())
	if readerType == RangeReader {
		objectData.Size, err = rr.readFromRangeReader(ctx, p, offset, end, rr.readType)
		return
	}
	objectData.Size, err = rr.readFromMultiRangeReader(ctx, p, offset, end, TimeoutForMultiRangeRead)
	return
}
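
A minimal sketch of how a caller might drive ReadAt; the loop and the names
ctx, rr, and objectSize are illustrative assumptions, not part of this file:

	// Hypothetical caller: read an object sequentially in 128 KiB chunks,
	// the size of a typical fuse read request mentioned above.
	buf := make([]byte, 128*1024)
	for off := int64(0); off < objectSize; {
		data, err := rr.ReadAt(ctx, buf, off)
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return err
		}
		if data.Size == 0 {
			break // defensive: avoid spinning if no progress is made
		}
		off += int64(data.Size) // short reads are possible; advance by actual count
	}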