internal/elasticsearch/ingest/pipeline.go (115 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package ingest
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"gopkg.in/yaml.v3"
"github.com/elastic/elastic-package/internal/elasticsearch"
)
type simulatePipelineRequest struct {
Docs []pipelineDocument `json:"docs"`
}
type simulatePipelineResponse struct {
Docs []pipelineIngestedDocument `json:"docs"`
}
type pipelineDocument struct {
Index string `json:"_index"`
Source json.RawMessage `json:"_source"`
}
type pipelineIngestedDocument struct {
Doc pipelineDocument `json:"doc"`
}
// Pipeline represents a pipeline resource loaded from a file
type Pipeline struct {
Path string // Path of the file with the pipeline definition.
Name string // Name of the pipeline.
Format string // Format (extension) of the pipeline.
Content []byte // Content is the pipeline file contents with reroute processors if any.
ContentOriginal []byte // Content is the original file contents.
}
// Filename returns the original filename associated with the pipeline.
func (p *Pipeline) Filename() string {
pos := strings.LastIndexByte(p.Name, '-')
if pos == -1 {
pos = len(p.Name)
}
return p.Name[:pos] + "." + p.Format
}
// MarshalJSON returns the pipeline contents in JSON format.
func (p *Pipeline) MarshalJSON() (asJSON []byte, err error) {
switch p.Format {
case "json":
asJSON = p.Content
case "yaml", "yml":
var node map[string]interface{}
err = yaml.Unmarshal(p.Content, &node)
if err != nil {
return nil, fmt.Errorf("unmarshalling pipeline content failed (pipeline: %s): %w", p.Name, err)
}
if asJSON, err = json.Marshal(node); err != nil {
return nil, fmt.Errorf("marshalling pipeline content failed (pipeline: %s): %w", p.Name, err)
}
default:
return nil, fmt.Errorf("unsupported pipeline format '%s' (pipeline: %s)", p.Format, p.Name)
}
return asJSON, nil
}
func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName string, events []json.RawMessage, simulateDataStream string) ([]json.RawMessage, error) {
var request simulatePipelineRequest
for _, event := range events {
request.Docs = append(request.Docs, pipelineDocument{
Index: simulateDataStream,
Source: event,
})
}
requestBody, err := json.Marshal(&request)
if err != nil {
return nil, fmt.Errorf("marshalling simulate request failed: %w", err)
}
r, err := api.Ingest.Simulate(bytes.NewReader(requestBody),
api.Ingest.Simulate.WithContext(ctx),
api.Ingest.Simulate.WithPipelineID(pipelineName),
)
if err != nil {
return nil, fmt.Errorf("simulate API call failed (pipelineName: %s): %w", pipelineName, err)
}
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
return nil, fmt.Errorf("failed to read Simulate API response body: %w", err)
}
if r.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected response status for Simulate (%d): %s: %w", r.StatusCode, r.Status(), elasticsearch.NewError(body))
}
var response simulatePipelineResponse
err = json.Unmarshal(body, &response)
if err != nil {
return nil, fmt.Errorf("unmarshalling simulate request failed: %w", err)
}
processedEvents := make([]json.RawMessage, len(response.Docs))
for i, doc := range response.Docs {
processedEvents[i] = doc.Doc.Source
}
return processedEvents, nil
}
func UninstallPipelines(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error {
for _, p := range pipelines {
err := uninstallPipeline(ctx, api, p.Name)
if err != nil {
return err
}
}
return nil
}
func uninstallPipeline(ctx context.Context, api *elasticsearch.API, name string) error {
resp, err := api.Ingest.DeletePipeline(name, api.Ingest.DeletePipeline.WithContext(ctx))
if err != nil {
return fmt.Errorf("delete pipeline API call failed (pipelineName: %s): %w", name, err)
}
defer resp.Body.Close()
if resp.IsError() {
return fmt.Errorf("failed to uninstall pipeline %s: %s", name, resp.String())
}
return nil
}