func()

in receiver/elasticapmreceiver/receiver.go [157:206]


// newElasticAPMEventsHandler returns an http.HandlerFunc that hands each
// request body to an elasticapm processor and replies with a JSON summary
// containing the number of accepted events.
//
// The processor and the semaphore passed to it are created once per call to
// this method and are shared by every request the returned handler serves.
// r.processBatch is supplied to the processor as its batch-processing
// callback.
//
// The handler currently always replies with 202 Accepted; stream errors are
// not yet surfaced to the client (see the TODOs in the body).
func (r *elasticAPMReceiver) newElasticAPMEventsHandler() http.HandlerFunc {
	var (
		// TODO make semaphore size configurable and/or find a different way
		// to limit concurrency that fits better with OTel Collector.
		sem = semaphore.NewWeighted(100)

		// TODO make event size configurable
		maxEventSize = 1024 * 1024 // 1MiB

		// TODO make batch size configurable?
		batchSize = 10
	)

	batchProcessor := modelpb.ProcessBatchFunc(r.processBatch)
	elasticapmProcessor := elasticapm.NewProcessor(elasticapm.Config{
		Logger:       r.settings.Logger,
		MaxEventSize: maxEventSize,
		Semaphore:    sem,
	})

	// The request parameter is named req (not r) so that it does not shadow
	// the method receiver inside the closure.
	return func(w http.ResponseWriter, req *http.Request) {
		statusCode := http.StatusAccepted

		var elasticapmResult elasticapm.Result
		// A fresh base event is built for every request and passed to
		// HandleStream together with the request body.
		baseEvent := &modelpb.APMEvent{}
		baseEvent.Event = &modelpb.Event{}
		streamErr := elasticapmProcessor.HandleStream(
			req.Context(),
			baseEvent,
			req.Body,
			batchSize,
			batchProcessor,
			&elasticapmResult,
		)
		// streamErr is intentionally unused until the TODOs below are
		// addressed; the blank assignment keeps the compiler satisfied.
		_ = streamErr
		// TODO record metrics about errors?

		var result struct {
			Accepted int         `json:"accepted"`
			Errors   []jsonError `json:"errors,omitempty"`
		}
		result.Accepted = elasticapmResult.Accepted
		// TODO process elasticapmResult.Errors, add to result
		// TODO process streamErr, conditionally add to result
		// TODO process req.Context().Err(), conditionally add to result

		// Declare the body as JSON explicitly; otherwise net/http sniffs the
		// content and labels it text/plain. Headers must be set before
		// WriteHeader is called.
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(statusCode)
		// The status line has already been sent, so an encode/write failure
		// can no longer be reported to the client; it is deliberately ignored.
		_ = json.NewEncoder(w).Encode(&result)
	}
}