func()

in apmproxy/receiver.go [126:179]


func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		c.logger.Debug("Handling APM Data Intake")
		rawBytes, err := io.ReadAll(r.Body)
		defer r.Body.Close()
		if err != nil {
			c.logger.Errorf("Could not read agent intake request body: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		agentFlushed := r.URL.Query().Get("flushed") == "true"

		if len(rawBytes) != 0 {
			agentData := accumulator.APMData{
				Data:            rawBytes,
				ContentEncoding: r.Header.Get("Content-Encoding"),
				AgentInfo:       r.UserAgent(),
			}

			select {
			case c.AgentDataChannel <- agentData:
			default:
				c.logger.Warnf("Channel full: dropping a subset of agent data")
			}
		} else {
			c.logger.Debugf("Received empy request from '%s'", r.UserAgent())
		}

		if agentFlushed {
			c.flushMutex.Lock()

			select {
			case <-c.flushCh:
				// the channel is closed.
				// the extension received at least a flush request already but the
				// data have not been flushed yet.
				// We can reuse the closed channel.
			default:
				// no pending flush requests
				// close the channel to signal a flush request has
				// been received.
				close(c.flushCh)
			}

			c.flushMutex.Unlock()
		}

		w.WriteHeader(http.StatusAccepted)
		if _, err = w.Write([]byte("ok")); err != nil {
			c.logger.Errorf("Failed to send intake response to APM agent : %v", err)
		}
	}
}