internal/proxy/stub_es.go (165 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
// Package proxy contains the code to build http proxy for apm.
package proxy
import (
"bufio"
"bytes"
"encoding/base64"
"fmt"
"io"
"net/http"
"strings"
"sync"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zstd"
"go.uber.org/zap"
)
const (
defaultLicense = `{"license":{"uid":"cc49813b-6b8e-2138-fbb8-243ae2b3deed","type":"enterprise","status":"active"}}`
defaultInfo = `{
"name": "instance-0000000001",
"cluster_name": "eca3b3c3bbee4816bb92f82184e328dd",
"cluster_uuid": "cc49813b-6b8e-2138-fbb8-243ae2b3deed",
"version": {
"number": "8.15.1",
"build_flavor": "default",
"build_type": "docker",
"build_hash": "253e8544a65ad44581194068936f2a5d57c2c051",
"build_date": "2024-09-02T22:04:47.310170297Z",
"build_snapshot": false,
"lucene_version": "9.11.1",
"minimum_wire_compatibility_version": "7.17.0",
"minimum_index_compatibility_version": "7.0.0"
},
"tagline": "You Know, for Search"
}`
)
var memPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
type stubES struct {
logger *zap.Logger
auth string
info, license []byte
unknownPathCallback http.HandlerFunc
}
func NewHandlerStubES(options ...StubESOption) http.Handler {
h := new(stubES)
options = append([]StubESOption{
StubESWithLogger(zap.NewNop()),
StubESWithInfo(defaultInfo),
StubESWithLicense(defaultLicense),
StubESWithUnknownPathCallback(func(w http.ResponseWriter, req *http.Request) {
h.logger.Error("unknown path", zap.String("path", req.URL.Path))
}),
}, options...)
for _, opt := range options {
opt(h)
}
return h
}
func (h stubES) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.Header().Set("X-Elastic-Product", "Elasticsearch")
auth, _ := strings.CutPrefix(req.Header.Get("Authorization"), "Basic ")
if len(h.auth) > 0 && string(auth) != h.auth {
h.logger.Error(
"authentication failed",
zap.String("actual", auth),
zap.String("expected", h.auth),
)
w.WriteHeader(http.StatusUnauthorized)
return
}
switch req.URL.Path {
case "/":
_, _ = w.Write(h.info)
return
case "/.apm-agent-configuration/_search":
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{}`))
w.WriteHeader(http.StatusOK)
case "/_license":
_, _ = w.Write(h.license)
return
case "/_bulk":
first := true
var body io.Reader
switch req.Header.Get("Content-Encoding") {
case "gzip":
r, err := gzip.NewReader(req.Body)
if err != nil {
h.logger.Error("gzip reader err", zap.Error(err))
http.Error(w, fmt.Sprintf("reader error: %v", err), http.StatusInternalServerError)
return
}
defer r.Close()
body = r
case "zstd":
r, err := zstd.NewReader(req.Body)
if err != nil {
h.logger.Error("zstd reader err", zap.Error(err))
http.Error(w, fmt.Sprintf("reader error: %v", err), http.StatusInternalServerError)
return
}
defer r.Close()
body = r
default:
body = req.Body
}
jsonw := memPool.Get().(*bytes.Buffer)
defer func() {
jsonw.Reset()
memPool.Put(jsonw)
}()
_, _ = jsonw.Write([]byte(`{"items":[`))
scanner := bufio.NewScanner(body)
for scanner.Scan() {
// Action is always "create", skip decoding.
if !scanner.Scan() {
h.logger.Error("unexpected payload")
http.Error(w, "expected source", http.StatusBadRequest)
return
}
if first {
first = false
} else {
_ = jsonw.WriteByte(',')
}
jsonw.Write([]byte(`{"create":{"status":201}}`))
}
if err := scanner.Err(); err != nil {
h.logger.Error("scanner error", zap.Error(err))
http.Error(w, fmt.Sprintf("scanner error: %v", err), http.StatusBadRequest)
} else {
jsonw.Write([]byte(`]}`))
_, _ = w.Write(jsonw.Bytes())
}
default:
h.unknownPathCallback(w, req)
}
}
type StubESOption func(*stubES)
func StubESWithLogger(logger *zap.Logger) StubESOption {
return func(h *stubES) {
h.logger = logger
}
}
func StubESWithAuth(username, password string) StubESOption {
return func(h *stubES) {
h.auth = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", username, password)))
}
}
func StubESWithInfo(info string) StubESOption {
return func(h *stubES) {
h.info = []byte(info)
}
}
func StubESWithLicense(license string) StubESOption {
return func(h *stubES) {
h.license = []byte(license)
}
}
func StubESWithUnknownPathCallback(callback http.HandlerFunc) StubESOption {
return func(h *stubES) {
h.unknownPathCallback = callback
}
}