pkg/apmclient/client.go (150 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apmclient
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/tidwall/gjson"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
"github.com/elastic/go-elasticsearch/v8/typedapi/security/createapikey"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/sortorder"
)
type Client struct {
es *elasticsearch.TypedClient
}
// New returns a new Client for querying APM data.
func New(cfg Config) (*Client, error) {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: cfg.TLSSkipVerify}
es, err := elasticsearch.NewTypedClient(elasticsearch.Config{
Addresses: []string{cfg.ElasticsearchURL},
Username: cfg.Username,
APIKey: cfg.APIKey,
Password: cfg.Password,
Transport: transport,
})
if err != nil {
return nil, fmt.Errorf("error creating Elasticsearch client: %w", err)
}
return &Client{
es: es,
}, nil
}
// GetElasticCloudAPMInput returns the APM configuration as defined
// in the "elastic-cloud-apm" integration policy,
func (c *Client) GetElasticCloudAPMInput(ctx context.Context) (gjson.Result, error) {
size := 1
resp, err := c.es.Search().Index(".fleet-policies").Request(&search.Request{
Size: &size,
Sort: []types.SortCombinations{types.SortOptions{
SortOptions: map[string]types.FieldSort{
"revision_idx": {
Order: &sortorder.Desc,
},
},
}},
Query: &types.Query{
Term: map[string]types.TermQuery{
"policy_id": {
Value: "policy-elastic-agent-on-cloud",
},
},
},
}).Do(ctx)
if err != nil {
return gjson.Result{}, fmt.Errorf("error searching .fleet-policies: %w", err)
}
if n := len(resp.Hits.Hits); n != 1 {
return gjson.Result{}, fmt.Errorf("expected 1 policy, got %d", n)
}
result := gjson.GetBytes(resp.Hits.Hits[0].Source_, `data.inputs.#(id=="elastic-cloud-apm")`)
if !result.Exists() {
return gjson.Result{}, fmt.Errorf("input %q missing", "elastic-cloud-apm")
}
return result, nil
}
// CreateAgentAPIKey creates an agent API Key, and returns it in the
// base64-encoded form that agents should provide.
//
// If expiration is less than or equal to zero, then the API Key never expires.
func (c *Client) CreateAgentAPIKey(ctx context.Context, expiration time.Duration) (string, error) {
name := "apm-agent"
var maybeExpiration types.Duration
if expiration > 0 {
maybeExpiration = formatDurationElasticsearch(expiration)
}
resp, err := c.es.Security.CreateApiKey().Request(&createapikey.Request{
Name: &name,
Expiration: maybeExpiration,
RoleDescriptors: map[string]types.RoleDescriptor{
"apm": {
Applications: []types.ApplicationPrivileges{{
Application: "apm",
Resources: []string{"*"},
Privileges: []string{"event:write", "config_agent:read"},
}},
},
},
Metadata: map[string]json.RawMessage{
"application": []byte(`"apm"`),
"creator": []byte(`"apmclient"`),
},
}).Do(ctx)
if err != nil {
return "", fmt.Errorf("error creating agent API Key: %w", err)
}
return resp.Encoded, nil
}
// ServiceSummary returns ServiceSummary objects by aggregating `service_summary` metric sets.
func (c *Client) ServiceSummary(ctx context.Context, options ...Option) ([]ServiceSummary, error) {
// TODO options
req := &search.Request{
Aggregations: map[string]types.Aggregations{
"services": {
MultiTerms: &types.MultiTermsAggregation{
Terms: []types.MultiTermLookup{{
Field: "service.name",
}, {
Field: "service.environment",
Missing: "",
}, {
Field: "service.language.name",
}, {
Field: "agent.name",
}},
},
},
},
}
// TODO select appropriate resolution according to the time filter.
resp, err := c.es.Search().
Index("metrics-apm.service_summary.1m-*").
Size(0).Request(req).Do(ctx)
if err != nil {
return nil, fmt.Errorf("error search service_summmary metrics")
}
servicesAggregation := resp.Aggregations["services"].(*types.MultiTermsAggregate)
buckets := servicesAggregation.Buckets.([]types.MultiTermsBucket)
out := make([]ServiceSummary, len(buckets))
for i, bucket := range buckets {
out[i] = ServiceSummary{
Name: bucket.Key[0].(string),
Environment: bucket.Key[1].(string),
Language: bucket.Key[2].(string),
Agent: bucket.Key[3].(string),
}
}
return out, nil
}
var elasticsearchTimeUnits = []struct {
Duration time.Duration
Unit string
}{
{time.Hour, "h"},
{time.Minute, "m"},
{time.Second, "s"},
{time.Millisecond, "ms"},
{time.Microsecond, "micros"},
}
// formatDurationElasticsearch formats a duration using
// Elasticsearch supported time units.
//
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/api-conventions.html#time-units
func formatDurationElasticsearch(d time.Duration) string {
for _, tu := range elasticsearchTimeUnits {
if d%tu.Duration == 0 {
return fmt.Sprintf("%d%s", d/tu.Duration, tu.Unit)
}
}
return fmt.Sprintf("%dnanos", d)
}