in pkg/bundle/vault/internal/operation/exporter.go [68:287]
// Run exports secrets from Vault and publishes them as bundle packages.
//
// It wires a small pipeline on a shared errgroup context:
//   - a producer goroutine crawls the Vault tree rooted at op.path and
//     emits discovered secret paths on an internal channel;
//   - a consumer goroutine fans each path out to a reader goroutine,
//     bounded to op.maxWorkerCount concurrent readers by a weighted
//     semaphore.
//
// Each reader resolves the requested secret version, reads it from Vault,
// splits metadata keys from secret keys, packs the remaining values into a
// bundlev1.Package and publishes it on op.output. The first error cancels
// the whole pipeline through the errgroup context; missing paths and empty
// secrets are deliberately skipped, not treated as errors.
func (op *exporter) Run(ctx context.Context) error {
	// Initialize sub context so any failure cancels every goroutine below.
	g, gctx := errgroup.WithContext(ctx)

	// Prepare channels (unbuffered: the producer is paced by the readers).
	pathChan := make(chan string)

	// Validate worker count — always keep at least one reader.
	if op.maxWorkerCount < 1 {
		op.maxWorkerCount = 1
	}

	// Consumers ---------------------------------------------------------------

	// Secret reader
	g.Go(func() error {
		// Initialize a semaphore with op.maxWorkerCount tokens to bound
		// concurrent Vault reads.
		sem := semaphore.NewWeighted(op.maxWorkerCount)

		// Reader errGroup collects the per-secret goroutines.
		gReader, gReaderCtx := errgroup.WithContext(gctx)

		// Listen for paths produced by the crawler.
		for secretPath := range pathChan {
			// Copy the loop variable so the closure below captures a stable value.
			secPath := secretPath
			if err := gReaderCtx.Err(); err != nil {
				// A reader already failed — stop scheduling new work.
				break
			}

			// Acquire a token; blocks until a reader slot frees up or the
			// context is cancelled.
			if err := sem.Acquire(gReaderCtx, 1); err != nil {
				return fmt.Errorf("unable to acquire a semaphore token: %w", err)
			}

			log.For(gReaderCtx).Debug("Exporting secret ...", zap.String("path", secretPath))

			// Spawn the reader for this path.
			gReader.Go(func() error {
				// Release token on finish so the dispatcher can schedule the next path.
				defer sem.Release(1)

				if err := gReaderCtx.Err(); err != nil {
					// Context has already an error — nothing useful to do.
					return nil
				}

				// Extract desired version from path (e.g. trailing "?version=N").
				vaultPackagePath, vaultVersion, errPackagePath := extractVersion(secPath)
				if errPackagePath != nil {
					return fmt.Errorf("unable to parse package path '%s': %w", secPath, errPackagePath)
				}

				// Read from Vault.
				secretData, secretMeta, errRead := op.service.ReadVersion(gReaderCtx, vaultPackagePath, vaultVersion)
				if errRead != nil {
					// Mask path not found or empty secret value — these are
					// expected while crawling and must not abort the export.
					if errors.Is(errRead, kv.ErrNoData) || errors.Is(errRead, kv.ErrPathNotFound) {
						log.For(gReaderCtx).Debug("No data / path found for given path", zap.String("path", secPath))
						return nil
					}
					return fmt.Errorf("unexpected vault error: %w", errRead)
				}

				// Prepare secret list for this package.
				chain := &bundlev1.SecretChain{
					Version:         0,
					Data:            make([]*bundlev1.KV, 0),
					NextVersion:     nil,
					PreviousVersion: nil,
				}

				// Prepare metadata holder (merged from legacy keys, the Vault
				// metadata key and KV-v2 custom_metadata).
				metadata := map[string]string{}

				// Iterate over secret bundle entries.
				for k, v := range secretData {
					// Check for old metadata prefix.
					if strings.HasPrefix(strings.ToLower(k), legacyBundleMetadataPrefix) {
						// %v instead of %s: a non-string value would otherwise
						// render as "%!s(type=...)".
						metadata[strings.ToLower(k)] = fmt.Sprintf("%v", v)
						// Ignore secret unpacking for this value.
						continue
					}

					// Check for new metadata prefix.
					if strings.EqualFold(k, kv.VaultMetadataDataKey) {
						if rawMetadata, ok := v.(map[string]interface{}); ok {
							for metaKey, metaValue := range rawMetadata {
								metadata[metaKey] = fmt.Sprintf("%v", metaValue)
							}
						} else {
							log.For(gReaderCtx).Error("Vault metadata type has unexpected type, processing skipped.", zap.String("path", secPath))
						}
						// Ignore secret unpacking for this value.
						continue
					}

					// Pack secret value.
					s, errPack := op.packSecret(k, v)
					if errPack != nil {
						return fmt.Errorf("unable to pack secret value for path '%s' with key '%s' : %w", secPath, k, errPack)
					}

					// Add secret to package.
					chain.Data = append(chain.Data, s)
				}

				// Prepare the secret package.
				pack := &bundlev1.Package{
					Labels:      map[string]string{},
					Annotations: map[string]string{},
					Name:        vaultPackagePath,
					Secrets:     chain,
				}

				// Extract useful metadata from the Vault secret metadata.
				for k, v := range secretMeta {
					switch k {
					case "version":
						// Convert version via json.Number to avoid float64 surprises.
						rawVersion := json.Number(fmt.Sprintf("%v", v))
						version, err := rawVersion.Int64()
						if err != nil {
							log.For(gReaderCtx).Warn("unable to unpack secret version as int64.", zap.Error(err), zap.Any("value", v))
						} else {
							pack.Secrets.Version = uint32(version)
						}
					case "custom_metadata":
						// Check nil.
						if types.IsNil(v) {
							continue
						}
						// Copy as metadata.
						customMap, ok := v.(map[string]interface{})
						if ok {
							for metaKey, metaValue := range customMap {
								// Guard the assertion: a non-string value must
								// skip the entry, not panic the export goroutine.
								strValue, isString := metaValue.(string)
								if !isString {
									log.For(gReaderCtx).Warn("unable to unpack secret custom metadata value, invalid type.", zap.String("key", metaKey), zap.Any("value", metaValue))
									continue
								}
								metadata[metaKey] = strValue
							}
						} else {
							log.For(gReaderCtx).Warn("unable to unpack secret custom metadata, invalid type.", zap.Any("value", v))
						}
					}
				}

				// Process package metadata distribution.
				if op.withMetadata {
					for key, value := range metadata {
						// Merge with package.
						switch {
						case strings.HasPrefix(key, "label#"):
							pack.Labels[strings.TrimPrefix(key, "label#")] = value
						case strings.HasPrefix(key, legacyBundleMetadataPrefix):
							// Legacy metadata
							// Clean key.
							key = strings.TrimPrefix(key, legacyBundleMetadataPrefix)
							// Unpack value.
							var data map[string]string
							if errDecode := json.Unmarshal([]byte(value), &data); errDecode != nil {
								log.For(gReaderCtx).Error("unable to decode package legacy metadata object as JSON", zap.Error(errDecode), zap.String("key", key), zap.String("path", secPath))
								continue
							}
							var meta interface{}
							// Route to the matching package map.
							switch key {
							case "#annotations":
								meta = &pack.Annotations
							case "#labels":
								meta = &pack.Labels
							default:
								log.For(gReaderCtx).Warn("unhandled legacy metadata", zap.String("key", key), zap.String("path", secPath))
								continue
							}
							// Merge with Vault metadata (Vault values win).
							if errMergo := mergo.MergeWithOverwrite(meta, data, mergo.WithOverride); errMergo != nil {
								log.For(gReaderCtx).Warn("unable to merge package legacy metadata object", zap.Error(errMergo), zap.String("key", key), zap.String("path", secPath))
								continue
							}
						default:
							pack.Annotations[key] = value
						}
					}
				}

				// Publish secret package, or bail out on cancellation.
				select {
				case <-gReaderCtx.Done():
					return gReaderCtx.Err()
				case op.output <- pack:
					return nil
				}
			})
		}

		return gReader.Wait()
	})

	// Producers ---------------------------------------------------------------

	// Vault crawler; closing pathChan lets the consumer loop terminate.
	g.Go(func() error {
		defer close(pathChan)
		return op.walk(gctx, op.path, op.path, pathChan)
	})

	// Wait for all goroutines to complete.
	if err := g.Wait(); err != nil {
		return fmt.Errorf("vault operation error: %w", err)
	}

	// No error
	return nil
}