def __init__()

in smallpond/session.py [0:0]
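Creates a smallpond session: loads the config, builds the runtime context for tasks, starts (or connects to) a ray cluster, optionally launches executor jobs via the platform, and spawns a daemon thread that periodically dumps metrics.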


    def __init__(self, **kwargs):
        """
        Create a smallpond environment.
        """
        super().__init__()
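        # context for building this session's logical plan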
        self._ctx = Context()
        self.config, self._platform = Config.from_args_and_env(**kwargs)

        # construct runtime context for Tasks
        runtime_ctx = RuntimeContext(
            job_id=JobId(hex=self.config.job_id),
            job_time=self.config.job_time,
            data_root=self.config.data_root,
            num_executors=self.config.num_executors,
            bind_numa_node=self.config.bind_numa_node,
            shared_log_root=self._platform.shared_log_root(),
        )
        self._runtime_ctx = runtime_ctx

        # if SP_SPAWN is set, spawn this job on the platform and exit
        if os.environ.get("SP_SPAWN") == "1":
            self._spawn_self()
            sys.exit(0)

        # initialize runtime directories and logging on the driver, using the hostname as exec_id
        self._runtime_ctx.initialize(exec_id=socket.gethostname())
        logger.info(f"using platform: {self._platform}")
        logger.info(f"command-line arguments: {' '.join(sys.argv)}")
        logger.info(f"session config: {self.config}")

        # runs in each ray worker process to capture its stdout/stderr into per-worker logs
        def setup_worker():
            runtime_ctx._init_logs(exec_id=socket.gethostname(), capture_stdout_stderr=True)

        if self.config.ray_address is None:
            # locate the shared library for the configured memory allocator
            if self.config.memory_allocator == "system":
                malloc_path = ""
            elif self.config.memory_allocator == "jemalloc":
                malloc_path = shutil.which("libjemalloc.so.2")
                assert malloc_path is not None, "jemalloc is not installed"
            elif self.config.memory_allocator == "mimalloc":
                malloc_path = shutil.which("libmimalloc.so.2.1")
                assert malloc_path is not None, "mimalloc is not installed"
            else:
                raise ValueError(f"unsupported memory allocator: {self.config.memory_allocator}")
            memory_purge_delay = 10000  # ms before the allocator returns freed memory to the OS

            # start ray head node
            # tell the ray dashboard how to reach grafana (started below)
            os.environ["RAY_GRAFANA_HOST"] = "http://localhost:8122"
            self._ray_address = ray.init(
                # start a new local cluster
                address="local",
                # contribute no CPUs from the head node when remote executors will join;
                # otherwise run tasks on the local CPUs
                num_cpus=(0 if self.config.num_executors > 0 else self._runtime_ctx.usable_cpu_count),
                # set the memory limit to the available memory size
                _memory=self._runtime_ctx.usable_memory_size,
                # workers manage their own log files (see setup_worker), so do not
                # forward their output to the driver
                log_to_driver=False,
                runtime_env={
                    "worker_process_setup_hook": setup_worker,
                    "env_vars": {
                        "LD_PRELOAD": malloc_path,
                        "MALLOC_CONF": f"percpu_arena:percpu,background_thread:true,metadata_thp:auto,dirty_decay_ms:{memory_purge_delay},muzzy_decay_ms:{memory_purge_delay},oversize_threshold:0,lg_tcache_max:16",
                        "MIMALLOC_PURGE_DELAY": f"{memory_purge_delay}",
                        "ARROW_DEFAULT_MEMORY_POOL": self.config.memory_allocator,
                        "ARROW_IO_THREADS": "2",
                        "OMP_NUM_THREADS": "2",
                        "POLARS_MAX_THREADS": "2",
                        "NUMEXPR_MAX_THREADS": "2",
                        "RAY_PROFILING": "1",
                    },
                },
                dashboard_host="0.0.0.0",
                dashboard_port=8008,
                # for prometheus to scrape metrics
                _metrics_export_port=8080,
            ).address_info["gcs_address"]
            logger.info(f"started ray cluster at {self._ray_address}")

            self._prometheus_process = self._start_prometheus()
            self._grafana_process = self._start_grafana()
        else:
            self._ray_address = self.config.ray_address
            self._prometheus_process = None
            self._grafana_process = None
            logger.info(f"connected to ray cluster at {self._ray_address}")

        # start workers
        if self.config.num_executors > 0:
            # make sure spawned workers share this session's job_id
            kwargs["job_id"] = self.config.job_id

            self._job_names = self._platform.start_job(
                self.config.num_executors,
                entrypoint=os.path.join(os.path.dirname(__file__), "worker.py"),
                args=[
                    f"--ray_address={self._ray_address}",
                    f"--log_dir={self._runtime_ctx.log_root}",
                    *(["--bind_numa_node"] if self.config.bind_numa_node else []),
                ],
                extra_opts=kwargs,
            )
        else:
            self._job_names = []

        # spawn a thread to periodically dump metrics
        self._stop_event = threading.Event()
        self._dump_thread = threading.Thread(name="dump_thread", target=self._dump_periodically, daemon=True)
        self._dump_thread.start()
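For illustration, a minimal usage sketch. It assumes `smallpond.init(**kwargs)` forwards its keyword arguments to this constructor; the keyword names below are inferred from the `self.config` fields read above and should be treated as hypothetical rather than the definitive API:

    import smallpond

    # hypothetical kwargs: names mirror the self.config fields read in __init__
    sp = smallpond.init(
        num_executors=0,            # no executor jobs; run tasks on local CPUs
        data_root="/tmp/smallpond",
        memory_allocator="system",  # skip the jemalloc/mimalloc LD_PRELOAD setup
    )

With `ray_address` left unset, the constructor starts a local ray head node plus prometheus and grafana; passing an address instead connects to an existing cluster and skips the allocator and monitoring setup.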