cli/run-job-pipeline-all.py:
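"""Helpers for smoke-testing the pipeline job samples under cli/.

Three commands: "update" appends a marker value to every script referenced by
each pipeline, "recover" strips those markers back out, and "generate" writes
a run-job-pipeline-all.sh script that submits every pipeline.
"""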
import glob
import os
import re
import sys
from typing import List, Optional

def get_all_files(path, valid_suffix):
    """
    Recursively collect files under ``path`` whose names match any entry in
    ``valid_suffix``. Each entry is appended to a recursive glob pattern, so
    both plain suffixes (".py") and glob patterns ("hello-pipeline*.yml") work.
    """
    files = []
    for suffix in valid_suffix:
        files.extend(glob.glob(path + "*/**/*" + suffix, recursive=True))
    return files
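
# Illustration (paths hypothetical): with the layout this script expects,
#   get_all_files("jobs/pipeline", ["pipeline.yml", "pipeline.yaml"])
# returns every pipeline.yml / pipeline.yaml nested anywhere under jobs/pipeline*.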

class Job:
    """A single pipeline job sample, identified by the path to its pipeline YAML."""

    def __init__(self, pipeline_path):
        self._pipeline_path = pipeline_path
@property
def pipeline_path(self):
return self._pipeline_path
@property
def pipeline_path_to_write(self):
return "./" + self.pipeline_path.replace("\\", "/")
@property
def name(self):
return os.path.basename(self.pipeline_path)
@property
def directory(self):
return os.path.dirname(self.pipeline_path)
    @property
    def scripts(self):
        # Prefer the Python/R scripts that sit next to the pipeline; fall back
        # to the component definitions when a pipeline has no scripts at all.
        scripts = get_all_files(self.directory, [".py", ".R"])
        if len(scripts) == 0:
            scripts = get_all_files(self.directory, ["component.yml"])
        assert len(scripts) > 0, "No scripts found in " + self.directory
        return scripts
    def update_script(self, random_value):
        # Tag every script with the run's marker value so its output (and
        # therefore the job snapshot) changes between runs.
        for script in self.scripts:
            with open(script, "r") as f:
                content = f.read()
            if script.endswith(".py") or script.endswith(".R"):
                content += f'\nprint("{random_value}")\n'
            else:
                # component.yml: inject the marker into the echo command line.
                content = content.replace("echo", f"echo {random_value} & echo")
            with open(script, "w") as f:
                f.write(content)
    def recover_script(self):
        # Undo update_script: strip the appended print() lines and the
        # injected echo markers.
        for script in self.scripts:
            with open(script, "r") as f:
                content = f.read()
            if script.endswith(".py") or script.endswith(".R"):
                content = re.sub(r'\nprint\("[0-9]+"\)\n', "", content)
            else:
                # Markers can be nested by repeated updates; substitute until
                # the content stops changing.
                while True:
                    next_content = re.sub("echo [0-9]+ & echo", "echo", content)
                    if next_content == content:
                        break
                    content = next_content
            with open(script, "w") as f:
                f.write(content)
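
    # Round-trip sketch for a component.yml (the command is hypothetical):
    #   "echo hello"  --update_script("42")-->  "echo 42 & echo hello"
    #   "echo 42 & echo hello"  --recover_script()-->  "echo hello"
    # For .py/.R scripts, update appends `print("42")` and recover strips it.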
    def get_run_shell(self, experiment_name=None) -> str:
        # Submit the pipeline through the run-job.sh wrapper without waiting
        # for completion (roughly equivalent to
        # `az ml job create --file <pipeline> --set experiment_name=<name>`).
        return "echo {0}\nbash run-job.sh {0}{1}".format(
            self.pipeline_path_to_write,
            f" {experiment_name} nowait" if experiment_name else "",
        )

    def get_run_and_wait_shell(self, experiment_name=None) -> str:
        # Same submission, but without "nowait" so run-job.sh blocks until
        # the job finishes.
        return "echo {0}\nbash run-job.sh {0}{1}".format(
            self.pipeline_path_to_write,
            f" {experiment_name}" if experiment_name else "",
        )
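
# A minimal usage sketch for Job (the path is hypothetical):
#
#   job = Job("jobs/basics/hello-pipeline.yml")
#   job.update_script("12345")           # tag the job's scripts
#   print(job.get_run_shell("my_exp"))   # non-blocking submit command
#   job.recover_script()                 # restore the scripts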

class JobSet:
    """A collection of pipeline jobs that are updated, recovered, or run together."""

    def __init__(self, jobs: List[Job], random_value: Optional[str] = None) -> None:
        self._random_value = random_value
        self.jobs = jobs

    @property
    def random_value(self):
        # When no explicit value is given, defer to the shell variable set at
        # the top of the generated script.
        if self._random_value is None:
            return "$target_version"
        return self._random_value

    def update_script(self):
        for job in self.jobs:
            job.update_script(self.random_value)

    def recover_script(self):
        for job in self.jobs:
            job.recover_script()

    @property
    def create_dependency_shell(self) -> str:
        # Shared assets the sample pipelines depend on (compute clusters,
        # input data, registered components, and an environment), all
        # versioned with the marker value.
return """az ml compute create -n cpu-cluster --type amlcompute --min-instances 0 --max-instances 8 -o none
az ml compute create -n gpu-cluster --type amlcompute --min-instances 0 --max-instances 4 --size Standard_NC12 -o none
az ml data create --file assets/data/local-folder.yml --set version={0} -o none
az ml component create --file jobs/pipelines-with-components/basics/1b_e2e_registered_components/train.yml --set version={0} -o none
az ml component create --file jobs/pipelines-with-components/basics/1b_e2e_registered_components/score.yml --set version={0} -o none
az ml component create --file jobs/pipelines-with-components/basics/1b_e2e_registered_components/eval.yml --set version={0} -o none
az ml data create --file jobs/pipelines-with-components/rai_pipeline_adult_analyse/data/data_adult_test.yaml --set version={0} -o none
az ml data create --file jobs/pipelines-with-components/rai_pipeline_adult_analyse/data/data_adult_train.yaml --set version={0} -o none
az ml environment create --file jobs/pipelines-with-components/rai_pipeline_adult_analyse/environment/responsibleai-environment.yaml --set version={0} -o none""".format(
self.random_value
)

    def generate_run_all_shell(self, target_path) -> None:
        experiment_name = f"cli_samples_v2_{self.random_value}"
        shells = [
            # Default target_version to $RANDOM unless a value is passed as $1.
            """
if [ -z "$1" ]
then
    target_version="$RANDOM"
else
    target_version=$1
fi""",
            self.create_dependency_shell,
        ]
        # Submit every job without waiting, then replace the last command with
        # the blocking variant so the generated script waits for the final job.
        shells.extend(map(lambda x: x.get_run_shell(experiment_name), self.jobs))
        shells[-1] = self.jobs[-1].get_run_and_wait_shell(experiment_name)
        shells.append("az --version")
        with open(target_path, "w", encoding="utf-8") as run_all_shell_file:
            run_all_shell_file.write("\n\n".join(shells))
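
# Sketch of the generated run-job-pipeline-all.sh (paths are illustrative):
#
#   if [ -z "$1" ] ... fi                 # pick target_version
#   az ml compute create -n cpu-cluster ...
#   echo ./jobs/basics/hello-pipeline.yml
#   bash run-job.sh ./jobs/basics/hello-pipeline.yml cli_samples_v2_$target_version nowait
#   ...                                   # one pair per pipeline; the last one waits
#   az --version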

def main():
    # Usage: run-job-pipeline-all.py <update|recover|generate> [marker_value]
    if len(sys.argv) < 2 or sys.argv[1] not in ("update", "recover", "generate"):
        sys.exit("usage: python run-job-pipeline-all.py <update|recover|generate> [marker_value]")
    random_value = sys.argv[2] if len(sys.argv) >= 3 else None

    # Collect the hello-pipeline samples plus every pipeline.yml/pipeline.yaml.
    jobs = [
        Job(path)
        for path in get_all_files(
            os.path.join(os.path.dirname(__file__), "jobs", "basics"),
            ["hello-pipeline*.yml"],
        )
    ]
    jobs.extend(
        Job(path)
        for path in get_all_files(
            os.path.join(os.path.dirname(__file__), "jobs", "pipeline"),
            ["pipeline.yml", "pipeline.yaml"],
        )
    )
    print(len(jobs), "pipelines found")

    job_set = JobSet(jobs, random_value)
    if sys.argv[1] == "update":
        job_set.update_script()
    elif sys.argv[1] == "recover":
        job_set.recover_script()
    elif sys.argv[1] == "generate":
        job_set.generate_run_all_shell("run-job-pipeline-all.sh")
if __name__ == "__main__":
main()
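
# Example invocations (the marker value is illustrative):
#   python run-job-pipeline-all.py update 12345    # tag all pipeline scripts
#   python run-job-pipeline-all.py recover         # strip the tags again
#   python run-job-pipeline-all.py generate        # write run-job-pipeline-all.sh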