in collector/otlp/logs_transfer.go [68:156]
// Handler serves OTLP/HTTP log export requests for the /v1/logs path.
//
// While the health checker reports the service as unhealthy, requests are
// rejected with 429. Only "application/x-protobuf" bodies are processed: the
// body is unmarshalled into an ExportLogsServiceRequest, converted into a
// LogBatch and sent to the output queue. The count returned by
// convertToLogBatch is reported to the client as rejected records via the
// response's partial-success field. Every outcome increments
// metrics.RequestsReceived with the response status code as label value.
func (s *LogsService) Handler(w http.ResponseWriter, r *http.Request) {
	m := metrics.RequestsReceived.MustCurryWith(prometheus.Labels{"path": "/v1/logs"})
	defer r.Body.Close()

	if !s.healthChecker.IsHealthy() {
		m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
		http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
		return
	}

	contentType := r.Header.Get("Content-Type")
	switch contentType {
	case "application/x-protobuf":
		// The read buffer is sized from Content-Length. net/http reports an
		// unknown length (e.g. chunked transfer encoding) as -1, which cannot
		// be used as a buffer size, so such requests are refused up front.
		if r.ContentLength < 0 {
			s.logger.Warn("Missing Content-Length")
			w.WriteHeader(http.StatusLengthRequired)
			m.WithLabelValues(strconv.Itoa(http.StatusLengthRequired)).Inc()
			return
		}

		// Consume the request body into a pooled buffer. The deferred Put
		// captures the full-size slice before it is re-sliced below.
		b := gbp.Get(int(r.ContentLength))
		defer gbp.Put(b)

		n, err := io.ReadFull(r.Body, b)
		if err == io.ErrUnexpectedEOF || err == io.EOF {
			// io.ReadFull returns these whenever fewer than len(b) bytes were
			// available, i.e. the client sent less than it declared. That is
			// a client error, not a server failure.
			s.logger.Warn("Short read")
			w.WriteHeader(http.StatusBadRequest)
			m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
			return
		}
		if err != nil {
			s.logger.Error("Failed to read request body", "Error", err)
			w.WriteHeader(http.StatusInternalServerError)
			m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
			return
		}
		b = b[:n]

		if logger.IsDebug() {
			s.logger.Debug("Received request body", "Bytes", n)
		}

		msg := &v1.ExportLogsServiceRequest{}
		if err := proto.Unmarshal(b, msg); err != nil {
			s.logger.Error("Failed to unmarshal request body", "Error", err)
			w.WriteHeader(http.StatusBadRequest)
			m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
			return
		}

		logBatch := types.LogBatchPool.Get(1).(*types.LogBatch)
		logBatch.Reset()
		droppedLogMissingMetadata := s.convertToLogBatch(msg, logBatch)
		s.outputQueue <- logBatch

		// The logs have been committed by the OTLP endpoint
		resp := &v1.ExportLogsServiceResponse{}
		if droppedLogMissingMetadata > 0 {
			resp.SetPartialSuccess(&v1.ExportLogsPartialSuccess{
				RejectedLogRecords: droppedLogMissingMetadata,
				ErrorMessage:       "Logs lacking kube.database and kube.table attributes or body fields",
			})
			metrics.InvalidLogsDropped.WithLabelValues().Add(float64(droppedLogMissingMetadata))
		}

		respBodyBytes, err := proto.Marshal(resp)
		if err != nil {
			s.logger.Error("Failed to marshal response", "Error", err)
			w.WriteHeader(http.StatusInternalServerError)
			m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
			return
		}

		// Even in the case of a partial response, OTLP API requires us to send StatusOK
		w.Header().Add("Content-Type", "application/x-protobuf")
		w.WriteHeader(http.StatusOK)
		if _, err := w.Write(respBodyBytes); err != nil {
			// The status line is already sent; all that is left is to record
			// that the client did not receive the full response body.
			s.logger.Warn("Failed to write response", "Error", err)
		}
		m.WithLabelValues(strconv.Itoa(http.StatusOK)).Inc()

	case "application/json":
		// JSON-encoded OTLP payloads are not handled by this endpoint; they
		// are rejected the same way as any other unsupported media type.
		w.WriteHeader(http.StatusUnsupportedMediaType)
		m.WithLabelValues(strconv.Itoa(http.StatusUnsupportedMediaType)).Inc()

	default:
		logger.Errorf("Unsupported Content-Type: %s", contentType)
		w.WriteHeader(http.StatusUnsupportedMediaType)
		m.WithLabelValues(strconv.Itoa(http.StatusUnsupportedMediaType)).Inc()
	}
}