in aws_emr_launch/lambda_sources/emr_utilities/run_job_flow/lambda_source.py [0:0]
def handler(event: Dict[str, Any], context: Optional[Dict[str, Any]]) -> None:
    """Start an EMR cluster with RunJobFlow and arrange for the caller to be notified.

    The cluster definition is taken from ``event["Input"]["Cluster"]``:

    * Top-level keys and ``Instances`` keys whose value is ``None`` are removed.
    * Each ``Input.SecretConfigurations`` entry (classification -> secret id) is
      resolved with ``get_secret_value`` and merged into ``Configurations`` via
      ``update_configurations``.
    * If ``Input.KerberosAttributesSecret`` is set, that secret is resolved and the
      supported keys are copied into ``KerberosAttributes``.

    After ``emr.run_job_flow`` returns, one of two things happens:

    * ``FireAndForget`` truthy: ``sfn.send_task_success`` is called with
      ``TaskToken`` and the RunJobFlow response (with an extra ``ClusterId`` key).
    * Otherwise: ``CheckStatusLambda`` is added as a target of the events rule
      ``RuleName``. Its input carries the cluster id, task token, rule name and
      ``ExpectedState="WAITING"``. The rule is then enabled.

    Args:
        event: Lambda event. Keys read: ``Input.Cluster`` (required),
            ``Input.SecretConfigurations``, ``Input.KerberosAttributesSecret``,
            ``TaskToken``, ``CheckStatusLambda``, ``FireAndForget``, ``RuleName``.
        context: Lambda context (unused).

    Raises:
        Any exception raised during processing, including a failed ``put_targets``
        call, is passed to ``log_and_raise(e, event)``.
    """
    # Local import: the module's import header is not part of this block.
    import copy

    try:
        logger.info(f"Lambda metadata: {json.dumps(event)} (type = {type(event)})")
        event_input = event["Input"]
        # Deep copy so nothing below can modify the caller's ``event``.
        # update_configurations may change the list it receives in place, and
        # ``event`` is handed to log_and_raise on failure. Without the copy,
        # resolved secret values could end up inside it.
        cluster_configuration = copy.deepcopy(event_input["Cluster"])
        task_token = event.get("TaskToken", None)
        cluster_status_lambda = event.get("CheckStatusLambda", None)
        fire_and_forget = event.get("FireAndForget", False)
        secret_configurations = event_input.get("SecretConfigurations", None)
        kerberos_attributes_secret = event_input.get("KerberosAttributesSecret", None)
        rule_name = event.get("RuleName", None)

        # NoneType values need to be removed from the cluster_configuration
        logger.info(f"Preparing ClusterConfiguration: {json.dumps(cluster_configuration)}")
        cluster_configuration = {k: v for k, v in cluster_configuration.items() if v is not None}
        cluster_configuration["Instances"] = {
            k: v for k, v in cluster_configuration["Instances"].items() if v is not None
        }
        logger.info(f"Removed NoneType values from ClusterConfiguration: {json.dumps(cluster_configuration)}")

        if secret_configurations:
            logger.info(f"Getting SecretConfigurations: {json.dumps(secret_configurations)}")
            # ``Configurations`` may be absent, for example when it was None and
            # stripped above. Start from an empty list instead of raising KeyError.
            # NOTE(review): assumes update_configurations adds an entry for a
            # classification that is not already present. Verify.
            configurations = cluster_configuration.get("Configurations", [])
            for classification, secret_id in secret_configurations.items():
                properties = get_secret_value(secret_id)
                configurations = update_configurations(configurations, classification, properties)
            cluster_configuration["Configurations"] = configurations

        if kerberos_attributes_secret:
            logger.info(f"Getting KerberosAttributesSecret: {json.dumps(kerberos_attributes_secret)}")
            kerberos_attributes = get_secret_value(kerberos_attributes_secret)
            # Forward only the fields RunJobFlow's KerberosAttributes accepts.
            # Any other keys stored in the secret are dropped.
            allowed_kerberos_keys = {
                "Realm",
                "KdcAdminPassword",
                "ADDomainJoinUser",
                "ADDomainJoinPassword",
                "CrossRealmTrustPrincipalPassword",
            }
            cluster_configuration["KerberosAttributes"] = {
                k: v for k, v in kerberos_attributes.items() if k in allowed_kerberos_keys
            }

        logger.info("Calling RunJobFlow")
        response = emr.run_job_flow(**cluster_configuration)
        # Use the same serializer as the task-success payload below, so logging
        # never fails on a value plain json.dumps cannot encode.
        logger.info(f"Got JobFlow response {json.dumps(response, default=json_serial)}")
        cluster_id = response["JobFlowId"]

        if fire_and_forget:
            response["ClusterId"] = cluster_id
            output = json.dumps(response, default=json_serial)
            logger.info(f"Sending Task Success, TaskToken: {task_token}, Output: {output}")
            sfn.send_task_success(taskToken=task_token, output=output)
        else:
            target_input = {
                "Id": cluster_id,
                "Arn": cluster_status_lambda,
                "Input": json.dumps(
                    {
                        "ClusterId": cluster_id,
                        "TaskToken": task_token,
                        "RuleName": rule_name,
                        "ExpectedState": "WAITING",
                    }
                ),
            }
            logger.info(f"Putting Rule Targets: {json.dumps(target_input)}")
            put_targets_response = events.put_targets(Rule=rule_name, Targets=[target_input])
            if put_targets_response["FailedEntryCount"] > 0:
                failed_entries = put_targets_response["FailedEntries"]
                raise Exception(f"Failed Putting Targets: {json.dumps(failed_entries)}")
            logger.info(f"Enabling Rule: {rule_name}")
            events.enable_rule(Name=rule_name)
    except Exception as e:
        log_and_raise(e, event)