in x-pack/apm-server/sampling/processor.go [164:229]
// processTransaction applies tail-sampling to a single transaction event.
//
// The first result (report) is true when the event should be reported
// straight away: either it was head-based unsampled, or a positive
// tail-sampling decision for its trace is already recorded in the event
// store. The second result (stored) is true on the paths that hand the
// event to p.eventStore.WriteTraceEvent; any error from that write is
// returned alongside it. When neither is true the event is dropped by
// this method.
func (p *Processor) processTransaction(event *modelpb.APMEvent) (report, stored bool, _ error) {
	// Head-based unsampled transactions bypass the tail sampler entirely.
	if !event.Transaction.Sampled {
		p.eventMetrics.headUnsampled.Add(context.Background(), 1)
		return true, false, nil
	}

	decided, lookupErr := p.eventStore.IsTraceSampled(event.Trace.Id)
	if lookupErr == nil {
		// A tail-sampling decision already exists for this trace:
		// report the transaction only if the trace was sampled.
		if decided {
			p.eventMetrics.sampled.Add(context.Background(), 1)
		}
		return decided, false, nil
	}
	if lookupErr != eventstorage.ErrNotFound {
		// Anything other than "no decision yet" is a genuine failure.
		return false, false, lookupErr
	}

	// No tail-sampling decision has been made yet for this trace.
	if event.GetParentId() != "" {
		// Non-root transaction: keep it in local storage until a
		// sampling decision arrives.
		return false, true, p.eventStore.WriteTraceEvent(
			event.Trace.Id, event.Transaction.Id, event,
		)
	}

	// Root transaction: run it through reservoir sampling.
	//
	// TODO(axw) we should skip reservoir sampling when the matching
	// policy's sampling rate is 100%, immediately index the event
	// and record the trace sampling decision.
	admitted, sampleErr := p.groups.sampleTrace(event)
	switch {
	case sampleErr == errTooManyTraceGroups:
		// The trace group limit has been hit; drop the transaction and
		// emit a (rate-limited) warning. The [1:] strips the leading
		// newline of the raw string literal.
		groupLimitWarning := `
Tail-sampling service group limit reached, discarding trace events.
This is caused by having many unique service names while relying on
sampling policies without service name specified.
`[1:]
		p.rateLimitedLogger.Warn(groupLimitWarning)
		return false, false, nil
	case sampleErr != nil:
		return false, false, sampleErr
	}

	if admitted {
		// The root transaction made it into the sampling reservoir, so
		// store it now; it may be indexed later once the sampling
		// decision has been finalised.
		return false, true, p.eventStore.WriteTraceEvent(event.Trace.Id, event.Transaction.Id, event)
	}

	// Not admitted: record the negative decision so that no further
	// events are written for this trace ID, then drop the transaction.
	//
	// This is a local optimisation only. To avoid creating network
	// traffic and load on Elasticsearch for uninteresting root
	// transactions, the decision is not propagated to other APM Servers.
	return false, false, p.eventStore.WriteTraceSampled(event.Trace.Id, false)
}