internal/gcsx/bucket_manager.go (167 lines of code) (raw):

// Copyright 2015 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package gcsx import ( "context" "errors" "fmt" "path" "time" "github.com/googlecloudplatform/gcsfuse/v2/common" "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/lru" "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/metadata" "github.com/googlecloudplatform/gcsfuse/v2/internal/canned" "github.com/googlecloudplatform/gcsfuse/v2/internal/monitor" "github.com/googlecloudplatform/gcsfuse/v2/internal/ratelimit" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/caching" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v2/internal/util" "github.com/jacobsa/timeutil" ) type BucketConfig struct { BillingProject string OnlyDir string EgressBandwidthLimitBytesPerSecond float64 OpRateLimitHz float64 StatCacheMaxSizeMB uint64 // Config for TTL of entries for existing file in stat cache StatCacheTTL time.Duration // Config for TTL of entries for non-existing file in stat cache NegativeStatCacheTTL time.Duration EnableMonitoring bool // Files backed by on object of length at least AppendThreshold that have // only been appended to (i.e. none of the object's contents have been // dirtied) will be written out by "appending" to the object in GCS with this // process: // // 1. Write out a temporary object containing the appended contents whose // name begins with TmpObjectPrefix. // // 2. Compose the original object and the temporary object on top of the // original object. // // 3. Delete the temporary object. // // Note that if the process fails or is interrupted the temporary object will // not be cleaned up, so the user must ensure that TmpObjectPrefix is // periodically garbage collected. AppendThreshold int64 ChunkTransferTimeoutSecs int64 TmpObjectPrefix string } // BucketManager manages the lifecycle of buckets. type BucketManager interface { SetUpBucket( ctx context.Context, name string, isMultibucketMount bool, metricHandle common.MetricHandle) (b SyncerBucket, err error) // Shuts down the bucket manager and its buckets ShutDown() } type bucketManager struct { config BucketConfig storageHandle storage.StorageHandle sharedStatCache *lru.Cache // Garbage collector gcCtx context.Context stopGarbageCollecting func() } func NewBucketManager(config BucketConfig, storageHandle storage.StorageHandle) BucketManager { var c *lru.Cache if config.StatCacheMaxSizeMB > 0 { c = lru.NewCache(util.MiBsToBytes(config.StatCacheMaxSizeMB)) } bm := &bucketManager{ config: config, storageHandle: storageHandle, sharedStatCache: c, } bm.gcCtx, bm.stopGarbageCollecting = context.WithCancel(context.Background()) return bm } func setUpRateLimiting( in gcs.Bucket, opRateLimitHz float64, egressBandwidthLimit float64) (out gcs.Bucket, err error) { // If no rate limiting has been requested, just return the bucket. if !(opRateLimitHz > 0 || egressBandwidthLimit > 0) { out = in return } // Treat a disabled limit as a very large one. if !(opRateLimitHz > 0) { opRateLimitHz = 1e15 } if !(egressBandwidthLimit > 0) { egressBandwidthLimit = 1e15 } // Choose token bucket capacities, targeting only a few percent error in each // window of the given size. const window = 8 * time.Hour opCapacity, err := ratelimit.ChooseLimiterCapacity( opRateLimitHz, window) if err != nil { err = fmt.Errorf("choosing operation token bucket capacity: %w", err) return } egressCapacity, err := ratelimit.ChooseLimiterCapacity( egressBandwidthLimit, window) if err != nil { err = fmt.Errorf("choosing egress bandwidth token bucket capacity: %w", err) return } // Create the throttles. opThrottle := ratelimit.NewThrottle(opRateLimitHz, opCapacity) egressThrottle := ratelimit.NewThrottle(egressBandwidthLimit, egressCapacity) // And the bucket. out = ratelimit.NewThrottledBucket( opThrottle, egressThrottle, in) return } func (bm *bucketManager) SetUpBucket( ctx context.Context, name string, isMultibucketMount bool, metricHandle common.MetricHandle, ) (sb SyncerBucket, err error) { var b gcs.Bucket // Set up the appropriate backing bucket. if name == canned.FakeBucketName { b = canned.MakeFakeBucket(ctx) } else { b, err = bm.storageHandle.BucketHandle(ctx, name, bm.config.BillingProject) if err != nil { err = fmt.Errorf("BucketHandle: %w", err) return } } // Enable monitoring. if bm.config.EnableMonitoring { b = monitor.NewMonitoringBucket(b, metricHandle) } // Enable gcs logs. b = storage.NewDebugBucket(b) // Limit to a requested prefix of the bucket, if any. if bm.config.OnlyDir != "" { b, err = NewPrefixBucket(path.Clean(bm.config.OnlyDir)+"/", b) if err != nil { err = fmt.Errorf("NewPrefixBucket: %w", err) return } } // Enable rate limiting, if requested. b, err = setUpRateLimiting( b, bm.config.OpRateLimitHz, bm.config.EgressBandwidthLimitBytesPerSecond) if err != nil { err = fmt.Errorf("setUpRateLimiting: %w", err) return } // Enable cached StatObject results based on stat cache config. // Disabling stat cache with below config also disables negative stat cache. if bm.config.StatCacheTTL != 0 && bm.sharedStatCache != nil { var statCache metadata.StatCache if isMultibucketMount { statCache = metadata.NewStatCacheBucketView(bm.sharedStatCache, name) } else { statCache = metadata.NewStatCacheBucketView(bm.sharedStatCache, "") } b = caching.NewFastStatBucket( bm.config.StatCacheTTL, statCache, timeutil.RealClock(), b, bm.config.NegativeStatCacheTTL) } // Enable content type awareness b = NewContentTypeBucket(b) // Enable Syncer if bm.config.TmpObjectPrefix == "" { err = errors.New("you must set TmpObjectPrefix") return } sb = NewSyncerBucket( bm.config.AppendThreshold, bm.config.ChunkTransferTimeoutSecs, bm.config.TmpObjectPrefix, b) // Fetch bucket type from storage layout api and set bucket type. b.BucketType() // Check whether this bucket works, giving the user a warning early if there // is some problem. { _, err = b.ListObjects(ctx, &gcs.ListObjectsRequest{MaxResults: 1}) if err != nil { return } } // Periodically garbage collect temporary objects go garbageCollect(bm.gcCtx, bm.config.TmpObjectPrefix, sb) return } func (bm *bucketManager) ShutDown() { bm.stopGarbageCollecting() }