import asyncio
import aiohttp
import logging
import io
import os
import ssl
import time
import re
import certifi
import json
from requests import status_codes, adapters, PreparedRequest
from typing import Any, Dict, Optional
from enum import Enum
from urllib.parse import urlencode, urlparse
from requests import status_codes, adapters, PreparedRequest, Session
from darabonba.exceptions import RequiredArgumentException, RetryError
from darabonba.model import DaraModel
from darabonba.request import DaraRequest
from darabonba.response import DaraResponse
from darabonba.utils.stream import BaseStream
from darabonba.policy.retry import RetryOptions, RetryPolicyContext

sse_line_pattern = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')

DEFAULT_CONNECT_TIMEOUT = 5000
DEFAULT_READ_TIMEOUT = 10000
DEFAULT_POOL_SIZE = 10
MAX_DELAY_TIME = 120 * 1000
MIN_DELAY_TIME = 100

logger = logging.getLogger('darabonba-core')
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
logger.addHandler(ch)

class _ModelEncoder(json.JSONEncoder):
    def default(self, o: Any) -> Any:
        if isinstance(o, DaraModel):
            return o.to_map()
        elif isinstance(o, bytes):
            return o.decode('utf-8')
        super().default(o)

class TLSVersion(Enum):
    TLSv1 = 'TLSv1'
    TLSv1_1 = 'TLSv1.1'
    TLSv1_2 = 'TLSv1.2'
    TLSv1_3 = 'TLSv1.3'    

class _TLSAdapter(adapters.HTTPAdapter):
    """A HTTPAdapter that uses an arbitrary TLS version."""

    def __init__(self, ssl_context=None, **kwargs):
        self.ssl_context = ssl_context
        super().__init__(**kwargs)

    def init_poolmanager(self, *args, **kwargs):
        """Override the init_poolmanager method to set the SSL."""
        kwargs['ssl_context'] = self.ssl_context
        super().init_poolmanager(*args, **kwargs)

