func()

in pkg/bundle/vault/internal/operation/exporter.go [68:287]


func (op *exporter) Run(ctx context.Context) error {
	// Initialize sub context
	g, gctx := errgroup.WithContext(ctx)

	// Prepare channels
	pathChan := make(chan string)

	// Validate worker count
	if op.maxWorkerCount < 1 {
		op.maxWorkerCount = 1
	}

	// Consumers ---------------------------------------------------------------

	// Secret reader
	g.Go(func() error {
		// Initialize a semaphore with maxReaderWorker tokens
		sem := semaphore.NewWeighted(op.maxWorkerCount)

		// Reader errGroup
		gReader, gReaderCtx := errgroup.WithContext(gctx)

		// Listen for message
		for secretPath := range pathChan {
			secPath := secretPath

			if err := gReaderCtx.Err(); err != nil {
				// Stop processing
				break
			}

			// Acquire a token
			if err := sem.Acquire(gReaderCtx, 1); err != nil {
				return fmt.Errorf("unable to acquire a semaphore token: %w", err)
			}

			log.For(gReaderCtx).Debug("Exporting secret ...", zap.String("path", secretPath))

			// Build function reader
			gReader.Go(func() error {
				// Release token on finish
				defer sem.Release(1)

				if err := gReaderCtx.Err(); err != nil {
					// Context has already an error
					return nil
				}

				// Extract desired version from path
				vaultPackagePath, vaultVersion, errPackagePath := extractVersion(secPath)
				if errPackagePath != nil {
					return fmt.Errorf("unable to parse package path '%s': %w", secPath, errPackagePath)
				}

				// Read from Vault
				secretData, secretMeta, errRead := op.service.ReadVersion(gReaderCtx, vaultPackagePath, vaultVersion)
				if errRead != nil {
					// Mask path not found or empty secret value
					if errors.Is(errRead, kv.ErrNoData) || errors.Is(errRead, kv.ErrPathNotFound) {
						log.For(gReaderCtx).Debug("No data / path found for given path", zap.String("path", secPath))
						return nil
					}
					return fmt.Errorf("unexpected vault error: %w", errRead)
				}

				// Prepare secret list
				chain := &bundlev1.SecretChain{
					Version:         uint32(0),
					Data:            make([]*bundlev1.KV, 0),
					NextVersion:     nil,
					PreviousVersion: nil,
				}

				// Prepare metadata holder
				metadata := map[string]string{}

				// Iterate over secret bundle
				for k, v := range secretData {
					// Check for old metadata prefix
					if strings.HasPrefix(strings.ToLower(k), legacyBundleMetadataPrefix) {
						metadata[strings.ToLower(k)] = fmt.Sprintf("%s", v)
						// Ignore secret unpacking for this value
						continue
					}

					// Check for new metadata prefix
					if strings.EqualFold(k, kv.VaultMetadataDataKey) {
						if rawMetadata, ok := v.(map[string]interface{}); ok {
							for k, v := range rawMetadata {
								metadata[k] = fmt.Sprintf("%s", v)
							}
						} else {
							log.For(gReaderCtx).Error("Vault metadata type has unexpected type, processing skipped.", zap.String("path", secPath))
						}

						// Ignore secret unpacking for this value
						continue
					}

					// Pack secret value
					s, errPack := op.packSecret(k, v)
					if errPack != nil {
						return fmt.Errorf("unable to pack secret value for path '%s' with key '%s' : %w", secPath, k, errPack)
					}

					// Add secret to package
					chain.Data = append(chain.Data, s)
				}

				// Prepare the secret package
				pack := &bundlev1.Package{
					Labels:      map[string]string{},
					Annotations: map[string]string{},
					Name:        vaultPackagePath,
					Secrets:     chain,
				}

				// Extract useful metadata
				for k, v := range secretMeta {
					switch k {
					case "version":
						// Convert version
						rawVersion := json.Number(fmt.Sprintf("%s", v))
						version, err := rawVersion.Int64()
						if err != nil {
							log.For(gReaderCtx).Warn("unable to unpack secret version as int64.", zap.Error(err), zap.Any("value", v))
						} else {
							pack.Secrets.Version = uint32(version)
						}
					case "custom_metadata":
						// Check nil
						if types.IsNil(v) {
							continue
						}

						// Copy as metadata
						customMap, ok := v.(map[string]interface{})
						if ok {
							for metaKey, metaValue := range customMap {
								metadata[metaKey] = metaValue.(string)
							}
						} else {
							log.For(gReaderCtx).Warn("unable to unpack secret custom metadata, invalid type.", zap.Any("value", v))
						}
					}
				}

				// Process package metadata distribution
				if op.withMetadata {
					for key, value := range metadata {
						// Merge with package
						switch {
						case strings.HasPrefix(key, "label#"):
							pack.Labels[strings.TrimPrefix(key, "label#")] = value
						case strings.HasPrefix(key, legacyBundleMetadataPrefix):
							// Legacy metadata

							// Clean key
							key = strings.TrimPrefix(key, legacyBundleMetadataPrefix)

							// Unpack value
							var data map[string]string
							if errDecode := json.Unmarshal([]byte(value), &data); errDecode != nil {
								log.For(gReaderCtx).Error("unable to decode package legacy metadata object as JSON", zap.Error(errDecode), zap.String("key", key), zap.String("path", secPath))
								continue
							}

							var meta interface{}

							// Merge with package
							switch key {
							case "#annotations":
								meta = &pack.Annotations
							case "#labels":
								meta = &pack.Labels
							default:
								log.For(gReaderCtx).Warn("unhandled legacy metadata", zap.String("key", key), zap.String("path", secPath))
								continue
							}

							// Merge with Vault metadata
							if errMergo := mergo.MergeWithOverwrite(meta, data, mergo.WithOverride); errMergo != nil {
								log.For(gReaderCtx).Warn("unable to merge package legacy metadata object", zap.Error(errMergo), zap.String("key", key), zap.String("path", secPath))
								continue
							}
						default:
							pack.Annotations[key] = value
						}
					}
				}

				// Publish secret package
				select {
				case <-gReaderCtx.Done():
					return gReaderCtx.Err()
				case op.output <- pack:
					return nil
				}
			})
		}

		return gReader.Wait()
	})

	// Producers ---------------------------------------------------------------

	// Vault crawler
	g.Go(func() error {
		defer close(pathChan)
		return op.walk(gctx, op.path, op.path, pathChan)
	})

	// Wait for all goroutime to complete
	if err := g.Wait(); err != nil {
		return fmt.Errorf("vault operation error: %w", err)
	}

	// No error
	return nil
}