in evaluation/python_executor.py [0:0]
def batch_apply(self, batch_code):
all_code_snippets = self.process_generation_to_code(batch_code)
timeout_cnt = 0
all_exec_results = []
# with ProcessPool(max_workers=min(len(all_code_snippets), os.cpu_count())) as pool:
with ProcessPool(max_workers=min(len(all_code_snippets), 1)) as pool:
executor = partial(
self.execute,
get_answer_from_stdout=self.get_answer_from_stdout,
runtime=self.runtime,
answer_symbol=self.answer_symbol,
answer_expr=self.answer_expr,
timeout_length=self.timeout_length, # this timeout not work
auto_mode=True
)
future = pool.map(executor, all_code_snippets, timeout=self.timeout_length)
iterator = future.result()
if len(all_code_snippets) > 100:
progress_bar = tqdm(total=len(all_code_snippets), desc="Execute")
else:
progress_bar = None
while True:
try:
result = next(iterator)
all_exec_results.append(result)
except StopIteration:
break
except TimeoutError as error:
print(error)
all_exec_results.append(("", "Timeout Error"))
timeout_cnt += 1
except Exception as error:
print(error)
exit()
if progress_bar is not None:
progress_bar.update(1)
if progress_bar is not None:
progress_bar.close()
batch_results = []
for code, (res, report) in zip(all_code_snippets, all_exec_results):
# post processing
res, report = str(res).strip(), str(report).strip()
res, report = self.truncate(res), self.truncate(report)
batch_results.append((res, report))
return batch_results