in receiver/splunkhecreceiver/receiver.go [378:496]
func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request) {
ctx := req.Context()
if req.Method != http.MethodPost {
r.failRequest(resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, errInvalidMethod)
return
}
encoding := req.Header.Get(httpContentEncodingHeader)
if encoding != "" && encoding != gzipEncoding {
r.failRequest(resp, http.StatusUnsupportedMediaType, invalidEncodingRespBody, errInvalidEncoding)
return
}
channelID, extracted := r.extractChannel(req)
if extracted {
if channelErr := r.validateChannelHeader(channelID); channelErr != nil {
r.failRequest(resp, http.StatusBadRequest, []byte(channelErr.Error()), channelErr)
return
}
}
bodyReader := req.Body
if encoding == gzipEncoding {
reader := r.gzipReaderPool.Get().(*gzip.Reader)
err := reader.Reset(bodyReader)
if err != nil {
r.failRequest(resp, http.StatusBadRequest, errGzipReaderRespBody, err)
return
}
bodyReader = reader
defer r.gzipReaderPool.Put(reader)
}
if req.ContentLength == 0 {
r.failRequest(resp, http.StatusBadRequest, noDataRespBody, nil)
return
}
dec := jsoniter.NewDecoder(bodyReader)
var events []*splunk.Event
var metricEvents []*splunk.Event
for dec.More() {
var msg splunk.Event
err := dec.Decode(&msg)
if err != nil {
r.failRequest(resp, http.StatusBadRequest, invalidFormatRespBody, err)
return
}
for _, v := range msg.Fields {
if !isFlatJSONField(v) {
r.failRequest(resp, http.StatusBadRequest, []byte(fmt.Sprintf(responseErrHandlingIndexedFields, len(events)+len(metricEvents))), nil)
return
}
}
if msg.IsMetric() {
if r.metricsConsumer == nil {
r.failRequest(resp, http.StatusBadRequest, errUnsupportedMetricEvent, err)
return
}
metricEvents = append(metricEvents, &msg)
} else {
if msg.Event == nil {
r.failRequest(resp, http.StatusBadRequest, eventRequiredRespBody, nil)
return
}
if msg.Event == "" {
r.failRequest(resp, http.StatusBadRequest, eventBlankRespBody, nil)
return
}
if r.logsConsumer == nil {
r.failRequest(resp, http.StatusBadRequest, errUnsupportedLogEvent, err)
return
}
events = append(events, &msg)
}
}
resourceCustomizer := r.createResourceCustomizer(req)
if r.logsConsumer != nil && len(events) > 0 {
ld, err := splunkHecToLogData(r.settings.Logger, events, resourceCustomizer, r.config)
if err != nil {
r.failRequest(resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err)
return
}
ctx = r.obsrecv.StartLogsOp(ctx)
decodeErr := r.logsConsumer.ConsumeLogs(ctx, ld)
r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), len(events), nil)
if decodeErr != nil {
r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, decodeErr)
return
}
}
if r.metricsConsumer != nil && len(metricEvents) > 0 {
md, _ := splunkHecToMetricsData(r.settings.Logger, metricEvents, resourceCustomizer, r.config)
ctx = r.obsrecv.StartMetricsOp(ctx)
decodeErr := r.metricsConsumer.ConsumeMetrics(ctx, md)
r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), len(metricEvents), nil)
if decodeErr != nil {
r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, decodeErr)
return
}
}
var ackErr error
if len(channelID) > 0 && r.ackExt != nil {
ackErr = r.processSuccessResponseWithAck(resp, channelID)
} else {
ackErr = r.processSuccessResponse(resp, okRespBody)
}
if ackErr != nil {
r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, ackErr)
}
}