# providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py
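# Module-level imports this method relies on (shown here for context):
import os

from airflow.exceptions import AirflowException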
def _build_spark_submit_command(self, application: str) -> list[str]:
"""
Construct the spark-submit command to execute.
:param application: command to append to the spark-submit command
:return: full command to be executed
"""
connection_cmd = self._get_spark_binary_path()
# The URL of the Spark master
connection_cmd += ["--master", self._connection["master"]]
for key, value in self._conf.items():
connection_cmd += ["--conf", f"{key}={value}"]
if self._env_vars and (self._is_kubernetes or self._is_yarn):
if self._is_yarn:
tmpl = "spark.yarn.appMasterEnv.{}={}"
# Also set the variables on the local submit process, allowing dynamic Hadoop/YARN client configuration
self._env = self._env_vars
else:
tmpl = "spark.kubernetes.driverEnv.{}={}"
for key, value in self._env_vars.items():
connection_cmd += ["--conf", tmpl.format(key, str(value))]
elif self._env_vars and self._connection["deploy_mode"] != "cluster":
self._env = self._env_vars  # Set on the spark-submit process at Popen time
elif self._env_vars and self._connection["deploy_mode"] == "cluster":
raise AirflowException("SparkSubmitHook env_vars is not supported in standalone-cluster mode.")
if self._is_kubernetes and self._connection["namespace"]:
connection_cmd += [
"--conf",
f"spark.kubernetes.namespace={self._connection['namespace']}",
]
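# Files, archives, and dependency coordinates to distribute with the application.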
if self._properties_file:
connection_cmd += ["--properties-file", self._properties_file]
if self._files:
connection_cmd += ["--files", self._files]
if self._py_files:
connection_cmd += ["--py-files", self._py_files]
if self._archives:
connection_cmd += ["--archives", self._archives]
if self._driver_class_path:
connection_cmd += ["--driver-class-path", self._driver_class_path]
if self._jars:
connection_cmd += ["--jars", self._jars]
if self._packages:
connection_cmd += ["--packages", self._packages]
if self._exclude_packages:
connection_cmd += ["--exclude-packages", self._exclude_packages]
if self._repositories:
connection_cmd += ["--repositories", self._repositories]
if self._num_executors:
connection_cmd += ["--num-executors", str(self._num_executors)]
if self._total_executor_cores:
connection_cmd += ["--total-executor-cores", str(self._total_executor_cores)]
if self._executor_cores:
connection_cmd += ["--executor-cores", str(self._executor_cores)]
if self._executor_memory:
connection_cmd += ["--executor-memory", self._executor_memory]
if self._driver_memory:
connection_cmd += ["--driver-memory", self._driver_memory]
if self._connection["keytab"]:
connection_cmd += ["--keytab", self._connection["keytab"]]
if self._connection["principal"]:
connection_cmd += ["--principal", self._connection["principal"]]
if self._use_krb5ccache:
if not os.getenv("KRB5CCNAME"):
raise AirflowException(
"KRB5CCNAME environment variable required to use ticket ccache is missing."
)
connection_cmd += ["--conf", "spark.kerberos.renewal.credentials=ccache"]
if self._proxy_user:
connection_cmd += ["--proxy-user", self._proxy_user]
if self._name:
connection_cmd += ["--name", self._name]
if self._java_class:
connection_cmd += ["--class", self._java_class]
if self._verbose:
connection_cmd += ["--verbose"]
if self._connection["queue"]:
connection_cmd += ["--queue", self._connection["queue"]]
if self._connection["deploy_mode"]:
connection_cmd += ["--deploy-mode", self._connection["deploy_mode"]]
# The actual script to execute
connection_cmd += [application]
# Append any application arguments
if self._application_args:
connection_cmd += self._application_args
self.log.info("Spark-Submit cmd: %s", self._mask_cmd(connection_cmd))
return connection_cmd
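# A minimal sketch of the kind of command this method assembles, assuming a
# hypothetical YARN connection; all values below are illustrative only:
#
#   spark-submit --master yarn \
#       --conf spark.eventLog.enabled=true \
#       --num-executors 2 --executor-memory 2g --driver-memory 1g \
#       --name example-job --queue default --deploy-mode cluster \
#       app.py --input /data/in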