func()

in pkg/berglas/delete.go [120:226]


func (c *Client) storageDelete(ctx context.Context, i *StorageDeleteRequest) error {
	bucket := i.Bucket
	if bucket == "" {
		return fmt.Errorf("missing bucket name")
	}

	object := i.Object
	if object == "" {
		return fmt.Errorf("missing object name")
	}

	logger := logging.FromContext(ctx).With(
		"bucket", bucket,
		"object", object,
	)

	logger.DebugContext(ctx, "delete.start")
	defer logger.DebugContext(ctx, "delete.finish")

	it := c.storageClient.
		Bucket(bucket).
		Objects(ctx, &storage.Query{
			Prefix:   object,
			Versions: true,
		})

	// Create a workerpool for parallel deletion of resources
	parallelism := int64(runtime.NumCPU() - 1)
	sem := semaphore.NewWeighted(parallelism)

	errCh := make(chan error)
	childCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	logger.DebugContext(ctx, "deleting secrets", "parallelism", parallelism)

L:
	for {
		obj, err := it.Next()
		if err == iterator.Done {
			logger.DebugContext(ctx, "out of objects")
			break
		}
		if err != nil {
			logger.ErrorContext(ctx, "failed to get object", "error", err)

			select {
			case <-childCtx.Done():
				logger.DebugContext(ctx, "exiting because context finished")
			case errCh <- fmt.Errorf("failed to list secrets: %w", err):
				logger.DebugContext(ctx, "pushed error onto channel, canceling other jobs")
				cancel()
			default:
				logger.ErrorContext(ctx, "received error, but channel blocked", "error", err)
			}
		}

		// Don't queue more tasks if a failure has been encountered already
		select {
		case <-childCtx.Done():
			logger.DebugContext(ctx, "child context is finished, exiting")
			break L
		default:
			logger := logger.With("generation", obj.Generation)
			logger.DebugContext(ctx, "queueing delete worker")

			if err := sem.Acquire(ctx, 1); err != nil {
				return fmt.Errorf("failed to acquire semaphore: %w", err)
			}

			go func() {
				defer sem.Release(1)

				if err := c.storageClient.
					Bucket(bucket).
					Object(object).
					Generation(obj.Generation).
					Delete(childCtx); err != nil {
					logger.ErrorContext(ctx, "worker failed to delete object", "error", err)

					select {
					case <-childCtx.Done():
					case errCh <- fmt.Errorf("failed to delete generation: %w", err):
						logger.DebugContext(ctx, "worker pushed error onto channel, canceling other jobs")
						cancel()
					default:
						logger.ErrorContext(ctx, "worker received error but channel blocked", "error", err)
						cancel()
					}
				}
			}()
		}
	}

	// Wait for jobs to finish
	logger.DebugContext(ctx, "waiting for delete jobs to finish")
	if err := sem.Acquire(ctx, parallelism); err != nil {
		return fmt.Errorf("failed to wait for jobs to finish: %w", err)
	}

	select {
	case err := <-errCh:
		return fmt.Errorf("failed to delete secret: %w", err)
	default:
		return nil
	}
}