func()

in x-pack/apm-server/sampling/processor.go [164:229]


func (p *Processor) processTransaction(event *modelpb.APMEvent) (report, stored bool, _ error) {
	if !event.Transaction.Sampled {
		// (Head-based) unsampled transactions are passed through
		// by the tail sampler.
		p.eventMetrics.headUnsampled.Add(context.Background(), 1)
		return true, false, nil
	}

	traceSampled, err := p.eventStore.IsTraceSampled(event.Trace.Id)
	switch err {
	case nil:
		// Tail-sampling decision has been made: report the transaction
		// if it was sampled.
		report := traceSampled
		if report {
			p.eventMetrics.sampled.Add(context.Background(), 1)
		}
		return report, false, nil
	case eventstorage.ErrNotFound:
		// Tail-sampling decision has not yet been made.
		break
	default:
		return false, false, err
	}

	if event.GetParentId() != "" {
		// Non-root transaction: write to local storage while we wait
		// for a sampling decision.
		return false, true, p.eventStore.WriteTraceEvent(
			event.Trace.Id, event.Transaction.Id, event,
		)
	}

	// Root transaction: apply reservoir sampling.
	//
	// TODO(axw) we should skip reservoir sampling when the matching
	// policy's sampling rate is 100%, immediately index the event
	// and record the trace sampling decision.
	reservoirSampled, err := p.groups.sampleTrace(event)
	if err == errTooManyTraceGroups {
		// Too many trace groups, drop the transaction.
		p.rateLimitedLogger.Warn(`
Tail-sampling service group limit reached, discarding trace events.
This is caused by having many unique service names while relying on
sampling policies without service name specified.
`[1:])
		return false, false, nil
	} else if err != nil {
		return false, false, err
	}

	if !reservoirSampled {
		// Write the non-sampling decision to storage to avoid further
		// writes for the trace ID, and then drop the transaction.
		//
		// This is a local optimisation only. To avoid creating network
		// traffic and load on Elasticsearch for uninteresting root
		// transactions, we do not propagate this to other APM Servers.
		return false, false, p.eventStore.WriteTraceSampled(event.Trace.Id, false)
	}

	// The root transaction was admitted to the sampling reservoir, so we
	// can proceed to write the transaction to storage; we may index it later,
	// after finalising the sampling decision.
	return false, true, p.eventStore.WriteTraceEvent(event.Trace.Id, event.Transaction.Id, event)
}