in src/smspark/job.py [0:0]
def run(self, spark_submit_cmd: str, spark_event_logs_s3_uri: str, local_spark_event_logs_dir: str) -> None:
    """Run a Spark job.

    First, waits for workers to come up and bootstraps the cluster.
    Then runs spark-submit and waits until the job succeeds or fails.
    Worker nodes are shut down gracefully.

    Args:
        spark_submit_cmd (str): Full spark-submit command to run.
        spark_event_logs_s3_uri (str): S3 URI to which Spark event logs are published.
        local_spark_event_logs_dir (str): Local directory where Spark writes event logs.
    """
self.logger.info("waiting for hosts")
self._wait_for_hostname_resolution()
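
    # Each host runs a status server that the other hosts poll (via
    # status_client.get_status) to learn whether it has finished bootstrapping.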
self.logger.info("starting status server")
self._start_status_server()
self.logger.info("bootstrapping cluster")
self._bootstrap_yarn()
self.logger.info("starting executor logs watcher")
self._start_executor_logs_watcher()
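
    # Only the primary host drives the job: it publishes Spark event logs,
    # waits for every host to finish bootstrapping, and runs spark-submit.
    # Worker hosts wait for the primary to come up and then to go down.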
    if self._is_primary_host:
        self.logger.info("starting spark event log publisher")
        spark_log_publisher = self._start_spark_event_log_publisher(
            spark_event_logs_s3_uri, local_spark_event_logs_dir
        )
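
        # Poll every host's status server until all hosts report WAITING,
        # meaning they have bootstrapped and are ready for the job to start.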
self.logger.info(f"Waiting for hosts to bootstrap: {self.hosts}")
def all_hosts_have_bootstrapped() -> bool:
try:
host_statuses: Mapping[str, StatusMessage] = self.status_client.get_status(self.hosts)
except ConnectionError as e:
self.logger.info(
f"Got ConnectionError when polling hosts for status. Host may not have come up: {str(e)}.\nTraceback: {traceback.format_exc()}"
)
return False
self.logger.info(f"Received host statuses: {host_statuses.items()}")
has_bootstrapped = [message.status == Status.WAITING for message in host_statuses.values()]
return all(has_bootstrapped)

        self.waiter.wait_for(
            predicate_fn=all_hosts_have_bootstrapped,
            timeout=ProcessingJobManager._bootstrapping_timeout,
            period=5.0,
        )
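
        # spark-submit runs in the foreground and blocks until the Spark
        # application finishes; with check=True, a non-zero exit code raises
        # CalledProcessError, which is surfaced as an AlgorithmError.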
        try:
            subprocess.run(spark_submit_cmd, check=True, shell=True)
            self.logger.info("spark submit was successful. primary node exiting.")
        except subprocess.CalledProcessError as e:
            self.logger.error(
                f"spark-submit command failed with exit code {e.returncode}: {str(e)}\n{traceback.format_exc()}"
            )
            raise AlgorithmError("spark failed with a non-zero exit code", caused_by=e, exit_code=e.returncode)
        except Exception as e:
            self.logger.error("Exception during processing: " + str(e) + "\n" + traceback.format_exc())
            raise AlgorithmError(
                message="error occurred during spark-submit execution. Please see logs for details.",
                caused_by=e,
            )
        finally:
            # Stop the event log publisher whether spark-submit succeeded or failed.
            spark_log_publisher.down()
            spark_log_publisher.join(timeout=20)
    else:
        # Workers wait until the primary is up, then wait until it's down.
        def primary_is_up() -> bool:
            try:
                self.status_client.get_status([self._cluster_primary_host])
                return True
            except Exception:
                return False
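
        # The primary is considered "down" once its status server can no longer
        # be reached, which is the workers' signal to exit.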
        def primary_is_down() -> bool:
            return not primary_is_up()

        self.logger.info("waiting for the primary to come up")
        self.waiter.wait_for(primary_is_up, timeout=ProcessingJobManager._wait_for_primary_timeout, period=1.0)
        self.logger.info("waiting for the primary to go down")
        self.waiter.wait_for(primary_is_down, timeout=float("inf"), period=5.0)
        self.logger.info("primary is down, worker now exiting")