ansible/roles/slurm/files/jobs/submit_workflow.py (64 lines of code) (raw):
#!/usr/bin/env python3
# Copyright (C) SchedMD LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
from pathlib import Path
import subprocess
import yaml
def dict_to_kv(dict):
"""convert dict to space-delimited slurm-style key-value pairs"""
return ",".join(
f"{k}={','.join(v) if isinstance(v, list) else v}"
for k, v in dict.items()
if v is not None
)
def run(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=False,
timeout=None,
check=True,
universal_newlines=True,
**kwargs,
):
"""Wrapper for subprocess.run() with convenient defaults"""
result = subprocess.run(
args,
stdout=stdout,
stderr=stderr,
shell=shell,
timeout=timeout,
check=check,
universal_newlines=universal_newlines,
**kwargs,
)
return result
def main(config):
try:
env = dict_to_kv(config["stage_in"]["environment"])
except Exception:
env = ""
script = config["stage_in"]["script"]
cmd = f"sbatch --parsable --export=ALL,{env} {script}"
job_id_stage_in = run(cmd, shell=True).stdout.strip()
print(f"stage_in : JobId={job_id_stage_in}")
try:
env = dict_to_kv(config["main"]["environment"])
except Exception:
env = ""
script = config["main"]["script"]
cmd = f"sbatch --parsable --export=ALL,{env} --dependency=afterok:{job_id_stage_in} {script}"
job_id_main = run(cmd, shell=True).stdout.strip()
print(f"main : JobId={job_id_main}")
try:
env = dict_to_kv(config["stage_out"]["environment"])
except Exception:
env = ""
script = config["stage_out"]["script"]
cmd = f"sbatch --parsable --export=ALL,{env} --dependency=afterok:{job_id_main} {script}"
job_id_stage_out = run(cmd, shell=True).stdout.strip()
print(f"stage_out : JobId={job_id_stage_out}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Submits data migration workflow based on yaml."
)
parser.add_argument("config", type=Path, help="yaml configuration")
args = parser.parse_args()
config = yaml.safe_load(open(args.config))
main(config)