def initialize()

in smallpond/execution/task.py [0:0]


    def initialize(self):
        """Prepare this task for execution.

        Steps, in order:

        * call ``self.inject_fault()``;
        * if no memory limit was configured, derive one as the share of
          ``ctx.usable_memory_size`` proportional to
          ``_cpu_limit / ctx.usable_cpu_count``;
        * check that ``partition_infos`` is non-empty;
        * create the runtime output directory and the temp directory;
        * unless the task executes on the scheduler: optionally start a
          cProfile profiler, optionally enforce memory limits, optionally
          remove empty parquet files from ParquetDataSet inputs, log the task
          setup, seed ``random``, and cap the thread counts used by Arrow,
          OpenMP (``OMP_NUM_THREADS``) and Polars (``POLARS_MAX_THREADS``).

        Raises:
            AssertionError: if ``partition_infos`` is empty (note: the check is
                an ``assert`` and is skipped when Python runs with ``-O``).
            FileExistsError: if the temp directory already exists, or if the
                runtime output directory already exists while ``output_root``
                is None.
        """
        self.inject_fault()

        if self._memory_limit is None:
            # Default: the fraction of usable memory matching this task's fraction of usable CPUs.
            self._memory_limit = np.int64(self.ctx.usable_memory_size * self._cpu_limit // self.ctx.usable_cpu_count)
        assert self.partition_infos, f"empty partition infos: {self}"
        # A pre-existing runtime output directory is tolerated only when an explicit output root is set.
        os.makedirs(self.runtime_output_abspath, exist_ok=self.output_root is not None)
        # The temp directory must be fresh; a leftover one raises FileExistsError.
        os.makedirs(self.temp_abspath, exist_ok=False)

        if not self.exec_on_scheduler:
            if self.ctx.enable_profiling:
                self.perf_profile = cProfile.Profile()
                self.perf_profile.enable()
            if self.ctx.enforce_memory_limit:
                # Two thresholds at 1.2x and 1.5x of the task's memory limit
                # (presumably soft / hard limits — confirm against set_memory_limit).
                self.set_memory_limit(round_up(self.memory_limit * 1.2), round_up(self.memory_limit * 1.5))
            if self.ctx.remove_empty_parquet:
                for dataset in self.input_datasets:
                    if isinstance(dataset, ParquetDataSet):
                        dataset.remove_empty_files()
            # Use the logger's positional-argument formatting (as the info/debug calls do)
            # so trace messages are only formatted when actually emitted.
            logger.info("running task: {}", self)
            logger.debug("input datasets: {}", self.input_datasets)
            logger.trace("final output directory: {}", self.final_output_abspath)
            logger.trace("runtime output directory: {}", self.runtime_output_abspath)
            logger.trace(
                "resource limit: {} cpus, {} gpus, {:.3f}GB memory",
                self.cpu_limit,
                self.gpu_limit,
                self.memory_limit / GB,
            )
            random.seed(self.random_seed_bytes)
            # Cap native thread pools so the task stays within its CPU allocation.
            arrow.set_cpu_count(self.cpu_limit)
            arrow.set_io_thread_count(self.cpu_limit)
            os.environ["OMP_NUM_THREADS"] = str(self.cpu_limit)
            os.environ["POLARS_MAX_THREADS"] = str(self.cpu_limit)