in tracer.go [872:1316]
func (t *Tracer) loop() {
ctx, cancelContext := context.WithCancel(context.Background())
defer cancelContext()
defer close(t.closed)
defer atomic.StoreInt32(&t.active, 0)
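// Request state: requestBuf accumulates the zlib-compressed request body,
// and the requestBuf* counters track how many events have been written into
// the in-flight request so stats can be updated when it completes.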
var req iochan.ReadRequest
var requestBuf bytes.Buffer
var metadata []byte
var gracePeriod time.Duration = -1
var flushed chan<- struct{}
var requestBufTransactions, requestBufSpans, requestBufErrors, requestBufMetricsets uint64
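// zlib.BestSpeed is a valid compression level, so the error is safe to ignore.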
zlibWriter, _ := zlib.NewWriterLevel(&requestBuf, zlib.BestSpeed)
zlibFlushed := true
zlibClosed := false
iochanReader := iochan.NewReader()
requestBytesRead := 0
requestActive := false
closeRequest := false
flushRequest := false
requestResult := make(chan error, 1)
requestTimer := time.NewTimer(0)
requestTimerActive := false
if !requestTimer.Stop() {
<-requestTimer.C
}
// Run another goroutine to perform the blocking requests,
// communicating with the tracer loop to obtain stream data.
sendStreamRequest := make(chan time.Duration)
done := make(chan struct{})
defer func() {
close(sendStreamRequest)
<-done
}()
go func() {
defer close(done)
jitterRand := rand.New(rand.NewSource(time.Now().UnixNano()))
for gracePeriod := range sendStreamRequest {
if gracePeriod > 0 {
select {
case <-time.After(jitterDuration(gracePeriod, jitterRand, gracePeriodJitter)):
case <-ctx.Done():
}
}
requestResult <- t.transport.SendStream(ctx, iochanReader)
}
}()
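// Periodically refresh the cached APM Server version while a versionGetter
// is available; the refresh interval doubles as the per-call deadline, and
// maybeDropTransaction consults the version when deciding whether unsampled
// transactions can be dropped.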
refreshServerVersionDeadline := 10 * time.Second
refreshVersionTicker := time.NewTicker(refreshServerVersionDeadline)
defer refreshVersionTicker.Stop()
if t.versionGetter != nil {
go t.maybeRefreshServerVersion(ctx, refreshServerVersionDeadline)
} else {
// If versionGetter is nil, stop the ticker: there is nothing to refresh.
refreshVersionTicker.Stop()
}
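// Metrics state: metricsBuffer holds encoded metricsets until they are
// copied into a request, and metricsTimer drives periodic gathering.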
var breakdownMetricsLimitWarningLogged bool
var stats TracerStats
var metrics Metrics
var sentMetrics chan<- struct{}
var gatheringMetrics bool
var metricsTimerStart time.Time
metricsBuffer := ringbuffer.New(t.metricsBufferSize)
gatheredMetrics := make(chan struct{}, 1)
metricsTimer := time.NewTimer(0)
if !metricsTimer.Stop() {
<-metricsTimer.C
}
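// Remote (central) configuration watcher state.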
var lastConfigChange map[string]string
var configChanges <-chan apmconfig.Change
var stopConfigWatcher func()
defer func() {
if stopConfigWatcher != nil {
stopConfigWatcher()
}
}()
cpuProfilingState := newCPUProfilingState(t.profileSender)
heapProfilingState := newHeapProfilingState(t.profileSender)
var cfg tracerConfig
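// Events are encoded into a ring buffer; when it overflows, the oldest
// blocks are evicted and counted as dropped in the tracer stats.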
buffer := ringbuffer.New(t.bufferSize)
buffer.Evicted = func(h ringbuffer.BlockHeader) {
switch h.Tag {
case errorBlockTag:
stats.ErrorsDropped++
case spanBlockTag:
stats.SpansDropped++
case transactionBlockTag:
stats.TransactionsDropped++
}
}
modelWriter := modelWriter{
buffer: buffer,
metricsBuffer: metricsBuffer,
cfg: &cfg,
stats: &stats,
}
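// handleTracerConfigCommand applies a config command and restarts, stops,
// or leaves the metrics timer alone depending on how the effective metrics
// interval changed.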
handleTracerConfigCommand := func(cmd tracerConfigCommand) {
var oldMetricsInterval time.Duration
if cfg.recording {
oldMetricsInterval = cfg.metricsInterval
}
cmd(&cfg)
var metricsInterval, cpuProfileInterval, cpuProfileDuration, heapProfileInterval time.Duration
if cfg.recording {
metricsInterval = cfg.metricsInterval
cpuProfileInterval = cfg.cpuProfileInterval
cpuProfileDuration = cfg.cpuProfileDuration
heapProfileInterval = cfg.heapProfileInterval
}
cpuProfilingState.updateConfig(cpuProfileInterval, cpuProfileDuration)
heapProfilingState.updateConfig(heapProfileInterval, 0)
if !gatheringMetrics && metricsInterval != oldMetricsInterval {
if metricsTimerStart.IsZero() {
if metricsInterval > 0 {
metricsTimer.Reset(metricsInterval)
metricsTimerStart = time.Now()
}
} else {
if metricsInterval <= 0 {
metricsTimerStart = time.Time{}
if !metricsTimer.Stop() {
<-metricsTimer.C
}
} else {
alreadyPassed := time.Since(metricsTimerStart)
if alreadyPassed >= metricsInterval {
metricsTimer.Reset(0)
} else {
metricsTimer.Reset(metricsInterval - alreadyPassed)
}
}
}
}
}
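// Main loop: multiplex tracer events, configuration changes, timers,
// metrics and profile gathering, and transport I/O over a single select.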
for {
var gatherMetrics bool
select {
case <-t.closing:
cancelContext() // informs transport that EOF is expected
iochanReader.CloseRead(io.EOF)
return
case cmd := <-t.configCommands:
handleTracerConfigCommand(cmd)
continue
case cw := <-t.configWatcher:
if configChanges != nil {
stopConfigWatcher()
t.updateRemoteConfig(cfg.logger, lastConfigChange, nil)
lastConfigChange = nil
configChanges = nil
}
if cw == nil {
continue
}
var configWatcherContext context.Context
var watchParams apmconfig.WatchParams
watchParams.Service.Name = t.service.Name
watchParams.Service.Environment = t.service.Environment
configWatcherContext, stopConfigWatcher = context.WithCancel(ctx)
configChanges = cw.WatchConfig(configWatcherContext, watchParams)
// Silence go vet's "possible context leak" false positive.
// We call a previous stopConfigWatcher before reassigning
// the variable, and we have a defer at the top level of the
// loop method that will call the final stopConfigWatcher
// value on method exit.
_ = stopConfigWatcher
continue
case change, ok := <-configChanges:
if !ok {
configChanges = nil
continue
}
if change.Err != nil {
if cfg.logger != nil {
cfg.logger.Errorf("config request failed: %s", change.Err)
}
} else {
t.updateRemoteConfig(cfg.logger, lastConfigChange, change.Attrs)
lastConfigChange = change.Attrs
handleTracerConfigCommand(func(cfg *tracerConfig) {
cfg.recording = t.instrumentationConfig().recording
})
}
continue
case <-refreshVersionTicker.C:
go t.maybeRefreshServerVersion(ctx, refreshServerVersionDeadline)
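// An ended transaction, span, or error has been reported: encode it
// into the event buffer.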
case event := <-t.events:
switch event.eventType {
case transactionEvent:
if !t.breakdownMetrics.recordTransaction(event.tx.TransactionData) {
if !breakdownMetricsLimitWarningLogged && cfg.logger != nil {
cfg.logger.Warningf("%s", breakdownMetricsLimitWarning)
breakdownMetricsLimitWarningLogged = true
}
}
// Drop unsampled transactions when the APM Server is >= 8.0
drop := t.maybeDropTransaction(
ctx, event.tx.TransactionData, event.tx.Sampled(),
)
if !drop {
modelWriter.writeTransaction(event.tx.Transaction, event.tx.TransactionData)
}
case spanEvent:
modelWriter.writeSpan(event.span.Span, event.span.SpanData)
case errorEvent:
modelWriter.writeError(event.err)
// Flush the buffer to transmit the error immediately.
flushRequest = true
}
case <-requestTimer.C:
requestTimerActive = false
closeRequest = true
case <-metricsTimer.C:
metricsTimerStart = time.Time{}
gatherMetrics = !gatheringMetrics
case sentMetrics = <-t.forceSendMetrics:
if cfg.recording {
if !metricsTimerStart.IsZero() {
if !metricsTimer.Stop() {
<-metricsTimer.C
}
metricsTimerStart = time.Time{}
}
gatherMetrics = !gatheringMetrics
}
case <-gatheredMetrics:
modelWriter.writeMetrics(&metrics)
gatheringMetrics = false
flushRequest = true
if cfg.recording && cfg.metricsInterval > 0 {
metricsTimerStart = time.Now()
metricsTimer.Reset(cfg.metricsInterval)
}
case <-cpuProfilingState.timer.C:
cpuProfilingState.start(ctx, cfg.logger, t.metadataReader())
case <-cpuProfilingState.finished:
cpuProfilingState.resetTimer()
case <-heapProfilingState.timer.C:
heapProfilingState.start(ctx, cfg.logger, t.metadataReader())
case <-heapProfilingState.finished:
heapProfilingState.resetTimer()
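// A forced flush: drain pending events and close off the in-flight
// request so buffered data is sent immediately.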
case flushed = <-t.forceFlush:
// Drain any events buffered in the events channel.
for n := len(t.events); n > 0; n-- {
event := <-t.events
switch event.eventType {
case transactionEvent:
if !t.breakdownMetrics.recordTransaction(event.tx.TransactionData) {
if !breakdownMetricsLimitWarningLogged && cfg.logger != nil {
cfg.logger.Warningf("%s", breakdownMetricsLimitWarning)
breakdownMetricsLimitWarningLogged = true
}
}
// Drop unsampled transactions when the APM Server is >= 8.0
drop := t.maybeDropTransaction(
ctx, event.tx.TransactionData, event.tx.Sampled(),
)
if !drop {
modelWriter.writeTransaction(event.tx.Transaction, event.tx.TransactionData)
}
case spanEvent:
modelWriter.writeSpan(event.span.Span, event.span.SpanData)
case errorEvent:
modelWriter.writeError(event.err)
}
}
if !requestActive && buffer.Len() == 0 && metricsBuffer.Len() == 0 {
flushed <- struct{}{}
continue
}
closeRequest = true
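// The sender goroutine wants more stream data; it is served at the
// bottom of the loop once the request buffer has content.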
case req = <-iochanReader.C:
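// The in-flight request completed: record stats, apply the error grace
// period if it failed, and reset all per-request state.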
case err := <-requestResult:
if err != nil {
stats.Errors.SendStream++
gracePeriod = nextGracePeriod(gracePeriod)
if cfg.logger != nil {
logf := cfg.logger.Debugf
if err, ok := err.(*transport.HTTPError); ok && err.Response.StatusCode == 404 {
// 404 typically means the server is too old, meaning
// the error is due to a misconfigured environment.
logf = cfg.logger.Errorf
}
logf("request failed: %s (next request in ~%s)", err, gracePeriod)
}
} else {
gracePeriod = -1 // Reset grace period after success.
stats.TransactionsSent += requestBufTransactions
stats.SpansSent += requestBufSpans
stats.ErrorsSent += requestBufErrors
if cfg.logger != nil {
s := func(n uint64) string {
if n != 1 {
return "s"
}
return ""
}
cfg.logger.Debugf(
"sent request with %d transaction%s, %d span%s, %d error%s, %d metricset%s",
requestBufTransactions, s(requestBufTransactions),
requestBufSpans, s(requestBufSpans),
requestBufErrors, s(requestBufErrors),
requestBufMetricsets, s(requestBufMetricsets),
)
}
}
if !stats.isZero() {
t.stats.accumulate(stats)
stats = TracerStats{}
}
if sentMetrics != nil && requestBufMetricsets > 0 {
sentMetrics <- struct{}{}
sentMetrics = nil
}
if flushed != nil {
flushed <- struct{}{}
flushed = nil
}
if req.Buf != nil {
// req will be canceled by CloseRead below.
req.Buf = nil
}
iochanReader.CloseRead(io.EOF)
iochanReader = iochan.NewReader()
flushRequest = false
closeRequest = false
requestActive = false
requestBytesRead = 0
requestBuf.Reset()
requestBufTransactions = 0
requestBufSpans = 0
requestBufErrors = 0
requestBufMetricsets = 0
if requestTimerActive {
if !requestTimer.Stop() {
<-requestTimer.C
}
requestTimerActive = false
}
}
if !stats.isZero() {
t.stats.accumulate(stats)
stats = TracerStats{}
}
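// Start gathering metrics; completion is signalled on gatheredMetrics.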
if gatherMetrics {
gatheringMetrics = true
metrics.disabled = cfg.disabledMetrics
t.gatherMetrics(ctx, cfg.metricsGatherers, &metrics, cfg.logger, gatheredMetrics)
if cfg.logger != nil {
cfg.logger.Debugf("gathering metrics")
}
}
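// If there is buffered data but no request in flight, start one: wake
// the sender goroutine and begin a fresh zlib stream with the request
// metadata.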
if !requestActive {
if buffer.Len() == 0 && metricsBuffer.Len() == 0 {
continue
}
sendStreamRequest <- gracePeriod
if metadata == nil {
metadata = t.jsonRequestMetadata()
}
zlibWriter.Reset(&requestBuf)
zlibWriter.Write(metadata)
zlibFlushed = false
zlibClosed = false
requestActive = true
requestTimer.Reset(cfg.requestDuration)
requestTimerActive = true
}
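// Copy buffered metricsets and events into the compressed request body
// until the configured request size is reached.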
if !closeRequest || !zlibClosed {
for requestBytesRead+requestBuf.Len() < cfg.requestSize {
if metricsBuffer.Len() > 0 {
if _, _, err := metricsBuffer.WriteBlockTo(zlibWriter); err == nil {
requestBufMetricsets++
zlibWriter.Write([]byte("\n"))
zlibFlushed = false
if sentMetrics != nil {
// SendMetrics was called: close the request
// off so we can inform the user when the
// metrics have been processed.
closeRequest = true
}
}
continue
}
if buffer.Len() == 0 {
break
}
if h, _, err := buffer.WriteBlockTo(zlibWriter); err == nil {
switch h.Tag {
case transactionBlockTag:
requestBufTransactions++
case spanBlockTag:
requestBufSpans++
case errorBlockTag:
requestBufErrors++
}
zlibWriter.Write([]byte("\n"))
zlibFlushed = false
}
}
if !closeRequest {
closeRequest = requestBytesRead+requestBuf.Len() >= cfg.requestSize
}
}
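// Close the zlib stream when the request is complete, or flush it when
// an immediate partial send has been requested.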
if closeRequest {
if !zlibClosed {
zlibWriter.Close()
zlibClosed = true
}
} else if flushRequest && !zlibFlushed {
zlibWriter.Flush()
flushRequest = false
zlibFlushed = true
}
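// Serve a pending transport read, but only once the compressed data
// amounts to more than the bare zlib header.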
if req.Buf == nil || requestBuf.Len() == 0 {
continue
}
const zlibHeaderLen = 2
if requestBytesRead+requestBuf.Len() > zlibHeaderLen {
n, err := requestBuf.Read(req.Buf)
if closeRequest && err == nil && requestBuf.Len() == 0 {
err = io.EOF
}
req.Respond(n, err)
req.Buf = nil
if n > 0 {
requestBytesRead += n
}
}
}
}