pulseapi/users/adapter.py (107 lines of code) (raw):
from allauth.account.adapter import DefaultAccountAdapter
from allauth.socialaccount.adapter import DefaultSocialAccountAdapter
from allauth.account.utils import user_email
from allauth.socialaccount import providers
from allauth.socialaccount.models import SocialAccount
from allauth.socialaccount.providers.base import AuthProcess
from allauth.socialaccount.providers.google.provider import GoogleProvider
from allauth.exceptions import ImmediateHttpResponse
from allauth.utils import (
email_address_exists,
get_user_model,
)
from django.conf import settings
from django.http import HttpResponseRedirect, QueryDict
from django.urls import reverse
from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
from pulseapi.utility.userpermissions import is_staff_address
from pulseapi.profiles.models import UserProfile
# Provider id of allauth's Google provider. It is compared against the
# provider of an incoming social login when a user is populated.
google_provider_id = GoogleProvider.id
class PulseAccountAdapter(DefaultAccountAdapter):
    """
    Account adapter for Pulse.

    Signups are gated on the `ALLOW_SIGNUP` setting, redirects are
    validated against `LOGIN_ALLOWED_REDIRECT_DOMAINS`, and the email
    confirmation urls carry a `?next=` url when a safe one is supplied.
    """

    def is_open_for_signup(self, request):
        """
        Return the `ALLOW_SIGNUP` setting. The request is not consulted.
        """
        return settings.ALLOW_SIGNUP

    def is_safe_url(self, url):
        """
        We override this because the default implementation only
        allows redirects to same origin urls.

        Returns whether `url` points at one of the hosts listed in the
        `LOGIN_ALLOWED_REDIRECT_DOMAINS` setting.
        """
        # Django 3.0 renamed `is_safe_url` to
        # `url_has_allowed_host_and_scheme` and Django 4.0 removed the old
        # name. Prefer the new name and only fall back to the old one on
        # Django versions that predate the rename.
        try:
            from django.utils.http import url_has_allowed_host_and_scheme
        except ImportError:
            from django.utils.http import (
                is_safe_url as url_has_allowed_host_and_scheme
            )

        return url_has_allowed_host_and_scheme(
            url,
            allowed_hosts=settings.LOGIN_ALLOWED_REDIRECT_DOMAINS,
        )

    def get_email_confirmation_redirect_url(self, request):
        """
        Override this so that we can redirect to the `?next=` url
        provided since allauth does not do this out of the box.

        The `next` url is only honoured for authenticated users and only
        when it passes `is_safe_url`; otherwise the default redirect url
        is returned.
        """
        if request.user.is_authenticated:
            next_url = request.GET.get('next')

            if next_url and self.is_safe_url(next_url):
                return next_url

        return super().get_email_confirmation_redirect_url(request)

    def get_email_confirmation_url(self, request, emailconfirmation):
        """
        Override this so that we can set the `?next=` url in the email
        confirmation url so that the user is redirected correctly after
        confirming their email.

        FIXME: This currently does not work because no `?next=` parameter
        is passed to the email confirmation view. Our current workaround
        for this is to set the
        `ACCOUNT_EMAIL_CONFIRMATION_AUTHENTICATED_REDIRECT_URL` to be
        the pulse front-end url via an environment variable on production.
        """
        url = super().get_email_confirmation_url(request, emailconfirmation)
        next_url = request.GET.get('next')

        if not (next_url and self.is_safe_url(next_url)):
            return url

        # Parse the url and add the `next` url to it as a query string,
        # replacing any `next` value that is already present.
        parsed = urlparse(url)
        query = dict(parse_qsl(parsed.query))
        query['next'] = next_url

        return urlunparse(parsed._replace(query=urlencode(query)))
