in pbspro/src/pbspro/driver.py [0:0]
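# NOTE: excerpt of PBSProDriver.add_nodes_to_cluster; it relies on driver.py's
# module-level imports which (assuming the cyclecloud-scalelib layout) include
# List, Node, partition, ht, logging (hpc.autoscale's, which provides
# logging.fine), CalledProcessError, SubprocessError, and is_valid_hostname.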
def add_nodes_to_cluster(self, nodes: List[Node]) -> List[Node]:
    self.initialize()
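
    # nodes that this driver previously flagged as ignored (for example, nodes
    # already removed from the cluster) are skipped by the join loop below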
    node_history = self.new_node_history(self.config)
    ignored_nodes = node_history.find_ignored()
    ignored_node_ids = [n[0] for n in ignored_nodes if n[0]]
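
    # index every node that pbsnodes reports by its ccnodeid so that duplicate
    # hostnames registered under the same CycleCloud node id can be detected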
    all_nodes = self.pbscmd.pbsnodes_parsed("-a")
    by_ccnodeid = partition(
        all_nodes, lambda x: x.get("resources_available.ccnodeid")
    )

    ret = []
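    # a chain of guards: skip any node that is not yet ready to join PBS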
    for node in nodes:
        if node.metadata.get("_marked_offline_this_iteration_"):
            continue

        if node.delayed_node_id.node_id in ignored_node_ids:
            node.metadata["pbs_state"] = "removed!"
            continue

        if not node.hostname:
            continue

        if not node.private_ip:
            continue

        if node.state == "Failed":
            continue

        # special handling of "keep_offline" created during preprocess_node_mgr
        if "keep_offline" in node.assignments:
            continue

        node_id = node.delayed_node_id.node_id
        if not node_id:
            logging.error("%s does not have a nodeid! Skipping", node)
            continue
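
        # two hostnames registered under the same ccnodeid indicate a stale or
        # conflicting entry; skip the node rather than register it twice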
        if node_id in by_ccnodeid:
            skip_node = False
            for ndict in by_ccnodeid[node_id]:
                if ndict["name"].lower() != node.hostname.lower():
                    logging.error(
                        "Duplicate hostname found for the same node id! %s and %s. See 'valid_hostnames' in autoscale as a possible workaround.",
                        node,
                        ndict["name"],
                    )
                    skip_node = True
                    break

            if skip_node:
                continue

        if not is_valid_hostname(self.config, node):
            continue

        if not self._validate_reverse_dns(node):
            logging.fine(
                "%s still has a hostname that cannot be looked up via reverse DNS. This should repair itself.",
                node,
            )
            continue

        if not node.resources.get("ccnodeid"):
            logging.info(
                "%s is not managed by CycleCloud, or at least 'ccnodeid' is not defined. Ignoring",
                node,
            )
            continue
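
        # the node is healthy, resolvable, and CycleCloud-managed; from here on,
        # any qmgr/pbsnodes failure is logged and retried on the next cycle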
        try:
            try:
                ndicts = self.pbscmd.qmgr_parsed("list", "node", node.hostname)
                if ndicts and ndicts[0].get("resources_available.ccnodeid"):
                    comment = ndicts[0].get("comment", "")
                    if "offline" in ndicts[0].get("state", "") and (
                        comment.startswith("cyclecloud offline")
                        or comment.startswith("cyclecloud joined")
                        or comment.startswith("cyclecloud restored")
                    ):
                        logging.info(
                            "%s is offline. Setting it back to online", node
                        )
                        self.pbscmd.pbsnodes(
                            "-r", node.hostname, "-C", "cyclecloud restored"
                        )
                    else:
                        logging.fine(
                            "ccnodeid is already defined on %s. Skipping", node
                        )
                    continue
                # TODO RDH should we just delete it instead?
                logging.info(
                    "%s already exists in this cluster. Setting resources.", node
                )
            except CalledProcessError:
                logging.info(
                    "%s does not exist in this cluster yet. Creating.", node
                )
                self.pbscmd.qmgr("create", "node", node.hostname)
            for res_name, res_value in node.resources.items():
                # we set ccnodeid last, so that we can see that we have completely
                # joined a node if and only if ccnodeid has been set
                if res_name == "ccnodeid":
                    continue

                if res_value is None:
                    continue

                # TODO RDH track down
                if res_name == "group_id" and res_value == "None":
                    continue

                # skip things like host which are useful to set default resources
                # on non-existent nodes for autoscale packing, but not on actual nodes
                if res_name in self.read_only_resources:
                    continue

                if res_name not in self.resource_definitions:
                    # TODO bump to a warning?
                    logging.fine(
                        "%s is an unknown PBS resource for node %s. Skipping this resource",
                        res_name,
                        node,
                    )
                    continue

                res_value_str: str

                # pbs size does not support decimals
                if isinstance(res_value, ht.Size):
                    res_value_str = "{}{}".format(
                        int(res_value.value), res_value.magnitude
                    )
                elif isinstance(res_value, bool):
                    res_value_str = "1" if res_value else "0"
                else:
                    res_value_str = str(res_value)

                self.pbscmd.qmgr(
                    "set",
                    "node",
                    node.hostname,
                    "resources_available.{}={}".format(res_name, res_value_str),
                )

            self.pbscmd.qmgr(
                "set",
                "node",
                node.hostname,
                "resources_available.{}={}".format(
                    "ccnodeid", node.resources["ccnodeid"]
                ),
            )
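            # bring the node online and stamp its comment so the qmgr check at
            # the top of this loop recognizes it as joined on later cycles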
self.pbscmd.pbsnodes("-r", node.hostname, "-C", "cyclecloud joined")
ret.append(node)
except SubprocessError as e:
logging.error(
"Could not fully add %s to cluster: %s. Will attempt next cycle",
node,
e,
)
return ret