skywalking/agent/__init__.py (434 lines of code) (raw):

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import asyncio
import atexit
import functools
import os
import sys
from queue import Full, Queue
from threading import Event, Thread
from typing import TYPE_CHECKING, Optional

from skywalking import config, loggings, meter, plugins, profile, sampling
from skywalking.agent.protocol import Protocol, ProtocolAsync
from skywalking.command import command_service, command_service_async
from skywalking.loggings import logger
from skywalking.profile.profile_task import ProfileTask
from skywalking.profile.snapshot import TracingThreadSnapshot
from skywalking.protocol.language_agent.Meter_pb2 import MeterData
from skywalking.protocol.logging.Logging_pb2 import LogData
from skywalking.utils.singleton import Singleton

if TYPE_CHECKING:
    from skywalking.trace.context import Segment

if config.agent_asyncio_enhancement:
    import uvloop
    uvloop.install()


def report_with_backoff(reporter_name, init_wait):
    """
    An exponential backoff for retrying reporters.
    """
    def backoff_decorator(func):
        @functools.wraps(func)
        def backoff_wrapper(self, *args, **kwargs):
            wait = base = init_wait
            while not self._finished.is_set():
                try:
                    flag = func(self, *args, **kwargs)
                    # for segment/log reporter, if the queue not empty(return True), we should keep reporter working
                    # for other cases(return false or None), reset to base wait time on success
                    wait = 0 if flag else base
                except Exception:  # noqa
                    wait = min(60, wait * 2 or 1)  # double wait time with each consecutive error up to a maximum
                    logger.exception(f'Exception in {reporter_name} service in pid {os.getpid()}, '
                                     f'retry in {wait} seconds')
                self._finished.wait(wait)
            logger.info('finished reporter thread')

        return backoff_wrapper

    return backoff_decorator


def report_with_backoff_async(reporter_name, init_wait):
    """
    An exponential async backoff for retrying reporters.
    """
    def backoff_decorator(func):
        @functools.wraps(func)
        async def backoff_wrapper(self, *args, **kwargs):
            wait = base = init_wait
            while not self._finished.is_set():
                try:
                    flag = await func(self, *args, **kwargs)
                    # for segment/log reporter, if the queue not empty(return True), we should keep reporter working
                    # for other cases(return false or None), reset to base wait time on success
                    wait = 0 if flag else base
                except Exception:  # noqa
                    wait = min(60, wait * 2 or 1)  # double wait time with each consecutive error up to a maximum
                    logger.exception(f'Exception in {reporter_name} service in pid {os.getpid()}, '
                                     f'retry in {wait} seconds')
                await asyncio.sleep(wait)
            logger.info('finished reporter coroutine')

        return backoff_wrapper

    return backoff_decorator


