assets/OfficialDemos/Others/Python/GitHubFunction.py (152 lines of code) (raw):
from pprint import pprint
import requests
import json
import os
import sys
import time
import datetime
import traceback
from azure.eventhub import EventHubClient, Sender, EventData, EventHubError
import logging
eventhubs = [
EventHubClient(
"sb://githubdemo12.servicebus.windows.net/githubevents",
username="<myusername>",
password="<mypassword>",
)
]
token = "<myGHtoken>"
ENDPOINT = "https://api.github.com/events?per_page=100"
def log(m):
print("{} | {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], m))
def debug(message):
# log("{} | {}".format("DEBUG", message))
pass
def info(message):
log("{} | {}".format("INFO", message))
def error(message, err):
log("{} | {} | {}".format("ERROR", message, err))
class Monitor:
interval = 60
def __init__(self, report_cb=None):
self.lut = time.time()
self.events_sent = 0
self.requests_issued = 0
self.report_cb = report_cb or print
def reset(self):
self.lut = time.time()
self.events_sent = 0
self.requests_issued = 0
def report(self):
now = time.time()
if now - self.lut >= 60:
self._report()
self.reset()
def _report(self):
msg = "Monitor: events sent : {} calls made :{}".format(self.events_sent, self.requests_issued)
self.report_cb(msg)
class SlidingCache:
def __init__(self, max_size=500):
self.prev = set()
self.current = set()
self.max_size = max_size
def add(self, item):
if item not in self:
if len(self.current) > self.max_size:
self.prev = self.current
self.current = set()
self.current.add(item)
def __contains__(self, item):
return item in self.current or item in self.prev
class BufferedEventHubSender:
# eventhub max size is 262144, just to be on the safe side, reduce this to 250000
def __init__(self, sender, flush_cb, serializer=json.dumps, item_seperator="\n", max_size=250000):
self.max_size = max_size
self.buffer = ""
self.item_count = 0
self.sender = sender
self.flush_cb = flush_cb
self.item_seperator = item_seperator
self.serializer = serializer
def push(self, item):
_item = "{}{}".format(self.serializer(item), self.item_seperator)
if len(_item.encode("utf-8")) > self.max_size:
raise EventHubError(
"Item {} is to big ({}) where as limit is {}. Ignoring.".format(
_item, len(_item.encode("utf-8")), self.max_size
)
)
if len((self.buffer + _item).encode("utf-8")) > self.max_size:
self.flush()
self.buffer += _item
self.item_count += 1
def flush(self):
try:
start = time.time()
self.sender.send(EventData(self.buffer))
self.flush_cb(time.time() - start, self.item_count)
except Exception as e:
raise EventHubError("lost the following {} records:\n {}".format(self.item_count, self.buffer))
self.buffer = ""
self.item_count = 0
def run():
info("STARTED github to eventhub")
headers = {"Authorization": "token {}".format(token)}
monitor = Monitor(info)
senders = []
for eh_client in eventhubs:
senders.append(
BufferedEventHubSender(
eh_client.add_sender(),
flush_cb=lambda took, count: debug(
"EVENTHUB REQUEST | took: {} sec, sent {} records to {}.".format(took, count, eh_client.eh_name)
),
)
)
failed = eh_client.run()
if failed:
raise EventHubError("Couldn't connect to EH {}".format(eh_client.eh_name))
seconds_per_request = round(
1.0 / (5000 / 60 / 60), 2
) # requests / minutes / seconds = requests per sec, ^-1=secs per request
cache = SlidingCache()
events = 0
loop = True
while loop:
loop_start_time = time.time()
monitor.report()
try:
resp = requests.get(ENDPOINT, headers=headers)
resp.raise_for_status()
monitor.requests_issued += 1
data = sorted(resp.json(), key=lambda x: x["id"])
payload = ""
debug("GITHUB REQUEST | took {} sec, got {} events.".format(resp.elapsed.total_seconds(), len(data)))
for d in data:
if d["id"] not in cache:
for buffered_sender in senders:
try:
buffered_sender.push(d)
except EventHubError as e:
error("EventHubError", e.message)
monitor.events_sent += 1
cache.add(d.get("id"))
cycle_took = time.time() - loop_start_time
delay = seconds_per_request - cycle_took
debug("CYCLE DONE | took {}, waiting for {}".format(cycle_took, max(delay, 0)))
if delay > 0:
time.sleep(delay)
except requests.HTTPError as e:
if resp.status_code in [429, 403]:
time_to_wait = int(
float(resp.headers.get("X-RateLimit-Reset", 60)) - datetime.datetime.utcnow().timestamp()
)
info("waiting for {}".format(time_to_wait))
if time_to_wait > 0:
time.sleep(time_to_wait)
error("HTTP EXCEPTION", repr(e))
except EventHubError as e:
error("Failed to send events to eventhub, skipping", repr(e))
except Exception as e:
error("UNEXPECTED ERROR", repr(e))
traceback.print_exc()
os.kill(os.getpid(), 9)
run()