in scripts/util.py [0:0]
def batch_execute(requests, retry_cb=None, log_err=log.error):
    """Run *requests* (a list, or a dict of req_id -> request) as compute
    batch HTTP requests, looping until every request succeeds or fails hard.

    A request is kept for another pass when its exception looks transient
    (``retry_exception``) or, on success, while the optional ``retry_cb``
    callback keeps returning a truthy value for the response.

    Returns a ``(done, failed)`` pair: ``done`` maps req_id -> response,
    ``failed`` maps req_id -> (request, exception).
    """
    compute = globals()["compute"]
    BATCH_LIMIT = 1000
    if not isinstance(requests, dict):
        # plain sequence: synthesize string request ids from positions
        requests = {str(i): req for i, req in enumerate(requests)}

    done = {}
    failed = {}
    timestamps = []
    rate_limited = False

    def handle_response(rid, resp, exc):
        # Invoked by the batch object once per finished request.
        nonlocal rate_limited
        if exc is None:
            # if retry_cb is set, don't move to done until it returns false
            if retry_cb is not None and retry_cb(resp):
                return
            requests.pop(rid)
            done[rid] = resp
            return
        log_err(f"compute request exception {rid}: {exc}")
        if retry_exception(exc):
            # transient failure: leave the request in place for the next pass
            rate_limited = True
        else:
            # permanent failure: record it and stop retrying
            failed[rid] = (requests.pop(rid), exc)

    def make_batch(pairs):
        # Pack (rid, request) pairs into a single batch HTTP request.
        batch = compute.new_batch_http_request(callback=handle_response)
        for rid, req in pairs:
            batch.add(req, request_id=rid)
        return batch

    while requests:
        if timestamps:
            # drop rate-limit windows that have already expired
            timestamps = [ts for ts in timestamps if ts > time()]
        if rate_limited and timestamps:
            # back off until the oldest still-open window closes
            sleep(max(timestamps[0] - time(), 0))
            rate_limited = False
        # up to API_REQ_LIMIT (2000) requests
        # in chunks of up to BATCH_LIMIT (1000)
        batches = [
            make_batch(chunk)
            for chunk in chunked(islice(requests.items(), API_REQ_LIMIT), BATCH_LIMIT)
        ]
        timestamps.append(time() + 100)
        with ThreadPoolExecutor() as exe:
            futures = [exe.submit(ensure_execute, batch) for batch in batches]
        # executor shutdown has joined every future; surface the first failure
        for future in futures:
            exc = future.exception()
            if exc is not None:
                raise exc
    return done, failed