in assets/OfficialDemos/Others/Python/GitHubFunction.py [0:0]
def _make_flush_logger(eh_client):
    """Return a flush callback that logs send statistics for ``eh_client``.

    A factory is used instead of an inline lambda in the setup loop: a lambda
    defined in a loop looks up the loop variable when it is *called*, so every
    callback would report the name of the last client iterated.
    """

    def _log_flush(took, count):
        return debug(
            "EVENTHUB REQUEST | took: {} sec, sent {} records to {}.".format(took, count, eh_client.eh_name)
        )

    return _log_flush


def _rate_limit_wait_seconds(response_headers, default_wait=60):
    """Return how many whole seconds remain until the rate limit resets.

    ``X-RateLimit-Reset`` is read as an epoch timestamp in seconds and compared
    against ``time.time()`` (also epoch seconds). When the header is missing or
    cannot be parsed as a number, ``default_wait`` seconds is returned instead.
    The result may be zero or negative if the reset time has already passed.
    """
    reset_at = response_headers.get("X-RateLimit-Reset")
    if reset_at is None:
        return default_wait
    try:
        return int(float(reset_at) - time.time())
    except (TypeError, ValueError):
        return default_wait


def run():
    """Poll ``ENDPOINT`` forever and push each previously unseen event to every Event Hub.

    Setup: one ``BufferedEventHubSender`` is created per client in the
    module-level ``eventhubs`` collection, and each client is started.

    Loop: each cycle fetches the endpoint, sorts the returned events by ``id``,
    pushes the events whose id is not yet in the cache to all senders, and then
    sleeps so that cycles are spaced at least ``seconds_per_request`` apart.

    Error handling per cycle:
      * HTTP errors are logged; for status 403/429 the loop additionally waits
        until the rate-limit reset time reported by the response.
      * ``EventHubError`` is logged and the cycle is skipped.
      * Any other exception is logged with a traceback and the process is
        killed with signal 9.

    Raises:
        EventHubError: if a client's ``run()`` reports failure during setup.
    """
    info("STARTED github to eventhub")
    headers = {"Authorization": "token {}".format(token)}
    monitor = Monitor(info)

    senders = []
    for eh_client in eventhubs:
        senders.append(
            BufferedEventHubSender(
                eh_client.add_sender(),
                flush_cb=_make_flush_logger(eh_client),
            )
        )
        failed = eh_client.run()
        if failed:
            raise EventHubError("Couldn't connect to EH {}".format(eh_client.eh_name))

    # 5000 requests per hour -> requests per second; the inverse is the
    # minimum number of seconds between two consecutive requests.
    seconds_per_request = round(1.0 / (5000 / 60 / 60), 2)
    cache = SlidingCache()

    while True:
        loop_start_time = time.time()
        monitor.report()
        try:
            resp = requests.get(ENDPOINT, headers=headers)
            resp.raise_for_status()
            monitor.requests_issued += 1
            data = sorted(resp.json(), key=lambda x: x["id"])
            debug("GITHUB REQUEST | took {} sec, got {} events.".format(resp.elapsed.total_seconds(), len(data)))
            for d in data:
                event_id = d["id"]
                if event_id in cache:
                    continue
                for buffered_sender in senders:
                    # A failure on one sender must not prevent the others
                    # from receiving the event.
                    try:
                        buffered_sender.push(d)
                    except EventHubError as e:
                        error("EventHubError", e.message)
                monitor.events_sent += 1
                cache.add(event_id)
        except requests.HTTPError as e:
            # Log before any wait so the failure is visible immediately.
            error("HTTP EXCEPTION", repr(e))
            # Use the response attached to the exception rather than the local
            # ``resp``, which may be stale or unbound if the request itself raised.
            response = getattr(e, "response", None)
            if response is not None and response.status_code in [429, 403]:
                time_to_wait = _rate_limit_wait_seconds(response.headers)
                info("waiting for {}".format(time_to_wait))
                if time_to_wait > 0:
                    time.sleep(time_to_wait)
        except EventHubError as e:
            error("Failed to send events to eventhub, skipping", repr(e))
        except Exception as e:
            error("UNEXPECTED ERROR", repr(e))
            traceback.print_exc()
            # Hard-kill the process on unexpected errors.
            # NOTE(review): presumably so a supervisor restarts it - confirm.
            os.kill(os.getpid(), 9)

        # Pace every cycle, including failed ones, so that error paths do not
        # retry in a tight loop.
        cycle_took = time.time() - loop_start_time
        delay = seconds_per_request - cycle_took
        debug("CYCLE DONE | took {}, waiting for {}".format(cycle_took, max(delay, 0)))
        if delay > 0:
            time.sleep(delay)