in internal/gcsx/bucket_manager.go [159:253]
func (bm *bucketManager) SetUpBucket(
ctx context.Context,
name string,
isMultibucketMount bool,
metricHandle common.MetricHandle,
) (sb SyncerBucket, err error) {
var b gcs.Bucket
// Set up the appropriate backing bucket.
if name == canned.FakeBucketName {
b = canned.MakeFakeBucket(ctx)
} else {
b, err = bm.storageHandle.BucketHandle(ctx, name, bm.config.BillingProject)
if err != nil {
err = fmt.Errorf("BucketHandle: %w", err)
return
}
}
// Enable monitoring.
if bm.config.EnableMonitoring {
b = monitor.NewMonitoringBucket(b, metricHandle)
}
// Enable gcs logs.
b = storage.NewDebugBucket(b)
// Limit to a requested prefix of the bucket, if any.
if bm.config.OnlyDir != "" {
b, err = NewPrefixBucket(path.Clean(bm.config.OnlyDir)+"/", b)
if err != nil {
err = fmt.Errorf("NewPrefixBucket: %w", err)
return
}
}
// Enable rate limiting, if requested.
b, err = setUpRateLimiting(
b,
bm.config.OpRateLimitHz,
bm.config.EgressBandwidthLimitBytesPerSecond)
if err != nil {
err = fmt.Errorf("setUpRateLimiting: %w", err)
return
}
// Enable cached StatObject results based on stat cache config.
// Disabling stat cache with below config also disables negative stat cache.
if bm.config.StatCacheTTL != 0 && bm.sharedStatCache != nil {
var statCache metadata.StatCache
if isMultibucketMount {
statCache = metadata.NewStatCacheBucketView(bm.sharedStatCache, name)
} else {
statCache = metadata.NewStatCacheBucketView(bm.sharedStatCache, "")
}
b = caching.NewFastStatBucket(
bm.config.StatCacheTTL,
statCache,
timeutil.RealClock(),
b,
bm.config.NegativeStatCacheTTL)
}
// Enable content type awareness
b = NewContentTypeBucket(b)
// Enable Syncer
if bm.config.TmpObjectPrefix == "" {
err = errors.New("you must set TmpObjectPrefix")
return
}
sb = NewSyncerBucket(
bm.config.AppendThreshold,
bm.config.ChunkTransferTimeoutSecs,
bm.config.TmpObjectPrefix,
b)
// Fetch bucket type from storage layout api and set bucket type.
b.BucketType()
// Check whether this bucket works, giving the user a warning early if there
// is some problem.
{
_, err = b.ListObjects(ctx, &gcs.ListObjectsRequest{MaxResults: 1})
if err != nil {
return
}
}
// Periodically garbage collect temporary objects
go garbageCollect(bm.gcCtx, bm.config.TmpObjectPrefix, sb)
return
}