internal/ratelimit/throttled_bucket.go (191 lines of code) (raw):

// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package ratelimit import ( "io" storagev2 "cloud.google.com/go/storage" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "golang.org/x/net/context" ) // Create a bucket that limits the rate at which it calls the wrapped bucket // using opThrottle, and limits the bandwidth with which it reads from the // wrapped bucket using egressThrottle. func NewThrottledBucket( opThrottle Throttle, egressThrottle Throttle, wrapped gcs.Bucket) (b gcs.Bucket) { b = &throttledBucket{ opThrottle: opThrottle, egressThrottle: egressThrottle, wrapped: wrapped, } return } //////////////////////////////////////////////////////////////////////// // throttledBucket //////////////////////////////////////////////////////////////////////// type throttledBucket struct { opThrottle Throttle egressThrottle Throttle wrapped gcs.Bucket } func (b *throttledBucket) Name() string { return b.wrapped.Name() } func (b *throttledBucket) BucketType() gcs.BucketType { return b.wrapped.BucketType() } func (b *throttledBucket) NewReaderWithReadHandle( ctx context.Context, req *gcs.ReadObjectRequest) (rd gcs.StorageReader, err error) { // Wait for permission to call through. err = b.opThrottle.Wait(ctx, 1) if err != nil { return } // Call through. rd, err = b.wrapped.NewReaderWithReadHandle(ctx, req) if err != nil { return } // Wrap the result in a throttled layer. rd = &throttledGCSReader{ Reader: ThrottledReader(ctx, rd, b.egressThrottle), Closer: rd, } return } func (b *throttledBucket) CreateObject( ctx context.Context, req *gcs.CreateObjectRequest) (o *gcs.Object, err error) { // Wait for permission to call through. err = b.opThrottle.Wait(ctx, 1) if err != nil { return } // Call through. o, err = b.wrapped.CreateObject(ctx, req) return } func (b *throttledBucket) CreateObjectChunkWriter(ctx context.Context, req *gcs.CreateObjectRequest, chunkSize int, callBack func(bytesUploadedSoFar int64)) (wc gcs.Writer, err error) { // Wait for permission to call through. err = b.opThrottle.Wait(ctx, 1) if err != nil { return } // Call through. wc, err = b.wrapped.CreateObjectChunkWriter(ctx, req, chunkSize, callBack) return } func (b *throttledBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (*gcs.MinObject, error) { // FinalizeUpload is not throttled to prevent permanent data loss in case the // limiter's burst size is exceeded. // Note: CreateObjectChunkWriter, a prerequisite for FinalizeUpload, // is throttled. return b.wrapped.FinalizeUpload(ctx, w) } func (b *throttledBucket) FlushPendingWrites(ctx context.Context, w gcs.Writer) (*gcs.MinObject, error) { // FlushPendingWrites is not throttled to prevent permanent data loss in case the // limiter's burst size is exceeded. // Note: CreateObjectChunkWriter, a prerequisite for FlushPendingWrites, // is throttled. return b.wrapped.FlushPendingWrites(ctx, w) } func (b *throttledBucket) CopyObject( ctx context.Context, req *gcs.CopyObjectRequest) (o *gcs.Object, err error) { // Wait for permission to call through. err = b.opThrottle.Wait(ctx, 1) if err != nil { return } // Call through. o, err = b.wrapped.CopyObject(ctx, req) return } func (b *throttledBucket) ComposeObjects( ctx context.Context, req *gcs.ComposeObjectsRequest) (o *gcs.Object, err error) { // Wait for permission to call through. err = b.opThrottle.Wait(ctx, 1) if err != nil { return } // Call through. o, err = b.wrapped.ComposeObjects(ctx, req) return } func (b *throttledBucket) StatObject( ctx context.Context, req *gcs.StatObjectRequest) (m *gcs.MinObject, e *gcs.ExtendedObjectAttributes, err error) { // Wait for permission to call through. err = b.opThrottle.Wait(ctx, 1) if err != nil { return } // Call through. m, e, err = b.wrapped.StatObject(ctx, req) return } func (b *throttledBucket) ListObjects( ctx context.Context, req *gcs.ListObjectsRequest) (listing *gcs.Listing, err error) { // Wait for permission to call through. err = b.opThrottle.Wait(ctx, 1) if err != nil { return } // Call through. listing, err = b.wrapped.ListObjects(ctx, req) return } func (b *throttledBucket) UpdateObject( ctx context.Context, req *gcs.UpdateObjectRequest) (o *gcs.Object, err error) { // Wait for permission to call through. err = b.opThrottle.Wait(ctx, 1) if err != nil { return } // Call through. o, err = b.wrapped.UpdateObject(ctx, req) return } func (b *throttledBucket) DeleteObject( ctx context.Context, req *gcs.DeleteObjectRequest) (err error) { // Wait for permission to call through. err = b.opThrottle.Wait(ctx, 1) if err != nil { return } // Call through. err = b.wrapped.DeleteObject(ctx, req) return } func (b *throttledBucket) MoveObject(ctx context.Context, req *gcs.MoveObjectRequest) (*gcs.Object, error) { // Wait for permission to call through. err := b.opThrottle.Wait(ctx, 1) if err != nil { return nil, err } // Call through. o, err := b.wrapped.MoveObject(ctx, req) return o, err } func (b *throttledBucket) DeleteFolder(ctx context.Context, folderName string) (err error) { // Wait for permission to call through. err = b.opThrottle.Wait(ctx, 1) if err != nil { return } // Call through. err = b.wrapped.DeleteFolder(ctx, folderName) return } func (b *throttledBucket) RenameFolder(ctx context.Context, folderName string, destinationFolderId string) (o *gcs.Folder, err error) { // Wait for permission to call through. err = b.opThrottle.Wait(ctx, 1) if err != nil { return } // Call through. o, err = b.wrapped.RenameFolder(ctx, folderName, destinationFolderId) return } func (b *throttledBucket) GetFolder(ctx context.Context, folderName string) (folder *gcs.Folder, err error) { // Wait for permission to call through. err = b.opThrottle.Wait(ctx, 1) if err != nil { return } // Call through. folder, err = b.wrapped.GetFolder(ctx, folderName) return folder, err } func (b *throttledBucket) CreateFolder(ctx context.Context, folderName string) (folder *gcs.Folder, err error) { // Wait for permission to call through. err = b.opThrottle.Wait(ctx, 1) if err != nil { return } // Call through. folder, err = b.wrapped.CreateFolder(ctx, folderName) return folder, err } func (b *throttledBucket) NewMultiRangeDownloader( ctx context.Context, req *gcs.MultiRangeDownloaderRequest) (mrd gcs.MultiRangeDownloader, err error) { // Call through. mrd, err = b.wrapped.NewMultiRangeDownloader(ctx, req) return } //////////////////////////////////////////////////////////////////////// // readerCloser //////////////////////////////////////////////////////////////////////// // An io.ReadCloser that forwards read requests to an io.Reader and close // , readHandle requests to gcs.StorageReader. type throttledGCSReader struct { Reader io.Reader Closer gcs.StorageReader } func (rc *throttledGCSReader) Read(p []byte) (n int, err error) { n, err = rc.Reader.Read(p) return } func (rc *throttledGCSReader) Close() (err error) { err = rc.Closer.Close() return } func (rc *throttledGCSReader) ReadHandle() (rh storagev2.ReadHandle) { rh = rc.Closer.ReadHandle() return }