in SketchExperiment.py [0:0]
# Module-level imports this method relies on (standard sources assumed; Task
# and the job helpers are defined elsewhere in the file):
import logging
from collections import deque
from concurrent.futures import ProcessPoolExecutor

import dill
import numpy as np


def execute(self, write_mode="w"):
    # This driver only supports the precomputed-oracle mode.
    assert self.options.oracle_mode == self.options.OracleMode.PRECOMPUTE
    if self.options.nparallel > 1:
        executor = ProcessPoolExecutor(max_workers=self.options.nparallel)
    for run in range(self.options.ndatasets):
        logging.info(f"Starting run {run}")
        # Deterministic seeding: one workload seed per run, and a disjoint
        # block of sketch seeds derived from it.
        workload_seed = self.start_seed + run
        sketch_seed_start = workload_seed * self.MAX_SKETCHES
        task = Task(workload_seed, sketch_seed_start, experiment=self)
        futures = deque()
        job_sizes = []
        for job in task.makeJobs():
            if self.options.nparallel > 1:
                # Serialize with dill (jobs may close over objects the
                # stdlib pickle cannot handle) and submit to the pool.
                pickled_job = dill.dumps(job)
                job_sizes.append(len(pickled_job))
                futures.append(executor.submit(executePickledJob, pickled_job))
                # Block when there are enough parallel jobs running: wait on
                # the oldest future and merge its result before submitting more.
                while len(futures) >= self.options.nparallel:
                    job_result = futures.popleft().result()
                    self.results.merge(job_result)
            else:
                job_result = executeJob(job)
                self.results.merge(job_result)
        if job_sizes:  # only populated in parallel mode; avoids max() on empty
            job_sizes = np.array(job_sizes)
            print(f"pickled job max size: {job_sizes.max()} avg size: {job_sizes.mean()}")
        if self.options.nparallel > 1:
            # Drain the jobs still in flight at the end of this run.
            while futures:
                job_result = futures.popleft().result()
                self.results.merge(job_result)
print(f"writing output {self.result_file} mode: {write_mode}")
self.results.toDataFrame().to_csv(self.result_file, mode=write_mode, header=(write_mode == 'w'))
    timing_df = self.results.timingsToDataFrame()
    # Report mean runtime per sketch; the 'mean' string keeps pandas from
    # warning about passing np.mean to .agg().
    agg_timing_df = timing_df.groupby(['sketch_name']).agg(runtime=('time', 'mean'))
    print(agg_timing_df.to_string())
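

# The two job runners referenced above are not part of this excerpt. Below is
# a minimal sketch of plausible implementations, assuming executePickledJob
# merely reverses the dill serialization and delegates to executeJob; the
# bodies (including the job.run() interface) are illustrative assumptions,
# not the repository's actual code.

def executePickledJob(pickled_job):
    # Runs inside a worker process: rebuild the job object, then execute it.
    # Defined at module scope so ProcessPoolExecutor workers can import it.
    job = dill.loads(pickled_job)
    return executeJob(job)


def executeJob(job):
    # Execute one unit of work and return a result object that
    # self.results.merge() can consume (hypothetical run() interface).
    return job.run()


# Hypothetical usage, assuming a SketchExperiment instance is configured
# elsewhere with options, start_seed, and result_file:
#   experiment.execute(write_mode="w")  # first batch: fresh CSV with header
#   experiment.execute(write_mode="a")  # later batches: append, no header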