in apmproxy/receiver.go [126:179]
// handleIntakeV2Events returns the HTTP handler that receives intake
// requests from APM agents.
//
// The handler reads the whole request body. A non-empty body is wrapped in an
// accumulator.APMData value (together with the request's Content-Encoding
// header and User-Agent) and offered to c.AgentDataChannel with a non-blocking
// send: if the channel cannot accept the value immediately, the payload is
// dropped and a warning is logged. An empty body is only logged.
//
// When the request URL carries the query parameter flushed=true, the handler
// closes c.flushCh, unless it is already closed, to signal that a flush
// request has been received.
//
// Responses:
//   - 500 Internal Server Error when the body cannot be read; nothing is
//     forwarded and the flushed parameter is not evaluated.
//   - 202 Accepted with the body "ok" in every other case, including when the
//     payload was dropped because the channel was full.
func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		c.logger.Debug("Handling APM Data Intake")

		// Register the close before reading so the cleanup is visibly tied to
		// the body regardless of how the read ends.
		defer r.Body.Close()

		rawBytes, err := io.ReadAll(r.Body)
		if err != nil {
			c.logger.Errorf("Could not read agent intake request body: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		agentFlushed := r.URL.Query().Get("flushed") == "true"

		if len(rawBytes) != 0 {
			agentData := accumulator.APMData{
				Data:            rawBytes,
				ContentEncoding: r.Header.Get("Content-Encoding"),
				AgentInfo:       r.UserAgent(),
			}

			// Never block the agent's request: if the channel cannot take the
			// payload right now, drop it and carry on.
			select {
			case c.AgentDataChannel <- agentData:
			default:
				c.logger.Warnf("Channel full: dropping a subset of agent data")
			}
		} else {
			c.logger.Debugf("Received empty request from '%s'", r.UserAgent())
		}

		if agentFlushed {
			// The mutex serializes the "is it closed?" check with the close
			// itself, so concurrent requests cannot close the channel twice.
			c.flushMutex.Lock()
			select {
			case <-c.flushCh:
				// The channel is already closed: at least one flush request
				// was received but the data have not been flushed yet.
				// The closed channel can be reused as the signal.
			default:
				// No pending flush request: close the channel to signal that
				// one has been received.
				close(c.flushCh)
			}
			c.flushMutex.Unlock()
		}

		w.WriteHeader(http.StatusAccepted)
		if _, err := w.Write([]byte("ok")); err != nil {
			c.logger.Errorf("Failed to send intake response to APM agent : %v", err)
		}
	}
}