#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" PyPubSub listener class. """
#
# TYPICAL USAGE:
#
#   async for payload in listen(PUBSUB_URL):
#
# This will produce a series of payloads, forever.
#
# NOTE: this listener is intended for pypubsub, which terminates
#   payloads with a newline. The old svnpubsub used NUL characters,
#   so this client will not work with that server.
#

import requests
import requests.exceptions
import json
import time
import sys
import asyncio
import logging
import warnings

import aiohttp


LOGGER = logging.getLogger(__name__)

# The server sends keepalives every 5 seconds, so we should see
# activity well within this timeout period.
DEFAULT_INACTIVITY_TIMEOUT = 11
### for debug:
#DEFAULT_INACTIVITY_TIMEOUT = 4.5

# Default read buffer size. Max payload size in pypubsub is 256kb (plus metadata and JSON overhead)
DEFAULT_READ_BUFFER_SIZE = 300 * 1024


async def listen(pubsub_url, username=None, password=None, timeout=None, buffersize=None):

    if username:
        auth = aiohttp.BasicAuth(username, password)
    else:
        auth = None

    if timeout is None:
        timeout = DEFAULT_INACTIVITY_TIMEOUT
    ct = aiohttp.ClientTimeout(sock_read=timeout)
    
    if buffersize is None:
        buffersize = DEFAULT_READ_BUFFER_SIZE

    async with aiohttp.ClientSession(auth=auth, timeout=ct, read_bufsize=buffersize) as session:

        # Retry immediately, and then back it off.
        delay = 0.0

        ### tbd: look at event loop, to see if it has been halted
        while True:
            LOGGER.debug('Opening new connection...')
            try:
                async for payload in _process_connection(session, pubsub_url):
                    if not payload:
                        pass  ### tbd?: event loop killed or hit EOF

                    # We got a payload, so reset the DELAY.
                    delay = 0.0

                    yield payload

            except (ConnectionRefusedError,
                    aiohttp.ClientConnectorError,
                    aiohttp.ServerTimeoutError,
                    aiohttp.ClientPayloadError,
                    ) as e:
                LOGGER.error(f'Connection failed ({type(e).__name__}: {e})'
                             f', reconnecting in {delay} seconds')
                await asyncio.sleep(delay)

                # Back off on the delay. Step it up from 0s, doubling each
                # time, and top out at 30s retry. Steps: 0, 2, 6, 14, 30.
                delay = min(30.0, (delay + 1.0) * 2)


async def _process_connection(session, pubsub_url):
    # Connect to pubsub and listen for payloads.
    async with session.get(pubsub_url) as conn:

        #print('LIMITS:', conn.content.get_read_buffer_limits())

        while True:

            # The pubsub server defines stream payloads as:
            #    ENCODED_JSON(payload)+"\n"
            #
            # Due to the encoding, bare newlines will not occur
            # within the encoded part. Thus, we can read content
            # until we find a newline.
            #
            # Note: this newline is in RAW, but the json loader
            # ignores it.
            try:
                raw = await conn.content.readuntil(b'\n')
            except ValueError as e:
                LOGGER.error(f'Saw "{e}"; re-raising as ClientPayloadError to close/reconnect')
                raise aiohttp.ClientPayloadError('re-raised from ValueError in readuntil()')

            if not raw:
                # We just hit EOF.
                yield None

            yield json.loads(raw)


def test_listening():
    logging.basicConfig(level=logging.DEBUG)

    start = time.time()
    count = 0

    # We'll say "now" is when we believe the connection to be alive.
    last_traffic = time.time()

    async def report_stats():
        while True:
            # NOTE: do not set this lower than 60, or at startup,
            # DURATION will be zero, creating a div-by-zero error.
            await asyncio.sleep(70)

            duration = int((time.time() - start) / 60)
            alive_since = int(time.time() - last_traffic)
            print(f'[{duration}m] {count} events.  {count/duration:.1f}/min'
                  f'  traffic: {alive_since}s ago')

    async def print_events():
        async for payload in listen('https://pubsub.apache.org:2070/'):
            nonlocal last_traffic
            last_traffic = time.time()

            if 'stillalive' not in payload:
                print(f'PAYLOAD: [{payload.get("pubsub_path", "none")}]'
                      f'  KEYS: {sorted(payload.keys())}')
                nonlocal count
                count += 1

    async def run_test():
        await asyncio.gather(report_stats(), print_events())

    asyncio.run(run_test())


class Listener:
    """ Generic listener for pubsubs. Grabs each payload and runs process() on them. """

    def __init__(self, url):
        warnings.warn('use listen() instead', DeprecationWarning)
        self.url = url
        self.connection = None

    def attach(self, func, **kwargs):
        raw = kwargs.get('raw', False)
        debug = kwargs.get('debug', False)
        since = kwargs.get('since', -1)
        auth = kwargs.get('auth', None)
        listen_forever(func, self.url, auth, raw, since, debug)


def listen_forever(func, url, auth=None, raw=False, since=-1, debug=False):
    """Listen on URL forever, calling FUNC for each payload.

    ### more docco about FUNC calling, AUTH, RAW, SINCE, DEBUG
    """
    warnings.warn('use listen() instead', DeprecationWarning)

    while True:
        if debug:
            message("[INFO] Subscribing to stream at %s\n", url, fp=sys.stdout)
        connection = None
        while not connection:
            try:
                headers = {
                    'User-Agent': 'python/asfpy'
                }
                if since != -1:
                    headers['X-Fetch-Since'] = str(since)
                connection = requests.get(url, headers=headers, auth=auth, timeout=30, stream=True)
                if debug:
                    message("[INFO] Subscribed, reading stream\n", fp=sys.stdout)
            except requests.exceptions.RequestException:
                message("[WARNING] Could not connect to pubsub service at %s,"
                        " retrying in 10s...\n", url, sleep=10)
                continue
            if not connection:
                if debug:
                    message("[WARNING] %s did not respond with a streamable connection,"
                            " reconnecting in 10 seconds\n", url, sleep=10)
        try:
            body = ""
            for chunk in connection.iter_content(chunk_size=None):
                body += chunk.decode('utf-8', errors='ignore')
                # pypubsub/gitpubsub payloads end in \n, svnpubsub payloads end in \0:
                if body[-1] in ["\n", "\x00"]:
                    try:
                        payload = json.loads(body.rstrip("\r\n\x00"))
                    except ValueError as detail:
                        if debug:
                            message("[WARNING] Bad JSON or something: %s\n", detail)
                        # No payload. Circle back around for another.
                        payload = None
                    if not raw and isinstance(payload, dict):
                        payload = payload.get('payload')
                    if payload:
                        # Since we have a valid payload, we do not want to repeat it.
                        # Thus, set `since` to -1 now, so as to not have an x-fetch-since
                        # header on the next retry in case this connection fails at some point.
                        since = -1
                        func(payload)
                    body = ""
        except requests.exceptions.RequestException:
            if debug:
                message("[WARNING] Disconnected from %s, reconnecting\n", url, sleep=2)
                continue
        if debug:
            message("Connection to %s was closed, reconnecting in 10 seconds\n", url, sleep=10)


def message(fmt, *args, sleep=None, fp=sys.stderr):
    fp.write(fmt % args)
    fp.flush()
    if sleep:
        time.sleep(sleep)


if __name__ == '__main__':
    ### pass PUBSUB_URL ?
    test_listening()
