pkg/bundle/pipeline/executor.go (134 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package pipeline import ( "context" "fmt" "os" bundlev1 "github.com/elastic/harp/api/gen/go/harp/bundle/v1" "github.com/elastic/harp/pkg/bundle" ) // Run a processor. func Run(ctx context.Context, opts ...Option) error { // Initialize a running context to attach all goroutines ctx, cancel := context.WithCancel(ctx) defer cancel() var ( defaultDisableOutput = false defaultReader = os.Stdin defaultWriter = os.Stdout ) v := &bundleVisitor{ ctx: ctx, opts: &Options{ input: defaultReader, output: defaultWriter, disableOutput: defaultDisableOutput, }, position: &defaultContext{}, } // Loop through each option for _, opt := range opts { opt(v.opts) } // Read bundle from Stdin b, err := bundle.FromContainerReader(v.opts.input) if err != nil { return fmt.Errorf("unable to read bundle from stdin: %w", err) } // Apply remapping strategy v.VisitForFile(b) // Check error if err := v.Error(); err != nil { return fmt.Errorf("error during bundle processing: %w", err) } if !v.opts.disableOutput { // Write output bundle if err := bundle.ToContainerWriter(v.opts.output, b); err != nil { return fmt.Errorf("unable to dump processed bundle content: %w", err) } } // No error return nil } // Apply a pipeline process to the given bundle func Apply(ctx context.Context, input *bundlev1.Bundle, opts ...Option) (*bundlev1.Bundle, error) { v := &bundleVisitor{ ctx: ctx, opts: &Options{}, position: &defaultContext{}, } // Loop through each option for _, opt := range opts { opt(v.opts) } // Apply remapping strategy v.VisitForFile(input) // Check error if err := v.Error(); err != nil { return nil, fmt.Errorf("error during bundle processing: %w", err) } // No error return input, nil } // ----------------------------------------------------------------------------- // Context is used to pass current node location to processor type defaultContext struct { File *bundlev1.Bundle Package *bundlev1.Package Secret *bundlev1.SecretChain KV *bundlev1.KV } func (c *defaultContext) GetFile() *bundlev1.Bundle { return c.File } func (c *defaultContext) GetPackage() *bundlev1.Package { return c.Package } func (c *defaultContext) GetSecret() *bundlev1.SecretChain { return c.Secret } func (c *defaultContext) GetKeyValue() *bundlev1.KV { return c.KV } // ----------------------------------------------------------------------------- type bundleVisitor struct { ctx context.Context err error opts *Options position *defaultContext } func (bv *bundleVisitor) Error() error { return bv.err } func (bv *bundleVisitor) VisitForFile(obj *bundlev1.Bundle) { // Check argument if obj == nil { bv.err = fmt.Errorf("unable to process nil file") return } // Update position bv.position.File = obj // Crawl packages for _, p := range obj.Packages { bv.VisitForPackage(p) } // If processor given use it if bv.opts.fpf != nil { if bv.err = bv.opts.fpf(bv.position, obj); bv.err != nil { return } } } func (bv *bundleVisitor) VisitForPackage(obj *bundlev1.Package) { // Check argument if obj == nil { bv.err = fmt.Errorf("unable to process nil package") return } // Update position bv.position.Package = obj // If package has secrets if obj.Secrets != nil { bv.VisitForChain(obj.Secrets) } // If processor given use it if bv.opts.ppf != nil { if bv.err = bv.opts.ppf(bv.position, obj); bv.err != nil { return } } } func (bv *bundleVisitor) VisitForChain(obj *bundlev1.SecretChain) { // Check argument if obj == nil { bv.err = fmt.Errorf("unable to process nil secret chain") return } // Update position bv.position.Secret = obj // Crawl secret data for _, p := range obj.Data { bv.VisitForKV(p) } // If processor given use it if bv.opts.cpf != nil { if bv.err = bv.opts.cpf(bv.position, obj); bv.err != nil { return } } } func (bv *bundleVisitor) VisitForKV(obj *bundlev1.KV) { // Check argument if obj == nil { bv.err = fmt.Errorf("unable to process nil secret data") return } // Update position bv.position.KV = obj // If processor given use it if bv.opts.kpf != nil { if bv.err = bv.opts.kpf(bv.position, obj); bv.err != nil { return } } }