in pkg/berglas/delete.go [120:226]
func (c *Client) storageDelete(ctx context.Context, i *StorageDeleteRequest) error {
bucket := i.Bucket
if bucket == "" {
return fmt.Errorf("missing bucket name")
}
object := i.Object
if object == "" {
return fmt.Errorf("missing object name")
}
logger := logging.FromContext(ctx).With(
"bucket", bucket,
"object", object,
)
logger.DebugContext(ctx, "delete.start")
defer logger.DebugContext(ctx, "delete.finish")
it := c.storageClient.
Bucket(bucket).
Objects(ctx, &storage.Query{
Prefix: object,
Versions: true,
})
// Create a workerpool for parallel deletion of resources
parallelism := int64(runtime.NumCPU() - 1)
sem := semaphore.NewWeighted(parallelism)
errCh := make(chan error)
childCtx, cancel := context.WithCancel(ctx)
defer cancel()
logger.DebugContext(ctx, "deleting secrets", "parallelism", parallelism)
L:
for {
obj, err := it.Next()
if err == iterator.Done {
logger.DebugContext(ctx, "out of objects")
break
}
if err != nil {
logger.ErrorContext(ctx, "failed to get object", "error", err)
select {
case <-childCtx.Done():
logger.DebugContext(ctx, "exiting because context finished")
case errCh <- fmt.Errorf("failed to list secrets: %w", err):
logger.DebugContext(ctx, "pushed error onto channel, canceling other jobs")
cancel()
default:
logger.ErrorContext(ctx, "received error, but channel blocked", "error", err)
}
}
// Don't queue more tasks if a failure has been encountered already
select {
case <-childCtx.Done():
logger.DebugContext(ctx, "child context is finished, exiting")
break L
default:
logger := logger.With("generation", obj.Generation)
logger.DebugContext(ctx, "queueing delete worker")
if err := sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire semaphore: %w", err)
}
go func() {
defer sem.Release(1)
if err := c.storageClient.
Bucket(bucket).
Object(object).
Generation(obj.Generation).
Delete(childCtx); err != nil {
logger.ErrorContext(ctx, "worker failed to delete object", "error", err)
select {
case <-childCtx.Done():
case errCh <- fmt.Errorf("failed to delete generation: %w", err):
logger.DebugContext(ctx, "worker pushed error onto channel, canceling other jobs")
cancel()
default:
logger.ErrorContext(ctx, "worker received error but channel blocked", "error", err)
cancel()
}
}
}()
}
}
// Wait for jobs to finish
logger.DebugContext(ctx, "waiting for delete jobs to finish")
if err := sem.Acquire(ctx, parallelism); err != nil {
return fmt.Errorf("failed to wait for jobs to finish: %w", err)
}
select {
case err := <-errCh:
return fmt.Errorf("failed to delete secret: %w", err)
default:
return nil
}
}