internal/gcsx/client_readers/multi_range_reader.go (58 lines of code) (raw):
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gcsx
import (
"context"
"fmt"
"io"
"time"
"github.com/googlecloudplatform/gcsfuse/v2/common"
"github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
)
// TimeoutForMultiRangeRead is the timeout value for multi-range read operations.
//
// TODO(b/385826024): Revert timeout to an appropriate value. This value is currently a placeholder and needs to be adjusted.
const TimeoutForMultiRangeRead = time.Hour
type MultiRangeReader struct {
gcsx.GCSReader
object *gcs.MinObject
// mrdWrapper points to the wrapper object within inode.
mrdWrapper *gcsx.MultiRangeDownloaderWrapper
// boolean variable to determine if MRD is being used or not.
isMRDInUse bool
metricHandle common.MetricHandle
}
func NewMultiRangeReader(object *gcs.MinObject, metricHandle common.MetricHandle, mrdWrapper *gcsx.MultiRangeDownloaderWrapper) *MultiRangeReader {
return &MultiRangeReader{
object: object,
metricHandle: metricHandle,
mrdWrapper: mrdWrapper,
}
}
// readFromMultiRangeReader reads data from the underlying MultiRangeDownloaderWrapper.
//
// It increments the reference count of the mrdWrapper if it's not already in use.
// It then calls the Read method of the mrdWrapper with the provided parameters.
//
// Parameters:
// - ctx: The context for the read operation. It can be used to cancel the operation or set a timeout.
// - p: The byte slice to read data into.
// - offset: The starting offset for the read operation.
// - end: The ending offset for the read operation.
// - timeout: The maximum duration for the read operation.
//
// Returns:
// - int: The number of bytes read.
// - error: An error if the read operation fails.
func (mrd *MultiRangeReader) readFromMultiRangeReader(ctx context.Context, p []byte, offset, end int64, timeout time.Duration) (int, error) {
if mrd.mrdWrapper == nil {
return 0, fmt.Errorf("readFromMultiRangeReader: Invalid MultiRangeDownloaderWrapper")
}
if !mrd.isMRDInUse {
mrd.isMRDInUse = true
mrd.mrdWrapper.IncrementRefCount()
}
return mrd.mrdWrapper.Read(ctx, p, offset, end, timeout, mrd.metricHandle)
}
func (mrd *MultiRangeReader) ReadAt(ctx context.Context, req *gcsx.GCSReaderRequest) (gcsx.ReaderResponse, error) {
readerResponse := gcsx.ReaderResponse{
DataBuf: req.Buffer,
Size: 0,
}
var err error
if req.Offset >= int64(mrd.object.Size) {
err = io.EOF
return readerResponse, err
}
readerResponse.Size, err = mrd.readFromMultiRangeReader(ctx, req.Buffer, req.Offset, req.EndOffset, TimeoutForMultiRangeRead)
return readerResponse, err
}
func (mrd *MultiRangeReader) destroy() {
if mrd.isMRDInUse {
err := mrd.mrdWrapper.DecrementRefCount()
if err != nil {
logger.Errorf("randomReader::Destroy:%v", err)
}
mrd.isMRDInUse = false
}
}