pkg/bundle/vault/internal/operation/exporter.go (229 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package operation
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"path"
"strconv"
"strings"
"github.com/imdario/mergo"
"go.uber.org/zap"
bundlev1 "github.com/elastic/harp/api/gen/go/harp/bundle/v1"
"github.com/elastic/harp/pkg/bundle/secret"
"github.com/elastic/harp/pkg/sdk/log"
"github.com/elastic/harp/pkg/sdk/types"
"github.com/elastic/harp/pkg/vault/kv"
vaultPath "github.com/elastic/harp/pkg/vault/path"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
// Exporter initialize a secret exporter operation
func Exporter(service kv.Service, backendPath string, output chan *bundlev1.Package, withMetadata bool, maxWorkerCount int64) Operation {
return &exporter{
service: service,
path: backendPath,
withMetadata: withMetadata,
output: output,
maxWorkerCount: maxWorkerCount,
}
}
// -----------------------------------------------------------------------------
type exporter struct {
service kv.Service
path string
withMetadata bool
output chan *bundlev1.Package
maxWorkerCount int64
}
// Run the implemented operation
//
//nolint:funlen,gocognit,gocyclo // refactor
func (op *exporter) Run(ctx context.Context) error {
// Initialize sub context
g, gctx := errgroup.WithContext(ctx)
// Prepare channels
pathChan := make(chan string)
// Validate worker count
if op.maxWorkerCount < 1 {
op.maxWorkerCount = 1
}
// Consumers ---------------------------------------------------------------
// Secret reader
g.Go(func() error {
// Initialize a semaphore with maxReaderWorker tokens
sem := semaphore.NewWeighted(op.maxWorkerCount)
// Reader errGroup
gReader, gReaderCtx := errgroup.WithContext(gctx)
// Listen for message
for secretPath := range pathChan {
secPath := secretPath
if err := gReaderCtx.Err(); err != nil {
// Stop processing
break
}
// Acquire a token
if err := sem.Acquire(gReaderCtx, 1); err != nil {
return fmt.Errorf("unable to acquire a semaphore token: %w", err)
}
log.For(gReaderCtx).Debug("Exporting secret ...", zap.String("path", secretPath))
// Build function reader
gReader.Go(func() error {
// Release token on finish
defer sem.Release(1)
if err := gReaderCtx.Err(); err != nil {
// Context has already an error
return nil
}
// Extract desired version from path
vaultPackagePath, vaultVersion, errPackagePath := extractVersion(secPath)
if errPackagePath != nil {
return fmt.Errorf("unable to parse package path '%s': %w", secPath, errPackagePath)
}
// Read from Vault
secretData, secretMeta, errRead := op.service.ReadVersion(gReaderCtx, vaultPackagePath, vaultVersion)
if errRead != nil {
// Mask path not found or empty secret value
if errors.Is(errRead, kv.ErrNoData) || errors.Is(errRead, kv.ErrPathNotFound) {
log.For(gReaderCtx).Debug("No data / path found for given path", zap.String("path", secPath))
return nil
}
return fmt.Errorf("unexpected vault error: %w", errRead)
}
// Prepare secret list
chain := &bundlev1.SecretChain{
Version: uint32(0),
Data: make([]*bundlev1.KV, 0),
NextVersion: nil,
PreviousVersion: nil,
}
// Prepare metadata holder
metadata := map[string]string{}
// Iterate over secret bundle
for k, v := range secretData {
// Check for old metadata prefix
if strings.HasPrefix(strings.ToLower(k), legacyBundleMetadataPrefix) {
metadata[strings.ToLower(k)] = fmt.Sprintf("%s", v)
// Ignore secret unpacking for this value
continue
}
// Check for new metadata prefix
if strings.EqualFold(k, kv.VaultMetadataDataKey) {
if rawMetadata, ok := v.(map[string]interface{}); ok {
for k, v := range rawMetadata {
metadata[k] = fmt.Sprintf("%s", v)
}
} else {
log.For(gReaderCtx).Error("Vault metadata type has unexpected type, processing skipped.", zap.String("path", secPath))
}
// Ignore secret unpacking for this value
continue
}
// Pack secret value
s, errPack := op.packSecret(k, v)
if errPack != nil {
return fmt.Errorf("unable to pack secret value for path '%s' with key '%s' : %w", secPath, k, errPack)
}
// Add secret to package
chain.Data = append(chain.Data, s)
}
// Prepare the secret package
pack := &bundlev1.Package{
Labels: map[string]string{},
Annotations: map[string]string{},
Name: vaultPackagePath,
Secrets: chain,
}
// Extract useful metadata
for k, v := range secretMeta {
switch k {
case "version":
// Convert version
rawVersion := json.Number(fmt.Sprintf("%s", v))
version, err := rawVersion.Int64()
if err != nil {
log.For(gReaderCtx).Warn("unable to unpack secret version as int64.", zap.Error(err), zap.Any("value", v))
} else {
pack.Secrets.Version = uint32(version)
}
case "custom_metadata":
// Check nil
if types.IsNil(v) {
continue
}
// Copy as metadata
customMap, ok := v.(map[string]interface{})
if ok {
for metaKey, metaValue := range customMap {
metadata[metaKey] = metaValue.(string)
}
} else {
log.For(gReaderCtx).Warn("unable to unpack secret custom metadata, invalid type.", zap.Any("value", v))
}
}
}
// Process package metadata distribution
if op.withMetadata {
for key, value := range metadata {
// Merge with package
switch {
case strings.HasPrefix(key, "label#"):
pack.Labels[strings.TrimPrefix(key, "label#")] = value
case strings.HasPrefix(key, legacyBundleMetadataPrefix):
// Legacy metadata
// Clean key
key = strings.TrimPrefix(key, legacyBundleMetadataPrefix)
// Unpack value
var data map[string]string
if errDecode := json.Unmarshal([]byte(value), &data); errDecode != nil {
log.For(gReaderCtx).Error("unable to decode package legacy metadata object as JSON", zap.Error(errDecode), zap.String("key", key), zap.String("path", secPath))
continue
}
var meta interface{}
// Merge with package
switch key {
case "#annotations":
meta = &pack.Annotations
case "#labels":
meta = &pack.Labels
default:
log.For(gReaderCtx).Warn("unhandled legacy metadata", zap.String("key", key), zap.String("path", secPath))
continue
}
// Merge with Vault metadata
if errMergo := mergo.MergeWithOverwrite(meta, data, mergo.WithOverride); errMergo != nil {
log.For(gReaderCtx).Warn("unable to merge package legacy metadata object", zap.Error(errMergo), zap.String("key", key), zap.String("path", secPath))
continue
}
default:
pack.Annotations[key] = value
}
}
}
// Publish secret package
select {
case <-gReaderCtx.Done():
return gReaderCtx.Err()
case op.output <- pack:
return nil
}
})
}
return gReader.Wait()
})
// Producers ---------------------------------------------------------------
// Vault crawler
g.Go(func() error {
defer close(pathChan)
return op.walk(gctx, op.path, op.path, pathChan)
})
// Wait for all goroutime to complete
if err := g.Wait(); err != nil {
return fmt.Errorf("vault operation error: %w", err)
}
// No error
return nil
}
// -----------------------------------------------------------------------------
func (op *exporter) walk(ctx context.Context, basePath, currPath string, keys chan string) error {
// List secret of basepath
res, err := op.service.List(ctx, basePath)
if err != nil {
return fmt.Errorf("unable to list secret entries for '%s': %w", basePath, err)
}
// Check path is a leaf
if res == nil {
select {
case <-ctx.Done():
return ctx.Err()
case keys <- currPath:
}
return nil
}
// Iterate on all subpath
for _, p := range res {
if err := op.walk(ctx, path.Join(basePath, p), path.Join(currPath, p), keys); err != nil {
return fmt.Errorf("unable to walk '%s' : %w", path.Join(basePath, p), err)
}
}
// No error
return nil
}
func (op *exporter) packSecret(key string, value interface{}) (*bundlev1.KV, error) {
// Pack secret value
payload, err := secret.Pack(value)
if err != nil {
return nil, fmt.Errorf("unable to pack secret '%s': %w", key, err)
}
// Build the secret object
return &bundlev1.KV{
Key: key,
Type: fmt.Sprintf("%T", value),
Value: payload,
}, nil
}
func extractVersion(packagePath string) (mountPath string, backendVersion uint32, err error) {
// Check arguments
if packagePath == "" {
return "", 0, fmt.Errorf("unable to extract path and version from an empty string")
}
// Looks a little hack-ish for me
u, err := url.ParseRequestURI(fmt.Sprintf("harp://bundle/%s", packagePath))
if err != nil {
return "", 0, fmt.Errorf("unable to parse package path: %w", err)
}
// Get version
versionRaw := u.Query().Get("version")
if versionRaw == "" {
// Get latest
return vaultPath.SanitizePath(u.Path), 0, nil
}
// Convert
versionUnit, errParse := strconv.ParseUint(versionRaw, 10, 32)
if errParse != nil {
return "", 0, fmt.Errorf("unable to parse version as a valid integer: %w", err)
}
// Return path elements
return vaultPath.SanitizePath(u.Path), uint32(versionUnit), nil
}