# k8s/cmds.py
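"""Command pipelines for benchmarking Spark and DataFusion-on-Ray on k3s.

Each entry in ``cmds`` is a named list of steps (shell commands, background
shell commands, and Jinja2 templates to render); ``Runner`` executes such a
list, optionally as a dry run.
"""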

import os
import signal
import subprocess
import textwrap
from collections import namedtuple

import click
import jinja2

# Step types used to build the command pipelines below.
Shell = namedtuple("Shell", ["cmd", "desc"])
BackgroundShell = namedtuple("BackgroundShell", ["cmd", "desc"])
Template = namedtuple("Template", ["path", "desc"])
ChangeDir = namedtuple("ChangeDir", ["path", "desc"])
Venv = namedtuple("Venv", ["cmd", "path", "desc"])

MY_DIR = os.path.dirname(os.path.abspath(__file__))

# TODO: assert that commands we require, like python3, jq, and curl, are all present
cmds = {
    "echo": [
        Shell("echo hello 1", "echoing first"),
        Shell("echo hello 2", "echoing second"),
        Shell("bad_command_garbage", "Something that will fail"),
        Shell("echo hello 3", "echoing third which we won't see"),
    ],
    "k3s_setup": [
        Shell(
            """sudo curl -sfL https://get.k3s.io | {{ k3s_url if k3s_url else "" }} {{ k3s_token if k3s_token else "" }} sh -s -""",
            "Installing K3s",
        ),
        Shell(
            "sudo chmod a+r /etc/rancher/k3s/k3s.yaml",
            "Allow read access to /etc/rancher/k3s/k3s.yaml",
        ),
        Shell(
            "kubectl get all",
            "Check for access to cluster",
        ),
        Shell(
            "curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash",
            "Installing Helm",
        ),
        Shell(
            "helm --kubeconfig /etc/rancher/k3s/k3s.yaml repo add kuberay https://ray-project.github.io/kuberay-helm/",
            "Adding kuberay helm repo",
        ),
        Shell(
            "helm --kubeconfig /etc/rancher/k3s/k3s.yaml repo add spark-operator https://kubeflow.github.io/spark-operator",
            "Adding spark-operator helm repo",
        ),
        Shell("helm repo update", "Updating helm repos"),
        Shell(
            "helm --kubeconfig /etc/rancher/k3s/k3s.yaml install kuberay-operator kuberay/kuberay-operator --version 1.3.0 --wait",
            "Installing kuberay-operator",
        ),
        Shell(
            """helm --kubeconfig /etc/rancher/k3s/k3s.yaml install --set-json='controller.env=[{"name":"SPARK_SUBMIT_OPTS","value":"-Divy.cache.dir=/tmp/ivy2/cache -Divy.home=/tmp/ivy2"}]' spark-operator spark-operator/spark-operator""",
            "Installing spark-operator",
        ),
        Template("pvcs.yaml.template", "rewrite pvcs.yaml.template"),
        Shell("kubectl apply -f pvcs.yaml", "Apply pvcs"),
    ],
    "generate": [
        Shell(
            "mkdir -p /data/sf{{scale_factor}}",
            "make directory /data/sf{{scale_factor}}",
        ),
        Shell(
            "python {{ MY_DIR }}/../tpch/make_data.py {{scale_factor}} {{partitions}} {{data_path}}/sf{{scale_factor}} {{pool_size}}",
            "generate data",
        ),
    ],
    "bench_spark": [
        Template("spark_job.yaml.template", "rewrite spark_job.yaml.template"),
        Shell(
            "cp {{ MY_DIR }}/spark_tpcbench.py {{ output_path }}",
            "copy spark_tpcbench.py to output_path dir",
        ),
        Shell(
            "cp -a {{ MY_DIR }}/../tpch/queries {{ output_path }}",
            "copy tpch queries to output_path dir",
        ),
        Shell(
            """
            {% if data_path.startswith("s3") %}
            wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar && \
            wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar && \
            aws s3 cp aws-java-sdk-bundle-1.12.262.jar {{ data_path.replace('s3a','s3') }}/aws-java-sdk-bundle-1.12.262.jar && \
            aws s3 cp hadoop-aws-3.3.4.jar {{ data_path.replace('s3a','s3') }}/hadoop-aws-3.3.4.jar
            {% else %}
            wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar && \
            wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar && \
            mv aws-java-sdk-bundle-1.12.262.jar {{ data_path }}/aws-java-sdk-bundle-1.12.262.jar && \
            mv hadoop-aws-3.3.4.jar {{ data_path }}/hadoop-aws-3.3.4.jar
            {% endif %}
            """,
            "getting additional spark jars",
        ),
        Shell(
            "kubectl apply -f spark_job.yaml",
            "Submit spark job",
        ),
        Shell(
            """
            while true; do
                sleep 10
                STATE=$(kubectl get sparkapp/spark-tpch-bench -o json | jq -r '.status.applicationState.state')
                echo "Checking on job status...got $STATE looking for COMPLETED"
                if [[ $STATE != "RUNNING" ]]; then
                    break
                fi
            done
            """,
            "checking on job status",
        ),
        Shell(
            "kubectl delete -f spark_job.yaml",
            "tear down job",
        ),
    ],
    "bench_df_ray": [
        Template("ray_cluster.yaml.template", "rewrite ray_cluster.yaml.template"),
        Shell(
            "kubectl apply -f ray_cluster.yaml",
            "deploying ray cluster",
        ),
        Shell(
            "kubectl wait raycluster/datafusion-ray-cluster --for='jsonpath={.status.state}'=ready --timeout=300s",
            "wait for ray cluster to be ready",
        ),
        Template("requirements.txt.template", "rewrite requirements.txt.template"),
        Template("ray_job.sh.template", "rewrite ray_job.sh.template"),
        BackgroundShell(
            "kubectl port-forward svc/datafusion-ray-cluster-head-svc 8265:8265",
            "port forwarding from cluster",
        ),
        Shell(
            "cp {{ MY_DIR }}/../tpch/tpcbench.py .",
            "copy tpcbench.py to .",
        ),
        Shell(
            "cp -a {{ MY_DIR }}/../tpch/queries .",
            "copy tpch queries to .",
        ),
        Shell(
            ". ./ray_job.sh",
            "running ray job",
        ),
        Shell(
            "kubectl delete -f ray_cluster.yaml",
            "tear down ray cluster",
        ),
    ],
}


class Runner:
    """Executes a list of command steps, with optional dry-run mode and Jinja2 substitutions."""

    def __init__(self, dry_run: bool = False, verbose: bool = False):
        self.dry_run = dry_run
        self.verbose = verbose
        self.cwd = os.getcwd()
        self.venv: str | None = None
        self.backgrounded = []

    def set_cwd(self, path: str):
        if os.path.isabs(path):
            self.cwd = path
        else:
            self.cwd = os.path.join(self.cwd, path)

    def activate_venv(self, path: str):
        self.venv = path

    def run_commands(
        self,
        commands: list[Shell | BackgroundShell | Template | ChangeDir | Venv],
        substitutions: dict[str, str] | None = None,
    ):
        if not substitutions:
            substitutions = {}
        substitutions["MY_DIR"] = MY_DIR

        for command in commands:
            match (self.dry_run, command):
                case (False, Shell(cmd, desc)):
                    self.run_shell_command(textwrap.dedent(cmd), desc, substitutions)
                case (True, Shell(cmd, desc)):
                    click.secho(f"[dry run] {desc} ...")
                    click.secho(f"    {cmd}", fg="yellow")
                case (False, BackgroundShell(cmd, desc)):
                    self.run_shell_command(
                        textwrap.dedent(cmd), desc, substitutions, background=True
                    )
                case (True, BackgroundShell(cmd, desc)):
                    click.secho(f"[dry run] {desc} ...")
                    click.secho(f"[backgrounding] {cmd}", fg="yellow")
                case (False, Template(path, desc)):
                    click.secho(f"{desc} ...")
                    self.process_template(path, ".", substitutions)
                case (True, Template(path, desc)):
                    click.secho(f"[dry run] {desc} ...")
                    click.secho(f"    {path} subs:{substitutions}", fg="yellow")
                case (False, ChangeDir(path, desc)):
                    click.secho(f"{desc} ...")
                    self.set_cwd(path)
                case (True, ChangeDir(path, desc)):
                    click.secho(f"[dry run] {desc} ...")
                case (False, Venv(cmd, path, desc)):
                    self.run_shell_command(cmd, desc)
                    self.venv = os.path.abspath(path)
                case (True, Venv(cmd, path, desc)):
                    click.secho(f"[dry run] {desc} ...")
                case _:
                    raise Exception("Unhandled case in match. Shouldn't happen")
Shouldn't happen") def run_shell_command( self, command: str, desc: str, substitutions: dict[str, str] | None = None, background: bool = False, ): click.secho(f"{desc} ...") if self.venv: venv_path = os.path.join(self.cwd, self.venv, "bin/activate") command = f"source {venv_path} && {command}" if substitutions: command = jinja2.Template(command).render(substitutions) if self.verbose: back = " background" if background else "" click.secho(f"[Running command{back}] {command}", fg="yellow") process = subprocess.Popen( command, shell=True, cwd=self.cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, executable="/bin/bash", ) if background: self.backgrounded.append(process) return stdout, stderr = process.communicate() stdout = stdout.decode() stderr = stderr.decode() if process.returncode == 0: click.secho(f" {stdout}", fg="green") else: click.secho(f" stdout = {stdout}", fg="red") click.secho(f" stderr = {stderr}", fg="red") click.secho(f"Error running command {command}") exit(1) def process_template( self, template_name: str, output_path: str, substitutions: dict[str, str] | None ): template_out = template_name[: template_name.index(".template")] output_path = os.path.join(output_path, template_out) template_path = os.path.join(MY_DIR, template_name) template = jinja2.Template(open(template_path).read()) with open(output_path, "w") as f: f.write(template.render(substitutions)) def __del__(self): for process in self.backgrounded: try: os.killpg(os.getpgid(process.pid), signal.SIGTERM) except Exception as e: print(f"Failed to kill process {process.pid}: {e}")