in sagemaker_run_notebook/emr.py [0:0]
def get_cluster_info(cluster_name, session=None):
    """Get the information about a running cluster so that the processing job can be connected to it.

    Pages through the EMR clusters that are in an active state (STARTING,
    BOOTSTRAPPING, RUNNING or WAITING), picks the first one whose name equals
    ``cluster_name`` and describes it to read its connection details.

    Args:
        cluster_name (str): The name of a running EMR cluster to connect to (required).
        session (boto3.Session): The boto3 session to use. It is passed through
            ``ensure_session``, which is expected to supply a default session if
            none is given (default: None).

    Returns:
        tuple: A tuple with cluster DNS address, the security group, and the subnet.

    Raises:
        RuntimeError: If no cluster in one of the active states has the given name.
    """
    session = ensure_session(session)
    emr = session.client("emr")

    cluster_id = None
    marker = None
    while True:
        # Only send Marker once a previous page has handed one back; the first
        # request goes out without it.
        page_args = {"Marker": marker} if marker else {}
        page = emr.list_clusters(
            ClusterStates=["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"],
            **page_args
        )
        marker = page.get("Marker")

        # First cluster on this page with a matching name, or None.
        match = next(
            (c for c in page["Clusters"] if c.get("Name") == cluster_name),
            None,
        )
        if match is not None:
            cluster_id = match["Id"]
            break
        if not marker:
            # No further pages and nothing matched.
            break

    if not cluster_id:
        raise RuntimeError('Active cluster named "{}" not found'.format(cluster_name))

    cluster = emr.describe_cluster(ClusterId=cluster_id)["Cluster"]
    # NOTE(review): these keys are read unconditionally; a cluster that is still
    # starting up may not expose all of them yet (KeyError) - confirm with callers.
    dns_addr = cluster["MasterPublicDnsName"]
    instance_attrs = cluster["Ec2InstanceAttributes"]
    sg = instance_attrs["EmrManagedMasterSecurityGroup"]
    subnet = instance_attrs["Ec2SubnetId"]
    return dns_addr, sg, subnet