def _handle_bulk_insert_op()

in community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/resume.py [0:0]


def _handle_bulk_insert_op(op: Dict, nodes: List[str], resume_data: Optional[ResumeData]) -> None:
    """
    Handles **DONE** BulkInsert operations.

    Logs the outcome of a finished bulkInsert operation. If the per-location
    status of ``op`` reports as many created VMs as there are ``nodes``, this
    logs success and returns without further API calls. Otherwise it fetches
    the individual insert operations of the same operation group, logs the
    failed ones grouped by error code, marks the failed nodes down (notifying
    their jobs) unless the error is RESOURCE_ALREADY_EXISTS, and logs the
    nodes whose insert operation succeeded.

    Args:
        op: the bulkInsert operation resource; it must have
            ``operationType == "bulkInsert"`` and ``status == "DONE"``.
        nodes: names of the nodes this bulkInsert was issued for.
        resume_data: forwarded to ``down_nodes_notify_jobs`` for failed nodes.
    """
    assert op["operationType"] == "bulkInsert" and op["status"] == "DONE", f"unexpected op: {op}"

    group_id = op["operationGroupId"]
    if "error" in op:
        # Only the first error is reported here. Tolerate a missing/empty
        # errors list so logging never aborts the handling below.
        errors = op["error"].get("errors") or [{}]
        error_code = errors[0].get("code", "UNKNOWN")
        log.warning(
            f"bulkInsert operation error: {error_code} name={op['name']} operationGroupId={group_id} nodes={to_hostlist(nodes)}"
        )
        # TODO: does it make sense to query for insert-ops in case of bulkInsert-op error?

    # NOTE(review): the bulkInsert metadata is presumably absent when the request
    # fails before provisioning; treat that as "nothing created" rather than
    # raising KeyError and skipping the failure handling below.
    per_location_status = op.get("instancesBulkInsertOperationMetadata", {}).get(
        "perLocationStatus", {}
    )
    created = sum(status.get("createdVmCount", 0) for status in per_location_status.values())
    if created == len(nodes):
        log.info(f"created {len(nodes)} instances: nodes={to_hostlist(nodes)}")
        return  # no need to gather status of insert-operations.

    # TODO:
    # * don't perform globalOperations aggregateList request to gather insert-operations,
    #   instead use specific locations from `instancesBulkInsertOperationMetadata`,
    #   most of the time single zone should be sufficient.
    # * don't gather insert-operations per bulkInsert request, instead aggregate it across
    #   all bulkInserts (goes one level above this function)
    successful_inserts, failed_inserts = separate(
        lambda insert_op: "error" in insert_op, get_insert_operations(group_id)
    )
    # Apparently multiple errors are possible... so join with +.
    by_error_inserts = util.groupby_unsorted(
        failed_inserts,
        lambda insert_op: "+".join(err["code"] for err in insert_op["error"]["errors"]),
    )
    for code, failed_ops in by_error_inserts:
        failed_ops = list(failed_ops)
        failed_nodes = [trim_self_link(insert_op["targetLink"]) for insert_op in failed_ops]
        hostlist = util.to_hostlist(failed_nodes)
        log.error(
            f"{len(failed_nodes)} instances failed to start: {code} ({hostlist}) operationGroupId={group_id}"
        )

        # All ops in this group share the same error codes, so the first op's
        # errors are representative of the whole group.
        msg = "; ".join(
            f"{err['code']}: {err.get('message', 'no message')}"
            for err in failed_ops[0]["error"]["errors"]
        )
        # An instance that already exists is not a provisioning failure, so
        # those nodes are not marked down.
        if code != "RESOURCE_ALREADY_EXISTS":
            down_nodes_notify_jobs(failed_nodes, f"GCP Error: {msg}", resume_data)
        log.error(
            f"errors from insert for node '{failed_nodes[0]}' ({failed_ops[0]['name']}): {msg}"
        )

    # NOTE(review): nodes in `nodes` that have neither a successful nor a failed
    # insert operation are not handled here — confirm callers account for them.
    ready_nodes = {trim_self_link(insert_op["targetLink"]) for insert_op in successful_inserts}
    if ready_nodes:
        log.info(f"created {len(ready_nodes)} instances: nodes={to_hostlist(ready_nodes)}")