internal/dump/ingestpipelines.go (102 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package dump
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"slices"
"github.com/elastic/elastic-package/internal/elasticsearch"
)
// IngestPipeline contains the information needed to export an ingest pipeline.
//
// Only the "processors" array of a pipeline definition is decoded into
// exported fields, and within each processor only the "name" of a "pipeline"
// entry. The unexported fields are not touched by JSON decoding; they back
// the Name and JSON methods.
type IngestPipeline struct {
// Processors mirrors the "processors" array of the pipeline definition.
Processors []struct {
// Pipeline stays nil when the processor has no "pipeline" entry;
// otherwise Name holds the "name" value of that entry.
Pipeline *struct {
Name string `json:"name"`
} `json:"pipeline,omitempty"`
} `json:"processors"`
// id is the value returned by Name.
id string
// raw is the value returned by JSON.
raw []byte
}
// Name reports the identifier this ingest pipeline is known by.
func (p IngestPipeline) Name() string {
	name := p.id
	return name
}
// JSON gives back the stored JSON document of the ingest pipeline. The
// returned slice is the stored one, not a copy.
func (p IngestPipeline) JSON() []byte {
	doc := p.raw
	return doc
}
// getIngestPipelines fetches the ingest pipelines identified by ids and, round
// by round, every pipeline referenced from them through "pipeline" processors.
//
// Each pipeline appears at most once in the result, in the order it was first
// retrieved. A single request may return several pipelines, or pipelines whose
// ID differs from the requested one (e.g. when the requested ID is a wildcard
// expression), so both the requested IDs and the IDs of the retrieved
// pipelines are tracked to avoid repeated requests and duplicated results.
//
// It returns nil, nil when no ids are given, and the first error reported by
// getIngestPipelineByID otherwise.
func getIngestPipelines(ctx context.Context, api *elasticsearch.API, ids ...string) ([]IngestPipeline, error) {
	if len(ids) == 0 {
		return nil, nil
	}

	var pipelines []IngestPipeline
	// collected holds every ID that must not be requested (again): IDs
	// already requested plus IDs of pipelines already retrieved.
	var collected []string
	// stored holds the IDs of the pipelines already present in the result.
	stored := make(map[string]struct{})

	pending := ids
	for len(pending) > 0 {
		for _, id := range pending {
			if slices.Contains(collected, id) {
				continue
			}
			collected = append(collected, id)

			found, err := getIngestPipelineByID(ctx, api, id)
			if err != nil {
				return nil, err
			}
			for _, p := range found {
				if _, dup := stored[p.id]; dup {
					continue
				}
				stored[p.id] = struct{}{}
				pipelines = append(pipelines, p)
				if !slices.Contains(collected, p.id) {
					collected = append(collected, p.id)
				}
			}
		}
		// The next round is made of referenced pipelines not seen so far.
		pending = pendingNestedPipelines(pipelines, collected)
	}

	return pipelines, nil
}
// getIngestPipelineResponse is the decoded body of a get-pipeline response: a
// JSON object whose keys are used as pipeline IDs and whose values are kept as
// undecoded pipeline definitions.
type getIngestPipelineResponse map[string]json.RawMessage
// getIngestPipelineByID requests the ingest pipeline(s) matching id and decodes
// every entry of the response into an IngestPipeline, keeping the undecoded
// definition of each one.
//
// The pipelines are returned sorted by ID, so the result does not depend on
// map iteration order. A 404 response is not an error: it yields nil, nil.
// Any other error response, a failed request, an unreadable body or an
// undecodable document is reported as an error.
func getIngestPipelineByID(ctx context.Context, api *elasticsearch.API, id string) ([]IngestPipeline, error) {
	resp, err := api.Ingest.GetPipeline(
		api.Ingest.GetPipeline.WithContext(ctx),
		api.Ingest.GetPipeline.WithPipelineID(id),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to get ingest pipeline %s: %w", id, err)
	}
	defer resp.Body.Close()

	// Ingest pipelines referenced by other pipelines may not exist.
	if resp.StatusCode == http.StatusNotFound {
		return nil, nil
	}
	if resp.IsError() {
		return nil, fmt.Errorf("failed to get ingest pipeline %s: %s", id, resp.String())
	}

	d, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %w", err)
	}

	var pipelinesResponse getIngestPipelineResponse
	if err := json.Unmarshal(d, &pipelinesResponse); err != nil {
		return nil, fmt.Errorf("failed to decode response: %w", err)
	}

	// Map iteration order is random; walk the IDs in sorted order so that
	// repeated calls return the pipelines in the same order.
	pipelineIDs := make([]string, 0, len(pipelinesResponse))
	for pipelineID := range pipelinesResponse {
		pipelineIDs = append(pipelineIDs, pipelineID)
	}
	slices.Sort(pipelineIDs)

	var pipelines []IngestPipeline
	for _, pipelineID := range pipelineIDs {
		raw := pipelinesResponse[pipelineID]

		var pipeline IngestPipeline
		if err := json.Unmarshal(raw, &pipeline); err != nil {
			return nil, fmt.Errorf("failed to decode pipeline %s: %w", pipelineID, err)
		}
		pipeline.id = pipelineID
		pipeline.raw = raw
		pipelines = append(pipelines, pipeline)
	}

	return pipelines, nil
}
// pendingNestedPipelines lists the pipeline names referenced by "pipeline"
// processors of the given pipelines that are not present in collected.
//
// Names are returned without repetitions, in order of first appearance.
// Processors without a pipeline reference are ignored, and so are references
// with an empty name: an empty name identifies no pipeline, and must not be
// queued as an ID to request. The result is nil when nothing is pending.
func pendingNestedPipelines(pipelines []IngestPipeline, collected []string) []string {
	var names []string
	for _, p := range pipelines {
		for _, processor := range p.Processors {
			if processor.Pipeline == nil {
				continue
			}

			name := processor.Pipeline.Name
			if name == "" {
				continue
			}
			if slices.Contains(collected, name) || slices.Contains(names, name) {
				continue
			}
			names = append(names, name)
		}
	}
	return names
}