class DaraCore:
    _sessions = {}
    http_adapter = adapters.HTTPAdapter(pool_connections=DEFAULT_POOL_SIZE, pool_maxsize=DEFAULT_POOL_SIZE * 4)
    https_adapter = adapters.HTTPAdapter(pool_connections=DEFAULT_POOL_SIZE, pool_maxsize=DEFAULT_POOL_SIZE * 4)

    @staticmethod
    def to_json_string(
        val: Any,
    ) -> str:
        """
        Stringify a value by JSON format
        @return: the JSON format string
        """
        if isinstance(val, str):
            return str(val)
        return json.dumps(
            val, cls=_ModelEncoder, ensure_ascii=False, separators=(",", ":")
        )

    @staticmethod
    def _set_tls_minimum_version(sls_context, tls_min_version):
        context = sls_context
        if tls_min_version is not None:
            if tls_min_version == 'TLSv1':
                context.minimum_version = ssl.TLSVersion.TLSv1
            elif tls_min_version == 'TLSv1.1':
                context.minimum_version = ssl.TLSVersion.TLSv1_1
            elif tls_min_version == 'TLSv1.2':
                context.minimum_version = ssl.TLSVersion.TLSv1_2
            elif tls_min_version == 'TLSv1.3':
                context.minimum_version = ssl.TLSVersion.TLSv1_3
        return context
    
    @staticmethod
    def get_adapter(prefix, tls_min_version: str = None):
        ca_cert = certifi.where()
        context = ssl.create_default_context()
        if ca_cert and prefix.upper() == 'HTTPS':
            context = DaraCore._set_tls_minimum_version(context, tls_min_version)
            context.load_verify_locations(ca_cert)
        adapter = _TLSAdapter(ssl_context=context, pool_connections=DEFAULT_POOL_SIZE,
                              pool_maxsize=DEFAULT_POOL_SIZE * 4)
        return adapter

    @staticmethod
    def _prepare_http_debug(request, symbol):
        base = ''
        for key, value in request.headers.items():
            base += f'\n{symbol} {key} : {value}'
        return base

    @staticmethod
    def _do_http_debug(request, response):
        # logger the request
        url = urlparse(request.url)
        request_base = f'\n> {request.method.upper()} {url.path + url.query} HTTP/1.1'
        logger.debug(request_base + DaraCore._prepare_http_debug(request, '>'))

        # logger the response
        response_base = f'\n< HTTP/1.1 {response.status_code}' \
                        f' {status_codes._codes.get(response.status_code)[0].upper()}'
        logger.debug(response_base + DaraCore._prepare_http_debug(response, '<'))

    @staticmethod
    def compose_url(request):
        host = request.headers.get('host')
        if not host:
            raise RequiredArgumentException('endpoint')
        else:
            host = host.rstrip('/')
        protocol = f'{request.protocol.lower()}://'
        pathname = request.pathname

        if host.startswith(('http://', 'https://')):
            protocol = ''

        if request.port == 80:
            port = ''
        else:
            port = f':{request.port}'

        url = protocol + host + port + pathname

        if request.query:
            if "?" in url:
                if not url.endswith("&"):
                    url += "&"
            else:
                url += "?"

            encode_query = {}
            for key in request.query:
                value = request.query[key]
                if value is not None:
                    encode_query[key] = str(value)
            url += urlencode(encode_query)
        return url.rstrip("?&")

    @staticmethod
    async def async_do_action(
            request: DaraRequest,
            runtime_option=None
    ) -> DaraResponse:
        runtime_option = runtime_option or {}

        url = DaraCore.compose_url(request)
        verify = not runtime_option.get('ignoreSSL', False)
        tls_min_version = runtime_option.get('tlsMinVersion')
        if isinstance(tls_min_version, Enum):
            tls_min_version = tls_min_version.value

        timeout = runtime_option.get('timeout')
        connect_timeout = runtime_option.get('connectTimeout') or timeout or DEFAULT_CONNECT_TIMEOUT
        read_timeout = runtime_option.get('readTimeout') or timeout or DEFAULT_READ_TIMEOUT

        connect_timeout, read_timeout = (int(connect_timeout) / 1000, int(read_timeout) / 1000)

        proxy = None
        if request.protocol.upper() == 'HTTP':
            proxy = runtime_option.get('httpProxy')
            if not proxy:
                proxy = os.environ.get('HTTP_PROXY') or os.environ.get('http_proxy')
        elif request.protocol.upper() == 'HTTPS':
            proxy = runtime_option.get('httpsProxy')
            if not proxy:
                proxy = os.environ.get('HTTPS_PROXY') or os.environ.get('https_proxy')

        connector = None
        ca_cert = certifi.where()
        if ca_cert and request.protocol.upper() == 'HTTPS':
            ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
            ssl_context = DaraCore._set_tls_minimum_version(ssl_context, tls_min_version)
            ssl_context.load_verify_locations(ca_cert)
            connector = aiohttp.TCPConnector(
                ssl=ssl_context,
            )
        else:
            verify = False

        timeout = aiohttp.ClientTimeout(
            sock_read=read_timeout,
            sock_connect=connect_timeout
        )
        async with aiohttp.ClientSession(
                connector=connector
        ) as s:
            body = b''
            if isinstance(request.body, BaseStream):
                for content in request.body:
                    body += content
            elif isinstance(request.body, str):
                body = request.body.encode('utf-8')
            else:
                body = request.body
            try:
                async with s.request(request.method, url,
                                     data=body,
                                     headers=request.headers,
                                     ssl=verify,
                                     proxy=proxy,
                                     timeout=timeout) as response:
                    tea_resp = DaraResponse()
                    tea_resp.body = await response.read()
                    tea_resp.headers = {k.lower(): v for k, v in response.headers.items()}
                    tea_resp.status_code = response.status
                    tea_resp.status_message = response.reason
                    tea_resp.response = response
            except IOError as e:
                raise RetryError(str(e))
        return tea_resp

    @staticmethod
    def do_action(
            request: DaraRequest,
            runtime_option=None
    ) -> DaraResponse:
        url = DaraCore.compose_url(request)

        runtime_option = runtime_option or {}

        verify = not runtime_option.get('ignoreSSL', False)
        tls_min_version = runtime_option.get('tlsMinVersion')
        if isinstance(tls_min_version, Enum):
            tls_min_version = tls_min_version.value

        if verify:
            verify = runtime_option.get('ca', True) if runtime_option.get('ca', True) is not None else True
        cert = runtime_option.get('cert', None)

        timeout = runtime_option.get('timeout')
        connect_timeout = runtime_option.get('connectTimeout') or timeout or DEFAULT_CONNECT_TIMEOUT
        read_timeout = runtime_option.get('readTimeout') or timeout or DEFAULT_READ_TIMEOUT

        timeout = (int(connect_timeout) / 1000, int(read_timeout) / 1000)

        if isinstance(request.body, str):
            request.body = request.body.encode('utf-8')

        p = PreparedRequest()
        p.prepare(
            method=request.method.upper(),
            url=url,
            data=request.body,
            headers=request.headers,
        )

        proxies = {}
        http_proxy = runtime_option.get('httpProxy')
        https_proxy = runtime_option.get('httpsProxy')
        no_proxy = runtime_option.get('noProxy')

        if not http_proxy:
            http_proxy = os.environ.get('HTTP_PROXY') or os.environ.get('http_proxy')
        if not https_proxy:
            https_proxy = os.environ.get('HTTPS_PROXY') or os.environ.get('https_proxy')

        if http_proxy:
            proxies['http'] = http_proxy
        if https_proxy:
            proxies['https'] = https_proxy
        if no_proxy:
            proxies['no_proxy'] = no_proxy

        host = request.headers.get('host')
        host = host.rstrip('/')

        session_key = f'{request.protocol.lower()}://{host}:{request.port}'
        session = DaraCore._get_session(session_key=session_key, protocol=request.protocol,
                                       tls_min_version=tls_min_version, verify=verify)
        try:
            resp = session.send(
                p,
                proxies=proxies,
                timeout=timeout,
                verify=verify,
                cert=cert,
            )
        except IOError as e:
            raise RetryError(str(e))

        debug = runtime_option.get('debug') or os.getenv('DEBUG')
        if debug and debug.lower() == 'sdk':
            DaraCore._do_http_debug(p, resp)

        response = DaraResponse()
        response.status_message = resp.reason
        response.status_code = resp.status_code
        response.headers = {k.lower(): v for k, v in resp.headers.items()}
        response.body = resp.content
        response.response = resp
        return response

    @staticmethod
    def get_response_body(resp) -> str:
        return resp.content.decode("utf-8")

    @staticmethod
    def allow_retry(dic, retry_times, now=None) -> bool:
        if retry_times == 0:
            return True
        if dic is None or not dic.__contains__("maxAttempts") or \
                dic.get('retryable') is not True and retry_times >= 1:
            return False
        else:
            retry = 0 if dic.get("maxAttempts") is None else int(
                dic.get("maxAttempts"))
        return retry >= retry_times
    
    @staticmethod
    def should_retry(options: RetryOptions, ctx: RetryPolicyContext) -> bool:
        if ctx.retries_attempted == 0:
            return True

        if not options or not options.retryable:
            return False

        retries_attempted = ctx.retries_attempted
        ex = ctx.exception

        for condition in options.no_retry_condition:
            if getattr(ex, 'name', None) in condition.exception or getattr(ex, 'code', None) in condition.error_code:
                return False

        for condition in options.retry_condition:
            if getattr(ex, 'name', None) not in condition.exception and getattr(ex, 'code', None) not in condition.error_code:
                continue
            
            if retries_attempted >= condition.max_attempts:
                return False
            return True

        return False

    @staticmethod
    def get_backoff_time(options: RetryOptions, ctx: RetryPolicyContext) -> int:
        ex = ctx.exception
        conditions = options.retry_condition
        for condition in conditions:
            if getattr(ex, 'name', None) not in condition.exception and getattr(ex, 'code', None) not in condition.error_code:
                continue
            max_delay = condition.max_delay or MAX_DELAY_TIME
            retry_after = getattr(ctx.exception, "retry_after", None)
            if retry_after is not None:
                return min(retry_after, max_delay)
            if not condition.backoff:
                return MIN_DELAY_TIME
            return min(condition.backoff.get_delay_time(ctx), max_delay)
        return MIN_DELAY_TIME

    @staticmethod
    async def sleep_async(millisecond: int):
        await asyncio.sleep(millisecond / 1000)

    @staticmethod
    def sleep(millisecond: int):
        time.sleep(millisecond / 1000)

    @staticmethod
    def is_retryable(ex) -> bool:
        return isinstance(ex, RetryError)

    @staticmethod
    def bytes_readable(body):
        return body

    @staticmethod
    def merge(*dic_list) -> dict:
        dic_result = {}
        for item in dic_list:
            if isinstance(item, dict):
                dic_result.update(item)
            elif isinstance(item, DaraModel):
                dic_result.update(item.to_map())
        return dic_result

    @staticmethod
    def is_null(value) -> bool:
        return value is None
    
    @staticmethod
    def to_readable_stream(data):
        if isinstance(data, str):
            return io.StringIO(data)
        elif isinstance(data, bytes):
            return io.BytesIO(data)
        else:
            raise TypeError("Input data must be of type str or bytes")

    @staticmethod
    def to_map(model: Optional[DaraModel]) -> Dict[str, Any]:
        if isinstance(model, DaraModel):
            return model.to_map()
        else:
            return dict()

    @staticmethod
    def to_number(model) -> int:
        if isinstance(model, int):
            return model
        if isinstance(model, str):
            if model == "":
                return 0
            return int(model)
        if isinstance(model, float):
            return int(model)
        return 0

    @staticmethod
    def from_map(
            model: DaraModel,
            dic: Dict[str, Any]
    ) -> DaraModel:
        if isinstance(model, DaraModel):
            try:
                return model.from_map(dic)
            except Exception:
                model._map = dic
                return model
        else:
            return model

    @staticmethod
    def _get_session(session_key: str, protocol: str, tls_min_version: str = None, verify: bool = True):
        if session_key not in DaraCore._sessions:
            session = Session()
            adapter = DaraCore.get_adapter(protocol, tls_min_version)
            if protocol.upper() == 'HTTPS':
                if verify:
                    session.mount('https://', adapter)
                else:
                    session.mount('https://', DaraCore.https_adapter)
            else:
                session.mount('http://', adapter)
            DaraCore._sessions[session_key] = session
        return DaraCore._sessions[session_key]