internal/sourcemap/metadata_fetcher.go (298 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package sourcemap
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"sync"
"time"
"go.elastic.co/apm/v2"
"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/apm-server/internal/logs"
"github.com/elastic/elastic-agent-libs/logp"
)
type MetadataESFetcher struct {
esClient *elasticsearch.Client
index string
set map[identifier]string
alias map[identifier]*identifier
mu sync.RWMutex
logger *logp.Logger
init chan struct{}
initErr error
invalidationChan chan<- []identifier
tracer *apm.Tracer
}
func NewMetadataFetcher(
ctx context.Context,
esClient *elasticsearch.Client,
index string,
tracer *apm.Tracer,
logger *logp.Logger,
) (MetadataFetcher, <-chan []identifier) {
invalidationCh := make(chan []identifier)
s := &MetadataESFetcher{
esClient: esClient,
index: index,
set: make(map[identifier]string),
alias: make(map[identifier]*identifier),
logger: logger.Named(logs.Sourcemap),
init: make(chan struct{}),
invalidationChan: invalidationCh,
tracer: tracer,
}
s.startBackgroundSync(ctx)
return s, invalidationCh
}
func (s *MetadataESFetcher) getID(key identifier) (*identifier, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
if _, ok := s.set[key]; ok {
return &key, ok
}
// path is missing from the metadata cache (and ES).
// Is it an alias ?
// Try to retrieve the sourcemap from the alias map
i, ok := s.alias[key]
return i, ok
}
func (s *MetadataESFetcher) ready() <-chan struct{} {
return s.init
}
func (s *MetadataESFetcher) err() error {
select {
case <-s.ready():
s.mu.RLock()
defer s.mu.RUnlock()
return s.initErr
default:
return errors.New("metadata es fetcher not ready")
}
}
func (s *MetadataESFetcher) startBackgroundSync(ctx context.Context) {
go func() {
s.logger.Debug("populating metadata cache")
// First run, populate cache
if err := s.sync(ctx); err != nil {
s.initErr = fmt.Errorf("failed to populate sourcemap metadata: %w", err)
s.logger.Error(s.initErr)
}
close(s.init)
t := time.NewTicker(30 * time.Second)
defer t.Stop()
for {
select {
case <-t.C:
if err := s.sync(ctx); err != nil {
s.logger.Errorf("failed to sync sourcemaps metadata: %v", err)
}
case <-ctx.Done():
s.logger.Info("update routine done")
// close invalidation channel
close(s.invalidationChan)
return
}
}
}()
}
func (s *MetadataESFetcher) sync(ctx context.Context) error {
tx := s.tracer.StartTransaction("MetadataESFetcher.sync", "")
defer tx.End()
ctx = apm.ContextWithTransaction(ctx, tx)
sourcemaps := make(map[identifier]string)
result, err := s.initialSearch(ctx, sourcemaps)
if err != nil {
if e := apm.CaptureError(ctx, err); e != nil {
e.Send()
}
return err
}
scrollID := result.ScrollID
if scrollID == "" {
s.update(ctx, sourcemaps)
return nil
}
for {
result, err = s.scrollsearch(ctx, scrollID, sourcemaps)
if err != nil {
s.clearScroll(ctx, scrollID)
if e := apm.CaptureError(ctx, err); e != nil {
e.Send()
}
return fmt.Errorf("failed scroll search: %w", err)
}
// From the docs: The initial search request and each subsequent scroll
// request each return a _scroll_id. While the _scroll_id may change between
// requests, it doesn't always change - in any case, only the most recently
// received _scroll_id should be used.
if result.ScrollID != "" {
scrollID = result.ScrollID
}
// Stop if there are no new updates
if len(result.Hits.Hits) == 0 {
break
}
}
s.clearScroll(ctx, scrollID)
s.update(ctx, sourcemaps)
return nil
}
func (s *MetadataESFetcher) clearScroll(ctx context.Context, scrollID string) {
if scrollID == "" {
return
}
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, "/_search/scroll/"+scrollID, nil)
if err != nil {
s.logger.Warnf("failed to clear scroll: %v", err)
return
}
resp, err := s.esClient.Perform(req)
if err != nil {
s.logger.Warnf("failed to clear scroll: %v", err)
return
}
if resp.StatusCode > 299 {
s.logger.Warnf("clearscroll request returned error: %s", resp.Status)
}
resp.Body.Close()
}
func (s *MetadataESFetcher) update(ctx context.Context, sourcemaps map[identifier]string) {
span := apm.TransactionFromContext(ctx).StartSpan("MetadataESFetcher.update", "", nil)
defer span.End()
s.mu.Lock()
defer s.mu.Unlock()
var invalidation []identifier
for id, contentHash := range s.set {
if updatedHash, ok := sourcemaps[id]; ok {
if contentHash == updatedHash {
// already in the cache, remove from the updates.
delete(sourcemaps, id)
} else {
// content hash changed, invalidate the sourcemap cache
s.logger.Debugf("Hash changed: %s -> %s: invalidating %v", contentHash, updatedHash, id)
invalidation = append(invalidation, id)
}
} else {
// the sourcemap no longer exists in ES.
// invalidate the sourcemap cache.
invalidation = append(invalidation, id)
// the sourcemap no longer exists in ES.
// remove from metadata cache
delete(s.set, id)
// remove aliases
for _, k := range getAliases(id.name, id.version, id.path) {
delete(s.alias, k)
}
}
}
if len(invalidation) != 0 {
select {
case s.invalidationChan <- invalidation:
case <-ctx.Done():
s.logger.Debug("timed out while invalidating soucemaps")
}
}
// add new sourcemaps to the metadata cache.
for id, contentHash := range sourcemaps {
s.set[id] = contentHash
s.logger.Debugf("Added metadata id %v", id)
// store aliases with a pointer to the original id.
// The id is then passed over to the backend fetcher
// to minimize the size of the lru cache and
// and increase cache hits.
for _, k := range getAliases(id.name, id.version, id.path) {
s.logger.Debugf("Added metadata alias %v -> %v", k, id)
s.alias[k] = &id
}
}
s.initErr = nil
s.logger.Debugf("Metadata cache now has %d entries.", len(s.set))
}
func (s *MetadataESFetcher) initialSearch(ctx context.Context, updates map[identifier]string) (*esSearchSourcemapResponse, error) {
span := apm.TransactionFromContext(ctx).StartSpan("MetadataESFetcher.initialSearch", "", nil)
defer span.End()
resp, err := s.runSearchQuery(ctx)
if err != nil {
if e := apm.CaptureError(ctx, err); e != nil {
e.Send()
}
return nil, fmt.Errorf("failed to run initial search query: %w: %v", errFetcherUnvailable, err)
}
defer resp.Body.Close()
return s.handleUpdateRequest(resp, updates)
}
func (s *MetadataESFetcher) runSearchQuery(ctx context.Context) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "/"+s.index+"/_search", nil)
if err != nil {
return nil, err
}
q := req.URL.Query()
q.Set("_source", strings.Join([]string{"service.*", "file.path", "content_sha256"}, ","))
q.Set("track_total_hits", "true")
q.Set("scroll", strconv.FormatInt(time.Minute.Milliseconds(), 10)+"ms")
req.URL.RawQuery = q.Encode()
return s.esClient.Perform(req)
}
type esSearchSourcemapResponse struct {
ScrollID string `json:"_scroll_id"`
esSourcemapResponse
}
type esSourcemapResponse struct {
Hits struct {
Total struct {
Value int `json:"value"`
} `json:"total"`
Hits []struct {
Source struct {
Service struct {
Name string `json:"name"`
Version string `json:"version"`
} `json:"service"`
File struct {
BundleFilepath string `json:"path"`
} `json:"file"`
Sourcemap string `json:"content"`
ContentHash string `json:"content_sha256"`
} `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
func (s *MetadataESFetcher) handleUpdateRequest(resp *http.Response, updates map[identifier]string) (*esSearchSourcemapResponse, error) {
// handle error response
if resp.StatusCode >= http.StatusMultipleChoices {
b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusUnauthorized {
return nil, fmt.Errorf("%w: %s: %s", errFetcherUnvailable, resp.Status, string(b))
}
return nil, fmt.Errorf("ES returned unknown status code: %s", resp.Status)
}
// parse response
body, err := parseResponse(resp.Body, s.logger)
if err != nil {
return nil, fmt.Errorf("failed to parse response: %w", err)
}
for _, v := range body.Hits.Hits {
id := identifier{
name: v.Source.Service.Name,
version: v.Source.Service.Version,
path: v.Source.File.BundleFilepath,
}
updates[id] = v.Source.ContentHash
}
return body, nil
}
func parseResponse(body io.ReadCloser, logger *logp.Logger) (*esSearchSourcemapResponse, error) {
b, err := io.ReadAll(body)
if err != nil {
return nil, err
}
var esSourcemapResponse esSearchSourcemapResponse
if err := json.Unmarshal(b, &esSourcemapResponse); err != nil {
return nil, err
}
return &esSourcemapResponse, nil
}
func (s *MetadataESFetcher) scrollsearch(ctx context.Context, scrollID string, updates map[identifier]string) (*esSearchSourcemapResponse, error) {
span := apm.TransactionFromContext(ctx).StartSpan("MetadataESFetcher.scrollSearch", "", nil)
defer span.End()
resp, err := s.runScrollSearchQuery(ctx, scrollID)
if err != nil {
return nil, fmt.Errorf("failed to run scroll search query: %w", err)
}
defer resp.Body.Close()
return s.handleUpdateRequest(resp, updates)
}
func (s *MetadataESFetcher) runScrollSearchQuery(ctx context.Context, id string) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "/_search/scroll", nil)
q := req.URL.Query()
q.Set("scroll", strconv.FormatInt(time.Minute.Milliseconds(), 10)+"ms")
q.Set("scroll_id", id)
req.URL.RawQuery = q.Encode()
if err != nil {
return nil, err
}
return s.esClient.Perform(req)
}