def handler()

in aws_emr_launch/lambda_sources/emr_utilities/fail_if_cluster_running/lambda_source.py [0:0]


def handler(event: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Fail when an active EMR cluster already uses the requested name.

    The check is only performed when ``FailIfClusterRunning`` evaluates to
    true. That flag is read from ``event["ExecutionInput"]`` when that key is
    present, otherwise from the top level of ``event``; when the flag itself
    is absent, ``event["DefaultFailIfClusterRunning"]`` (default ``False``)
    is used instead.

    Args:
        event: Lambda payload. Keys read here: ``DefaultFailIfClusterRunning``,
            ``ExecutionInput``, ``FailIfClusterRunning`` and ``Input`` (whose
            ``Name`` entry is the cluster name to look for).
        context: Lambda context; not used by this handler.

    Returns:
        ``event["Input"]`` unchanged, when the check is disabled or no active
        cluster with that name is found.

    Raises:
        ClusterRunningError: the check is enabled and a cluster with the
            requested name is in one of the active states.
        KeyError: ``event`` has no ``Input`` key.
        Exception: anything else raised while processing is logged and
            re-raised unchanged.
    """
    try:
        logger.info(f"Lambda metadata: {json.dumps(event)} (type = {type(event)})")
        # NOTE(review): parse_bool is defined elsewhere; presumably it coerces
        # strings such as "true"/"false" to bool -- confirm.
        default_fail_if_cluster_running = parse_bool(event.get("DefaultFailIfClusterRunning", False))

        # This will work for {"ExecutionInput": {"FailIfClusterRunning": true}} or {"FailIfClusterRunning": true}
        fail_if_cluster_running = parse_bool(
            event.get("ExecutionInput", event).get("FailIfClusterRunning", default_fail_if_cluster_running)
        )

        if fail_if_cluster_running:
            cluster_name = event.get("Input", {}).get("Name", "")
            logger.info(f'Checking if job flow "{cluster_name}" is running already')
            running_cluster = _find_active_cluster(cluster_name)
            if running_cluster is not None:
                cluster_id = running_cluster["Id"]
                logger.info(f"Job flow {cluster_name} is already running: terminate? {fail_if_cluster_running}")
                raise ClusterRunningError(
                    f"Found running Cluster with name {cluster_name}. "
                    f"ClusterId: {cluster_id}. FailIfClusterRunning is {fail_if_cluster_running}"
                )

        # Either the check is disabled or no conflicting cluster exists.
        return cast(Dict[str, Any], event["Input"])

    except Exception as e:
        logger.error(f"Error processing event {json.dumps(event)}")
        logger.exception(e)
        # Bare raise re-raises the active exception without rebinding it.
        raise


def _find_active_cluster(cluster_name: str) -> Optional[Dict[str, Any]]:
    """Return the first active cluster summary named ``cluster_name``, or ``None``.

    ``list_clusters`` is paginated: each response carries a ``Marker`` while
    more results remain. Every page is inspected so that a match beyond the
    first page is not missed.
    """
    request: Dict[str, Any] = {"ClusterStates": ["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"]}
    while True:
        response = emr.list_clusters(**request)
        for cluster in response["Clusters"]:
            if cluster["Name"] == cluster_name:
                return cast(Dict[str, Any], cluster)

        marker = response.get("Marker")
        if not marker:
            # No further pages: nothing active carries this name.
            return None
        request["Marker"] = marker