def check_cluster_status()

in source/idea/idea-administrator/src/ideaadministrator/app_main.py [0:0]


def check_cluster_status(cluster_name: str, aws_region: str, aws_profile: str, wait: bool, wait_timeout: int, debug: bool, module_set: str):
    """
    check status for all applicable cluster endpoints

    Every cluster module whose type is ``constants.MODULE_TYPE_APP`` is probed with an
    HTTP GET on ``{cluster_endpoint}/{module_id}/healthcheck``. An endpoint is reported
    as ``SUCCESS`` only when the response status code is 200; any other status code, or
    any exception raised while probing, is reported as ``FAIL``. A result table is
    printed after each round of probes.

    :param cluster_name: name of the cluster, passed to ``ClusterConfig``
    :param aws_region: AWS region, passed to ``ClusterConfig``
    :param aws_profile: AWS profile, passed to ``ClusterConfig``
    :param wait: when False, a single round of probes is performed. when True, probes are
        repeated (roughly once per minute) until all endpoints succeed, the timeout
        elapses, or the user presses Ctrl + C.
    :param wait_timeout: maximum time to keep retrying, in seconds. only relevant when
        ``wait`` is True. at least one round of probes is always performed, even when
        this value is zero or negative.
    :param debug: when True, print each response (or the exception) for every probe
    :param module_set: module set, passed to ``ClusterConfig``
    :raises SystemExit: with exit code 1 when one or more endpoints failed in the last
        round of probes. not raised when the user aborted with Ctrl + C.
    """

    # upper bound for a single health check request. without it, an unreachable endpoint
    # (e.g. packets dropped by a security group) can block well past wait_timeout.
    request_timeout_seconds = 30
    # pause between two rounds of probes while waiting for endpoints to become healthy
    retry_interval_seconds = 60

    def check_status(endpoint_url):
        # factory: binds endpoint_url so each endpoint entry gets its own probe callable
        def get_status() -> bool:
            try:
                with warnings.catch_warnings():
                    # certificate verification is disabled below (verify=False);
                    # suppress the warnings emitted for such requests
                    warnings.simplefilter('ignore')
                    result = requests.get(url=endpoint_url, verify=False, timeout=request_timeout_seconds)  # nosec B501
                    if debug:
                        print(f'{endpoint_url} - {result.status_code} - {Utils.to_json(result.text)}')
                    return result.status_code == 200
            except Exception as e:
                # best effort: any failure to reach the endpoint counts as an unhealthy endpoint
                if debug:
                    print(f'{e}')
                return False

        return get_status

    cluster_config = ClusterConfig(
        cluster_name=cluster_name,
        aws_region=aws_region,
        aws_profile=aws_profile,
        module_set=module_set
    )

    context = SocaCliContext()
    module_metadata = ModuleMetadataHelper()

    cluster_endpoint = cluster_config.get_cluster_external_endpoint()
    cluster_modules = cluster_config.db.get_cluster_modules()

    endpoints = []

    for cluster_module in cluster_modules:
        module_id = cluster_module['module_id']
        module_name = cluster_module['name']
        module_type = cluster_module['type']
        # only modules of type app are probed for a healthcheck endpoint
        if module_type == constants.MODULE_TYPE_APP:
            url = f'{cluster_endpoint}/{module_id}/healthcheck'
            endpoints.append({
                'name': module_metadata.get_module_title(module_name),
                'endpoint': url,
                'check_status': check_status(url)
            })

    end_time = Utils.current_time_ms() + (wait_timeout * 1000)
    fail_count = 0
    timed_out = False
    keyboard_interrupt = False

    # the loop always performs at least one round of probes, regardless of wait_timeout
    while True:

        context.info(f'checking endpoint status for cluster: {cluster_name}, url: {cluster_endpoint} ...')

        table = PrettyTable(['Module', 'Endpoint', 'Status'])
        table.align = 'l'

        fail_count = 0
        for endpoint in endpoints:
            if endpoint['check_status']():
                status = 'SUCCESS'
            else:
                status = 'FAIL'
                fail_count += 1

            table.add_row([
                endpoint['name'],
                endpoint['endpoint'],
                status
            ])

        print(table)

        # done when not asked to wait, or when everything is healthy
        if not wait or fail_count == 0:
            break

        remaining_ms = end_time - Utils.current_time_ms()
        if remaining_ms <= 0:
            timed_out = True
            break

        print('failed to verify all cluster endpoints. wait ... (Press Ctrl + C to exit) ')
        try:
            # never sleep past the deadline, so that a final round of probes
            # happens at (not after) wait_timeout
            time.sleep(min(retry_interval_seconds, remaining_ms / 1000))
        except KeyboardInterrupt:
            keyboard_interrupt = True
            print('Check-endpoint status aborted.')
            break

    if not keyboard_interrupt:
        if timed_out:
            context.warning('check endpoint status timed-out. please verify your cluster\'s External ALB Security Group configuration and check correct ingress rules have been configured.')

        if fail_count > 0:
            raise SystemExit(1)