func()

in receiver/splunkhecreceiver/receiver.go [378:496]


// handleReq serves one HTTP request whose body is a stream of JSON-encoded
// splunk.Event values and forwards the decoded events to the configured
// logs and/or metrics consumers.
//
// The request is rejected through r.failRequest when:
//   - the method is not POST;
//   - Content-Encoding is set to anything other than gzip;
//   - a channel ID was extracted from the request but fails validation;
//   - the gzip stream cannot be opened, or Content-Length is zero;
//   - an event cannot be decoded, carries a non-flat indexed field, has a nil
//     or blank Event payload, or is of a kind with no consumer configured;
//   - conversion or consumption of the decoded events fails.
//
// The whole body is decoded and validated before anything is handed to a
// consumer, so a malformed event anywhere in the stream rejects the request
// without partial delivery. On success the reply is written by
// processSuccessResponseWithAck when a channel ID is present and an ack
// extension is configured, and by processSuccessResponse otherwise.
func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request) {
	ctx := req.Context()

	if req.Method != http.MethodPost {
		r.failRequest(resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, errInvalidMethod)
		return
	}

	encoding := req.Header.Get(httpContentEncodingHeader)
	if encoding != "" && encoding != gzipEncoding {
		r.failRequest(resp, http.StatusUnsupportedMediaType, invalidEncodingRespBody, errInvalidEncoding)
		return
	}

	channelID, extracted := r.extractChannel(req)
	if extracted {
		if channelErr := r.validateChannelHeader(channelID); channelErr != nil {
			r.failRequest(resp, http.StatusBadRequest, []byte(channelErr.Error()), channelErr)
			return
		}
	}

	bodyReader := req.Body
	if encoding == gzipEncoding {
		reader := r.gzipReaderPool.Get().(*gzip.Reader)
		// Registered before Reset so the reader is handed back to the pool on
		// every path, including a failed Reset (a later Reset re-initialises it).
		defer r.gzipReaderPool.Put(reader)
		if err := reader.Reset(bodyReader); err != nil {
			r.failRequest(resp, http.StatusBadRequest, errGzipReaderRespBody, err)
			return
		}
		bodyReader = reader
	}

	if req.ContentLength == 0 {
		r.failRequest(resp, http.StatusBadRequest, noDataRespBody, nil)
		return
	}

	dec := jsoniter.NewDecoder(bodyReader)

	var events []*splunk.Event
	var metricEvents []*splunk.Event

	for dec.More() {
		// Declared inside the loop so each appended pointer refers to its own event.
		var msg splunk.Event
		if err := dec.Decode(&msg); err != nil {
			r.failRequest(resp, http.StatusBadRequest, invalidFormatRespBody, err)
			return
		}

		for _, v := range msg.Fields {
			if !isFlatJSONField(v) {
				// The count of events accepted so far is the zero-based index of the offending event.
				r.failRequest(resp, http.StatusBadRequest, []byte(fmt.Sprintf(responseErrHandlingIndexedFields, len(events)+len(metricEvents))), nil)
				return
			}
		}

		if msg.IsMetric() {
			if r.metricsConsumer == nil {
				// No underlying Go error exists here; the response body carries the reason.
				r.failRequest(resp, http.StatusBadRequest, errUnsupportedMetricEvent, nil)
				return
			}
			metricEvents = append(metricEvents, &msg)
			continue
		}

		if msg.Event == nil {
			r.failRequest(resp, http.StatusBadRequest, eventRequiredRespBody, nil)
			return
		}

		if msg.Event == "" {
			r.failRequest(resp, http.StatusBadRequest, eventBlankRespBody, nil)
			return
		}

		if r.logsConsumer == nil {
			// No underlying Go error exists here; the response body carries the reason.
			r.failRequest(resp, http.StatusBadRequest, errUnsupportedLogEvent, nil)
			return
		}
		events = append(events, &msg)
	}

	resourceCustomizer := r.createResourceCustomizer(req)
	if r.logsConsumer != nil && len(events) > 0 {
		ld, err := splunkHecToLogData(r.settings.Logger, events, resourceCustomizer, r.config)
		if err != nil {
			r.failRequest(resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err)
			return
		}
		ctx = r.obsrecv.StartLogsOp(ctx)
		consumeErr := r.logsConsumer.ConsumeLogs(ctx, ld)
		// Report the consumer's outcome so a failed delivery is not recorded as a success.
		r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), len(events), consumeErr)
		if consumeErr != nil {
			r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, consumeErr)
			return
		}
	}
	if r.metricsConsumer != nil && len(metricEvents) > 0 {
		// NOTE(review): the second result of splunkHecToMetricsData is discarded,
		// as before; its meaning is not visible from this function — confirm it
		// does not need handling.
		md, _ := splunkHecToMetricsData(r.settings.Logger, metricEvents, resourceCustomizer, r.config)
		ctx = r.obsrecv.StartMetricsOp(ctx)
		consumeErr := r.metricsConsumer.ConsumeMetrics(ctx, md)
		// Report the consumer's outcome so a failed delivery is not recorded as a success.
		r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), len(metricEvents), consumeErr)
		if consumeErr != nil {
			r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, consumeErr)
			return
		}
	}

	var ackErr error
	if len(channelID) > 0 && r.ackExt != nil {
		ackErr = r.processSuccessResponseWithAck(resp, channelID)
	} else {
		ackErr = r.processSuccessResponse(resp, okRespBody)
	}
	if ackErr != nil {
		r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, ackErr)
	}
}