skywalking/sampling/sampling_service.py (60 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from threading import Lock, Thread import time from typing import Set from skywalking import config from skywalking.log import logger import asyncio class SamplingServiceBase: def __init__(self): self.sampling_factor = 0 @property def reset_sampling_factor_interval(self) -> int: return 3 @property def can_sampling(self): return self.sampling_factor < config.sample_n_per_3_secs def _try_sampling(self) -> bool: if self.can_sampling: self._incr_sampling_factor() return True logger.debug('%s try_sampling return false, sampling_factor: %d', self.__class__.__name__, self.sampling_factor) return False def _set_sampling_factor(self, val: int): logger.debug('Set sampling factor to %d', val) self.sampling_factor = val def _incr_sampling_factor(self): self.sampling_factor += 1 class SamplingService(Thread, SamplingServiceBase): def __init__(self): Thread.__init__(self, name='SamplingService', daemon=True) SamplingServiceBase.__init__(self) self.lock = Lock() def run(self): logger.debug('Started sampling service sampling_n_per_3_secs: %d', config.sample_n_per_3_secs) while True: self.reset_sampling_factor() time.sleep(self.reset_sampling_factor_interval) def try_sampling(self) -> bool: with self.lock: return super()._try_sampling() def force_sampled(self) -> None: with self.lock: super()._incr_sampling_factor() def reset_sampling_factor(self) -> None: with self.lock: super()._set_sampling_factor(0) class SamplingServiceAsync(SamplingServiceBase): def __init__(self): super().__init__() self.strong_ref_set: Set[asyncio.Task[None]] = set() async def start(self): logger.debug('Started async sampling service sampling_n_per_3_secs: %d', config.sample_n_per_3_secs) while True: await self.reset_sampling_factor() await asyncio.sleep(self.reset_sampling_factor_interval) def try_sampling(self) -> bool: return super()._try_sampling() def force_sampled(self): super()._incr_sampling_factor() async def reset_sampling_factor(self): super()._set_sampling_factor(0)