in identity-resolution/notebooks/identity-graph/nepytune/benchmarks/drop_graph.py [0:0]
def edge_fetcher(g, q,start_offset,bracket_size) -> None:
    """Fetch edge ids for one bracket of the edge range and enqueue them in batches.

    The bracket ``[start_offset, start_offset + bracket_size)`` is walked in
    windows of at most ``MAX_FETCH_SIZE`` ids. Each window is fetched with
    ``g.E().range(lower, upper).id().toList()`` and retried (after a one
    second sleep) until the call succeeds. The ids returned for a window are
    split into lists of at most ``EDGE_BATCH_SIZE`` and put on ``q`` as
    ``["edges", <id list>]`` work items.

    Args:
        g: Object exposing ``E().range(a, b).id().toList()``
            (presumably a Gremlin traversal source -- confirm with caller).
        q: Queue-like object providing ``put`` and ``qsize``; receives the
            ``["edges", ids]`` work items.
        start_offset: First edge offset of this worker's bracket.
        bracket_size: Number of edge offsets this worker is responsible for.

    Side effects:
        Calls ``get()`` on the module-level ``edge_fetch_wait_queue`` before
        fetching and ``task_done()`` on it once the bracket has been
        processed. NOTE(review): presumably a coordinator joins on that
        queue to wait for all fetchers -- confirm against the caller.
    """
    nm = threading.current_thread().name
    step = min(bracket_size, MAX_FETCH_SIZE)
    bracket_end = start_offset + bracket_size

    print(nm,"[edges] Fetching from offset", start_offset, "with end at",start_offset+bracket_size)
    edge_fetch_wait_queue.get()

    lower = start_offset
    # The body always runs at least once, even for an empty bracket, exactly
    # like the original flag-driven loop.
    while True:
        # Never read past the end of this worker's bracket.
        upper = min(lower + step, bracket_end)

        # Retry the same window until the fetch succeeds.
        while True:
            print(nm,"[edges] retrieving range ({},{} batch=size={})".format(lower,upper,upper-lower))
            try:
                edges = g.E().range(lower,upper).id().toList()
                break
            except Exception as exc:
                # Only ordinary errors are retried; SystemExit and
                # KeyboardInterrupt are allowed to propagate.
                print("***",nm,"Exception while fetching. Retrying.", exc)
                time.sleep(1)

        # Hand the fetched ids over in chunks of at most EDGE_BATCH_SIZE.
        for first in range(0, len(edges), EDGE_BATCH_SIZE):
            q.put(["edges",edges[first:first + EDGE_BATCH_SIZE]])

        lower += step
        if lower >= bracket_end:
            break

    size = q.qsize()
    print(nm,"[edges] work done. Queue size ==>",size)
    edge_fetch_wait_queue.task_done()