import copy
import time
import base64
import re
from typing import Any, Optional, Dict, Iterable, List, Union, cast, Tuple, Iterator
from urllib.parse import urlparse, ParseResult, urlencode, quote
from xml.etree import ElementTree as ET
from . import retry
from . import transport
from . import exceptions
from . import utils
from . import defaults
from . import validation
from . import serde
from . import io_utils
from . import endpoints
from .signer import SignerV4, SignerV1
from .credentials import AnonymousCredentialsProvider
from .config import Config
from .types import (
    Retryer,
    CredentialsProvider,
    HttpClient,
    HttpRequest,
    HttpResponse,
    SigningContext,
    Signer,
    BodyType,
    OperationInput,
    OperationOutput,
)


class AddressStyle():
    """address style information
    """
    Virtual = 1
    Path = 2
    CName = 3


class _MarkedBody:
    def __init__(
        self,
        body: BodyType,
    ) -> None:
        self._body = body
        self._io_curr: int = 0
        self._is_fileobj = False
        if body is None:
            self._seekable = True
        elif isinstance(body, io_utils.TeeIterator):
            self._seekable = body.seekable()
        elif utils.is_fileobj(body):
            self._seekable = utils.is_seekable(body)
            self._is_fileobj = True
        elif isinstance(body, Iterator):
            self._seekable = False
        elif isinstance(body, (str, bytes, Iterable)):
            self._seekable = True
        else:
            self._seekable = False

    def is_seekable(self) -> bool:
        """'is seekable
        """
        return self._seekable

    def mark(self) -> None:
        """Set the current marked position in the stream.
        """
        if self.is_seekable() is False:
            return

        if self._is_fileobj:
            self._io_curr = self._body.tell()

    def reset(self) -> None:
        """Resets the buffer to the marked position.
        """
        if self.is_seekable() is False:
            return

        if isinstance(self._body, io_utils.TeeIterator):
            self._body.reset()

        if self._is_fileobj:
            self._body.seek(self._io_curr, 0)


class _Options:
    """client level's configuration."""

    def __init__(
        self,
        product: str,
        region: str,
        endpoint: Optional[ParseResult] = None,
        retry_max_attempts: Optional[int] = None,
        retryer: Optional[Retryer] = None,
        signer: Optional[Signer] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
        http_client: Optional[Union[HttpClient]] = None,
        address_style: Optional[AddressStyle] = None,
        readwrite_timeout: Optional[Union[int, float]] = None,
        response_handlers: Optional[List] = None,
        response_stream: Optional[bool] = None,
        auth_method: Optional[str] = None,
        feature_flags: Optional[int] = None,
        additional_headers: Optional[List[str]] = None,
        operation_timeout: Optional[Union[int, float]] = None,
    ) -> None:
        self.product = product
        self.region = region
        self.endpoint = endpoint
        self.retry_max_attempts = retry_max_attempts
        self.retryer = retryer
        self.signer = signer
        self.credentials_provider = credentials_provider
        self.http_client = http_client
        self.address_style = address_style
        self.readwrite_timeout = readwrite_timeout
        self.response_handlers = response_handlers or []
        self.response_stream= response_stream
        self.auth_method = auth_method
        self.feature_flags = feature_flags or defaults.FF_DEFAULT
        self.additional_headers = additional_headers
        self.operation_timeout = operation_timeout


class _InnerOptions:
    """client runtime's information."""
    def __init__(
        self,
        user_agent: str = None,
    ) -> None:
        self.user_agent = user_agent



