func (p *Processor) Run() error

in x-pack/apm-server/sampling/processor.go [279:499]


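// Run runs the tail-based sampling processor until a fatal error occurs or
// Stop is called. It coordinates several goroutines: persisting the pubsub
// subscriber position to disk, running the local storage's background
// routine, subscribing to remotely sampled trace IDs, publishing locally
// sampled trace IDs to Elasticsearch, periodically finalizing local
// sampling reservoirs, and reading stored events for sampled traces and
// reporting them.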
func (p *Processor) Run() error {
	defer func() {
		p.stopMu.Lock()
		defer p.stopMu.Unlock()
		select {
		case <-p.stopped:
		default:
			close(p.stopped)
		}
	}()

	// NOTE(axw) the user can configure the tail-sampling flush interval,
	// but cannot directly control the bulk indexing flush interval. The
	// bulk indexing is expected to complete soon after the tail-sampling
	// flush interval.
	bulkIndexerFlushInterval := 5 * time.Second
	if bulkIndexerFlushInterval > p.config.FlushInterval {
		bulkIndexerFlushInterval = p.config.FlushInterval
	}

	initialSubscriberPosition := readSubscriberPosition(p.logger, p.config.DB)
	subscriberPositions := make(chan pubsub.SubscriberPosition)
	pubsub, err := pubsub.New(pubsub.Config{
		ServerID:   p.config.UUID,
		Client:     p.config.Elasticsearch,
		DataStream: pubsub.DataStreamConfig(p.config.SampledTracesDataStream),
		Logger:     p.logger,

		// Issue pubsub subscriber search requests at twice the frequency
		// of publishing, so that servers observe each other's sampled
		// trace IDs soon after they are published.
		SearchInterval: p.config.FlushInterval / 2,
		FlushInterval:  bulkIndexerFlushInterval,
	})
	if err != nil {
		return err
	}

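	// remoteSampledTraceIDs carries trace IDs sampled by other servers and
	// received via pubsub; localSampledTraceIDs carries trace IDs sampled
	// locally; publishSampledTraceIDs carries locally sampled trace IDs to
	// be published to Elasticsearch for other servers to observe.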
	remoteSampledTraceIDs := make(chan string)
	localSampledTraceIDs := make(chan string)
	publishSampledTraceIDs := make(chan string)
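	// gracefulContext is cancelled shutdownGracePeriod after Stop is called,
	// bounding how long in-flight publishing and indexing may continue after
	// shutdown begins.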
	gracefulContext, cancelGracefulContext := context.WithCancel(context.Background())
	defer cancelGracefulContext()
	var g errgroup.Group
	g.Go(func() error {
		// Write the subscriber position to a file on disk, to support resuming
		// on apm-server restart without reprocessing all indices. We trigger
		// the graceful shutdown from this goroutine to ensure we do not
		// write any subscriber positions after Stop is called, which could
		// cause a new subscriber to miss events.
		for {
			select {
			case <-p.stopping:
				time.AfterFunc(shutdownGracePeriod, cancelGracefulContext)
				return context.Canceled
			case pos := <-subscriberPositions:
				if err := writeSubscriberPosition(p.config.DB, pos); err != nil {
					p.rateLimitedLogger.With(logp.Error(err)).With(logp.Reflect("position", pos)).Warnf(
						"failed to write subscriber position: %s", err,
					)
				}
			}
		}
	})
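	// Run the local storage's background routine, passing the configured
	// TTL, until p.stopping is closed.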
	g.Go(func() error {
		return p.config.DB.Run(p.stopping, p.config.TTL)
	})
	g.Go(func() error {
		// Subscribe to remotely sampled trace IDs. This is cancelled immediately when
		// Stop is called, though the old and new subscriber goroutines may briefly run
		// concurrently before the old one observes the Stop call. The next subscriber
		// will pick up from the previous position.
		defer close(remoteSampledTraceIDs)
		defer close(subscriberPositions)
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		go func() {
			defer cancel()
			select {
			case <-p.stopping:
			case <-p.stopped:
			}
		}()
		return pubsub.SubscribeSampledTraceIDs(
			ctx, initialSubscriberPosition, remoteSampledTraceIDs, subscriberPositions,
		)
	})
	g.Go(func() error {
		// Publish locally sampled trace IDs to Elasticsearch. This is cancelled when
		// publishSampledTraceIDs is closed, after the final reservoir flush.
		return pubsub.PublishSampledTraceIDs(gracefulContext, publishSampledTraceIDs)
	})
	g.Go(func() error {
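		// Periodically finalize the local sampling reservoirs and fan the
		// sampled trace IDs out to both the pubsub publisher and the local
		// event indexing goroutine.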
		ticker := time.NewTicker(p.config.FlushInterval)
		defer ticker.Stop()
		var traceIDs []string

		// Close publishSampledTraceIDs and localSampledTraceIDs after returning,
		// which implies that either all decisions have been published or the grace
		// period has elapsed. This will unblock the PublishSampledTraceIDs call above,
		// and the event indexing goroutine below.
		defer close(publishSampledTraceIDs)
		defer close(localSampledTraceIDs)

		publishDecisions := func() error {
			p.logger.Debug("finalizing local sampling reservoirs")
			traceIDs = p.groups.finalizeSampledTraces(traceIDs)
			if len(traceIDs) == 0 {
				return nil
			}
			var g errgroup.Group
			g.Go(func() error { return sendTraceIDs(gracefulContext, publishSampledTraceIDs, traceIDs) })
			g.Go(func() error { return sendTraceIDs(gracefulContext, localSampledTraceIDs, traceIDs) })
			if err := g.Wait(); err != nil {
				return err
			}
			traceIDs = traceIDs[:0]
			return nil
		}

		for {
			select {
			case <-p.stopping:
				return publishDecisions()
			case <-ticker.C:
				if err := publishDecisions(); err != nil {
					return err
				}
			}
		}
	})
	g.Go(func() error {
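		// React to sampling decisions: for each sampled trace ID, local or
		// remote, record the decision in local storage, read the trace's
		// stored events, and report them to the batch processor.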
		var events modelpb.Batch
		// TODO(axw) pace the publishing over the flush interval?
		// Alternatively we can rely on backpressure from the reporter,
		// removing the artificial one second timeout from publisher code
		// and just waiting as long as it takes here.
		remoteSampledTraceIDs := remoteSampledTraceIDs
		localSampledTraceIDs := localSampledTraceIDs
		for {
			if remoteSampledTraceIDs == nil && localSampledTraceIDs == nil {
				// The pubsub subscriber and reservoir finalizer have
				// both stopped, so there's nothing else to do.
				return nil
			}
			var remoteDecision bool
			var traceID string
			var ok bool
			select {
			case <-gracefulContext.Done():
				return gracefulContext.Err()
			case traceID, ok = <-remoteSampledTraceIDs:
				if !ok {
					remoteSampledTraceIDs = nil
					continue
				}
				p.logger.Debug("received remotely sampled trace ID")
				remoteDecision = true
			case traceID, ok = <-localSampledTraceIDs:
				if !ok {
					localSampledTraceIDs = nil
					continue
				}
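				// The trace ID was sampled locally; remoteDecision stays false.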
			}

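			// Record the sampling decision in local storage so that
			// later-arriving events for this trace can be matched against it.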
			if err := p.eventStore.WriteTraceSampled(traceID, true); err != nil {
				p.rateLimitedLogger.Warnf(
					"received error writing sampled trace: %s", err,
				)
			}

			events = events[:0]
			if err := p.eventStore.ReadTraceEvents(traceID, &events); err != nil {
				p.rateLimitedLogger.Warnf(
					"received error reading trace events: %s", err,
				)
				continue
			}
			if n := len(events); n > 0 {
				p.logger.Debugf("reporting %d events", n)
				if remoteDecision {
					// Remote decisions may be received multiple times,
					// e.g. if this server restarts and resubscribes to
					// remote sampling decisions before they have been
					// deleted. We delete events from local storage so
					// we don't publish duplicates; delivery is therefore
					// at-most-once, not guaranteed.
					//
					// TODO(carsonip): pebble supports range deletes and may be better than
					// deleting events separately, but as we do not use transactions, it is
					// possible to race and delete something that is not read.
					for _, event := range events {
						switch event.Type() {
						case modelpb.TransactionEventType:
							if err := p.eventStore.DeleteTraceEvent(event.Trace.Id, event.Transaction.Id); err != nil {
								p.logger.With(logp.Error(err)).Warn("failed to delete transaction from local storage")
							}
						case modelpb.SpanEventType:
							if err := p.eventStore.DeleteTraceEvent(event.Trace.Id, event.Span.Id); err != nil {
								p.logger.With(logp.Error(err)).Warn("failed to delete span from local storage")
							}
						}
					}
				}
				p.eventMetrics.sampled.Add(gracefulContext, int64(len(events)))
				if err := p.config.BatchProcessor.ProcessBatch(gracefulContext, &events); err != nil {
					p.logger.With(logp.Error(err)).Warn("failed to report events")
				}

				for i := range events {
					events[i] = nil // not strictly required, but ensures there is no reference to the freed event
				}
			}
		}
	})
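	// context.Canceled is how the goroutines above signal a graceful
	// shutdown (triggered via p.stopping), so it is not treated as an error.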
	if err := g.Wait(); err != nil && err != context.Canceled {
		return err
	}
	return nil
}