in scripts/resume.py [0:0]
def resume_nodes(nodes: List[str], resume_data=None):
    """resume nodes in nodelist"""
    if not nodes:
        log.info("No nodes to resume")
        return

    if resume_data is None and global_resume_data is not None:
        resume_data = global_resume_data.deepcopy()
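
    # sort by node prefix so nodes from the same nodeset sit next to each other
    # when group_nodes_bulk splits them into bulk-request chunks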
    nodes = sorted(nodes, key=lkp.node_prefix)
    grouped_nodes, grouped_tpu_nodes = group_nodes_bulk(nodes, resume_data)
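    # regular-node chunks carry the partition, placement group, and job id used
    # to build each bulkInsert request below; TPU chunks carry the node prefix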

    if log.isEnabledFor(logging.DEBUG):
        # grouped_nodelists is used in later debug logs too
        grouped_nodelists = {
            group: to_hostlist(chunk.nodes) for group, chunk in grouped_nodes.items()
        }
        grouped_tpu_nodelists = {
            group: to_hostlist(chunk.nodes)
            for group, chunk in grouped_tpu_nodes.items()
        }
        log.debug(
            "node bulk groups: \n{}".format(yaml.safe_dump(grouped_nodelists).rstrip())
        )
        log.debug(
            "TPU node bulk groups: \n{}".format(
                yaml.safe_dump(grouped_tpu_nodelists).rstrip()
            )
        )
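
    # collect the TPU start data here; the TPU nodes themselves are started
    # later, after the regular-node bulkInsert operations have completed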
    tpu_start_data = []
    tpu_objs = {}
    for group, chunk in grouped_tpu_nodes.items():
        # do not create a second TPU object when nodes share the same prefix
        if chunk.prefix not in tpu_objs:
            model = chunk.nodes[0]
            tpu_objs[chunk.prefix] = TPU(lkp.node_nodeset(model))
        tpu_start_data.append({"tpu": tpu_objs[chunk.prefix], "node": chunk.nodes})

    # make all bulkInsert requests and execute with batch
    inserts = {
        group: create_instances_request(
            chunk.nodes, chunk.partition_name, chunk.placement_group, chunk.job_id
        )
        for group, chunk in grouped_nodes.items()
    }

    bulk_ops = dict(
        zip(inserts.keys(), map_with_futures(ensure_execute, inserts.values()))
    )
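    # bulk_ops maps each group to either the API response or the Exception the
    # request raised, which is what the started/failed split below relies on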
    log.debug(f"bulk_ops={yaml.safe_dump(bulk_ops)}")
    started = {
        group: op for group, op in bulk_ops.items() if not isinstance(op, Exception)
    }
    failed = {
        group: err for group, err in bulk_ops.items() if isinstance(err, Exception)
    }
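    # requests that failed outright never produced an operation; log them and
    # mark their nodes down in Slurm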
    if failed:
        failed_reqs = [f"{group}: {err}" for group, err in failed.items()]
        log.error("bulkInsert API failures: {}".format("; ".join(failed_reqs)))
        for ident, exc in failed.items():
            down_nodes(grouped_nodes[ident].nodes, f"GCP Error: {exc._get_reason()}")

    if log.isEnabledFor(logging.DEBUG):
        for group, op in started.items():
            group_nodes = grouped_nodelists[group]
            name = op["name"]
            gid = op["operationGroupId"]
            log.debug(
                f"new bulkInsert operation started: group={group} nodes={group_nodes} name={name} operationGroupId={gid}"
            )

    # wait for all bulkInserts to complete and log any errors
    bulk_operations = {group: wait_for_operation(op) for group, op in started.items()}

    # start TPU nodes after the regular nodes so that the regular nodes are not
    # held back by the slower TPU startup
    log.debug(f"tpu_start_data={yaml.safe_dump(tpu_start_data)}")
    execute_with_futures(start_tpu, tpu_start_data)

    all_successful_inserts = []
    for group, bulk_op in bulk_operations.items():
        group_id = bulk_op["operationGroupId"]
        bulk_op_name = bulk_op["name"]
        if "error" in bulk_op:
            error = bulk_op["error"]["errors"][0]
            group_nodes = to_hostlist_fast(grouped_nodes[group].nodes)
            log.warning(
                f"bulkInsert operation errors: {error['code']} name={bulk_op_name} operationGroupId={group_id} nodes={group_nodes}"
            )
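        # the bulk operation itself only carries a summary error; the
        # per-instance insert operations (looked up by operationGroupId)
        # show which nodes actually failed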
        successful_inserts, failed_inserts = separate(
            lambda op: "error" in op, get_insert_operations(group_id)
        )
        # Apparently multiple errors are possible... so join with +.
        by_error_inserts = util.groupby_unsorted(
            failed_inserts,
            lambda op: "+".join(err["code"] for err in op["error"]["errors"]),
        )
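        # each distinct error-code signature is reported once, with a hostlist
        # of all the nodes that failed with it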
        for code, failed_ops in by_error_inserts:
            failed_nodes = {trim_self_link(op["targetLink"]): op for op in failed_ops}
            hostlist = util.to_hostlist(failed_nodes)
            count = len(failed_nodes)
            log.error(
                f"{count} instances failed to start: {code} ({hostlist}) operationGroupId={group_id}"
            )
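            # one representative failed operation supplies the detailed error
            # messages for the whole group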
            failed_node, failed_op = next(iter(failed_nodes.items()))
            msg = "; ".join(
                f"{err['code']}: {err.get('message', 'no message')}"
                for err in failed_op["error"]["errors"]
            )
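            # RESOURCE_ALREADY_EXISTS means the instance is already there, so
            # leave those nodes alone; every other error marks the nodes down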
            if code != "RESOURCE_ALREADY_EXISTS":
                down_nodes(hostlist, f"GCP Error: {msg}")
            log.error(
                f"errors from insert for node '{failed_node}' ({failed_op['name']}): {msg}"
            )

        ready_nodes = {trim_self_link(op["targetLink"]) for op in successful_inserts}
        if len(ready_nodes) > 0:
            ready_nodelist = to_hostlist_fast(ready_nodes)
            log.info(f"created {len(ready_nodes)} instances: nodes={ready_nodelist}")
            all_successful_inserts.extend(successful_inserts)