in apmproxy/apmserver.go [50:81]
// ForwardApmData receives payloads from the client's agent and Lambda data
// channels and hands each one to the matching forward helper, looping until
// ctx is done or a forward call fails.
//
// It returns nil when the client reports itself unhealthy (nothing is read in
// that case) and when ctx is done. The only non-nil return value is an error
// produced by forwardAgentData or forwardLambdaData, which is passed through
// unchanged.
func (c *Client) ForwardApmData(ctx context.Context) error {
	if c.IsUnhealthy() {
		c.logger.Warn("Failed to start APM data forwarder due to client unhealthy")
		return nil
	}

	// Receiving from a nil channel blocks forever, so the Lambda case of the
	// select below stays inactive until lambdaCh is pointed at the real
	// channel.
	var lambdaCh chan []byte

	for {
		select {
		case <-ctx.Done():
			c.logger.Debug("Invocation context canceled, not processing any more agent data")
			return nil

		case agentData := <-c.AgentDataChannel:
			// Payloads without a body are logged and dropped.
			if len(agentData.Data) == 0 {
				c.logger.Debugf("Received something from '%s' without APMData", agentData.AgentInfo)
				continue
			}

			err := c.forwardAgentData(ctx, agentData)
			if err != nil {
				return err
			}

			if lambdaCh != nil {
				continue
			}
			// After the first successful call to c.forwardAgentData() the
			// metadata should be available, so reading from
			// c.LambdaDataChannel can begin.
			lambdaCh = c.LambdaDataChannel
			c.logger.Debug("Assigned Lambda data channel")

		case lambdaData := <-lambdaCh:
			err := c.forwardLambdaData(ctx, lambdaData)
			if err != nil {
				return err
			}
		}
	}
}