in identity-resolution/notebooks/identity-graph/nepytune/benchmarks/drop_graph.py [0:0]
def drop(g):
    """Drop every edge, then every vertex, and print a timing summary.

    The work runs in four timed phases:

    1. enqueue the edges via ``fetch_edges``,
    2. start ``NUM_THREADS`` worker threads and wait for the queued edge
       work to be processed,
    3. enqueue the vertices via ``fetch_vertices`` while the workers are
       held back by ``ready_flag``,
    4. release the workers and wait for the queued vertex work to be
       processed.

    Args:
        g: Handle that is passed through unchanged to ``fetch_edges``,
            ``fetch_vertices`` and each ``worker`` thread.
            NOTE(review): presumably a Gremlin traversal source - confirm
            against the callers.

    Returns:
        None. The results are reported on stdout only.
    """
    ####################################################################################
    # Do the work!
    #
    ####################################################################################
    # Fetch the edges
    equeue_start_time = time.time()
    ecount = fetch_edges(g, pending_work)
    # NOTE(review): presumably blocks until every edge fetch has finished
    # enqueueing its results - confirm against fetch_edges.
    edge_fetch_wait_queue.join()
    equeue_end_time = time.time()

    # Create the pool of workers that will drop the edges and vertices
    print("Creating drop() workers")
    workers = []
    # Raise the flag before the workers start.
    # NOTE(review): presumably the workers wait on ready_flag before
    # consuming pending_work - confirm against worker().
    ready_flag.set()
    edrop_start_time = time.time()
    for _ in range(NUM_THREADS):
        thread = Thread(target=worker, args=(g, pending_work))
        # Use the ``daemon`` attribute: Thread.setDaemon() is deprecated
        # since Python 3.10. The threads are never joined here, so marking
        # them as daemons keeps them from blocking interpreter exit.
        thread.daemon = True
        thread.start()
        workers.append(thread)

    # Wait until all of the edges in the queue have been dropped
    pending_work.join()
    edrop_end_time = time.time()

    # Tell the workers to wait until the vertex IDs have all been enqueued
    ready_flag.clear()

    # Fetch the vertex IDs
    vqueue_start_time = time.time()
    vcount = fetch_vertices(g, pending_work)
    vertex_fetch_wait_queue.join()
    vqueue_end_time = time.time()

    # Tell the workers to start dropping the vertices
    vdrop_start_time = time.time()
    ready_flag.set()
    pending_work.join()
    vdrop_end_time = time.time()

    # Calculate how long each phase took
    eqtime = equeue_end_time - equeue_start_time
    vqtime = vqueue_end_time - vqueue_start_time
    etime = edrop_end_time - edrop_start_time
    vtime = vdrop_end_time - vdrop_start_time

    print("Summary")
    print("-------")
    print("Worker threads", NUM_THREADS)
    print("Max fetch size", MAX_FETCH_SIZE)
    print("Edge batch size", EDGE_BATCH_SIZE)
    print("Vertex batch size", VERTEX_BATCH_SIZE)
    print("Edges dropped", ecount)
    print("Vertices dropped", vcount)
    print("Time taken to queue edges", eqtime)
    print("Time taken to drop edges", etime)
    print("Time taken to queue vertices", vqtime)
    print("Time taken to drop vertices", vtime)
    print("TOTAL TIME", eqtime + vqtime + etime + vtime)