// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gcsx

import (
"errors"
"fmt"
"io"
"math"
"time"
"github.com/google/uuid"
"github.com/googlecloudplatform/gcsfuse/v2/common"
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/file"
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/lru"
cacheutil "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util"
"github.com/googlecloudplatform/gcsfuse/v2/internal/fs/gcsfuse_errors"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/util"
"github.com/jacobsa/fuse/fuseops"
"golang.org/x/net/context"
)

// Min read size in bytes for random reads.
// We will not send a request to GCS for less than this many bytes (unless the
// end of the object comes first).
const minReadSize = MiB

// Max read size in bytes for random reads.
// If the average read size (between seeks) is below this number, reads will
// be optimised for random access.
// We will skip forwards in a GCS response at most this many bytes.
// About 6 MiB of data is buffered anyway, so 8 MiB seems like a good round number.
const maxReadSize = 8 * MiB

// Minimum number of seeks before evaluating if the read pattern is random.
const minSeeksForRandom = 2

// TODO(b/385826024): Revert timeout to an appropriate value.
const TimeoutForMultiRangeRead = time.Hour

// RandomReader is an object that knows how to read ranges within a particular
// generation of a particular GCS object. Optimised for (large) sequential reads.
//
// Not safe for concurrent access.
//
// TODO - (raj-prince) - Rename this appropriately, as it has also taken on the
// responsibility of reading the object's content from the cache.
type RandomReader interface {
// Panic if any internal invariants are violated.
CheckInvariants()
// ReadAt returns data from the requested offset, up to the size of the input
// byte array. It either populates the input array p or returns a different
// byte array. If the input array is populated, that same array is returned as
// part of the response; hence callers should always use the byte array
// returned in the response (see the usage sketch below ObjectData).
ReadAt(ctx context.Context, p []byte, offset int64) (objectData ObjectData, err error)
// Return the record for the object to which the reader is bound.
Object() (o *gcs.MinObject)
// Clean up any resources associated with the reader, which must not be used
// again.
Destroy()
}

// ObjectData specifies the response returned by a ReadAt call.
type ObjectData struct {
// Byte array populated with the requested data.
DataBuf []byte
// Size of the data returned.
Size int
// Specifies whether the data was served from the cache.
CacheHit bool
}
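
// Illustrative ReadAt usage (a minimal sketch; ctx, reader, and process are
// hypothetical names, not part of this package). The important detail is that
// callers must consume ObjectData.DataBuf rather than the buffer they passed
// in, since ReadAt may return a different backing array:
//
//	buf := make([]byte, 4096)
//	objectData, err := reader.ReadAt(ctx, buf, 0)
//	if err != nil {
//		return err
//	}
//	// Use objectData.DataBuf, not buf.
//	process(objectData.DataBuf[:objectData.Size])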

// ReaderType represents the different types of go-sdk GCS readers.
// For example, NewReader and MRD both point to the bidi read API; this enum
// specifies which go-sdk reader type is in use.
type ReaderType int

// ReaderType enum values.
const (
// RangeReader corresponds to NewReader method in bucket_handle.go
RangeReader ReaderType = iota
// MultiRangeReader corresponds to NewMultiRangeDownloader method in bucket_handle.go
MultiRangeReader
)

// NewRandomReader creates a random reader for the supplied object record that
// reads using the given bucket.
func NewRandomReader(o *gcs.MinObject, bucket gcs.Bucket, sequentialReadSizeMb int32, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle common.MetricHandle, mrdWrapper *MultiRangeDownloaderWrapper) RandomReader {
return &randomReader{
object: o,
bucket: bucket,
start: -1,
limit: -1,
seeks: 0,
totalReadBytes: 0,
readType: util.Sequential,
sequentialReadSizeMb: sequentialReadSizeMb,
fileCacheHandler: fileCacheHandler,
cacheFileForRangeRead: cacheFileForRangeRead,
mrdWrapper: mrdWrapper,
metricHandle: metricHandle,
}
}
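
// A minimal construction sketch (hypothetical variables; fileCacheHandler may
// be nil when the file cache is disabled, and mrdWrapper is only exercised
// when the multi-range read path is selected):
//
//	rr := NewRandomReader(minObject, bucket, 200, fileCacheHandler, false, metricHandle, mrdWrapper)
//	defer rr.Destroy()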

type randomReader struct {
object *gcs.MinObject
bucket gcs.Bucket
// If non-nil, an in-flight read request and a function for cancelling it.
//
// INVARIANT: (reader == nil) == (cancel == nil)
reader gcs.StorageReader
cancel func()
// The range of the object that we expect reader to yield, when reader is
// non-nil. When reader is nil, limit is the limit of the previous read
// operation, or -1 if there has never been one.
//
// INVARIANT: start <= limit
// INVARIANT: limit < 0 implies reader == nil
//
// These properties are used only for GCS reads, not for reads from the cache.
start int64
limit int64
seeks uint64
totalReadBytes uint64
// ReadType of the reader. Will be sequential by default.
readType string
sequentialReadSizeMb int32
// fileCacheHandler is used to get the file cache handle, through which cached reads are served.
// This will be nil if the file cache is disabled.
fileCacheHandler *file.CacheHandler
// cacheFileForRangeRead is also valid for the cache workflow: if true, object
// content will be downloaded for random reads as well.
cacheFileForRangeRead bool
// fileCacheHandle is used to read from the cached location. It is created on the fly
// using fileCacheHandler for the given object and bucket.
fileCacheHandle *file.CacheHandle
// Stores the handle associated with the previously closed newReader instance.
// This will be used while making the new connection to bypass auth and metadata
// checks.
readHandle []byte
// mrdWrapper points to the wrapper object within inode.
mrdWrapper *MultiRangeDownloaderWrapper
// isMRDInUse tracks whether the MRD is currently in use by this reader.
isMRDInUse bool
metricHandle common.MetricHandle
}

func (rr *randomReader) CheckInvariants() {
// INVARIANT: (reader == nil) == (cancel == nil)
if (rr.reader == nil) != (rr.cancel == nil) {
panic(fmt.Sprintf("Mismatch: %v vs. %v", rr.reader == nil, rr.cancel == nil))
}
// INVARIANT: start <= limit
if !(rr.start <= rr.limit) {
panic(fmt.Sprintf("Unexpected range: [%d, %d)", rr.start, rr.limit))
}
// INVARIANT: limit < 0 implies reader == nil
if rr.limit < 0 && rr.reader != nil {
panic(fmt.Sprintf("Unexpected non-nil reader with limit == %d", rr.limit))
}
}

// tryReadingFromFileCache creates the cache handle first if it doesn't already
// exist and then uses that handle to read the object's content, which is
// cached in a local file.
// On a successful read, it returns the number of bytes read and a boolean
// cacheHit set to true.
// On an unsuccessful read, it returns cacheHit as false; in this case the
// content should be read from GCS.
// It returns a non-nil error in case something unexpected happens during
// execution, in which case we must abort the read operation.
//
// Important: what happens if the file in the cache is deleted externally?
// That means we still have a fileInfo entry in the fileInfoCache for that
// deleted file.
// (a) If a new fileCacheHandle is created in that case, it will return a
// FileNotPresentInCache error, given by fileCacheHandler.GetCacheHandle().
// (b) If there is already an open fileCacheHandle, then there is an open
// fileHandle to the file in the cache. So we will get the correct data from
// the fileHandle, because Linux does not delete a file until the open
// fileHandle count for it is zero.
func (rr *randomReader) tryReadingFromFileCache(ctx context.Context,
p []byte,
offset int64) (n int, cacheHit bool, err error) {
if rr.fileCacheHandler == nil {
return
}
// By default, consider read type random if the offset is non-zero.
isSeq := offset == 0
// Request log and start the execution timer.
requestId := uuid.New()
readOp := ctx.Value(ReadOp).(*fuseops.ReadFileOp)
logger.Tracef("%.13v <- FileCache(%s:/%s, offset: %d, size: %d handle: %d)", requestId, rr.bucket.Name(), rr.object.Name, offset, len(p), readOp.Handle)
startTime := time.Now()
// Response log
defer func() {
executionTime := time.Since(startTime)
var requestOutput string
if err != nil {
requestOutput = fmt.Sprintf("err: %v (%v)", err, executionTime)
} else {
if rr.fileCacheHandle != nil {
isSeq = rr.fileCacheHandle.IsSequential(offset)
}
requestOutput = fmt.Sprintf("OK (isSeq: %t, hit: %t) (%v)", isSeq, cacheHit, executionTime)
}
// Here rr.fileCacheHandle will not be nil since we return from the above in those cases.
logger.Tracef("%.13v -> %s", requestId, requestOutput)
readType := util.Random
if isSeq {
readType = util.Sequential
}
captureFileCacheMetrics(ctx, rr.metricHandle, readType, n, cacheHit, executionTime)
}()
// Create fileCacheHandle if it doesn't already exist.
if rr.fileCacheHandle == nil {
rr.fileCacheHandle, err = rr.fileCacheHandler.GetCacheHandle(rr.object, rr.bucket, rr.cacheFileForRangeRead, offset)
if err != nil {
// We fall back to GCS if the file size is greater than the cache size.
if errors.Is(err, lru.ErrInvalidEntrySize) {
logger.Warnf("tryReadingFromFileCache: while creating CacheHandle: %v", err)
return 0, false, nil
} else if errors.Is(err, cacheutil.ErrCacheHandleNotRequiredForRandomRead) {
// Fall back to GCS if it is a random read, cacheFileForRangeRead is
// false, and the file doesn't already exist in the cache.
isSeq = false
return 0, false, nil
}
return 0, false, fmt.Errorf("tryReadingFromFileCache: while creating CacheHandle instance: %w", err)
}
}
n, cacheHit, err = rr.fileCacheHandle.Read(ctx, rr.bucket, rr.object, offset, p)
if err == nil {
return
}
cacheHit = false
n = 0
if cacheutil.IsCacheHandleInvalid(err) {
logger.Tracef("Closing cacheHandle:%p for object: %s:/%s", rr.fileCacheHandle, rr.bucket.Name(), rr.object.Name)
err = rr.fileCacheHandle.Close()
if err != nil {
logger.Warnf("tryReadingFromFileCache: while closing fileCacheHandle: %v", err)
}
rr.fileCacheHandle = nil
} else if !errors.Is(err, cacheutil.ErrFallbackToGCS) {
err = fmt.Errorf("tryReadingFromFileCache: while reading via cache: %w", err)
return
}
err = nil
return
}
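
// Rough outcome summary for tryReadingFromFileCache, as consumed by ReadAt
// below (derived from the logic above; see ReadAt for the exact checks):
//
//	err != nil                    -> abort the read operation
//	err == nil, cacheHit == true  -> data served from the cache
//	err == nil, n == len(p)       -> buffer filled via the cache (write-through)
//	otherwise                     -> fall back to reading from GCS
//
// ReadAt also treats a shorter read that reaches the end of the object as
// fully served.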

func (rr *randomReader) ReadAt(
ctx context.Context,
p []byte,
offset int64) (objectData ObjectData, err error) {
objectData = ObjectData{
DataBuf: p,
CacheHit: false,
Size: 0,
}
if offset >= int64(rr.object.Size) {
err = io.EOF
return
}
// Note: If we are reading the file for the first time and the read type is
// sequential, then the file cache behavior is write-through, i.e. data is
// first read from GCS, cached in the file, and then served from that file. But
// cacheHit is false in that case.
n, cacheHit, err := rr.tryReadingFromFileCache(ctx, p, offset)
if err != nil {
err = fmt.Errorf("ReadAt: while reading from cache: %w", err)
return
}
// Data was served from cache.
if cacheHit || n == len(p) || (n < len(p) && uint64(offset)+uint64(n) == rr.object.Size) {
objectData.CacheHit = cacheHit
objectData.Size = n
return
}
// Check first if we can read using the existing reader. If not, determine
// which API to use and call GCS accordingly.
//
// When the offset is AFTER the reader position, try to seek forward, within
// reason. This happens when the kernel page cache serves some data. It's very
// common for concurrent reads, often with only a few 128 KiB fuse read
// requests. The aim is to re-use the GCS connection and avoid throwing away
// already-read data.
// For parallel sequential reads to a single file, not throwing away the
// connections is a 15-20x improvement in throughput: 150-200 MiB/s instead of
// 10 MiB/s.
if rr.reader != nil && rr.start < offset && offset-rr.start < maxReadSize {
bytesToSkip := offset - rr.start
discardedBytes, copyError := io.CopyN(io.Discard, rr.reader, int64(bytesToSkip))
// io.EOF is expected if the reader is shorter than the requested offset to read.
if copyError != nil && !errors.Is(copyError, io.EOF) {
logger.Warnf("Error while skipping reader bytes: %v", copyError)
}
rr.start += discardedBytes
}
// If we have an existing reader, but it's positioned at the wrong place,
// clean it up and throw it away.
// We will also clean up the existing reader if it can't serve the entire request.
dataToRead := math.Min(float64(offset+int64(len(p))), float64(rr.object.Size))
if rr.reader != nil && (rr.start != offset || int64(dataToRead) > rr.limit) {
rr.closeReader()
rr.reader = nil
rr.cancel = nil
if rr.start != offset {
// We should only increase the seek count if we have to discard the reader
// because it's positioned at the wrong place. Discarding it because it can't
// serve the entire request would keep the read size from growing in the
// random-read scenario.
rr.seeks++
}
}
if rr.reader != nil {
objectData.Size, err = rr.readFromRangeReader(ctx, p, offset, -1, rr.readType)
return
}
// If we don't have a reader, determine whether to read from NewReader or MRD.
end, err := rr.getReadInfo(offset, int64(len(p)))
if err != nil {
err = fmt.Errorf("ReadAt: getReaderInfo: %w", err)
return
}
readerType := readerType(rr.readType, offset, end, rr.bucket.BucketType())
if readerType == RangeReader {
objectData.Size, err = rr.readFromRangeReader(ctx, p, offset, end, rr.readType)
return
}
objectData.Size, err = rr.readFromMultiRangeReader(ctx, p, offset, end, TimeoutForMultiRangeRead)
return
}

func (rr *randomReader) Object() (o *gcs.MinObject) {
o = rr.object
return
}

func (rr *randomReader) Destroy() {
defer func() {
if rr.isMRDInUse {
err := rr.mrdWrapper.DecrementRefCount()
if err != nil {
logger.Errorf("randomReader::Destroy:%v", err)
}
rr.isMRDInUse = false
}
}()
// Close out the reader, if we have one.
if rr.reader != nil {
rr.closeReader()
rr.reader = nil
rr.cancel = nil
}
if rr.fileCacheHandle != nil {
logger.Tracef("Closing cacheHandle:%p for object: %s:/%s", rr.fileCacheHandle, rr.bucket.Name(), rr.object.Name)
err := rr.fileCacheHandle.Close()
if err != nil {
logger.Warnf("rr.Destroy(): while closing cacheFileHandle: %v", err)
}
rr.fileCacheHandle = nil
}
}

// Like io.ReadFull, but deals with the cancellation issues.
//
// REQUIRES: rr.reader != nil
func (rr *randomReader) readFull(
ctx context.Context,
p []byte) (n int, err error) {
// Start a goroutine that will cancel the read operation we block on below if
// the calling context is cancelled, but only if this method has not already
// returned (to avoid souring the reader for the next read if this one is
// successful, since the calling context will eventually be cancelled).
readDone := make(chan struct{})
defer close(readDone)
go func() {
select {
case <-readDone:
return
case <-ctx.Done():
select {
case <-readDone:
return
default:
rr.cancel()
}
}
}()
// Call through.
n, err = io.ReadFull(rr.reader, p)
return
}

// startRead ensures that rr.reader is set up to read the range [start, end) of
// the object. The caller computes end via getReadInfo, which, irrespective of
// the size requested, may extend the range (bounded by the sequentialReadSizeMb
// flag) so that future read requests can be served from the same reader.
func (rr *randomReader) startRead(start int64, end int64) (err error) {
// Begin the read.
ctx, cancel := context.WithCancel(context.Background())
rc, err := rr.bucket.NewReaderWithReadHandle(
ctx,
&gcs.ReadObjectRequest{
Name: rr.object.Name,
Generation: rr.object.Generation,
Range: &gcs.ByteRange{
Start: uint64(start),
Limit: uint64(end),
},
ReadCompressed: rr.object.HasContentEncodingGzip(),
ReadHandle: rr.readHandle,
})
// If a file handle is open locally, but the corresponding object doesn't exist
// in GCS, it indicates a file-clobbering scenario. This likely occurred because:
// - The file was deleted in GCS while a local handle was still open.
// - The file content was modified, leading to a different generation number.
var notFoundError *gcs.NotFoundError
if errors.As(err, &notFoundError) {
err = &gcsfuse_errors.FileClobberedError{
Err: fmt.Errorf("NewReader: %w", err),
}
return
}
if err != nil {
err = fmt.Errorf("NewReaderWithReadHandle: %w", err)
return
}
rr.reader = rc
rr.cancel = cancel
rr.start = start
rr.limit = end
requestedDataSize := end - start
common.CaptureGCSReadMetrics(ctx, rr.metricHandle, util.Sequential, requestedDataSize)
return
}

// getReadInfo determines the readType and provides the range to query GCS.
// The range here is [start, end). end is computed using the readType, the
// start offset, and the size of the data the caller needs.
func (rr *randomReader) getReadInfo(
start int64,
size int64) (end int64, err error) {
// Make sure start and size are legal.
if start < 0 || uint64(start) > rr.object.Size || size < 0 {
err = fmt.Errorf(
"range [%d, %d) is illegal for %d-byte object",
start,
start+size,
rr.object.Size)
return
}
// GCS requests are expensive. Prefer to issue read requests defined by the
// sequentialReadSizeMb flag. Sequential reads will simply sip from the firehose
// with each call to ReadAt. In practice, GCS will fill the TCP buffers with
// about 6 MiB of data. Requests from outside GCP will be charged about 6 MiB
// of egress data, even if less data is read. Inside GCP regions, GCS egress is
// free. This logic should limit the number of GCS read requests, which are not
// free.
//
// But if we notice random read patterns after a minimum number of seeks,
// optimise for random reads. Random reads will read data in chunks of
// (average read size in bytes, rounded up to the next MiB).
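//
// Worked example (illustrative numbers): with seeks == 4 and
// totalReadBytes == 6 MiB, averageReadBytes == 1.5 MiB, which rounds up to a
// 2 MiB chunk; so end == start + 2 MiB, subsequently clamped to the object
// size and to sequentialReadSizeMb below.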
end = int64(rr.object.Size)
if rr.seeks >= minSeeksForRandom {
rr.readType = util.Random
averageReadBytes := rr.totalReadBytes / rr.seeks
if averageReadBytes < maxReadSize {
randomReadSize := int64(((averageReadBytes / MiB) + 1) * MiB)
if randomReadSize < minReadSize {
randomReadSize = minReadSize
}
if randomReadSize > maxReadSize {
randomReadSize = maxReadSize
}
end = start + randomReadSize
}
}
if end > int64(rr.object.Size) {
end = int64(rr.object.Size)
}
// To avoid overloading GCS and to have reasonable latencies, we will only
// fetch data of max size defined by sequentialReadSizeMb.
// Convert to int64 before multiplying to avoid int32 overflow for large values.
maxSizeToReadFromGCS := int64(rr.sequentialReadSizeMb) * MiB
if end-start > maxSizeToReadFromGCS {
end = start + maxSizeToReadFromGCS
}
return
}

// readerType specifies the go-sdk interface to use for reads.
func readerType(readType string, start int64, end int64, bucketType gcs.BucketType) ReaderType {
bytesToBeRead := end - start
if readType == util.Random && bytesToBeRead < maxReadSize && bucketType.Zonal {
return MultiRangeReader
}
return RangeReader
}
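
// For illustration (derived from the rule above): a random read of fewer than
// 8 MiB (maxReadSize) on a zonal bucket is served by MultiRangeReader; a
// sequential read, a larger read, or any read on a non-zonal bucket uses
// RangeReader.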

// readFromRangeReader reads using the NewReader interface of go-sdk. It uses
// the existing reader if available; otherwise it makes a call to GCS.
func (rr *randomReader) readFromRangeReader(ctx context.Context, p []byte, offset int64, end int64, readType string) (n int, err error) {
// If we don't have a reader, start a read operation.
if rr.reader == nil {
err = rr.startRead(offset, end)
if err != nil {
err = fmt.Errorf("startRead: %w", err)
return
}
}
// Now we have a reader positioned at the correct place. Consume as much from
// it as possible.
n, err = rr.readFull(ctx, p)
rr.start += int64(n)
rr.totalReadBytes += uint64(n)
// Sanity check.
if rr.start > rr.limit {
err = fmt.Errorf("Reader returned extra bytes: %d", rr.start-rr.limit)
// Don't attempt to reuse the reader when it's behaving wackily.
rr.closeReader()
rr.reader = nil
rr.cancel = nil
rr.start = -1
rr.limit = -1
return
}
// Are we finished with this reader now?
if rr.start == rr.limit {
rr.closeReader()
rr.reader = nil
rr.cancel = nil
}
// Handle errors.
switch {
case err == io.EOF || err == io.ErrUnexpectedEOF:
// For a non-empty buffer, ReadFull returns EOF or ErrUnexpectedEOF only
// if the reader peters out early. That's fine, but it means we should
// have hit the limit above.
if rr.reader != nil {
err = fmt.Errorf("Reader returned early by skipping %d bytes", rr.limit-rr.start)
return
}
err = nil
case err != nil:
// Propagate other errors.
err = fmt.Errorf("readFull: %w", err)
return
}
requestedDataSize := end - offset
common.CaptureGCSReadMetrics(ctx, rr.metricHandle, readType, requestedDataSize)
return
}

func (rr *randomReader) readFromMultiRangeReader(ctx context.Context, p []byte, offset, end int64, timeout time.Duration) (bytesRead int, err error) {
if rr.mrdWrapper == nil {
return 0, fmt.Errorf("readFromMultiRangeReader: Invalid MultiRangeDownloaderWrapper")
}
if !rr.isMRDInUse {
rr.isMRDInUse = true
rr.mrdWrapper.IncrementRefCount()
}
bytesRead, err = rr.mrdWrapper.Read(ctx, p, offset, end, timeout, rr.metricHandle)
rr.totalReadBytes += uint64(bytesRead)
return
}

// closeReader fetches the readHandle before closing the reader instance.
func (rr *randomReader) closeReader() {
rr.readHandle = rr.reader.ReadHandle()
err := rr.reader.Close()
if err != nil {
logger.Warnf("error while closing reader: %v", err)
}
}
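
// The stored readHandle is replayed on the next connection: startRead passes
// rr.readHandle in the ReadObjectRequest, which, where read handles are
// supported, lets GCS skip repeated auth and metadata checks when the same
// object is re-opened.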