func()

in tracer.go [872:1316]


// loop is the tracer's main event loop. It runs until a receive from
// t.closing succeeds, at which point it cancels its context, closes the
// current stream reader with io.EOF and returns. On return the deferred
// calls store 0 in t.active, close t.closed, stop the version ticker and
// any config watcher, and wait for the request-sending goroutine to exit.
//
// Each iteration handles exactly one event from a single select statement:
//   - tracer config commands, config watcher changes and remote config,
//   - events received on t.events, which are handed to modelWriter,
//   - request, metrics and profiling timers,
//   - forced flush (t.forceFlush) and forced metrics (t.forceSendMetrics),
//   - read requests from, and the final result of, the in-flight request.
//
// After the select, the loop starts a new request if one is needed, moves
// buffered blocks into the zlib-compressed request buffer, and answers any
// pending read request from that buffer.
func (t *Tracer) loop() {
	ctx, cancelContext := context.WithCancel(context.Background())
	defer cancelContext()
	defer close(t.closed)
	defer atomic.StoreInt32(&t.active, 0)

	// req is the most recent read request received from iochanReader.C;
	// req.Buf is reset to nil once the request has been answered.
	var req iochan.ReadRequest
	// requestBuf holds compressed bytes not yet handed to the reader.
	var requestBuf bytes.Buffer
	// metadata is computed lazily by t.jsonRequestMetadata and then reused
	// as the first thing written to every request.
	var metadata []byte
	// gracePeriod is passed to the sender goroutine for each request; the
	// sender only waits when it is > 0. It is advanced by nextGracePeriod
	// after a failed request and reset to -1 after a successful one.
	var gracePeriod time.Duration = -1
	// flushed, when non-nil, is signalled once the current flush is done.
	var flushed chan<- struct{}
	// Counts of blocks written into the current request, by kind.
	var requestBufTransactions, requestBufSpans, requestBufErrors, requestBufMetricsets uint64
	// The error is ignored: zlib.BestSpeed is a valid compression level.
	zlibWriter, _ := zlib.NewWriterLevel(&requestBuf, zlib.BestSpeed)
	zlibFlushed := true
	zlibClosed := false
	iochanReader := iochan.NewReader()
	requestBytesRead := 0 // bytes already handed to the reader for this request
	requestActive := false
	closeRequest := false
	flushRequest := false
	requestResult := make(chan error, 1)
	// Create the request timer and immediately stop and drain it, so that
	// it is idle until armed with Reset when a request is started.
	requestTimer := time.NewTimer(0)
	requestTimerActive := false
	if !requestTimer.Stop() {
		<-requestTimer.C
	}

	// Run another goroutine to perform the blocking requests,
	// communicating with the tracer loop to obtain stream data.
	sendStreamRequest := make(chan time.Duration)
	done := make(chan struct{})
	defer func() {
		// Closing sendStreamRequest ends the sender's range loop; wait
		// for the goroutine to exit before loop returns.
		close(sendStreamRequest)
		<-done
	}()
	go func() {
		defer close(done)
		jitterRand := rand.New(rand.NewSource(time.Now().UnixNano()))
		for gracePeriod := range sendStreamRequest {
			if gracePeriod > 0 {
				// Wait out the (jittered) grace period, unless the
				// loop's context is cancelled first.
				select {
				case <-time.After(jitterDuration(gracePeriod, jitterRand, gracePeriodJitter)):
				case <-ctx.Done():
				}
			}
			// NOTE: iochanReader is captured by reference, so this uses
			// whichever reader the loop has currently assigned.
			requestResult <- t.transport.SendStream(ctx, iochanReader)
		}
	}()

	refreshServerVersionDeadline := 10 * time.Second
	refreshVersionTicker := time.NewTicker(refreshServerVersionDeadline)
	defer refreshVersionTicker.Stop()
	if t.versionGetter != nil {
		go t.maybeRefreshServerVersion(ctx, refreshServerVersionDeadline)
	} else {
		// If versionGetter is nil, stop the ticker: there is nothing
		// to refresh, so its case in the select below never fires.
		refreshVersionTicker.Stop()
	}

	var breakdownMetricsLimitWarningLogged bool
	// stats accumulates per-iteration statistics; it is folded into
	// t.stats and zeroed whenever it is non-zero.
	var stats TracerStats
	var metrics Metrics
	// sentMetrics, when non-nil, is signalled once a request containing
	// at least one metricset has completed.
	var sentMetrics chan<- struct{}
	var gatheringMetrics bool
	// metricsTimerStart is the time the metrics timer was last armed, or
	// the zero time when the timer is not considered armed.
	var metricsTimerStart time.Time
	metricsBuffer := ringbuffer.New(t.metricsBufferSize)
	gatheredMetrics := make(chan struct{}, 1)
	// As with requestTimer, start with an idle, drained timer.
	metricsTimer := time.NewTimer(0)
	if !metricsTimer.Stop() {
		<-metricsTimer.C
	}

	var lastConfigChange map[string]string
	var configChanges <-chan apmconfig.Change
	var stopConfigWatcher func()
	defer func() {
		if stopConfigWatcher != nil {
			stopConfigWatcher()
		}
	}()

	cpuProfilingState := newCPUProfilingState(t.profileSender)
	heapProfilingState := newHeapProfilingState(t.profileSender)

	var cfg tracerConfig
	buffer := ringbuffer.New(t.bufferSize)
	// Count blocks evicted from the event buffer as dropped, by kind.
	buffer.Evicted = func(h ringbuffer.BlockHeader) {
		switch h.Tag {
		case errorBlockTag:
			stats.ErrorsDropped++
		case spanBlockTag:
			stats.SpansDropped++
		case transactionBlockTag:
			stats.TransactionsDropped++
		}
	}
	modelWriter := modelWriter{
		buffer:        buffer,
		metricsBuffer: metricsBuffer,
		cfg:           &cfg,
		stats:         &stats,
	}

	// handleTracerConfigCommand applies cmd to cfg, then pushes the
	// resulting profiling intervals to the profiling states and re-arms
	// or stops the metrics timer if the effective metrics interval
	// changed. When cfg.recording is false all intervals are treated as
	// zero.
	handleTracerConfigCommand := func(cmd tracerConfigCommand) {
		var oldMetricsInterval time.Duration
		if cfg.recording {
			oldMetricsInterval = cfg.metricsInterval
		}
		cmd(&cfg)
		var metricsInterval, cpuProfileInterval, cpuProfileDuration, heapProfileInterval time.Duration
		if cfg.recording {
			metricsInterval = cfg.metricsInterval
			cpuProfileInterval = cfg.cpuProfileInterval
			cpuProfileDuration = cfg.cpuProfileDuration
			heapProfileInterval = cfg.heapProfileInterval
		}

		cpuProfilingState.updateConfig(cpuProfileInterval, cpuProfileDuration)
		heapProfilingState.updateConfig(heapProfileInterval, 0)
		// While metrics are being gathered the timer is left alone; it
		// is re-armed when the gatheredMetrics signal is handled.
		if !gatheringMetrics && metricsInterval != oldMetricsInterval {
			if metricsTimerStart.IsZero() {
				// Timer not armed: arm it if metrics are now enabled.
				if metricsInterval > 0 {
					metricsTimer.Reset(metricsInterval)
					metricsTimerStart = time.Now()
				}
			} else {
				if metricsInterval <= 0 {
					// Metrics disabled: stop and drain the timer.
					metricsTimerStart = time.Time{}
					if !metricsTimer.Stop() {
						<-metricsTimer.C
					}
				} else {
					// Interval changed: credit the time that has
					// already passed since the timer was armed.
					alreadyPassed := time.Since(metricsTimerStart)
					if alreadyPassed >= metricsInterval {
						metricsTimer.Reset(0)
					} else {
						metricsTimer.Reset(metricsInterval - alreadyPassed)
					}
				}
			}
		}
	}

	for {
		var gatherMetrics bool
		select {
		case <-t.closing:
			cancelContext() // informs transport that EOF is expected
			iochanReader.CloseRead(io.EOF)
			return
		case cmd := <-t.configCommands:
			handleTracerConfigCommand(cmd)
			continue
		case cw := <-t.configWatcher:
			if configChanges != nil {
				// Stop the previous watcher and revert whatever
				// remote config it had applied.
				stopConfigWatcher()
				t.updateRemoteConfig(cfg.logger, lastConfigChange, nil)
				lastConfigChange = nil
				configChanges = nil
			}
			if cw == nil {
				continue
			}
			var configWatcherContext context.Context
			var watchParams apmconfig.WatchParams
			watchParams.Service.Name = t.service.Name
			watchParams.Service.Environment = t.service.Environment
			configWatcherContext, stopConfigWatcher = context.WithCancel(ctx)
			configChanges = cw.WatchConfig(configWatcherContext, watchParams)
			// Silence go vet's "possible context leak" false positive.
			// We call a previous stopConfigWatcher before reassigning
			// the variable, and we have a defer at the top level of the
			// loop method that will call the final stopConfigWatcher
			// value on method exit.
			_ = stopConfigWatcher
			continue
		case change, ok := <-configChanges:
			if !ok {
				// Channel closed: a nil channel is never selected.
				configChanges = nil
				continue
			}
			if change.Err != nil {
				if cfg.logger != nil {
					cfg.logger.Errorf("config request failed: %s", change.Err)
				}
			} else {
				t.updateRemoteConfig(cfg.logger, lastConfigChange, change.Attrs)
				lastConfigChange = change.Attrs
				handleTracerConfigCommand(func(cfg *tracerConfig) {
					cfg.recording = t.instrumentationConfig().recording
				})
			}
			continue
		case <-refreshVersionTicker.C:
			go t.maybeRefreshServerVersion(ctx, refreshServerVersionDeadline)
		case event := <-t.events:
			switch event.eventType {
			case transactionEvent:
				if !t.breakdownMetrics.recordTransaction(event.tx.TransactionData) {
					// Log the limit warning at most once per loop.
					if !breakdownMetricsLimitWarningLogged && cfg.logger != nil {
						cfg.logger.Warningf("%s", breakdownMetricsLimitWarning)
						breakdownMetricsLimitWarningLogged = true
					}
				}
				// Drop unsampled transactions when the APM Server is >= 8.0
				drop := t.maybeDropTransaction(
					ctx, event.tx.TransactionData, event.tx.Sampled(),
				)
				if !drop {
					modelWriter.writeTransaction(event.tx.Transaction, event.tx.TransactionData)
				}
			case spanEvent:
				modelWriter.writeSpan(event.span.Span, event.span.SpanData)
			case errorEvent:
				modelWriter.writeError(event.err)
				// Flush the buffer to transmit the error immediately.
				flushRequest = true
			}
		case <-requestTimer.C:
			// The request has been open for cfg.requestDuration.
			requestTimerActive = false
			closeRequest = true
		case <-metricsTimer.C:
			metricsTimerStart = time.Time{}
			// Do not start a second gather while one is in progress.
			gatherMetrics = !gatheringMetrics
		case sentMetrics = <-t.forceSendMetrics:
			if cfg.recording {
				if !metricsTimerStart.IsZero() {
					// Cancel the scheduled gather; it is re-armed
					// when the gatheredMetrics signal is handled.
					if !metricsTimer.Stop() {
						<-metricsTimer.C
					}
					metricsTimerStart = time.Time{}
				}
				gatherMetrics = !gatheringMetrics
			}
		case <-gatheredMetrics:
			modelWriter.writeMetrics(&metrics)
			gatheringMetrics = false
			flushRequest = true
			if cfg.recording && cfg.metricsInterval > 0 {
				metricsTimerStart = time.Now()
				metricsTimer.Reset(cfg.metricsInterval)
			}
		case <-cpuProfilingState.timer.C:
			cpuProfilingState.start(ctx, cfg.logger, t.metadataReader())
		case <-cpuProfilingState.finished:
			cpuProfilingState.resetTimer()
		case <-heapProfilingState.timer.C:
			heapProfilingState.start(ctx, cfg.logger, t.metadataReader())
		case <-heapProfilingState.finished:
			heapProfilingState.resetTimer()
		case flushed = <-t.forceFlush:
			// Drain any objects buffered in the channels.
			for n := len(t.events); n > 0; n-- {
				event := <-t.events
				switch event.eventType {
				case transactionEvent:
					if !t.breakdownMetrics.recordTransaction(event.tx.TransactionData) {
						if !breakdownMetricsLimitWarningLogged && cfg.logger != nil {
							cfg.logger.Warningf("%s", breakdownMetricsLimitWarning)
							breakdownMetricsLimitWarningLogged = true
						}
					}
					// Drop unsampled transactions when the APM Server is >= 8.0
					drop := t.maybeDropTransaction(
						ctx, event.tx.TransactionData, event.tx.Sampled(),
					)
					if !drop {
						modelWriter.writeTransaction(event.tx.Transaction, event.tx.TransactionData)
					}
				case spanEvent:
					modelWriter.writeSpan(event.span.Span, event.span.SpanData)
				case errorEvent:
					modelWriter.writeError(event.err)
				}
			}
			if !requestActive && buffer.Len() == 0 && metricsBuffer.Len() == 0 {
				// Nothing to send: acknowledge the flush immediately.
				// Clear flushed afterwards so that the requestResult
				// case does not signal the same channel a second time
				// when a later, unrelated request completes; that
				// extra send could block this loop if nobody is
				// receiving from the channel any more.
				flushed <- struct{}{}
				flushed = nil
				continue
			}
			// Close the request so the flush is acknowledged as soon
			// as the request result arrives.
			closeRequest = true
		case req = <-iochanReader.C:
			// The read is answered at the bottom of the loop body.
		case err := <-requestResult:
			if err != nil {
				stats.Errors.SendStream++
				gracePeriod = nextGracePeriod(gracePeriod)
				if cfg.logger != nil {
					logf := cfg.logger.Debugf
					if err, ok := err.(*transport.HTTPError); ok && err.Response.StatusCode == 404 {
						// 404 typically means the server is too old, meaning
						// the error is due to a misconfigured environment.
						logf = cfg.logger.Errorf
					}
					logf("request failed: %s (next request in ~%s)", err, gracePeriod)
				}
			} else {
				gracePeriod = -1 // Reset grace period after success.
				stats.TransactionsSent += requestBufTransactions
				stats.SpansSent += requestBufSpans
				stats.ErrorsSent += requestBufErrors
				if cfg.logger != nil {
					// s returns the plural suffix for a count.
					s := func(n uint64) string {
						if n != 1 {
							return "s"
						}
						return ""
					}
					cfg.logger.Debugf(
						"sent request with %d transaction%s, %d span%s, %d error%s, %d metricset%s",
						requestBufTransactions, s(requestBufTransactions),
						requestBufSpans, s(requestBufSpans),
						requestBufErrors, s(requestBufErrors),
						requestBufMetricsets, s(requestBufMetricsets),
					)
				}
			}
			// Publish stats before signalling waiters below, so that
			// t.stats is up to date by the time they are woken.
			if !stats.isZero() {
				t.stats.accumulate(stats)
				stats = TracerStats{}
			}
			if sentMetrics != nil && requestBufMetricsets > 0 {
				sentMetrics <- struct{}{}
				sentMetrics = nil
			}
			if flushed != nil {
				flushed <- struct{}{}
				flushed = nil
			}
			if req.Buf != nil {
				// req will be canceled by CloseRead below.
				req.Buf = nil
			}
			// Retire the reader used by the finished request and reset
			// all per-request state ready for the next one.
			iochanReader.CloseRead(io.EOF)
			iochanReader = iochan.NewReader()
			flushRequest = false
			closeRequest = false
			requestActive = false
			requestBytesRead = 0
			requestBuf.Reset()
			requestBufTransactions = 0
			requestBufSpans = 0
			requestBufErrors = 0
			requestBufMetricsets = 0
			if requestTimerActive {
				if !requestTimer.Stop() {
					<-requestTimer.C
				}
				requestTimerActive = false
			}
		}

		if !stats.isZero() {
			t.stats.accumulate(stats)
			stats = TracerStats{}
		}

		if gatherMetrics {
			gatheringMetrics = true
			metrics.disabled = cfg.disabledMetrics
			// Completion is reported through the gatheredMetrics channel.
			t.gatherMetrics(ctx, cfg.metricsGatherers, &metrics, cfg.logger, gatheredMetrics)
			if cfg.logger != nil {
				cfg.logger.Debugf("gathering metrics")
			}
		}

		if !requestActive {
			if buffer.Len() == 0 && metricsBuffer.Len() == 0 {
				// Nothing buffered, so no request is needed yet.
				continue
			}
			// Start a new request: wake the sender goroutine and begin
			// a fresh zlib stream that starts with the metadata.
			sendStreamRequest <- gracePeriod
			if metadata == nil {
				metadata = t.jsonRequestMetadata()
			}
			zlibWriter.Reset(&requestBuf)
			zlibWriter.Write(metadata)
			zlibFlushed = false
			zlibClosed = false
			requestActive = true
			requestTimer.Reset(cfg.requestDuration)
			requestTimerActive = true
		}

		if !closeRequest || !zlibClosed {
			// Move buffered blocks into the request, metrics first,
			// until the request reaches cfg.requestSize or both
			// buffers are empty. Each block is followed by a newline.
			for requestBytesRead+requestBuf.Len() < cfg.requestSize {
				if metricsBuffer.Len() > 0 {
					if _, _, err := metricsBuffer.WriteBlockTo(zlibWriter); err == nil {
						requestBufMetricsets++
						zlibWriter.Write([]byte("\n"))
						zlibFlushed = false
						if sentMetrics != nil {
							// SendMetrics was called: close the request
							// off so we can inform the user when the
							// metrics have been processed.
							closeRequest = true
						}
					}
					continue
				}
				if buffer.Len() == 0 {
					break
				}
				if h, _, err := buffer.WriteBlockTo(zlibWriter); err == nil {
					switch h.Tag {
					case transactionBlockTag:
						requestBufTransactions++
					case spanBlockTag:
						requestBufSpans++
					case errorBlockTag:
						requestBufErrors++
					}
					zlibWriter.Write([]byte("\n"))
					zlibFlushed = false
				}
			}
			if !closeRequest {
				// Close the request once it has reached its size limit.
				closeRequest = requestBytesRead+requestBuf.Len() >= cfg.requestSize
			}
		}
		if closeRequest {
			if !zlibClosed {
				zlibWriter.Close()
				zlibClosed = true
			}
		} else if flushRequest && !zlibFlushed {
			zlibWriter.Flush()
			flushRequest = false
			zlibFlushed = true
		}

		if req.Buf == nil || requestBuf.Len() == 0 {
			// No pending read, or no data available to answer it with.
			continue
		}
		// Only answer the read once more than zlibHeaderLen bytes have
		// been produced for this request in total (presumably so that
		// a response never consists of the zlib header alone).
		const zlibHeaderLen = 2
		if requestBytesRead+requestBuf.Len() > zlibHeaderLen {
			n, err := requestBuf.Read(req.Buf)
			if closeRequest && err == nil && requestBuf.Len() == 0 {
				// The request is closed and fully drained: report EOF
				// together with the final bytes.
				err = io.EOF
			}
			req.Respond(n, err)
			req.Buf = nil
			if n > 0 {
				requestBytesRead += n
			}
		}
	}
}