func garbageCollectOnce()

in internal/gcsx/garbage_collect.go [30:95]


func garbageCollectOnce(
	ctx context.Context,
	tmpObjectPrefix string,
	bucket gcs.Bucket) (objectsDeleted uint64, err error) {
	const stalenessThreshold = 30 * time.Minute
	group, ctx := errgroup.WithContext(ctx)

	// List all objects with the temporary prefix.
	minObjects := make(chan *gcs.MinObject, 100)
	group.Go(func() (err error) {
		defer close(minObjects)
		err = storageutil.ListPrefix(ctx, bucket, tmpObjectPrefix, minObjects)
		if err != nil {
			err = fmt.Errorf("ListPrefix: %w", err)
			return
		}

		return
	})

	// Filter to the names of objects that are stale.
	now := time.Now()
	staleNames := make(chan string, 100)
	group.Go(func() (err error) {
		defer close(staleNames)
		for o := range minObjects {
			if now.Sub(o.Updated) < stalenessThreshold {
				continue
			}

			select {
			case <-ctx.Done():
				err = ctx.Err()
				return

			case staleNames <- o.Name:
			}
		}

		return
	})

	// Delete those objects.
	group.Go(func() (err error) {
		for name := range staleNames {
			err = bucket.DeleteObject(
				ctx,
				&gcs.DeleteObjectRequest{
					Name:       name,
					Generation: 0, // Latest generation of stale object.
				})

			if err != nil {
				err = fmt.Errorf("DeleteObject(%q): %w", name, err)
				return
			}

			atomic.AddUint64(&objectsDeleted, 1)
		}

		return
	})

	err = group.Wait()
	return
}