def get_cluster_info()

in sagemaker_run_notebook/emr.py [0:0]


def get_cluster_info(cluster_name, session=None):
    """Get the information about a running cluster so that the processing job can be connected to it.

    Active clusters (states STARTING, BOOTSTRAPPING, RUNNING or WAITING) are
    listed page by page; the first one whose name equals ``cluster_name`` is
    then described to extract its connection details.

    Args:
        cluster_name (str): The name of a running EMR cluster to connect to (required).
        session (boto3.Session): The boto3 session to use. Will create a default session if not supplied (default: None).

    Returns:
        tuple: A tuple with cluster DNS address, the security group, and the subnet.

    Raises:
        RuntimeError: If no active cluster named ``cluster_name`` is found.
    """
    session = ensure_session(session)
    emr = session.client("emr")

    # Keyword arguments for list_clusters; "Marker" is added once the service
    # hands back a continuation token for the next page.
    request = {"ClusterStates": ["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"]}
    cluster_id = None

    while cluster_id is None:
        page = emr.list_clusters(**request)
        # Take the first cluster on this page with a matching name, if any.
        cluster_id = next(
            (c["Id"] for c in page["Clusters"] if c.get("Name") == cluster_name),
            None,
        )
        marker = page.get("Marker")
        if not marker:
            # No further pages to look at.
            break
        request["Marker"] = marker

    if not cluster_id:
        raise RuntimeError('Active cluster named "{}" not found'.format(cluster_name))

    # NOTE(review): the lookups below raise KeyError when the described cluster
    # does not report one of these attributes (possibly the case for a cluster
    # that is still STARTING) -- confirm against the EMR API.
    cluster = emr.describe_cluster(ClusterId=cluster_id)["Cluster"]
    dns_addr = cluster["MasterPublicDnsName"]
    ec2_attrs = cluster["Ec2InstanceAttributes"]
    sg = ec2_attrs["EmrManagedMasterSecurityGroup"]
    subnet = ec2_attrs["Ec2SubnetId"]

    return dns_addr, sg, subnet