in internal/gcsx/garbage_collect.go [30:95]
func garbageCollectOnce(
ctx context.Context,
tmpObjectPrefix string,
bucket gcs.Bucket) (objectsDeleted uint64, err error) {
const stalenessThreshold = 30 * time.Minute
group, ctx := errgroup.WithContext(ctx)
// List all objects with the temporary prefix.
minObjects := make(chan *gcs.MinObject, 100)
group.Go(func() (err error) {
defer close(minObjects)
err = storageutil.ListPrefix(ctx, bucket, tmpObjectPrefix, minObjects)
if err != nil {
err = fmt.Errorf("ListPrefix: %w", err)
return
}
return
})
// Filter to the names of objects that are stale.
now := time.Now()
staleNames := make(chan string, 100)
group.Go(func() (err error) {
defer close(staleNames)
for o := range minObjects {
if now.Sub(o.Updated) < stalenessThreshold {
continue
}
select {
case <-ctx.Done():
err = ctx.Err()
return
case staleNames <- o.Name:
}
}
return
})
// Delete those objects.
group.Go(func() (err error) {
for name := range staleNames {
err = bucket.DeleteObject(
ctx,
&gcs.DeleteObjectRequest{
Name: name,
Generation: 0, // Latest generation of stale object.
})
if err != nil {
err = fmt.Errorf("DeleteObject(%q): %w", name, err)
return
}
atomic.AddUint64(&objectsDeleted, 1)
}
return
})
err = group.Wait()
return
}