action/log/logadapter/elasticsearch.go (213 lines of code) (raw):
package logadapter
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/olivere/elastic/v7"
"github.com/seata/seata-ctl/tool"
"io"
"net/http"
"strings"
)
// QueryLogs is a function that queries specific documents
func (e *Elasticsearch) QueryLogs(filter map[string]interface{}, currency *Currency, number int) error {
client, err := createElasticClient(currency)
if err != nil {
return fmt.Errorf("failed to create elasticsearch client: %w", err)
}
indexName := currency.Source
indexFields, err := getEsIndexList(currency)
if err != nil {
return err
}
query, err := buildQuery(filter, indexFields)
if err != nil {
return err
}
// Execute the search query
searchResult, err := client.Search().
Index(indexName).
Size(number).
Query(query).
Do(context.Background())
if err != nil {
return fmt.Errorf("error fetching documents: %w", err)
}
err = processSearchHits(searchResult, currency)
if err != nil {
return err
}
return nil
}
// createElasticClient configures and creates a new Elasticsearch client
func createElasticClient(currency *Currency) (*elastic.Client, error) {
httpClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
client, err := elastic.NewClient(
elastic.SetURL(currency.Address),
elastic.SetHttpClient(httpClient),
elastic.SetSniff(false),
elastic.SetBasicAuth(currency.Username, currency.Password),
)
if err != nil {
return nil, err
}
return client, nil
}
// createEsDefaultClient configures and creates a new Elasticsearch client
func createEsDefaultClient(currency *Currency) (*elasticsearch.Client, error) {
// Configure the Elasticsearch client
cfg := elasticsearch.Config{
Addresses: []string{
currency.Address,
},
Username: currency.Username,
Password: currency.Password,
// Skip certificate verification if using a self-signed certificate
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
// Create the client instance
es, err := elasticsearch.NewClient(cfg)
if err != nil {
return nil, fmt.Errorf("error creating the client: %s", err)
}
return es, nil
}
// processSearchHits handles and formats the search results
func processSearchHits(searchResult *elastic.SearchResult, currency *Currency) error {
if len(searchResult.Hits.Hits) == 0 {
return fmt.Errorf("no documents found")
}
for _, hit := range searchResult.Hits.Hits {
var doc map[string]interface{}
if err := json.Unmarshal(hit.Source, &doc); err != nil {
return fmt.Errorf("failed to unmarshal document: %w", err)
}
// Pretty print the document content
for key, value := range doc {
if key == currency.Index {
if strings.Contains(value.(string), "INFO") {
tool.Logger.Info(fmt.Sprintf("%v", value))
}
if strings.Contains(value.(string), "ERROR") {
tool.Logger.Error(fmt.Sprintf("%v", value))
}
if strings.Contains(value.(string), "WARN") {
tool.Logger.Warn(fmt.Sprintf("%v", value))
}
}
}
}
return nil
}
// getFieldNames recursively extracts field names under the "fields" key
func getFieldNames(properties map[string]interface{}, prefix string) []string {
fieldNames := []string{}
for fieldName, fieldValue := range properties {
// Generate the full path for the current field
fullName := fieldName
if prefix != "" {
fullName = prefix + "." + fieldName
}
// Check if the field contains a "fields" node
if fieldMap, ok := fieldValue.(map[string]interface{}); ok {
if fields, ok := fieldMap["fields"].(map[string]interface{}); ok {
// If there is a "fields" node, iterate through its fields and add to the result
for subField := range fields {
fieldNames = append(fieldNames, fullName+"."+subField)
}
}
// If the field contains nested "properties", recursively parse subfields
if nestedProperties, ok := fieldMap["properties"].(map[string]interface{}); ok {
fieldNames = append(fieldNames, getFieldNames(nestedProperties, fullName)...)
}
}
}
return fieldNames
}
// extractFields extracts all field names from a nested map structure
func extractFields(data map[string]interface{}) []string {
var allFields []string
// Iterate through each index to get its field names
for _, indexData := range data {
if indexMap, ok := indexData.(map[string]interface{}); ok {
if mappings, ok := indexMap["mappings"].(map[string]interface{}); ok {
if properties, ok := mappings["properties"].(map[string]interface{}); ok {
// Get all field names under "fields" and merge into the result
allFields = append(allFields, getFieldNames(properties, "")...)
}
}
}
}
return allFields
}
// ParseJobString parses the input string and returns a map
func ParseJobString(input string) (map[string]string, error) {
// Remove curly braces
input = strings.Trim(input, "{}")
// Split by ','
parts := strings.Split(input, ",")
kvMap := make(map[string]string)
for _, part := range parts {
// Split by '=' to get key-value pairs
kv := strings.Split(part, "=")
if len(kv) != 2 {
return nil, fmt.Errorf("invalid key=value pair: %s", part)
}
kvMap[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
}
return kvMap, nil
}
// Contains checks if a string exists in a slice of strings
func Contains(slice []string, str string) bool {
for _, item := range slice {
if item == str {
return true
}
}
return false
}
// getEsIndexList retrieves field names from the specified Elasticsearch index
func getEsIndexList(currency *Currency) ([]string, error) {
es, err := createEsDefaultClient(currency)
if err != nil {
return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err)
}
// Build the request to get the mappings
req := esapi.IndicesGetMappingRequest{
Index: []string{currency.Source}, // Specify the index name
}
// Execute the request
res, err := req.Do(context.Background(), es)
if err != nil {
return nil, fmt.Errorf("error getting mapping: %s", err)
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
tool.Logger.Error("error closing body")
}
}(res.Body)
// Check if the response is successful
if res.IsError() {
return nil, fmt.Errorf("error response: %s", res.String())
}
// Read and parse the response
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to make request to %s", err)
}
// Call method to extract field names
indexFields := extractFields(result)
indexFields = removeKeywordSuffix(indexFields)
return indexFields, nil
}
// removeKeywordSuffix removes ".keyword" suffix from each string in the slice
func removeKeywordSuffix(input []string) []string {
var result []string
for _, str := range input {
// Check if the string ends with ".keyword"
if strings.HasSuffix(str, ".keyword") {
// Remove the ".keyword" suffix
str = strings.TrimSuffix(str, ".keyword")
}
result = append(result, str) // Add the processed string to the result slice
}
return result
}
// buildQuery constructs a BoolQuery based on the provided filter and index fields
func buildQuery(filter map[string]interface{}, indexFields []string) (*elastic.BoolQuery, error) {
query := elastic.NewBoolQuery()
if filter["query"].(string) != "{}" {
indexMap, err := ParseJobString(filter["query"].(string))
if err != nil {
return query, err
}
for k, v := range indexMap {
if Contains(indexFields, k) {
query.Should(elastic.NewTermQuery(k, v))
} else {
return query, fmt.Errorf("invalid index key: %s", k)
}
}
}
return query, nil
}