agentcfg/elasticsearch.go (187 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package agentcfg // import "github.com/elastic/opentelemetry-collector-components/internal/agentcfg" import ( "context" "encoding/json" "errors" "fmt" "io" "net/http" "sync" "sync/atomic" "time" "go.uber.org/zap" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" ) const ElasticsearchIndexName = ".apm-agent-configuration" const ( // ErrInfrastructureNotReady is returned when a fetch request comes in while // the infrastructure is not ready to serve the request. // This may happen when the local cache is not initialized and no fallback fetcher is configured. ErrInfrastructureNotReady = "agentcfg infrastructure is not ready" // ErrNoValidElasticsearchConfig is an error where the server is // not properly configured to fetch agent configuration. ErrNoValidElasticsearchConfig = "no valid elasticsearch config to fetch agent config" ) const ( refreshCacheTimeout = 5 * time.Second loggerRateLimit = time.Minute ) // TODO: // - Add Otel tracer // - Collection metrics type ElasticsearchFetcher struct { last time.Time client *elasticsearch.Client logger *zap.Logger cache []AgentConfig cacheDuration time.Duration searchSize int mu sync.RWMutex invalidESCfg atomic.Bool cacheInitialized atomic.Bool } func NewElasticsearchFetcher( client *elasticsearch.Client, cacheDuration time.Duration, logger *zap.Logger, ) *ElasticsearchFetcher { return &ElasticsearchFetcher{ client: client, cacheDuration: cacheDuration, searchSize: 100, logger: logger, } } // Fetch finds a matching agent config based on the received query. func (f *ElasticsearchFetcher) Fetch(ctx context.Context, query Query) (Result, error) { if f.cacheInitialized.Load() { // Happy path: serve fetch requests using an initialized cache. f.mu.RLock() defer f.mu.RUnlock() return matchAgentConfig(query, f.cache), nil } if f.invalidESCfg.Load() { return Result{}, errors.New(ErrNoValidElasticsearchConfig) } return Result{}, errors.New(ErrInfrastructureNotReady) } // Run refreshes the fetcher cache by querying Elasticsearch periodically. func (f *ElasticsearchFetcher) Run(ctx context.Context) error { refresh := func() bool { // refresh returns a bool that indicates whether Run should return // immediately without error, e.g. due to invalid Elasticsearch config. if err := f.refreshCache(ctx); err != nil { f.logger.Error(fmt.Sprintf("refresh cache error: %s", err)) if f.invalidESCfg.Load() { f.logger.Warn("stopping refresh cache background job: elasticsearch config is invalid") return true } } else { f.logger.Debug("refresh cache success") } return false } // Trigger initial run. select { case <-ctx.Done(): return ctx.Err() default: if stop := refresh(); stop { return nil } } // Then schedule subsequent runs. t := time.NewTicker(f.cacheDuration) defer t.Stop() for { select { case <-ctx.Done(): return ctx.Err() case <-t.C: if stop := refresh(); stop { return nil } } } } type cacheResult struct { ScrollID string `json:"_scroll_id"` Hits struct { Hits []struct { Source struct { Settings map[string]string `json:"settings"` Service struct { Name string `json:"name"` Environment string `json:"environment"` } `json:"service"` AgentName string `json:"agent_name"` ETag string `json:"etag"` } `json:"_source"` } `json:"hits"` } `json:"hits"` } func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) { scrollID := "" buffer := make([]AgentConfig, 0, len(f.cache)) // The refresh cache operation should complete within refreshCacheTimeout. ctx, cancel := context.WithTimeout(ctx, refreshCacheTimeout) defer cancel() for { result, err := f.singlePageRefresh(ctx, scrollID) if err != nil { f.clearScroll(ctx, scrollID) return err } for _, hit := range result.Hits.Hits { buffer = append(buffer, AgentConfig{ ServiceName: hit.Source.Service.Name, ServiceEnvironment: hit.Source.Service.Environment, AgentName: hit.Source.AgentName, Etag: hit.Source.ETag, Config: hit.Source.Settings, }) } scrollID = result.ScrollID if len(result.Hits.Hits) == 0 { break } } f.clearScroll(ctx, scrollID) f.mu.Lock() f.cache = buffer f.mu.Unlock() f.cacheInitialized.Store(true) f.last = time.Now() return nil } func (f *ElasticsearchFetcher) clearScroll(ctx context.Context, scrollID string) { resp, err := esapi.ClearScrollRequest{ ScrollID: []string{scrollID}, }.Do(ctx, f.client) if err != nil { f.logger.Warn(fmt.Sprintf("failed to clear scroll: %v", err)) return } if resp.IsError() { f.logger.Warn(fmt.Sprintf("clearscroll request returned error: %s", resp.Status())) } resp.Body.Close() } func (f *ElasticsearchFetcher) singlePageRefresh(ctx context.Context, scrollID string) (cacheResult, error) { var result cacheResult var err error var resp *esapi.Response switch scrollID { case "": resp, err = esapi.SearchRequest{ Index: []string{ElasticsearchIndexName}, Size: &f.searchSize, Scroll: f.cacheDuration, }.Do(ctx, f.client) default: resp, err = esapi.ScrollRequest{ ScrollID: scrollID, Scroll: f.cacheDuration, }.Do(ctx, f.client) } if err != nil { return result, err } defer resp.Body.Close() if resp.StatusCode >= http.StatusBadRequest { // Elasticsearch returns 401 on unauthorized requests and 403 on insufficient permission if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { f.invalidESCfg.Store(true) } bodyBytes, err := io.ReadAll(resp.Body) if err == nil { f.logger.Debug(fmt.Sprintf("refresh cache elasticsearch returned status %d: %s", resp.StatusCode, string(bodyBytes))) } return result, fmt.Errorf("refresh cache elasticsearch returned status %d", resp.StatusCode) } return result, json.NewDecoder(resp.Body).Decode(&result) }