in asfpy/pubsub.py [0:0]
def listen_forever(func, url, auth=None, raw=False, since=-1, debug=False):
"""Listen on URL forever, calling FUNC for each payload.
### more docco about FUNC calling, AUTH, RAW, SINCE, DEBUG
"""
warnings.warn('use listen() instead', DeprecationWarning)
while True:
if debug:
message("[INFO] Subscribing to stream at %s\n", url, fp=sys.stdout)
connection = None
while not connection:
try:
headers = {
'User-Agent': 'python/asfpy'
}
if since != -1:
headers['X-Fetch-Since'] = str(since)
connection = requests.get(url, headers=headers, auth=auth, timeout=30, stream=True)
if debug:
message("[INFO] Subscribed, reading stream\n", fp=sys.stdout)
except requests.exceptions.RequestException:
message("[WARNING] Could not connect to pubsub service at %s,"
" retrying in 10s...\n", url, sleep=10)
continue
if not connection:
if debug:
message("[WARNING] %s did not respond with a streamable connection,"
" reconnecting in 10 seconds\n", url, sleep=10)
try:
body = ""
for chunk in connection.iter_content(chunk_size=None):
body += chunk.decode('utf-8', errors='ignore')
# pypubsub/gitpubsub payloads end in \n, svnpubsub payloads end in \0:
if body[-1] in ["\n", "\x00"]:
try:
payload = json.loads(body.rstrip("\r\n\x00"))
except ValueError as detail:
if debug:
message("[WARNING] Bad JSON or something: %s\n", detail)
# No payload. Circle back around for another.
payload = None
if not raw and isinstance(payload, dict):
payload = payload.get('payload')
if payload:
# Since we have a valid payload, we do not want to repeat it.
# Thus, set `since` to -1 now, so as to not have an x-fetch-since
# header on the next retry in case this connection fails at some point.
since = -1
func(payload)
body = ""
except requests.exceptions.RequestException:
if debug:
message("[WARNING] Disconnected from %s, reconnecting\n", url, sleep=2)
continue
if debug:
message("Connection to %s was closed, reconnecting in 10 seconds\n", url, sleep=10)