# -*- coding: utf-8 -*-

#  BSD 3-Clause License
#
#  Copyright (c) 2019, Elasticsearch BV
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#
#  * Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the following disclaimer.
#
#  * Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
#  * Neither the name of the copyright holder nor the names of its
#    contributors may be used to endorse or promote products derived from
#    this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
#  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
#  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
#  DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
#  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
#  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
#  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
#  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import hashlib
import json
import os
import re
import ssl
import urllib.parse
from urllib.request import getproxies_environment, proxy_bypass_environment

import urllib3
from urllib3.exceptions import MaxRetryError, TimeoutError

from elasticapm.transport.exceptions import TransportException
from elasticapm.transport.http_base import HTTPTransportBase
from elasticapm.utils import json_encoder, read_pem_file
from elasticapm.utils.logging import get_logger

try:
    import certifi
except ImportError:
    certifi = None

logger = get_logger("elasticapm.transport.http")


class Transport(HTTPTransportBase):
    def __init__(self, url: str, *args, **kwargs) -> None:
        super(Transport, self).__init__(url, *args, **kwargs)
        pool_kwargs = {"cert_reqs": "CERT_REQUIRED", "ca_certs": self.ca_certs, "block": True}
        if url.startswith("https"):
            if self._server_cert:
                pool_kwargs.update(
                    {"assert_fingerprint": self.cert_fingerprint, "assert_hostname": False, "cert_reqs": ssl.CERT_NONE}
                )
                del pool_kwargs["ca_certs"]
            elif not self._verify_server_cert:
                pool_kwargs["cert_reqs"] = ssl.CERT_NONE
                pool_kwargs["assert_hostname"] = False
        self._pool_kwargs = pool_kwargs
        self._http = None
        self._url = url

    def send(self, data, forced_flush=False, custom_url=None, custom_headers=None):
        response = None

        headers = self._headers.copy() if self._headers else {}
        headers.update(self.auth_headers)
        if custom_headers:
            headers.update(custom_headers)
        else:
            headers.update(
                {
                    b"Content-Type": b"application/x-ndjson",
                    b"Content-Encoding": b"gzip",
                }
            )

        url = custom_url or self._url
        if forced_flush:
            url = f"{url}?flushed=true"
        try:
            try:
                response = self.http.urlopen(
                    "POST", url, body=data, headers=headers, timeout=self._timeout, preload_content=False
                )
                logger.debug("Sent request, url=%s size=%.2fkb status=%s", url, len(data) / 1024.0, response.status)
            except Exception as e:
                if isinstance(e, MaxRetryError) and isinstance(e.reason, TimeoutError):
                    message = "Connection to APM Server timed out " "(url: %s, timeout: %s seconds)" % (
                        self._url,
                        self._timeout,
                    )
                else:
                    message = "Unable to reach APM Server: %s (url: %s)" % (e, self._url)
                raise TransportException(message, data)
            body = response.read()
            if response.status >= 400:
                if response.status == 429:  # rate-limited
                    message = "Temporarily rate limited: "
                else:
                    message = "HTTP %s: " % response.status
                message += body.decode("utf8", errors="replace")[:10000]
                raise TransportException(message, data)
            return response.headers.get("Location")
        finally:
            if response:
                response.close()

    @property
    def http(self) -> urllib3.PoolManager:
        if not self._http:
            url_parts = urllib.parse.urlparse(self._url)
            proxies = getproxies_environment()
            proxy_url = proxies.get("https", proxies.get("http", None))
            if proxy_url and not proxy_bypass_environment(url_parts.netloc):
                self._http = urllib3.ProxyManager(proxy_url, **self._pool_kwargs)
            else:
                self._http = urllib3.PoolManager(**self._pool_kwargs)
        return self._http

    def handle_fork(self) -> None:
        # reset http pool to avoid sharing connections with the parent process
        self._http = None

    def get_config(self, current_version=None, keys=None):
        """
        Gets configuration from a remote APM Server

        :param current_version: version of the current configuration
        :param keys: a JSON-serializable dict to identify this instance, e.g.
                {
                    "service": {
                        "name": "foo",
                        "environment": "bar"
                    }
                }
        :return: a three-tuple of new version, config dictionary and validity in seconds.
            Any element of the tuple can be None.
        """
        url = self._config_url
        data = json_encoder.dumps(keys).encode("utf-8")
        headers = self._headers.copy()
        headers[b"Content-Type"] = "application/json"
        headers.update(self.auth_headers)
        max_age = 300
        if current_version:
            headers["If-None-Match"] = current_version
        try:
            response = self.http.urlopen(
                "POST", url, body=data, headers=headers, timeout=self._timeout, preload_content=False
            )
        except (urllib3.exceptions.RequestError, urllib3.exceptions.HTTPError) as e:
            logger.debug("HTTP error while fetching remote config: %s", str(e))
            return current_version, None, max_age
        body = response.read()

        max_age = self._get_cache_control_max_age(response.headers) or max_age

        if response.status == 304:
            # config is unchanged, return
            logger.debug("Configuration unchanged")
            return current_version, None, max_age
        elif response.status >= 400:
            return None, None, max_age

        if not body:
            logger.debug("APM Server answered with empty body and status code %s", response.status)
            return current_version, None, max_age
        body = body.decode("utf-8")
        try:
            data = json_encoder.loads(body)
            return response.headers.get("Etag"), data, max_age
        except json.JSONDecodeError:
            logger.warning("Failed decoding APM Server response as JSON: %s", body)
            return current_version, None, max_age

    def _get_cache_control_max_age(self, response_headers):
        max_age = None
        if "Cache-Control" in response_headers:
            try:
                cc_max_age = int(next(re.finditer(r"max-age=(\d+)", response_headers["Cache-Control"])).groups()[0])
                if cc_max_age <= 0:
                    # max_age remains at default value
                    pass
                elif cc_max_age < 5:
                    max_age = 5
                else:
                    max_age = cc_max_age
            except StopIteration:
                logger.debug("Could not parse Cache-Control header: %s", response_headers["Cache-Control"])
        return max_age

    def _process_queue(self) -> None:
        if not self.client.server_version:
            self.fetch_server_info()
        super()._process_queue()

    def fetch_server_info(self) -> None:
        headers = self._headers.copy() if self._headers else {}
        headers.update(self.auth_headers)
        headers[b"accept"] = b"text/plain"
        try:
            response = self.http.urlopen("GET", self._server_info_url, headers=headers, timeout=self._timeout)
            body = response.data
            data = json_encoder.loads(body.decode("utf8"))
            version = data["version"]
            logger.debug("Fetched APM Server version %s", version)
            self.client.server_version = version_string_to_tuple(version)
        except (urllib3.exceptions.RequestError, urllib3.exceptions.HTTPError) as e:
            logger.debug("HTTP error while fetching server information: %s", str(e))
        except json.JSONDecodeError as e:
            logger.debug(
                f"JSON decoding error while fetching server information. Error: {str(e)} Body: {body.decode('utf8')}"
            )
        except (KeyError, TypeError):
            logger.debug("No version key found in server response: %s", response.data)

    @property
    def cert_fingerprint(self):
        if self._server_cert:
            with open(self._server_cert, "rb") as f:
                cert_data = read_pem_file(f)
            digest = hashlib.sha256()
            digest.update(cert_data)
            return digest.hexdigest()
        return None

    @property
    def auth_headers(self):
        headers = super(Transport, self).auth_headers
        return {k.encode("ascii"): v.encode("ascii") for k, v in headers.items()}

    @property
    def ca_certs(self):
        """
        Return location of certificate store. If the server_ca_cert_file config option is set,
        its value is returned. Otherwise, the certifi store is used, unless it is disabled or not installed.
        """
        if self._server_ca_cert_file:
            return self._server_ca_cert_file
        return certifi.where() if (certifi and self.client.config.use_certifi) else None

    def close(self):
        """
        Take care of being able to shutdown cleanly
        :return:
        """
        if self._closed or (not self._thread or self._thread.pid != os.getpid()):
            return

        self._closed = True
        # we are racing against urllib3 ConnectionPool weakref finalizer that would lead to having them closed
        # and we hanging waiting for send any eventual queued data
        # Force the creation of a new PoolManager so that we are always able to flush
        self._http = None
        self.queue("close", None)
        if not self._flushed.wait(timeout=self._max_flush_time_seconds):
            logger.error("Closing the transport connection timed out.")


def version_string_to_tuple(version):
    if version:
        version_parts = re.split(r"[.\-]", version)
        return tuple(int(p) if p.isdigit() else p for p in version_parts)
    return ()


# left for backwards compatibility
AsyncTransport = Transport
