in aws_emr_launch/lambda_sources/emr_utilities/fail_if_cluster_running/lambda_source.py [0:0]
def _find_running_cluster_id(cluster_name: str) -> Optional[str]:
    """Return the Id of an active EMR cluster named ``cluster_name``, or None.

    "Active" means one of the states STARTING, BOOTSTRAPPING, RUNNING or WAITING.
    ``ListClusters`` is paginated: each response may carry a ``Marker`` that must
    be sent back to fetch the next page. Every page is therefore walked until a
    match is found or no ``Marker`` is returned.
    """
    request: Dict[str, Any] = {"ClusterStates": ["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"]}
    while True:
        response = emr.list_clusters(**request)
        for cluster in response["Clusters"]:
            if cluster["Name"] == cluster_name:
                return cast(str, cluster["Id"])
        marker = response.get("Marker")
        if not marker:
            # Last page reached without a match.
            return None
        request["Marker"] = marker


def handler(event: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Fail when an active EMR cluster with the requested name already exists.

    The check is enabled by ``FailIfClusterRunning``. This flag is read from
    ``event["ExecutionInput"]`` when that key is present, and from ``event``
    itself otherwise. When the flag is absent, ``event["DefaultFailIfClusterRunning"]``
    is used, and that in turn defaults to False.

    Args:
        event: Lambda payload. ``event["Input"]`` holds the cluster definition;
            its ``"Name"`` is compared against active cluster names.
        context: Lambda context (unused).

    Returns:
        ``event["Input"]``, unchanged, when the check is disabled or no active
        cluster with that name exists.

    Raises:
        ClusterRunningError: the check is enabled and a same-named active cluster exists.
        KeyError: ``event`` has no ``"Input"`` key.
        Exception: any other error is logged together with the event and then re-raised.
    """
    try:
        logger.info(f"Lambda metadata: {json.dumps(event)} (type = {type(event)})")
        default_fail_if_cluster_running = parse_bool(event.get("DefaultFailIfClusterRunning", False))
        # This will work for {"ExecutionInput": {"FailIfClusterRunning": true}} or {"FailIfClusterRunning": true}
        fail_if_cluster_running = parse_bool(
            event.get("ExecutionInput", event).get("FailIfClusterRunning", default_fail_if_cluster_running)
        )
        # check if job flow already exists
        if fail_if_cluster_running:
            cluster_name = event.get("Input", {}).get("Name", "")
            logger.info(f'Checking if job flow "{cluster_name}" is running already')
            cluster_id = _find_running_cluster_id(cluster_name)
            if cluster_id is not None:
                logger.info(f"Job flow {cluster_name} is already running: terminate? {fail_if_cluster_running}")
                raise ClusterRunningError(
                    f"Found running Cluster with name {cluster_name}. "
                    f"ClusterId: {cluster_id}. FailIfClusterRunning is {fail_if_cluster_running}"
                )
        return cast(Dict[str, Any], event["Input"])
    except Exception as e:
        logger.error(f"Error processing event {json.dumps(event)}")
        logger.exception(e)
        # Bare raise re-raises the active exception with its original traceback.
        raise