in modules/python/src/datapreprocessing/ray_utils.py [0:0]
def run_remote(self):
"""
Runs the data processing tasks remotely using Ray. This method initializes the Ray cluster,
imports the necessary modules and classes, instantiates the data processing class, and
distributes the data processing tasks to Ray workers.
Returns:
pd.DataFrame: A concatenated Pandas DataFrame containing the results from all ray workers in this example. It returns the data returned by the function invoked as ray task.
"""
    # Initialize the driver: start a local Ray instance, or connect to an
    # existing Ray cluster
    if self.ray_cluster_host != "local":
        client_context = ray.init(
            f"ray://{self.ray_cluster_host}", runtime_env=self.ray_runtime
        )
        self.logger.debug(client_context)
        # Get the ID (a hex string) of the node running the driver process
        driver_process_node_id = ray.get_runtime_context().get_node_id()
        self.logger.debug(f"ray_driver_node_id={driver_process_node_id}")
        self.logger.debug(ray.cluster_resources())
    else:
        ray_context = ray.init()
        self.logger.debug(ray_context)
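    # For reference, a sketch of what self.ray_runtime (passed as runtime_env
    # above) might contain; "pip" and "env_vars" are standard Ray runtime_env
    # keys, but the values here are purely illustrative:
    #   {"pip": ["pandas", "google-cloud-storage"], "env_vars": {"ENV": "dev"}}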
    complete_module_name = f"{self.package_name}.{self.module_name}"
    module = importlib.import_module(complete_module_name)
    MyClass = getattr(module, self.class_name)
    preprocessor = MyClass()
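    # The dynamic import above is the runtime equivalent of a static import
    # such as (names purely illustrative):
    #   from datapreprocessing.preprocessor import Preprocessor
    #   preprocessor = Preprocessor()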
    # Dispatch the work to the Ray workers; any callable can be handed to this
    # utility to run as a Ray task
    self.logger.debug("Data Preparation started")
    start_time = time.time()
    results = ray.get(
        [
            self.invoke_process_data.remote(
                self, preprocessor, self.df[i], i, self.gcs_bucket, self.gcs_folder
            )
            for i in range(len(self.df))
        ]
    )
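    # invoke_process_data is presumably decorated with @ray.remote, which is
    # why `self` is passed explicitly as its first argument; ray.get blocks
    # until every submitted task has finished and returns the results in
    # submission order.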
    duration = time.time() - start_time
    self.logger.debug(f"Data Preparation finished in {duration:.2f} seconds")
    # Disconnect the driver and terminate processes started by ray.init()
    ray.shutdown()
    # Concatenate the per-task result DataFrames into a single DataFrame
    result_df = pd.concat(results, axis=0, ignore_index=True)
    return result_df
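
# --- Usage sketch (illustrative, not part of the original file) ---
# Assumes a wrapper class named RayUtils whose constructor accepts the
# attributes referenced in run_remote(); the actual class name, constructor
# signature, and the module/class values below are hypothetical and may
# differ in this repository.
#
#   import numpy as np
#   import pandas as pd
#
#   raw_df = pd.read_csv("input.csv")
#   runner = RayUtils(
#       ray_cluster_host="local",              # or a remote Ray head host
#       ray_runtime={"pip": ["pandas"]},
#       package_name="datapreprocessing",
#       module_name="preprocessor",            # hypothetical module
#       class_name="Preprocessor",             # hypothetical class
#       df=np.array_split(raw_df, 4),          # one Ray task per chunk
#       gcs_bucket="my-bucket",
#       gcs_folder="processed",
#   )
#   combined_df = runner.run_remote()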