in internal/proxy/stub_es.go [73:154]
// ServeHTTP implements http.Handler and answers a small set of
// Elasticsearch-style endpoints with canned responses:
//
//   - "/" writes h.info.
//   - "/_license" writes h.license.
//   - "/.apm-agent-configuration/_search" writes an empty JSON object.
//   - "/_bulk" reads the (optionally gzip- or zstd-encoded) request body as
//     pairs of lines and reports a 201 "create" item for every pair.
//   - any other path is delegated to h.unknownPathCallback.
//
// Every response carries the "X-Elastic-Product: Elasticsearch" header. When
// h.auth is non-empty, a request whose Authorization header (with a leading
// "Basic " removed) differs from h.auth is rejected with 401 Unauthorized.
func (h stubES) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	w.Header().Set("X-Elastic-Product", "Elasticsearch")

	// CutPrefix returns the header unchanged when the "Basic " prefix is
	// absent, so a non-Basic scheme simply fails the comparison below.
	auth, _ := strings.CutPrefix(req.Header.Get("Authorization"), "Basic ")
	if len(h.auth) > 0 && auth != h.auth {
		h.logger.Error(
			"authentication failed",
			zap.String("actual", auth),
			zap.String("expected", h.auth),
		)
		w.WriteHeader(http.StatusUnauthorized)
		return
	}

	switch req.URL.Path {
	case "/":
		_, _ = w.Write(h.info)
		return
	case "/.apm-agent-configuration/_search":
		w.Header().Set("Content-Type", "application/json")
		// The status line must be sent before the body: calling WriteHeader
		// after Write is a no-op that net/http logs as a superfluous call.
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{}`))
	case "/_license":
		_, _ = w.Write(h.license)
		return
	case "/_bulk":
		// Pick a reader matching the request's Content-Encoding; anything
		// other than gzip or zstd is read as-is.
		var body io.Reader
		switch req.Header.Get("Content-Encoding") {
		case "gzip":
			r, err := gzip.NewReader(req.Body)
			if err != nil {
				h.logger.Error("gzip reader err", zap.Error(err))
				http.Error(w, fmt.Sprintf("reader error: %v", err), http.StatusInternalServerError)
				return
			}
			defer r.Close()
			body = r
		case "zstd":
			r, err := zstd.NewReader(req.Body)
			if err != nil {
				h.logger.Error("zstd reader err", zap.Error(err))
				http.Error(w, fmt.Sprintf("reader error: %v", err), http.StatusInternalServerError)
				return
			}
			defer r.Close()
			body = r
		default:
			body = req.Body
		}

		// The response is assembled in a pooled buffer so that nothing is
		// written to w until the whole body has been scanned successfully.
		jsonw := memPool.Get().(*bytes.Buffer)
		defer func() {
			jsonw.Reset()
			memPool.Put(jsonw)
		}()
		_, _ = jsonw.Write([]byte(`{"items":[`))

		first := true
		scanner := bufio.NewScanner(body)
		// Each iteration consumes two lines: the action line here, and the
		// document source line in the nested Scan call.
		for scanner.Scan() {
			// Action is always "create", skip decoding.
			if !scanner.Scan() {
				h.logger.Error("unexpected payload")
				http.Error(w, "expected source", http.StatusBadRequest)
				return
			}
			if first {
				first = false
			} else {
				_ = jsonw.WriteByte(',')
			}
			_, _ = jsonw.Write([]byte(`{"create":{"status":201}}`))
		}
		if err := scanner.Err(); err != nil {
			h.logger.Error("scanner error", zap.Error(err))
			http.Error(w, fmt.Sprintf("scanner error: %v", err), http.StatusBadRequest)
			return
		}
		_, _ = jsonw.Write([]byte(`]}`))
		_, _ = w.Write(jsonw.Bytes())
	default:
		h.unknownPathCallback(w, req)
	}
}