in receiver/elasticapmreceiver/receiver.go [157:206]
// newElasticAPMEventsHandler builds the HTTP handler for the Elastic APM
// events intake. Each request body is streamed through a shared
// elasticapm processor, which hands decoded batches to r.processBatch.
// The handler always replies with 202 Accepted and a JSON document that
// reports how many events were accepted.
func (r *elasticAPMReceiver) newElasticAPMEventsHandler() http.HandlerFunc {
	const (
		// TODO make event size configurable
		maxEventSize = 1024 * 1024 // 1MiB
		// TODO make batch size configurable?
		batchSize = 10
	)
	// TODO make semaphore size configurable and/or find a different way
	// to limit concurrency that fits better with OTel Collector.
	sem := semaphore.NewWeighted(100)

	batchProcessor := modelpb.ProcessBatchFunc(r.processBatch)
	// The processor is created once and shared by every request served by
	// the returned handler.
	elasticapmProcessor := elasticapm.NewProcessor(elasticapm.Config{
		Logger:       r.settings.Logger,
		MaxEventSize: maxEventSize,
		Semaphore:    sem,
	})

	// The request parameter is named req so that it does not shadow the
	// receiver r inside the closure.
	return func(w http.ResponseWriter, req *http.Request) {
		statusCode := http.StatusAccepted

		var elasticapmResult elasticapm.Result
		// baseEvent is handed to the processor together with the stream;
		// only its Event field is pre-populated here.
		baseEvent := &modelpb.APMEvent{}
		baseEvent.Event = &modelpb.Event{}
		streamErr := elasticapmProcessor.HandleStream(
			req.Context(),
			baseEvent,
			req.Body,
			batchSize,
			batchProcessor,
			&elasticapmResult,
		)
		// The stream error is deliberately not surfaced yet; see the TODOs
		// below.
		_ = streamErr
		// TODO record metrics about errors?

		var result struct {
			Accepted int         `json:"accepted"`
			Errors   []jsonError `json:"errors,omitempty"`
		}
		result.Accepted = elasticapmResult.Accepted
		// TODO process elasticapmResult.Errors, add to result
		// TODO process streamErr, conditionally add to result
		// TODO process req.Context().Err(), conditionally add to result

		// Declare the payload type explicitly; otherwise net/http sniffs the
		// body and labels the JSON document as text/plain. Headers must be
		// set before WriteHeader is called.
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(statusCode)
		// The status line has already been sent, so an encoding or write
		// failure can no longer be reported to the client.
		_ = json.NewEncoder(w).Encode(&result)
	}
}