class SkyWalkingAgent(Singleton):
    """
    The main singleton class and entrypoint of SkyWalking Python Agent.

    Upon fork(), original instance rebuild everything (queues, threads, instrumentation)
    by calling the fork handlers in the class instance.
    """
    __started: bool = False  # shared by all instances

    def __init__(self):
        """
        Protocol is one of gRPC, HTTP and Kafka that
        provides clients to reporters to communicate with OAP backend.
        """
        self.started_pid = None
        self.__protocol: Optional[Protocol] = None
        self._finished: Optional[Event] = None

    def __bootstrap(self):
        # when forking, already instrumented modules must not be instrumented again
        # otherwise it will cause double instrumentation! (we should provide an un-instrument method)

        # Initialize queues for segment, log, meter and profiling snapshots
        self.__init_queues()

        if config.agent_protocol == 'grpc':
            from skywalking.agent.protocol.grpc import GrpcProtocol
            self.__protocol = GrpcProtocol()
        elif config.agent_protocol == 'http':
            from skywalking.agent.protocol.http import HttpProtocol
            self.__protocol = HttpProtocol()
        elif config.agent_protocol == 'kafka':
            from skywalking.agent.protocol.kafka import KafkaProtocol
            self.__protocol = KafkaProtocol()

        # Start reporter threads and register queues
        self.__init_threading()

    def __init_queues(self) -> None:
        """
        This method initializes all the queues for the agent and reporters.
        """
        self.__segment_queue = Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
        self.__log_queue: Optional[Queue] = None
        self.__meter_queue: Optional[Queue] = None
        self.__snapshot_queue: Optional[Queue] = None
        if config.agent_meter_reporter_active:
            self.__meter_queue = Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
        if config.agent_log_reporter_active:
            self.__log_queue = Queue(maxsize=config.agent_log_reporter_max_buffer_size)
        if config.agent_profile_active:
            self.__snapshot_queue = Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)

    def __init_threading(self) -> None:
        """
        This method initializes all the threads for the agent and reporters.
        Upon os.fork(), callback will reinitialize threads and queues by calling this method

        Heartbeat thread is started by default.
        Segment reporter thread and segment queue is created by default.
        All other queues and threads depends on user configuration.
        """
        self._finished = Event()

        __heartbeat_thread = Thread(name='HeartbeatThread', target=self.__heartbeat, daemon=True)
        __heartbeat_thread.start()

        __segment_report_thread = Thread(name='SegmentReportThread', target=self.__report_segment, daemon=True)
        __segment_report_thread.start()

        if config.agent_meter_reporter_active:
            __meter_report_thread = Thread(name='MeterReportThread', target=self.__report_meter, daemon=True)
            __meter_report_thread.start()

            if config.agent_pvm_meter_reporter_active:
                from skywalking.meter.pvm.cpu_usage import CPUUsageDataSource
                from skywalking.meter.pvm.gc_data import GCDataSource
                from skywalking.meter.pvm.mem_usage import MEMUsageDataSource
                from skywalking.meter.pvm.thread_data import ThreadDataSource

                MEMUsageDataSource().register()
                CPUUsageDataSource().register()
                GCDataSource().register()
                ThreadDataSource().register()

        if config.agent_log_reporter_active:
            __log_report_thread = Thread(name='LogReportThread', target=self.__report_log, daemon=True)
            __log_report_thread.start()

        if config.agent_profile_active:
            # Now only profiler receives commands from OAP
            __command_dispatch_thread = Thread(name='CommandDispatchThread', target=self.__command_dispatch,
                                               daemon=True)
            __command_dispatch_thread.start()

            __query_profile_thread = Thread(name='QueryProfileCommandThread', target=self.__query_profile_command,
                                            daemon=True)
            __query_profile_thread.start()

            __send_profile_thread = Thread(name='SendProfileSnapShotThread', target=self.__send_profile_snapshot,
                                           daemon=True)
            __send_profile_thread.start()

    @staticmethod  # for now
    def __fork_before() -> None:
        """
        This handles explicit fork() calls. The child process will not have a running thread, so we need to
        revive all of them. The parent process will continue to run as normal.

        This does not affect pre-forking server support, which are handled separately.
        """
        # possible deadlock would be introduced if some queue is in use when fork() is called and
        # therefore child process will inherit a locked queue. To avoid this and have side benefit
        # of a clean queue in child process (prevent duplicated reporting), we simply restart the agent and
        # reinitialize all queues and threads.
        logger.warning('SkyWalking Python agent fork support is currently experimental, '
                       'please report issues if you encounter any.')

    @staticmethod  # for now
    def __fork_after_in_parent() -> None:
        """
        Something to do after fork() in parent process
        """
        ...

    def __fork_after_in_child(self) -> None:
        """
        Simply restart the agent after we detect a fork() call
        """
        # This will be used by os.fork() called by application and also Gunicorn, not uWSGI
        # otherwise we assume a fork() happened, give it a new service instance name
        logger.info('New process detected, re-initializing SkyWalking Python agent')
        # Note: this is for experimental change, default config should never reach here
        # Fork support is controlled by config.agent_fork_support :default: False
        # Important: This does not impact pre-forking server support (uwsgi, gunicorn, etc...)
        # This is only for explicit long-running fork() calls.
        config.agent_instance_name = f'{config.agent_instance_name}-child({os.getpid()})'
        self.start()
        logger.info(f'Agent spawned as {config.agent_instance_name} for service {config.agent_name}.')

    def start(self) -> None:
        """
        Start would be called by user or os.register_at_fork() callback

        Start will proceed if and only if the agent is not started in the current process.

        When os.fork(), the service instance should be changed to a new one by appending pid.
        """
        loggings.init()
        if sys.version_info < (3, 7):
            # agent may or may not work for Python 3.6 and below
            # since 3.6 is EOL, we will not officially support it
            logger.warning('SkyWalking Python agent does not support Python 3.6 and below, '
                           'please upgrade to Python 3.7 or above.')

        # Below is required for grpcio to work with fork()
        # https://github.com/grpc/grpc/blob/master/doc/fork_support.md
        if config.agent_protocol == 'grpc' and config.agent_experimental_fork_support:
            python_major_version: tuple = sys.version_info[:2]
            if python_major_version == (3, 7):
                logger.warning('gRPC fork support may cause hanging on Python 3.7 '
                               'when used together with gRPC and subprocess lib'
                               'See: https://github.com/grpc/grpc/issues/18075.'
                               'Please consider upgrade to Python 3.8+, '
                               'or use HTTP/Kafka protocol, or disable experimental fork support '
                               'if your application did not start successfully.')
            os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true'
            os.environ['GRPC_POLL_STRATEGY'] = 'poll'

        if not self.__started:
            # if not already started, start the agent
            config.finalize()  # Must be finalized exactly once
            self.__started = True
            logger.info(f'SkyWalking sync agent instance {config.agent_instance_name} starting in pid-{os.getpid()}.')
            # Install log reporter core
            if config.agent_log_reporter_active:
                from skywalking import log
                log.install()
            # Here we install all other lib plugins on first time start (parent process)
            plugins.install()
        elif self.__started and os.getpid() == self.started_pid:
            # if already started, and this is the same process, raise an error
            raise RuntimeError('SkyWalking Python agent has already been started in this process, '
                               'did you call start more than once in your code + sw-python CLI? '
                               'If you already use sw-python CLI, you should remove the manual start(), vice versa.')
        # Else there's a new process (after fork()), we will restart the agent in the new process
        self.started_pid = os.getpid()

        # When gevent monkey-patched the socket module, gRPC must be taught to cooperate with gevent
        flag = False
        try:
            from gevent import monkey
            flag = monkey.is_module_patched('socket')
        except ModuleNotFoundError:
            logger.debug("it was found that no gevent was used, if you don't use, please ignore.")
        if flag:
            import grpc.experimental.gevent as grpc_gevent
            grpc_gevent.init_gevent()

        if config.agent_profile_active:
            profile.init()
        if config.agent_meter_reporter_active:
            meter.init(force=True)  # force re-init after fork()
        if config.sample_n_per_3_secs > 0:
            sampling.init(force=True)

        self.__bootstrap()  # calls init_threading

        atexit.register(self.__fini)

        if config.agent_experimental_fork_support:
            if hasattr(os, 'register_at_fork'):
                os.register_at_fork(before=self.__fork_before, after_in_parent=self.__fork_after_in_parent,
                                    after_in_child=self.__fork_after_in_child)

    def __fini(self):
        """
        This method is called when the agent is shutting down.
        Clean up all the queues and threads.
        """
        self.__protocol.report_segment(self.__segment_queue, False)
        self.__segment_queue.join()

        if config.agent_log_reporter_active:
            self.__protocol.report_log(self.__log_queue, False)
            self.__log_queue.join()

        if config.agent_profile_active:
            self.__protocol.report_snapshot(self.__snapshot_queue, False)
            self.__snapshot_queue.join()

        if config.agent_meter_reporter_active:
            self.__protocol.report_meter(self.__meter_queue, False)
            self.__meter_queue.join()

        self._finished.set()

    def stop(self) -> None:
        """
        Stops the agent and reset the started flag.
        """
        atexit.unregister(self.__fini)
        self.__fini()
        self.__started = False

    @report_with_backoff(reporter_name='heartbeat', init_wait=config.agent_collector_heartbeat_period)
    def __heartbeat(self) -> None:
        self.__protocol.heartbeat()

    # segment/log init_wait is set to 0.02 to prevent threads from hogging the cpu too much
    # The value of 0.02(20 ms) is set to be consistent with the queue delay of the Java agent
    @report_with_backoff(reporter_name='segment', init_wait=0.02)
    def __report_segment(self) -> bool:
        """Returns True if the queue is not empty"""
        queue_not_empty_flag = not self.__segment_queue.empty()
        if queue_not_empty_flag:
            self.__protocol.report_segment(self.__segment_queue)
        return queue_not_empty_flag

    @report_with_backoff(reporter_name='log', init_wait=0.02)
    def __report_log(self) -> bool:
        """Returns True if the queue is not empty"""
        queue_not_empty_flag = not self.__log_queue.empty()
        if queue_not_empty_flag:
            self.__protocol.report_log(self.__log_queue)
        return queue_not_empty_flag

    @report_with_backoff(reporter_name='meter', init_wait=config.agent_meter_reporter_period)
    def __report_meter(self) -> None:
        if not self.__meter_queue.empty():
            self.__protocol.report_meter(self.__meter_queue)

    @report_with_backoff(reporter_name='profile_snapshot', init_wait=0.5)
    def __send_profile_snapshot(self) -> None:
        if not self.__snapshot_queue.empty():
            self.__protocol.report_snapshot(self.__snapshot_queue)

    @report_with_backoff(reporter_name='query_profile_command',
                         init_wait=config.agent_collector_get_profile_task_interval)
    def __query_profile_command(self) -> None:
        self.__protocol.query_profile_commands()

    @staticmethod
    def __command_dispatch() -> None:
        # command dispatch will stuck when there are no commands
        command_service.dispatch()

    def is_segment_queue_full(self):
        return self.__segment_queue.full()

    def archive_segment(self, segment: 'Segment'):
        try:  # unlike checking __queue.full() then inserting, this is atomic
            self.__segment_queue.put(segment, block=False)
        except Full:
            logger.warning('the queue is full, the segment will be abandoned')

    def archive_log(self, log_data: 'LogData'):
        try:
            self.__log_queue.put(log_data, block=False)
        except Full:
            logger.warning('the queue is full, the log will be abandoned')

    def archive_meter(self, meter_data: 'MeterData'):
        try:
            self.__meter_queue.put(meter_data, block=False)
        except Full:
            logger.warning('the queue is full, the meter will be abandoned')

    def add_profiling_snapshot(self, snapshot: TracingThreadSnapshot):
        try:
            self.__snapshot_queue.put(snapshot)
        except Full:
            logger.warning('the snapshot queue is full, the snapshot will be abandoned')

    def notify_profile_finish(self, task: ProfileTask):
        try:
            self.__protocol.notify_profile_task_finish(task)
        except Exception as e:
            logger.error(f'notify profile task finish to backend fail. {str(e)}')


