in aws_emr_launch/lambda_sources/emr_utilities/check_cluster_status/lambda_source.py [0:0]
def handler(event: Dict[str, Any], context: Optional[Dict[str, Any]]) -> None:
logger.info(f"Lambda metadata: {json.dumps(event)} (type = {type(event)})")
cluster_id = event["ClusterId"]
task_token = event["TaskToken"]
rule_name = event["RuleName"]
expected_state = event["ExpectedState"]
try:
cluster_description = emr.describe_cluster(ClusterId=cluster_id)
state = cluster_description["Cluster"]["Status"]["State"]
if state == expected_state:
success = True
elif state in ["TERMINATING", "TERMINATED", "TERMINATED_WITH_ERRORS"]:
success = False
else:
heartbeat = {
"ClusterId": cluster_id,
"TaskToken": task_token,
"ClusterState": state,
"ExpectedState": expected_state,
}
logger.info(f"Sending Task Heartbeat: {heartbeat}")
sfn.send_task_heartbeat(taskToken=task_token)
return
cluster_description["ClusterId"] = cluster_id
if success:
logger.info(
f"Sending Task Success, TaskToken: {task_token}, "
f"Output: {json.dumps(cluster_description, default=json_serial)}"
)
sfn.send_task_success(taskToken=task_token, output=json.dumps(cluster_description, default=json_serial))
else:
logger.info(
f"Sending Task Failure,TaskToken: {task_token}, "
f"Output: {json.dumps(cluster_description, default=json_serial)}"
)
sfn.send_task_failure(
taskToken=task_token,
error="States.TaskFailed",
cause=json.dumps(cluster_description, default=json_serial),
)
task_token = None
logger.info(f"Removing Rule Targets: {cluster_id}")
failed_targets = events.remove_targets(Rule=rule_name, Ids=[cluster_id])
if failed_targets["FailedEntryCount"] > 0:
failed_entries = failed_targets["FailedEntries"]
raise Exception(f"Failed Removing Targets: {json.dumps(failed_entries)}")
targets = events.list_targets_by_rule(Rule=rule_name)["Targets"]
if len(targets) == 0:
logger.info(f"Disabling Rule with no Targets: {rule_name}")
events.disable_rule(Name=rule_name)
except Exception as e:
try:
if task_token:
logger.error(f"Sending TaskFailure: {task_token}")
sfn.send_task_failure(taskToken=task_token, error="States.TaskFailed", cause=str(e))
logger.error(f"Removing Rule Targets: {cluster_id}")
events.remove_targets(Rule=rule_name, Ids=[cluster_id])
except Exception as ee:
logger.exception(ee)
log_exception(e, event)
raise e