in src/sagemaker/spark/processing.py [0:0]
def _stage_submit_deps(self, submit_deps, input_channel_name):
    """Prepares a list of paths to jar, py-file, or file dependencies.

    The prepared list of paths is provided as `spark-submit` options.
    The submit_deps list may include a combination of S3 URIs and local paths.
    Any S3 URIs are appended to the `spark-submit` option value without modification.
    Any local file paths are copied to a temp directory, uploaded to the configured
    ``dependency_location`` (or a default S3 URI), and included as a ProcessingInput
    channel so they are available as local files to the SageMaker Spark container.

    :param submit_deps (list[str]): List of one or more dependency paths to include.
    :param input_channel_name (str): The `spark-submit` option name associated with
        the input channel.
    :return (Optional[ProcessingInput], str): Tuple of (left) optional ProcessingInput
        for the input channel, and (right) comma-delimited value for the
        `spark-submit` option.
    """
    if not submit_deps:
        raise ValueError(
            f"submit_deps value may not be empty. {self._submit_deps_error_message}"
        )
    if not input_channel_name:
        raise ValueError("input_channel_name value may not be empty.")

    use_input_channel = False
    spark_opt_s3_uris = []
    spark_opt_s3_uris_has_pipeline_var = False
    with tempfile.TemporaryDirectory() as tmpdir:
        for dep_path in submit_deps:
            # Pipeline variables are resolved at execution time, so they are passed
            # through to the spark-submit option as-is
            if is_pipeline_variable(dep_path):
                spark_opt_s3_uris.append(dep_path)
                spark_opt_s3_uris_has_pipeline_var = True
                continue
            dep_url = urlparse(dep_path)
            # S3 URIs are included as-is in the spark-submit argument
            if dep_url.scheme in ["s3", "s3a"]:
                spark_opt_s3_uris.append(dep_path)
            # Local files are copied to the temp directory to be uploaded to S3
            elif not dep_url.scheme or dep_url.scheme == "file":
                if not os.path.isfile(dep_path):
                    raise ValueError(
                        f"submit_deps path {dep_path} is not a valid local file. "
                        f"{self._submit_deps_error_message}"
                    )
                logger.info(
                    "Copying dependency from local path %s to tmpdir %s", dep_path, tmpdir
                )
                shutil.copy(dep_path, tmpdir)
            else:
                raise ValueError(
                    f"submit_deps path {dep_path} references unsupported filesystem "
                    f"scheme: {dep_url.scheme} {self._submit_deps_error_message}"
                )
        # If any local files were found and copied, upload the temp directory to S3
        if os.listdir(tmpdir):
            # _pipeline_config is set when this runs as part of a pipeline definition
            # and carries the pipeline name and code hash
            from sagemaker.workflow.utilities import _pipeline_config

            if self.dependency_location:
                # Remove any trailing slash so the prefix can be joined cleanly below
                if self.dependency_location.endswith("/"):
                    s3_prefix_uri = self.dependency_location[:-1]
                else:
                    s3_prefix_uri = self.dependency_location
            else:
                s3_prefix_uri = s3.s3_path_join(
                    "s3://",
                    self.sagemaker_session.default_bucket(),
                    self.sagemaker_session.default_bucket_prefix,
                )

            if _pipeline_config and _pipeline_config.code_hash:
                input_channel_s3_uri = (
                    f"{s3_prefix_uri}/{_pipeline_config.pipeline_name}/"
                    f"code/{_pipeline_config.code_hash}/{input_channel_name}"
                )
            else:
                input_channel_s3_uri = (
                    f"{s3_prefix_uri}/{self._current_job_name}/input/{input_channel_name}"
                )
            logger.info(
                "Uploading dependencies from tmpdir %s to S3 %s", tmpdir, input_channel_s3_uri
            )
            S3Uploader.upload(
                local_path=tmpdir,
                desired_s3_uri=input_channel_s3_uri,
                sagemaker_session=self.sagemaker_session,
            )
            use_input_channel = True
        # If any local files were uploaded, construct a ProcessingInput to provide
        # them to the Spark container and form the spark-submit option from a
        # combination of S3 URIs and container's local input path
        if use_input_channel:
            input_channel = ProcessingInput(
                source=input_channel_s3_uri,
                destination=f"{self._conf_container_base_path}{input_channel_name}",
                input_name=input_channel_name,
            )
            spark_opt = (
                Join(on=",", values=spark_opt_s3_uris + [input_channel.destination])
                if spark_opt_s3_uris_has_pipeline_var
                else ",".join(spark_opt_s3_uris + [input_channel.destination])
            )
        # If no local files were uploaded, form the spark-submit option from a list of S3 URIs
        else:
            input_channel = None
            spark_opt = (
                Join(on=",", values=spark_opt_s3_uris)
                if spark_opt_s3_uris_has_pipeline_var
                else ",".join(spark_opt_s3_uris)
            )

        return input_channel, spark_opt
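
For context, a minimal, hypothetical sketch of how a caller could use this helper to stage jar dependencies; the wrapper name, the "jars" channel name, and the `inputs`/`command` wiring are illustrative assumptions, not code from this module.

def _stage_jars_example(self, submit_jars, inputs, command):
    # Sketch only: stage the jars, keep the extra ProcessingInput (if any local
    # files were uploaded), and pass the comma-delimited value to spark-submit.
    if submit_jars:
        jars_input, jars_opt = self._stage_submit_deps(submit_jars, "jars")
        if jars_input:
            inputs.append(jars_input)
        command.extend(["--jars", jars_opt])
    return inputs, command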