in hpcpack-autoscaler/src/cyclecloud-hpcpack/hpcnodehistory.py [0:0]
def synchronize(self, cc_nodes: Iterable[Node], hpc_nodes: Iterable[HpcNode]) -> None:
    """Reconcile the persisted node-history items with the live node lists.

    Three phases:
      1. Walk the CycleCloud (CC) nodes: create/refresh a ``NodeHistoryItem``
         per CC node and compute the node's remaining provisioning/idle time.
      2. Bind HPC Pack nodes to CC nodes using the history (by stored hpc_id,
         then by hostname), clearing hpc_ids that no longer exist.
      3. Archive history items that match neither a live CC node nor a live
         HPC node.

    :param cc_nodes: current CycleCloud node list; mutated in place
        (``create_time_remaining``, ``idle_time_remaining``, ``create_time_unix``).
    :param hpc_nodes: current HPC Pack node list; mutated in place
        (``cc_node_id``, ``idle_from``, ``bound_cc_node``).
    """
    # NOTE(review): self.items vs self.__items — presumably `items` is a
    # property exposing the same underlying list as `__items`; verify in the
    # rest of the class (not visible in this excerpt).
    nhi_by_cc_id = partition_single(self.items, func = lambda n: n.cc_id)
    # NOTE(review): naive UTC datetime. `now.timestamp()` below interprets a
    # naive datetime as *local* time; the error cancels where only differences
    # of two naive-UTC timestamps are used, but `create_time_unix` is exposed
    # as an absolute value — confirm consumers expect this. Consider
    # datetime.now(timezone.utc) if the surrounding code permits.
    now = datetime.utcnow()
    # Refresh node history items with CC node list
    for cc_node in cc_nodes:
        nhi = nhi_by_cc_id.get(cc_node.delayed_node_id.node_id)
        if nhi is None:
            # First time we see this CC node: start tracking it.
            nhi = NodeHistoryItem(cc_node.delayed_node_id.node_id, cc_node.hostname)
            self.insert(nhi)
            nhi_by_cc_id[nhi.cc_id] = nhi
        else:
            if not nhi.hostname:
                # Hostname was unknown when the item was created; fill it in.
                nhi.hostname = cc_node.hostname
            elif not ci_equals(nhi.hostname, cc_node.hostname):
                logging.warning("Node hostname changed for node {}, {} => {}".format(nhi.cc_id, nhi.hostname, cc_node.hostname))
                # Somehow the node hostname changed, should not happen
                # if the orig host name still in HPC node list, we shall remove the HPC node
                if nhi.hpc_id:
                    # Tag the stale HPC node with the CC id so it can be
                    # identified (and removed) by downstream logic.
                    hpc_node = ci_find_one(hpc_nodes, nhi.hpc_id, target_func=lambda n: n.id)
                    if hpc_node is not None:
                        hpc_node.cc_node_id = nhi.cc_id
                # Retire the old history item and start a fresh one, but keep
                # the original emerge_time so node age is not reset.
                self.__items.remove(nhi)
                self.__items_to_archive.append(nhi)
                nhi = NodeHistoryItem(cc_node.delayed_node_id.node_id, cc_node.hostname, nhi.emerge_time)
                self.insert(nhi)
                nhi_by_cc_id[nhi.cc_id] = nhi
        if ci_equals(cc_node.target_state, 'Deallocated') or ci_equals(cc_node.target_state, 'Terminated'):
            # Node is shutting down: reset both countdowns to their full
            # budget and record when it stopped (first time only).
            cc_node.create_time_remaining = self.__provisioning_timeout
            cc_node.idle_time_remaining = self.__idle_timeout
            if not nhi.stopped:
                nhi.stop_time = now
        else:
            if nhi.stopped:
                # Node came back after being stopped; restart its clock.
                nhi.restart()
            cc_node.create_time_unix = nhi.start_time.timestamp()
            # Remaining time = budget minus elapsed time, floored at 0.
            cc_node.create_time_remaining = max(0, self.__provisioning_timeout + cc_node.create_time_unix - now.timestamp())
            if nhi.idle_from is None:
                cc_node.idle_time_remaining = self.__idle_timeout
            else:
                cc_node.idle_time_remaining = max(0, self.__idle_timeout + nhi.idle_from.timestamp() - now.timestamp())
    # Bound hpc nodes with CC nodes as per the info in node history
    cc_node_by_id: Dict[str, Node] = partition_single(cc_nodes, func=lambda n: n.delayed_node_id.node_id)
    nhi_by_hpc_id: Dict[str, NodeHistoryItem] = partition_single([nhi for nhi in self.__items if nhi.hpc_id], func = lambda n: n.hpc_id)
    for hpc_node in hpc_nodes:
        if hpc_node.is_cc_node:
            # Already recognized as a CC node; nothing to bind.
            continue
        nhi = nhi_by_hpc_id.pop(hpc_node.id, None)
        if nhi is not None:
            hpc_node.cc_node_id = nhi.cc_id
            hpc_node.idle_from = nhi.idle_from
            hpc_node.bound_cc_node = cc_node_by_id.get(nhi.cc_id)
    # For the nodes already removed from HPC Pack side, if they still exist in CC side
    # We shall reset the hpc_id for the node history item
    for nhi in nhi_by_hpc_id.values():
        if ci_in(nhi.cc_id, cc_node_by_id):
            nhi.reset_hpc_id()
    # Second-chance binding: match remaining unbound HPC nodes by hostname.
    hpc_node_to_bound = [n for n in hpc_nodes if not n.is_cc_node]
    nhi_to_bound_with_hpc = [nhi for nhi in self.__items if nhi.hostname and not nhi.hpc_id]
    if len(hpc_node_to_bound) > 0 and len(nhi_to_bound_with_hpc) > 0:
        # Prefer items whose CC node is still alive; dead-CC items go last.
        candidate_nhi = [nhi for nhi in nhi_to_bound_with_hpc if ci_in(nhi.cc_id, cc_node_by_id)]
        candidate_nhi.extend([nhi for nhi in nhi_to_bound_with_hpc if not ci_in(nhi.cc_id, cc_node_by_id)])
        # Map the HPC nodes with CC nodes by hostname
        for hpc_node in hpc_node_to_bound:
            # First search in active node history items
            # NOTE(review): a matched item is not removed from candidate_nhi,
            # so two HPC nodes sharing a hostname would bind to the same item
            # — presumably hostnames are unique here; confirm.
            match_nhi = ci_find_one(candidate_nhi, hpc_node.name, target_func=lambda n: n.hostname)
            if match_nhi:
                match_nhi.reset_hpc_id(hpc_node.id)
                hpc_node.cc_node_id = match_nhi.cc_id
                hpc_node.bound_cc_node = cc_node_by_id.get(match_nhi.cc_id)
    # Refresh the node history items, archive the stale items
    hpc_ids = [hpc_node.id for hpc_node in hpc_nodes]
    self.__items_to_archive.extend([nhi for nhi in self.__items if ci_notin(nhi.cc_id, cc_node_by_id) and ci_notin(nhi.hpc_id, hpc_ids)])
    # Slice-assign so any external references to the list stay valid.
    self.__items[:] = [nhi for nhi in self.__items if ci_in(nhi.cc_id, cc_node_by_id) or ci_in(nhi.hpc_id, hpc_ids)]