class _ClientImplMixIn:
    """Client implement"""

    def resolve_config(self, config: Config) ->Tuple[_Options, _InnerOptions]:
        """convert config into client's options"""

        options = _default_options(config)

        _resolve_endpoint(config, options)
        _resolve_retryer(config, options)
        _resolve_signer(config, options)
        _resolve_address_style(config, options)
        _resolve_feature_flags(config, options)
        _resolve_cloud_box(config, options)
        self._resolve_httpclient(config, options) # pylint: disable=no-member

        inner = _InnerOptions()
        #UserAgent
        inner.user_agent = _build_user_agent(config)

        return options, inner

    def resolve_kwargs(self, options: _Options, **kwargs):
        """client's configuration from user by key/value args"""

        if len(kwargs) == 0:
            return

        options.product = kwargs.get("product", options.product)
        options.region = kwargs.get("region", options.region)
        options.endpoint = kwargs.get("endpoint", options.endpoint)
        options.retry_max_attempts = kwargs.get("retry_max_attempts", options.retry_max_attempts)
        options.retryer = kwargs.get("retryer", options.retryer)
        options.signer = kwargs.get("signer", options.signer)
        options.credentials_provider = kwargs.get("credentials_provider", options.credentials_provider)
        options.http_client = kwargs.get("http_client", options.http_client)
        options.address_style = kwargs.get("address_style", options.address_style)
        options.readwrite_timeout = kwargs.get("readwrite_timeout", options.readwrite_timeout)
        options.auth_method = kwargs.get("auth_method", None)
        options.additional_headers = kwargs.get("additional_headers", options.additional_headers)


    def resolve_operation_kwargs(self, options: _Options, **kwargs):
        """operation's configuration from user by key/value args"""

        if len(kwargs) == 0:
            return

        options.retry_max_attempts = kwargs.get("retry_max_attempts", options.retry_max_attempts)
        options.retryer = kwargs.get("retryer", options.retryer)
        options.http_client = kwargs.get("http_client", options.http_client)
        options.readwrite_timeout = kwargs.get("readwrite_timeout", options.readwrite_timeout)
        options.auth_method = kwargs.get("auth_method", options.auth_method)
        options.operation_timeout = kwargs.get("operation_timeout", None)

    def verify_operation(self, op_input: OperationInput, options: _Options) -> None:
        """verify input and options"""

        if not options.endpoint:
            raise exceptions.ParamInvalidError(field="endpoint")

        if (op_input.bucket is not None and
                not validation.is_valid_bucket_name(op_input.bucket)):
            raise exceptions.BucketNameInvalidError(
                name=utils.safety_str(op_input.bucket))

        if (op_input.key is not None and
                not validation.is_valid_object_name(op_input.key)):
            raise exceptions.ObjectNameInvalidError()

    def apply_operation(self, options: _Options, op_input: OperationInput) -> None:
        """apply operation"""
        self._apply_operation_options(options) # pylint: disable=no-member
        _apply_operation_metadata(op_input, options)


    def build_request_context(self, op_input: OperationInput, options: _Options, inner: _InnerOptions
                              ) -> SigningContext:
        """build request context
        """
        # host & path
        url = _build_url(op_input, options)

        # queries
        if op_input.parameters is not None:
            query = urlencode(op_input.parameters, quote_via=quote)
            if len(query) > 0:
                url = url + "?" + query

        # build http request
        request = HttpRequest(method=op_input.method, url=url)

        # headers
        request.headers.update(op_input.headers or {})

        request.headers.update({'User-Agent': inner.user_agent})

        # body
        body = op_input.body or b''

        # body tracker
        if op_input.op_metadata is not None:
            tracker = op_input.op_metadata.get("opm-request-body-tracker", None)
            if tracker is not None:
                writers = []
                for t in tracker:
                    if hasattr(t, 'write'):
                        writers.append(t)
                if len(writers) > 0:
                    body = io_utils.TeeIterator.from_source(body, writers)

        request.body = body

        # signing context
        context = SigningContext(
            product=options.product,
            region=options.region,
            bucket=op_input.bucket,
            key=op_input.key,
            request=request,
        )

        if utils.safety_str(options.auth_method) == 'query':
            context.auth_method_query = True

        oss_date = request.headers.get('x-oss-date', None)
        if oss_date is not None:
            context.signing_time = serde.deserialize_httptime(oss_date)
        if (expiration_time := op_input.op_metadata.get('expiration_time', None)) is not None:
            context.expiration_time = expiration_time

        context.sub_resource = op_input.op_metadata.get("sub-resource", [])

        return context

    def retry_max_attempts(self, options: _Options) -> int:
        """retry max attempts"""
        if options.retry_max_attempts is not None:
            attempts = int(options.retry_max_attempts)
        elif options.retryer is not None:
            attempts = options.retryer.max_attempts()
        else:
            attempts = defaults.DEFAULT_MAX_ATTEMPTS

        return max(1, attempts)

    def has_feature(self, flag: int) -> bool:
        """has feature"""
        return (self._options.feature_flags & flag) > 0 # pylint: disable=no-member

    def get_retry_attempts(self) -> bool:
        """get retry attempts"""
        return self.retry_max_attempts(self._options) # pylint: disable=no-member

