in sdk/aws_orbit_sdk/emr.py [0:0]
def spark_submit(job: Dict[str, Any]) -> Dict[str, Any]:
"""
Run a PySpark job on an EMR cluster, using the contents of your current workspace directory as the job's source.
Parameters
----------
job : dict
A PySpark job definition to execute, given as a dict with the following keys.
cluster_id : str
The EMR cluster ID (required).
app_name : str
The Spark application name (required).
module : str
A path, relative to the repository root, to the Python module containing the PySpark application (required).
wait_app_completion : bool
If True, the EMR step waits until the Spark application completes. Defaults to False.
app_args : list
A list of arguments to pass to the Spark application.
spark_args : list
A list of arguments controlling the Spark execution, passed through to spark-submit.
Returns
-------
response : dict
The response of the EMR AddJobFlowSteps operation, containing the identifiers of the steps added to the job flow.
Example
-------
>>> from aws_orbit_sdk import emr as sparkConnection
>>> response = sparkConnection.spark_submit(job = {
... "cluster_id": cluster_id,
... "app_name": "test1",
... "module": "samples/python/pyspark/createTbl.py",
... "wait_app_completion": False,
... "app_args": [arg1, arg2],
... "spark_args": ["--num-executors", "2", "--executor-cores", "4", "--executor-memory", "1g"]
... })
"""
cluster_id = job.get("cluster_id")
if cluster_id is None:
raise Exception("cluster_id must be provided")
app_name = job.get("app_name")
if app_name is None:
raise Exception("app_name must be provided")
module = job.get("module")
if module is None:
raise Exception("module must be provided")
waitAppCompletion = job.get("wait_app_completion", False)
appargs = job.get("app_args", [])
sparkargs = job.get("spark_args", [])
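# Team environment properties; AWS_ORBIT_S3_BUCKET and AWS_ORBIT_TEAM_SPACE
# are used below to build the shared S3 workspace prefix.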
props = get_properties()
waitApp = "true" if waitAppCompletion else "false"
workspaceDir = "workspace"
notebookInstanceName = socket.gethostname()
s3WorkspaceDir = "s3://{}/{}/workspaces/{}".format(
props["AWS_ORBIT_S3_BUCKET"],
props["AWS_ORBIT_TEAM_SPACE"],
notebookInstanceName,
)
cmd = 'aws s3 sync --delete --exclude "*.git/*" {} {}'.format(workspaceDir, s3WorkspaceDir)
logout = os.popen(cmd).read()
logger.info("s3 workspace directory is %s", s3WorkspaceDir)
logger.debug(logout)
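# Point the module at its synced S3 location so spark-submit can pull it from there.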
module = os.path.join(s3WorkspaceDir, module)
emr = boto3.client("emr")
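# Build the spark-submit invocation that command-runner.jar will execute on the
# cluster: YARN cluster deploy mode, the requested waitAppCompletion behaviour,
# and the AWS Glue Data Catalog as the Hive metastore.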
args = [
"/usr/bin/spark-submit",
"--verbose",
"--deploy-mode",
"cluster",
"--master",
"yarn",
"--conf",
"pyspark.yarn.submit.waitAppCompletion=" + waitApp,
"--conf",
"hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"--conf",
"hive.metastore.connect.retries=5",
]
args.extend(sparkargs)
args.append(module)
args.extend(appargs)
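# Submit the command as a single EMR step; ActionOnFailure=CONTINUE keeps the
# cluster (and any subsequent steps) running if this step fails.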
response = emr.add_job_flow_steps(
JobFlowId=cluster_id,
Steps=[
{
"Name": app_name,
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
},
],
)
return response
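# Illustrative usage sketch (not part of this module): the returned step id can be
# polled with the regular boto3 EMR API until the step finishes. The cluster id and
# module path below are placeholders.
#
#     response = spark_submit(job={
#         "cluster_id": "j-XXXXXXXXXXXXX",
#         "app_name": "sample-app",
#         "module": "samples/python/pyspark/createTbl.py",
#     })
#     step_id = response["StepIds"][0]
#     emr = boto3.client("emr")
#     state = emr.describe_step(ClusterId="j-XXXXXXXXXXXXX", StepId=step_id)["Step"]["Status"]["State"]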