func()

in internal/gcsx/bucket_manager.go [159:253]


func (bm *bucketManager) SetUpBucket(
	ctx context.Context,
	name string,
	isMultibucketMount bool,
	metricHandle common.MetricHandle,
) (sb SyncerBucket, err error) {
	var b gcs.Bucket
	// Set up the appropriate backing bucket.
	if name == canned.FakeBucketName {
		b = canned.MakeFakeBucket(ctx)
	} else {
		b, err = bm.storageHandle.BucketHandle(ctx, name, bm.config.BillingProject)
		if err != nil {
			err = fmt.Errorf("BucketHandle: %w", err)
			return
		}
	}

	// Enable monitoring.
	if bm.config.EnableMonitoring {
		b = monitor.NewMonitoringBucket(b, metricHandle)
	}

	// Enable gcs logs.
	b = storage.NewDebugBucket(b)

	// Limit to a requested prefix of the bucket, if any.
	if bm.config.OnlyDir != "" {
		b, err = NewPrefixBucket(path.Clean(bm.config.OnlyDir)+"/", b)
		if err != nil {
			err = fmt.Errorf("NewPrefixBucket: %w", err)
			return
		}
	}

	// Enable rate limiting, if requested.
	b, err = setUpRateLimiting(
		b,
		bm.config.OpRateLimitHz,
		bm.config.EgressBandwidthLimitBytesPerSecond)

	if err != nil {
		err = fmt.Errorf("setUpRateLimiting: %w", err)
		return
	}

	// Enable cached StatObject results based on stat cache config.
	// Disabling stat cache with below config also disables negative stat cache.
	if bm.config.StatCacheTTL != 0 && bm.sharedStatCache != nil {
		var statCache metadata.StatCache
		if isMultibucketMount {
			statCache = metadata.NewStatCacheBucketView(bm.sharedStatCache, name)
		} else {
			statCache = metadata.NewStatCacheBucketView(bm.sharedStatCache, "")
		}

		b = caching.NewFastStatBucket(
			bm.config.StatCacheTTL,
			statCache,
			timeutil.RealClock(),
			b,
			bm.config.NegativeStatCacheTTL)
	}

	// Enable content type awareness
	b = NewContentTypeBucket(b)

	// Enable Syncer
	if bm.config.TmpObjectPrefix == "" {
		err = errors.New("you must set TmpObjectPrefix")
		return
	}
	sb = NewSyncerBucket(
		bm.config.AppendThreshold,
		bm.config.ChunkTransferTimeoutSecs,
		bm.config.TmpObjectPrefix,
		b)

	// Fetch bucket type from storage layout api and set bucket type.
	b.BucketType()

	// Check whether this bucket works, giving the user a warning early if there
	// is some problem.
	{
		_, err = b.ListObjects(ctx, &gcs.ListObjectsRequest{MaxResults: 1})
		if err != nil {
			return
		}
	}

	// Periodically garbage collect temporary objects
	go garbageCollect(bm.gcCtx, bm.config.TmpObjectPrefix, sb)

	return
}