def _process_queue()

in elasticapm/transport/base.py [0:0]


    def _process_queue(self) -> None:
        # Rebuild the metadata to capture new process information
        if self.client:
            self._metadata = self.client.build_metadata()

        buffer = self._init_buffer()
        buffer_written = False
        # add some randomness to timeout to avoid stampedes of several workers that are booted at the same time
        max_flush_time = (
            self._max_flush_time_seconds * random.uniform(0.9, 1.1) if self._max_flush_time_seconds else None
        )

        while True:
            since_last_flush = timeit.default_timer() - self._last_flush
            # take max flush time into account to calculate timeout
            timeout = max(0, max_flush_time - since_last_flush) if max_flush_time else None
            timed_out = False
            try:
                event_type, data, flush = self._event_queue.get(block=True, timeout=timeout)
            except _queue.Empty:
                event_type, data, flush = None, None, None
                timed_out = True

            if event_type == "close":
                if buffer_written:
                    try:
                        self._flush(buffer)
                    except Exception as exc:
                        logger.error(
                            "Exception occurred while flushing the buffer "
                            "before closing the transport connection: {0}".format(exc)
                        )
                self._flushed.set()
                return  # time to go home!

            if data is not None:
                data = self._process_event(event_type, data)
                if data is not None:
                    if not buffer_written:
                        # Write metadata just in time to allow for late metadata changes (such as in lambda)
                        self._write_metadata(buffer)
                    buffer.write((self._json_serializer({event_type: data}) + "\n").encode("utf-8"))
                    buffer_written = True
                    self._counts[event_type] += 1

            queue_size = 0 if buffer.fileobj is None else buffer.fileobj.tell()

            forced_flush = flush
            if forced_flush:
                logger.debug("forced flush")
            elif timed_out or timeout == 0:
                # update last flush time, as we might have waited for a non trivial amount of time in
                # _event_queue.get()
                since_last_flush = timeit.default_timer() - self._last_flush
                logger.debug(
                    "flushing due to time since last flush %.3fs > max_flush_time %.3fs",
                    since_last_flush,
                    max_flush_time,
                )
                flush = True
            elif self._max_buffer_size and queue_size > self._max_buffer_size:
                logger.debug(
                    "flushing since queue size %d bytes > max_queue_size %d bytes", queue_size, self._max_buffer_size
                )
                flush = True
            if flush:
                if buffer_written:
                    self._flush(buffer, forced_flush=forced_flush)
                elif forced_flush and any(x in self.client.config.server_url for x in ("/localhost:", "/127.0.0.1:")):
                    # No data on buffer, but due to manual flush we should send
                    # an empty payload with flushed=true query param, but only
                    # to a local APM server (or lambda extension)
                    try:
                        self.send("", forced_flush=True)
                        self.handle_transport_success()
                    except Exception as e:
                        self.handle_transport_fail(e)
                self._last_flush = timeit.default_timer()
                buffer = self._init_buffer()
                buffer_written = False
                max_flush_time = (
                    self._max_flush_time_seconds * random.uniform(0.9, 1.1) if self._max_flush_time_seconds else None
                )
                self._flushed.set()