internal/elasticsearch/ingest/datastream.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package ingest

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"regexp"
	"strings"
	"time"

	"gopkg.in/yaml.v3"

	"github.com/elastic/elastic-package/internal/elasticsearch"
	"github.com/elastic/elastic-package/internal/packages"
)

var (
	ingestPipelineTag   = regexp.MustCompile(`{{\s*IngestPipeline.+}}`)
	defaultPipelineJSON = "default.json"
	defaultPipelineYML  = "default.yml"
)

type Rule struct {
	TargetDataset interface{} `yaml:"target_dataset"`
	If            string      `yaml:"if"`
	Namespace     interface{} `yaml:"namespace"`
}

type RoutingRule struct {
	SourceDataset string `yaml:"source_dataset"`
	Rules         []Rule `yaml:"rules"`
}

type RerouteProcessor struct {
	Tag       string   `yaml:"tag"`
	If        string   `yaml:"if"`
	Dataset   []string `yaml:"dataset"`
	Namespace []string `yaml:"namespace"`
}

func InstallDataStreamPipelines(ctx context.Context, api *elasticsearch.API, dataStreamPath string) (string, []Pipeline, error) {
	dataStreamManifest, err := packages.ReadDataStreamManifest(filepath.Join(dataStreamPath, packages.DataStreamManifestFile))
	if err != nil {
		return "", nil, fmt.Errorf("reading data stream manifest failed: %w", err)
	}

	nonce := time.Now().UnixNano()

	mainPipeline := getPipelineNameWithNonce(dataStreamManifest.GetPipelineNameOrDefault(), nonce)
	pipelines, err := loadIngestPipelineFiles(dataStreamPath, nonce)
	if err != nil {
		return "", nil, fmt.Errorf("loading ingest pipeline files failed: %w", err)
	}

	err = installPipelinesInElasticsearch(ctx, api, pipelines)
	if err != nil {
		return "", nil, err
	}
	return mainPipeline, pipelines, nil
}

func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, error) {
	elasticsearchPath := filepath.Join(dataStreamPath, "elasticsearch", "ingest_pipeline")

	var pipelineFiles []string
	for _, pattern := range []string{"*.json", "*.yml"} {
		files, err := filepath.Glob(filepath.Join(elasticsearchPath, pattern))
		if err != nil {
			return nil, fmt.Errorf("listing '%s' in '%s': %w", pattern, elasticsearchPath, err)
		}
		pipelineFiles = append(pipelineFiles, files...)
	}

	var pipelines []Pipeline
	for _, path := range pipelineFiles {
		c, err := os.ReadFile(path)
		if err != nil {
			return nil, fmt.Errorf("reading ingest pipeline failed (path: %s): %w", path, err)
		}

		c = ingestPipelineTag.ReplaceAllFunc(c, func(found []byte) []byte {
			s := strings.Split(string(found), `"`)
			if len(s) != 3 {
				err = fmt.Errorf("invalid IngestPipeline tag in template (path: %s)", path)
				return nil
			}
			pipelineTag := s[1]
			return []byte(getPipelineNameWithNonce(pipelineTag, nonce))
		})
		if err != nil {
			return nil, err
		}

		cWithRerouteProcessors, err := addRerouteProcessors(c, dataStreamPath, path)
		if err != nil {
			return nil, err
		}

		name := filepath.Base(path)
		pipelines = append(pipelines, Pipeline{
			Path:            path,
			Name:            getPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
			Format:          filepath.Ext(path)[1:],
			Content:         cWithRerouteProcessors,
			ContentOriginal: c,
		})
	}
	return pipelines, nil
}
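
// Illustrative shape of a routing_rules.yml handled by the functions below; the
// dataset names, condition, and namespace are hypothetical values, not taken
// from any real package:
//
//	- source_dataset: nginx.access
//	  rules:
//	    - target_dataset: nginx.error
//	      if: ctx?.error?.message != null
//	      namespace:
//	        - default
//
// loadRoutingRuleFile turns each rule into a reroute processor, which
// addRerouteProcessors then appends to the "processors" list of the default
// pipeline, roughly:
//
//	- reroute:
//	    tag: nginx.access
//	    if: ctx?.error?.message != null
//	    dataset: [nginx.error]
//	    namespace: [default]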

func addRerouteProcessors(pipeline []byte, dataStreamPath, path string) ([]byte, error) {
	// Reroute processors from routing_rules.yml are only attached to the default pipeline.
	filename := filepath.Base(path)
	if filename != defaultPipelineJSON && filename != defaultPipelineYML {
		return pipeline, nil
	}

	// Read routing_rules.yml and convert its rules into reroute processors for the ingest pipeline.
	rerouteProcessors, err := loadRoutingRuleFile(dataStreamPath)
	if err != nil {
		return nil, fmt.Errorf("failed loading routing rules: %v", err)
	}
	if len(rerouteProcessors) == 0 {
		return pipeline, nil
	}

	var yamlPipeline map[string]any
	err = yaml.Unmarshal(pipeline, &yamlPipeline)
	if err != nil {
		return nil, fmt.Errorf("failed to unmarshal ingest pipeline YAML data (path: %s): %w", path, err)
	}

	var processors []any
	v, found := yamlPipeline["processors"]
	if found {
		list, ok := v.([]any)
		if !ok {
			return nil, fmt.Errorf("unexpected processors type, expected []any, found %T", v)
		}
		processors = list
	}
	for _, p := range rerouteProcessors {
		processors = append(processors, p)
	}
	yamlPipeline["processors"] = processors

	pipeline, err = yaml.Marshal(yamlPipeline)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal modified ingest pipeline YAML data: %v", err)
	}

	return pipeline, nil
}

func loadRoutingRuleFile(dataStreamPath string) ([]map[string]interface{}, error) {
	routingRulePath := filepath.Join(dataStreamPath, "routing_rules.yml")
	c, err := os.ReadFile(routingRulePath)
	if err != nil {
		// A missing routing_rules.yml is not an error: the data stream has no routing rules.
		if os.IsNotExist(err) {
			return nil, nil
		}
		return nil, fmt.Errorf("reading routing_rules.yml failed (path: %s): %w", routingRulePath, err)
	}

	// Unmarshal the YAML content into routing rule structs.
	var routingRule []RoutingRule
	err = yaml.Unmarshal(c, &routingRule)
	if err != nil {
		return nil, fmt.Errorf("unmarshalling routing_rules.yml content failed: %w", err)
	}

	// Convert every rule into a reroute processor definition.
	var rerouteProcessors []map[string]interface{}
	for _, r := range routingRule {
		for _, rule := range r.Rules {
			td, err := convertValue(rule.TargetDataset, "target_dataset")
			if err != nil {
				return nil, fmt.Errorf("convertValue failed: %w", err)
			}
			ns, err := convertValue(rule.Namespace, "namespace")
			if err != nil {
				return nil, fmt.Errorf("convertValue failed: %w", err)
			}
			processor := make(map[string]interface{})
			processor["reroute"] = RerouteProcessor{
				Tag:       r.SourceDataset,
				If:        rule.If,
				Dataset:   td,
				Namespace: ns,
			}
			rerouteProcessors = append(rerouteProcessors, processor)
		}
	}

	return rerouteProcessors, nil
}

func convertValue(value interface{}, label string) ([]string, error) {
	switch value := value.(type) {
	case string:
		return []string{value}, nil
	case []string:
		return value, nil
	case []interface{}:
		result := make([]string, 0, len(value))
		for _, v := range value {
			if vStr, ok := v.(string); ok {
				result = append(result, vStr)
			} else {
				return nil, fmt.Errorf("%s in routing_rules.yml has to be a string or an array of strings: %v", label, value)
			}
		}
		return result, nil
	case nil:
		// namespace is not required in routing_rules.yml
		if label != "namespace" {
			return nil, fmt.Errorf("%s in routing_rules.yml cannot be empty", label)
		}
		return nil, nil
	default:
		return nil, fmt.Errorf("%s in routing_rules.yml has to be a string or an array of strings: %v", label, value)
	}
}
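
// Illustrative behavior of convertValue (the dataset names are hypothetical):
//
//	convertValue("nginx.error", "target_dataset")           // -> []string{"nginx.error"}, nil
//	convertValue([]interface{}{"a", "b"}, "target_dataset") // -> []string{"a", "b"}, nil
//	convertValue(nil, "namespace")                          // -> nil, nil (namespace is optional)
//	convertValue(nil, "target_dataset")                     // -> error: target_dataset cannot be empty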

func installPipelinesInElasticsearch(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error {
	for _, p := range pipelines {
		if err := installPipeline(ctx, api, p); err != nil {
			return err
		}
	}
	return nil
}

func pipelineError(err error, pipeline Pipeline, format string, args ...interface{}) error {
	context := "pipelineName: " + pipeline.Name
	if pipeline.Path != "" {
		context += ", path: " + pipeline.Path
	}

	errorStr := fmt.Sprintf(format+" ("+context+")", args...)
	return fmt.Errorf("%s: %w", errorStr, err)
}

func installPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pipeline) error {
	if err := putIngestPipeline(ctx, api, pipeline); err != nil {
		return err
	}
	// Just to be sure the pipeline has been uploaded.
	return getIngestPipeline(ctx, api, pipeline)
}

func putIngestPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pipeline) error {
	source, err := pipeline.MarshalJSON()
	if err != nil {
		return err
	}
	r, err := api.Ingest.PutPipeline(pipeline.Name, bytes.NewReader(source),
		api.Ingest.PutPipeline.WithContext(ctx),
	)
	if err != nil {
		return pipelineError(err, pipeline, "PutPipeline API call failed")
	}
	defer r.Body.Close()

	body, err := io.ReadAll(r.Body)
	if err != nil {
		return pipelineError(err, pipeline, "failed to read PutPipeline API response body")
	}

	if r.StatusCode != http.StatusOK {
		return pipelineError(elasticsearch.NewError(body), pipeline, "unexpected response status for PutPipeline (%d): %s", r.StatusCode, r.Status())
	}

	return nil
}

func getIngestPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pipeline) error {
	r, err := api.Ingest.GetPipeline(
		api.Ingest.GetPipeline.WithContext(ctx),
		api.Ingest.GetPipeline.WithPipelineID(pipeline.Name),
	)
	if err != nil {
		return pipelineError(err, pipeline, "GetPipeline API call failed")
	}
	defer r.Body.Close()

	body, err := io.ReadAll(r.Body)
	if err != nil {
		return pipelineError(err, pipeline, "failed to read GetPipeline API response body")
	}

	if r.StatusCode != http.StatusOK {
		return pipelineError(elasticsearch.NewError(body), pipeline, "unexpected response status for GetPipeline (%d): %s", r.StatusCode, r.Status())
	}

	return nil
}

func getPipelineNameWithNonce(pipelineName string, nonce int64) string {
	return fmt.Sprintf("%s-%d", pipelineName, nonce)
}
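
// Minimal usage sketch, assuming an *elasticsearch.API client and a data stream
// directory obtained elsewhere (the Pipeline type is defined in another file of
// this package); the variable names are illustrative:
//
//	mainPipeline, pipelines, err := InstallDataStreamPipelines(ctx, api, dataStreamPath)
//	if err != nil {
//		// handle error
//	}
//	// mainPipeline is the nonce-suffixed name of the data stream's default pipeline;
//	// pipelines lists every installed pipeline together with its rendered content.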