in internal/agentcfg/elasticsearch.go [224:274]
func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {
span, ctx := apm.StartSpan(ctx, "ElasticsearchFetcher.refreshCache", "")
defer span.End()
scrollID := ""
buffer := make([]AgentConfig, 0, len(f.cache))
// The refresh cache operation should complete within refreshCacheTimeout.
ctx, cancel := context.WithTimeout(ctx, refreshCacheTimeout)
defer cancel()
defer func() {
if err != nil {
f.esCacheRefreshFailures.Add(context.Background(), 1)
} else {
f.esCacheRefreshSuccesses.Add(context.Background(), 1)
}
}()
for {
result, err := f.singlePageRefresh(ctx, scrollID)
if err != nil {
f.clearScroll(ctx, scrollID)
return err
}
for _, hit := range result.Hits.Hits {
buffer = append(buffer, AgentConfig{
ServiceName: hit.Source.Service.Name,
ServiceEnvironment: hit.Source.Service.Environment,
AgentName: hit.Source.AgentName,
Etag: hit.Source.ETag,
Config: hit.Source.Settings,
})
}
scrollID = result.ScrollID
if len(result.Hits.Hits) == 0 {
break
}
}
f.clearScroll(ctx, scrollID)
f.mu.Lock()
f.cache = buffer
f.mu.Unlock()
f.cacheInitialized.Store(true)
f.esCacheEntriesCount.Record(context.Background(), int64(len(f.cache)))
f.last = time.Now()
return nil
}