scripts/setup_network_storage.py (225 lines of code) (raw):

#!/usr/bin/env python3 # Copyright (C) SchedMD LLC. # Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import sys import stat import time import shutil from pathlib import Path from concurrent.futures import as_completed from addict import Dict as NSDict import util from util import lkp, run, cfg, dirs, separate def mounts_by_local(mounts): """convert list of mounts to dict of mounts, local_mount as key""" return {str(Path(m.local_mount).resolve()): m for m in mounts} def resolve_network_storage(nodeset=None): """Combine appropriate network_storage fields to a single list""" if lkp.instance_role == "compute": try: nodeset = lkp.node_nodeset() except Exception: # External nodename, skip lookup nodeset = None # seed mounts with the default controller mounts if cfg.disable_default_mounts: default_mounts = [] else: default_mounts = [ NSDict( { "server_ip": lkp.control_addr or lkp.control_host, "remote_mount": str(path), "local_mount": str(path), "fs_type": "nfs", "mount_options": "defaults,hard,intr", } ) for path in ( dirs.home, dirs.apps, ) ] # create dict of mounts, local_mount: mount_info mounts = mounts_by_local(default_mounts) # On non-controller instances, entries in network_storage could overwrite # default exports from the controller. Be careful, of course mounts.update(mounts_by_local(cfg.network_storage)) if lkp.instance_role in ("login", "controller"): mounts.update(mounts_by_local(cfg.login_network_storage)) if nodeset is not None: mounts.update(mounts_by_local(nodeset.network_storage)) return list(mounts.values()) def separate_external_internal_mounts(mounts): """separate into cluster-external and internal mounts""" def internal_mount(mount): # NOTE: Valid Lustre server_ip can take the form of '<IP>@tcp' server_ip = mount.server_ip.split("@")[0] mount_addr = util.host_lookup(server_ip) return mount_addr == lkp.control_host_addr return separate(internal_mount, mounts) def setup_network_storage(log): """prepare network fs mounts and add them to fstab""" log.info("Set up network storage") # filter mounts into two dicts, cluster-internal and external mounts all_mounts = resolve_network_storage() ext_mounts, int_mounts = separate_external_internal_mounts(all_mounts) if lkp.instance_role == "controller": mounts = ext_mounts else: mounts = ext_mounts + int_mounts # Determine fstab entries and write them out fstab_entries = [] for mount in mounts: local_mount = Path(mount.local_mount) remote_mount = mount.remote_mount fs_type = mount.fs_type server_ip = mount.server_ip or "" util.mkdirp(local_mount) log.info( "Setting up mount ({}) {}{} to {}".format( fs_type, server_ip + ":" if fs_type != "gcsfuse" else "", remote_mount, local_mount, ) ) mount_options = mount.mount_options.split(",") if mount.mount_options else [] if not mount_options or "_netdev" not in mount_options: mount_options += ["_netdev"] if fs_type == "gcsfuse": fstab_entries.append( "{0} {1} {2} {3} 0 0".format( remote_mount, local_mount, fs_type, ",".join(mount_options) ) ) else: fstab_entries.append( "{0}:{1} {2} {3} {4} 0 0".format( server_ip, remote_mount, local_mount, fs_type, ",".join(mount_options), ) ) fstab = Path("/etc/fstab") if not Path(fstab.with_suffix(".bak")).is_file(): shutil.copy2(fstab, fstab.with_suffix(".bak")) shutil.copy2(fstab.with_suffix(".bak"), fstab) with open(fstab, "a") as f: f.write("\n") for entry in fstab_entries: f.write(entry) f.write("\n") mount_fstab(mounts_by_local(mounts), log) munge_mount_handler(log) def mount_fstab(mounts, log): """Wait on each mount, then make sure all fstab is mounted""" from more_executors import Executors, ExceptionRetryPolicy def mount_path(path): log.info(f"Waiting for '{path}' to be mounted...") try: run(f"mount {path}", timeout=120) except Exception as e: exc_type, _, _ = sys.exc_info() log.error(f"mount of path '{path}' failed: {exc_type}: {e}") raise e log.info(f"Mount point '{path}' was mounted.") MAX_MOUNT_TIMEOUT = 60 * 5 future_list = [] retry_policy = ExceptionRetryPolicy( max_attempts=40, exponent=1.6, sleep=1.0, max_sleep=16.0 ) with Executors.thread_pool().with_timeout(MAX_MOUNT_TIMEOUT).with_retry( retry_policy=retry_policy ) as exe: for path in mounts: future = exe.submit(mount_path, path) future_list.append(future) # Iterate over futures, checking for exceptions for future in as_completed(future_list): try: future.result() except Exception as e: raise e def munge_mount_handler(log): if not cfg.munge_mount: log.error("Missing munge_mount in cfg") elif lkp.instance_role == "controller": return mount = cfg.munge_mount server_ip = ( mount.server_ip if mount.server_ip else (cfg.slurm_control_addr or cfg.slurm_control_host) ) remote_mount = mount.remote_mount local_mount = Path("/mnt/munge") fs_type = mount.fs_type if mount.fs_type is not None else "nfs" mount_options = ( mount.mount_options if mount.mount_options is not None else "defaults,hard,intr,_netdev" ) munge_key = Path(dirs.munge / "munge.key") log.info(f"Mounting munge share to: {local_mount}") local_mount.mkdir() if fs_type.lower() == "gcsfuse".lower(): if remote_mount is None: remote_mount = "" cmd = [ "gcsfuse", f"--only-dir={remote_mount}" if remote_mount != "" else None, server_ip, str(local_mount), ] else: if remote_mount is None: remote_mount = Path("/etc/munge") cmd = [ "mount", f"--types={fs_type}", f"--options={mount_options}" if mount_options != "" else None, f"{server_ip}:{remote_mount}", str(local_mount), ] # wait max 120s for munge mount timeout = 120 for retry, wait in enumerate(util.backoff_delay(0.5, timeout), 1): try: run(cmd, timeout=timeout) break except Exception as e: log.error( f"munge mount failed: '{cmd}' {e}, try {retry}, waiting {wait:0.2f}s" ) time.sleep(wait) err = e continue else: raise err log.info(f"Copy munge.key from: {local_mount}") shutil.copy2(Path(local_mount / "munge.key"), munge_key) log.info("Restrict permissions of munge.key") shutil.chown(munge_key, user="munge", group="munge") os.chmod(munge_key, stat.S_IRUSR) log.info(f"Unmount {local_mount}") if fs_type.lower() == "gcsfuse".lower(): run(f"fusermount -u {local_mount}", timeout=120) else: run(f"umount {local_mount}", timeout=120) shutil.rmtree(local_mount) def setup_nfs_exports(): """nfs export all needed directories""" # The controller only needs to set up exports for cluster-internal mounts # switch the key to remote mount path since that is what needs exporting mounts = resolve_network_storage() # manually add munge_mount mounts.append( NSDict( { "server_ip": cfg.munge_mount.server_ip, "remote_mount": cfg.munge_mount.remote_mount, "local_mount": Path(f"{dirs.munge}_tmp"), "fs_type": cfg.munge_mount.fs_type, "mount_options": cfg.munge_mount.mount_options, } ) ) # controller mounts _, con_mounts = separate_external_internal_mounts(mounts) con_mounts = {m.remote_mount: m for m in con_mounts} for nodeset in cfg.nodeset.values(): # get internal mounts for each nodeset by calling # resolve_network_storage as from a node in each nodeset ns_mounts = resolve_network_storage(nodeset=nodeset) _, int_mounts = separate_external_internal_mounts(ns_mounts) con_mounts.update({m.remote_mount: m for m in int_mounts}) # export path if corresponding selector boolean is True exports = [] for path in con_mounts: util.mkdirp(Path(path)) run(rf"sed -i '\#{path}#d' /etc/exports", timeout=30) exports.append(f"{path} *(rw,no_subtree_check,no_root_squash)") exportsd = Path("/etc/exports.d") util.mkdirp(exportsd) with (exportsd / "slurm.exports").open("w") as f: f.write("\n") f.write("\n".join(exports)) run("exportfs -a", timeout=30)