in smallpond/session.py [0:0]
def __init__(self, **kwargs):
    """
    Create a smallpond environment.

    Resolves the session configuration from ``kwargs`` and the environment and
    builds the runtime context used by Tasks. It then does one of two things:

    - If ``config.ray_address`` is None, it starts a local Ray cluster along
      with Prometheus and Grafana.
    - Otherwise it connects to the existing cluster at that address.

    When ``config.num_executors > 0``, executor jobs running ``worker.py`` are
    launched through the platform. Finally, a daemon thread is started that
    periodically dumps metrics.

    If the environment variable ``SP_SPAWN`` is ``"1"``, the session calls
    ``self._spawn_self()`` and then terminates the current process with exit
    status 0 instead of completing initialization.

    Parameters
    ----------
    **kwargs
        Options passed to ``Config.from_args_and_env``. When executors are
        started, the same options are forwarded to ``platform.start_job`` as
        ``extra_opts``, with ``job_id`` overridden by the resolved config value.

    Raises
    ------
    ValueError
        If a local Ray cluster is started and ``config.memory_allocator`` is
        not one of ``"system"``, ``"jemalloc"`` or ``"mimalloc"``.
    RuntimeError
        If the shared library for the requested allocator cannot be located.
    """
    super().__init__()
    self._ctx = Context()
    self.config, self._platform = Config.from_args_and_env(**kwargs)

    # Construct the runtime context for Tasks.
    runtime_ctx = RuntimeContext(
        job_id=JobId(hex=self.config.job_id),
        job_time=self.config.job_time,
        data_root=self.config.data_root,
        num_executors=self.config.num_executors,
        bind_numa_node=self.config.bind_numa_node,
        shared_log_root=self._platform.shared_log_root(),
    )
    self._runtime_ctx = runtime_ctx

    # In spawn mode, spawn a job and terminate this process.
    # `sys.exit` is used rather than the `exit` builtin, which is injected by
    # the `site` module and is unavailable when Python runs with `-S`.
    if os.environ.get("SP_SPAWN") == "1":
        self._spawn_self()
        sys.exit(0)

    self._runtime_ctx.initialize(exec_id=socket.gethostname())
    logger.info(f"using platform: {self._platform}")
    logger.info(f"command-line arguments: {' '.join(sys.argv)}")
    logger.info(f"session config: {self.config}")

    # Registered below as Ray's `worker_process_setup_hook`, so it runs in the
    # Ray worker processes. It closes over the local `runtime_ctx`, not `self`.
    # NOTE(review): presumably this keeps Ray from having to serialize the
    # whole session object along with the hook. Confirm before referencing
    # `self` here.
    def setup_worker():
        runtime_ctx._init_logs(exec_id=socket.gethostname(), capture_stdout_stderr=True)

    if self.config.ray_address is None:
        # Find the memory allocator library to LD_PRELOAD into Ray workers.
        # "system" means no preload (empty LD_PRELOAD).
        allocator = self.config.memory_allocator
        allocator_libs = {
            "jemalloc": "libjemalloc.so.2",
            "mimalloc": "libmimalloc.so.2.1",
        }
        if allocator == "system":
            malloc_path = ""
        elif allocator in allocator_libs:
            # NOTE(review): `shutil.which` only searches PATH for executable
            # files, so the library must be reachable that way. Confirm this
            # is the intended lookup.
            malloc_path = shutil.which(allocator_libs[allocator])
            # Explicit check instead of `assert`: asserts are stripped under
            # `python -O`, which would let `None` reach LD_PRELOAD.
            if malloc_path is None:
                raise RuntimeError(f"{allocator} is not installed")
        else:
            raise ValueError(f"unsupported memory allocator: {allocator}")

        # Purge/decay delay in milliseconds. It feeds jemalloc's
        # dirty_decay_ms/muzzy_decay_ms and mimalloc's MIMALLOC_PURGE_DELAY.
        memory_purge_delay = 10000

        # Let the Ray head node's dashboard reach Grafana.
        # Note: this mutates the environment of the current process.
        os.environ["RAY_GRAFANA_HOST"] = "http://localhost:8122"

        # Start a new local Ray cluster and keep its GCS address.
        self._ray_address = ray.init(
            address="local",
            # Disable the local CPU resource when executors are launched
            # separately; otherwise expose the usable local CPUs.
            num_cpus=(0 if self.config.num_executors > 0 else self._runtime_ctx.usable_cpu_count),
            # Set Ray's memory resource to the usable memory size.
            _memory=self._runtime_ctx.usable_memory_size,
            # Workers set up their own logging via `setup_worker`.
            log_to_driver=False,
            runtime_env={
                "worker_process_setup_hook": setup_worker,
                "env_vars": {
                    "LD_PRELOAD": malloc_path,
                    "MALLOC_CONF": f"percpu_arena:percpu,background_thread:true,metadata_thp:auto,dirty_decay_ms:{memory_purge_delay},muzzy_decay_ms:{memory_purge_delay},oversize_threshold:0,lg_tcache_max:16",
                    "MIMALLOC_PURGE_DELAY": f"{memory_purge_delay}",
                    "ARROW_DEFAULT_MEMORY_POOL": self.config.memory_allocator,
                    "ARROW_IO_THREADS": "2",
                    "OMP_NUM_THREADS": "2",
                    "POLARS_MAX_THREADS": "2",
                    "NUMEXPR_MAX_THREADS": "2",
                    "RAY_PROFILING": "1",
                },
            },
            dashboard_host="0.0.0.0",
            dashboard_port=8008,
            # For Prometheus to scrape metrics.
            _metrics_export_port=8080,
        ).address_info["gcs_address"]
        logger.info(f"started ray cluster at {self._ray_address}")
        self._prometheus_process = self._start_prometheus()
        self._grafana_process = self._start_grafana()
    else:
        # Use an existing cluster; no monitoring processes are started here.
        self._ray_address = self.config.ray_address
        self._prometheus_process = None
        self._grafana_process = None
        logger.info(f"connected to ray cluster at {self._ray_address}")

    # Start executor jobs running worker.py, pointed at the Ray cluster.
    if self.config.num_executors > 0:
        # Override `job_id` in the forwarded options with the resolved value.
        kwargs["job_id"] = self.config.job_id
        self._job_names = self._platform.start_job(
            self.config.num_executors,
            entrypoint=os.path.join(os.path.dirname(__file__), "worker.py"),
            args=[
                f"--ray_address={self._ray_address}",
                f"--log_dir={self._runtime_ctx.log_root}",
                *(["--bind_numa_node"] if self.config.bind_numa_node else []),
            ],
            extra_opts=kwargs,
        )
    else:
        self._job_names = []

    # Spawn a daemon thread to periodically dump metrics; `_stop_event` is
    # created alongside it.
    self._stop_event = threading.Event()
    self._dump_thread = threading.Thread(name="dump_thread", target=self._dump_periodically, daemon=True)
    self._dump_thread.start()