class _SyncClientImpl(_ClientImplMixIn):
    """Sync API Client for common API."""

    def __init__(self, config: Config, **kwargs) -> None:
        options, inner = self.resolve_config(config)
        self.resolve_kwargs(options, **kwargs)

        self._config = config
        self._options = options
        self._inner = inner

    def invoke_operation(self, op_input: OperationInput, **kwargs) -> OperationOutput:
        """Common class interface invoice operation

        Args:
            op_input (OperationInput): _description_

        Raises:
            exceptions.OperationError: _description_

        Returns:
            OperationOutput: _description_
        """

        options = copy.copy(self._options)
        self.resolve_operation_kwargs(options, **kwargs)
        self.apply_operation(options, op_input)

        try:
            self.verify_operation(op_input, options)
            output = self._sent_request(op_input, options)
        except Exception as err:
            raise exceptions.OperationError(
                name=op_input.op_name,
                error=err,
            )

        return output

    def _resolve_httpclient(self, config: Config, options: _Options) -> None:
        """httpclient"""
        if options.http_client:
            return

        kwargs: Dict[str, Any] = {}

        if bool(config.insecure_skip_verify):
            kwargs["insecure_skip_verify"] = True

        if bool(config.enabled_redirect):
            kwargs["enabled_redirect"] = True

        if config.connect_timeout:
            kwargs["connect_timeout"] = config.connect_timeout

        if config.readwrite_timeout:
            kwargs["readwrite_timeout"] = config.readwrite_timeout

        if config.proxy_host:
            kwargs["proxy_host"] = config.proxy_host

        options.http_client = transport.RequestsHttpClient(**kwargs)


    def _apply_operation_options(self, options: _Options) -> None:
        # response handler
        handlers = []

        def service_error_response_handler(response: HttpResponse) -> None:
            """ check service error """
            if response.status_code // 100 == 2:
                return

            if not response.is_stream_consumed:
                _ = response.read()

            raise _to_service_error(response)

        # insert service error responsed handler first
        handlers.append(service_error_response_handler)

        handlers.extend(options.response_handlers)

        options.response_handlers = handlers

    def _sent_request(self, op_input: OperationInput, options: _Options) -> OperationOutput:
        context = self.build_request_context(op_input, options, self._inner)
        response = self._sent_http_request(context, options)
        output = OperationOutput(
            status=response.reason,
            status_code=response.status_code,
            headers=response.headers,
            op_input=op_input,
            http_response=response
        )

        # save other info by Metadata filed
        # output.op_metadata
        if context.auth_method_query:
            output.op_metadata['expiration_time'] = context.expiration_time

        # update clock offset

        return output

    def _sent_http_request(self, context: SigningContext, options: _Options) -> HttpResponse:
        request = context.request
        retryer = options.retryer
        max_attempts = self.retry_max_attempts(options)

        # operation timeout
        dealline = None
        if isinstance(options.operation_timeout, (int, float)):
            dealline = time.time() +  options.operation_timeout

        # Mark body
        marked_body = _MarkedBody(request.body)
        marked_body.mark()

        reset_time = context.signing_time is None
        error: Optional[Exception] = None
        response: HttpResponse = None
        for tries in range(max_attempts):
            if tries > 0:
                try:
                    marked_body.reset()
                except:  # pylint: disable=bare-except
                    # if meets reset error, just ignores, and retures last error
                    break

                if reset_time:
                    context.signing_time = None

                dealy = retryer.retry_delay(tries, error)
                time.sleep(dealy)

                # operation timeout
                if dealline is not None and (time.time() > dealline):
                    break

            try:
                error = None
                response = self._sent_http_request_once(context, options)
                break
            except Exception as e:
                error = e

            # operation timeout
            if dealline is not None and (time.time() > dealline):
                break

            if marked_body.is_seekable() is False:
                break

            if not retryer.is_error_retryable(error):
                break

        if error is not None:
            raise error

        return response

    def _sent_http_request_once(self, context: SigningContext, options: _Options) -> HttpResponse:
        # sign request
        if not isinstance(options.credentials_provider, AnonymousCredentialsProvider):
            try:
                cred = options.credentials_provider.get_credentials()
            except Exception as e:
                raise exceptions.CredentialsFetchError(error=e)

            if cred is None or not cred.has_keys():
                raise exceptions.CredentialsEmptyError()

            # update credentials
            context.credentials = cred

            options.signer.sign(context)

        # send
        send_kwargs = {}
        if options.response_stream is not None:
            send_kwargs['stream'] = options.response_stream
        if options.readwrite_timeout is not None:
            send_kwargs['readwrite_timeout'] = options.readwrite_timeout

        response = options.http_client.send(context.request, **send_kwargs)

        # response handler
        for h in options.response_handlers:
            h(response)

        return response

