skywalking/profile/profile_service.py (122 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from concurrent.futures import ThreadPoolExecutor from queue import Queue from threading import Timer, RLock, Lock from typing import Tuple from skywalking.agent import agent from skywalking.loggings import logger, logger_debug_enabled from skywalking.profile.profile_constants import ProfileConstants from skywalking.profile.profile_context import ProfileTaskExecutionContext from skywalking.profile.profile_status import ProfileStatusReference from skywalking.profile.profile_task import ProfileTask from skywalking.trace.context import SpanContext from skywalking.utils.atomic_ref import AtomicRef from skywalking.utils.time import current_milli_time class Scheduler: @staticmethod def schedule(milliseconds, func, *args, **kwargs): seconds = milliseconds / 1000 if seconds < 0: seconds = 0 t = Timer(seconds, func, *args, **kwargs) t.daemon = True t.start() class ProfileTaskExecutionService: MINUTE_TO_MILLIS = 60000 def __init__(self): # Queue is thread safe self._profile_task_list = Queue() # type: Queue # queue_lock for making sure complex operation of this profile_task_list is thread safe self.queue_lock = Lock() self._last_command_create_time = -1 # type: int # single thread executor self.profile_executor = ThreadPoolExecutor(thread_name_prefix='profile-executor', max_workers=1) self.task_execution_context = AtomicRef(None) self.profile_task_scheduler = Scheduler() # rlock for process_profile_task and stop_current_profile_task self._rlock = RLock() def remove_from_profile_task_list(self, task: ProfileTask) -> bool: """ Remove a task from profile_task_list in a thread safe state """ with self.queue_lock: item_left = [] result = False while not self._profile_task_list.empty(): item = self._profile_task_list.get() if item == task: result = True # not put in item_list for removing it continue item_left.append(item) for item in item_left: self._profile_task_list.put(item) return result def get_last_command_create_time(self) -> int: return self._last_command_create_time def add_profile_task(self, task: ProfileTask): # update last command create time, which will be used in command query if task.create_time > self._last_command_create_time: self._last_command_create_time = task.create_time # check profile task object success, error_reason = self._check_profile_task(task) if not success: logger.warning('check command error, cannot process this profile task. reason: %s', error_reason) return # add task to list self._profile_task_list.put(task) delay_millis = task.start_time - current_milli_time() # schedule to start task self.profile_task_scheduler.schedule(delay_millis, self.process_profile_task, [task]) def add_profiling(self, context: SpanContext, segment_id: str, first_span_opname: str) -> ProfileStatusReference: execution_context = self.task_execution_context.get() # type: ProfileTaskExecutionContext if execution_context is None: return ProfileStatusReference.create_with_none() return execution_context.attempt_profiling(context, segment_id, first_span_opname) def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str): """ Re-check current trace need profiling, in case that third-party plugins change the operation name. """ execution_context = self.task_execution_context.get() # type: ProfileTaskExecutionContext if execution_context is None: return execution_context.profiling_recheck(trace_context, segment_id, first_span_opname) # using reentrant lock for process_profile_task and stop_current_profile_task, # to make sure thread safe. def process_profile_task(self, task: ProfileTask): with self._rlock: # make sure prev profile task already stopped self.stop_current_profile_task(self.task_execution_context.get()) # make stop task schedule and task context current_context = ProfileTaskExecutionContext(task) self.task_execution_context.set(current_context) # start profiling this task current_context.start_profiling() if logger_debug_enabled: logger.debug('profile task [%s] for endpoint [%s] started', task.task_id, task.first_span_op_name) millis = task.duration * self.MINUTE_TO_MILLIS self.profile_task_scheduler.schedule(millis, self.stop_current_profile_task, [current_context]) def stop_current_profile_task(self, need_stop: ProfileTaskExecutionContext): with self._rlock: # need_stop is None or task_execution_context is not need_stop context if need_stop is None or not self.task_execution_context.compare_and_set(need_stop, None): return need_stop.stop_profiling() if logger_debug_enabled: logger.debug('profile task [%s] for endpoint [%s] stopped', need_stop.task.task_id, need_stop.task.first_span_op_name) self.remove_from_profile_task_list(need_stop.task) # notify profiling task has finished agent.notify_profile_finish(need_stop.task) def _check_profile_task(self, task: ProfileTask) -> Tuple[bool, str]: try: # endpoint name if len(task.first_span_op_name) == 0: return (False, f'endpoint name [{task.first_span_op_name}] error, ' f'should be str and not empty') # duration if task.duration < ProfileConstants.TASK_DURATION_MIN_MINUTE: return (False, f'monitor duration must be greater ' f'than {ProfileConstants.TASK_DURATION_MIN_MINUTE} minutes') if task.duration > ProfileConstants.TASK_DURATION_MAX_MINUTE: return (False, f'monitor duration must be less ' f'than {ProfileConstants.TASK_DURATION_MAX_MINUTE} minutes') # min duration threshold if task.min_duration_threshold < 0: return False, 'min duration threshold must be greater than or equals zero' # dump period if task.thread_dump_period < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS: return (False, f'dump period must be greater than or equals to ' f'{ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS} milliseconds') # max sampling count if task.max_sampling_count <= 0: return False, 'max sampling count must be greater than zero' if task.max_sampling_count >= ProfileConstants.TASK_MAX_SAMPLING_COUNT: return (False, f'max sampling count must be less than ' f'{ProfileConstants.TASK_MAX_SAMPLING_COUNT}') # check task queue task_finish_time = self._cal_profile_task_finish_time(task) # lock the self._profile_task_list.queue when check the item in it, avoid concurrency errors with self._profile_task_list.mutex: for profile_task in self._profile_task_list.queue: # type: ProfileTask # if the end time of the task to be added is during the execution of any data, means is a error data if task.start_time <= task_finish_time <= self._cal_profile_task_finish_time(profile_task): return (False, f'there already have processing task in time range, ' f'could not add a new task again. processing task ' f'monitor endpoint name: {profile_task.first_span_op_name}') return True, '' except TypeError: return False, 'ProfileTask attributes have a type error' def _cal_profile_task_finish_time(self, task: ProfileTask) -> int: return task.start_time + task.duration * self.MINUTE_TO_MILLIS