in src/sagemaker_rl/ray_launcher.py [0:0]
def ray_init_config(self):
    """Set up this node's role in the Ray cluster and return a Ray config dict.

    Behaviour depends on the node's role:

    * Master node, single-machine job (no other host names): nothing is
      started; the local resource config
      ``{"num_cpus": ..., "num_gpus": ...}`` is returned.
    * Master node, multi-machine job: starts the Ray cluster, publishes the
      master's host config and signal through ``sage_cluster_communicator``,
      waits for every worker's signal, then returns a config holding only the
      cluster address (key ``"address"`` for Ray >= 0.8.2, ``"redis_address"``
      for older versions).
    * Worker node: waits for the master's signal, joins the cluster, publishes
      its own signal, then blocks until ``TERMINATION_SIGNAL`` is received and
      finally returns the local resource config.

    NOTE(review): the returned dict is presumably passed as keyword arguments
    to ``ray.init`` -- confirm against the caller.

    Returns:
        dict: the configuration described above.
    """

    def _release_tuple(version):
        """Return the leading numeric components of ``version`` as ints.

        ``"0.8.10.dev0"`` -> ``(0, 8, 10)``; ``"1.0.0rc1"`` -> ``(1, 0, 0)``.
        """
        release = []
        for piece in str(version).split("."):
            digits = ""
            for char in piece:
                if not char.isdigit():
                    break
                digits += char
            if not digits:
                break
            release.append(int(digits))
            if len(digits) != len(piece):
                # Component carries a suffix such as "0rc1"; nothing after it
                # belongs to the numeric release segment.
                break
        return tuple(release)

    # The CPU count advertised to Ray is never lower than 3.
    num_workers = max(self.num_cpus, 3)
    config = {"num_cpus": num_workers, "num_gpus": self.num_gpus}

    if self.is_master_node:
        # Every host after the first one is treated as a worker.
        all_workers_host_names = self.get_all_host_names()[1:]
        # Single machine job
        if not all_workers_host_names:
            return config

        master_ip = get_ip_from_host(host_name=self.host_name)
        self.start_ray_cluster(master_ip)

        node_signal = "%s:%s" % (self.cluster_type.value, self.host_name)
        self.sage_cluster_communicator.write_host_config(
            ip=master_ip, host_name=node_signal
        )
        self.sage_cluster_communicator.create_s3_signal(node_signal)

        print("Waiting for %s worker nodes to join!" % (len(all_workers_host_names)))
        self.sage_cluster_communicator.wait_for_signals(all_workers_host_names)
        print("All worker nodes have joined the cluster. Now training...")

        # Compare versions numerically: a plain string comparison would order
        # "0.10.0" (or "0.8.10") before "0.8.2" and pick the wrong key.
        if _release_tuple(ray.__version__) >= (0, 8, 2):
            config = {"address": "%s:6379" % master_ip}
        else:
            config = {"redis_address": "%s:6379" % master_ip}
    else:
        master_ip, master_hostname = self.sage_cluster_communicator.get_master_config()
        node_ip = get_ip_from_host(host_name=self.host_name)

        # Do not try to join before the master has published its signal.
        self.sage_cluster_communicator.wait_for_signals([master_hostname])
        print("Attempting to join ray cluster.")
        self.join_ray_cluster(master_ip, node_ip)
        self.sage_cluster_communicator.create_s3_signal(
            "%s:%s" % (self.cluster_type.value, self.host_name)
        )
        print("Joined ray cluster at %s successfully!" % master_ip)

        # Block (effectively without a timeout) until the job signals
        # termination.
        self.sage_cluster_communicator.wait_for_signals(
            [TERMINATION_SIGNAL], timeout=sys.maxsize
        )
        print("Received job termination signal. Shutting down.")

    return config