in x-pack/filebeat/input/http_endpoint/handler.go [85:231]
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
txID := h.nextTxID()
h.log.Debugw("request", "url", r.URL, "tx_id", txID)
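	// Validate the request (method, content type, and any configured
	// authentication) before doing further work.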
status, err := h.validator.validateRequest(r)
if err != nil {
h.sendAPIErrorResponse(txID, w, r, h.log, status, err)
return
}
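	// Honour an ACK wait timeout if the client requested one via the
	// request URL's query parameters.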
wait, err := getTimeoutWait(r.URL, h.log)
if err != nil {
h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusBadRequest, err)
return
}
var (
acked chan struct{}
timeout *time.Timer
)
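	// Enforce the configured in-flight byte limit, rejecting the request
	// with a 503 and a Retry-After hint when accepting its body would
	// exceed the limit.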
if h.maxInFlight != 0 {
		// Consider requests that do not wait for ACKs as well. They are
		// never added to the in-flight byte total, but we can still
		// assess whether accepting this request would take us over the
		// limit.
inFlight := h.inFlight.Load() + r.ContentLength
if inFlight > h.maxInFlight {
			// The error body is JSON, so declare its media type;
			// Content-Type is the correct header for this, not
			// Content-Encoding.
			w.Header().Set("Content-Type", "application/json")
w.Header().Set("Retry-After", strconv.Itoa(h.retryAfter))
w.WriteHeader(http.StatusServiceUnavailable)
_, err := fmt.Fprintf(w,
`{"warn":"max in flight message memory exceeded","max_in_flight":%d,"in_flight":%d}`,
h.maxInFlight, inFlight,
)
if err != nil {
h.log.Errorw("failed to write 503", "error", err)
}
return
}
}
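	// The client asked to wait for ACKs, so set up the completion signal
	// and timeout, and count the request body against the in-flight
	// total until handling completes.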
if wait != 0 {
acked = make(chan struct{})
timeout = time.NewTimer(wait)
h.inFlight.Add(r.ContentLength)
defer func() {
			// Any return from this point on completes message handling,
			// so the allocation must be removed from the in-flight
			// total, given that the client requested a timeout. Either
			// an early error or the timeout drops the message, all
			// events in the request have been ACKed, or the input has
			// been cancelled.
h.inFlight.Add(-r.ContentLength)
}()
}
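	// The ACK tracker's completion callback runs once every event
	// published from this request has been ACKed; it records ACK metrics
	// and signals the waiter if the client asked to wait.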
start := time.Now()
acker := newBatchACKTracker(func() {
h.metrics.batchACKTime.Update(time.Since(start).Nanoseconds())
h.metrics.batchesACKedTotal.Inc()
if acked != nil {
close(acked)
}
})
h.metrics.batchesReceived.Add(1)
h.metrics.contentLength.Update(r.ContentLength)
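	// Obtain a reader over the request body, undoing any content
	// encoding that the input understands.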
body, status, err := getBodyReader(r)
if err != nil {
h.sendAPIErrorResponse(txID, w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}
defer body.Close()
if h.reqLogger != nil {
// If we are logging, keep a copy of the body for the logger.
// This is stashed in the r.Body field. This is only safe
// because we are closing the original body in a defer and
// r.Body is not otherwise referenced by the non-logging logic
// after the call to getBodyReader above.
var buf bytes.Buffer
body = io.NopCloser(io.TeeReader(body, &buf))
r.Body = io.NopCloser(&buf)
}
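	// Decode the body into discrete JSON objects. When a preprocessing
	// program is configured, httpReadJSON applies it to the payload.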
objs, status, err := httpReadJSON(body, h.program)
if err != nil {
h.sendAPIErrorResponse(txID, w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}
var headers map[string]interface{}
if len(h.includeHeaders) != 0 {
headers = getIncludedHeaders(r, h.includeHeaders)
}
var (
respCode int
respBody string
)
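	// Publish one event per decoded object. A CRC validation handshake,
	// if one is detected, is answered directly and short-circuits the
	// batch.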
h.metrics.batchSize.Update(int64(len(objs)))
for _, obj := range objs {
var err error
if h.crc != nil {
respCode, respBody, err = h.crc.validate(obj)
if err == nil {
// CRC request processed
break
} else if !errors.Is(err, errNotCRC) {
h.metrics.apiErrors.Add(1)
h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusBadRequest, err)
return
}
}
acker.Add()
if err = h.publishEvent(obj, headers, acker); err != nil {
h.metrics.apiErrors.Add(1)
h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusInternalServerError, err)
return
}
h.metrics.eventsPublished.Add(1)
respCode, respBody = h.responseCode, h.responseBody
}
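	// All events have been published; mark the tracker ready so the ACK
	// callback can fire once the last outstanding event is ACKed.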
acker.Ready()
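	// Respond immediately unless the client asked to wait for ACKs; in
	// that case block until the batch is ACKed, the wait times out, or
	// the input is shut down.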
if acked == nil {
h.sendResponse(w, respCode, respBody)
} else {
select {
case <-acked:
h.log.Debugw("request acked", "tx_id", txID)
if !timeout.Stop() {
<-timeout.C
}
h.sendResponse(w, respCode, respBody)
case <-timeout.C:
h.log.Debugw("request timed out", "tx_id", txID)
h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusGatewayTimeout, errTookTooLong)
case <-h.ctx.Done():
h.log.Debugw("request context cancelled", "tx_id", txID)
h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusGatewayTimeout, h.ctx.Err())
}
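		// Request logging is enabled, so log the request using the body
		// copy stashed by the TeeReader above.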
if h.reqLogger != nil {
h.logRequest(txID, r, respCode, nil)
}
}
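	// Record batch-level metrics; these are updated whether or not an
	// ACK wait succeeded.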
h.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
h.metrics.batchesPublished.Add(1)
}