in x-pack/apm-server/main.go [103:155]
// newTailSamplingProcessor builds a tail-based sampling processor from the
// server's tail-sampling configuration. It creates the Elasticsearch client
// and local database the processor depends on, translates the configured
// policies into the sampling package's representation, and wires everything
// together via sampling.NewProcessor.
func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, error) {
	cfg := args.Config.Sampling.Tail

	// Elasticsearch client used for coordinating sampled trace IDs remotely.
	esClient, err := args.NewElasticsearchClient(cfg.ESConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create Elasticsearch client for tail-sampling: %w", err)
	}

	// Local database for buffering events until a sampling decision is made.
	storageDir := paths.Resolve(paths.Data, tailSamplingStorageDir)
	db, err := getDB(storageDir, args.MeterProvider, args.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to get tail-sampling database: %w", err)
	}

	// Convert each configured policy into the sampling package's policy type.
	policies := make([]sampling.Policy, 0, len(cfg.Policies))
	for _, p := range cfg.Policies {
		policies = append(policies, sampling.Policy{
			PolicyCriteria: sampling.PolicyCriteria{
				ServiceName:        p.Service.Name,
				ServiceEnvironment: p.Service.Environment,
				TraceName:          p.Trace.Name,
				TraceOutcome:       p.Trace.Outcome,
			},
			SampleRate: p.SampleRate,
		})
	}

	localConfig := sampling.LocalSamplingConfig{
		FlushInterval:         cfg.Interval,
		MaxDynamicServices:    1000,
		Policies:              policies,
		IngestRateDecayFactor: cfg.IngestRateDecayFactor,
	}
	remoteConfig := sampling.RemoteSamplingConfig{
		CompressionLevel: cfg.ESConfig.CompressionLevel,
		Elasticsearch:    esClient,
		SampledTracesDataStream: sampling.DataStreamConfig{
			Type:      "traces",
			Dataset:   "apm.sampled",
			Namespace: args.Namespace,
		},
		UUID: samplerUUID.String(),
	}
	storageConfig := sampling.StorageConfig{
		DB:                    db,
		Storage:               db.NewReadWriter(cfg.StorageLimitParsed, cfg.DiskUsageThreshold),
		TTL:                   cfg.TTL,
		DiscardOnWriteFailure: cfg.DiscardOnWriteFailure,
	}

	return sampling.NewProcessor(sampling.Config{
		BatchProcessor:       args.BatchProcessor,
		MeterProvider:        args.MeterProvider,
		LocalSamplingConfig:  localConfig,
		RemoteSamplingConfig: remoteConfig,
		StorageConfig:        storageConfig,
	}, args.Logger)
}