in src/hpc/autoscale/node/nodehistory.py [0:0]
def _update(self, nodes: typing.Iterable[Node]) -> None:
if self.read_only:
return
now = self.now()
rows = list(
self._execute(
"""SELECT node_id, instance_id, hostname, create_time, last_match_time, ready_time, ignore
from nodes where delete_time IS NULL"""
)
)
rows_by_id = partition_single(rows, lambda r: r[0])
nodes_with_ids = [n for n in nodes if n.delayed_node_id.node_id]
nodes_by_id: typing.Dict[typing.Optional[NodeId], Node] = partition_single(
nodes_with_ids,
lambda n: n.delayed_node_id.node_id,
)
to_delete = set(rows_by_id.keys()) - set(nodes_by_id.keys())
for node in nodes:
node_id = node.delayed_node_id.node_id
if node_id not in rows_by_id:
# first time we see it, just put an entry
rows_by_id[node_id] = tuple(
[node_id, node.instance_id, node.hostname, now, now, 0, False]
)
if node.required or node.state != "Ready":
rec = list(rows_by_id[node_id])
rec[-3] = now
rows_by_id[node_id] = tuple(rec)
# if a node is running a job according to the scheduler, assume it
# is 'ready' for boot timeout purposes.
if node.state == "Ready" or node.metadata.get("_running_job_"):
(
node_id,
instance_id,
hostname,
create_time,
match_time,
ready_time,
ignore,
) = rows_by_id[node_id]
if ready_time < 1:
rows_by_id[node_id] = tuple(
[
node_id,
instance_id,
hostname,
create_time,
match_time,
now,
ignore,
]
)
if rows_by_id:
exprs = []
for row in rows_by_id.values():
(
node_id,
instance_id,
hostname,
create_time,
match_time,
ready_time,
ignore,
) = row
ignore_int = SQL_TRUE if ignore else SQL_FALSE
expr = f"('{node_id}', '{instance_id}', '{hostname}', {create_time}, {match_time}, {ready_time}, NULL, {ignore_int})".lower()
exprs.append(expr)
block_size = int(os.getenv("SCALELIB_SQLITE_INSERT_BLOCK", "25"))
for i in range(0, len(exprs), block_size):
sub_exprs = exprs[i : i + block_size]
values_expr = ",".join(sub_exprs)
stmt = "INSERT OR REPLACE INTO nodes (node_id, instance_id, hostname, create_time, last_match_time, ready_time, delete_time, ignore) VALUES {}".format(
values_expr
)
self._execute(stmt)
if to_delete:
to_delete_expr = " OR ".join(
['node_id="{}"'.format(node_id) for node_id in to_delete]
)
now = self.now()
self._execute(
"UPDATE nodes set delete_time={} where {}".format(now, to_delete_expr)
)
self.retire_records(commit=True)