community/front-end/ofe/website/ghpcfe/views/jobs.py (380 lines of code) (raw):

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" jobs.py """

from decimal import Decimal
from rest_framework import viewsets
from rest_framework.permissions import IsAuthenticated
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib import messages
from django.http import Http404, HttpResponseRedirect
from django.urls import reverse, reverse_lazy
from django.views import generic
from django.shortcuts import get_object_or_404
from ..permissions import SuperUserRequiredMixin
from ..models import Application, Job, Role, Cluster
from ..serializers import JobSerializer
from ..forms import JobForm
from ..cluster_manager import c2, cloud_info, utils
from .view_utils import GCSFile, StreamingFileView

import logging

logger = logging.getLogger(__name__)


def _price_job_and_check_quota(form, job):
    """Price an unsaved job server-side and enforce the owner's quota.

    The node price and job cost are never taken from the submitted form
    data (client input cannot be trusted for these); they are looked up via
    the pricing API and written onto ``job``.

    Args:
        form: The bound form; a non-field error is added to it on failure.
        job: The unsaved Job instance, with ``user``, ``cluster`` and
            ``partition`` already populated.

    Returns:
        True if the job may be saved. False if an error was added to
        ``form`` (pricing lookup failed, quota disabled, or quota
        insufficient).
    """
    cluster = job.cluster
    instance_type = job.partition.machine_type
    try:
        node_price_float = cloud_info.get_instance_pricing(
            "GCP",
            cluster.cloud_credential.detail,
            cluster.cloud_region,
            cluster.cloud_zone,
            instance_type,
        )
        job.node_price = Decimal(node_price_float)
        logger.debug(
            "Got api price %0.2f for %s in %s-%s",
            job.node_price,
            instance_type,
            cluster.cloud_region,
            cluster.cloud_zone,
        )
    # No sense in second guessing the possible error states, if the API call
    # fails just pass the error along regardless of how we failed
    except Exception as err:  # pylint: disable=broad-except
        form.add_error(
            None,
            f"Error: Pricing API unavailable - please retry later ({err})",
        )
        return False

    # The divisor of 60 is applied to wall_clock_time_limit as in the
    # original formula.
    job.job_cost = (
        job.node_price
        * job.number_of_nodes
        * job.wall_clock_time_limit
        / Decimal(60)
    )

    quota_type = job.user.quota_type
    if quota_type == "d":
        form.add_error(None, "Error: Cannot submit job. User quota disabled")
        return False

    if quota_type == "l":
        quota_remaining = job.user.quota_amount - job.user.total_spend()
        # Fudge to nearest cent to avoid "apparently equal" issues in user
        # display. Built from a string so the half-cent margin is exact.
        if job.job_cost > (quota_remaining - Decimal("0.005")):
            form.add_error(
                None,
                "Error: Insufficient quota remaining (have "
                f"${quota_remaining:0.2f}, job would require "
                f"${job.job_cost:0.2f})",
            )
            return False

    return True


class JobListView(LoginRequiredMixin, generic.ListView):
    """Custom ListView for Job model"""

    template_name = "job/list.html"

    def get_queryset(self):
        """Return every job for cluster admins, otherwise the user's own."""
        user = self.request.user
        is_cluster_admin = any(
            role.id == Role.CLUSTERADMIN for role in user.roles.all()
        )
        if is_cluster_admin:
            return Job.objects.all()  # admin gets to see everything
        return Job.objects.filter(user=user)  # user only sees its own jobs

    def get_context_data(self, *args, **kwargs):
        """Add the ``loading`` flag (0 or 1) and the nav tab to the context.

        ``loading`` is 1 when at least one visible job has a status in
        "p", "q", "d", "r" or "u".
        """
        # Ask the database rather than iterating every job in Python.
        has_pending = (
            self.get_queryset()
            .filter(status__in=["p", "q", "d", "r", "u"])
            .exists()
        )
        context = super().get_context_data(*args, **kwargs)
        context["loading"] = 1 if has_pending else 0
        context["navtab"] = "job"
        return context


