cookbooks/aws-parallelcluster-slurm/files/default/head_node_checks/check_cluster_ready.py (82 lines of code) (raw):

# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with # the License. A copy of the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES # OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and # limitations under the License. import logging import click from common.constants import CLUSTER_CONFIG_DDB_ID from common.ddb_utils import get_cluster_config_records from common.ec2_utils import list_cluster_instance_ids_iterator from common.exceptions import CheckFailedError logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) BATCH_SIZE = 500 def _check_cluster_config_items(instance_ids: [str], items: [{}], expected_config_version: str): missing = [] incomplete = [] wrong = [] if not instance_ids: logger.warning("No instances to check cluster config version for") return missing, incomplete, wrong # Transform DDB items to make it easier to search. # Example: the original items: # [ # { "Id": { "S": "CLUSTER_CONFIG.i-123456789" }, # "Data": { # "M": { # "cluster_config_version": { "HoqyEZYBkMig3gSxaMARUv0NGcG0rrso" }, # "lastUpdateTime": { "2024-01-16 18:59:18 UTC" } # } # } # } # ] # # is transformed into items_by_id: # # { # "CLUSTER_CONFIG.i-123456789": { # "cluster_config_version": { "HoqyEZYBkMig3gSxaMARUv0NGcG0rrso" }, # "lastUpdateTime": { "2024-01-16 18:59:18 UTC" } # } # } items_by_id = {item["Id"]["S"]: item["Data"]["M"] for item in items} for instance_id in instance_ids: key = CLUSTER_CONFIG_DDB_ID.format(instance_id=instance_id) data = items_by_id.get(key) if data is None: missing.append(instance_id) continue cluster_config_version = data.get("cluster_config_version", {}).get("S") if cluster_config_version is None: incomplete.append(instance_id) continue if cluster_config_version != expected_config_version: wrong.append((instance_id, cluster_config_version)) return missing, incomplete, wrong def check_deployed_config_version(cluster_name: str, table_name: str, expected_config_version: str, region: str): """ Verify that every compute/login node in the cluster has deployed the expected config version. The verification is made by checking the config version reported by compute/login nodes on the cluster DDB table. A RuntimeError exception is raised if the check fails. The function is retried and the wait time is expected to be in the interval (cfn_hup_time, 2*cfn_hup_time), where cfn_hup_time is the wait time for the cfn-hup daemon (as of today it is 120 seconds). :param cluster_name: name of the cluster. :param table_name: DDB table to read the deployed config version from. :param expected_config_version: expected config version. :param region: AWS region name (eg: us-east-1). :return: None """ logger.info( "Checking that cluster configuration deployed on cluster nodes for cluster %s is %s", cluster_name, expected_config_version, ) for instance_ids in list_cluster_instance_ids_iterator( cluster_name=cluster_name, node_type=["Compute", "LoginNode"], instance_state=["running"], region=region, ): n_instance_ids = len(instance_ids) if not n_instance_ids: logger.warning("Found empty batch of cluster nodes: nothing to check") continue logger.info("Found batch of %s cluster node(s): %s", n_instance_ids, instance_ids) items = get_cluster_config_records(table_name, instance_ids, region) logger.info("Retrieved %s DDB item(s):\n\t%s", len(items), "\n\t".join([str(i) for i in items])) missing, incomplete, wrong = _check_cluster_config_items(instance_ids, items, expected_config_version) if missing or incomplete or wrong: raise CheckFailedError( f"Check failed due to the following erroneous records:\n" f" * missing records ({len(missing)}): {missing}\n" f" * incomplete records ({len(incomplete)}): {incomplete}\n" f" * wrong records ({len(wrong)}): {wrong}" ) logger.info("Verified cluster configuration for cluster node(s) %s", instance_ids) @click.command(help="Verify that the cluster has completed the deployment of the expected cluster configuration.") @click.option("--cluster-name", required=True, help="Name of the cluster.") @click.option("--table-name", required=True, help="Name of the DDB table.") @click.option("--config-version", required=True, help="Expected cluster config version.") @click.option("--region", required=True, help="Name of AWS region.") def check_cluster_ready(cluster_name: str, table_name: str, config_version: str, region: str): logger.info( "Checking cluster readiness with arguments: cluster_name=%s, table_name=%s, config_version=%s, region=%s", cluster_name, table_name, config_version, region, ) try: check_deployed_config_version(cluster_name, table_name, config_version, region) except CheckFailedError as e: logger.error("Some cluster readiness checks failed: %s", e) raise e except Exception as e: logger.error("Cannot complete the cluster readiness checks due to internal errors: %s", e) raise e logger.info("All checks succeeded!") if __name__ == "__main__": check_cluster_ready() # pylint: disable=no-value-for-parameter