def _default_options(config: Config) -> _Options:
    """convert config to options"""
    return _Options(
        product=defaults.DEFAULT_PRODUCT,
        region=config.region,
        retry_max_attempts=config.retry_max_attempts,
        retryer=cast(Retryer, config.retryer),
        credentials_provider=cast(
            CredentialsProvider, config.credentials_provider),
        http_client=cast(HttpClient, config.http_client),
        additional_headers=config.additional_headers
    )


def _resolve_endpoint(config: Config, options: _Options) -> None:
    """endpoint"""
    disable_ssl = utils.safety_bool(config.disable_ssl)
    endpoint = utils.safety_str(config.endpoint)
    region = utils.safety_str(config.region)
    if len(endpoint) > 0:
        endpoint = endpoints.add_scheme(endpoint, disable_ssl)
    elif validation.is_valid_region(region):
        if bool(config.use_dualstack_endpoint):
            etype = "dualstack"
        elif bool(config.use_internal_endpoint):
            etype = "internal"
        elif bool(config.use_accelerate_endpoint):
            etype = "accelerate"
        else:
            etype = "default"

        endpoint = endpoints.from_region(region, disable_ssl, etype)

    if endpoint == "":
        return

    options.endpoint = urlparse(endpoint)


def _resolve_retryer(_: Config, options: _Options) -> None:
    """retryer"""
    if options.retryer:
        return

    options.retryer = retry.StandardRetryer()


def _resolve_signer(config: Config, options: _Options) -> None:
    """signer"""
    if options.signer:
        return

    if utils.safety_str(config.signature_version) == "v1":
        options.signer = SignerV1()
    else:
        options.signer = SignerV4()


def _resolve_address_style(config: Config, options: _Options) -> None:
    """address_style"""
    if bool(config.use_cname):
        style = AddressStyle.CName
    elif bool(config.use_path_style):
        style = AddressStyle.Path
    else:
        style = AddressStyle.Virtual

    # if the endpoint is ip, set to path-style
    if options.endpoint:
        hostname = options.endpoint.hostname
        if endpoints.is_ip(hostname):
            style = AddressStyle.Path

    options.address_style = style


