# def get_validated_securityhub_regions()
#
# in aws_sra_examples/solutions/securityhub/securityhub_enabler_acct/lambda/src/app.py [0:0]


def get_validated_securityhub_regions(user_regions: str, control_tower_regions_only: bool = False) -> list:
    """
    Get the SecurityHub regions and check if they are enabled

    Candidate regions are taken from the first source that applies:

    1. ``user_regions``, a comma-separated string, when it is non-empty;
    2. the regions of the stack instances of the
       ``AWSControlTowerBP-BASELINE-CLOUDWATCH`` StackSet, when
       ``control_tower_regions_only`` is true;
    3. otherwise ``SESSION.get_available_regions("securityhub")``.

    Each candidate is then probed with a regional STS ``get_caller_identity``
    call, and only the regions where that call succeeds are returned.

    :param user_regions: User provided regions (comma-separated string)
    :param control_tower_regions_only: Control Tower regions only
    :return: validated SecurityHub regions
    :raises ClientError: if looking up the candidate regions fails
    """
    enabled_regions = []

    try:
        if user_regions:
            # Strip BEFORE filtering: entries that are only whitespace
            # (e.g. "us-east-1, " or "us-east-1, ,us-west-2") must be dropped
            # rather than turned into empty region names.
            stripped_values = (value.strip() for value in user_regions.split(","))
            securityhub_regions = [region for region in stripped_values if region]
        elif control_tower_regions_only:
            cf_client = SESSION.client('cloudformation')
            paginator = cf_client.get_paginator("list_stack_instances")
            # A set collapses the one-entry-per-instance summaries to unique regions.
            region_set = set()
            for page in paginator.paginate(
                StackSetName="AWSControlTowerBP-BASELINE-CLOUDWATCH"
            ):
                for summary in page["Summaries"]:
                    region_set.add(summary["Region"])
            securityhub_regions = list(region_set)
        else:
            securityhub_regions = SESSION.get_available_regions("securityhub")

        # Use the module logger (not the root ``logging`` module) so this
        # message follows the same configuration as the other log calls here.
        logger.info(f"SecurityHub regions: {securityhub_regions}")
    except ClientError as ce:
        logger.error(f"Error getting available regions: {str(ce)}")
        raise

    for region in securityhub_regions:
        sts_client = SESSION.client("sts", region_name=region)
        try:
            sts_client.get_caller_identity()
            enabled_regions.append(region)
        except ClientError as ce:
            # Best effort: a failing probe excludes the region but never aborts
            # the validation of the remaining regions.
            if ce.response["Error"]["Code"] == "InvalidClientTokenId":
                # This error code is treated as "region not enabled".
                logger.info(f"{region} region is disabled")
            else:
                err = ce.response["Error"]
                logger.error(f"Error {err} occurred testing region {region}")
    return enabled_regions