func()

in collector/otlp/logs_transfer.go [68:156]


func (s *LogsService) Handler(w http.ResponseWriter, r *http.Request) {
	m := metrics.RequestsReceived.MustCurryWith(prometheus.Labels{"path": "/v1/logs"})
	defer r.Body.Close()

	if !s.healthChecker.IsHealthy() {
		m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
		http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
		return
	}

	switch r.Header.Get("Content-Type") {
	case "application/x-protobuf":

		// Consume the request body and marshal into a protobuf
		b := gbp.Get(int(r.ContentLength))
		defer gbp.Put(b)

		n, err := io.ReadFull(r.Body, b)
		if err != nil {
			s.logger.Error("Failed to read request body", "Error", err)
			w.WriteHeader(http.StatusInternalServerError)
			m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
			return
		}
		if n < int(r.ContentLength) {
			s.logger.Warn("Short read")
			w.WriteHeader(http.StatusBadRequest)
			m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
			return
		}
		b = b[:n]

		if logger.IsDebug() {
			s.logger.Debug("Received request body", "Bytes", n)
		}

		msg := &v1.ExportLogsServiceRequest{}
		if err := proto.Unmarshal(b, msg); err != nil {
			s.logger.Error("Failed to unmarshal request body", "Error", err)
			w.WriteHeader(http.StatusBadRequest)
			m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
			return
		}

		logBatch := types.LogBatchPool.Get(1).(*types.LogBatch)
		logBatch.Reset()

		droppedLogMissingMetadata := s.convertToLogBatch(msg, logBatch)

		s.outputQueue <- logBatch

		// The logs have been committed by the OTLP endpoint
		resp := &v1.ExportLogsServiceResponse{}
		if droppedLogMissingMetadata > 0 {
			resp.SetPartialSuccess(&v1.ExportLogsPartialSuccess{
				RejectedLogRecords: droppedLogMissingMetadata,
				ErrorMessage:       "Logs lacking kube.database and kube.table attributes or body fields",
			})
			metrics.InvalidLogsDropped.WithLabelValues().Add(float64(droppedLogMissingMetadata))
		}

		respBodyBytes, err := proto.Marshal(resp)
		if err != nil {
			s.logger.Error("Failed to marshal response", "Error", err)
			w.WriteHeader(http.StatusInternalServerError)
			m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
			return
		}

		// Even in the case of a partial response, OTLP API requires us to send StatusOK
		w.Header().Add("Content-Type", "application/x-protobuf")
		w.WriteHeader(http.StatusOK)
		w.Write(respBodyBytes)
		m.WithLabelValues(strconv.Itoa(http.StatusOK)).Inc()

	case "application/json":
		// We're receiving JSON, so we need to unmarshal the JSON
		// into an OTLP protobuf, then use gRPC to send the OTLP
		// protobuf to the OTLP endpoint
		w.WriteHeader(http.StatusUnsupportedMediaType)
		m.WithLabelValues(strconv.Itoa(http.StatusUnsupportedMediaType)).Inc()

	default:
		logger.Errorf("Unsupported Content-Type: %s", r.Header.Get("Content-Type"))
		w.WriteHeader(http.StatusUnsupportedMediaType)
		m.WithLabelValues(strconv.Itoa(http.StatusUnsupportedMediaType)).Inc()
	}

}