elasticapm/transport/http.py (192 lines of code) (raw):

# -*- coding: utf-8 -*- # BSD 3-Clause License # # Copyright (c) 2019, Elasticsearch BV # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # * Neither the name of the copyright holder nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
import hashlib
import json
import os
import re
import ssl
import urllib.parse
from urllib.request import getproxies_environment, proxy_bypass_environment

import urllib3
from urllib3.exceptions import MaxRetryError, TimeoutError

from elasticapm.transport.exceptions import TransportException
from elasticapm.transport.http_base import HTTPTransportBase
from elasticapm.utils import json_encoder, read_pem_file
from elasticapm.utils.logging import get_logger

try:
    import certifi
except ImportError:
    certifi = None

logger = get_logger("elasticapm.transport.http")


class Transport(HTTPTransportBase):
    """
    HTTP transport that talks to the APM Server through a urllib3 pool manager.

    The pool manager is created lazily (see the ``http`` property) so that it can be
    dropped and rebuilt after a fork or right before closing.
    """

    def __init__(self, url: str, *args, **kwargs) -> None:
        """
        Prepare the urllib3 pool options for ``url``.

        :param url: URL of the intake endpoint; TLS options are only adjusted when it starts with "https"
        :param args: passed through to ``HTTPTransportBase.__init__``
        :param kwargs: passed through to ``HTTPTransportBase.__init__``
        """
        super(Transport, self).__init__(url, *args, **kwargs)
        pool_kwargs = {"cert_reqs": "CERT_REQUIRED", "ca_certs": self.ca_certs, "block": True}
        if url.startswith("https"):
            if self._server_cert:
                # A pinned server certificate is checked by fingerprint only, so regular
                # chain and hostname verification are switched off.
                pool_kwargs.update(
                    {"assert_fingerprint": self.cert_fingerprint, "assert_hostname": False, "cert_reqs": ssl.CERT_NONE}
                )
                del pool_kwargs["ca_certs"]
            elif not self._verify_server_cert:
                pool_kwargs["cert_reqs"] = ssl.CERT_NONE
                pool_kwargs["assert_hostname"] = False
        self._pool_kwargs = pool_kwargs
        self._http = None
        self._url = url

    def send(self, data, forced_flush=False, custom_url=None, custom_headers=None):
        """
        POST ``data`` to the APM Server.

        :param data: request body
        :param forced_flush: if true, "?flushed=true" is appended to the URL
        :param custom_url: URL to use instead of the one given at construction time
        :param custom_headers: extra headers; when given, the default ndjson/gzip
            Content-Type and Content-Encoding headers are not added
        :return: value of the "Location" response header, or None if absent
        :raises TransportException: if the request fails or the server answers with status >= 400
        """
        response = None

        headers = self._headers.copy() if self._headers else {}
        headers.update(self.auth_headers)

        if custom_headers:
            headers.update(custom_headers)
        else:
            headers.update(
                {
                    b"Content-Type": b"application/x-ndjson",
                    b"Content-Encoding": b"gzip",
                }
            )

        url = custom_url or self._url
        if forced_flush:
            url = f"{url}?flushed=true"
        try:
            try:
                response = self.http.urlopen(
                    "POST", url, body=data, headers=headers, timeout=self._timeout, preload_content=False
                )
                logger.debug("Sent request, url=%s size=%.2fkb status=%s", url, len(data) / 1024.0, response.status)
            except Exception as e:
                # Any failure to reach the server is reported as a TransportException;
                # the original exception is kept as the explicit cause.
                if isinstance(e, MaxRetryError) and isinstance(e.reason, TimeoutError):
                    message = "Connection to APM Server timed out (url: %s, timeout: %s seconds)" % (
                        self._url,
                        self._timeout,
                    )
                else:
                    message = "Unable to reach APM Server: %s (url: %s)" % (e, self._url)
                raise TransportException(message, data) from e
            body = response.read()
            if response.status >= 400:
                if response.status == 429:  # rate-limited
                    message = "Temporarily rate limited: "
                else:
                    message = "HTTP %s: " % response.status
                # cap the echoed body so a huge error page does not end up in the exception
                message += body.decode("utf8", errors="replace")[:10000]
                raise TransportException(message, data)
            return response.headers.get("Location")
        finally:
            if response is not None:
                response.close()

    @property
    def http(self) -> urllib3.PoolManager:
        """
        Return the pool manager, creating it on first access.

        A ``ProxyManager`` is used when the environment defines an https (or, failing that,
        http) proxy and the target host is not excluded by the environment's bypass rules.
        """
        if not self._http:
            url_parts = urllib.parse.urlparse(self._url)
            proxies = getproxies_environment()
            proxy_url = proxies.get("https", proxies.get("http", None))
            if proxy_url and not proxy_bypass_environment(url_parts.netloc):
                self._http = urllib3.ProxyManager(proxy_url, **self._pool_kwargs)
            else:
                self._http = urllib3.PoolManager(**self._pool_kwargs)
        return self._http

    def handle_fork(self) -> None:
        """Drop the pool manager so a new one is created on next use."""
        # reset http pool to avoid sharing connections with the parent process
        self._http = None

    def get_config(self, current_version=None, keys=None):
        """
        Gets configuration from a remote APM Server

        :param current_version: version of the current configuration
        :param keys: a JSON-serializable dict to identify this instance, e.g.
                {
                    "service": {
                        "name": "foo",
                        "environment": "bar"
                    }
                }
        :return: a three-tuple of new version, config dictionary and validity in seconds.
                 Any element of the tuple can be None.
        """
        url = self._config_url
        payload = json_encoder.dumps(keys).encode("utf-8")
        # same guard as in send()/fetch_server_info()
        headers = self._headers.copy() if self._headers else {}
        headers[b"Content-Type"] = "application/json"
        headers.update(self.auth_headers)
        max_age = 300
        if current_version:
            headers["If-None-Match"] = current_version
        try:
            response = self.http.urlopen(
                "POST", url, body=payload, headers=headers, timeout=self._timeout, preload_content=False
            )
            # The body is streamed (preload_content=False), so reading it can fail with the
            # same family of urllib3 errors as opening the connection; handle both here.
            body = response.read()
        except (urllib3.exceptions.RequestError, urllib3.exceptions.HTTPError) as e:
            logger.debug("HTTP error while fetching remote config: %s", str(e))
            return current_version, None, max_age

        max_age = self._get_cache_control_max_age(response.headers) or max_age
        if response.status == 304:
            # config is unchanged, return
            logger.debug("Configuration unchanged")
            return current_version, None, max_age
        elif response.status >= 400:
            return None, None, max_age

        if not body:
            logger.debug("APM Server answered with empty body and status code %s", response.status)
            return current_version, None, max_age

        try:
            body = body.decode("utf-8")
            config = json_encoder.loads(body)
        except (UnicodeDecodeError, json.JSONDecodeError):
            # A body that is not valid UTF-8 is treated like one that is not valid JSON.
            logger.warning("Failed decoding APM Server response as JSON: %s", body)
            return current_version, None, max_age
        return response.headers.get("Etag"), config, max_age

    def _get_cache_control_max_age(self, response_headers):
        """
        Extract ``max-age`` from the Cache-Control response header.

        :param response_headers: mapping of response headers
        :return: the max-age in seconds, raised to at least 5; None if the header is
                 missing, cannot be parsed, or the value is 0
        """
        max_age = None
        if "Cache-Control" in response_headers:
            match = re.search(r"max-age=(\d+)", response_headers["Cache-Control"])
            if match is None:
                logger.debug("Could not parse Cache-Control header: %s", response_headers["Cache-Control"])
            else:
                cc_max_age = int(match.group(1))
                if cc_max_age <= 0:
                    # max_age remains at default value
                    pass
                elif cc_max_age < 5:
                    max_age = 5
                else:
                    max_age = cc_max_age
        return max_age

    def _process_queue(self) -> None:
        """Fetch the server version if it is not known yet, then process the queue."""
        if not self.client.server_version:
            self.fetch_server_info()
        super()._process_queue()

    def fetch_server_info(self) -> None:
        """
        Query the server info URL and store the parsed version on the client.

        All expected failures (HTTP errors, undecodable or non-JSON body, missing
        version key) are logged at debug level and otherwise ignored.
        """
        headers = self._headers.copy() if self._headers else {}
        headers.update(self.auth_headers)
        headers[b"accept"] = b"text/plain"
        try:
            response = self.http.urlopen("GET", self._server_info_url, headers=headers, timeout=self._timeout)
            body = response.data
            data = json_encoder.loads(body.decode("utf8"))
            version = data["version"]
            logger.debug("Fetched APM Server version %s", version)
            self.client.server_version = version_string_to_tuple(version)
        except (urllib3.exceptions.RequestError, urllib3.exceptions.HTTPError) as e:
            logger.debug("HTTP error while fetching server information: %s", str(e))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Decode leniently for the log message: the body may be exactly what failed to decode.
            logger.debug(
                "JSON decoding error while fetching server information. Error: %s Body: %s",
                e,
                body.decode("utf8", errors="replace"),
            )
        except (KeyError, TypeError):
            logger.debug("No version key found in server response: %s", response.data)

    @property
    def cert_fingerprint(self):
        """
        Return the SHA-256 hex digest of the data ``read_pem_file`` yields for the
        configured server certificate, or None if no server certificate is configured.
        """
        if self._server_cert:
            with open(self._server_cert, "rb") as f:
                cert_data = read_pem_file(f)
            digest = hashlib.sha256()
            digest.update(cert_data)
            return digest.hexdigest()
        return None

    @property
    def auth_headers(self):
        """Return the parent class' auth headers with keys and values encoded as ASCII bytes."""
        headers = super(Transport, self).auth_headers
        return {k.encode("ascii"): v.encode("ascii") for k, v in headers.items()}

    @property
    def ca_certs(self):
        """
        Return location of certificate store. If the server_ca_cert_file config option
        is set, its value is returned. Otherwise, the certifi store is used, unless it
        is disabled or not installed.
        """
        if self._server_ca_cert_file:
            return self._server_ca_cert_file
        return certifi.where() if (certifi and self.client.config.use_certifi) else None

    def close(self):
        """
        Take care of being able to shutdown cleanly
        :return:
        """
        # Nothing to do if already closed, if there is no thread, or if the thread
        # belongs to a different process than the current one.
        if self._closed or (not self._thread or self._thread.pid != os.getpid()):
            return

        self._closed = True
        # we are racing against urllib3 ConnectionPool weakref finalizer that would lead to having them closed
        # and we hanging waiting for send any eventual queued data
        # Force the creation of a new PoolManager so that we are always able to flush
        self._http = None
        self.queue("close", None)
        if not self._flushed.wait(timeout=self._max_flush_time_seconds):
            logger.error("Closing the transport connection timed out.")


def version_string_to_tuple(version):
    """
    Split a version string on "." and "-" into a tuple.

    Purely numeric parts become ints, all other parts stay strings,
    e.g. "8.1.0-SNAPSHOT" -> (8, 1, 0, "SNAPSHOT"). A falsy version yields ().
    """
    if version:
        version_parts = re.split(r"[.\-]", version)
        # isdecimal() rather than isdigit(): the latter also accepts characters such as
        # superscript digits, which int() cannot convert.
        return tuple(int(p) if p.isdecimal() else p for p in version_parts)
    return ()


# left for backwards compatibility
AsyncTransport = Transport