in aws_xray_sdk/core/sampling/connector.py [0:0]
def fetch_sampling_target(self, rules):
    """
    Send the current sampling statistics of ``rules`` to the X-Ray
    service and read back the newly assigned quota/TTL for each rule.
    The call is proxied and signed via X-Ray Daemon.

    :param rules: sampling rules whose statistics are reported; they are
        handed to ``_generate_reporting_docs`` together with the current
        epoch second.
    :return: a 2-tuple ``(targets, last_modified)``. ``targets`` maps each
        returned ``RuleName`` to a dict with the keys ``rate``, ``quota``,
        ``TTL`` and ``interval``; ``quota``, ``TTL`` and ``interval`` are
        ``None`` when the service left the corresponding field out.
        ``last_modified`` is the response's ``LastRuleModification`` passed
        through ``_dt_to_epoch``.
    """
    timestamp = int(time.time())
    statistics = self._generate_reporting_docs(rules, timestamp)
    response = self._xray_client.get_sampling_targets(
        SamplingStatisticsDocuments=statistics
    )

    targets_by_rule = {}
    for target_doc in response['SamplingTargetDocuments']:
        # The quota TTL is optional; convert it only when it is present
        # (and truthy), otherwise report it as None.
        raw_ttl = target_doc.get('ReservoirQuotaTTL')
        if raw_ttl:
            quota_ttl = self._dt_to_epoch(raw_ttl)
        else:
            quota_ttl = None

        # 'FixedRate' and 'RuleName' are mandatory and raise KeyError when
        # absent; the remaining fields fall back to None.
        targets_by_rule[target_doc['RuleName']] = {
            'rate': target_doc['FixedRate'],
            'quota': target_doc.get('ReservoirQuota'),
            'TTL': quota_ttl,
            'interval': target_doc.get('Interval'),
        }

    last_modified = self._dt_to_epoch(response['LastRuleModification'])
    return targets_by_rule, last_modified