def run()

in src/smspark/job.py [0:0]


    def run(self, spark_submit_cmd: str, spark_event_logs_s3_uri: str, local_spark_event_logs_dir: str) -> None:
        """Run a Spark job.

        First, wait for workers to come up and bootstraps the cluster.
        Then runs spark-submit, waits until the job succeeds or fails.
        Worker nodes are shut down gracefully.

        Args:
          spark_submit_cmd (str): Command submitted to run spark-submit
        """
        self.logger.info("waiting for hosts")
        self._wait_for_hostname_resolution()
        self.logger.info("starting status server")
        self._start_status_server()
        self.logger.info("bootstrapping cluster")
        self._bootstrap_yarn()
        self.logger.info("starting executor logs watcher")
        self._start_executor_logs_watcher()

        if self._is_primary_host:
            self.logger.info("start log event log publisher")
            spark_log_publisher = self._start_spark_event_log_publisher(
                spark_event_logs_s3_uri, local_spark_event_logs_dir
            )

            self.logger.info(f"Waiting for hosts to bootstrap: {self.hosts}")

            def all_hosts_have_bootstrapped() -> bool:
                try:
                    host_statuses: Mapping[str, StatusMessage] = self.status_client.get_status(self.hosts)

                except ConnectionError as e:
                    self.logger.info(
                        f"Got ConnectionError when polling hosts for status. Host may not have come up: {str(e)}.\nTraceback: {traceback.format_exc()}"
                    )
                    return False
                self.logger.info(f"Received host statuses: {host_statuses.items()}")
                has_bootstrapped = [message.status == Status.WAITING for message in host_statuses.values()]
                return all(has_bootstrapped)

            self.waiter.wait_for(
                predicate_fn=all_hosts_have_bootstrapped,
                timeout=ProcessingJobManager._bootstrapping_timeout,
                period=5.0,
            )

            try:
                subprocess.run(spark_submit_cmd, check=True, shell=True)
                self.logger.info("spark submit was successful. primary node exiting.")
            except subprocess.CalledProcessError as e:
                self.logger.error(
                    f"spark-submit command failed with exit code {e.returncode}: {str(e)}\n{traceback.format_exc()}"
                    + str(e)
                    + "\n"
                    + traceback.format_exc()
                )
                raise AlgorithmError("spark failed with a non-zero exit code", caused_by=e, exit_code=e.returncode)
            except Exception as e:
                self.logger.error("Exception during processing: " + str(e) + "\n" + traceback.format_exc())
                raise AlgorithmError(
                    message="error occurred during spark-submit execution. Please see logs for details.",
                    caused_by=e,
                )

            finally:
                spark_log_publisher.down()
                spark_log_publisher.join(timeout=20)

        else:
            # workers wait until the primary is up, then wait until it's down.
            def primary_is_up() -> bool:
                try:
                    self.status_client.get_status([self._cluster_primary_host])
                    return True
                except Exception:
                    return False

            def primary_is_down() -> bool:
                return not primary_is_up()

            self.logger.info("waiting for the primary to come up")
            self.waiter.wait_for(primary_is_up, timeout=ProcessingJobManager._wait_for_primary_timeout, period=1.0)
            self.logger.info("waiting for the primary to go down")
            self.waiter.wait_for(primary_is_down, timeout=float("inf"), period=5.0)
            self.logger.info("primary is down, worker now exiting")