in x-pack/apm-server/sampling/processor.go [279:499]
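// Run runs the tail-based sampling processor until a fatal error occurs or
// Stop is called. It starts goroutines for persisting the pubsub subscriber
// position, maintaining local event storage, publishing and subscribing to
// sampled trace IDs, periodically finalizing local sampling reservoirs, and
// indexing events for sampled traces.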
func (p *Processor) Run() error {
defer func() {
p.stopMu.Lock()
defer p.stopMu.Unlock()
select {
case <-p.stopped:
default:
close(p.stopped)
}
}()
// NOTE(axw) the user can configure the tail-sampling flush interval,
// but cannot directly control the bulk indexing flush interval. The
// bulk indexing is expected to complete soon after the tail-sampling
// flush interval.
bulkIndexerFlushInterval := 5 * time.Second
if bulkIndexerFlushInterval > p.config.FlushInterval {
bulkIndexerFlushInterval = p.config.FlushInterval
}
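// Restore the last persisted subscriber position, if any, so that the
// pubsub subscriber resumes from where the previous run left off.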
initialSubscriberPosition := readSubscriberPosition(p.logger, p.config.DB)
subscriberPositions := make(chan pubsub.SubscriberPosition)
pubsub, err := pubsub.New(pubsub.Config{
ServerID: p.config.UUID,
Client: p.config.Elasticsearch,
DataStream: pubsub.DataStreamConfig(p.config.SampledTracesDataStream),
Logger: p.logger,
// Issue pubsub subscriber search requests at twice the frequency
// of publishing, so that each server observes the other servers'
// sampled trace IDs soon after they are published.
SearchInterval: p.config.FlushInterval / 2,
FlushInterval: bulkIndexerFlushInterval,
})
if err != nil {
return err
}
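// remoteSampledTraceIDs carries sampling decisions received from other
// servers via pubsub; localSampledTraceIDs carries decisions made by this
// server's reservoirs to the indexing goroutine; publishSampledTraceIDs
// carries local decisions to the pubsub publisher.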
remoteSampledTraceIDs := make(chan string)
localSampledTraceIDs := make(chan string)
publishSampledTraceIDs := make(chan string)
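// gracefulContext is cancelled shutdownGracePeriod after p.stopping is
// closed (see below), bounding how long publishing and indexing may
// continue during shutdown.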
gracefulContext, cancelGracefulContext := context.WithCancel(context.Background())
defer cancelGracefulContext()
var g errgroup.Group
g.Go(func() error {
// Write the subscriber position to a file on disk, to support resuming
// on apm-server restart without reprocessing all indices. We trigger the
// graceful shutdown from this goroutine to ensure that no subscriber
// positions are written after Stop is called, which could otherwise
// cause a new subscriber to miss events.
for {
select {
case <-p.stopping:
time.AfterFunc(shutdownGracePeriod, cancelGracefulContext)
return context.Canceled
case pos := <-subscriberPositions:
if err := writeSubscriberPosition(p.config.DB, pos); err != nil {
p.rateLimitedLogger.With(logp.Error(err)).With(logp.Reflect("position", pos)).Warnf(
"failed to write subscriber position: %s", err,
)
}
}
}
})
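// Run the local event storage DB, passing the stop channel and the
// configured event TTL.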
g.Go(func() error {
return p.config.DB.Run(p.stopping, p.config.TTL)
})
g.Go(func() error {
// Subscribe to remotely sampled trace IDs. The subscription is cancelled
// immediately when Stop is called, but it is possible for the old and new
// subscriber goroutines to run concurrently for a short time, before the
// old one observes the Stop call. The next subscriber picks up from the
// previously persisted position.
defer close(remoteSampledTraceIDs)
defer close(subscriberPositions)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer cancel()
select {
case <-p.stopping:
case <-p.stopped:
}
}()
return pubsub.SubscribeSampledTraceIDs(
ctx, initialSubscriberPosition, remoteSampledTraceIDs, subscriberPositions,
)
})
g.Go(func() error {
// Publish locally sampled trace IDs to Elasticsearch. This is cancelled when
// publishSampledTraceIDs is closed, after the final reservoir flush.
return pubsub.PublishSampledTraceIDs(gracefulContext, publishSampledTraceIDs)
})
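// Periodically finalize the local sampling reservoirs and fan the sampled
// trace IDs out to both the pubsub publisher and the local indexing
// goroutine below.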
g.Go(func() error {
ticker := time.NewTicker(p.config.FlushInterval)
defer ticker.Stop()
var traceIDs []string
// Close publishSampledTraceIDs and localSampledTraceIDs after returning,
// which implies that either all decisions have been published or the grace
// period has elapsed. This will unblock the PublishSampledTraceIDs call above,
// and the event indexing goroutine below.
defer close(publishSampledTraceIDs)
defer close(localSampledTraceIDs)
publishDecisions := func() error {
p.logger.Debug("finalizing local sampling reservoirs")
traceIDs = p.groups.finalizeSampledTraces(traceIDs)
if len(traceIDs) == 0 {
return nil
}
var g errgroup.Group
g.Go(func() error { return sendTraceIDs(gracefulContext, publishSampledTraceIDs, traceIDs) })
g.Go(func() error { return sendTraceIDs(gracefulContext, localSampledTraceIDs, traceIDs) })
if err := g.Wait(); err != nil {
return err
}
traceIDs = traceIDs[:0]
return nil
}
for {
select {
case <-p.stopping:
return publishDecisions()
case <-ticker.C:
if err := publishDecisions(); err != nil {
return err
}
}
}
})
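// Index events for sampled traces. Trace IDs arrive from either the local
// reservoirs or the pubsub subscriber; in both cases the trace's buffered
// events are read from local storage and sent to the batch processor.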
g.Go(func() error {
var events modelpb.Batch
// TODO(axw) pace the publishing over the flush interval?
// Alternatively we can rely on backpressure from the reporter,
// removing the artificial one second timeout from publisher code
// and just waiting as long as it takes here.
remoteSampledTraceIDs := remoteSampledTraceIDs
localSampledTraceIDs := localSampledTraceIDs
for {
if remoteSampledTraceIDs == nil && localSampledTraceIDs == nil {
// The pubsub subscriber and reservoir finalizer have
// both stopped, so there's nothing else to do.
return nil
}
var remoteDecision bool
var traceID string
var ok bool
select {
case <-gracefulContext.Done():
return gracefulContext.Err()
case traceID, ok = <-remoteSampledTraceIDs:
if !ok {
remoteSampledTraceIDs = nil
continue
}
p.logger.Debug("received remotely sampled trace ID")
remoteDecision = true
case traceID, ok = <-localSampledTraceIDs:
if !ok {
localSampledTraceIDs = nil
continue
}
}
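// Record the positive sampling decision in local storage, so that events
// for this trace which arrive later are recognised as sampled.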
if err := p.eventStore.WriteTraceSampled(traceID, true); err != nil {
p.rateLimitedLogger.Warnf(
"received error writing sampled trace: %s", err,
)
}
events = events[:0]
if err := p.eventStore.ReadTraceEvents(traceID, &events); err != nil {
p.rateLimitedLogger.Warnf(
"received error reading trace events: %s", err,
)
continue
}
if n := len(events); n > 0 {
p.logger.Debugf("reporting %d events", n)
if remoteDecision {
// Remote decisions may be received multiple times,
// e.g. if this server restarts and resubscribes to
// remote sampling decisions before they have been
// deleted. We delete events from local storage so
// we don't publish duplicates; delivery is therefore
// at-most-once, not guaranteed.
//
// TODO(carsonip): pebble supports range deletes and may be better than
// deleting events individually, but as we do not use transactions, it is
// possible to race and delete events that have not yet been read.
for _, event := range events {
switch event.Type() {
case modelpb.TransactionEventType:
if err := p.eventStore.DeleteTraceEvent(event.Trace.Id, event.Transaction.Id); err != nil {
p.logger.With(logp.Error(err)).Warn("failed to delete transaction from local storage")
}
case modelpb.SpanEventType:
if err := p.eventStore.DeleteTraceEvent(event.Trace.Id, event.Span.Id); err != nil {
p.logger.With(logp.Error(err)).Warn("failed to delete span from local storage")
}
}
}
}
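// Count the events as sampled and send them to the batch processor
// for indexing.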
p.eventMetrics.sampled.Add(gracefulContext, int64(len(events)))
if err := p.config.BatchProcessor.ProcessBatch(gracefulContext, &events); err != nil {
p.logger.With(logp.Error(err)).Warn("failed to report events")
}
for i := range events {
events[i] = nil // not strictly required, but ensures we hold no reference to the freed event
}
}
}
})
if err := g.Wait(); err != nil && err != context.Canceled {
return err
}
return nil
}