def _resolve_feature_flags(config: Config, options: _Options) -> None:
    """flags for feature"""
    if utils.safety_bool(config.disable_upload_crc64_check):
        options.feature_flags = options.feature_flags & ~defaults.FF_ENABLE_CRC64_CHECK_UPLOAD

    if utils.safety_bool(config.disable_download_crc64_check):
        options.feature_flags = options.feature_flags & ~defaults.FF_ENABLE_CRC64_CHECK_DOWNLOAD


def _resolve_cloud_box(config: Config, options: _Options) -> None:
    """cloud box"""
    if config.cloud_box_id is not None:
        options.region = str(config.cloud_box_id)
        options.product = defaults.CLOUD_BOX_PRODUCT
        return

    if not config.enable_auto_detect_cloud_box_id:
        return

    host = options.endpoint.hostname
    if not (host.endswith(".oss-cloudbox.aliyuncs.com") or
            host.endswith(".oss-cloudbox-control.aliyuncs.com")):
        return

    keys = host.split(".")
    if len(keys) != 5 or not keys[0].startswith("cb-"):
        return
    options.region = keys[0]
    options.product = defaults.CLOUD_BOX_PRODUCT


def _apply_operation_metadata(op_input: OperationInput, options: _Options) -> None:
    handlers = op_input.op_metadata.get('opm-response-handler', None)
    if handlers is not None:
        options.response_handlers.extend(handlers)

    stream = op_input.op_metadata.get('response-stream', None)
    if stream is not None:
        options.response_stream = stream


def _build_url(op_input: OperationInput, options: _Options) -> str:
    host = ""
    paths = []
    if op_input.bucket is None:
        host = options.endpoint.netloc
    else:
        if options.address_style == AddressStyle.Path:
            host = options.endpoint.netloc
            paths.append(op_input.bucket)
            if op_input.key is None:
                paths.append('')
        elif options.address_style == AddressStyle.CName:
            host = options.endpoint.netloc
        else:
            host = f'{op_input.bucket}.{options.endpoint.netloc}'

    if op_input.key is not None:
        paths.append(quote(op_input.key))

    return f'{options.endpoint.scheme}://{host}/{"/".join(paths)}'


def _to_service_error(response: HttpResponse) -> exceptions.ServiceError:
    timestamp = serde.deserialize_httptime(response.headers.get('Date'))
    content = response.content or b''
    response.close()

    error_fileds = {}
    code = 'BadErrorResponse'
    message = ''
    ec = ''
    request_id = ''
    err_body = b''
    try:
        err_body = content
        if len(err_body) == 0:
            err_body = base64.b64decode(
                response.headers.get('x-oss-err', ''))
        root = ET.fromstring(err_body)
        if root.tag == 'Error':
            for child in root:
                error_fileds[child.tag] = child.text
            message = error_fileds.get('Message', '')
            code = error_fileds.get('Code', '')
            ec = error_fileds.get('EC', '')
            request_id = error_fileds.get('RequestId', '')
        else:
            message = f'Expect root node Error, but get {root.tag}.'
    except ET.ParseError as e:
        errstr = err_body.decode()
        if '<Error>' in errstr and '</Error>' in errstr:
            m = re.search('<Code>(.*)</Code>', errstr)
            if m:
                code = m.group(1)
            m = re.search('<Message>(.*)</Message>', errstr)
            if m:
                message = m.group(1)
        if len(message) == 0:
            message = f'Failed to parse xml from response body due to: {str(e)}. With part response body {err_body[:256]}.'
    except Exception as e:
        message = f'The body of the response was not readable, due to : {str(e)}.'

    return exceptions.ServiceError(
        status_code=response.status_code,
        code=code,
        message=message,
        request_id=request_id or response.headers.get('x-oss-request-id', ''),
        ec=ec or response.headers.get('x-oss-ec', ''),
        timestamp=timestamp,
        request_target=f'{response.request.method} {response.request.url}',
        snapshot=content,
        headers=response.headers,
        error_fileds=error_fileds
    )

def _build_user_agent(config: Config) -> str:
    if config.user_agent:
        return f'{utils.get_default_user_agent()}/{config.user_agent}'

    return utils.get_default_user_agent()
