elasticapm/transport/base.py (268 lines of code) (raw):

# -*- coding: utf-8 -*- # BSD 3-Clause License # # Copyright (c) 2019, Elasticsearch BV # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # * Neither the name of the copyright holder nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
import gzip
import io
import os
import queue as _queue
import random
import sys
import threading
import time
import timeit
from collections import defaultdict

from elasticapm.utils import json_encoder
from elasticapm.utils.logging import get_logger
from elasticapm.utils.threading import ThreadManager

logger = get_logger("elasticapm.transport")


class Transport(ThreadManager):
    """
    Base class for all transport implementations.

    Events are handed over via :meth:`queue`, picked up by a background thread
    running :meth:`_process_queue`, serialized into a gzip buffer and passed to
    :meth:`send` whenever a flush is triggered.

    Subclasses must implement :meth:`send`.
    """

    async_mode = False

    def __init__(
        self,
        client,
        compress_level=5,
        json_serializer=json_encoder.dumps,
        queue_chill_count=500,
        queue_chill_time=1.0,
        processors=None,
        **kwargs
    ) -> None:
        """
        Create a new Transport instance

        :param client: the client this transport belongs to; may be falsy, in which case
               no time-based or size-based flushing limits are applied
        :param compress_level: GZip compress level, clamped to the range 0-9. ``None`` is treated as 0
        :param json_serializer: serializer to use for JSON encoding
        :param queue_chill_count: passed as ``chill_until`` to the event queue
        :param queue_chill_time: passed as ``max_chill_time`` to the event queue
        :param processors: callables invoked as ``processor(client, data)`` for each event
        :param kwargs: accepted and ignored by this base class
        """
        self.client = client
        self.state = TransportState()
        self._metadata = None
        self._compress_level = min(9, max(0, compress_level if compress_level is not None else 0))
        self._json_serializer = json_serializer
        self._queued_data = None
        self._event_queue = self._init_event_queue(chill_until=queue_chill_count, max_chill_time=queue_chill_time)
        # only ChilledQueue understands the extra ``chill`` argument of put()
        self._is_chilled_queue = isinstance(self._event_queue, ChilledQueue)
        self._thread = None
        self._last_flush = timeit.default_timer()
        self._counts = defaultdict(int)
        self._flushed = threading.Event()
        self._closed = False
        self._processors = processors if processors is not None else []
        super(Transport, self).__init__()
        self.start_stop_order = sys.maxsize  # ensure that the transport thread is always started/stopped last

    @property
    def _max_flush_time_seconds(self):
        """Configured ``api_request_time`` in seconds, or None if there is no client."""
        return self.client.config.api_request_time.total_seconds() if self.client else None

    @property
    def _max_buffer_size(self):
        """Configured ``api_request_size``, or None if there is no client."""
        return self.client.config.api_request_size if self.client else None

    def _jittered_max_flush_time(self):
        """
        Return the max flush time with +/-10% random jitter, or None if no max flush time is set.

        The randomness avoids stampedes of several workers that are booted at the same time.
        """
        max_flush_time = self._max_flush_time_seconds
        return max_flush_time * random.uniform(0.9, 1.1) if max_flush_time else None

    def queue(self, event_type, data, flush=False) -> None:
        """
        Put an event on the event queue without blocking.

        If the queue is full, the event is dropped and a debug message is logged.

        :param event_type: type of the event; ``"close"`` asks the processing thread to exit
        :param data: the event payload, or None
        :param flush: if True, the processing thread flushes right after handling this event
        """
        try:
            self._flushed.clear()
            # "close" and explicit flushes must wake up the processing thread immediately
            kwargs = {"chill": not (event_type == "close" or flush)} if self._is_chilled_queue else {}
            self._event_queue.put((event_type, data, flush), block=False, **kwargs)
        except _queue.Full:
            logger.debug("Event of type %s dropped due to full event queue", event_type)

    def _process_queue(self) -> None:
        """
        Main loop of the event processing thread.

        Takes events off the queue, runs them through the processors, writes them to a gzip
        buffer and flushes that buffer when a flush is requested, the max flush time has passed,
        or the buffer has grown beyond the max buffer size. Returns when a ``"close"`` event
        is received.
        """
        # Rebuild the metadata to capture new process information
        if self.client:
            self._metadata = self.client.build_metadata()

        buffer = self._init_buffer()
        buffer_written = False
        max_flush_time = self._jittered_max_flush_time()

        while True:
            since_last_flush = timeit.default_timer() - self._last_flush
            # take max flush time into account to calculate timeout
            timeout = max(0, max_flush_time - since_last_flush) if max_flush_time else None
            timed_out = False
            try:
                event_type, data, flush = self._event_queue.get(block=True, timeout=timeout)
            except _queue.Empty:
                event_type, data, flush = None, None, None
                timed_out = True

            if event_type == "close":
                if buffer_written:
                    try:
                        self._flush(buffer)
                    except Exception as exc:
                        logger.error(
                            "Exception occurred while flushing the buffer "
                            "before closing the transport connection: {0}".format(exc)
                        )
                self._flushed.set()
                return  # time to go home!

            if data is not None:
                data = self._process_event(event_type, data)
                if data is not None:
                    if not buffer_written:
                        # Write metadata just in time to allow for late metadata changes (such as in lambda)
                        self._write_metadata(buffer)
                    buffer.write((self._json_serializer({event_type: data}) + "\n").encode("utf-8"))
                    buffer_written = True
                    self._counts[event_type] += 1

            queue_size = 0 if buffer.fileobj is None else buffer.fileobj.tell()

            forced_flush = flush
            if forced_flush:
                logger.debug("forced flush")
            elif timed_out or timeout == 0:
                # update last flush time, as we might have waited for a non trivial amount of time in
                # _event_queue.get()
                since_last_flush = timeit.default_timer() - self._last_flush
                logger.debug(
                    "flushing due to time since last flush %.3fs > max_flush_time %.3fs",
                    since_last_flush,
                    max_flush_time,
                )
                flush = True
            elif self._max_buffer_size and queue_size > self._max_buffer_size:
                logger.debug(
                    "flushing since queue size %d bytes > max_queue_size %d bytes", queue_size, self._max_buffer_size
                )
                flush = True
            if flush:
                if buffer_written:
                    self._flush(buffer, forced_flush=forced_flush)
                elif (
                    forced_flush
                    # without a client there is no server URL to inspect; skip instead of
                    # crashing the processing thread with an AttributeError
                    and self.client
                    and any(x in self.client.config.server_url for x in ("/localhost:", "/127.0.0.1:"))
                ):
                    # No data on buffer, but due to manual flush we should send
                    # an empty payload with flushed=true query param, but only
                    # to a local APM server (or lambda extension)
                    try:
                        self.send("", forced_flush=True)
                        self.handle_transport_success()
                    except Exception as e:
                        self.handle_transport_fail(e)
                self._last_flush = timeit.default_timer()
                buffer = self._init_buffer()
                buffer_written = False
                max_flush_time = self._jittered_max_flush_time()
                self._flushed.set()

    @staticmethod
    def _processor_path(processor):
        """
        Return ``(module, name)`` of a processor for log messages.

        Falls back to the type name for callables without ``__name__`` (such as
        ``functools.partial`` objects), so that logging a dropped event can never raise.
        """
        return getattr(processor, "__module__", None), getattr(processor, "__name__", type(processor).__name__)

    def _process_event(self, event_type, data):
        """
        Run the event data through all applicable processors.

        A processor applies if it has no ``event_types`` attribute, or if ``event_type`` is
        contained in it.

        :param event_type: type of the event
        :param data: the event payload
        :return: the processed data, or None if a processor returned a falsy value or raised
        """
        for processor in self._processors:
            if not hasattr(processor, "event_types") or event_type in processor.event_types:
                try:
                    data = processor(self.client, data)
                    if not data:
                        module, name = self._processor_path(processor)
                        logger.debug("Dropped event of type %s due to processor %s.%s", event_type, module, name)
                        return None
                except Exception:
                    module, name = self._processor_path(processor)
                    logger.warning(
                        "Dropped event of type %s due to exception in processor %s.%s",
                        event_type,
                        module,
                        name,
                        exc_info=True,
                    )
                    return None
        return data

    def _init_buffer(self):
        """Create a new, empty in-memory gzip buffer using the configured compress level."""
        buffer = gzip.GzipFile(fileobj=io.BytesIO(), mode="w", compresslevel=self._compress_level)
        return buffer

    def _write_metadata(self, buffer) -> None:
        """Write the metadata as a single JSON line to the buffer."""
        data = (self._json_serializer({"metadata": self._metadata}) + "\n").encode("utf-8")
        buffer.write(data)

    def _init_event_queue(self, chill_until, max_chill_time):
        """
        Create the event queue.

        :param chill_until: passed on to ChilledQueue
        :param max_chill_time: passed on to ChilledQueue
        :return: a ChilledQueue if queue.Queue has the expected internals, a plain queue.Queue otherwise
        """
        # some libraries like eventlet monkeypatch queue.Queue and switch out the implementation.
        # In those cases we can't rely on internals of queue.Queue to be there, so we simply use
        # their queue and forgo the optimizations of ChilledQueue. In the case of eventlet, this
        # isn't really a loss, because the main reason for ChilledQueue (avoiding context switches
        # due to the event processor thread being woken up all the time) is not an issue.
        if all(
            (
                hasattr(_queue.Queue, "not_full"),
                hasattr(_queue.Queue, "not_empty"),
                hasattr(_queue.Queue, "unfinished_tasks"),
            )
        ):
            return ChilledQueue(maxsize=10000, chill_until=chill_until, max_chill_time=max_chill_time)
        else:
            return _queue.Queue(maxsize=10000)

    def _flush(self, buffer, forced_flush=False) -> None:
        """
        Close the buffer and send its content.

        If the transport state is in failure back-off, the data is dropped instead.
        Exceptions raised by :meth:`send` are passed to :meth:`handle_transport_fail`.

        This method should only be called from the event processing queue

        :param buffer: the gzip buffer to flush
        :param forced_flush: passed on to :meth:`send`
        :return: None
        """
        if not self.state.should_try():
            logger.error("dropping flushed data due to transport failure back-off")
            buffer.close()
            return

        fileobj = buffer.fileobj  # get a reference to the fileobj before closing the gzip file
        buffer.close()

        data = fileobj.getbuffer()
        try:
            self.send(data, forced_flush=forced_flush)
            self.handle_transport_success()
        except Exception as e:
            self.handle_transport_fail(e)
        finally:
            # always release the view of the underlying BytesIO, even if the failure
            # handler raises or a non-Exception error propagates
            data.release()

    def start_thread(self, pid=None) -> None:
        """
        Start the event processing thread.

        A new thread is only started if there is none yet or the existing one was started
        under a different pid, and only if the transport has not been closed.
        """
        super(Transport, self).start_thread(pid=pid)
        if (not self._thread or self.pid != self._thread.pid) and not self._closed:
            self.handle_fork()
            try:
                self._thread = threading.Thread(target=self._process_queue, name="eapm event processor thread")
                self._thread.daemon = True
                self._thread.pid = self.pid
                self._thread.start()
            except RuntimeError:
                # NOTE(review): deliberately best-effort; presumably covers thread start
                # failures such as during interpreter shutdown -- confirm
                pass

    def send(self, data, forced_flush=False):
        """
        You need to override this to do something with the actual
        data. Usually - this is sending to a server
        """
        raise NotImplementedError

    def close(self) -> None:
        """
        Cleans up resources and closes connection

        Does nothing if the transport is already closed, or if there is no processing thread
        belonging to the current process. Otherwise queues a ``"close"`` event and waits up to
        the max flush time for the processing thread to acknowledge it; a timeout is logged.

        :return:
        """
        if self._closed or (not self._thread or self._thread.pid != os.getpid()):
            return
        self._closed = True
        self.queue("close", None)
        if not self._flushed.wait(timeout=self._max_flush_time_seconds):
            logger.error("Closing the transport connection timed out.")

    def stop_thread(self) -> None:
        """Stop the event processing thread by closing the transport."""
        self.close()

    def flush(self):
        """
        Trigger a flush of the queue.

        Queues a flush request and waits until the processing thread signals that it has
        flushed. The wait is limited to the max flush time; without a client there is no
        limit and the call can block indefinitely.

        :raises ValueError: if the flush was not signalled within the max flush time
        """
        self.queue(None, None, flush=True)
        if not self._flushed.wait(timeout=self._max_flush_time_seconds):
            raise ValueError("flush timed out")

    def handle_transport_success(self, **kwargs) -> None:
        """
        Success handler called by the transport on successful send
        """
        self.state.set_success()

    def handle_transport_fail(self, exception=None, **kwargs) -> None:
        """
        Failure handler called by the transport on send failure
        """
        message = str(exception)
        logger.error("Failed to submit message: %r", message, exc_info=False)
        self.state.set_fail()

    def handle_fork(self) -> None:
        """Helper method to run code after a fork has been detected"""
        pass


