internal/gcsx/garbage_collect.go (94 lines of code) (raw):

// Copyright 2015 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package gcsx import ( "fmt" "sync/atomic" "time" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil" "golang.org/x/net/context" "golang.org/x/sync/errgroup" "github.com/googlecloudplatform/gcsfuse/v2/internal/logger" ) func garbageCollectOnce( ctx context.Context, tmpObjectPrefix string, bucket gcs.Bucket) (objectsDeleted uint64, err error) { const stalenessThreshold = 30 * time.Minute group, ctx := errgroup.WithContext(ctx) // List all objects with the temporary prefix. minObjects := make(chan *gcs.MinObject, 100) group.Go(func() (err error) { defer close(minObjects) err = storageutil.ListPrefix(ctx, bucket, tmpObjectPrefix, minObjects) if err != nil { err = fmt.Errorf("ListPrefix: %w", err) return } return }) // Filter to the names of objects that are stale. now := time.Now() staleNames := make(chan string, 100) group.Go(func() (err error) { defer close(staleNames) for o := range minObjects { if now.Sub(o.Updated) < stalenessThreshold { continue } select { case <-ctx.Done(): err = ctx.Err() return case staleNames <- o.Name: } } return }) // Delete those objects. group.Go(func() (err error) { for name := range staleNames { err = bucket.DeleteObject( ctx, &gcs.DeleteObjectRequest{ Name: name, Generation: 0, // Latest generation of stale object. }) if err != nil { err = fmt.Errorf("DeleteObject(%q): %w", name, err) return } atomic.AddUint64(&objectsDeleted, 1) } return }) err = group.Wait() return } // Periodically delete stale temporary objects from the supplied bucket until // the context is cancelled. func garbageCollect( ctx context.Context, tmpObjectPrefix string, bucket gcs.Bucket) { const period = 10 * time.Minute ticker := time.NewTicker(period) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: } logger.Info("Starting a garbage collection run.") startTime := time.Now() objectsDeleted, err := garbageCollectOnce(ctx, tmpObjectPrefix, bucket) if err != nil { logger.Infof( "Garbage collection failed after deleting %d objects in %v, "+ "with error: %v", objectsDeleted, time.Since(startTime), err) } else { logger.Infof( "Garbage collection succeeded after deleted %d objects in %v.", objectsDeleted, time.Since(startTime)) } } }