class PulseSocialAccountAdapter(DefaultSocialAccountAdapter):
    """
    Social account adapter for Pulse. It departs from
    DefaultSocialAccountAdapter for two reasons:

    1. Our custom EmailUser model is filled in automatically with
       information from the social provider.
    2. Accounts that come from the old auth system are "upgraded" to the
       new allauth system when they log in.
    """

    def is_open_for_signup(self, request, socialaccount):
        """
        Return the `ALLOW_SIGNUP` setting. Neither argument is consulted.
        """
        return settings.ALLOW_SIGNUP

    def populate_user(self, request, sociallogin, data):
        """
        Populate the user via the parent adapter, then set its name from
        `data` and mark it as staff when it logs in through Google with
        an address that `is_staff_address` accepts.
        """
        user = super().populate_user(request, sociallogin, data)

        display_name = data.get('name')
        if not display_name:
            # No full name was supplied, so build one from its parts.
            first_name = data.get('first_name') or ''
            last_name = data.get('last_name') or ''
            display_name = f"{first_name} {last_name}".strip()

        user.name = display_name or 'Unnamed Pulse user'

        via_google = sociallogin.account.provider == google_provider_id
        if via_google and is_staff_address(user_email(user)):
            user.is_staff = True

        return user

    def save_user(self, request, sociallogin, form=None):
        """
        Save the user and make sure it has a profile.

        For some reason allauth does not trigger the signal for creating
        a user, so the profile is created here by hand the first time a
        user logs in.
        """
        user = super().save_user(request, sociallogin, form)

        try:
            UserProfile.objects.get(related_user=user)
        except UserProfile.DoesNotExist:
            # The profile starts out inactive so that this user's profile
            # and entries can stay hidden until a moderator activates it.
            user.profile = UserProfile.objects.create(is_active=False)
            user.save()

        return user

    def pre_social_login(self, request, sociallogin):
        """
        Decide what to do when a social login arrives for an email
        address that already exists.

        Depending on the social accounts the existing user has, this
        either flags the user for migration on the request, does nothing,
        or raises `ImmediateHttpResponse` with a redirect to the login
        view.
        """
        email = user_email(sociallogin.user)
        UserModel = get_user_model()
        provider_id = sociallogin.account.provider

        # A new email address needs no special handling.
        if not email:
            return
        if not email_address_exists(email):
            return
        # Neither does connecting social accounts (any non-login process).
        if sociallogin.state.get('process') != AuthProcess.LOGIN:
            return

        try:
            existing_user = UserModel.objects.get(email=email)
        except UserModel.DoesNotExist:
            # This shouldn't really happen; if it does, leave everything
            # to the default behavior.
            return

        linked_providers = list(
            SocialAccount.objects
            .filter(user=existing_user)
            .values_list('provider', flat=True)
        )

        if not linked_providers:
            # Hack: accounts that were added to pulse via Google Auth the
            # old way are associated with the new allauth system here.
            # This only happens for new logins into this system.
            request.migrate_user = existing_user
            return

        if provider_id in linked_providers:
            # The user is logging in with a provider that is already
            # linked to their account.
            return

        # The user already has a Pulse social account (e.g. Google) but is
        # logging in for the first time through a different social network
        # (e.g. Github) that uses the same email. Send them to the login
        # view, where they must log in with their existing social account
        # before reaching the Social Account Connections view to connect
        # the secondary social account.
        login_url = reverse('account_login')
        connection_params = QueryDict(mutable=True)
        destination = sociallogin.get_redirect_url(request)
        if destination:
            # The final destination is encoded in the connections view
            # url so users are rerouted correctly once their accounts are
            # connected.
            connection_params['next'] = destination

        connections_url = '{url}?{qs}'.format(
            url=reverse('socialaccount_connections'),
            qs=connection_params.urlencode()
        )

        login_params = QueryDict(mutable=True)
        login_params['next'] = connections_url
        login_params['promptconnection'] = True
        login_params['provider'] = providers.registry.by_id(provider_id).name

        redirect = HttpResponseRedirect(
            f'{login_url}?{login_params.urlencode()}'
        )
        raise ImmediateHttpResponse(response=redirect)

    def is_auto_signup_allowed(self, request, sociallogin):
        """
        Allow auto signup for a user flagged for migration on the
        request; otherwise defer to the parent adapter.
        """
        if not hasattr(request, 'migrate_user'):
            return super().is_auto_signup_allowed(request, sociallogin)

        # Attach the existing user to the SocialLogin object being created.
        sociallogin.user = request.migrate_user
        return True