aws_xray_sdk/core/sampling/sampler.py (81 lines of code) (raw):
import logging
from random import Random
import time
import threading
from .local.sampler import LocalSampler
from .rule_cache import RuleCache
from .rule_poller import RulePoller
from .target_poller import TargetPoller
from .connector import ServiceConnector
from .reservoir import ReservoirDecision
from aws_xray_sdk import global_sdk_config
# Module-level logger named after this module.
log = logging.getLogger(name=__name__)
class DefaultSampler:
    """Make sampling decisions based on centralized sampling rules defined
    by the X-Ray control plane APIs. Falls back to the local sampler when
    no centralized sampling rule matches a request.
    """

    def __init__(self):
        self._local_sampler = LocalSampler()
        self._cache = RuleCache()
        self._connector = ServiceConnector()
        self._rule_poller = RulePoller(self._cache, self._connector)
        self._target_poller = TargetPoller(self._cache,
                                           self._rule_poller, self._connector)
        # Optional user-supplied client; handed to the connector in load_settings().
        self._xray_client = None
        # Dedicated RNG used only for the fixed-rate part of a sampling decision.
        self._random = Random()
        # Pollers are started lazily by start(); the flag is guarded by _lock.
        self._started = False
        # Default ``service_type`` for sampling requests; set by load_settings().
        self._origin = None
        self._lock = threading.Lock()

    def start(self):
        """
        Start the rule poller and target poller once the X-Ray daemon
        address and context manager are in place.

        Does nothing when the SDK is disabled. Repeated calls are harmless:
        the pollers are started at most once per instance, under ``self._lock``.
        """
        if not global_sdk_config.sdk_enabled():
            return
        with self._lock:
            if not self._started:
                self._rule_poller.start()
                self._target_poller.start()
                self._started = True

    def should_trace(self, sampling_req=None):
        """
        Return the matched sampling rule name if the sampler finds one
        and decides to sample. If no sampling rule matched, it falls back
        to the local sampler's ``should_trace`` implementation.
        All optional arguments are extracted from incoming requests by
        X-Ray middleware to perform path based sampling.

        :param dict sampling_req: optional request attributes used for rule
            matching. When it has no truthy ``service_type`` entry, the origin
            configured via ``load_settings`` is written into it (in place).
        :returns: the matched rule's name when that rule samples the request;
            ``False`` when the matched rule declines or the SDK is disabled;
            otherwise whatever the local sampler's ``should_trace`` returns.
        """
        if not global_sdk_config.sdk_enabled():
            return False

        if not self._started:
            # Only a front-end that actually uses the sampler spawns poller threads.
            self.start()

        now = int(time.time())
        # Treat "no request" and an empty request dict the same way, so that
        # every request reaching the rule cache carries a service_type.
        if sampling_req is None:
            sampling_req = {}
        if not sampling_req.get('service_type', None):
            sampling_req['service_type'] = self._origin

        matched_rule = self._cache.get_matched_rule(sampling_req, now)
        if matched_rule:
            log.debug('Rule %s is selected to make a sampling decision.', matched_rule.name)
            return self._process_matched_rule(matched_rule, now)

        log.info('No effective centralized sampling rule match. Fallback to local rules.')
        return self._local_sampler.should_trace(sampling_req)

    def load_local_rules(self, rules):
        """
        Load specified local rules to the local fallback sampler.
        """
        self._local_sampler.load_local_rules(rules)

    def load_settings(self, daemon_config, context, origin=None):
        """
        The pollers depend on the context manager of the X-Ray recorder.
        They respect a customer-specified X-Ray client when polling sampling
        rules/targets. Otherwise they fall back to using the same X-Ray
        daemon as the emitter.

        :param daemon_config: object exposing the daemon's ``tcp_ip`` and
            ``tcp_port``.
        :param context: context manager assigned to the service connector.
        :param origin: value used as ``service_type`` for sampling requests
            that do not provide one.
        """
        self._connector.setup_xray_client(ip=daemon_config.tcp_ip,
                                          port=daemon_config.tcp_port,
                                          client=self.xray_client)
        self._connector.context = context
        self._origin = origin

    def _process_matched_rule(self, rule, now):
        """
        Decide whether a request matched by ``rule`` is sampled.

        :returns: ``rule.name`` when sampled, ``False`` otherwise.
        """
        # As long as a rule is matched we increment its request counter.
        rule.increment_request_count()

        # Ask the reservoir first: it may let us borrow or take a sample.
        decision = rule.reservoir.borrow_or_take(now, rule.can_borrow)
        if decision == ReservoirDecision.BORROW:
            rule.increment_borrow_count()
        elif decision == ReservoirDecision.TAKE:
            rule.increment_sampled_count()
        # Otherwise fall back to the rule's fixed rate. random() returns a
        # value in [0.0, 1.0), so a strict "<" samples with probability exactly
        # ``rate`` and never samples when the rate is 0.
        elif self._random.random() < rule.rate:
            rule.increment_sampled_count()
        else:
            return False

        return rule.name

    @property
    def xray_client(self):
        """Optional X-Ray client passed to the service connector by load_settings()."""
        return self._xray_client

    @xray_client.setter
    def xray_client(self, v):
        self._xray_client = v