# left for backwards compatibility
AsyncTransport = Transport


class TransportState(object):
    """Tracks whether the transport is healthy and when a failed transport may be retried."""

    ONLINE = 1
    ERROR = 0

    def __init__(self) -> None:
        self.status = self.ONLINE
        self.last_check = None
        self.retry_number = -1

    def should_try(self) -> bool:
        """
        Return True if sending should be attempted.

        Always True while online. After failures, the wait interval in seconds since the
        last failure is ``min(retry_number, 6) ** 2``, i.e. 0 after the first failure and
        capped at 36.
        """
        if self.status == self.ONLINE:
            return True
        interval = min(self.retry_number, 6) ** 2

        return timeit.default_timer() - self.last_check > interval

    def set_fail(self) -> None:
        """Record a failed send: switch to ERROR, bump the retry counter and remember the time."""
        self.status = self.ERROR
        self.retry_number += 1
        self.last_check = timeit.default_timer()

    def set_success(self) -> None:
        """Record a successful send and reset the failure bookkeeping."""
        self.status = self.ONLINE
        self.last_check = None
        self.retry_number = -1

    def did_fail(self) -> bool:
        """Return True if the last recorded state is ERROR."""
        return self.status == self.ERROR


class ChilledQueue(_queue.Queue, object):
    """
    A queue subclass that is a bit more chill about how often it notifies the not empty event

    Consumers are only notified if the caller asks for it (``chill=False``), if more than
    ``chill_until`` items are queued, or if more than ``max_chill_time`` seconds have passed
    since the last notification.

    Note: the explicit ``object`` base is a leftover from Python 2 support, where queue.Queue
    was an old-style class. It is redundant on Python 3 and only kept to leave the class
    bases unchanged.
    """

    def __init__(self, maxsize=0, chill_until=100, max_chill_time=1.0) -> None:
        self._chill_until = chill_until
        self._max_chill_time = max_chill_time
        # monotonic clock: intervals must not be affected by system clock adjustments
        self._last_unchill = time.monotonic()
        super(ChilledQueue, self).__init__(maxsize=maxsize)

    def put(self, item, block=True, timeout=None, chill=True):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        If 'chill' is false, waiting consumers are notified right away. Otherwise
        they are only notified once the queue holds more than 'chill_until' items
        or more than 'max_chill_time' seconds passed since the last notification.
        """
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise _queue.Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time.monotonic() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time.monotonic()
                        if remaining <= 0.0:
                            raise _queue.Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            if (
                not chill
                or self._qsize() > self._chill_until
                or (time.monotonic() - self._last_unchill) > self._max_chill_time
            ):
                self.not_empty.notify()
                self._last_unchill = time.monotonic()