# handler() — from automation/lambda/handlers/index.py

def handler(event, context):
    """Entry point for the automation Lambda.

    Dispatches on the shape of *event*:
      * ``Custom::SendSecrets`` (CloudFormation custom resource): on Create,
        read the Elasticsearch secret and publish the Kibana URL plus
        credentials to the configured SNS topic.
      * ``Custom::GenerateConfig`` (CloudFormation custom resource): on
        Create, write the prepared log config to DynamoDB; on Delete,
        best-effort removal of EMR termination protection.
      * Anything else is treated as a DynamoDB stream batch: INSERT/MODIFY
        records trigger adding an EMR step for the log source; REMOVE is
        only logged.

    Args:
        event: Lambda payload — either a CloudFormation custom resource
            request or a DynamoDB stream record batch.
        context: Lambda context object; ``aws_request_id`` seeds the
            physical resource ID reported back to CloudFormation.

    Returns:
        None. Stream-event paths return early; custom-resource paths signal
        CloudFormation via ``send()``.
    """
    logger.info(f'helper received event: {json.dumps(event)}')

    response_data = {
        "Data": "DONE",
        "Status": "SUCCESS",
        "CustomResourcePhysicalID": None,
    }

    # Prefer the Lambda request ID as a stable physical resource ID; fall
    # back to a fresh UUID, stringified so it stays JSON-serializable when
    # the CloudFormation response is built.
    response_data["CustomResourcePhysicalID"] = (
        context.aws_request_id if context and context.aws_request_id
        else str(uuid.uuid4())
    )

    resource_properties = event.get('ResourceProperties')
    resource_type = event.get("ResourceType")

    if resource_type == "Custom::SendSecrets":
        region_name = resource_properties.get("RegionName")
        secret_name = resource_properties.get("ESSecretName")
        topic = resource_properties.get("Topic")
        kibana_url = resource_properties.get("KibanaURL")

        if event["RequestType"] == "Create":
            secrets = get_secret(secret_name, None, region_name)
            # Guard against a missing/empty secret: subscripting None would
            # raise before send() runs and leave the CloudFormation resource
            # waiting for a response that never comes.
            secrets = json.loads(secrets) if secrets else {}
            message = f"AWS LogAggregator Kibana Dashboard URL: {kibana_url} and secrets are as below \n" \
                      f" Username: {secrets.get('username')} \n Password:{secrets.get('password')}\n "
            sns_client.publish(
                TopicArn=topic,
                Message=message,
                Subject='AWS LogAggregator Kibana Dashboard'
            )
        if event["RequestType"] == "Delete":
            # Nothing to clean up for this resource type.
            pass

    elif resource_type == "Custom::GenerateConfig":
        region = resource_properties.get("RegionName")
        dynamodb_table = resource_properties.get("ConfigTable")
        cluster_id = resource_properties.get("EMRClusterId")

        if event["RequestType"] == "Create":
            config = prepare_log_config(event)
            dynamodb_client = boto3.resource("dynamodb", region)
            table = dynamodb_client.Table(dynamodb_table)
            table.put_item(Item=config)
        if event["RequestType"] == "Delete":
            response_data["CustomResourcePhysicalID"] = event["PhysicalResourceId"]
            try:
                emr_client = session.client('emr')
                emr_client.set_termination_protection(
                    JobFlowIds=[cluster_id],
                    TerminationProtected=False
                )
            except Exception as e:
                # Best effort: the cluster may already be terminated during
                # stack deletion. Log instead of swallowing silently.
                logger.warning(f'Could not lift termination protection for {cluster_id}: {e}')

    else:
        new_step_id = None
        step_id = None
        topic = None
        # Get event name to confirm whether this is an add step workflow and cancel step workflow
        for record in event['Records']:

            event_name = record['eventName']
            event_source = record['eventSource']

            if event_source == 'aws:dynamodb':

                # Add step workflow via DynamoDB insert or modify trigger
                if event_name in ['INSERT', 'MODIFY']:

                    # Extract log source name and type
                    source_name = record['dynamodb']['Keys']['source_name']['S']
                    source_type = record['dynamodb']['NewImage']['source_type']['S']

                    # Table name is the second segment of the stream ARN
                    dynamodb_table = record['eventSourceARN'].split('/')[1]

                    # Confirm if config_json has been provided (as a map)
                    try:
                        config_json = record['dynamodb']['NewImage']['config_json']['M']
                        cluster_id = record['dynamodb']['NewImage']['cluster_id']['S']
                        region = record['dynamodb']['NewImage']['region']['S']
                        topic = record['dynamodb']['NewImage']['topic']['S']
                    except (KeyError, TypeError) as e:
                        logger.error(e)
                        error_message = f'Failed to validate Config JSON for {source_name}. ' \
                                        f'config_json has to be provided in the DynamoDB item as a map. ' \
                                        f'String will not be accepted.'

                        # Notify admin if failed to prepare log config json.
                        # NOTE(review): topic may still be None here if the
                        # 'topic' attribute itself was missing — publish would
                        # then fail; confirm items always carry a topic.
                        sns_client.publish(
                            TopicArn=topic,
                            Message=error_message,
                            Subject='Error: Config JSON',
                        )
                        return None

                    emr_client = session.client('emr')
                    # Prepare inputs for state machine / step functions
                    # Validate if EMR cluster (by cluster ID) is running
                    try:
                        emr_client.describe_cluster(ClusterId=cluster_id)
                    except Exception as e:
                        logger.error(str(e))
                        error_message = f'Not able to find any running EMR cluster {cluster_id}'
                        sns_client.publish(
                            TopicArn=topic,
                            Message=error_message,
                            Subject='Error: EMR Cluster not found.',
                        )
                        return None

                    # Duplicate step check: skip if a step with this source
                    # name already exists on the cluster.
                    logger.info('Confirm if an EMR step with name \'' + source_name + '\' has been added before')
                    step_id = get_step_id_by_name(source_name, cluster_id, emr_client)

                    if step_id:
                        error_message = f'Step {source_name} already been added as step ID {step_id}'
                        logger.info(error_message)

                        # Prepare notification message and send over to Admin
                        if topic:
                            sns_client.publish(
                                TopicArn=topic,
                                Message=error_message,
                                Subject='Config JSON error',
                            )
                        return None

                    else:
                        logger.info(f'EMR step with name {source_name} has not been added')

                    # Set termination protection for EMR cluster
                    logger.info('Turn on termination protection for EMR cluster')
                    emr_client.set_termination_protection(
                        JobFlowIds=[cluster_id],
                        TerminationProtected=True
                    )

                    # Get spark submit arguments if available. Initialize to
                    # None first: the original code left the name unbound on
                    # the except path, raising NameError at the submit call.
                    # submit_emr_step is presumed to apply its own default
                    # when None is passed — TODO confirm against the helper.
                    spark_submit_args = None
                    try:
                        spark_submit_args = ('spark-submit ' + record['dynamodb']['NewImage']['spark_submit_args']['S'])
                    except (KeyError, TypeError):
                        logger.info(
                            'Spark submit arguement was not provided by user. Default spark submit arguement will be applied')

                    # Add job/step to EMR cluster
                    new_step_id = submit_emr_step(dynamodb_table, source_name, source_type, spark_submit_args,
                                                  cluster_id, emr_client, region)
                    # f-string keeps the same text for strings and cannot
                    # raise TypeError if the helper ever returns None.
                    logger.info(f'New step ID {new_step_id}')

                    message = f'New Job by {new_step_id} is being added to Cluster {cluster_id}'

                    # TODO: iftik, send step ID over to step functions for monitoring

                    # Notify admin of newly added EMR log job.
                    sns_client.publish(
                        TopicArn=topic,
                        Message=message,
                        Subject='Info:  New Job added',
                    )

                    # Terminate this Lambda execution after sending the message to Step Function
                    return None

                # Cancel step workflow. To be defined.
                elif event_name == 'REMOVE':
                    logger.info('REMOVED ' + record['dynamodb']['Keys']['source_name']['S'])

            response_data["CustomResourcePhysicalID"] = new_step_id or step_id

        # Stream events never respond to CloudFormation.
        return None

    # Custom-resource paths: report the outcome back to CloudFormation.
    send(event, context, response_data["Status"], response_data,
         physical_resource_id=response_data["CustomResourcePhysicalID"])