def process_cloudformation_event()

in aws_sra_examples/solutions/ec2/ec2_default_ebs_encryption/lambda/src/app.py [0:0]


def process_cloudformation_event(event: Dict[str, Any], context: Any) -> str:
    """Process Event from AWS CloudFormation.

    Validates the event parameters and stores them via
    ``set_configuration_ssm_parameters``. When the validated ``action`` is
    ``"Add"``, ``process_enable_ebs_encryption_by_default`` is submitted to a
    thread pool once per organization account, for the enabled regions. For
    any other action, ``delete_ssm_parameter`` is called instead.

    Args:
        event: event data
        context: runtime information (not used by this function)

    Returns:
        AWS CloudFormation physical resource id, built from the assumed role
        name, the role session name and the length of the stripped
        ``ENABLED_REGIONS`` parameter string.

    Raises:
        ValueError: if any per-account worker raised an exception; the
            original exception is chained as the cause.
        concurrent.futures.TimeoutError: propagated from ``as_completed`` if
            the workers do not all finish within the 60 second timeout.
    """
    request_type = event["RequestType"]
    LOGGER.info(f"{request_type} Event")

    params = get_validated_parameters(event)
    set_configuration_ssm_parameters(MANAGEMENT_ACCOUNT_SESSION, params)
    # Exact comparison on purpose: `x in "true"` is a substring test, which
    # would also treat "", "t", "ru", ... as true.
    control_tower_regions_only = params.get("CONTROL_TOWER_REGIONS_ONLY", "true").lower() == "true"

    # `("Add")` is just the string "Add" (no trailing comma), so `in` would be
    # a substring test as well; compare for equality instead.
    if params["action"] == "Add":
        account_ids = get_all_organization_accounts()
        available_regions = get_enabled_regions(
            customer_regions=params.get("ENABLED_REGIONS", ""), control_tower_regions_only=control_tower_regions_only
        )
        if available_regions:
            # Use fewer workers than MAX_THREADS when there are fewer accounts
            # than threads, but never fewer than one.
            thread_cnt = MAX_THREADS
            if MAX_THREADS > len(account_ids):
                thread_cnt = max(len(account_ids) - 2, 1)

            with ThreadPoolExecutor(max_workers=thread_cnt) as executor:
                processes = [
                    executor.submit(
                        process_enable_ebs_encryption_by_default,
                        MANAGEMENT_ACCOUNT_SESSION,
                        params["ROLE_TO_ASSUME"],
                        params["ROLE_SESSION_NAME"],
                        account_id,
                        available_regions,
                    )
                    for account_id in account_ids
                ]
                for future in as_completed(processes, timeout=60):
                    try:
                        future.result()
                    except Exception as error:
                        LOGGER.error(f"{error}")
                        # Chain the worker's exception so the root cause stays
                        # visible in the traceback.
                        raise ValueError("There was an error updating the EC2 default EBS encryption setting") from error
        else:
            LOGGER.info("No valid enabled regions provided.")
    else:
        delete_ssm_parameter(SSM_CLIENT, SSM_PARAMETER_PREFIX)

    return f"EC2DefaultEBSEncryption-{params['ROLE_TO_ASSUME']}-{params['ROLE_SESSION_NAME']}-{len(params.get('ENABLED_REGIONS','').strip())}"