x-pack/winlogbeat/module/wintest/simulate.go (229 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package wintest
import (
"bytes"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"net/http"
"os"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
// KeepRunning holds the value of the -keep-running command line flag
// (default false). Per the flag's usage text, when it is true the simulate
// docker instance is not torn down and the command to stop it manually is
// printed instead.
var KeepRunning = flag.Bool(
	"keep-running",
	false,
	"don't tear down simulate docker (will print command to manually stop instance)",
)
// TestCase pairs one input file with the Elasticsearch simulate results that
// SimulatePipeline produced for it.
type TestCase struct {
	// Path is the file the Collected documents were read from.
	Path string
	// Collected holds the input documents read from Path.
	Collected []json.RawMessage
	// Processed holds the documents returned by the simulate pipeline.
	Processed []json.RawMessage
	// Err records any error reported while simulating this case.
	Err error
}
// SimulatePipeline runs the Elasticsearch simulate pipeline API on the provided
// host, using user and pass for authentication. The named pipeline must already
// exist in the Elasticsearch instance. paths lists the files holding the JSON
// documents to send to simulate; each file is read by readEvents.
//
// The returned test cases contain the name of each input file, its input data,
// the resulting processed documents and any Elasticsearch error for that file.
// A simulate failure for one file is recorded in that TestCase's Err and does
// not stop the remaining files from being processed. Both the collected and the
// processed documents are re-encoded with marshalNormalizedJSON so their field
// order is stable. If the returned error is non-nil, the returned test cases
// are not valid.
func SimulatePipeline(host, user, pass, pipeline string, paths []string) ([]TestCase, error) {
	if host == "" {
		return nil, errors.New("missing required host name")
	}

	cases, err := readRawTestData(paths)
	if err != nil {
		return nil, err
	}

	client, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{host},
		Username:  user,
		Password:  pass,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to make client: %w", err)
	}

	for i := range cases {
		c := &cases[i]
		// The raw (not yet normalized) documents are sent to Elasticsearch.
		c.Processed, c.Err = simulatePipeline(client.API, pipeline, c.Collected)
		if err := normalizeMessages(c.Collected); err != nil {
			return nil, fmt.Errorf("failed to normalize collected documents from %s: %w", c.Path, err)
		}
		if err := normalizeMessages(c.Processed); err != nil {
			return nil, fmt.Errorf("failed to normalize processed documents from %s: %w", c.Path, err)
		}
	}
	return cases, nil
}

// normalizeMessages re-encodes each element of msgs in place with
// marshalNormalizedJSON, stopping at and returning the first error.
func normalizeMessages(msgs []json.RawMessage) error {
	for i, m := range msgs {
		norm, err := marshalNormalizedJSON(m)
		if err != nil {
			return err
		}
		msgs[i] = norm
	}
	return nil
}
// readRawTestData builds one TestCase per entry of paths, filling in only
// Path and the unprocessed Collected documents read from that file. It stops
// at the first file that cannot be read. An empty paths yields a nil slice.
func readRawTestData(paths []string) ([]TestCase, error) {
	var out []TestCase
	for _, p := range paths {
		collected, err := readEvents(p)
		if err != nil {
			return nil, err
		}
		out = append(out, TestCase{Path: p, Collected: collected})
	}
	return out, nil
}
// readEvents reads the file at path, which must contain a JSON array, and
// returns each array element as a raw, undecoded JSON document. On any error
// the returned slice is nil. JSON decoding errors are annotated with path so
// the offending test file can be identified.
func readEvents(path string) ([]json.RawMessage, error) {
	b, err := os.ReadFile(path)
	if err != nil {
		// os.ReadFile errors already include the path.
		return nil, err
	}
	var events []json.RawMessage
	if err := json.Unmarshal(b, &events); err != nil {
		return nil, fmt.Errorf("failed to unmarshal events from %s: %w", path, err)
	}
	return events, nil
}
// simulatePipeline sends docs to the Elasticsearch ingest simulate API for the
// named pipeline and returns the _source of each document in the response, in
// response order. A response entry without a doc._source yields a nil element.
// A non-200 response is reported as an error wrapping the Elasticsearch error
// details decoded by newError.
func simulatePipeline(api *esapi.API, pipeline string, docs []json.RawMessage) ([]json.RawMessage, error) {
	var req simulatePipelineRequest
	for _, d := range docs {
		req.Docs = append(req.Docs, pipelineDocument{Source: d})
	}
	payload, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("marshaling simulate request failed: %w", err)
	}

	setPipeline := func(r *esapi.IngestSimulateRequest) { r.PipelineID = pipeline }
	resp, err := api.Ingest.Simulate(bytes.NewReader(payload), setPipeline)
	if err != nil {
		return nil, fmt.Errorf("failed to simulate %q pipeline: %w", pipeline, err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read simulate response: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected response status for simulate: %s (%d): %w", resp.Status(), resp.StatusCode, newError(body))
	}

	var parsed simulatePipelineResponse
	if err := json.Unmarshal(body, &parsed); err != nil {
		return nil, fmt.Errorf("failed to unmarshal simulate response: %w", err)
	}
	var sources []json.RawMessage
	for _, d := range parsed.Docs {
		sources = append(sources, d.Doc.Source)
	}
	return sources, nil
}
type (
	// simulatePipelineRequest is the body sent to the ingest simulate API.
	simulatePipelineRequest struct {
		Docs []pipelineDocument `json:"docs"`
	}

	// simulatePipelineResponse is the part of the ingest simulate API
	// response that is decoded: the list of resulting documents.
	simulatePipelineResponse struct {
		Docs []pipelineIngestedDocument `json:"docs"`
	}

	// pipelineIngestedDocument is one entry of the response's docs array.
	pipelineIngestedDocument struct {
		Doc pipelineDocument `json:"doc"`
	}

	// pipelineDocument wraps a document's _source, kept as raw JSON.
	pipelineDocument struct {
		Source json.RawMessage `json:"_source"`
	}
)
// newError returns a new error constructed from the given response body.
// This assumes the body contains a JSON encoded Elasticsearch error object.
// If the body cannot be parsed, or it parses but carries no error type,
// reason or root cause (for example "{}", "null" or a body without an
// "error" key), the returned error contains the raw body so that no
// diagnostic information is lost. When root causes are present they are
// appended to the message as indented JSON.
func newError(body []byte) error {
	// position is a script location reported with some errors. It is held by
	// pointer below so that the omitempty tag takes effect when marshaling the
	// root causes; omitempty has no effect on struct values, which would
	// otherwise always print an all-zero position.
	type position struct {
		Offset int `json:"offset"`
		Start  int `json:"start"`
		End    int `json:"end"`
	}
	var msg struct {
		Error struct {
			RootCause []struct {
				Type          string    `json:"type"`
				Reason        string    `json:"reason"`
				ProcessorType string    `json:"processor_type,omitempty"`
				ScriptStack   []string  `json:"script_stack,omitempty"`
				Script        string    `json:"script,omitempty"`
				Lang          string    `json:"lang,omitempty"`
				Position      *position `json:"position,omitempty"`
				Suppressed    []struct {
					Type          string `json:"type"`
					Reason        string `json:"reason"`
					ProcessorType string `json:"processor_type"`
				} `json:"suppressed,omitempty"`
			} `json:"root_cause,omitempty"`
			Type          string    `json:"type"`
			Reason        string    `json:"reason"`
			ProcessorType string    `json:"processor_type,omitempty"`
			ScriptStack   []string  `json:"script_stack,omitempty"`
			Script        string    `json:"script,omitempty"`
			Lang          string    `json:"lang,omitempty"`
			Position      *position `json:"position,omitempty"`
			// CausedBy and Suppressed are decoded but not included in the
			// returned message.
			CausedBy struct {
				Type     string `json:"type"`
				Reason   string `json:"reason"`
				CausedBy struct {
					Type   string      `json:"type"`
					Reason interface{} `json:"reason"`
				} `json:"caused_by,omitempty"`
			} `json:"caused_by,omitempty"`
			Suppressed []struct {
				Type          string `json:"type"`
				Reason        string `json:"reason"`
				ProcessorType string `json:"processor_type"`
			} `json:"suppressed,omitempty"`
		} `json:"error"`
		Status int `json:"status"`
	}
	if err := json.Unmarshal(body, &msg); err != nil {
		// Fall back to including the raw body if it cannot be parsed.
		return fmt.Errorf("elasticsearch error: %s", body)
	}
	if msg.Error.Type == "" && msg.Error.Reason == "" && len(msg.Error.RootCause) == 0 {
		// Valid JSON, but not a recognizable error object: keep the raw body
		// instead of returning an empty "(type=): " message.
		return fmt.Errorf("elasticsearch error: %s", body)
	}
	if len(msg.Error.RootCause) > 0 {
		// Marshaling these plain string/int/slice/pointer fields cannot fail.
		cause, _ := json.MarshalIndent(msg.Error.RootCause, "", " ")
		return fmt.Errorf("elasticsearch error (type=%s): %s\nRoot cause:\n%s", msg.Error.Type, msg.Error.Reason, cause)
	}
	return fmt.Errorf("elasticsearch error (type=%s): %s", msg.Error.Type, msg.Error.Reason)
}
// marshalNormalizedJSON marshals v, decodes the result generically and
// re-encodes it with indentation. Because encoding/json writes map keys in
// sorted order, the field order of test results no longer depends on the
// order returned by ES, which minimizes diff noise during changes. Numbers
// are decoded with jsonUnmarshalUsingNumber so large integers keep their
// exact value. On failure the intermediate encoding (possibly nil) is
// returned together with the error.
func marshalNormalizedJSON(v interface{}) ([]byte, error) {
	raw, err := json.Marshal(v)
	if err == nil {
		var generic interface{}
		if err = jsonUnmarshalUsingNumber(raw, &generic); err == nil {
			return json.MarshalIndent(generic, "", " ")
		}
	}
	return raw, err
}
// jsonUnmarshalUsingNumber is a drop-in replacement for json.Unmarshal that
// decodes numbers into an interface{} as json.Number instead of float64, in
// order to prevent low bit truncation of values greater than 1<<53.
// See https://golang.org/cl/6202068 for details.
//
// Like json.Unmarshal it rejects empty input and any non-whitespace data
// following the first top-level value, including stray closing delimiters.
func jsonUnmarshalUsingNumber(data []byte, v interface{}) error {
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.UseNumber()
	if err := dec.Decode(v); err != nil {
		// Decode reports empty (or whitespace-only) input as io.EOF;
		// json.Unmarshal reports it as an unexpected end of input.
		if errors.Is(err, io.EOF) {
			return errors.New("unexpected end of JSON input")
		}
		return err
	}
	// Make sure there is no more data after the message to approximate
	// json.Unmarshal's behaviour. dec.More is not enough: it reports false for
	// a trailing ']' or '}'. Asking for the next token instead only yields
	// io.EOF when nothing but whitespace remains.
	if _, err := dec.Token(); !errors.Is(err, io.EOF) {
		return errors.New("more data after top-level value")
	}
	return nil
}
// ErrorMessage returns an error describing any Elasticsearch error.message
// found in the provided JSON event, or nil when the event has no
// error.message or it is null. error.message may be a single string or an
// array of messages. Any other JSON type is still reported, annotated with its
// decoded Go type. An error is also returned when msg cannot be decoded into
// that shape (e.g. invalid JSON or a non-object "error" field).
func ErrorMessage(msg json.RawMessage) error {
	var event struct {
		Error struct {
			Message interface{} `json:"message"`
		} `json:"error"`
	}
	err := json.Unmarshal(msg, &event)
	if err != nil {
		return fmt.Errorf("can't unmarshal event to check pipeline error: %#q: %w", msg, err)
	}
	switch m := event.Error.Message.(type) {
	case nil:
		return nil
	case string, []interface{}:
		// json.Unmarshal decodes a JSON array into []interface{}, never into
		// []string, so an array of messages has to be matched with this type.
		return fmt.Errorf("unexpected pipeline error: %v", m)
	default:
		return fmt.Errorf("unexpected pipeline error (unexpected error.message type %T): %[1]v", m)
	}
}