func newTailSamplingProcessor()

in x-pack/apm-server/main.go [103:155]


// newTailSamplingProcessor assembles a sampling.Processor from the
// tail-sampling settings in args.Config.Sampling.Tail.
//
// It first creates an Elasticsearch client from the tail-sampling ES config,
// then obtains the tail-sampling database for the storage directory resolved
// via paths.Resolve, and finally converts the configured policies into
// sampling.Policy values. If the client or the database cannot be obtained,
// a nil processor and a wrapped error are returned; otherwise the result of
// sampling.NewProcessor is returned unchanged.
func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, error) {
	cfg := args.Config.Sampling.Tail

	esClient, err := args.NewElasticsearchClient(cfg.ESConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create Elasticsearch client for tail-sampling: %w", err)
	}

	db, err := getDB(paths.Resolve(paths.Data, tailSamplingStorageDir), args.MeterProvider, args.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to get tail-sampling database: %w", err)
	}

	// Translate every configured policy into its sampling-package
	// counterpart, keeping the configured order.
	policies := make([]sampling.Policy, 0, len(cfg.Policies))
	for _, p := range cfg.Policies {
		criteria := sampling.PolicyCriteria{
			ServiceName:        p.Service.Name,
			ServiceEnvironment: p.Service.Environment,
			TraceName:          p.Trace.Name,
			TraceOutcome:       p.Trace.Outcome,
		}
		policies = append(policies, sampling.Policy{
			PolicyCriteria: criteria,
			SampleRate:     p.SampleRate,
		})
	}

	// The three sub-configurations are built in the same order in which they
	// appear in sampling.Config below, so samplerUUID.String() is still
	// evaluated before db.NewReadWriter.
	local := sampling.LocalSamplingConfig{
		FlushInterval:         cfg.Interval,
		MaxDynamicServices:    1000,
		Policies:              policies,
		IngestRateDecayFactor: cfg.IngestRateDecayFactor,
	}

	remote := sampling.RemoteSamplingConfig{
		CompressionLevel: cfg.ESConfig.CompressionLevel,
		Elasticsearch:    esClient,
		SampledTracesDataStream: sampling.DataStreamConfig{
			Type:      "traces",
			Dataset:   "apm.sampled",
			Namespace: args.Namespace,
		},
		UUID: samplerUUID.String(),
	}

	storage := sampling.StorageConfig{
		DB:                    db,
		Storage:               db.NewReadWriter(cfg.StorageLimitParsed, cfg.DiskUsageThreshold),
		TTL:                   cfg.TTL,
		DiscardOnWriteFailure: cfg.DiscardOnWriteFailure,
	}

	processorConfig := sampling.Config{
		BatchProcessor:       args.BatchProcessor,
		MeterProvider:        args.MeterProvider,
		LocalSamplingConfig:  local,
		RemoteSamplingConfig: remote,
		StorageConfig:        storage,
	}
	return sampling.NewProcessor(processorConfig, args.Logger)
}