community/front-end/ofe/website/ghpcfe/views/images.py (256 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" clusters.py """
import os
from asgiref.sync import sync_to_async
from django.shortcuts import get_object_or_404
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.auth.views import redirect_to_login
from django.contrib.auth.mixins import UserPassesTestMixin
from django.core.exceptions import PermissionDenied
from django.urls import reverse_lazy
from django.conf import settings
from django.http import (
HttpResponseRedirect,
JsonResponse,
)
from django.urls import reverse
from django.views import generic
from django.views.generic.edit import CreateView
from ..models import StartupScript, Image, Credential
from ..forms import StartupScriptForm, ImageForm, ImageImportForm
from ..cluster_manager.image import ImageBackend
from ..cluster_manager.cloud_info import get_region_zone_info
from ..cluster_manager.image_import import *
from ..views.asyncview import BackendAsyncView
from pathlib import Path
import json
from django.contrib import messages
import logging
logger = logging.getLogger(__name__)
class ImagesListView(LoginRequiredMixin, generic.ListView):
"""Custom ListView for StartupScript and Images model"""
model = StartupScript
template_name = "image/list.html"
def get_queryset(self):
# If user is admin, return all objects.
if self.request.user.has_admin_role():
startup_scripts = StartupScript.objects.all()
images = Image.objects.all()
return startup_scripts, images
else:
# Retrieve startup scripts and images owned by the user
startup_scripts = StartupScript.objects.filter(owner=self.request.user)
images = Image.objects.filter(owner=self.request.user)
# Retrieve startup scripts and images authorized for the user
authorized_startup_scripts = StartupScript.objects.filter(authorised_users=self.request.user)
authorized_images = Image.objects.filter(authorised_users=self.request.user)
# Combine the owned and authorized objects
startup_scripts |= authorized_startup_scripts
images |= authorized_images
return startup_scripts, images
def get_context_data(self, *args, **kwargs):
loading = 0
admin_view = 0
if self.request.user.has_admin_role():
admin_view = 1
context = super().get_context_data(*args, **kwargs)
context["loading"] = loading
context["admin_view"] = admin_view
context["navtab"] = "image"
startup_scripts, images = self.get_queryset()
context["startupscripts"] = startup_scripts
context["images"] = images
return context
class StartupScriptDetailView(LoginRequiredMixin, generic.DetailView):
"""Custom DetailView for StartupScript model"""
model = StartupScript
template_name = "image/startup-script-view.html"
def is_admin_or_authorized_user(self, startup_script):
user = self.request.user
return (
user.has_admin_role()
or user == startup_script.owner
or user in startup_script.authorised_users.all()
)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
startup_script = self.get_object()
# Check if the user is an admin, the owner, or authorized for the startup script
if self.is_admin_or_authorized_user(startup_script):
file_path = Path(settings.MEDIA_ROOT) / startup_script.content.name
try:
with open(file_path, 'r') as file:
try:
context["file_contents"] = file.read()
except UnicodeDecodeError:
context["file_contents"] = "Error: Unable to decode file"
except IOError:
context["file_contents"] = "Error: Unable to read file"
else:
raise PermissionDenied()
context["navtab"] = "image"
return context
class StartupScriptCreateView(LoginRequiredMixin, generic.CreateView):
"""Custom CreateView for StartupScript model"""
success_url = reverse_lazy("images")
form_class = StartupScriptForm
template_name = "image/startup-script-create.html"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["navtab"] = "image"
return context
# Set currently logged-in user as owner.
def form_valid(self, form):
form.instance.owner = self.request.user
return super().form_valid(form)
class StartupScriptDeleteView(UserPassesTestMixin, generic.View):
"""Custom view for deleting StartupScript objects"""
def test_func(self):
return self.request.user.is_superuser
def post(self, request, *args, **kwargs):
startup_script = StartupScript.objects.get(pk=self.kwargs['pk'])
file_path = Path(settings.MEDIA_ROOT) / startup_script.content.name
try:
os.remove(file_path)
logger.info("File deleted successfully.")
except FileNotFoundError:
logger.error("Error: File not found.")
except PermissionError:
logger.error("Error: Permission denied.")
except Exception as e:
logger.exception(f"Error: {str(e)}")
startup_script.delete()
response = {'success': True}
return JsonResponse(response)
class ImageCreateView(LoginRequiredMixin, CreateView):
"""Custom CreateView for Image model"""
form_class = ImageForm
template_name = "image/image-create.html"
def get_success_url(self):
image = self.object
success_url = reverse("backend-create-image", kwargs={"pk": image.pk})
return success_url
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs["user"] = self.request.user
return kwargs
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["navtab"] = "image"
return context
# Set currently logged-in user as owner.
def form_valid(self, form):
form.instance.owner = self.request.user
return super().form_valid(form)
class ImageDetailView(LoginRequiredMixin, generic.DetailView):
"""Custom DetailView for Image model"""
model = Image
template_name = "image/image-view.html"
def is_admin_or_authorized_user(self, image):
user = self.request.user
return (
user.has_admin_role()
or user == image.owner
or user in image.authorised_users.all()
)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
image = self.get_object()
startup_scripts = image.startup_script.all()
context["startup_scripts"] = startup_scripts
# Check if the user is an admin, the owner, or authorized for the image
if self.is_admin_or_authorized_user(image):
context["navtab"] = "image"
return context
else:
raise PermissionDenied()
class ImageDeleteView(UserPassesTestMixin, generic.View):
"""Custom view for deleting Image objects"""
def test_func(self):
return self.request.user.is_superuser
def post(self, request, *args, **kwargs):
image = Image.objects.get(pk=self.kwargs['pk'])
if image.source_image_project == "Imported":
image.delete()
response = {'success': True, 'import': True}
else:
img_backend = ImageBackend(image)
img_backend.delete_image()
image.delete()
response = {'success': True}
return JsonResponse(response)
class ImageStatusView(LoginRequiredMixin, generic.View):
"""Custom view for Image model that returns Image status"""
def is_admin_or_authorized_user(self, image):
user = self.request.user
return (
user.has_admin_role()
or user == image.owner
or user in image.authorised_users.all()
)
def get(self, request, pk, *args, **kwargs):
image = get_object_or_404(Image, pk=pk)
# Check if the user is an admin, the owner, or authorized for the image
if self.is_admin_or_authorized_user(image):
response = {'status': image.status}
return JsonResponse(response)
else:
raise PermissionDenied()
class BackendCreateImage(BackendAsyncView):
"""A view to make async call to create a new image"""
@sync_to_async
def get_orm(self, image_id):
image = Image.objects.get(pk=image_id)
creds = image.cloud_credential
return (image, creds)
def cmd(self, unused_task_id, unused_token, image, creds):
img_backend = ImageBackend(image)
img_backend.prepare()
async def get(self, request, pk):
"""this will invoke the background tasks and return immediately"""
# Mixins don't yet work with Async views
if not await sync_to_async(lambda: request.user.is_authenticated)():
return redirect_to_login(request.get_full_path)
await self.test_user_is_cluster_admin(request.user)
args = await self.get_orm(pk)
await self.create_task("Create Image", *args)
return HttpResponseRedirect(
reverse("images")
)
class BackendListRegions(LoginRequiredMixin, generic.View):
"""Custom view that returns json of available GCP regions."""
def get(self, request, pk, *args, **kwargs):
credentials = get_object_or_404(Credential, pk=pk)
regions = get_region_zone_info("GCP", credentials.detail)
return JsonResponse(regions)
class ImageImportView(LoginRequiredMixin, CreateView):
"""Custom CreateView for Image model"""
form_class = ImageImportForm
template_name = "image/image-import.html"
def get_success_url(self):
success_url = reverse("images")
return success_url
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs["user"] = self.request.user
return kwargs
def get_image_list(self):
prexisting_images_list = []
for cred in Credential.objects.all():
prexisting_images = list_project_images(cred)
prexisting_images_list += prexisting_images
return prexisting_images_list
def get_context_data(self, **kwargs):
prexisting_images_list = self.get_image_list()
prexisting_images_json = json.dumps(prexisting_images_list)
context = super().get_context_data(**kwargs)
context["navtab"] = "image"
context["image_choice"] = prexisting_images_list
context["image_jsonstr"] = prexisting_images_json
return context
# Set currently logged-in user as owner.
def form_valid(self, form):
form.instance.owner = self.request.user
form.instance.source_image_project = "Imported" #form.cleaned_data['name']
form.instance.source_image_family = "Imported" #form.cleaned_data['family']
selected_cred_id = int(form.data["cloud_credential"])
selected_cred_obj = get_object_or_404(Credential, pk=selected_cred_id)
usr_input_select = form.data["inputOption"]
if usr_input_select == "text":
form_name = form.data["textInputName"]
form_family = form.data["textInputFamily"]
elif usr_input_select == "dropdown":
image_list = form.data["dropdown"].split(",")
form_name = image_list[1]
form_family = image_list[3]
else:
messages.error(self.request, 'Please choose how you want to select an image')
return self.form_invalid(form)
if form_name != "" and form_family != "":
form.instance.name = form_name
form.instance.family = form_family
else:
messages.error(self.request, 'Please enter an image and family name')
return self.form_invalid(form)
image_isvalid = verify_image(selected_cred_obj,form_name,form_family)
if image_isvalid:
form.instance.status = "r"
else:
messages.error(self.request, 'Image name/family does not match any existing images, using given credential')
return self.form_invalid(form)
return super().form_valid(form)