in smallpond/execution/task.py [0:0]
def initialize(self):
    """Prepare this task for execution.

    Steps, in order:

    1. Call ``self.inject_fault()``.
    2. If ``self._memory_limit`` is unset, derive it from the context's
       usable memory, scaled by this task's CPU limit over the usable
       CPU count.
    3. Assert that ``self.partition_infos`` is non-empty.
    4. Create the runtime output directory and the temp directory.
    5. Unless ``self.exec_on_scheduler`` is set: optionally start a
       ``cProfile`` profiler, apply the memory limit, drop empty Parquet
       input files, log the task details, seed ``random`` and cap the
       thread counts at ``self.cpu_limit``.

    Raises:
        AssertionError: if ``self.partition_infos`` is empty.
        FileExistsError: if the temp directory already exists, or if the
            runtime output directory already exists while
            ``self.output_root`` is ``None``.
    """
    self.inject_fault()

    if self._memory_limit is None:
        # Default: a share of usable memory proportional to the CPU share.
        ctx = self.ctx
        derived_limit = ctx.usable_memory_size * self._cpu_limit // ctx.usable_cpu_count
        self._memory_limit = np.int64(derived_limit)

    assert self.partition_infos, f"empty partition infos: {self}"

    # A pre-existing runtime output directory is tolerated only when an
    # output root is configured; the temp directory must not exist yet.
    reuse_output_dir = self.output_root is not None
    os.makedirs(self.runtime_output_abspath, exist_ok=reuse_output_dir)
    os.makedirs(self.temp_abspath, exist_ok=False)

    # NOTE(review): all remaining steps are treated as belonging to the
    # "not exec_on_scheduler" branch because they change process-wide
    # state -- confirm against the intended nesting.
    if self.exec_on_scheduler:
        return

    ctx = self.ctx

    if ctx.enable_profiling:
        profiler = cProfile.Profile()
        self.perf_profile = profiler
        profiler.enable()

    if ctx.enforce_memory_limit:
        # The two bounds are 1.2x and 1.5x of the nominal memory limit.
        base = self.memory_limit
        self.set_memory_limit(round_up(base * 1.2), round_up(base * 1.5))

    if ctx.remove_empty_parquet:
        parquet_inputs = (ds for ds in self.input_datasets if isinstance(ds, ParquetDataSet))
        for parquet_input in parquet_inputs:
            parquet_input.remove_empty_files()

    logger.info("running task: {}", self)
    logger.debug("input datasets: {}", self.input_datasets)
    logger.trace(f"final output directory: {self.final_output_abspath}")
    logger.trace(f"runtime output directory: {self.runtime_output_abspath}")
    logger.trace(f"resource limit: {self.cpu_limit} cpus, {self.gpu_limit} gpus, {self.memory_limit/GB:.3f}GB memory")

    random.seed(self.random_seed_bytes)

    # Apply the task's CPU limit to the arrow thread settings and to the
    # thread-count environment variables.
    thread_budget = self.cpu_limit
    arrow.set_cpu_count(thread_budget)
    arrow.set_io_thread_count(thread_budget)
    for env_name in ("OMP_NUM_THREADS", "POLARS_MAX_THREADS"):
        os.environ[env_name] = str(thread_budget)