func()

in x-pack/filebeat/input/http_endpoint/handler.go [85:231]


// ServeHTTP handles one incoming request end to end.
//
// The request is validated, checked against the in-flight byte budget
// (when h.maxInFlight is non-zero), decoded into JSON objects, and each
// object is either answered as a CRC challenge or published as an event.
// If getTimeoutWait yields a non-zero wait, the response is withheld until
// all published events have been ACKed, the wait expires, or h.ctx is
// cancelled; otherwise the response is sent as soon as publishing is done.
//
// Every failure path replies through sendAPIErrorResponse and returns
// without updating the batch processing metrics at the end of the method.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	txID := h.nextTxID()
	h.log.Debugw("request", "url", r.URL, "tx_id", txID)
	status, err := h.validator.validateRequest(r)
	if err != nil {
		h.sendAPIErrorResponse(txID, w, r, h.log, status, err)
		return
	}

	wait, err := getTimeoutWait(r.URL, h.log)
	if err != nil {
		h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusBadRequest, err)
		return
	}

	// net/http reports an unknown body length as -1. Count such requests
	// as zero bytes for in-flight accounting so that they cannot lower the
	// in-flight total while they are being held.
	size := r.ContentLength
	if size < 0 {
		size = 0
	}

	var (
		acked   chan struct{}
		timeout *time.Timer
	)
	if h.maxInFlight != 0 {
		// Consider non-ACKing messages as well. These do not add
		// to the sum of in-flight bytes, but we can still assess
		// whether a message would take us over the limit.
		inFlight := h.inFlight.Load() + size
		if inFlight > h.maxInFlight {
			// The body written below is JSON, so describe it with
			// Content-Type; Content-Encoding is reserved for content
			// codings such as gzip.
			w.Header().Set("Content-Type", "application/json")
			w.Header().Set("Retry-After", strconv.Itoa(h.retryAfter))
			w.WriteHeader(http.StatusServiceUnavailable)
			_, err := fmt.Fprintf(w,
				`{"warn":"max in flight message memory exceeded","max_in_flight":%d,"in_flight":%d}`,
				h.maxInFlight, inFlight,
			)
			if err != nil {
				h.log.Errorw("failed to write 503", "error", err)
			}
			return
		}
	}
	if wait != 0 {
		acked = make(chan struct{})
		timeout = time.NewTimer(wait)
		// Release the timer on every return path, including the early
		// error returns below. The timer is never reused, so there is
		// no need to drain its channel.
		defer timeout.Stop()
		h.inFlight.Add(size)
		defer func() {
			// Any return will be a message handling completion and
			// the removal of the allocation from the queue assuming that
			// the client has requested a timeout. Either we have an early
			// error condition or timeout and the message is dropped, we
			// have ACKed all the events in the request, or the input has
			// been cancelled.
			h.inFlight.Add(-size)
		}()
	}
	start := time.Now()
	// The callback records ACK metrics and, when the client asked to
	// wait, signals completion by closing acked.
	acker := newBatchACKTracker(func() {
		h.metrics.batchACKTime.Update(time.Since(start).Nanoseconds())
		h.metrics.batchesACKedTotal.Inc()
		if acked != nil {
			close(acked)
		}
	})
	h.metrics.batchesReceived.Add(1)
	h.metrics.contentLength.Update(r.ContentLength)
	body, status, err := getBodyReader(r)
	if err != nil {
		h.sendAPIErrorResponse(txID, w, r, h.log, status, err)
		h.metrics.apiErrors.Add(1)
		return
	}
	defer body.Close()

	if h.reqLogger != nil {
		// If we are logging, keep a copy of the body for the logger.
		// This is stashed in the r.Body field. This is only safe
		// because we are closing the original body in a defer and
		// r.Body is not otherwise referenced by the non-logging logic
		// after the call to getBodyReader above.
		var buf bytes.Buffer
		body = io.NopCloser(io.TeeReader(body, &buf))
		r.Body = io.NopCloser(&buf)
	}

	objs, status, err := httpReadJSON(body, h.program)
	if err != nil {
		h.sendAPIErrorResponse(txID, w, r, h.log, status, err)
		h.metrics.apiErrors.Add(1)
		return
	}

	var headers map[string]interface{}
	if len(h.includeHeaders) != 0 {
		headers = getIncludedHeaders(r, h.includeHeaders)
	}

	var (
		respCode int
		respBody string
	)

	h.metrics.batchSize.Update(int64(len(objs)))
	for _, obj := range objs {
		var err error
		if h.crc != nil {
			respCode, respBody, err = h.crc.validate(obj)
			if err == nil {
				// CRC request processed
				break
			} else if !errors.Is(err, errNotCRC) {
				h.metrics.apiErrors.Add(1)
				h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusBadRequest, err)
				return
			}
		}

		// Register the event with the tracker before publishing it.
		acker.Add()
		if err = h.publishEvent(obj, headers, acker); err != nil {
			h.metrics.apiErrors.Add(1)
			h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusInternalServerError, err)
			return
		}
		h.metrics.eventsPublished.Add(1)
		respCode, respBody = h.responseCode, h.responseBody
	}

	// All events for this request have been handed to the tracker.
	acker.Ready()
	if acked == nil {
		h.sendResponse(w, respCode, respBody)
	} else {
		select {
		case <-acked:
			h.log.Debugw("request acked", "tx_id", txID)
			h.sendResponse(w, respCode, respBody)
		case <-timeout.C:
			h.log.Debugw("request timed out", "tx_id", txID)
			h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusGatewayTimeout, errTookTooLong)
		case <-h.ctx.Done():
			h.log.Debugw("request context cancelled", "tx_id", txID)
			h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusGatewayTimeout, h.ctx.Err())
		}
	}
	// The body copy above is kept for the logger whether or not the
	// client asked to wait for ACKs, so log the request in both cases.
	if h.reqLogger != nil {
		h.logRequest(txID, r, respCode, nil)
	}
	h.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
	h.metrics.batchesPublished.Add(1)
}