def _stage_submit_deps()

in src/sagemaker/spark/processing.py


    def _stage_submit_deps(self, submit_deps, input_channel_name):
        """Prepares a list of paths to jars, py-files, or files dependencies.

        This prepared list of paths is provided as the value of a `spark-submit` option.
        The submit_deps list may include a combination of S3 URIs and local paths.
        Any S3 URIs are appended to the `spark-submit` option value without modification.
        Any local file paths are copied to a temp directory, uploaded to the ``dependency_location``,
        and included as a ProcessingInput channel to provide as local files to the SageMaker
        Spark container.

        :param submit_deps (list[str]): List of one or more dependency paths to include.
        :param input_channel_name (str): The `spark-submit` option name associated with
                    the input channel.
        :return (Optional[ProcessingInput], str): Tuple of an optional ProcessingInput
                    for the input channel and the comma-delimited value for the
                    `spark-submit` option.
        """
        if not submit_deps:
            raise ValueError(
                f"submit_deps value may not be empty. {self._submit_deps_error_message}"
            )
        if not input_channel_name:
            raise ValueError("input_channel_name value may not be empty.")

        use_input_channel = False
        spark_opt_s3_uris = []
        spark_opt_s3_uris_has_pipeline_var = False

        with tempfile.TemporaryDirectory() as tmpdir:
            for dep_path in submit_deps:
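                # Pipeline variables resolve only at pipeline execution time, so
                # they cannot be parsed or validated here; treat them as S3 URIs
                # and defer string concatenation to the workflow Join below.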
                if is_pipeline_variable(dep_path):
                    spark_opt_s3_uris.append(dep_path)
                    spark_opt_s3_uris_has_pipeline_var = True
                    continue
                dep_url = urlparse(dep_path)
                # S3 URIs are included as-is in the spark-submit argument
                if dep_url.scheme in ["s3", "s3a"]:
                    spark_opt_s3_uris.append(dep_path)
                # Local files are copied to temp directory to be uploaded to S3
                elif not dep_url.scheme or dep_url.scheme == "file":
                    if not os.path.isfile(dep_path):
                        raise ValueError(
                            f"submit_deps path {dep_path} is not a valid local file. "
                            f"{self._submit_deps_error_message}"
                        )
                    logger.info(
                        "Copying dependency from local path %s to tmpdir %s", dep_path, tmpdir
                    )
                    shutil.copy(dep_path, tmpdir)
                else:
                    raise ValueError(
                        f"submit_deps path {dep_path} references unsupported filesystem "
                        f"scheme: {dep_url.scheme} {self._submit_deps_error_message}"
                    )

            # If any local files were found and copied, upload the temp directory to S3
            if os.listdir(tmpdir):
                from sagemaker.workflow.utilities import _pipeline_config

                if self.dependency_location:
                    if self.dependency_location.endswith("/"):
                        s3_prefix_uri = self.dependency_location[:-1]
                    else:
                        s3_prefix_uri = self.dependency_location
                else:
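                    # No dependency_location configured: fall back to the
                    # session's default bucket and optional bucket prefix.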
                    s3_prefix_uri = s3.s3_path_join(
                        "s3://",
                        self.sagemaker_session.default_bucket(),
                        self.sagemaker_session.default_bucket_prefix,
                    )

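                # Within a pipeline build, key the upload by pipeline name and
                # code hash so that unchanged dependencies resolve to a stable
                # S3 URI across builds.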
                if _pipeline_config and _pipeline_config.code_hash:
                    input_channel_s3_uri = (
                        f"{s3_prefix_uri}/{_pipeline_config.pipeline_name}/"
                        f"code/{_pipeline_config.code_hash}/{input_channel_name}"
                    )
                else:
                    input_channel_s3_uri = (
                        f"{s3_prefix_uri}/{self._current_job_name}/input/{input_channel_name}"
                    )
                logger.info(
                    "Uploading dependencies from tmpdir %s to S3 %s", tmpdir, input_channel_s3_uri
                )
                S3Uploader.upload(
                    local_path=tmpdir,
                    desired_s3_uri=input_channel_s3_uri,
                    sagemaker_session=self.sagemaker_session,
                )
                use_input_channel = True

        # If any local files were uploaded, construct a ProcessingInput to provide
        # them to the Spark container, and form the spark-submit option from a
        # combination of S3 URIs and the container's local input path
        if use_input_channel:
            input_channel = ProcessingInput(
                source=input_channel_s3_uri,
                destination=f"{self._conf_container_base_path}{input_channel_name}",
                input_name=input_channel_name,
            )
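            # str.join cannot render unevaluated pipeline variables, so use the
            # workflow Join primitive whenever a placeholder URI is present.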
            spark_opt = (
                Join(on=",", values=spark_opt_s3_uris + [input_channel.destination])
                if spark_opt_s3_uris_has_pipeline_var
                else ",".join(spark_opt_s3_uris + [input_channel.destination])
            )
        # If no local files were uploaded, form the spark-submit option from a list of S3 URIs
        else:
            input_channel = None
            spark_opt = (
                Join(on=",", values=spark_opt_s3_uris)
                if spark_opt_s3_uris_has_pipeline_var
                else ",".join(spark_opt_s3_uris)
            )

        return input_channel, spark_opt
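
A minimal usage sketch, for illustration only: `_stage_submit_deps` is private and
is normally invoked on callers' behalf by `run()`/`get_run_args()` through the
`submit_py_files`, `submit_jars`, and `submit_files` arguments of
`PySparkProcessor`. The bucket, role ARN, and file names below are hypothetical.

    from sagemaker.spark.processing import PySparkProcessor

    # Hypothetical values: substitute your own role ARN, region, and files.
    processor = PySparkProcessor(
        base_job_name="spark-deps-demo",
        framework_version="3.1",
        role="arn:aws:iam::111122223333:role/SageMakerRole",
        instance_count=1,
        instance_type="ml.m5.xlarge",
    )

    # One S3 URI and one local file: the S3 URI passes through unchanged, while
    # helpers.py is uploaded and mounted into the container through the returned
    # ProcessingInput.
    input_channel, py_files_opt = processor._stage_submit_deps(
        ["s3://my-bucket/deps/lib.zip", "./helpers.py"], "py-files"
    )

    # py_files_opt is the comma-delimited spark-submit value, e.g. (assuming a
    # container base path of /opt/ml/processing/input/):
    #   s3://my-bucket/deps/lib.zip,/opt/ml/processing/input/py-files
    inputs = [input_channel] if input_channel else []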