in pkg/bundle/vault/pull.go [83:173]
func runPull(ctx context.Context, client *api.Client, paths []string, opts *options) (*bundlev1.Bundle, error) {
var res *bundlev1.Bundle
// Initialize operation
packageChan := make(chan *bundlev1.Package)
// Prepare output
g, gctx := errgroup.WithContext(ctx)
// Preprocess paths
if len(opts.exclusions) > 0 {
paths = collect(paths, opts.exclusions, false)
}
if len(opts.includes) > 0 {
paths = collect(paths, opts.includes, true)
}
// Fork consumer
// Secret packages consumer
g.Go(func() error {
b := &bundlev1.Bundle{}
// Wait for all packages
for p := range packageChan {
b.Packages = append(b.Packages, p)
}
// Assign result
res = b
// No error
return nil
})
// Fork reader
g.Go(func() error {
defer close(packageChan)
gReader, gReaderctx := errgroup.WithContext(gctx)
// Wrap process in a builder to be able to pass p parameter
exportBuilder := func(p string) func() error {
return func() error {
// Create dedicated service reader
service, err := kv.New(client, p, kv.WithVaultMetatadata(opts.withVaultMetadata))
if err != nil {
return fmt.Errorf("unable to prepare vault reader for path '%s': %w", p, err)
}
// Create an exporter
op := operation.Exporter(service, vpath.SanitizePath(p), packageChan, opts.withSecretMetadata, opts.workerCount)
// Run the job
if err := op.Run(gReaderctx); err != nil {
return fmt.Errorf("unable to export secret values for path `%s': %w", p, err)
}
// No error
return nil
}
}
// Generate producers
for _, p := range paths {
// For the process
gReader.Go(exportBuilder(p))
}
// Wait for all producers to finish
if err := gReader.Wait(); err != nil {
return fmt.Errorf("unable to read secrets: %w", err)
}
// No error
return nil
})
// Wait for completion
if err := g.Wait(); err != nil {
return nil, fmt.Errorf("unable to pull secrets: %w", err)
}
// Check bundle result
if res == nil {
return nil, fmt.Errorf("result bundle is nil")
}
// No error
return res, nil
}