in community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/resume.py [0:0]
def _handle_bulk_insert_op(op: Dict, nodes: List[str], resume_data: Optional[ResumeData]) -> None:
    """
    Handles **DONE** BulkInsert operations.

    Inspects the finished bulkInsert operation: if every requested VM was created,
    logs success and returns. Otherwise gathers the per-instance insert operations
    for the operation group, logs failures grouped by error code, and (except for
    RESOURCE_ALREADY_EXISTS) marks the failed nodes down and notifies their jobs.

    Args:
        op: Completed bulkInsert operation resource (status must be "DONE").
        nodes: Names of the nodes this bulkInsert was meant to create.
        resume_data: Slurm resume payload used to notify jobs of failed nodes, if any.
    """
    assert op["operationType"] == "bulkInsert" and op["status"] == "DONE", f"unexpected op: {op}"
    group_id = op["operationGroupId"]
    if "error" in op:
        error = op["error"]["errors"][0]
        log.warning(
            f"bulkInsert operation error: {error['code']} name={op['name']} operationGroupId={group_id} nodes={to_hostlist(nodes)}"
        )
    # TODO: does it make sense to query for insert-ops in case of bulkInsert-op error?
    # Total VMs created, summed across the per-location status entries.
    created = sum(
        status.get("createdVmCount", 0)
        for status in op["instancesBulkInsertOperationMetadata"]["perLocationStatus"].values()
    )
    if created == len(nodes):
        log.info(f"created {len(nodes)} instances: nodes={to_hostlist(nodes)}")
        return  # no need to gather status of insert-operations.
    # TODO:
    # * don't perform globalOperations aggregateList request to gather insert-operations,
    #   instead use specific locations from `instancesBulkInsertOperationMetadata`,
    #   most of the time single zone should be sufficient.
    # * don't gather insert-operations per bulkInsert request, instead aggregate it across
    #   all bulkInserts (goes one level above this function)
    successful_inserts, failed_inserts = separate(
        lambda op: "error" in op, get_insert_operations(group_id)
    )
    # Apparently multiple errors are possible... so join with +.
    by_error_inserts = util.groupby_unsorted(
        failed_inserts,
        lambda op: "+".join(err["code"] for err in op["error"]["errors"]),
    )
    for code, failed_ops in by_error_inserts:
        failed_ops = list(failed_ops)
        failed_nodes = [trim_self_link(op["targetLink"]) for op in failed_ops]
        hostlist = util.to_hostlist(failed_nodes)
        log.error(
            f"{len(failed_nodes)} instances failed to start: {code} ({hostlist}) operationGroupId={group_id}"
        )
        # Summarize using the first failed op; errors within one group share a code key.
        msg = "; ".join(
            f"{err['code']}: {err.get('message', 'no message')}"
            for err in failed_ops[0]["error"]["errors"]
        )
        if code != "RESOURCE_ALREADY_EXISTS":
            # Node already existing is benign; anything else takes the nodes down
            # and informs the jobs scheduled on them.
            down_nodes_notify_jobs(failed_nodes, f"GCP Error: {msg}", resume_data)
        log.error(
            f"errors from insert for node '{failed_nodes[0]}' ({failed_ops[0]['name']}): {msg}"
        )
    ready_nodes = {trim_self_link(op["targetLink"]) for op in successful_inserts}
    if len(ready_nodes) > 0:
        log.info(f"created {len(ready_nodes)} instances: nodes={to_hostlist(ready_nodes)}")