class JobDetailView(LoginRequiredMixin, generic.DetailView):
    """Custom DetailView for Job model"""

    # NOTE(review): unlike JobListView, no per-user filtering is applied
    # here, so any logged-in user can open any job by pk - confirm intended.
    model = Job
    template_name = "job/detail.html"

    def get_context_data(self, **kwargs):
        """Add the nav tab marker to the default detail context."""
        context = super().get_context_data(**kwargs)
        context["navtab"] = "job"
        return context


class JobCreateView(LoginRequiredMixin, generic.ListView):
    """First job-creation step: choose a cluster for the application"""

    template_name = "job/select_cluster.html"
    model = Cluster

    def get_queryset(self):
        """Return the clusters which are valid for this application."""
        app = get_object_or_404(Application, pk=self.kwargs["app"])
        if app.install_loc:
            return app.install_loc.clusters_using.all()
        # QuerySet of just our cluster
        return Cluster.objects.filter(id=app.cluster.id)

    def get_context_data(self, **kwargs):
        """Add the selected application and the nav tab to the context."""
        context = super().get_context_data(**kwargs)
        context["application"] = get_object_or_404(
            Application, pk=self.kwargs["app"]
        )
        context["navtab"] = "job"
        return context

    def render_to_response(self, context, **response_kwargs):
        """Skip the selection page when exactly one cluster is available."""
        if len(self.object_list) == 1:
            return HttpResponseRedirect(
                reverse(
                    "job-create-2",
                    kwargs={
                        "app": self.kwargs["app"],
                        "cluster": self.object_list[0].id,
                    },
                )
            )
        return super().render_to_response(context, **response_kwargs)

    def post(self, request, app):
        """Redirect to the second creation step for the posted cluster."""
        # Only used to return a 404 for an unknown application.
        get_object_or_404(Application, pk=app)
        cluster_id = request.POST.get("cluster")
        return HttpResponseRedirect(
            reverse(
                "job-create-2",
                kwargs={"app": self.kwargs["app"], "cluster": cluster_id},
            )
        )


class JobCreateView2(LoginRequiredMixin, generic.CreateView):
    """Custom CreateView for Job model"""

    template_name = "job/create_form.html"
    form_class = JobForm

    def form_valid(self, form):
        """Price the job, enforce quota, save it and redirect on success."""
        self.object = form.save(commit=False)
        self.object.user = self.request.user
        if not _price_job_and_check_quota(form, self.object):
            return self.form_invalid(form)
        self.object.save()
        return HttpResponseRedirect(self.get_success_url())

    def get_initial(self):
        """Pre-populate the form from the URL's cluster and application."""
        return {
            "cluster": get_object_or_404(Cluster, pk=self.kwargs["cluster"]),
            # 404 rather than an unhandled DoesNotExist for an unknown app.
            "application": get_object_or_404(
                Application, pk=self.kwargs["app"]
            ),
            "wall_clock_time_limit": 120,
        }

    def get_context_data(self, **kwargs):
        """Add quota details, application, cluster and nav tab."""
        context = super().get_context_data(**kwargs)
        user = self.request.user
        context["user_quota_type"] = user.quota_type
        context["user_quota_remaining"] = (
            user.quota_amount - user.total_spend()
        )
        context["application"] = get_object_or_404(
            Application, pk=self.kwargs["app"]
        )
        context["cluster"] = get_object_or_404(
            Cluster, pk=self.kwargs["cluster"]
        )
        context["navtab"] = "job"
        return context

    def get_success_url(self):
        """Return the URL of the backend view that submits the new job."""
        return reverse("backend-job-run", kwargs={"pk": self.object.pk})


