func()

in pkg/bundle/vault/internal/operation/importer.go [69:212]


// Run publishes every package of the bundle to a channel and writes each one
// to Vault using at most op.maxWorkerCount concurrent writers.
//
// For each package the secret key/values are unpacked, optional metadata is
// built from annotations and labels (when op.withMetadata is set), and the
// result is written through the KV service matching the first path segment
// of the (optionally prefixed) secret path. KV services are created lazily
// and cached in op.backends.
//
// It returns the first error raised by the publisher or by a writer, wrapped
// with a "vault operation error" prefix, or nil when everything completed.
func (op *importer) Run(ctx context.Context) error {
	// Initialize sub context
	g, gctx := errgroup.WithContext(ctx)

	// Prepare channels
	packageChan := make(chan *bundlev1.Package)

	// Validate worker count
	if op.maxWorkerCount < 1 {
		op.maxWorkerCount = 1
	}

	// consumers ---------------------------------------------------------------

	// Secret writer
	g.Go(func() error {
		// Initialize a semaphore with maxWorkerCount tokens
		sem := semaphore.NewWeighted(op.maxWorkerCount)

		// Writer errGroup
		gWriter, gWriterCtx := errgroup.WithContext(gctx)

		// Listen for message
		for secretPackage := range packageChan {
			// Give each goroutine its own copy: before Go 1.22 the range
			// variable is shared by every iteration, so the closure below
			// could otherwise observe a later package.
			secretPackage := secretPackage

			if err := gWriterCtx.Err(); err != nil {
				// Stop processing
				break
			}

			// Acquire a token
			if err := sem.Acquire(gWriterCtx, 1); err != nil {
				// Acquire fails when the writer context is cancelled, which
				// happens when a writer failed. Wait for in-flight writers and
				// report their error first so the root cause is not masked by
				// a generic "context canceled".
				if wErr := gWriter.Wait(); wErr != nil {
					return wErr
				}
				return fmt.Errorf("unable to acquire a semaphore token: %w", err)
			}

			log.For(gWriterCtx).Debug("Writing secret ...", zap.String("prefix", op.prefix), zap.String("path", secretPackage.Name))

			// Build function reader
			gWriter.Go(func() error {
				defer sem.Release(1)

				if err := gWriterCtx.Err(); err != nil {
					// Context has already an error
					return nil
				}

				// No data to insert
				if secretPackage.Secrets == nil {
					return nil
				}

				data := map[string]interface{}{}
				// Wrap secret k/v as a map
				for _, s := range secretPackage.Secrets.Data {
					// Unpack secret to original value
					var value interface{}
					if err := secret.Unpack(s.Value, &value); err != nil {
						return fmt.Errorf("unable to unpack secret value for path '%s' with key '%s': %w", secretPackage.Name, s.Key, err)
					}

					// Assign to map for vault storage
					data[s.Key] = value
				}

				// Export metadata
				metadata := map[string]interface{}{}
				if op.withMetadata {
					// Annotations are copied as-is (ranging over an empty or
					// nil map is a no-op).
					for k, v := range secretPackage.Annotations {
						metadata[k] = v
					}

					// Labels are stored with a "label#" key prefix.
					for k, v := range secretPackage.Labels {
						metadata[fmt.Sprintf("label#%s", k)] = v
					}
				}

				// Assemble secret path
				secretPath := secretPackage.Name
				if op.prefix != "" {
					secretPath = path.Join(op.prefix, secretPath)
				}

				// Extract root backend path
				rootPath := strings.Split(vpath.SanitizePath(secretPath), "/")[0]

				// Resolve the backend service. The lookup, the lazy
				// initialization and the store all happen under the mutex:
				// op.backends is shared by every writer goroutine and an
				// unguarded map read concurrent with a write is a data race.
				op.backendsMutex.Lock()
				service, ok := op.backends[rootPath]
				if !ok {
					// Initialize new service for backend
					var err error
					service, err = kv.New(op.client, rootPath, kv.WithVaultMetatadata(op.withVaultMetadata))
					if err != nil {
						op.backendsMutex.Unlock()
						return fmt.Errorf("unable to initialize Vault service for '%s' KV backend: %w", rootPath, err)
					}

					// All queries will be handled by same backend service
					op.backends[rootPath] = service
				}
				op.backendsMutex.Unlock()

				// Write secret to Vault
				if err := service.WriteWithMeta(gWriterCtx, secretPath, data, metadata); err != nil {
					return fmt.Errorf("unable to write secret data for path '%s': %w", secretPath, err)
				}

				// No error
				return nil
			})
		}

		// Report the first writer error, if any
		return gWriter.Wait()
	})

	// producers ---------------------------------------------------------------

	// Bundle package publisher
	g.Go(func() error {
		// Closing the channel ends the writer's range loop.
		defer close(packageChan)

		for _, p := range op.bundle.Packages {
			select {
			case <-gctx.Done():
				return gctx.Err()
			case packageChan <- p:
			}
		}

		// No error
		return nil
	})

	// Wait for all goroutines to complete
	if err := g.Wait(); err != nil {
		return fmt.Errorf("vault operation error: %w", err)
	}

	// No error
	return nil
}