// internal/gcsx/client_readers/range_reader.go
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client_readers

import (
	"context"
	"errors"
	"fmt"
	"io"

	"github.com/googlecloudplatform/gcsfuse/v2/common"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/fs/gcsfuse_errors"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/util"
)

const (
	MiB = 1 << 20

	// Max read size in bytes for random reads.
	// If the average read size (between seeks) is below this number, reads will
	// optimise for random access. We will skip forwards in a GCS response at
	// most this many bytes.
	maxReadSize = 8 * MiB
)

// RangeReader reads contiguous byte ranges of a GCS object, reusing an open
// GCS connection across sequential reads where possible.
type RangeReader struct {
	gcsx.GCSReader
	object *gcs.MinObject
	bucket gcs.Bucket

	// start is the current read offset of the reader.
	start int64

	// limit is the exclusive upper bound up to which the reader can read.
	limit int64

	// If non-nil, an in-flight read request and a function for cancelling it.
	//
	// INVARIANT: (reader == nil) == (cancel == nil)
	reader gcs.StorageReader
	cancel func()

	// Stores the handle associated with the previously closed reader instance.
	// It is passed when opening the next connection to bypass auth and metadata
	// checks.
	readHandle []byte

	readType     string
	metricHandle common.MetricHandle
}

// NewRangeReader creates a RangeReader for the given object and bucket. start
// and limit are initialised to -1, meaning no reader is in flight yet.
func NewRangeReader(object *gcs.MinObject, bucket gcs.Bucket, metricHandle common.MetricHandle) *RangeReader {
	return &RangeReader{
		object:       object,
		bucket:       bucket,
		metricHandle: metricHandle,
		start:        -1,
		limit:        -1,
	}
}
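
// exampleRead is a minimal usage sketch, not part of the original reader. It
// shows how a caller might drive ReadAt, relying only on the
// gcsx.GCSReaderRequest and gcsx.ReaderResponse fields referenced in this
// file; the RangeReader is assumed to have been built by NewRangeReader
// around a live bucket and object.
func exampleRead(ctx context.Context, rr *RangeReader) ([]byte, error) {
	// Ask for the first MiB of the object (less if the object is smaller).
	buf := make([]byte, MiB)
	resp, err := rr.ReadAt(ctx, &gcsx.GCSReaderRequest{
		Buffer:    buf,
		Offset:    0,
		EndOffset: int64(len(buf)),
	})
	if err != nil && !errors.Is(err, io.EOF) {
		return nil, err
	}

	// resp.DataBuf aliases buf; resp.Size is the number of bytes actually read.
	return resp.DataBuf[:resp.Size], nil
}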

// checkInvariants panics if any internal invariant is violated.
func (rr *RangeReader) checkInvariants() {
	// INVARIANT: (reader == nil) == (cancel == nil)
	if (rr.reader == nil) != (rr.cancel == nil) {
		panic(fmt.Sprintf("Mismatch: %v vs. %v", rr.reader == nil, rr.cancel == nil))
	}

	// INVARIANT: start <= limit
	if !(rr.start <= rr.limit) {
		panic(fmt.Sprintf("Unexpected range: [%d, %d)", rr.start, rr.limit))
	}

	// INVARIANT: limit < 0 implies reader == nil
	if rr.limit < 0 && rr.reader != nil {
		panic(fmt.Sprintf("Unexpected non-nil reader with limit == %d", rr.limit))
	}
}

// destroy closes out the in-flight reader, if any.
func (rr *RangeReader) destroy() {
	if rr.reader != nil {
		rr.closeReader()
		rr.reader = nil
		rr.cancel = nil
	}
}

// closeReader fetches the readHandle before closing the reader instance.
func (rr *RangeReader) closeReader() {
	rr.readHandle = rr.reader.ReadHandle()
	if err := rr.reader.Close(); err != nil {
		logger.Warnf("error while closing reader: %v", err)
	}
}

// ReadAt serves req by reading [req.Offset, req.EndOffset) into req.Buffer,
// reusing an existing GCS reader when possible.
func (rr *RangeReader) ReadAt(ctx context.Context, req *gcsx.GCSReaderRequest) (gcsx.ReaderResponse, error) {
	readerResponse := gcsx.ReaderResponse{
		DataBuf: req.Buffer,
		Size:    0,
	}

	if req.Offset >= int64(rr.object.Size) {
		return readerResponse, io.EOF
	}

	var err error
	readerResponse.Size, err = rr.readFromRangeReader(ctx, req.Buffer, req.Offset, req.EndOffset, rr.readType)
	return readerResponse, err
}

// readFromRangeReader reads using the NewReader interface of go-sdk. It uses
// the existing reader if available; otherwise it makes a call to GCS. Callers
// must first use invalidateReaderIfMisalignedOrTooSmall so that any existing
// reader starts at the correct position.
func (rr *RangeReader) readFromRangeReader(ctx context.Context, p []byte, offset int64, end int64, readType string) (int, error) {
	// If we don't have a reader, start a read operation.
	if rr.reader == nil {
		if err := rr.startRead(offset, end); err != nil {
			return 0, fmt.Errorf("startRead: %w", err)
		}
	}

	n, err := rr.readFull(ctx, p)
	rr.start += int64(n)

	// Sanity check.
	if rr.start > rr.limit {
		err = fmt.Errorf("reader returned extra bytes: %d", rr.start-rr.limit)

		// Don't attempt to reuse the reader when it's malfunctioning.
		rr.closeReader()
		rr.reader = nil
		rr.cancel = nil
		rr.start = -1
		rr.limit = -1

		return 0, err
	}

	// Are we finished with this reader now?
	if rr.start == rr.limit {
		rr.closeReader()
		rr.reader = nil
		rr.cancel = nil
	}

	// Handle errors.
	switch {
	case errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF):
		// For a non-empty buffer, ReadFull returns EOF or ErrUnexpectedEOF only
		// if the reader peters out early. That's fine, but it means we should
		// have hit the limit above.
		if rr.reader != nil {
			return 0, fmt.Errorf("reader returned %d too few bytes", rr.limit-rr.start)
		}
		err = nil

	case err != nil:
		// Propagate other errors.
		return 0, fmt.Errorf("readFull: %w", err)
	}

	requestedDataSize := end - offset
	common.CaptureGCSReadMetrics(ctx, rr.metricHandle, readType, requestedDataSize)

	return n, err
}

// readFull is like io.ReadFull, but deals with cancellation issues.
//
// REQUIRES: rr.reader != nil
func (rr *RangeReader) readFull(ctx context.Context, p []byte) (int, error) {
// Start a goroutine that will cancel the read operation we block on below if
// the calling context is cancelled, but only if this method has not already
// returned (to avoid souring the reader for the next read if this one is
// successful, since the calling context will eventually be cancelled).
readDone := make(chan struct{})
defer close(readDone)
go func() {
select {
case <-readDone:
return
case <-ctx.Done():
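			// The read may have completed between the two selects; re-check
			// readDone so that a finished read is not cancelled, which would
			// poison the reader for the next request.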
select {
case <-readDone:
return
default:
rr.cancel()
}
}
}()
return io.ReadFull(rr.reader, p)
}

// startRead sets up rr.reader to serve the range starting at start.
// Irrespective of the size requested, we try to fetch more data from GCS (as
// governed by the sequentialReadSizeMb flag) to serve future read requests;
// end is the caller-computed exclusive upper bound of the fetch.
func (rr *RangeReader) startRead(start int64, end int64) error {
	ctx, cancel := context.WithCancel(context.Background())

	rc, err := rr.bucket.NewReaderWithReadHandle(
		ctx,
		&gcs.ReadObjectRequest{
			Name:       rr.object.Name,
			Generation: rr.object.Generation,
			Range: &gcs.ByteRange{
				Start: uint64(start),
				Limit: uint64(end),
			},
			ReadCompressed: rr.object.HasContentEncodingGzip(),
			ReadHandle:     rr.readHandle,
		})

	// If a file handle is open locally but the corresponding object doesn't
	// exist in GCS, we have a file-clobbering scenario. This likely occurred
	// because:
	//   - the file was deleted in GCS while a local handle was still open, or
	//   - the file content was modified, leading to a different generation
	//     number.
	var notFoundError *gcs.NotFoundError
	if errors.As(err, &notFoundError) {
		cancel()
		return &gcsfuse_errors.FileClobberedError{
			Err: fmt.Errorf("NewReader: %w", err),
		}
	}

	if err != nil {
		cancel()
		return fmt.Errorf("NewReaderWithReadHandle: %w", err)
	}

rr.reader = rc
rr.cancel = cancel
rr.start = start
rr.limit = end
requestedDataSize := end - start
common.CaptureGCSReadMetrics(ctx, rr.metricHandle, util.Sequential, requestedDataSize)
return nil
}
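
// For illustration: after a successful startRead(0, 8*MiB), rr.start == 0 and
// rr.limit == 8*MiB, and subsequent readFull calls drain the open connection,
// advancing rr.start until it reaches rr.limit, at which point the reader is
// closed and released.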

// skipBytes attempts to advance the reader position to the given offset
// without discarding the existing reader. If possible, it reads and discards
// data to maintain an active GCS connection, improving throughput for
// sequential reads.
func (rr *RangeReader) skipBytes(offset int64) {
	// When the offset is AFTER the reader position, try to seek forward, within
	// reason. This happens when the kernel page cache serves some data, which is
	// very common for concurrent reads, where requests are often only a few
	// 128 KiB fuse reads apart. The aim is to reuse the GCS connection and avoid
	// throwing away already-read data. For parallel sequential reads of a single
	// file, not throwing away the connection is a 15-20x improvement in
	// throughput: 150-200 MiB/s instead of 10 MiB/s.
	if rr.reader != nil && rr.start < offset && offset-rr.start < maxReadSize {
		bytesToSkip := offset - rr.start
		discardedBytes, copyError := io.CopyN(io.Discard, rr.reader, bytesToSkip)
		// io.EOF is expected if the reader is shorter than the requested offset.
		if copyError != nil && !errors.Is(copyError, io.EOF) {
			logger.Warnf("Error while skipping reader bytes: %v", copyError)
		}
		rr.start += discardedBytes
	}
}
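
// For example (illustrative numbers): with rr.start == 4*MiB and a request at
// offset 4*MiB + 512*1024, skipBytes discards 512 KiB from the open connection
// rather than paying for a new GCS request. A request 16 MiB ahead exceeds
// maxReadSize, so nothing is skipped and the reader is instead discarded by
// invalidateReaderIfMisalignedOrTooSmall.
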
// invalidateReaderIfMisalignedOrTooSmall ensures that the existing reader is valid
// for the requested offset and length. If the reader is misaligned (not at the requested
// offset) or cannot serve the full request within its limit, it is closed and discarded.
//
// It attempts to skip forward to the requested offset if possible to avoid creating
// a new reader unnecessarily. If the reader is discarded due to misalignment, the method
// returns true to signal that a seek should be recorded.
//
// Parameters:
// - offset: the starting byte position of the requested read.
// - p: the buffer representing the size of the requested read.
//
// Returns:
// - true if the reader was discarded due to being misaligned (seek should be counted).
// - false otherwise.
func (rr *RangeReader) invalidateReaderIfMisalignedOrTooSmall(offset int64, p []byte) bool {
	rr.skipBytes(offset)

	// If we have an existing reader, but it's positioned at the wrong place or
	// can't serve the entire request within its limit, clean it up and throw it
	// away. Compute the last byte (exclusive) the request needs, capped at the
	// object size; integer min keeps the arithmetic exact for large offsets.
	dataToRead := min(offset+int64(len(p)), int64(rr.object.Size))
	if rr.reader != nil && (rr.start != offset || dataToRead > rr.limit) {
		rr.closeReader()
		rr.reader = nil
		rr.cancel = nil

		if rr.start != offset {
			// Return true to increment the seek count only when the reader is
			// discarded due to incorrect positioning. Also counting readers that
			// merely can't fulfill the entire request would prevent the reader
			// size from growing appropriately in random read scenarios.
			return true
		}
	}

	return false
}
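
// Illustrative outcomes for invalidateReaderIfMisalignedOrTooSmall
// (hypothetical numbers, object size well above 300): with rr.start == 100 and
// rr.limit == 200,
//   - offset 100, len(p) == 50:  reader reused; returns false.
//   - offset 150, len(p) == 10:  skipBytes advances the reader to 150; reader
//     reused; returns false.
//   - offset  50, len(p) == 10:  misaligned (cannot skip backwards); reader
//     discarded; returns true.
//   - offset 100, len(p) == 200: aligned but too small (needs byte 300 >
//     limit); reader discarded; returns false.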