class JobRerunView(LoginRequiredMixin, generic.CreateView):
    """Custom CreateView for rerunning job based on existing job"""

    template_name = "job/rerun_form.html"
    form_class = JobForm

    # Job fields copied from the existing job into the new form.
    _COPIED_FIELDS = (
        "application",
        "cluster",
        "partition",
        "number_of_nodes",
        "ranks_per_node",
        "threads_per_rank",
        "wall_clock_time_limit",
        "input_data",
        "result_data",
        "run_script",
        "benchmark",
    )

    def form_valid(self, form):
        """Price the job, enforce quota, save it and redirect on success."""
        self.object = form.save(commit=False)
        self.object.user = self.request.user
        if not _price_job_and_check_quota(form, self.object):
            return self.form_invalid(form)
        self.object.save()
        return HttpResponseRedirect(self.get_success_url())

    def get_initial(self):
        """Seed the form with the settings of the job being rerun."""
        initial = super().get_initial()
        # 404 rather than an unhandled DoesNotExist for an unknown job.
        existing_job = get_object_or_404(Job, pk=self.kwargs["job"])
        for field in self._COPIED_FIELDS:
            initial[field] = getattr(existing_job, field)
        return initial

    def get_context_data(self, **kwargs):
        """Add quota details, application, cluster and run script info."""
        context = super().get_context_data(**kwargs)
        job = get_object_or_404(Job, pk=self.kwargs["job"])
        application = job.application
        run_script = job.run_script
        # A script starting with a shebang is treated as inline content;
        # anything else is treated as a URL.
        run_script_type = "raw" if run_script.startswith("#!") else "url"

        context["user_quota_type"] = self.request.user.quota_type
        context["user_quota_remaining"] = self.request.user.quota_remaining()
        context["application"] = application
        # NOTE(review): uses the application's cluster, while get_initial
        # uses the job's own cluster - confirm these always agree.
        context["cluster"] = application.cluster
        context["navtab"] = "job"
        context["run_script_type"] = run_script_type
        context["run_script"] = run_script
        return context

    def get_success_url(self):
        """Return the URL of the backend view that submits the new job."""
        return reverse("backend-job-run", kwargs={"pk": self.object.pk})


class JobUpdateView(LoginRequiredMixin, generic.UpdateView):
    """Custom UpdateView for Job model"""

    # NOTE(review): no `fields` or `form_class` is declared on this view -
    # confirm it is configured elsewhere or unused.
    model = Job

    def get_context_data(self, **kwargs):
        """Add the nav tab marker to the default update context."""
        context = super().get_context_data(**kwargs)
        context["navtab"] = "job"
        return context


class JobDeleteView(SuperUserRequiredMixin, generic.DeleteView):
    """Custom DeleteView for Job model"""

    # Note on SuperUserRequiredMixin use here:
    # Current cost management model means spend is tied to job records users
    # deleting their own jobs would therefore allow them to delete their spend

    model = Job
    success_url = reverse_lazy("jobs")
    template_name = "job/confirm_delete.html"

    def get_context_data(self, **kwargs):
        """Add the nav tab marker to the default delete context."""
        context = super().get_context_data(**kwargs)
        context["navtab"] = "job"
        return context


class JobLogFileView(LoginRequiredMixin, StreamingFileView):
    """View job various job scripts and logs"""

    # Read once, when the class body is executed at import time.
    bucket = utils.load_config()["server"]["gcs_bucket"]
    # The position of an entry in this list is the `logid` used in URLs.
    valid_logs = [
        {"title": "Job Output", "type": GCSFile, "args": (bucket, "stdout")},
        {"title": "Job Error Log", "type": GCSFile, "args": (bucket, "stderr")},
        {
            "title": "Job Submit Script",
            "type": GCSFile,
            "args": (bucket, "submit.sh"),
        },
    ]

    def _create_file_info_object(self, logfile_info, *args, **kwargs):
        """Instantiate the entry's type with its own args plus any extras."""
        return logfile_info["type"](*logfile_info["args"], *args, **kwargs)

    def get_file_info(self):
        """Build the file object for the requested job log.

        Raises:
            Http404: If the job does not exist, or ``logid`` does not index
                an entry of ``valid_logs``.
        """
        logid = self.kwargs.get("logid", -1)
        job = get_object_or_404(Job, pk=self.kwargs.get("pk"))
        try:
            entry = self.valid_logs[logid]
        except (IndexError, TypeError) as err:
            # Unknown log id: answer 404 instead of failing with a 500.
            raise Http404(f"Unknown log file id: {logid}") from err

        cluster_id = job.application.cluster.id
        bucket_prefix = f"clusters/{cluster_id}/jobs/{job.id}"
        return self._create_file_info_object(entry, bucket_prefix)


class JobLogView(LoginRequiredMixin, generic.DetailView):
    """View to display job log files"""

    model = Job
    template_name = "job/log.html"

    def get_context_data(self, **kwargs):
        """Add the list of available log files (id and title) to context."""
        context = super().get_context_data(**kwargs)
        context["log_files"] = [
            {"id": n, "title": entry["title"]}
            for n, entry in enumerate(JobLogFileView.valid_logs)
        ]
        context["navtab"] = "job"
        return context


