internal/elasticsearch/client.go (303 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package elasticsearch
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/elastic-package/internal/certs"
)
// API contains the elasticsearch APIs
type API = esapi.API
// IngestSimulateRequest configures the Ingest Simulate API request.
type IngestSimulateRequest = esapi.IngestSimulateRequest
// IngestGetPipelineRequest configures the Ingest Get Pipeline API request.
type IngestGetPipelineRequest = esapi.IngestGetPipelineRequest
// ClusterStateRequest configures the Cluster State API request.
type ClusterStateRequest = esapi.ClusterStateRequest
// clientOptions are used to configure a client.
type clientOptions struct {
address string
apiKey string
username string
password string
// certificateAuthority is the certificate to validate the server certificate.
certificateAuthority string
// skipTLSVerify disables TLS validation.
skipTLSVerify bool
}
type ClientOption func(*clientOptions)
// OptionWithAPIKey sets the API key to be used by the client for authentication.
func OptionWithAPIKey(apiKey string) ClientOption {
return func(opts *clientOptions) {
opts.apiKey = apiKey
}
}
// OptionWithAddress sets the address to be used by the client.
func OptionWithAddress(address string) ClientOption {
return func(opts *clientOptions) {
opts.address = address
}
}
// OptionWithUsername sets the username to be used by the client.
func OptionWithUsername(username string) ClientOption {
return func(opts *clientOptions) {
opts.username = username
}
}
// OptionWithPassword sets the password to be used by the client.
func OptionWithPassword(password string) ClientOption {
return func(opts *clientOptions) {
opts.password = password
}
}
// OptionWithCertificateAuthority sets the certificate authority to be used by the client.
func OptionWithCertificateAuthority(certificateAuthority string) ClientOption {
return func(opts *clientOptions) {
opts.certificateAuthority = certificateAuthority
}
}
// OptionWithSkipTLSVerify disables TLS validation.
func OptionWithSkipTLSVerify() ClientOption {
return func(opts *clientOptions) {
opts.skipTLSVerify = true
}
}
// Client is a wrapper over an Elasticsearch Client.
type Client struct {
*elasticsearch.Client
}
// NewClient method creates new instance of the Elasticsearch client.
func NewClient(customOptions ...ClientOption) (*Client, error) {
config, err := NewConfig(customOptions...)
if err != nil {
return nil, err
}
return NewClientWithConfig(config)
}
func NewConfig(customOptions ...ClientOption) (elasticsearch.Config, error) {
options := clientOptions{}
for _, option := range customOptions {
option(&options)
}
if options.address == "" {
return elasticsearch.Config{}, ErrUndefinedAddress
}
config := elasticsearch.Config{
Addresses: []string{options.address},
APIKey: options.apiKey,
Username: options.username,
Password: options.password,
}
if options.skipTLSVerify {
config.Transport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
} else if options.certificateAuthority != "" {
rootCAs, err := certs.SystemPoolWithCACertificate(options.certificateAuthority)
if err != nil {
return config, fmt.Errorf("reading CA certificate: %w", err)
}
config.Transport = &http.Transport{
TLSClientConfig: &tls.Config{RootCAs: rootCAs},
}
}
return config, nil
}
func NewClientWithConfig(config elasticsearch.Config) (*Client, error) {
client, err := elasticsearch.NewClient(config)
if err != nil {
return nil, fmt.Errorf("can't create instance: %w", err)
}
return &Client{Client: client}, nil
}
// CheckHealth checks the health of the cluster.
func (client *Client) CheckHealth(ctx context.Context) error {
resp, err := client.Cluster.Health(client.Cluster.Health.WithContext(ctx))
if err != nil {
return fmt.Errorf("error checking cluster health: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusGone {
// We are in a managed deployment, API not available, assume healthy.
return nil
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to check cluster health: %s", resp.String())
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("error reading cluster health response: %w", err)
}
var clusterHealth struct {
Status string `json:"status"`
}
err = json.Unmarshal(body, &clusterHealth)
if err != nil {
return fmt.Errorf("error decoding cluster health response: %w", err)
}
if status := clusterHealth.Status; status != "green" && status != "yellow" {
if status != "red" {
return fmt.Errorf("cluster in unhealthy state: %q", status)
}
cause, err := client.redHealthCause(ctx)
if err != nil {
return fmt.Errorf("cluster in unhealthy state, failed to identify cause: %w", err)
}
return fmt.Errorf("cluster in unhealthy state: %s", cause)
}
return nil
}
type Info struct {
Name string `json:"name"`
ClusterName string `json:"cluster_name"`
ClusterUUID string `json:"cluster_uuid"`
Version struct {
Number string `json:"number"`
BuildFlavor string `json:"build_flavor"`
} `json:"version"`
}
// Info gets cluster information and metadata.
func (client *Client) Info(ctx context.Context) (*Info, error) {
resp, err := client.Client.Info(client.Client.Info.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("error getting cluster info: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to get cluster info: %s", resp.String())
}
var info Info
err = json.NewDecoder(resp.Body).Decode(&info)
if err != nil {
return nil, fmt.Errorf("error decoding cluster info: %w", err)
}
return &info, nil
}
// redHealthCause tries to identify the cause of a cluster in red state. This could be
// also used as a replacement of CheckHealth, but keeping them separated because it uses
// internal undocumented APIs that might change.
func (client *Client) redHealthCause(ctx context.Context) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "/_internal/_health", nil)
if err != nil {
return "", fmt.Errorf("error creating internal health request: %w", err)
}
resp, err := client.Transport.Perform(req)
if err != nil {
return "", fmt.Errorf("error performing internal health request: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("error reading internal health response: %w", err)
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed to get cause of red health; API status code = %d; response body = %s", resp.StatusCode, string(body))
}
var internalHealth struct {
Status string `json:"status"`
Indicators map[string]struct {
Status string `json:"status"`
Impacts []struct {
Severity int `json:"severity"`
} `json:"impacts"`
Diagnosis []struct {
Cause string `json:"cause"`
} `json:"diagnosis"`
} `json:"indicators"`
}
err = json.Unmarshal(body, &internalHealth)
if err != nil {
return "", fmt.Errorf("error decoding internal health response: %w", err)
}
if internalHealth.Status != "red" {
return "", errors.New("cluster state is not red?")
}
// Only diagnostics with the highest severity impacts are returned.
var highestSeverity int
var causes []string
for _, indicator := range internalHealth.Indicators {
if indicator.Status != "red" {
continue
}
var severity int
for _, impact := range indicator.Impacts {
if impact.Severity > severity {
severity = impact.Severity
}
}
switch {
case severity < highestSeverity:
continue
case severity > highestSeverity:
highestSeverity = severity
causes = nil
case severity == highestSeverity:
// Continue appending for current severity.
}
for _, diagnosis := range indicator.Diagnosis {
causes = append(causes, diagnosis.Cause)
}
}
if len(causes) == 0 {
return "", errors.New("no causes found")
}
return strings.Join(causes, ", "), nil
}
type Mappings struct {
Properties json.RawMessage `json:"properties"`
DynamicTemplates json.RawMessage `json:"dynamic_templates"`
}
func (c *Client) SimulateIndexTemplate(ctx context.Context, indexTemplateName string) (*Mappings, error) {
resp, err := c.Indices.SimulateTemplate(
c.Indices.SimulateTemplate.WithContext(ctx),
c.Indices.SimulateTemplate.WithName(indexTemplateName),
)
if err != nil {
return nil, fmt.Errorf("failed to get field mapping for data stream %q: %w", indexTemplateName, err)
}
defer resp.Body.Close()
if resp.IsError() {
return nil, fmt.Errorf("error getting mapping: %s", resp)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading mapping body: %w", err)
}
type indexTemplateSimulated struct {
// Settings json.RawMessage `json:"settings"`
Mappings Mappings `json:"mappings"`
}
type previewTemplate struct {
Template indexTemplateSimulated `json:"template"`
}
var preview previewTemplate
if err := json.Unmarshal(body, &preview); err != nil {
return nil, fmt.Errorf("error unmarshaling mappings: %w", err)
}
// In case there are no dynamic templates, set an empty array
if string(preview.Template.Mappings.DynamicTemplates) == "" {
preview.Template.Mappings.DynamicTemplates = []byte("[]")
}
// In case there are no mappings defined, set an empty map
if string(preview.Template.Mappings.Properties) == "" {
preview.Template.Mappings.Properties = []byte("{}")
}
return &preview.Template.Mappings, nil
}
func (c *Client) DataStreamMappings(ctx context.Context, dataStreamName string) (*Mappings, error) {
mappingResp, err := c.Indices.GetMapping(
c.Indices.GetMapping.WithContext(ctx),
c.Indices.GetMapping.WithIndex(dataStreamName),
)
if err != nil {
return nil, fmt.Errorf("failed to get field mapping for data stream %q: %w", dataStreamName, err)
}
defer mappingResp.Body.Close()
if mappingResp.IsError() {
return nil, fmt.Errorf("error getting mapping: %s", mappingResp)
}
body, err := io.ReadAll(mappingResp.Body)
if err != nil {
return nil, fmt.Errorf("error reading mapping body: %w", err)
}
mappingsRaw := map[string]struct {
Mappings Mappings `json:"mappings"`
}{}
if err := json.Unmarshal(body, &mappingsRaw); err != nil {
return nil, fmt.Errorf("error unmarshaling mappings: %w", err)
}
if len(mappingsRaw) != 1 {
return nil, fmt.Errorf("exactly 1 mapping was expected, got %d", len(mappingsRaw))
}
var mappingsDefinition Mappings
for _, v := range mappingsRaw {
mappingsDefinition = v.Mappings
}
// In case there are no dynamic templates, set an empty array
if string(mappingsDefinition.DynamicTemplates) == "" {
mappingsDefinition.DynamicTemplates = []byte("[]")
}
// In case there are no mappings defined, set an empty map
if string(mappingsDefinition.Properties) == "" {
mappingsDefinition.Properties = []byte("{}")
}
return &mappingsDefinition, nil
}