def add_nodes_to_cluster()

in pbspro/src/pbspro/driver.py [0:0]


    def add_nodes_to_cluster(self, nodes: List[Node]) -> List[Node]:
        """Join CycleCloud nodes to the PBS cluster and return those fully joined.

        For each candidate node this either re-enables an existing offline PBS
        node, or creates the PBS node and pushes its resources via qmgr. A node
        is considered joined only once ``ccnodeid`` is set (set last, deliberately)
        and the node has been brought online with a "cyclecloud joined" comment.

        Nodes are skipped (not returned) when they are offline/ignored/failed,
        lack a hostname, private IP or node id, have a conflicting duplicate
        hostname, fail reverse-DNS validation, or are not CycleCloud-managed.

        :param nodes: candidate nodes to join to the cluster.
        :returns: the subset of ``nodes`` that were fully joined this iteration.
        """
        self.initialize()
        node_history = self.new_node_history(self.config)
        ignored_nodes = node_history.find_ignored()
        # find_ignored() rows hold the node id in position 0; drop empty ids.
        ignored_node_ids = [n[0] for n in ignored_nodes if n[0]]

        # Index every PBS node by its CycleCloud node id so we can detect
        # duplicate hostnames registered under the same id.
        all_nodes = self.pbscmd.pbsnodes_parsed("-a")
        by_ccnodeid = partition(
            all_nodes, lambda x: x.get("resources_available.ccnodeid")
        )

        ret = []
        for node in nodes:
            # Marked offline earlier in this same iteration - don't fight that.
            if node.metadata.get("_marked_offline_this_iteration_"):
                continue

            if node.delayed_node_id.node_id in ignored_node_ids:
                node.metadata["pbs_state"] = "removed!"
                continue

            if not node.hostname:
                continue

            if not node.private_ip:
                continue

            if node.state == "Failed":
                continue

            # special handling of "keep_offline" created during preprocess_node_mgr
            if "keep_offline" in node.assignments:
                continue

            node_id = node.delayed_node_id.node_id

            if not node_id:
                logging.error("%s does not have a nodeid! Skipping", node)
                continue

            if node_id in by_ccnodeid:
                # The same ccnodeid must not map to two different hostnames;
                # if it does, joining would corrupt the PBS node list.
                skip_node = False
                for ndict in by_ccnodeid[node_id]:
                    if ndict["name"].lower() != node.hostname.lower():
                        logging.error(
                            "Duplicate hostname found for the same node id! %s and %s. See 'valid_hostnames' in autoscale as a possible workaround.",
                            node,
                            ndict["name"],
                        )
                        skip_node = True
                        break
                if skip_node:
                    continue

            if not is_valid_hostname(self.config, node):
                continue

            if not self._validate_reverse_dns(node):
                logging.fine(
                    "%s still has a hostname that can not be looked via reverse dns. This should repair itself.",
                    node,
                )
                continue

            if not node.resources.get("ccnodeid"):
                logging.info(
                    "%s is not managed by CycleCloud, or at least 'ccnodeid' is not defined. Ignoring",
                    node,
                )
                continue
            try:
                try:
                    # Does PBS already know about this hostname?
                    ndicts = self.pbscmd.qmgr_parsed("list", "node", node.hostname)
                    if ndicts and ndicts[0].get("resources_available.ccnodeid"):
                        # Already fully joined previously. If we took it offline
                        # ourselves (comment starts with one of our markers),
                        # bring it back online; otherwise leave it alone.
                        comment = ndicts[0].get("comment", "")

                        if "offline" in ndicts[0].get("state", "") and (
                            comment.startswith("cyclecloud offline")
                            or comment.startswith("cyclecloud joined")
                            or comment.startswith("cyclecloud restored")
                        ):
                            logging.info(
                                "%s is offline. Setting it back to online", node
                            )
                            self.pbscmd.pbsnodes(
                                "-r", node.hostname, "-C", "cyclecloud restored"
                            )
                        else:
                            logging.fine(
                                "ccnodeid is already defined on %s. Skipping", node
                            )
                        continue
                    # TODO RDH should we just delete it instead?
                    logging.info(
                        "%s already exists in this cluster. Setting resources.", node
                    )
                except CalledProcessError:
                    # qmgr list failed -> node does not exist yet; create it.
                    logging.info(
                        "%s does not exist in this cluster yet. Creating.", node
                    )
                    self.pbscmd.qmgr("create", "node", node.hostname)

                for res_name, res_value in node.resources.items():
                    # we set ccnodeid last, so that we can see that we have completely joined a node
                    # if and only if ccnodeid has been set
                    if res_name == "ccnodeid":
                        continue

                    if res_value is None:
                        continue

                    # TODO RDH track down
                    if res_name == "group_id" and res_value == "None":
                        continue

                    # skip things like host which are useful to set default resources on non-existent
                    # nodes for autoscale packing, but not on actual nodes
                    if res_name in self.read_only_resources:
                        continue

                    if res_name not in self.resource_definitions:
                        # TODO bump to a warning?
                        logging.fine(
                            "%s is an unknown PBS resource for node %s. Skipping this resource",
                            res_name,
                            node,
                        )
                        continue
                    res_value_str: str

                    # pbs size does not support decimals
                    if isinstance(res_value, ht.Size):
                        res_value_str = "{}{}".format(
                            int(res_value.value), res_value.magnitude
                        )
                    elif isinstance(res_value, bool):
                        # BUGFIX: previously tested the builtin `bool` class
                        # (always truthy), so False was emitted as "1".
                        res_value_str = "1" if res_value else "0"
                    else:
                        res_value_str = str(res_value)

                    self.pbscmd.qmgr(
                        "set",
                        "node",
                        node.hostname,
                        "resources_available.{}={}".format(res_name, res_value_str),
                    )

                # ccnodeid last: its presence signals a fully-joined node.
                self.pbscmd.qmgr(
                    "set",
                    "node",
                    node.hostname,
                    "resources_available.{}={}".format(
                        "ccnodeid", node.resources["ccnodeid"]
                    ),
                )
                self.pbscmd.pbsnodes("-r", node.hostname, "-C", "cyclecloud joined")
                ret.append(node)
            except SubprocessError as e:
                # Best-effort: leave the node for the next autoscale cycle.
                logging.error(
                    "Could not fully add %s to cluster: %s. Will attempt next cycle",
                    node,
                    e,
                )

        return ret