def run()

in smallpond/execution/task.py [0:0]


    def run(self) -> bool:
        """Load every input dataset as an Arrow table, process them, and dump the output.

        Steps, in order:

        1. Call ``add_elapsed_time()`` without a metric name (presumably to
           reset the timing baseline -- confirm against its definition).
        2. Return ``True`` immediately when ``skip_when_any_input_empty`` is truthy.
        3. When ``use_duckdb_reader`` is set, open an in-memory DuckDB connection
           and hand it to ``prepare_connection``.
        4. Convert each entry of ``input_datasets`` to an Arrow table, add the
           total row count to ``perf_metrics["num input rows"]`` and record the
           load time.
        5. Close the reader connection, run ``_call_process`` on the tables and
           record the compute time.
        6. Pass the processed table to ``dump_output``.

        Returns:
            ``True`` when the task is skipped, otherwise whatever
            ``dump_output`` returns.

        Raises:
            OutOfMemory: when an ``arrow.lib.ArrowMemoryError`` is raised at any
                point; the original error is chained as the cause.
        """
        # Bound before the ``try`` so the ``finally`` clause can always inspect it.
        reader_conn = None
        try:
            self.add_elapsed_time()
            if self.skip_when_any_input_empty:
                return True

            if self.use_duckdb_reader:
                reader_conn = duckdb.connect(
                    database=":memory:",
                    config={"allow_unsigned_extensions": "true"},
                )
                self.prepare_connection(reader_conn)

            tables = [
                source.to_arrow_table(max_workers=self.cpu_limit, conn=reader_conn)
                for source in self.input_datasets
            ]
            self.perf_metrics["num input rows"] += sum(tbl.num_rows for tbl in tables)
            self.add_elapsed_time("input load time (secs)")

            # The connection is only needed while loading inputs, so it is
            # closed before the compute step starts.
            if reader_conn is not None:
                reader_conn.close()

            task_ctx = self.ctx.set_current_task(self)
            result_table = self._call_process(task_ctx, tables)
            self.add_elapsed_time("compute time (secs)")

            return self.dump_output(result_table)
        except arrow.lib.ArrowMemoryError as oom_err:
            raise OutOfMemory(f"{self.key} failed with OOM error") from oom_err
        finally:
            # Covers the paths where loading failed before the early close above.
            if reader_conn is not None:
                reader_conn.close()