// internal/agentcfg/elasticsearch.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package agentcfg

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"go.elastic.co/apm/v2"
"go.opentelemetry.io/otel/metric"
"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/apm-server/internal/logs"
"github.com/elastic/elastic-agent-libs/logp"
)
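
// ElasticsearchIndexName is the name of the Elasticsearch index that holds
// agent configuration documents.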
const ElasticsearchIndexName = ".apm-agent-configuration"

const (
// ErrInfrastructureNotReady is returned when a fetch request comes in while
// the infrastructure is not ready to serve the request.
// This may happen when the local cache is not initialized and no fallback fetcher is configured.
ErrInfrastructureNotReady = "agentcfg infrastructure is not ready"
// ErrNoValidElasticsearchConfig is returned when the server is not properly
// configured to fetch agent configuration from Elasticsearch.
ErrNoValidElasticsearchConfig = "no valid elasticsearch config to fetch agent config"
)

const (
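// refreshCacheTimeout bounds a single cache refresh, including all scroll pages.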
refreshCacheTimeout = 5 * time.Second
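// loggerRateLimit is the rate limit applied to rateLimitedLogger messages.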
loggerRateLimit = time.Minute
)
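
// ElasticsearchFetcher is a Fetcher that serves agent configuration from an
// in-memory cache of the .apm-agent-configuration index, periodically
// refreshed from Elasticsearch by Run.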
type ElasticsearchFetcher struct {
client *elasticsearch.Client
cacheDuration time.Duration
fallbackFetcher Fetcher
mu sync.RWMutex
last time.Time
cache []AgentConfig
searchSize int
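// invalidESCfg is set when Elasticsearch responds with 401/403, meaning the
// configured credentials cannot read the agent configuration index.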
invalidESCfg atomic.Bool
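// cacheInitialized is set after the first successful cache refresh.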
cacheInitialized atomic.Bool
logger, rateLimitedLogger *logp.Logger
tracer *apm.Tracer
esCacheEntriesCount metric.Int64Gauge
esFetchCount metric.Int64Counter
esFetchFallbackCount metric.Int64Counter
esFetchUnavailableCount metric.Int64Counter
esFetchInvalidCount metric.Int64Counter
esCacheRefreshSuccesses metric.Int64Counter
esCacheRefreshFailures metric.Int64Counter
}
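
// NewElasticsearchFetcher returns an ElasticsearchFetcher that serves agent
// configuration from an in-memory cache refreshed every cacheDuration, and
// that delegates to fetcher (if non-nil) until the cache has been populated.
//
// A minimal wiring sketch (illustrative only; esClient, fallback, tracer, mp,
// logger and query are assumed to be constructed by the caller, and Run's
// error is ignored here):
//
//	f := NewElasticsearchFetcher(esClient, 30*time.Second, fallback, tracer, mp, logger)
//	go f.Run(ctx)                      // keep the cache refreshed until ctx is cancelled
//	result, err := f.Fetch(ctx, query)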
func NewElasticsearchFetcher(
client *elasticsearch.Client,
cacheDuration time.Duration,
fetcher Fetcher,
tracer *apm.Tracer,
mp metric.MeterProvider,
logger *logp.Logger,
) *ElasticsearchFetcher {
meter := mp.Meter("github.com/elastic/apm-server/internal/agentcfg")
esCacheEntriesCount, _ := meter.Int64Gauge("apm-server.agentcfg.elasticsearch.cache.entries.count")
esFetchCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.es")
esFetchFallbackCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.fallback")
esFetchUnavailableCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.unavailable")
esFetchInvalidCount, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.fetch.invalid")
esCacheRefreshSuccesses, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.cache.refresh.successes")
esCacheRefreshFailures, _ := meter.Int64Counter("apm-server.agentcfg.elasticsearch.cache.refresh.failures")
logger = logger.Named("agentcfg")
return &ElasticsearchFetcher{
client: client,
cacheDuration: cacheDuration,
fallbackFetcher: fetcher,
searchSize: 100,
logger: logger,
rateLimitedLogger: logger.WithOptions(logs.WithRateLimit(loggerRateLimit)),
tracer: tracer,
esCacheEntriesCount: esCacheEntriesCount,
esFetchCount: esFetchCount,
esFetchFallbackCount: esFetchFallbackCount,
esFetchUnavailableCount: esFetchUnavailableCount,
esFetchInvalidCount: esFetchInvalidCount,
esCacheRefreshSuccesses: esCacheRefreshSuccesses,
esCacheRefreshFailures: esCacheRefreshFailures,
}
}

// Fetch finds a matching agent config based on the received query.
func (f *ElasticsearchFetcher) Fetch(ctx context.Context, query Query) (Result, error) {
if f.cacheInitialized.Load() {
// Happy path: serve fetch requests using an initialized cache.
f.mu.RLock()
defer f.mu.RUnlock()
f.esFetchCount.Add(context.Background(), 1)
return matchAgentConfig(query, f.cache), nil
}
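// The cache has not been populated yet: delegate to the fallback fetcher, if any.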
if f.fallbackFetcher != nil {
f.esFetchFallbackCount.Add(context.Background(), 1)
return f.fallbackFetcher.Fetch(ctx, query)
}
if f.invalidESCfg.Load() {
f.esFetchInvalidCount.Add(context.Background(), 1)
f.rateLimitedLogger.Errorf("rejecting fetch request: no valid elasticsearch config")
return Result{}, errors.New(ErrNoValidElasticsearchConfig)
}
f.esFetchUnavailableCount.Add(context.Background(), 1)
f.rateLimitedLogger.Warnf("rejecting fetch request: infrastructure is not ready")
return Result{}, errors.New(ErrInfrastructureNotReady)
}

// Run refreshes the fetcher cache by querying Elasticsearch periodically.
func (f *ElasticsearchFetcher) Run(ctx context.Context) error {
refresh := func() bool {
// refresh returns a bool that indicates whether Run should return
// immediately without error, e.g. due to invalid Elasticsearch config.
tx := f.tracer.StartTransaction("ElasticsearchFetcher.refresh", "")
defer tx.End()
ctx := apm.ContextWithTransaction(ctx, tx)
if err := f.refreshCache(ctx); err != nil {
if e := apm.CaptureError(ctx, err); e != nil {
e.Send()
}
// Do not log as error when there is a fallback.
var logFunc func(string, ...interface{})
if f.fallbackFetcher == nil {
logFunc = f.logger.Errorf
} else {
logFunc = f.logger.Warnf
}
logFunc("refresh cache error: %s", err)
if f.invalidESCfg.Load() {
logFunc("stopping refresh cache background job: elasticsearch config is invalid")
return true
}
} else {
f.logger.Debugf("refresh cache success")
}
return false
}
// Trigger initial run.
select {
case <-ctx.Done():
return ctx.Err()
default:
if stop := refresh(); stop {
return nil
}
}
// Then schedule subsequent runs.
t := time.NewTicker(f.cacheDuration)
defer t.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-t.C:
if stop := refresh(); stop {
return nil
}
}
}
}
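
// cacheResult models the subset of the Elasticsearch search response the
// fetcher needs: the _source of each hit plus the _scroll_id used to page
// through results.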
type cacheResult struct {
Hits struct {
Hits []struct {
Source struct {
AgentName string `json:"agent_name"`
ETag string `json:"etag"`
Service struct {
Name string `json:"name"`
Environment string `json:"environment"`
} `json:"service"`
Settings map[string]string `json:"settings"`
} `json:"_source"`
} `json:"hits"`
} `json:"hits"`
ScrollID string `json:"_scroll_id"`
}
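
// refreshCache scrolls through every document in the agent configuration
// index and replaces the in-memory cache with the result.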
func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {
span, ctx := apm.StartSpan(ctx, "ElasticsearchFetcher.refreshCache", "")
defer span.End()
scrollID := ""
buffer := make([]AgentConfig, 0, len(f.cache))
// The refresh cache operation should complete within refreshCacheTimeout.
ctx, cancel := context.WithTimeout(ctx, refreshCacheTimeout)
defer cancel()
defer func() {
if err != nil {
f.esCacheRefreshFailures.Add(context.Background(), 1)
} else {
f.esCacheRefreshSuccesses.Add(context.Background(), 1)
}
}()
for {
result, err := f.singlePageRefresh(ctx, scrollID)
if err != nil {
f.clearScroll(ctx, scrollID)
return err
}
for _, hit := range result.Hits.Hits {
buffer = append(buffer, AgentConfig{
ServiceName: hit.Source.Service.Name,
ServiceEnvironment: hit.Source.Service.Environment,
AgentName: hit.Source.AgentName,
Etag: hit.Source.ETag,
Config: hit.Source.Settings,
})
}
scrollID = result.ScrollID
if len(result.Hits.Hits) == 0 {
break
}
}
f.clearScroll(ctx, scrollID)
f.mu.Lock()
f.cache = buffer
f.mu.Unlock()
f.cacheInitialized.Store(true)
f.esCacheEntriesCount.Record(context.Background(), int64(len(f.cache)))
f.last = time.Now()
return nil
}
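
// clearScroll releases the server-side scroll context identified by scrollID.
// Failures are only logged, since the scroll will eventually expire on its own.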
func (f *ElasticsearchFetcher) clearScroll(ctx context.Context, scrollID string) {
if scrollID == "" {
return
}
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, "/_search/scroll/"+scrollID, nil)
if err != nil {
f.logger.Warnf("failed to clear scroll: %v", err)
return
}
resp, err := f.client.Perform(req)
if err != nil {
f.logger.Warnf("failed to clear scroll: %v", err)
return
}
if resp.StatusCode > 299 {
f.logger.Warnf("clearscroll request returned error: %s", resp.Status)
}
resp.Body.Close()
}
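
// singlePageRefresh fetches one page of agent configuration documents: the
// first call (empty scrollID) opens a scroll search on the index, and
// subsequent calls continue the scroll, issuing requests of the form
//
//	POST /.apm-agent-configuration/_search?scroll=<cacheDuration>ms&size=<searchSize>
//	POST /_search/scroll?scroll=<cacheDuration>ms&scroll_id=<scrollID>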
func (f *ElasticsearchFetcher) singlePageRefresh(ctx context.Context, scrollID string) (cacheResult, error) {
var result cacheResult
var req *http.Request
var err error
var resp *http.Response
switch scrollID {
case "":
req, err = http.NewRequestWithContext(ctx, http.MethodPost, "/"+ElasticsearchIndexName+"/_search", nil)
if err != nil {
return result, err
}
q := req.URL.Query()
q.Set("scroll", strconv.FormatInt(f.cacheDuration.Milliseconds(), 10)+"ms")
q.Set("size", strconv.FormatInt(int64(f.searchSize), 10))
req.URL.RawQuery = q.Encode()
resp, err = f.client.Perform(req)
default:
req, err = http.NewRequestWithContext(ctx, http.MethodPost, "/_search/scroll", nil)
if err != nil {
return result, err
}
q := req.URL.Query()
q.Set("scroll", strconv.FormatInt(f.cacheDuration.Milliseconds(), 10)+"ms")
q.Set("scroll_id", scrollID)
req.URL.RawQuery = q.Encode()
resp, err = f.client.Perform(req)
}
if err != nil {
return result, err
}
defer resp.Body.Close()
if resp.StatusCode >= http.StatusBadRequest {
// Elasticsearch returns 401 on unauthorized requests and 403 on insufficient permissions.
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
f.invalidESCfg.Store(true)
}
bodyBytes, err := io.ReadAll(resp.Body)
if err == nil {
f.logger.Debugf("refresh cache elasticsearch returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
return result, fmt.Errorf("refresh cache elasticsearch returned status %d", resp.StatusCode)
}
return result, json.NewDecoder(resp.Body).Decode(&result)
}