dev/import-beats/streams_config_parser.go (329 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package main import ( "bytes" "fmt" "regexp" "text/template/parse" "github.com/pkg/errors" "github.com/elastic/package-registry/packages" ) type streamConfigParsed struct { tree *parse.Tree } func parseStreamConfig(content []byte) (*streamConfigParsed, error) { mapOfParsed, err := parse.Parse("input-config", string(content), "", "", map[string]interface{}{ "eq": func() {}, "printf": func() {}, "tojson": func() {}, "inList": func() {}, }) if err != nil { return nil, errors.Wrapf(err, "parsing template failed") } return &streamConfigParsed{ tree: mapOfParsed["input-config"], }, nil } func (scp *streamConfigParsed) inputTypes() []string { return uniqueStringValues(inputTypesForNode(scp.tree.Root)) } func inputTypesForNode(node parse.Node) []string { textNode, isTextNode := node.(*parse.TextNode) if isTextNode { inputType, ok := extractInputTypeFromTextNode(textNode) if ok { return []string{inputType} } return nil } listNode, isListNode := node.(*parse.ListNode) if isListNode { return inputTypesForListNode(listNode) } ifNode, isIfNode := node.(*parse.IfNode) if isIfNode { var inputTypes []string if ifNode.List != nil { inputTypes = append(inputTypes, inputTypesForListNode(ifNode.List)...) } if ifNode.ElseList != nil { inputTypes = append(inputTypes, inputTypesForListNode(ifNode.ElseList)...) } return inputTypes } return nil } func extractInputTypeFromTextNode(textNode *parse.TextNode) (string, bool) { i := bytes.Index(textNode.Text, []byte("type: ")) if i > -1 && (i == 0 || textNode.Text[i-1] == '\n') { aType := textNode.Text[i+6:] j := bytes.IndexByte(aType, '\n') if j < 0 { j = len(aType) } aType = aType[:j] return string(aType), true } return "", false } func inputTypesForListNode(listNode *parse.ListNode) []string { var inputTypes []string for _, listedNode := range listNode.Nodes { it := inputTypesForNode(listedNode) inputTypes = append(inputTypes, it...) } return inputTypes } func (scp *streamConfigParsed) configForInput(inputType string) []byte { if inputType == "log" { inputType = "file" } config := configForInputForNode(scp.tree.Root, inputType) r := regexp.MustCompile("(\n)+") return bytes.TrimSpace(r.ReplaceAll(config, []byte{'\n'})) } func configForInputForNode(node parse.Node, inputType string) []byte { textNode, isTextNode := node.(*parse.TextNode) if isTextNode { return writeHandlebarsTextNode(textNode) } listNode, isListNode := node.(*parse.ListNode) if isListNode { return writeHandlebarsListNode(listNode, inputType) } ifNode, isIfNode := node.(*parse.IfNode) if isIfNode { return writeHandlebarsIfNode(ifNode, inputType) } rangeNode, isRangeNode := node.(*parse.RangeNode) if isRangeNode { return writeHandlebarsRangeNode(rangeNode, inputType) } actionNode, isActionNode := node.(*parse.ActionNode) if isActionNode { return writeHandlebarsActionNode(actionNode) } panic(fmt.Sprintf("unsupported node: %s", node.String())) } func writeHandlebarsTextNode(textNode *parse.TextNode) []byte { i := bytes.Index(textNode.Text, []byte("type: ")) if i > -1 && (i == 0 || textNode.Text[i-1] == '\n') { var buffer bytes.Buffer buffer.Write(textNode.Text[0:i]) j := bytes.Index(textNode.Text[i:], []byte{'\n'}) if j > 0 { buffer.Write(textNode.Text[i+j+1:]) return buffer.Bytes() } } return textNode.Text } func writeHandlebarsListNode(listNode *parse.ListNode, inputType string) []byte { var buffer bytes.Buffer for _, listedNode := range listNode.Nodes { buf := configForInputForNode(listedNode, inputType) buffer.Write(buf) } return buffer.Bytes() } func writeHandlebarsIfNode(ifNode *parse.IfNode, inputType string) []byte { var buffer bytes.Buffer if isIfNodeEqInput(ifNode) { if isIfNodeEqInputInputType(ifNode, inputType) { if ifNode.List != nil { buffer.Write(configForInputForNode(ifNode.List, inputType)) } } else { if ifNode.ElseList != nil { buffer.Write(configForInputForNode(ifNode.ElseList, inputType)) } } } else { if len(ifNode.Pipe.Cmds[0].Args) == 1 { var1 := ifNode.Pipe.Cmds[0].Args[0].String()[1:] buffer.WriteString(fmt.Sprintf("{{#if %s}}", var1)) } else { buffer.WriteString(fmt.Sprintf("{{#if %s}}", ifNode.Pipe.String())) } if ifNode.List != nil { buffer.Write(configForInputForNode(ifNode.List, inputType)) } if ifNode.ElseList != nil { buffer.WriteString("{{else}}") buffer.Write(configForInputForNode(ifNode.ElseList, inputType)) } buffer.WriteString("{{/if}}") } return buffer.Bytes() } func isIfNodeEqInput(ifNode *parse.IfNode) bool { if len(ifNode.Pipe.Cmds[0].Args) > 1 { op := ifNode.Pipe.Cmds[0].Args[0].String() var1 := ifNode.Pipe.Cmds[0].Args[1].String() if op == "eq" && var1 == ".input" { return true } } return false } func isIfNodeEqInputInputType(ifNode *parse.IfNode, inputType string) bool { if len(ifNode.Pipe.Cmds[0].Args) > 1 { op := ifNode.Pipe.Cmds[0].Args[0].String() var1 := ifNode.Pipe.Cmds[0].Args[1].String() var2 := ifNode.Pipe.Cmds[0].Args[2].String() if op == "eq" && var1 == ".input" && var2 == fmt.Sprintf(`"%s"`, inputType) { return true } } return false } func writeHandlebarsActionNode(actionNode *parse.ActionNode) []byte { var buffer bytes.Buffer if len(actionNode.Pipe.Cmds) > 0 { cmdArgs := writeHandlebarsCmdArgs(actionNode.Pipe.Cmds[0].Args) buffer.WriteString("{{") buffer.Write(cmdArgs) buffer.WriteString("}}") } return buffer.Bytes() } func writeHandlebarsRangeNode(rangeNode *parse.RangeNode, inputType string) []byte { var buffer bytes.Buffer cmdArgs := writeHandlebarsCmdArgs(rangeNode.Pipe.Cmds[0].Args) decl := writeHandlebarsCmdDecl(rangeNode.Pipe.Decl) buffer.WriteString("{{#each ") buffer.Write(cmdArgs) buffer.Write(decl) buffer.WriteString("}}") buffer.Write(writeHandlebarsListNode(rangeNode.List, inputType)) buffer.WriteString("{{/each}}") return buffer.Bytes() } func writeHandlebarsCmdArgs(args []parse.Node) []byte { var buffer bytes.Buffer for i, arg := range args { argWithoutDot := arg.String()[1:] if len(argWithoutDot) == 0 { argWithoutDot = "this" } buffer.WriteString(argWithoutDot) if i != (len(args) - 1) { buffer.WriteString(" ") } } return buffer.Bytes() } func writeHandlebarsCmdDecl(decl []*parse.VariableNode) []byte { var buffer bytes.Buffer if len(decl) > 0 { buffer.WriteString(" as |") } for i := len(decl) - 1; i >= 0; i-- { aVar := decl[i].String()[1:] buffer.WriteString(aVar) if i != 0 { buffer.WriteByte(' ') } } if len(decl) > 0 { buffer.WriteString("|") } return buffer.Bytes() } func (scp *streamConfigParsed) filterVarsForInput(inputType string, vars []packages.Variable) []packages.Variable { variableNamesForInput := scp.variableNamesForInput(inputType) var filtered []packages.Variable for _, aVar := range vars { var found bool for _, variableName := range variableNamesForInput { if aVar.Name == variableName { found = true break } } if found { filtered = append(filtered, aVar) } } return filtered } func (scp *streamConfigParsed) variableNamesForInput(inputType string) []string { if inputType == "log" { inputType = "file" } var variables []string variables = variableNamesForInputForNode(scp.tree.Root, inputType, variables) return uniqueStringValues(variables) } func variableNamesForInputForNode(node parse.Node, inputType string, variables []string) []string { _, isTextNode := node.(*parse.TextNode) if isTextNode { return variables // do nothing, there are no variables } listNode, isListNode := node.(*parse.ListNode) if isListNode { return variableNamesListNode(listNode, inputType, variables) } ifNode, isIfNode := node.(*parse.IfNode) if isIfNode { return variableNamesIfNode(ifNode, inputType, variables) } rangeNode, isRangeNode := node.(*parse.RangeNode) if isRangeNode { return variableNamesRangeNode(rangeNode, inputType, variables) } actionNode, isActionNode := node.(*parse.ActionNode) if isActionNode { return variableNamesForNodeArgs(actionNode.Pipe.Cmds[0].Args, variables) } panic(fmt.Sprintf("unsupported node: %s", node.String())) } func variableNamesListNode(listNode *parse.ListNode, inputType string, vars []string) []string { var variables []string variables = append(variables, vars...) for _, listedNode := range listNode.Nodes { variables = uniqueStringValues(append(variables, variableNamesForInputForNode(listedNode, inputType, variables)...)) } return variables } func variableNamesIfNode(ifNode *parse.IfNode, inputType string, vars []string) []string { var variables []string variables = append(variables, vars...) if isIfNodeEqInput(ifNode) { if isIfNodeEqInputInputType(ifNode, inputType) { if ifNode.List != nil { variables = uniqueStringValues(append(variableNamesForInputForNode(ifNode.List, inputType, variables))) } } else { if ifNode.ElseList != nil { variables = uniqueStringValues(append(variableNamesForInputForNode(ifNode.ElseList, inputType, variables))) } } } else { if ifNode.List != nil { variables = uniqueStringValues(append(variableNamesForInputForNode(ifNode.List, inputType, variables))) } if ifNode.ElseList != nil { variables = uniqueStringValues(append(variableNamesForInputForNode(ifNode.ElseList, inputType, variables))) } variables = uniqueStringValues(append(variables, variableNamesForNodeArgs(ifNode.Pipe.Cmds[0].Args, variables)...)) } return variables } func variableNamesRangeNode(rangeNode *parse.RangeNode, inputType string, vars []string) []string { var variables []string variables = append(variables, vars...) variables = uniqueStringValues(append(variables, variableNamesListNode(rangeNode.List, inputType, variables)...)) variables = uniqueStringValues(append(variables, variableNamesForNodeArgs(rangeNode.Pipe.Cmds[0].Args, variables)...)) return variables } func variableNamesForNodeArgs(args []parse.Node, vars []string) []string { var variables []string variables = append(variables, vars...) if len(args) > 0 { for _, arg := range args { variables = append(variables, arg.String()[1:]) } } return variables }