def _build_spark_submit_command()

in providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py [0:0]


    def _build_spark_submit_command(self, application: str) -> list[str]:
        """
        Construct the spark-submit command to execute.

        :param application: command to append to the spark-submit command
        :return: full command to be executed
        """
        connection_cmd = self._get_spark_binary_path()

        # The url of the spark master
        connection_cmd += ["--master", self._connection["master"]]

        for key in self._conf:
            connection_cmd += ["--conf", f"{key}={self._conf[key]}"]
        if self._env_vars and (self._is_kubernetes or self._is_yarn):
            if self._is_yarn:
                tmpl = "spark.yarn.appMasterEnv.{}={}"
                # Allow dynamic setting of hadoop/yarn configuration environments
                self._env = self._env_vars
            else:
                tmpl = "spark.kubernetes.driverEnv.{}={}"
            for key in self._env_vars:
                connection_cmd += ["--conf", tmpl.format(key, str(self._env_vars[key]))]
        elif self._env_vars and self._connection["deploy_mode"] != "cluster":
            self._env = self._env_vars  # Do it on Popen of the process
        elif self._env_vars and self._connection["deploy_mode"] == "cluster":
            raise AirflowException("SparkSubmitHook env_vars is not supported in standalone-cluster mode.")
        if self._is_kubernetes and self._connection["namespace"]:
            connection_cmd += [
                "--conf",
                f"spark.kubernetes.namespace={self._connection['namespace']}",
            ]
        if self._properties_file:
            connection_cmd += ["--properties-file", self._properties_file]
        if self._files:
            connection_cmd += ["--files", self._files]
        if self._py_files:
            connection_cmd += ["--py-files", self._py_files]
        if self._archives:
            connection_cmd += ["--archives", self._archives]
        if self._driver_class_path:
            connection_cmd += ["--driver-class-path", self._driver_class_path]
        if self._jars:
            connection_cmd += ["--jars", self._jars]
        if self._packages:
            connection_cmd += ["--packages", self._packages]
        if self._exclude_packages:
            connection_cmd += ["--exclude-packages", self._exclude_packages]
        if self._repositories:
            connection_cmd += ["--repositories", self._repositories]
        if self._num_executors:
            connection_cmd += ["--num-executors", str(self._num_executors)]
        if self._total_executor_cores:
            connection_cmd += ["--total-executor-cores", str(self._total_executor_cores)]
        if self._executor_cores:
            connection_cmd += ["--executor-cores", str(self._executor_cores)]
        if self._executor_memory:
            connection_cmd += ["--executor-memory", self._executor_memory]
        if self._driver_memory:
            connection_cmd += ["--driver-memory", self._driver_memory]
        if self._connection["keytab"]:
            connection_cmd += ["--keytab", self._connection["keytab"]]
        if self._connection["principal"]:
            connection_cmd += ["--principal", self._connection["principal"]]
        if self._use_krb5ccache:
            if not os.getenv("KRB5CCNAME"):
                raise AirflowException(
                    "KRB5CCNAME environment variable required to use ticket ccache is missing."
                )
            connection_cmd += ["--conf", "spark.kerberos.renewal.credentials=ccache"]
        if self._proxy_user:
            connection_cmd += ["--proxy-user", self._proxy_user]
        if self._name:
            connection_cmd += ["--name", self._name]
        if self._java_class:
            connection_cmd += ["--class", self._java_class]
        if self._verbose:
            connection_cmd += ["--verbose"]
        if self._connection["queue"]:
            connection_cmd += ["--queue", self._connection["queue"]]
        if self._connection["deploy_mode"]:
            connection_cmd += ["--deploy-mode", self._connection["deploy_mode"]]

        # The actual script to execute
        connection_cmd += [application]

        # Append any application arguments
        if self._application_args:
            connection_cmd += self._application_args

        self.log.info("Spark-Submit cmd: %s", self._mask_cmd(connection_cmd))

        return connection_cmd