internal/sourcemap/elasticsearch.go (100 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package sourcemap
import (
"bytes"
"compress/zlib"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"github.com/go-sourcemap/sourcemap"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/apm-server/internal/logs"
)
// esFetcher fetches source maps that are stored as documents in an
// Elasticsearch index. Instances are created by NewElasticsearchFetcher.
//
// NOTE: NewElasticsearchFetcher initialises this struct with a positional
// literal, so the field order below must not be changed independently.
type esFetcher struct {
// client performs the HTTP requests built by runSearchQuery.
client *elasticsearch.Client
// index is the name of the index the source map documents are read from;
// it is used verbatim as the first path segment of the request URL.
index string
// logger is the logger passed to parse when decoding a response.
logger *logp.Logger
}
// esGetSourcemapResponse holds the fields decoded from the JSON body that
// Elasticsearch returns for the document request issued by runSearchQuery.
// Within this file only Found and Source.Sourcemap are read (see parse).
type esGetSourcemapResponse struct {
// Found is decoded from the "found" key; parse returns an empty result
// when it is false.
Found bool `json:"found"`
// Source is decoded from the "_source" key of the response.
Source struct {
// Service carries the "service.name" and "service.version" values of the
// document. Presumably these match the name/version used to build the
// document id; they are not read in this file.
Service struct {
Name string `json:"name"`
Version string `json:"version"`
} `json:"service"`
// File carries the "file.path" value of the document. Presumably the
// bundle path the source map belongs to; not read in this file.
File struct {
BundleFilepath string `json:"path"`
} `json:"file"`
// Sourcemap is the "content" value. Fetch treats it as a base64-encoded,
// zlib-compressed source map.
Sourcemap string `json:"content"`
// ContentHash is the "content_sha256" value. Presumably a SHA-256 digest
// of the content (verify against the writer); not read in this file.
ContentHash string `json:"content_sha256"`
} `json:"_source"`
}
// NewElasticsearchFetcher builds a Fetcher that reads source maps from the
// given Elasticsearch index using client c. The supplied logger is narrowed
// to the logs.Sourcemap name before being stored on the fetcher.
func NewElasticsearchFetcher(c *elasticsearch.Client, index string, logger *logp.Logger) Fetcher {
	fetcher := &esFetcher{
		client: c,
		index:  index,
		logger: logger.Named(logs.Sourcemap),
	}
	return fetcher
}
// Fetch fetches the source map stored in Elasticsearch for the given service
// name, service version and bundle path, and returns it as a parsed consumer.
//
// Return values:
//   - (nil, nil) when the response reports that the document was not found or
//     its stored content is empty.
//   - an error wrapping errFetcherUnvailable when the request fails with a
//     network error, or when Elasticsearch answers with 404, 401 or 403, so
//     that callers can fall back to other fetchers.
//   - any other error when the request fails for another reason, the response
//     has an unexpected status, or the stored content cannot be decoded,
//     decompressed or parsed.
func (s *esFetcher) Fetch(ctx context.Context, name, version, path string) (*sourcemap.Consumer, error) {
	resp, err := s.runSearchQuery(ctx, name, version, path)
	if err != nil {
		var networkErr net.Error
		if errors.As(err, &networkErr) {
			// Only errFetcherUnvailable is wrapped; the underlying error is
			// included as text for diagnostics.
			return nil, fmt.Errorf("failed to reach elasticsearch: %w: %v", errFetcherUnvailable, err)
		}
		return nil, fmt.Errorf("failure querying ES: %w", err)
	}
	defer resp.Body.Close()

	// Any status of 300 or above is treated as an error response.
	if resp.StatusCode >= http.StatusMultipleChoices {
		b, err := io.ReadAll(resp.Body)
		if err != nil {
			return nil, fmt.Errorf("failed to read ES response body: %w", err)
		}
		switch resp.StatusCode {
		case http.StatusNotFound, http.StatusUnauthorized, http.StatusForbidden:
			// http.StatusNotFound     -> the index is missing
			// http.StatusUnauthorized -> the request was not authenticated
			// http.StatusForbidden    -> we don't have permission to read from the index
			// In all three cases we consider the fetcher unavailable so that
			// APM Server can fall back to other fetchers.
			return nil, fmt.Errorf("%w: %s: %s", errFetcherUnvailable, resp.Status, string(b))
		}
		return nil, fmt.Errorf("ES returned unknown status code: %s", resp.Status)
	}

	// Extract the stored source map content from the JSON response.
	body, err := parse(resp.Body, name, version, path, s.logger)
	if err != nil {
		return nil, err
	}
	if body == "" {
		// Document not found, or found with no content: nothing to return.
		return nil, nil
	}

	// The content is stored base64-encoded and zlib-compressed.
	decodedBody, err := base64.StdEncoding.DecodeString(body)
	if err != nil {
		return nil, fmt.Errorf("failed to base64 decode string: %w", err)
	}

	r, err := zlib.NewReader(bytes.NewReader(decodedBody))
	if err != nil {
		return nil, fmt.Errorf("failed to create zlib reader: %w", err)
	}
	defer r.Close()

	uncompressedBody, err := io.ReadAll(r)
	if err != nil {
		return nil, fmt.Errorf("failed to read sourcemap content: %w", err)
	}

	return parseSourceMap(uncompressedBody)
}
// runSearchQuery issues a GET request for the document whose id is
// "<name>-<version>-<path>" under "/<index>/_doc/" and returns the raw HTTP
// response from the Elasticsearch client. The id is path-escaped; the index
// name is used as-is. The caller is responsible for closing the response body.
func (s *esFetcher) runSearchQuery(ctx context.Context, name, version, path string) (*http.Response, error) {
	docID := url.PathEscape(name + "-" + version + "-" + path)
	endpoint := "/" + s.index + "/_doc/" + docID

	request, reqErr := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if reqErr != nil {
		return nil, reqErr
	}
	return s.client.Perform(request)
}
// parse decodes an Elasticsearch document response from body and returns the
// stored source map content. It returns an empty string and a nil error when
// the response reports that the document was not found. A JSON decoding
// failure is returned as a wrapped error.
//
// The name, version, path and logger parameters are currently unused.
func parse(body io.ReadCloser, name, version, path string, logger *logp.Logger) (string, error) {
	doc := esGetSourcemapResponse{}
	decoder := json.NewDecoder(body)
	if decodeErr := decoder.Decode(&doc); decodeErr != nil {
		return "", fmt.Errorf("failed to decode sourcemap: %w", decodeErr)
	}
	if doc.Found {
		return doc.Source.Sourcemap, nil
	}
	return "", nil
}