moderator/moderate/views.py (255 lines of code) (raw):

from dal import autocomplete from dateutil.relativedelta import relativedelta from django.conf import settings from django.contrib import auth, messages from django.contrib.auth.decorators import login_required from django.contrib.auth.models import User from django.core.exceptions import ValidationError from django.core.mail import send_mail from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator from django.db.models import Count, Q from django.http import Http404, JsonResponse from django.http.response import HttpResponseRedirect from django.shortcuts import get_object_or_404, redirect, render from django.urls import reverse from django.utils.timezone import now as django_now from mozilla_django_oidc.views import OIDCAuthenticationCallbackView from moderator.moderate.forms import EventForm, QuestionForm from moderator.moderate.models import Event, Question, Vote from moderator.moderate.templatetags.helpers import can_moderate_event SUBJECT = "Question moderation update" class OIDCCallbackView(OIDCAuthenticationCallbackView): """Override OIDC callback view.""" def login_failure(self, msg=""): """Returns a custom message in case of login failure.""" if not msg: msg = "Login failed. Access is allowed only to staff and NDA members." messages.error(self.request, msg) return super(OIDCCallbackView, self).login_failure() def main(request): """Render main page.""" user = request.user if user.is_authenticated: events = ( Event.objects.filter(archived=False) .prefetch_related("moderators") .annotate( approved_count=Count( "questions", filter=Q(questions__is_accepted=True) ), rejected_count=Count( "questions", filter=Q(questions__is_accepted=False) ), pending_count=Count("questions", filter=Q(questions__is_accepted=None)), ) ) if not user.userprofile.is_nda_member: events = events.exclude(is_nda=True) return render(request, "index.jinja", {"events": events, "user": user}) return render(request, "index.jinja", {"user": user}) @login_required(login_url="/") def archive(request): """List of all archived events.""" q_args = { "archived": True, } # Filter out NDA events for non-NDA users if not request.user.userprofile.is_nda_member and not request.user.is_superuser: q_args["is_nda"] = False events_list = ( Event.objects.filter(**q_args) .annotate( approved_count=Count("questions", filter=Q(questions__is_accepted=True)) ) .order_by("-created_at") ) paginator = Paginator(events_list, settings.ITEMS_PER_PAGE) page = request.GET.get("page") try: events = paginator.page(page) except PageNotAnInteger: events = paginator.page(1) except EmptyPage: events = paginator.page(paginator.num_pages) return render(request, "archive.jinja", {"events": events}) @login_required(login_url="/") def edit_event(request, slug=None): """Creates a new event.""" user = request.user query_args = [] user_can_edit = False if not user.is_superuser: query_args = [Q(moderators=user)] event = get_object_or_404(Event, *query_args, slug=slug) if slug else None # if there is an event or no slug, the user can edit if event or not slug: user_can_edit = True event_form = EventForm(request.POST or None, instance=event, user=user) if event_form.is_valid(): e = event_form.save(commit=False) if not event: e.created_by = user e.save() event_form.save_m2m() if slug: msg = "Event successfully edited." else: msg = "Event successfully created." messages.success(request, msg) if e.archived: return redirect(reverse("archive")) return redirect(reverse("main")) ctx = { "slug": event.slug if event else None, "event_form": event_form, "event": event, "profile": user.userprofile, "user_can_edit": user_can_edit, } return render(request, "create_event.jinja", ctx) @login_required(login_url="/") def moderate_event(request, slug, q_id=None, accepted=None): event = get_object_or_404(Event, slug=slug) user = request.user if not (event.moderators.filter(pk=user.pk).exists() or user.is_superuser): raise Http404 questions = Question.objects.filter(event=event, is_accepted__isnull=True) question_form = QuestionForm() # Update the question if it's accepted or rejected if q_id: try: question = Question.objects.get(id=q_id) except Question.DoesNotExist: raise ValidationError("This question is not valid") question.is_accepted = accepted question.save() if request.method == "POST": question_form = QuestionForm(request.POST, instance=question) # update the question with moderator's reply if question_form.is_valid(): reason = question_form.cleaned_data.get("rejection_reason") Question.objects.filter(id=question.pk).update(rejection_reason=reason) if ( reason and not question.is_accepted and question.submitter_contact_info ): send_mail( SUBJECT, reason, settings.FROM_NOREPLY, [question.submitter_contact_info], ) messages.success(request, "Email sent successfully") return redirect(reverse("moderate_event", kwargs={"slug": event.slug})) return render( request, "moderation.jinja", {"user": user, "event": event, "questions": questions, "q_form": question_form}, ) @login_required(login_url="/") def delete_event(request, slug): """Delete an event.""" user = request.user query_args = {"slug": slug, "moderators__in": [user]} # Allow superusers to edit all events if user.is_superuser: del query_args["moderators__in"] event = get_object_or_404(Event, **query_args) event.delete() msg = "Event successfully deleted." messages.success(request, msg) return redirect(reverse("main")) @login_required(login_url="/") def show_event(request, e_slug, q_id=None): """Render event questions.""" event = get_object_or_404(Event, slug=e_slug) question = None user = request.user # Do not display NDA events to non NDA members or non employees. if event.is_nda and not user.userprofile.is_nda_member: raise Http404 if q_id: question = Question.objects.get(id=q_id) questions_q = Question.objects.filter(event=event, is_accepted=True).annotate( vote_count=Count("votes") ) if user.userprofile.is_admin or event.archived: questions = questions_q.order_by("-vote_count") else: questions = questions_q.order_by("?") question_form = QuestionForm( request.POST or None, instance=question, **{"is_locked": True} ) is_new_question = False if question_form.is_valid() and not event.archived: question_obj = question_form.save(commit=False) # Do not change the user if posting a reply moderator_ids = event.moderators.values_list("id", flat=True) accept_question = not event.is_moderated or user.id in moderator_ids if not question_obj.id: is_new_question = True # mark as accepted for non moderated events if accept_question: question_obj.is_accepted = True if not question_obj.is_anonymous: question_obj.asked_by = user question_obj.submitter_contact_info = user.email elif not can_moderate_event(event, user): raise Http404 question_obj.event = event question_obj.save() msg = "Your question has been successfully submitted. " if not accept_question: msg += "Review is pending by an event moderator." if is_new_question: messages.success(request, msg) return redirect(reverse("event", kwargs={"e_slug": event.slug})) return render( request, "questions.jinja", { "user": user, "open": not event.archived, "event": event, "questions": questions, "q_form": question_form, }, ) @login_required def upvote(request, q_id): """Upvote question""" question = Question.objects.get(pk=q_id) event = question.event user = request.user if not ( user_can_vote := ( event.users_can_vote or (event.is_nda and user.userprofile.is_nda_member) ) ): msg = "Voting is not allowed for this event." messages.warning(request, msg) if ( request.headers.get("x-requested-with") == "XMLHttpRequest" and not question.event.archived and user_can_vote ): if not Vote.objects.filter(user=user, question=question).exists(): Vote.objects.create(user=user, question=question) else: Vote.objects.filter(user=user, question=question).delete() response_dict = {} if request.user.is_superuser or event.created_by == request.user: response_dict = {"current_vote_count": question.votes.count()} return JsonResponse(response_dict) return redirect(reverse("event", kwargs={"e_slug": event.slug})) class ModeratorsAutocomplete(autocomplete.Select2QuerySetView): def get_queryset(self): if not self.request.user.is_authenticated: return User.objects.none() # exclude users that are not admins # and are not active or haven't logged in for 6 months last_login_date = django_now().date() - relativedelta(months=6) qs = User.objects.exclude( Q(is_active=False) | Q(is_superuser=False, last_login__lt=last_login_date) ).filter() if self.q: qs = qs.filter( Q(first_name__icontains=self.q) | Q(email__icontains=self.q) | Q(userprofile__username__icontains=self.q) ) return qs def login_local_user(request, username=""): """Allow a user to login for local dev.""" if not (settings.DEV and settings.ENABLE_DEV_LOGIN) or not username: raise Http404 user = None try: user = User.objects.get(username=username) except User.DoesNotExist: raise Http404 if user: auth.login(request, user, backend="django.contrib.auth.backends.ModelBackend") return HttpResponseRedirect(reverse("main"))