in elasticapm/transport/base.py
def _process_queue(self) -> None:
    # Rebuild the metadata to capture new process information
    if self.client:
        self._metadata = self.client.build_metadata()
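    # Events are buffered in memory and sent in batches once one of the flush conditions below is met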
    buffer = self._init_buffer()
    buffer_written = False
    # add some randomness to the timeout to avoid stampedes of several workers that are booted at the same time
    max_flush_time = (
        self._max_flush_time_seconds * random.uniform(0.9, 1.1) if self._max_flush_time_seconds else None
    )
    while True:
        since_last_flush = timeit.default_timer() - self._last_flush
        # take max flush time into account to calculate timeout
        timeout = max(0, max_flush_time - since_last_flush) if max_flush_time else None
        timed_out = False
        try:
            event_type, data, flush = self._event_queue.get(block=True, timeout=timeout)
        except _queue.Empty:
            event_type, data, flush = None, None, None
            timed_out = True
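        # A "close" sentinel signals shutdown: flush whatever is still buffered, then leave the worker loop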
if event_type == "close":
if buffer_written:
try:
self._flush(buffer)
except Exception as exc:
logger.error(
"Exception occurred while flushing the buffer "
"before closing the transport connection: {0}".format(exc)
)
self._flushed.set()
return # time to go home!
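        # Run the event through any configured processors; a processor can drop the event by returning None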
        if data is not None:
            data = self._process_event(event_type, data)
            if data is not None:
                if not buffer_written:
                    # Write metadata just in time to allow for late metadata changes (such as in lambda)
                    self._write_metadata(buffer)
                buffer.write((self._json_serializer({event_type: data}) + "\n").encode("utf-8"))
                buffer_written = True
                self._counts[event_type] += 1
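        # The size check uses the buffer's underlying file object, i.e. how many (compressed) bytes
        # have been written so far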
        queue_size = 0 if buffer.fileobj is None else buffer.fileobj.tell()
        forced_flush = flush
        if forced_flush:
            logger.debug("forced flush")
        elif timed_out or timeout == 0:
            # recompute the time since the last flush, as we might have waited for a non-trivial amount of
            # time in _event_queue.get()
            since_last_flush = timeit.default_timer() - self._last_flush
            logger.debug(
                "flushing due to time since last flush %.3fs > max_flush_time %.3fs",
                since_last_flush,
                max_flush_time,
            )
            flush = True
        elif self._max_buffer_size and queue_size > self._max_buffer_size:
            logger.debug(
                "flushing since queue size %d bytes > max_queue_size %d bytes", queue_size, self._max_buffer_size
            )
            flush = True
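        # Flush when requested explicitly, when the flush interval has elapsed, or when the buffer is too large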
        if flush:
            if buffer_written:
                self._flush(buffer, forced_flush=forced_flush)
            elif forced_flush and any(x in self.client.config.server_url for x in ("/localhost:", "/127.0.0.1:")):
                # No data in the buffer, but because of the manual flush we should still send
                # an empty payload with the flushed=true query param, though only
                # to a local APM Server (or the Lambda extension)
                try:
                    self.send("", forced_flush=True)
                    self.handle_transport_success()
                except Exception as e:
                    self.handle_transport_fail(e)
            self._last_flush = timeit.default_timer()
            buffer = self._init_buffer()
            buffer_written = False
            max_flush_time = (
                self._max_flush_time_seconds * random.uniform(0.9, 1.1) if self._max_flush_time_seconds else None
            )
            self._flushed.set()
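
The loop above is an instance of a common batching pattern: block on a queue with a deadline derived from a jittered maximum flush interval, and flush when an explicit flush request arrives, the deadline passes, or the buffer grows too large. The standalone sketch below illustrates that pattern only; BatchingWorker, queue_event and the plain-list buffer are illustrative stand-ins rather than elasticapm API, and the real transport additionally gzips NDJSON payloads, writes metadata lazily, and measures the buffer in bytes.

import queue
import random
import threading
import timeit


class BatchingWorker:
    """Batch events from a queue and flush them on a timer, on size, or on demand."""

    def __init__(self, max_flush_time_seconds=1.0, max_buffer_size=10):
        self._event_queue = queue.Queue()
        self._max_flush_time_seconds = max_flush_time_seconds
        self._max_buffer_size = max_buffer_size
        self._last_flush = timeit.default_timer()
        self._thread = threading.Thread(target=self._process_queue, daemon=True)
        self._thread.start()

    def queue_event(self, event, flush=False):
        self._event_queue.put((event, flush))

    def close(self):
        # "close" acts as the shutdown sentinel, just like in the transport above
        self._event_queue.put(("close", False))
        self._thread.join()

    def _flush(self, buffer):
        print("flushing %d event(s): %r" % (len(buffer), buffer))

    def _process_queue(self):
        buffer = []
        # jitter the flush interval so workers started together don't flush in lockstep
        max_flush_time = self._max_flush_time_seconds * random.uniform(0.9, 1.1)
        while True:
            since_last_flush = timeit.default_timer() - self._last_flush
            timeout = max(0, max_flush_time - since_last_flush)
            timed_out = False
            try:
                event, flush = self._event_queue.get(block=True, timeout=timeout)
            except queue.Empty:
                event, flush, timed_out = None, False, True

            if event == "close":
                if buffer:
                    self._flush(buffer)
                return

            if event is not None:
                buffer.append(event)

            if flush or timed_out or len(buffer) >= self._max_buffer_size:
                if buffer:
                    self._flush(buffer)
                self._last_flush = timeit.default_timer()
                buffer = []
                max_flush_time = self._max_flush_time_seconds * random.uniform(0.9, 1.1)


if __name__ == "__main__":
    worker = BatchingWorker(max_flush_time_seconds=0.5, max_buffer_size=3)
    for i in range(7):
        worker.queue_event({"transaction": i})
    worker.close()  # flushes anything still buffered, then stops the worker thread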