def edge_fetcher()

in identity-resolution/notebooks/identity-graph/nepytune/benchmarks/drop_graph.py [0:0]


def edge_fetcher(g, q,start_offset,bracket_size):
    p1 = start_offset
    inc = min(bracket_size,MAX_FETCH_SIZE)
    p2 = start_offset + inc
    org = p1
    done = False
    nm = threading.currentThread().name
    print(nm,"[edges] Fetching from offset", start_offset, "with end at",start_offset+bracket_size)
    edge_fetch_wait_queue.get()

    done = False
    while not done:
        success = False
        while not success:
            print(nm,"[edges] retrieving range ({},{} batch=size={})".format(p1,p2,p2-p1))
            try:
                edges = g.E().range(p1,p2).id().toList()
                success = True
            except:
                print("***",nm,"Exception while fetching. Retrying.")
                time.sleep(1)

        slices = math.ceil(len(edges)/EDGE_BATCH_SIZE)
        s1 = 0
        s2 = 0
        for i in range(slices):
            s2 += min(len(edges)-s1,EDGE_BATCH_SIZE)
            q.put(["edges",edges[s1:s2]])
            s1 = s2
        p1 += inc
        if p1 >= org + bracket_size:
            done = True
        else:
            p2 += min(inc, org+bracket_size - p2)
    size = q.qsize()
    print(nm,"[edges] work done. Queue size ==>",size)
    edge_fetch_wait_queue.task_done()
    return