# For APIs


class JobViewSet(viewsets.ModelViewSet):
    """Custom ModelViewSet for Job model"""

    # NOTE(review): the queryset is every job, for any authenticated user -
    # confirm this is intended given JobListView's per-user filtering.
    permission_classes = (IsAuthenticated,)
    queryset = Job.objects.all().order_by("name")
    serializer_class = JobSerializer


# Other supporting views


class BackendJobRun(LoginRequiredMixin, generic.View):
    """Backend handler to push job info to c2daemon on the cluster"""

    def get(self, request, pk):
        """Send a RUN_JOB command for job ``pk`` and redirect to its page.

        The job is saved with status "p" before sending. If the owner has
        no social account and is not a superuser, the job is saved with
        status "n", an error message is shown and nothing is sent.

        NOTE(review): no ownership check is visible here, so any logged-in
        user can trigger this for any job pk - confirm intended.
        """
        job = get_object_or_404(Job, pk=pk)
        job.status = "p"
        job.save()
        cluster_id = job.cluster.id

        try:
            user_uid = job.user.socialaccount_set.first().uid
        except AttributeError:
            if job.user.is_superuser:
                user_uid = "0"
            else:
                # User doesn't have a Google SocialAccount.
                messages.error(
                    request,
                    "You are not signed in with a Google Account. This is "
                    "required for job submission.",
                )
                job.status = "n"
                # Persist the status, otherwise the job stays at "p".
                job.save()
                return HttpResponseRedirect(
                    reverse("job-detail", kwargs={"pk": pk})
                )

        def response(message):
            """Apply a status message received for this job."""
            if message.get("cluster_id") != cluster_id:
                logger.error(
                    "Cluster ID mismatch versus callback: expected %s, "
                    "received %s",
                    cluster_id,
                    message.get("cluster_id"),
                )
            if message.get("job_id") != pk:
                logger.error(
                    "Job ID mismatch versus callback: expected %s, "
                    "received %s",
                    pk,
                    message.get("job_id"),
                )

            # Reload so changes made since the command was sent are kept.
            current = Job.objects.get(pk=pk)
            current.status = message["status"]
            logger.info(
                "Processing job message, id %d, status %s",
                pk,
                current.status,
            )

            if "slurm_job_id" in message and not current.slurm_jobid:
                current.slurm_jobid = message["slurm_job_id"]

            if current.status in ["c", "e"]:
                current.runtime = message.get("job_runtime", None)
                current.result_unit = message.get("result_unit", "")
                current.result_value = message.get("result_value", None)
                # Without a reported runtime the existing job_cost is kept;
                # multiplying by None would abort before the save below.
                if current.runtime is not None:
                    current.job_cost = (
                        current.number_of_nodes
                        * current.runtime
                        / Decimal(3600)
                        * current.node_price
                    )
            current.save()

        # N.B not base64 encoding the job script because the pubsub library
        # uses protobuf anyway
        message_data = {
            "job_id": job.id,
            "login_uid": user_uid,
            "run_script": job.run_script,
            "num_nodes": job.number_of_nodes,
            "partition": job.partition.name,
        }
        # Optional settings are only included when they are set (truthy).
        if job.application.load_command:
            message_data["load_command"] = job.application.load_command
        if job.ranks_per_node:
            message_data["ranksPerNode"] = job.ranks_per_node
        if job.threads_per_rank:
            message_data["threadsPerRank"] = job.threads_per_rank
        if job.wall_clock_time_limit:
            message_data["wall_limit"] = job.wall_clock_time_limit
        if job.input_data:
            message_data["input_data"] = job.input_data
        if job.result_data:
            message_data["result_data"] = job.result_data
        if job.partition.GPU_per_node:
            message_data["gpus_per_node"] = job.partition.GPU_per_node

        c2.send_command(
            cluster_id, "RUN_JOB", on_response=response, data=message_data
        )
        messages.success(request, "Job sent to Cluster")
        return HttpResponseRedirect(reverse("job-detail", kwargs={"pk": pk}))