in smallpond/execution/task.py [0:0]
def run(self) -> bool:
    """Load the input datasets as Arrow tables, run the compute step, and dump the result.

    When ``use_duckdb_reader`` is set, an in-memory DuckDB connection is opened,
    handed to ``prepare_connection``, and passed to every dataset's
    ``to_arrow_table`` call. The connection is closed exactly once: right after
    all inputs are loaded, or by the ``finally`` clause if the method exits
    earlier. Elapsed-time checkpoints are recorded for input loading and
    compute, and the total row count of the loaded tables is added to
    ``perf_metrics["num input rows"]``.

    Returns:
        ``True`` without loading anything when ``skip_when_any_input_empty`` is
        truthy; otherwise whatever ``dump_output`` returns for the computed table.

    Raises:
        OutOfMemory: if ``arrow.lib.ArrowMemoryError`` is raised anywhere in the
            load/compute/dump sequence (the original error is chained as the cause).
    """
    # Initialised before ``try`` so the ``finally`` clause can always inspect it.
    conn = None
    try:
        self.add_elapsed_time()
        # NOTE(review): this returns as soon as the attribute is truthy; presumably it
        # already folds in an "is any input empty" check -- confirm at its definition.
        if self.skip_when_any_input_empty:
            return True
        if self.use_duckdb_reader:
            conn = duckdb.connect(database=":memory:", config={"allow_unsigned_extensions": "true"})
            self.prepare_connection(conn)
        input_tables = [dataset.to_arrow_table(max_workers=self.cpu_limit, conn=conn) for dataset in self.input_datasets]
        self.perf_metrics["num input rows"] += sum(table.num_rows for table in input_tables)
        self.add_elapsed_time("input load time (secs)")
        # Release the reader connection once the inputs are materialised instead of
        # holding it through the compute step. Resetting ``conn`` stops the ``finally``
        # clause from closing the same connection a second time.
        if conn is not None:
            conn.close()
            conn = None
        output_table = self._call_process(self.ctx.set_current_task(self), input_tables)
        self.add_elapsed_time("compute time (secs)")
        return self.dump_output(output_table)
    except arrow.lib.ArrowMemoryError as ex:
        raise OutOfMemory(f"{self.key} failed with OOM error") from ex
    finally:
        # Covers exits that happen before the early release above (errors while
        # preparing the connection or loading inputs).
        if conn is not None:
            conn.close()