def spark_submit()

in sdk/aws_orbit_sdk/emr.py

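The function below relies on module-level context that is not shown in this excerpt: standard-library and boto3 imports, a module-level logger, and the team-level get_properties() helper. The following is a minimal sketch of that context; the import location of get_properties (shown here as aws_orbit_sdk.common) is an assumption.

# Minimal sketch of the module-level context this excerpt assumes.
import logging
import os
import socket
from typing import Any, Dict

import boto3

from aws_orbit_sdk.common import get_properties  # assumed location of this helper

logger = logging.getLogger(__name__)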

def spark_submit(job: Dict[str, Any]) -> Dict[str, Any]:
    """
    Run a PySpark job on an EMR cluster using your current source directory, which is synced to S3 before submission.

    Parameters
    ----------
    job : dict
        A PySpark job definition to execute, with the following keys:

        cluster_id : str
            The EMR cluster ID (required).
        app_name : str
            The Spark application name (required).
        module : str
            A relative path from the repository root to the Python module containing the PySpark application (required).
        wait_app_completion : bool
            If True, the EMR step waits until the Spark application completes (default: False).
        app_args : list
            A list of arguments to pass to the Spark application (default: []).
        spark_args : list
            A list of arguments controlling the Spark execution, passed through to spark-submit (default: []).

    Returns
    -------
    response : dict
        The AddJobFlowSteps response, containing the identifiers of the steps added to the job flow.

    Example
    -------
    >>> import aws_orbit_sdk.emr as sparkConnection
    >>> response = sparkConnection.spark_submit(job={
    ...     "cluster_id": cluster_id,
    ...     "app_name": "test1",
    ...     "module": "samples/python/pyspark/createTbl.py",
    ...     "wait_app_completion": False,
    ...     "app_args": [arg1, arg2],
    ...     "spark_args": ["--num-executors", "2", "--executor-cores", "4", "--executor-memory", "1g"],
    ... })
    """
    cluster_id = job.get("cluster_id")
    if cluster_id is None:
        raise Exception("cluster_id must be provided")
    app_name = job.get("app_name")
    if app_name is None:
        raise Exception("app_name must be provided")
    module = job.get("module")
    if module is None:
        raise Exception("module must be provided")

    waitAppCompletion = job.get("wait_app_completion", False)
    appargs = job.get("app_args", [])
    sparkargs = job.get("spark_args", [])
    props = get_properties()
    waitApp = "true" if waitAppCompletion else "false"

    workspaceDir = "workspace"

    notebookInstanceName = socket.gethostname()

    s3WorkspaceDir = "s3://{}/{}/workspaces/{}".format(
        props["AWS_ORBIT_S3_BUCKET"],
        props["AWS_ORBIT_TEAM_SPACE"],
        notebookInstanceName,
    )
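    # Mirror the local workspace to S3 (excluding .git) so the EMR cluster can read the module.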
    cmd = 'aws s3 sync --delete --exclude "*.git/*" {} {}'.format(workspaceDir, s3WorkspaceDir)
    logout = os.popen(cmd).read()
    logger.info("s3 workspace directory is %s", s3WorkspaceDir)
    logger.debug(logout)

    module = os.path.join(s3WorkspaceDir, module)

    emr = boto3.client("emr")
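    # Build the spark-submit command line that command-runner.jar will execute on the cluster.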
    args = [
        "/usr/bin/spark-submit",
        "--verbose",
        "--deploy-mode",
        "cluster",
        "--master",
        "yarn",
        "--conf",
        "pyspark.yarn.submit.waitAppCompletion=" + waitApp,
        "--conf",
        "hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
        "--conf",
        "hive.metastore.connect.retries=5",
    ]
    args.extend(sparkargs)
    args.append(module)
    args.extend(appargs)

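    # Submit the command as a single EMR step; ActionOnFailure=CONTINUE keeps the cluster running if the step fails.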
    response = emr.add_job_flow_steps(
        JobFlowId=cluster_id,
        Steps=[
            {
                "Name": app_name,
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
            },
        ],
    )

    return response
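A hedged usage sketch, not part of the SDK itself: submit a job and then block on the resulting EMR step with boto3's step_complete waiter. The cluster ID, application name, module path, and application arguments below are placeholders.

# Illustrative only: cluster ID, module path, and arguments are placeholders.
import boto3

import aws_orbit_sdk.emr as sparkConnection

cluster_id = "j-XXXXXXXXXXXXX"  # placeholder EMR cluster ID

response = sparkConnection.spark_submit(
    job={
        "cluster_id": cluster_id,
        "app_name": "create-table",
        "module": "samples/python/pyspark/createTbl.py",
        "wait_app_completion": True,
        "app_args": ["--database", "default"],  # hypothetical application arguments
        "spark_args": ["--num-executors", "2", "--executor-memory", "1g"],
    }
)

# AddJobFlowSteps returns the new step IDs; wait for the step to finish (or fail).
step_id = response["StepIds"][0]
waiter = boto3.client("emr").get_waiter("step_complete")
waiter.wait(ClusterId=cluster_id, StepId=step_id)

The step_complete waiter raises a WaiterError if the step fails or the wait times out, which avoids hand-rolled polling of describe_step.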