def listen_forever()

in asfpy/pubsub.py [0:0]


def listen_forever(func, url, auth=None, raw=False, since=-1, debug=False):
    """Listen on URL forever, calling FUNC for each payload.

    Deprecated: a DeprecationWarning directs callers to listen() instead.

    The function never returns normally. It opens a streaming GET request
    to URL, reads events from it, and reconnects whenever the connection
    cannot be established, is refused, drops, or is closed by the server.

    FUNC is called with one positional argument for every event that
    decodes to a truthy value. Exceptions raised by FUNC are not caught
    here; they propagate to the caller (after the stream is closed).

    AUTH is handed unchanged to requests.get() as its ``auth`` argument.

    RAW selects what FUNC receives. When false (the default) and the
    decoded event is a dict, FUNC gets the value stored under its
    'payload' key; otherwise FUNC gets the decoded event itself.

    SINCE, when not -1, is sent as the X-Fetch-Since request header. Once
    the first payload has been delivered it is reset to -1, so later
    reconnects do not send the header again and replay old events.

    DEBUG enables progress and warning messages through message(). The
    pauses between reconnect attempts happen whether or not DEBUG is set.
    """
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn('use listen() instead', DeprecationWarning, stacklevel=2)

    while True:
        if debug:
            message("[INFO] Subscribing to stream at %s\n", url, fp=sys.stdout)

        connection = None
        while not connection:
            headers = {
                'User-Agent': 'python/asfpy'
            }
            if since != -1:
                headers['X-Fetch-Since'] = str(since)
            try:
                response = requests.get(url, headers=headers, auth=auth,
                                        timeout=30, stream=True)
            except requests.exceptions.RequestException:
                message("[WARNING] Could not connect to pubsub service at %s,"
                        " retrying in 10s...\n", url, sleep=10)
                continue
            # A requests Response is falsy when its status is an error, so
            # this mirrors the truthiness test of the enclosing loop.
            if response:
                connection = response
                if debug:
                    message("[INFO] Subscribed, reading stream\n", fp=sys.stdout)
            else:
                # Release the socket of the unusable response before retrying.
                response.close()
                _pause_before_retry(
                    10, debug,
                    "[WARNING] %s did not respond with a streamable connection,"
                    " reconnecting in 10 seconds\n", url)

        disconnected = False
        try:
            # Buffer raw bytes and decode only complete events, so that a
            # multi-byte UTF-8 character split across chunks is not lost.
            pending = bytearray()
            for chunk in connection.iter_content(chunk_size=None):
                pending += chunk
                # pypubsub/gitpubsub payloads end in \n, svnpubsub payloads end in \0:
                if not pending.endswith((b"\n", b"\x00")):
                    continue
                body = pending.decode('utf-8', errors='ignore')
                pending.clear()
                try:
                    payload = json.loads(body.rstrip("\r\n\x00"))
                except ValueError as detail:
                    if debug:
                        message("[WARNING] Bad JSON or something: %s\n", detail)
                    # No payload. Circle back around for another.
                    continue
                if not raw and isinstance(payload, dict):
                    payload = payload.get('payload')
                if payload:
                    # Since we have a valid payload, we do not want to repeat it.
                    # Thus, set `since` to -1 now, so as to not have an x-fetch-since
                    # header on the next retry in case this connection fails at some point.
                    since = -1
                    func(payload)
        except requests.exceptions.RequestException:
            disconnected = True
        finally:
            # Always give the streamed connection back before reconnecting
            # (or before an exception from FUNC leaves this function).
            connection.close()

        if disconnected:
            _pause_before_retry(
                2, debug, "[WARNING] Disconnected from %s, reconnecting\n", url)
        else:
            _pause_before_retry(
                10, debug,
                "Connection to %s was closed, reconnecting in 10 seconds\n", url)


def _pause_before_retry(seconds, debug, fmt, *args):
    """Wait SECONDS before a reconnect; report FMT % ARGS first if DEBUG."""
    if debug:
        # NOTE(review): message() is assumed to sleep for `sleep` seconds
        # after writing the text - confirm against its definition.
        message(fmt, *args, sleep=seconds)
    else:
        # Imported locally because the file's import block is not visible here.
        import time
        time.sleep(seconds)