processor/tailsamplingprocessor/processor.go (514 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"
import (
"context"
"fmt"
"math"
"runtime"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/cache"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/idbatcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry"
)
// policy bundles a sampling policy evaluator with the name it was configured
// under and the metric attribute used to label telemetry recorded for it.
type policy struct {
// name used to identify this policy instance.
name string
// evaluator that decides if a trace is sampled or not by this policy instance.
evaluator sampling.PolicyEvaluator
// attribute to use in the telemetry to denote the policy.
attribute metric.MeasurementOption
}
// tailSamplingSpanProcessor handles the incoming trace data and uses the given sampling
// policy to sample traces.
type tailSamplingSpanProcessor struct {
// ctx is the context handed to newTracesProcessor. It is passed along when
// recording telemetry and when releasing spans outside of the policy tick.
ctx context.Context
// set holds the processor settings; loadSamplingPolicy reads the telemetry
// settings and the component ID name from it.
set processor.Settings
// telemetry is the builder holding the metric instruments of this processor.
telemetry *metadata.TelemetryBuilder
logger *zap.Logger
// nextConsumer receives the spans of traces that were sampled.
nextConsumer consumer.Traces
// maxNumTraces is copied from cfg.NumTraces, the same value that sizes deleteChan.
maxNumTraces uint64
// policies are evaluated in slice order by makeDecision.
policies []*policy
// idToTrace maps a pcommon.TraceID to the *sampling.TraceData buffered for it.
idToTrace sync.Map
// policyTicker is created with samplingPolicyOnTick as its OnTickFunc and is
// started with tickerFrequency in Start.
policyTicker timeutils.TTicker
// tickerFrequency defaults to one second when no option sets it.
tickerFrequency time.Duration
// decisionBatcher collects the IDs of newly seen traces; each tick takes one
// batch from it for policy evaluation.
decisionBatcher idbatcher.Batcher
// sampledIDCache and nonSampledIDCache remember trace IDs that were already
// decided, so spans arriving later can skip policy evaluation.
sampledIDCache cache.Cache[bool]
nonSampledIDCache cache.Cache[bool]
// deleteChan holds the IDs of traces stored in idToTrace. When it is full,
// processTraces receives one ID from it and drops that trace to make room.
deleteChan chan pcommon.TraceID
// numTracesOnMap is incremented when a trace is stored in idToTrace and
// decremented by dropTrace.
numTracesOnMap *atomic.Uint64
// recordPolicy, when true, makes makeDecision write the name of the policy
// that sampled the trace under the "tailsampling.policy" attribute.
recordPolicy bool
// setPolicyMux guards pendingPolicy.
setPolicyMux sync.Mutex
// pendingPolicy is set by SetSamplingPolicy and loaded (then cleared) at the
// start of the next policy tick.
pendingPolicy []PolicyCfg
}
// spanAndScope pairs a span with the instrumentation scope it was received
// under, so that the scope information can be preserved while sampling.
// Both are held as pointers; appendToTraces uses the scope pointer as a map
// key to group spans that share the same scope.
type spanAndScope struct {
span *ptrace.Span
instrumentationScope *pcommon.InstrumentationScope
}
var (
// attrSampledTrue and attrSampledFalse are the pre-built "sampled" metric
// attributes attached to decision counters.
attrSampledTrue = metric.WithAttributes(attribute.String("sampled", "true"))
attrSampledFalse = metric.WithAttributes(attribute.String("sampled", "false"))
// decisionToAttribute maps a policy decision to the "sampled" attribute it is
// reported with. Decisions not listed here have no entry, so a lookup for
// them yields a nil MeasurementOption.
decisionToAttribute = map[sampling.Decision]metric.MeasurementOption{
sampling.Sampled: attrSampledTrue,
sampling.NotSampled: attrSampledFalse,
sampling.InvertNotSampled: attrSampledFalse,
sampling.InvertSampled: attrSampledTrue,
}
)
// Option customizes a tailSamplingSpanProcessor. newTracesProcessor applies the
// options from cfg.Options after building the processor and before filling in
// defaults for anything the options left unset.
type Option func(*tailSamplingSpanProcessor)
// newTracesProcessor builds a processor.Traces that performs tail sampling as
// described by cfg.
//
// Construction order: telemetry builder, decision caches, the processor
// itself, cfg.Options, and finally defaults for whatever the options left
// unset (ticker frequency, policies, decision batcher). Any failure along the
// way is returned unchanged with a nil processor.
func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Traces, cfg Config) (processor.Traces, error) {
	telemetrySettings := set.TelemetrySettings
	telemetry, err := metadata.NewTelemetryBuilder(telemetrySettings)
	if err != nil {
		return nil, err
	}

	// Both caches start out as the same no-op cache and are only replaced by
	// an LRU cache when a positive size is configured.
	sampledDecisions := cache.NewNopDecisionCache[bool]()
	nonSampledDecisions := sampledDecisions
	if size := cfg.DecisionCache.SampledCacheSize; size > 0 {
		if sampledDecisions, err = cache.NewLRUDecisionCache[bool](size); err != nil {
			return nil, err
		}
	}
	if size := cfg.DecisionCache.NonSampledCacheSize; size > 0 {
		if nonSampledDecisions, err = cache.NewLRUDecisionCache[bool](size); err != nil {
			return nil, err
		}
	}

	tsp := &tailSamplingSpanProcessor{
		ctx:               ctx,
		set:               set,
		telemetry:         telemetry,
		logger:            telemetrySettings.Logger,
		nextConsumer:      nextConsumer,
		maxNumTraces:      cfg.NumTraces,
		sampledIDCache:    sampledDecisions,
		nonSampledIDCache: nonSampledDecisions,
		deleteChan:        make(chan pcommon.TraceID, cfg.NumTraces),
		numTracesOnMap:    &atomic.Uint64{},
	}
	tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick}

	for _, apply := range cfg.Options {
		apply(tsp)
	}

	if tsp.tickerFrequency == 0 {
		tsp.tickerFrequency = time.Second
	}

	if tsp.policies == nil {
		if err := tsp.loadSamplingPolicy(cfg.PolicyCfgs); err != nil {
			return nil, err
		}
	}

	if tsp.decisionBatcher != nil {
		return tsp, nil
	}

	// Creating the batcher starts a goroutine in the background, so it is done
	// last, only once the policies were created successfully.
	numDecisionBatches := uint64(math.Max(1, cfg.DecisionWait.Seconds()))
	inBatcher, err := idbatcher.New(numDecisionBatches, cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU()))
	if err != nil {
		return nil, err
	}
	tsp.decisionBatcher = inBatcher

	return tsp, nil
}
// withDecisionBatcher returns an Option that installs batcher as the batcher
// of trace IDs awaiting policy evaluation. When it is used,
// newTracesProcessor does not create its own batcher.
func withDecisionBatcher(batcher idbatcher.Batcher) Option {
	return func(p *tailSamplingSpanProcessor) {
		p.decisionBatcher = batcher
	}
}
// withPolicies returns an Option that installs ready-made sampling policies.
// When the installed slice is non-nil, newTracesProcessor skips loading the
// policies from the configuration.
func withPolicies(policies []*policy) Option {
	return func(p *tailSamplingSpanProcessor) {
		p.policies = policies
	}
}
// withTickerFrequency returns an Option that sets how often the sampling
// policies are evaluated. A zero frequency is replaced by the one-second
// default in newTracesProcessor.
func withTickerFrequency(frequency time.Duration) Option {
	return func(p *tailSamplingSpanProcessor) {
		p.tickerFrequency = frequency
	}
}
// WithSampledDecisionCache returns an Option that replaces the cache in which
// the processor stores recently sampled trace IDs.
func WithSampledDecisionCache(c cache.Cache[bool]) Option {
	return func(p *tailSamplingSpanProcessor) {
		p.sampledIDCache = c
	}
}
// WithNonSampledDecisionCache returns an Option that replaces the cache in
// which the processor stores recently non-sampled trace IDs.
func WithNonSampledDecisionCache(c cache.Cache[bool]) Option {
	return func(p *tailSamplingSpanProcessor) {
		p.nonSampledIDCache = c
	}
}
// withRecordPolicy returns an Option that turns on recording of the policy
// that sampled a trace (see the recordPolicy check in makeDecision).
func withRecordPolicy() Option {
	return func(p *tailSamplingSpanProcessor) {
		p.recordPolicy = true
	}
}
// getPolicyEvaluator creates the evaluator for a top-level policy
// configuration. Composite and And policies have dedicated constructors;
// every other type is handled by getSharedPolicyEvaluator.
func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg) (sampling.PolicyEvaluator, error) {
	if cfg.Type == Composite {
		return getNewCompositePolicy(settings, &cfg.CompositeCfg)
	}
	if cfg.Type == And {
		return getNewAndPolicy(settings, &cfg.AndCfg)
	}
	return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg)
}
// getSharedPolicyEvaluator creates the evaluator for one of the policy types
// described by sharedPolicyCfg. The logger handed to the evaluator is tagged
// with the policy type. An unrecognized type yields an error.
func getSharedPolicyEvaluator(settings component.TelemetrySettings, cfg *sharedPolicyCfg) (sampling.PolicyEvaluator, error) {
	// settings is a by-value copy, so only this evaluator sees the tagged logger.
	settings.Logger = settings.Logger.With(zap.Any("policy", cfg.Type))

	switch cfg.Type {
	case AlwaysSample:
		return sampling.NewAlwaysSample(settings), nil

	case Latency:
		return sampling.NewLatency(settings, cfg.LatencyCfg.ThresholdMs, cfg.LatencyCfg.UpperThresholdmsMs), nil

	case NumericAttribute:
		// The filter takes pointers to the bounds; hand it local copies rather
		// than pointers into the configuration.
		lowerBound := cfg.NumericAttributeCfg.MinValue
		upperBound := cfg.NumericAttributeCfg.MaxValue
		return sampling.NewNumericAttributeFilter(
			settings,
			cfg.NumericAttributeCfg.Key,
			&lowerBound,
			&upperBound,
			cfg.NumericAttributeCfg.InvertMatch,
		), nil

	case Probabilistic:
		return sampling.NewProbabilisticSampler(
			settings,
			cfg.ProbabilisticCfg.HashSalt,
			cfg.ProbabilisticCfg.SamplingPercentage,
		), nil

	case StringAttribute:
		return sampling.NewStringAttributeFilter(
			settings,
			cfg.StringAttributeCfg.Key,
			cfg.StringAttributeCfg.Values,
			cfg.StringAttributeCfg.EnabledRegexMatching,
			cfg.StringAttributeCfg.CacheMaxSize,
			cfg.StringAttributeCfg.InvertMatch,
		), nil

	case StatusCode:
		return sampling.NewStatusCodeFilter(settings, cfg.StatusCodeCfg.StatusCodes)

	case RateLimiting:
		return sampling.NewRateLimiting(settings, cfg.RateLimitingCfg.SpansPerSecond), nil

	case SpanCount:
		return sampling.NewSpanCount(settings, cfg.SpanCountCfg.MinSpans, cfg.SpanCountCfg.MaxSpans), nil

	case TraceState:
		return sampling.NewTraceStateFilter(settings, cfg.TraceStateCfg.Key, cfg.TraceStateCfg.Values), nil

	case BooleanAttribute:
		return sampling.NewBooleanAttributeFilter(
			settings,
			cfg.BooleanAttributeCfg.Key,
			cfg.BooleanAttributeCfg.Value,
			cfg.BooleanAttributeCfg.InvertMatch,
		), nil

	case OTTLCondition:
		return sampling.NewOTTLConditionFilter(
			settings,
			cfg.OTTLConditionCfg.SpanConditions,
			cfg.OTTLConditionCfg.SpanEventConditions,
			cfg.OTTLConditionCfg.ErrorMode,
		)
	}

	return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type)
}
// policyMetrics accumulates counters over a single policy tick:
// idNotFoundOnMapCount counts batched trace IDs no longer present in idToTrace,
// evaluateErrorCount counts policy evaluations that returned an error, and
// decisionSampled / decisionNotSampled count the final decisions made.
type policyMetrics struct {
idNotFoundOnMapCount, evaluateErrorCount, decisionSampled, decisionNotSampled int64
}
// loadSamplingPolicy builds one policy per entry of cfgs and, only if all of
// them can be built, installs them as the active policies. Empty or duplicate
// policy names and evaluator construction failures are reported as errors, in
// which case the currently active policies are left untouched.
func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error {
	settings := tsp.set.TelemetrySettings
	componentID := tsp.set.ID.Name()

	seen := make(map[string]struct{}, len(cfgs))
	loaded := make([]*policy, 0, len(cfgs))

	for _, cfg := range cfgs {
		if cfg.Name == "" {
			return fmt.Errorf("policy name cannot be empty")
		}
		if _, dup := seen[cfg.Name]; dup {
			return fmt.Errorf("duplicate policy name %q", cfg.Name)
		}
		seen[cfg.Name] = struct{}{}

		eval, err := getPolicyEvaluator(settings, &cfg)
		if err != nil {
			return fmt.Errorf("failed to create policy evaluator for %q: %w", cfg.Name, err)
		}

		// The telemetry attribute is prefixed with the component ID name, when
		// there is one, so that the value also identifies the component.
		attrValue := cfg.Name
		if componentID != "" {
			attrValue = componentID + "." + cfg.Name
		}

		loaded = append(loaded, &policy{
			name:      cfg.Name,
			evaluator: eval,
			attribute: metric.WithAttributes(attribute.String("policy", attrValue)),
		})
	}

	tsp.policies = loaded
	tsp.logger.Debug("Loaded sampling policy", zap.Int("policies.len", len(loaded)))
	return nil
}
// SetSamplingPolicy stores cfgs as the pending policy configuration. It does
// not touch the active policies; the pending configuration is picked up by
// loadPendingSamplingPolicy at the start of the next policy tick.
func (tsp *tailSamplingSpanProcessor) SetSamplingPolicy(cfgs []PolicyCfg) {
	tsp.logger.Debug("Setting pending sampling policy", zap.Int("pending.len", len(cfgs)))

	tsp.setPolicyMux.Lock()
	tsp.pendingPolicy = cfgs
	tsp.setPolicyMux.Unlock()
}
// loadPendingSamplingPolicy applies the configuration stored by
// SetSamplingPolicy, if any. The pending configuration is cleared whether or
// not loading succeeds; on failure the error is logged and the previously
// loaded policies stay active.
func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() {
	tsp.setPolicyMux.Lock()
	defer tsp.setPolicyMux.Unlock()

	pending := tsp.pendingPolicy
	if len(pending) == 0 {
		// Nothing pending, do nothing.
		return
	}

	tsp.logger.Debug("Loading pending sampling policy", zap.Int("pending.len", len(pending)))

	err := tsp.loadSamplingPolicy(pending)

	// Clear the pending configuration even on error: an invalid policy would
	// otherwise fail again on every tick and flood the log.
	tsp.pendingPolicy = nil

	if err == nil {
		return
	}
	tsp.logger.Error("Failed to load pending sampling policy", zap.Error(err))
	tsp.logger.Debug("Continuing to use the previously loaded sampling policy")
}
// samplingPolicyOnTick is the policy ticker callback. It first applies any
// pending policy configuration, then takes one batch of trace IDs from the
// decision batcher and, for every trace still present in idToTrace, makes the
// final decision, records it on the trace, and releases the buffered spans
// according to that decision. Per-tick counters are reported at the end.
func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
	tsp.logger.Debug("Sampling Policy Evaluation ticked")

	tsp.loadPendingSamplingPolicy()

	var metrics policyMetrics
	releaseCtx := context.Background()
	tickStart := time.Now()

	// Only the batch itself is needed; the second return value is not used here.
	batch, _ := tsp.decisionBatcher.CloseCurrentAndTakeFirstBatch()

	for _, id := range batch {
		entry, found := tsp.idToTrace.Load(id)
		if !found {
			metrics.idNotFoundOnMapCount++
			continue
		}

		trace := entry.(*sampling.TraceData)
		trace.DecisionTime = time.Now()

		decision := tsp.makeDecision(id, trace, &metrics)

		elapsedMs := int64(time.Since(tickStart) / time.Millisecond)
		tsp.telemetry.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, elapsedMs)
		tsp.telemetry.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, 1, decisionToAttribute[decision])

		// Sampled or not, take the buffered batches off the trace and publish
		// the final decision under the trace lock.
		trace.Lock()
		buffered := trace.ReceivedBatches
		trace.FinalDecision = decision
		trace.ReceivedBatches = ptrace.NewTraces()
		trace.Unlock()

		if decision == sampling.Sampled {
			tsp.releaseSampledTrace(releaseCtx, id, buffered)
		} else if decision == sampling.NotSampled {
			tsp.releaseNotSampledTrace(id)
		}
	}

	tsp.telemetry.ProcessorTailSamplingSamplingTracesOnMemory.Record(tsp.ctx, int64(tsp.numTracesOnMap.Load()))
	tsp.telemetry.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount)
	tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount)

	tsp.logger.Debug("Sampling policy evaluation completed",
		zap.Int("batch.len", len(batch)),
		zap.Int64("sampled", metrics.decisionSampled),
		zap.Int64("notSampled", metrics.decisionNotSampled),
		zap.Int64("droppedPriorToEvaluation", metrics.idNotFoundOnMapCount),
		zap.Int64("policyEvaluationErrors", metrics.evaluateErrorCount),
	)
}
// makeDecision evaluates every configured policy against the trace and
// reduces the individual results to a final Sampled or NotSampled decision.
//
// Precedence, highest first:
//  1. any policy returned InvertNotSampled: NotSampled
//  2. any policy returned Sampled: Sampled
//  3. a policy returned InvertSampled and none returned NotSampled: Sampled
//  4. otherwise: NotSampled
//
// A policy whose evaluation fails is counted in metrics.evaluateErrorCount,
// logged at debug level, and does not contribute a decision. The final
// decision is counted in metrics.decisionSampled or metrics.decisionNotSampled.
// When recordPolicy is enabled and the trace is sampled, the name of the first
// policy responsible is set on the trace as "tailsampling.policy".
func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sampling.TraceData, metrics *policyMetrics) sampling.Decision {
	finalDecision := sampling.NotSampled
	samplingDecisions := map[sampling.Decision]*policy{
		sampling.Error:            nil,
		sampling.Sampled:          nil,
		sampling.NotSampled:       nil,
		sampling.InvertSampled:    nil,
		sampling.InvertNotSampled: nil,
	}

	ctx := context.Background()

	// Check all policies before making a final decision.
	for _, p := range tsp.policies {
		// Time each policy on its own. The latency is recorded with the
		// policy's attribute, so it must not include the time spent in the
		// policies evaluated before it.
		policyStart := time.Now()
		decision, err := p.evaluator.Evaluate(ctx, id, trace)
		latency := time.Since(policyStart)
		tsp.telemetry.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(latency/time.Microsecond), p.attribute)

		if err != nil {
			if samplingDecisions[sampling.Error] == nil {
				samplingDecisions[sampling.Error] = p
			}
			metrics.evaluateErrorCount++
			tsp.logger.Debug("Sampling policy error", zap.Error(err))
			continue
		}

		tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(ctx, 1, p.attribute, decisionToAttribute[decision])
		if telemetry.IsMetricStatCountSpansSampledEnabled() {
			tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision])
		}

		// We associate the first policy with the sampling decision to understand what policy sampled a span
		if samplingDecisions[decision] == nil {
			samplingDecisions[decision] = p
		}
	}

	var sampledPolicy *policy

	// InvertNotSampled takes precedence over any other decision
	switch {
	case samplingDecisions[sampling.InvertNotSampled] != nil:
		finalDecision = sampling.NotSampled
	case samplingDecisions[sampling.Sampled] != nil:
		finalDecision = sampling.Sampled
		sampledPolicy = samplingDecisions[sampling.Sampled]
	case samplingDecisions[sampling.InvertSampled] != nil && samplingDecisions[sampling.NotSampled] == nil:
		finalDecision = sampling.Sampled
		sampledPolicy = samplingDecisions[sampling.InvertSampled]
	}

	if tsp.recordPolicy && sampledPolicy != nil {
		sampling.SetAttrOnScopeSpans(trace, "tailsampling.policy", sampledPolicy.name)
	}

	switch finalDecision {
	case sampling.Sampled:
		metrics.decisionSampled++
	case sampling.NotSampled:
		metrics.decisionNotSampled++
	}

	return finalDecision
}
// ConsumeTraces implements the processor.Traces interface. Each ResourceSpans
// entry of td is handed to processTraces; the incoming context is not used
// and the returned error is always nil.
func (tsp *tailSamplingSpanProcessor) ConsumeTraces(_ context.Context, td ptrace.Traces) error {
	rss := td.ResourceSpans()
	for i, n := 0, rss.Len(); i < n; i++ {
		tsp.processTraces(rss.At(i))
	}
	return nil
}
// groupSpansByTraceKey walks all scopes of resourceSpans and returns the
// spans grouped by trace ID. Every entry carries a pointer to its span and a
// pointer to the instrumentation scope it was found under; spans of the same
// scope share one scope pointer.
func (tsp *tailSamplingSpanProcessor) groupSpansByTraceKey(resourceSpans ptrace.ResourceSpans) map[pcommon.TraceID][]spanAndScope {
	grouped := make(map[pcommon.TraceID][]spanAndScope)

	allScopeSpans := resourceSpans.ScopeSpans()
	for si, sn := 0, allScopeSpans.Len(); si < sn; si++ {
		scopeSpans := allScopeSpans.At(si)
		// Declared per scope so that all spans of this scope point at the same
		// variable.
		instrScope := scopeSpans.Scope()
		spans := scopeSpans.Spans()

		for pi, pn := 0, spans.Len(); pi < pn; pi++ {
			// Declared per span so that every entry gets its own pointer.
			span := spans.At(pi)
			traceID := span.TraceID()
			grouped[traceID] = append(grouped[traceID], spanAndScope{
				span:                 &span,
				instrumentationScope: &instrScope,
			})
		}
	}

	return grouped
}
// processTraces handles the spans of one ResourceSpans. The spans are grouped
// by trace ID and, for each trace:
//
//   - if the ID is in the sampled cache, the spans are forwarded right away;
//   - if the ID is in the non-sampled cache, the spans are discarded;
//   - otherwise the trace is looked up in (or added to) idToTrace. Spans of a
//     trace without a final decision are buffered on it; spans of a trace that
//     already has one are released according to that decision and their
//     lateness is recorded.
//
// When a new trace is added and deleteChan is full, IDs are received from
// deleteChan and those traces dropped until the new ID fits.
func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.ResourceSpans) {
	currTime := time.Now()

	// Group spans per their traceId to minimize contention on idToTrace
	idToSpansAndScope := tsp.groupSpansByTraceKey(resourceSpans)
	var newTraceIDs int64
	for id, spans := range idToSpansAndScope {
		lenSpans := int64(len(spans))

		// If the trace ID is in the sampled cache, short circuit the decision
		if _, ok := tsp.sampledIDCache.Get(id); ok {
			tsp.logger.Debug("Trace ID is in the sampled cache", zap.Stringer("id", id))
			traceTd := ptrace.NewTraces()
			appendToTraces(traceTd, resourceSpans, spans)
			tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
			tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.
				Add(tsp.ctx, lenSpans, attrSampledTrue)
			continue
		}
		// If the trace ID is in the non-sampled cache, short circuit the decision
		if _, ok := tsp.nonSampledIDCache.Get(id); ok {
			tsp.logger.Debug("Trace ID is in the non-sampled cache", zap.Stringer("id", id))
			tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.
				Add(tsp.ctx, lenSpans, attrSampledFalse)
			continue
		}

		d, loaded := tsp.idToTrace.Load(id)
		if !loaded {
			spanCount := &atomic.Int64{}
			spanCount.Store(lenSpans)

			td := &sampling.TraceData{
				ArrivalTime:     currTime,
				SpanCount:       spanCount,
				ReceivedBatches: ptrace.NewTraces(),
			}

			// LoadOrStore settles the race between callers that both missed
			// on Load: only the one that actually stored does the bookkeeping.
			if d, loaded = tsp.idToTrace.LoadOrStore(id, td); !loaded {
				newTraceIDs++
				tsp.decisionBatcher.AddToCurrentBatch(id)
				tsp.numTracesOnMap.Add(1)
				postDeletion := false
				for !postDeletion {
					select {
					case tsp.deleteChan <- id:
						postDeletion = true
					default:
						// deleteChan is full: drop the trace whose ID is
						// received from it, then retry the send.
						traceKeyToDrop := <-tsp.deleteChan
						tsp.dropTrace(traceKeyToDrop, currTime)
					}
				}
			}
		}

		actualData := d.(*sampling.TraceData)
		if loaded {
			// A freshly stored trace already carries lenSpans as its count.
			actualData.SpanCount.Add(lenSpans)
		}

		actualData.Lock()
		finalDecision := actualData.FinalDecision

		if finalDecision == sampling.Unspecified {
			// If the final decision hasn't been made, add the new spans under the lock.
			appendToTraces(actualData.ReceivedBatches, resourceSpans, spans)
			actualData.Unlock()
			continue
		}

		actualData.Unlock()

		// The decision was already made: these spans arrived late.
		switch finalDecision {
		case sampling.Sampled:
			traceTd := ptrace.NewTraces()
			appendToTraces(traceTd, resourceSpans, spans)
			tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
		case sampling.NotSampled:
			tsp.releaseNotSampledTrace(id)
		default:
			tsp.logger.Warn("Unexpected sampling decision", zap.Int("decision", int(finalDecision)))
		}

		if !actualData.DecisionTime.IsZero() {
			tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second))
		}
	}

	tsp.telemetry.ProcessorTailSamplingNewTraceIDReceived.Add(tsp.ctx, newTraceIDs)
}
// Capabilities reports the consumer capabilities of this processor; it
// declares that the processor does not mutate the data it is given.
func (tsp *tailSamplingSpanProcessor) Capabilities() consumer.Capabilities {
	caps := consumer.Capabilities{MutatesData: false}
	return caps
}
// Start is invoked during service startup. It starts the policy ticker at the
// configured ticker frequency and always returns nil; neither argument is used.
func (tsp *tailSamplingSpanProcessor) Start(context.Context, component.Host) error {
	frequency := tsp.tickerFrequency
	tsp.policyTicker.Start(frequency)
	return nil
}
// Shutdown is invoked during service shutdown. It stops the decision batcher
// first and the policy ticker second, and always returns nil.
func (tsp *tailSamplingSpanProcessor) Shutdown(context.Context) error {
	batcher, ticker := tsp.decisionBatcher, tsp.policyTicker
	batcher.Stop()
	ticker.Stop()
	return nil
}
// dropTrace removes the trace with the given ID from idToTrace, decrements
// numTracesOnMap, and records how long the trace was held, measured from its
// arrival time to deletionTime in seconds. If the ID is not in the map, only
// a debug message is logged.
func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletionTime time.Time) {
	// LoadAndDelete makes the lookup and the removal one atomic step, so when
	// several goroutines try to drop the same trace only one of them sees it
	// and decrements the counter. A separate Load and Delete would let both
	// succeed and decrement the unsigned counter twice.
	d, ok := tsp.idToTrace.LoadAndDelete(traceID)
	if !ok {
		tsp.logger.Debug("Attempt to delete trace ID not on table", zap.Stringer("id", traceID))
		return
	}

	// Subtract one from numTracesOnMap per https://godoc.org/sync/atomic#AddUint64
	tsp.numTracesOnMap.Add(^uint64(0))

	trace := d.(*sampling.TraceData)
	tsp.telemetry.ProcessorTailSamplingSamplingTraceRemovalAge.Record(tsp.ctx, int64(deletionTime.Sub(trace.ArrivalTime)/time.Second))
}
// releaseSampledTrace records the trace ID in the sampled-decision cache and
// forwards td to the next consumer; a delivery error is logged as a warning
// and not returned. Afterwards, if the cache reports the ID as present, the
// trace is dropped from the internal map.
func (tsp *tailSamplingSpanProcessor) releaseSampledTrace(ctx context.Context, id pcommon.TraceID, td ptrace.Traces) {
	tsp.sampledIDCache.Put(id, true)

	err := tsp.nextConsumer.ConsumeTraces(ctx, td)
	if err != nil {
		tsp.logger.Warn("Error sending spans to destination", zap.Error(err))
	}

	// Only forget the trace once the cache can answer for it.
	if _, cached := tsp.sampledIDCache.Get(id); !cached {
		return
	}
	tsp.dropTrace(id, time.Now())
}
// releaseNotSampledTrace records the trace ID in the non-sampled-decision
// cache. Afterwards, if the cache reports the ID as present, the trace is
// dropped from the internal map.
func (tsp *tailSamplingSpanProcessor) releaseNotSampledTrace(id pcommon.TraceID) {
	tsp.nonSampledIDCache.Put(id, true)

	// Only forget the trace once the cache can answer for it.
	if _, cached := tsp.nonSampledIDCache.Get(id); !cached {
		return
	}
	tsp.dropTrace(id, time.Now())
}
// appendToTraces appends one new ResourceSpans to dest, carrying a copy of the
// resource of rss and copies of the given spans. Spans are grouped under a
// copy of their instrumentation scope; one ScopeSpans is created per distinct
// scope pointer, in order of first appearance.
func appendToTraces(dest ptrace.Traces, rss ptrace.ResourceSpans, spanAndScopes []spanAndScope) {
	rs := dest.ResourceSpans().AppendEmpty()
	rss.Resource().CopyTo(rs.Resource())

	// Destination ScopeSpans already created, keyed by source scope pointer.
	destByScope := make(map[*pcommon.InstrumentationScope]*ptrace.ScopeSpans)

	for _, entry := range spanAndScopes {
		target, known := destByScope[entry.instrumentationScope]
		if !known {
			// First span seen for this scope: create its ScopeSpans in dest.
			created := rs.ScopeSpans().AppendEmpty()
			entry.instrumentationScope.CopyTo(created.Scope())
			target = &created
			destByScope[entry.instrumentationScope] = target
		}
		entry.span.CopyTo(target.Spans().AppendEmpty())
	}
}