cmd/apmtool/espoll.go (129 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package main
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"os/signal"
"strings"
"time"
"github.com/urfave/cli/v3"
"github.com/elastic/apm-tools/pkg/espoll"
"github.com/elastic/go-elasticsearch/v8"
)
// maxElasticsearchBackoff is the upper bound applied to the retry backoff
// computed for the Elasticsearch client in Main.
var maxElasticsearchBackoff = time.Second * 10
// config carries everything Main needs to run one search against
// Elasticsearch.
type config struct {
	// Connection settings.
	esURL         string // comma-separated Elasticsearch addresses
	esUsername    string
	esPassword    string
	tlsSkipVerify bool // disables TLS certificate verification when true

	// Search settings.
	query   string        // request body, sent to Elasticsearch verbatim
	target  string        // data streams, indices and aliases to search
	timeout time.Duration // passed to the espoll client as its timeout option
	hits    uint64        // value of the min-hits flag
}
// pollDocs is the action of the espoll command. It takes the query from the
// "query" flag or, when that flag is empty, from stdin, and then runs Main
// with the Elasticsearch connection settings held in cmd.cfg.
//
// Every failure is reported through log.Fatal/log.Fatalf, which terminates
// the process, so the returned error is always nil.
func (cmd *Commands) pollDocs(ctx context.Context, c *cli.Command) error {
	query := c.String("query")
	if query == "" {
		stat, err := os.Stdin.Stat()
		if err != nil {
			log.Fatalf("failed to stat stdin: %s", err.Error())
		}
		// A character device (typically an interactive terminal) means that
		// nothing was piped or redirected in; fail instead of blocking on it.
		if stat.Mode()&os.ModeCharDevice != 0 {
			log.Fatal("empty -query flag and stdin, please set one.")
		}
		// The reported size is deliberately not consulted: pipes may report
		// a size of 0 even when data is waiting to be read.
		b, err := io.ReadAll(os.Stdin)
		if err != nil {
			log.Fatalf("failed to read stdin: %s", err.Error())
		}
		query = strings.Trim(string(b), "\n")
		if query == "" {
			log.Fatal("empty -query flag and stdin, please set one.")
		}
	}
	log.Println("query:", query)

	// Build the config only after the query is resolved, so that a query
	// supplied on stdin actually reaches Main.
	cfg := config{
		query:         query,
		esURL:         cmd.cfg.ElasticsearchURL,
		esUsername:    cmd.cfg.Username,
		esPassword:    cmd.cfg.Password,
		tlsSkipVerify: cmd.cfg.TLSSkipVerify,
		target:        c.String("target"),
		timeout:       c.Duration("timeout"),
		hits:          c.Uint("min-hits"),
	}

	// os.Kill is not registered: it cannot be caught, so listing it has no
	// effect.
	ctxMain, cancel := signal.NotifyContext(ctx, os.Interrupt)
	defer cancel()
	if err := Main(ctxMain, cfg); err != nil {
		log.Fatalf("ERROR: %s", err.Error())
	}
	return nil
}
// NewESPollCmd builds the "espoll" subcommand. The command's action is
// commands.pollDocs, and it accepts the query, target, timeout and min-hits
// flags.
func NewESPollCmd(commands *Commands) *cli.Command {
	queryFlag := &cli.StringFlag{
		Name:  "query",
		Usage: "The Elasticsearch query in Query DSL. Must be set via this flag or stdin.",
	}
	targetFlag := &cli.StringFlag{
		Name:  "target",
		Value: "traces-*,logs-*,metrics-*",
		Usage: "Comma-separated list of data streams, indices, and aliases to search (Supports wildcards (*)).",
	}
	timeoutFlag := &cli.DurationFlag{
		Name:  "timeout",
		Value: 30 * time.Second,
		Usage: "Elasticsearch request timeout",
	}
	minHitsFlag := &cli.UintFlag{
		Name:  "min-hits",
		Value: 1,
		Usage: "When specified and > 10, this should cause the size parameter to be set.",
	}

	espollCmd := &cli.Command{
		Name:   "espoll",
		Usage:  "poll documents from Elasticsearch",
		Action: commands.pollDocs,
		Flags:  []cli.Flag{queryFlag, targetFlag, timeoutFlag, minHitsFlag},
	}
	return espollCmd
}
// Main runs one search described by cfg and writes the result to stdout as
// JSON.
//
// It returns an error when cfg.query is empty, when the Elasticsearch client
// cannot be created, when the search fails, or when the result cannot be
// encoded.
func Main(ctx context.Context, cfg config) error {
	if len(cfg.query) == 0 {
		return errors.New("query cannot be empty")
	}

	// Work on a copy of the default transport so that the TLS setting does
	// not leak into http.DefaultTransport.
	httpTransport := http.DefaultTransport.(*http.Transport).Clone()
	httpTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: cfg.tlsSkipVerify}

	// Exponential backoff starting at 500ms for attempt 1, doubling on each
	// further attempt and capped at maxElasticsearchBackoff.
	retryBackoff := func(attempt int) time.Duration {
		wait := 500 * time.Millisecond * (time.Duration(1) << (attempt - 1))
		if wait <= maxElasticsearchBackoff {
			return wait
		}
		return maxElasticsearchBackoff
	}

	clientCfg := elasticsearch.Config{
		Addresses:    strings.Split(cfg.esURL, ","),
		Username:     cfg.esUsername,
		Password:     cfg.esPassword,
		Transport:    httpTransport,
		MaxRetries:   5,
		RetryBackoff: retryBackoff,
	}
	rawClient, err := elasticsearch.NewClient(clientCfg)
	if err != nil {
		return err
	}

	// The query string is passed through stringMarshaler so that it is sent
	// as-is rather than being re-encoded as a JSON string.
	body := stringMarshaler(cfg.query)
	minHits := int(cfg.hits)
	result, err := espoll.WrapClient(rawClient).SearchIndexMinDocs(
		ctx, minHits, cfg.target, body, espoll.WithTimeout(cfg.timeout),
	)
	if err != nil {
		return fmt.Errorf("search request returned error: %w", err)
	}

	encoder := json.NewEncoder(os.Stdout)
	if err := encoder.Encode(result); err != nil {
		return fmt.Errorf("failed to encode search result: %w", err)
	}
	return nil
}
// stringMarshaler is a string that is assumed to already hold encoded JSON.
type stringMarshaler string

// MarshalJSON returns the bytes of s unchanged; it performs no validation
// and never returns an error.
func (s stringMarshaler) MarshalJSON() ([]byte, error) {
	raw := []byte(s)
	return raw, nil
}