in aws_emr_launch/lambda_sources/emr_utilities/load_cluster_configuration/lambda_source.py [0:0]
def handler(event: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Load an EMR profile and a cluster configuration and merge them into one cluster definition.

    The profile and the stored configuration are fetched with ``get_parameter_value``
    (under ``PROFILES_SSM_PARAMETER_PREFIX`` / ``CONFIGURATIONS_SSM_PARAMETER_PREFIX``).
    Profile-derived settings (log URI, IAM role names, security groups, security
    configuration) plus the cluster name and tags are then written onto the stored
    ``ClusterConfiguration``.

    Args:
        event: Invocation payload. Keys read: ``ClusterName``, ``ClusterTags``,
            ``ProfileNamespace``, ``ProfileName``, ``ConfigurationNamespace`` and
            ``ConfigurationName``. When ``ClusterName`` is missing or empty,
            ``ConfigurationName`` is used as the cluster name.
        context: Lambda context; not used.

    Returns:
        A dict with ``Cluster`` (the merged cluster configuration),
        ``SecretConfigurations`` (from the stored configuration, or ``None``) and
        ``KerberosAttributesSecret`` (from the profile, or ``None``).

    Raises:
        ClientError: If either parameter lookup fails. For ``ParameterNotFound`` an
            ``EMRProfileNotFoundError`` / ``ClusterConfigurationNotFoundError`` is
            logged, but the original ``ClientError`` is what gets raised.
        Exception: Any error while assembling the configuration (e.g. a missing
            ``Roles``, ``SecurityGroups``, ``ClusterConfiguration`` or ``Instances``
            key) is logged via ``log_exception`` and re-raised unchanged.
    """
    logger.info(f"Lambda metadata: {json.dumps(event)} (type = {type(event)})")
    cluster_name = event.get("ClusterName", "")
    tags = event.get("ClusterTags", [])
    profile_namespace = event.get("ProfileNamespace", "")
    profile_name = event.get("ProfileName", "")
    configuration_namespace = event.get("ConfigurationNamespace", "")
    configuration_name = event.get("ConfigurationName", "")
    if not cluster_name:
        cluster_name = configuration_name

    emr_profile = _fetch_ssm_parameter(
        event,
        ssm_parameter_prefix=PROFILES_SSM_PARAMETER_PREFIX,
        namespace=profile_namespace,
        name=profile_name,
        label="Profile",
        not_found_error=EMRProfileNotFoundError,
    )
    stored_configuration = _fetch_ssm_parameter(
        event,
        ssm_parameter_prefix=CONFIGURATIONS_SSM_PARAMETER_PREFIX,
        namespace=configuration_namespace,
        name=configuration_name,
        label="Configuration",
        not_found_error=ClusterConfigurationNotFoundError,
    )

    try:
        cluster = _merge_cluster_configuration(emr_profile, stored_configuration, cluster_name, tags)
        logger.info(f"ClusterConfiguration: {json.dumps(cluster)}")
        return cluster
    except Exception as e:
        log_exception(e, event)
        raise


def _fetch_ssm_parameter(
    event: Dict[str, Any],
    ssm_parameter_prefix: str,
    namespace: str,
    name: str,
    label: str,
    not_found_error: type,
) -> Dict[str, Any]:
    """Fetch one stored parameter with ``get_parameter_value`` and log the outcome.

    ``label`` prefixes the log messages (``<label>Found`` / ``<label>NotFound``).
    On a ``ParameterNotFound`` error a ``not_found_error`` naming
    ``namespace/name`` is logged; any other ``ClientError`` is logged as-is.
    In both cases the original ``ClientError`` is re-raised.
    """
    try:
        value = get_parameter_value(ssm_parameter_prefix=ssm_parameter_prefix, namespace=namespace, name=name)
    except ClientError as e:
        if e.response["Error"]["Code"] == "ParameterNotFound":
            # NOTE(review): only the friendlier error is *logged*; the ClientError is
            # still what propagates, so callers keep seeing the same exception type.
            log_exception(not_found_error(f"{label}NotFound: {namespace}/{name}"), event)
        else:
            log_exception(e, event)
        raise
    logger.info(f"{label}Found: {json.dumps(value)}")
    return value


def _merge_cluster_configuration(
    emr_profile: Dict[str, Any],
    stored_configuration: Dict[str, Any],
    cluster_name: str,
    tags: Any,
) -> Dict[str, Any]:
    """Write profile-derived settings onto ``stored_configuration["ClusterConfiguration"]``.

    The nested configuration dict is mutated in place and returned under
    ``Cluster`` together with the stored ``SecretConfigurations`` and the
    profile's ``KerberosAttributesSecret``.
    """
    logs_bucket = emr_profile.get("LogsBucket", None)
    logs_path = emr_profile.get("LogsPath", "")
    kerberos_attributes_secret = emr_profile.get("KerberosAttributesSecret", None)
    secret_configurations = stored_configuration.get("SecretConfigurations", None)
    cluster_configuration = stored_configuration["ClusterConfiguration"]

    cluster_configuration["Name"] = cluster_name
    cluster_configuration["LogUri"] = _build_log_uri(logs_bucket, logs_path, cluster_name)
    # Role values are reduced to the segment after the last "/" (e.g. an ARN -> role name).
    cluster_configuration["JobFlowRole"] = emr_profile["Roles"]["InstanceRole"].split("/")[-1]
    cluster_configuration["ServiceRole"] = emr_profile["Roles"]["ServiceRole"].split("/")[-1]
    # The auto-scaling role is only looked up when the configuration has a non-empty
    # InstanceGroups list; otherwise it is explicitly set to None.
    cluster_configuration["AutoScalingRole"] = (
        emr_profile["Roles"]["AutoScalingRole"].split("/")[-1]
        if cluster_configuration["Instances"].get("InstanceGroups")
        else None
    )
    cluster_configuration["Tags"] = tags

    instances = cluster_configuration["Instances"]
    instances["EmrManagedMasterSecurityGroup"] = emr_profile["SecurityGroups"]["MasterGroup"]
    instances["EmrManagedSlaveSecurityGroup"] = emr_profile["SecurityGroups"]["WorkersGroup"]
    instances["ServiceAccessSecurityGroup"] = emr_profile["SecurityGroups"].get("ServiceGroup", None)

    cluster_configuration["SecurityConfiguration"] = emr_profile.get("SecurityConfiguration", None)
    # Set a default for new Parameters added to the RunJobFlow API that may
    # not be stored on existing ClusterConfigurations
    cluster_configuration.setdefault("ManagedScalingPolicy", None)

    return {
        "Cluster": cluster_configuration,
        "SecretConfigurations": secret_configurations,
        "KerberosAttributesSecret": kerberos_attributes_secret,
    }


def _build_log_uri(logs_bucket: Optional[str], logs_path: Optional[str], cluster_name: str) -> Optional[str]:
    """Return ``s3://<logs_bucket>/<logs_path>/<cluster_name>``, or ``None`` without a bucket.

    ``posixpath`` is used so the separator is always ``/`` regardless of platform.
    Leading slashes are stripped from the path components because a join discards
    everything before an absolute component, which would drop the ``s3://bucket``
    prefix. A ``None`` ``logs_path`` is treated as empty.
    """
    if not logs_bucket:
        return None
    import posixpath

    return posixpath.join(f"s3://{logs_bucket}", (logs_path or "").lstrip("/"), cluster_name.lstrip("/"))