def run_remote()

in modules/python/src/datapreprocessing/ray_utils.py [0:0]


    def run_remote(self):
        """
        Run the configured processing class as parallel Ray tasks and merge the results.

        Steps, in order:

        1. Connect to Ray: a Ray Client connection to ``ray://<self.ray_cluster_host>``
           using ``self.ray_runtime`` as the runtime environment, or a plain
           ``ray.init()`` when ``self.ray_cluster_host`` is ``"local"``.
        2. Import ``<self.package_name>.<self.module_name>`` and instantiate
           ``self.class_name`` from it with no arguments.
        3. Submit one ``invoke_process_data`` task for every index ``i`` in
           ``range(len(self.df))``, passing ``self.df[i]``, ``i``,
           ``self.gcs_bucket`` and ``self.gcs_folder``, and block until all
           tasks have finished.
        4. Shut Ray down and concatenate the task results.

        Ray is shut down whether or not steps 2-3 succeed, so a failed import
        or a failed task does not leave the driver connected to the cluster.

        Returns:
            pd.DataFrame: The objects returned by the Ray tasks, concatenated
            along axis 0 with a fresh index (``ignore_index=True``).

        Raises:
            Any exception raised while importing/instantiating the processing
            class or while running the tasks is propagated unchanged, after
            Ray has been shut down.
        """
        # Initiate a driver: start and connect with Ray cluster
        is_remote_cluster = self.ray_cluster_host != "local"
        if is_remote_cluster:
            ray_context = ray.init(
                f"ray://{self.ray_cluster_host}", runtime_env=self.ray_runtime
            )
        else:
            ray_context = ray.init()

        # Everything after a successful ray.init() runs under try/finally so the
        # connection (and any local Ray processes) is released on errors too.
        try:
            self.logger.debug(ray_context)

            if is_remote_cluster:
                # Get the ID of the node where the driver process is running
                driver_process_node_id = ray.get_runtime_context().get_node_id()  # HEX
                self.logger.debug(f"ray_driver_node_id={driver_process_node_id}")

                self.logger.debug(ray.cluster_resources())

            # Resolve the processing class dynamically from its dotted module path.
            complete_module_name = self.package_name + "." + self.module_name
            module = importlib.import_module(complete_module_name)
            preprocessor_cls = getattr(module, self.class_name)
            preprocessor = preprocessor_cls()

            # TODO: make this log message generic, since any function can be
            # passed to this utility for running as a Ray task.
            self.logger.debug("Data Preparation started")
            # perf_counter() is monotonic, so the measured duration is not
            # affected by system clock adjustments.
            start_time = time.perf_counter()

            # `self` is passed explicitly because the remote function is invoked
            # through `.remote(...)`, not as a bound method.
            # NOTE(review): self.df is indexed positionally here; it is presumably
            # a sequence of DataFrame chunks -- confirm against the constructor.
            pending_tasks = [
                self.invoke_process_data.remote(
                    self, preprocessor, self.df[i], i, self.gcs_bucket, self.gcs_folder
                )
                for i in range(len(self.df))
            ]
            results = ray.get(pending_tasks)

            duration = time.perf_counter() - start_time
            self.logger.debug(f"Data Preparation finished in {duration} seconds")
        finally:
            # Disconnect the worker, and terminate processes started by ray.init()
            ray.shutdown()

        # concat all the resulting data frames
        return pd.concat(results, axis=0, ignore_index=True)