class SkyWalkingAgentAsync(Singleton):
    """
    Asyncio-based variant of the SkyWalking Python Agent.

    Reporters run as coroutines on a dedicated event loop thread and hand off
    data through asyncio queues instead of threads and thread-safe queues.
    """
    __started: bool = False  # shared by all instances

    def __init__(self):
        """
        ProtocolAsync is one of gRPC, HTTP and Kafka that
        provides async clients to reporters to communicate with OAP backend.
        """
        self.started_pid = None
        self.__protocol: Optional[ProtocolAsync] = None
        self._finished: Optional[asyncio.Event] = None
        # Initialize asyncio queues for segment, log, meter and profiling snapshots
        self.__segment_queue: Optional[asyncio.Queue] = None
        self.__log_queue: Optional[asyncio.Queue] = None
        self.__meter_queue: Optional[asyncio.Queue] = None
        self.__snapshot_queue: Optional[asyncio.Queue] = None
        self.event_loop_thread: Optional[Thread] = None

    def __bootstrap(self):
        if config.agent_protocol == 'grpc':
            from skywalking.agent.protocol.grpc_aio import GrpcProtocolAsync
            self.__protocol = GrpcProtocolAsync()
        elif config.agent_protocol == 'http':
            from skywalking.agent.protocol.http_aio import HttpProtocolAsync
            self.__protocol = HttpProtocolAsync()
        elif config.agent_protocol == 'kafka':
            from skywalking.agent.protocol.kafka_aio import KafkaProtocolAsync
            self.__protocol = KafkaProtocolAsync()
        else:
            raise ValueError(f'Unsupported protocol: {config.agent_protocol}')

        logger.info(f'You are using {config.agent_protocol} protocol to communicate with OAP backend')

        # Start reporter's asyncio coroutines and register queues
        self.__init_coroutine()

    def __init_coroutine(self) -> None:
        """
        This method initializes all asyncio coroutines for the agent and reporters.

        Heartbeat task is started by default.
        Segment reporter task and segment queue is created by default.
        All other queues and tasks depends on user configuration.
        """
        self.background_coroutines = set()
        self.background_coroutines.add(self.__heartbeat())
        self.background_coroutines.add(self.__report_segment())

        if config.agent_meter_reporter_active:
            self.background_coroutines.add(self.__report_meter())

            if config.agent_pvm_meter_reporter_active:
                from skywalking.meter.pvm.cpu_usage import CPUUsageDataSource
                from skywalking.meter.pvm.gc_data import GCDataSource
                from skywalking.meter.pvm.mem_usage import MEMUsageDataSource
                from skywalking.meter.pvm.thread_data import ThreadDataSource

                MEMUsageDataSource().register()
                CPUUsageDataSource().register()
                GCDataSource().register()
                ThreadDataSource().register()

        if config.agent_log_reporter_active:
            self.background_coroutines.add(self.__report_log())

        if config.agent_profile_active:
            self.background_coroutines.add(self.__command_dispatch())
            self.background_coroutines.add(self.__query_profile_command())
            self.background_coroutines.add(self.__send_profile_snapshot())

    async def __start_event_loop_async(self) -> None:
        self.loop = asyncio.get_running_loop()  # always get the current running loop first

        # asyncio Queue should be created after the creation of event loop
        self.__segment_queue = asyncio.Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
        if config.agent_meter_reporter_active:
            self.__meter_queue = asyncio.Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
        if config.agent_log_reporter_active:
            self.__log_queue = asyncio.Queue(maxsize=config.agent_log_reporter_max_buffer_size)
        if config.agent_profile_active:
            self.__snapshot_queue = asyncio.Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)

        # initialize background coroutines
        self.background_coroutines = set()
        self.background_tasks = set()
        self._finished = asyncio.Event()

        # Install log reporter core
        if config.agent_log_reporter_active:
            from skywalking import log
            log.install()
        if config.agent_meter_reporter_active:
            # meter.init(force=True)
            await meter.init_async()
        if config.sample_n_per_3_secs > 0:
            await sampling.init_async()

        self.__bootstrap()

        # gather all coroutines
        logger.debug('All background coroutines started')
        await asyncio.gather(*self.background_coroutines)

    def __start_event_loop(self) -> None:
        try:
            asyncio.run(self.__start_event_loop_async())
        except asyncio.CancelledError:
            logger.info('Python agent asyncio event loop is closed')
        except Exception as e:
            logger.error(f'Error in Python agent asyncio event loop: {e}')
        finally:
            if self._finished is not None:
                self._finished.set()

    def start(self) -> None:
        loggings.init()
        if sys.version_info < (3, 7):
            # agent may or may not work for Python 3.6 and below
            # since 3.6 is EOL, we will not officially support it
            logger.warning('SkyWalking Python agent does not support Python 3.6 and below, '
                           'please upgrade to Python 3.7 or above.')

        if not self.__started:
            # if not already started, start the agent
            config.finalize()  # Must be finalized exactly once
            self.__started = True
            logger.info(f'SkyWalking async agent instance {config.agent_instance_name} starting in pid-{os.getpid()}.')
            # Here we install all other lib plugins on first time start (parent process)
            plugins.install()
        elif self.__started and os.getpid() == self.started_pid:
            # if already started, and this is the same process, raise an error
            raise RuntimeError('SkyWalking Python agent has already been started in this process, '
                               'did you call start more than once in your code + sw-python CLI? '
                               'If you already use sw-python CLI, you should remove the manual start(), vice versa.')
        self.started_pid = os.getpid()

        atexit.register(self.__fini)

        # still init profile here, since it is using threading rather than asyncio
        if config.agent_profile_active:
            profile.init()

        self.event_loop_thread = Thread(name='event_loop_thread', target=self.__start_event_loop, daemon=True)
        self.event_loop_thread.start()

    async def __fini_async(self):
        """
        This method is called when the agent is shutting down.
        Clean up all the queues and stop all the asyncio tasks.
        """
        if self._finished is not None:
            self._finished.set()

        queue_join_coroutine_list = [self.__segment_queue.join()]
        if config.agent_log_reporter_active:
            queue_join_coroutine_list.append(self.__log_queue.join())
        if config.agent_profile_active:
            queue_join_coroutine_list.append(self.__snapshot_queue.join())
        if config.agent_meter_reporter_active:
            queue_join_coroutine_list.append(self.__meter_queue.join())
        await asyncio.gather(*queue_join_coroutine_list, return_exceptions=True)  # clean queues

        # cancel all tasks
        all_tasks = asyncio.all_tasks(self.loop)
        for task in all_tasks:
            task.cancel()

    def __fini(self):
        if not self.loop.is_closed():
            asyncio.run_coroutine_threadsafe(self.__fini_async(), self.loop)
            self.event_loop_thread.join()
            logger.info('Finished Python agent event_loop thread')
            # TODO: Unhandled error in sys.excepthook https://github.com/pytest-dev/execnet/issues/30

    def stop(self) -> None:
        """
        Stops the agent and reset the started flag.
        """
        atexit.unregister(self.__fini)
        self.__fini()
        self.__started = False

    @report_with_backoff_async(reporter_name='heartbeat', init_wait=config.agent_collector_heartbeat_period)
    async def __heartbeat(self) -> None:
        await self.__protocol.heartbeat()

    @report_with_backoff_async(reporter_name='segment', init_wait=0.02)
    async def __report_segment(self) -> bool:
        """Returns True if the queue is not empty"""
        queue_not_empty_flag = not self.__segment_queue.empty()
        if queue_not_empty_flag:
            await self.__protocol.report_segment(self.__segment_queue)
        return queue_not_empty_flag

    @report_with_backoff_async(reporter_name='log', init_wait=0.02)
    async def __report_log(self) -> bool:
        """Returns True if the queue is not empty"""
        queue_not_empty_flag = not self.__log_queue.empty()
        if queue_not_empty_flag:
            await self.__protocol.report_log(self.__log_queue)
        return queue_not_empty_flag

    @report_with_backoff_async(reporter_name='meter', init_wait=config.agent_meter_reporter_period)
    async def __report_meter(self) -> None:
        if not self.__meter_queue.empty():
            await self.__protocol.report_meter(self.__meter_queue)

    @report_with_backoff_async(reporter_name='profile_snapshot', init_wait=0.5)
    async def __send_profile_snapshot(self) -> None:
        if not self.__snapshot_queue.empty():
            await self.__protocol.report_snapshot(self.__snapshot_queue)

    @report_with_backoff_async(
        reporter_name='query_profile_command',
        init_wait=config.agent_collector_get_profile_task_interval)
    async def __query_profile_command(self) -> None:
        await self.__protocol.query_profile_commands()

    @staticmethod
    async def __command_dispatch() -> None:
        # command dispatch will stuck when there are no commands
        await command_service_async.dispatch()

    def __asyncio_queue_put_nowait(self, q: asyncio.Queue, queue_name: str, item):
        try:
            q.put_nowait(item)
        except asyncio.QueueFull:
            logger.warning(f'the {queue_name} queue is full, the item will be abandoned')

    def is_segment_queue_full(self):
        return self.__segment_queue.full()

    def archive_segment(self, segment: 'Segment'):
        if not self.loop.is_closed():
            self.loop.call_soon_threadsafe(self.__asyncio_queue_put_nowait, self.__segment_queue, 'segment', segment)

    def archive_log(self, log_data: 'LogData'):
        if not self.loop.is_closed():
            self.loop.call_soon_threadsafe(self.__asyncio_queue_put_nowait, self.__log_queue, 'log', log_data)

    def archive_meter(self, meter_data: 'MeterData'):
        if not self.loop.is_closed():
            self.loop.call_soon_threadsafe(self.__asyncio_queue_put_nowait, self.__meter_queue, 'meter', meter_data)

    async def archive_meter_async(self, meter_data: 'MeterData'):
        try:
            self.__meter_queue.put_nowait(meter_data)
        except asyncio.QueueFull:
            logger.warning('the meter queue is full, the item will be abandoned')

    def add_profiling_snapshot(self, snapshot: TracingThreadSnapshot):
        self.loop.call_soon_threadsafe(self.__asyncio_queue_put_nowait, self.__snapshot_queue, 'snapshot', snapshot)

    def notify_profile_finish(self, task: ProfileTask):
        try:
            asyncio.run_coroutine_threadsafe(self.__protocol.notify_profile_task_finish(task), self.loop)
        except Exception as e:
            logger.error(f'notify profile task finish to backend fail. {e}')


# Export for user (backwards compatibility)
# so users still use `from skywalking import agent`
agent = SkyWalkingAgentAsync() if config.agent_asyncio_enhancement else SkyWalkingAgent()
start = agent.start