def data_profile()

in sdk/aws_orbit_sdk/transformations.py [0:0]


def data_profile(parameters: Dict[str, Any]) -> Dict[str, Any]:
    """
    Runs a data profiling task and returns the profiling reports to a target folder in s3.

    Parameters
    ------------
    parameters : dict
        Dictionary of different optional and required parameters to run the data profiling task (listed below)

        cluster_name : str, optional
            A unique name for the EMR cluster (default: 'lake-user-TestCluster')
        reuse_cluster : bool, optional
            If reuse_cluster is True and cluster exists , will reuse the existing cluster (default: 'True')
        start_cluster : bool, optional
            If start_cluster is True and cannot re-use cluster, then will start a new cluster (default: 'True')
        show_container_log : bool, optional
            If True, will tail the log of the containers until they are stopped (default: False)
        core_instance_count : int, optional
            The number of the instances within the cluster (default: 4)
        samplingRatio : float, optional
            The sample ratio of rows used for inferring (default: 0.05)
        terminate_cluster : str, optional
            If terminate_cluster is False and cluster exists, will not terminate cluster (default: 'False')
        total_runtime : int
            The number of minutes allocated to the notebook to run
        sns_topic : str, optional
            A name for the topic that will get updates on the tasks execution
        database : str
            Database name to profile
        table_filter : str
            A string filter for the tables to profile, can be a single table
            (i.e 'de1_0_2008_beneficiary_summary_file_sample_1') or a regex expression (i.e 'de*' for all tables)
        target_folder : str
            The location of the final reports and notebooks that are generated
        container_concurrency : int, optional
            The number of containers that runs in parallel (default: 1)
        trigger_name : str, optional
            The name of the trigger function in case there is a scheduled task
        frequency : str, optional
            The frequency of the trigger function, cron-like i.e: "cron(0/3 * 1/1 * ? *)"

    Returns
    --------
    response : dict
        The response will be a dictionary that contains the table list and the target folder for the notebooks and
        reports

    Example
    --------
    >>> import aws.utils.notebooks.transformations.transformations as transformations
    >>> params = {
    ...            "total_runtime" : 1800,
    ...            "database" : "cms_raw_db",
    ...            "table_filter" : 'de1_0_2008_beneficiary_summary_file_sample_1',
    ...            "target_folder" : target_folder
    ...          }
    >>> response = transformations.data_profile(params)
    """

    # Initialize: Check if there are missing optional parameters
    if "reuse_cluster" not in parameters:
        parameters["reuse_cluster"] = "True"
    if "start_cluster" not in parameters:
        parameters["start_cluster"] = "True"
    if "cluster_name" not in parameters:
        parameters["cluster_name"] = "lake-user-TestCluster"
    if "terminate_cluster" not in parameters:
        parameters["terminate_cluster"] = "False"
    if "container_concurrency" not in parameters:
        parameters["container_concurrency"] = 1
    if "show_container_log" not in parameters:
        parameters["show_container_log"] = False
    if "core_instance_count" not in parameters:
        parameters["core_instance_count"] = 4
    if "samplingRatio" not in parameters:
        parameters["samplingRatio"] = 0.05

    # Start Spark
    logger.info("Starting Spark cluster")
    livy_url, cluster_id, started = sparkConnection.connect_to_spark(
        parameters["cluster_name"],
        reuseCluster=parameters["reuse_cluster"],
        startCluster=parameters["start_cluster"],
        clusterArgs={"CoreInstanceCount": parameters["core_instance_count"]},
    )
    logger.info(f"Cluster is ready:{livy_url} livy_url:{cluster_id} cluster_id: started:{started}")

    # Get profiling data and print pretty json
    response = glue.get_tables(DatabaseName=parameters["database"], Expression=parameters["table_filter"])
    logger.debug(f"Glue response: {json.dumps(response, indent=4, sort_keys=True, default=str)}")

    if len(response["TableList"]) == 0:
        assert False

    tasks = create_tasks(
        response,
        parameters["target_folder"],
        parameters["database"],
        parameters["samplingRatio"],
    )

    # Running the tasks
    logger.info("Starting to run spark tasks")
    notebooks_to_run: Dict[str, Any] = {
        "compute": {"container": {"p_concurrent": parameters["container_concurrency"]}},
        "tasks": tasks,
        "env_vars": [
            {"name": "cluster_name", "value": parameters["cluster_name"]},
            {"name": "start_cluster", "value": parameters["start_cluster"]},
            {"name": "reuse_cluster", "value": parameters["reuse_cluster"]},
            {"name": "terminate_cluster", "value": parameters["terminate_cluster"]},
        ],
    }

    # Append sns topic if given as parameter
    if "sns_topic" in parameters:
        notebooks_to_run["compute"]["sns.topic.name"] = parameters["sns_topic"]

    # Running the tasks on containers
    t = time.localtime()
    current_time = time.strftime("%H:%M:%S", t)

    if "trigger_name" in parameters:
        if "frequency" not in parameters:
            raise Exception("Missing frequency parameter while a trigger_name was given")
        container = controller.schedule_notebooks(parameters["trigger_name"], parameters["frequency"], notebooks_to_run)
    else:
        container = controller.run_notebooks(notebooks_to_run)

    if isinstance(container, str):
        containers = [container]
    else:
        containers = []
        containers = containers + container

    logger.info(
        f"Task : {current_time}, {str(container)}, --> {notebooks_to_run['tasks'][0]['params']['table_to_profile']}"
    )

    logger.debug(f"Starting time: {datetime.datetime.now()}")
    controller.wait_for_tasks_to_complete(
        containers,
        120,
        int(parameters["total_runtime"] / 120),
        parameters["show_container_log"],
    )
    logger.debug(f"Ending time: {datetime.datetime.now()}")

    # Shutting down Spark cluster
    if started and parameters["terminate_cluster"] == "True":
        logger.info("Shutting down Spark cluster")
        sparkConnection.stop_cluster(cluster_id)

    logger.debug(f"data_profile results are in: {base_path}")

    # Returning the result path and tables that ran on
    tables = []
    for table in response["TableList"]:
        tables.append(table["Name"])
    res = {
        "tables": tables,
        "result_path": f"{base_path}/{parameters['target_folder']}",
    }

    return res