in pkg/bundle/vault/internal/operation/importer.go [69:212]
// Run writes every package of the bundle to Vault.
//
// A producer goroutine publishes op.bundle.Packages on a channel and a
// consumer goroutine fans the writes out to at most op.maxWorkerCount
// concurrent workers (a value below 1 is raised to 1). The first failure
// cancels the remaining work; the returned error wraps it with the
// "vault operation error" prefix. Run returns nil when everything succeeded.
func (op *importer) Run(ctx context.Context) error {
	// Initialize sub context
	g, gctx := errgroup.WithContext(ctx)

	// Prepare channels
	packageChan := make(chan *bundlev1.Package)

	// Validate worker count
	if op.maxWorkerCount < 1 {
		op.maxWorkerCount = 1
	}

	// writePackage converts one bundle package to its Vault representation
	// and writes it through the KV backend service matching its root path.
	writePackage := func(wctx context.Context, p *bundlev1.Package) error {
		// No data to insert
		if p.Secrets == nil {
			return nil
		}

		// Wrap secret k/v as a map
		data := make(map[string]interface{}, len(p.Secrets.Data))
		for _, s := range p.Secrets.Data {
			// Unpack secret to original value
			var value interface{}
			if err := secret.Unpack(s.Value, &value); err != nil {
				return fmt.Errorf("unable to unpack secret value for path '%s' with key '%s': %w", p.Name, s.Key, err)
			}

			// Assign to map for vault storage
			data[s.Key] = value
		}

		// Export metadata (ranging over an empty or nil map is a no-op)
		metadata := map[string]interface{}{}
		if op.withMetadata {
			for k, v := range p.Annotations {
				metadata[k] = v
			}
			for k, v := range p.Labels {
				metadata[fmt.Sprintf("label#%s", k)] = v
			}
		}

		// Assemble secret path
		secretPath := p.Name
		if op.prefix != "" {
			secretPath = path.Join(op.prefix, secretPath)
		}

		// Extract root backend path (first path segment)
		rootPath := strings.SplitN(vpath.SanitizePath(secretPath), "/", 2)[0]

		// Look up, and lazily initialize, the backend service while holding
		// the mutex: op.backends is shared by all workers, so every access to
		// the map (reads included) must be serialized. Holding the lock across
		// the initialization also prevents creating the same backend twice.
		op.backendsMutex.Lock()
		service, found := op.backends[rootPath]
		if !found {
			var err error
			service, err = kv.New(op.client, rootPath, kv.WithVaultMetatadata(op.withVaultMetadata))
			if err != nil {
				op.backendsMutex.Unlock()
				return fmt.Errorf("unable to initialize Vault service for '%s' KV backend: %w", rootPath, err)
			}

			// All queries will be handled by same backend service
			op.backends[rootPath] = service
		}
		op.backendsMutex.Unlock()

		// Write secret to Vault
		if err := service.WriteWithMeta(wctx, secretPath, data, metadata); err != nil {
			return fmt.Errorf("unable to write secret data for path '%s': %w", secretPath, err)
		}

		// No error
		return nil
	}

	// consumers ---------------------------------------------------------------

	// Secret writer
	g.Go(func() error {
		// Initialize a semaphore with maxWorkerCount tokens
		sem := semaphore.NewWeighted(op.maxWorkerCount)

		// Writer errGroup
		gWriter, gWriterCtx := errgroup.WithContext(gctx)

		// Error raised by the dispatch loop itself; reported only when no
		// worker failed, so that the root cause is never masked.
		var dispatchErr error

		// Listen for message
		for secretPackage := range packageChan {
			if gWriterCtx.Err() != nil {
				// Stop processing
				break
			}

			// Acquire a token
			if err := sem.Acquire(gWriterCtx, 1); err != nil {
				dispatchErr = fmt.Errorf("unable to acquire a semaphore token: %w", err)
				break
			}

			log.For(gWriterCtx).Debug("Writing secret ...", zap.String("prefix", op.prefix), zap.String("path", secretPackage.Name))

			// Per-iteration copy: before Go 1.22 the loop variable is shared
			// by all iterations and must not be captured by the goroutine.
			pkg := secretPackage

			// Build function writer
			gWriter.Go(func() error {
				defer sem.Release(1)

				if gWriterCtx.Err() != nil {
					// Context has already an error
					return nil
				}

				return writePackage(gWriterCtx, pkg)
			})
		}

		// Always wait for in-flight writers, and prefer their error over the
		// dispatch error it may have triggered through context cancellation.
		if err := gWriter.Wait(); err != nil {
			return err
		}

		return dispatchErr
	})

	// producers ---------------------------------------------------------------

	// Bundle package publisher
	g.Go(func() error {
		defer close(packageChan)

		for _, p := range op.bundle.Packages {
			select {
			case <-gctx.Done():
				return gctx.Err()
			case packageChan <- p:
			}
		}

		// No error
		return nil
	})

	// Wait for all goroutines to complete
	if err := g.Wait(); err != nil {
		return fmt.Errorf("vault operation error: %w", err)
	}

	// No error
	return nil
}