pkg/bundle/vault/internal/operation/importer.go (122 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package operation import ( "context" "fmt" "path" "strings" "sync" "github.com/hashicorp/vault/api" "go.uber.org/zap" bundlev1 "github.com/elastic/harp/api/gen/go/harp/bundle/v1" "github.com/elastic/harp/pkg/bundle/secret" "github.com/elastic/harp/pkg/sdk/log" "github.com/elastic/harp/pkg/vault/kv" vpath "github.com/elastic/harp/pkg/vault/path" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" ) // Importer initialize a secret importer operation func Importer(client *api.Client, bundleFile *bundlev1.Bundle, prefix string, withMetadata, withVaultMetadata bool, maxWorkerCount int64) Operation { return &importer{ client: client, bundle: bundleFile, prefix: prefix, withMetadata: withMetadata || withVaultMetadata, withVaultMetadata: withVaultMetadata, backends: map[string]kv.Service{}, maxWorkerCount: maxWorkerCount, } } // ----------------------------------------------------------------------------- type importer struct { client *api.Client bundle *bundlev1.Bundle prefix string withMetadata bool withVaultMetadata bool backends map[string]kv.Service backendsMutex sync.RWMutex maxWorkerCount int64 } // Run the implemented operation // //nolint:gocognit,funlen,gocyclo // To refactor func (op *importer) Run(ctx context.Context) error { // Initialize sub context g, gctx := errgroup.WithContext(ctx) // Prepare channels packageChan := make(chan *bundlev1.Package) // Validate worker count if op.maxWorkerCount < 1 { op.maxWorkerCount = 1 } // consumers --------------------------------------------------------------- // Secret writer g.Go(func() error { // Initialize a semaphore with maxReaderWorker tokens sem := semaphore.NewWeighted(op.maxWorkerCount) // Writer errGroup gWriter, gWriterCtx := errgroup.WithContext(gctx) // Listen for message for secretPackage := range packageChan { if err := gWriterCtx.Err(); err != nil { // Stop processing break } // Acquire a token if err := sem.Acquire(gWriterCtx, 1); err != nil { return fmt.Errorf("unable to acquire a semaphore token: %w", err) } log.For(gWriterCtx).Debug("Writing secret ...", zap.String("prefix", op.prefix), zap.String("path", secretPackage.Name)) // Build function reader gWriter.Go(func() error { defer sem.Release(1) if err := gWriterCtx.Err(); err != nil { // Context has already an error return nil } // No data to insert if secretPackage.Secrets == nil { return nil } data := map[string]interface{}{} // Wrap secret k/v as a map for _, s := range secretPackage.Secrets.Data { // Unpack secret to original value var value interface{} if err := secret.Unpack(s.Value, &value); err != nil { return fmt.Errorf("unable to unpack secret value for path '%s' with key '%s': %w", secretPackage.Name, s.Key, err) } // Assign to map for vault storage data[s.Key] = value } // Export metadata metadata := map[string]interface{}{} if op.withMetadata { // Has annotations if len(secretPackage.Annotations) > 0 { for k, v := range secretPackage.Annotations { metadata[k] = v } } // Has labels if len(secretPackage.Labels) > 0 { for k, v := range secretPackage.Labels { metadata[fmt.Sprintf("label#%s", k)] = v } } } // Assemble secret path secretPath := secretPackage.Name if op.prefix != "" { secretPath = path.Join(op.prefix, secretPath) } // Extract root backend path rootPath := strings.Split(vpath.SanitizePath(secretPath), "/")[0] // Check backend initialization if _, ok := op.backends[rootPath]; !ok { // Initialize new service for backend service, err := kv.New(op.client, rootPath, kv.WithVaultMetatadata(op.withVaultMetadata)) if err != nil { return fmt.Errorf("unable to initialize Vault service for '%s' KV backend: %w", op.prefix, err) } // All queries will be handled by same backend service op.backendsMutex.Lock() op.backends[rootPath] = service op.backendsMutex.Unlock() } // Write secret to Vault if err := op.backends[rootPath].WriteWithMeta(gWriterCtx, secretPath, data, metadata); err != nil { return fmt.Errorf("unable to write secret data for path '%s': %w", secretPath, err) } // No error return nil }) } // No error return gWriter.Wait() }) // producers --------------------------------------------------------------- // Bundle package publisher g.Go(func() error { defer close(packageChan) for _, p := range op.bundle.Packages { select { case <-gctx.Done(): return gctx.Err() case packageChan <- p: } } // No error return nil }) // Wait for all goroutime to complete if err := g.Wait(); err != nil { return fmt.Errorf("vault operation error: %w", err) } // No error return nil }