In my Django project with two superusers called root and supervisor, I’m trying to implement the following flow:
-
When root logs into the application, he is directed to a ‘waiting for authorisation’ page
-
In the meantime, an email is sent to the supervisor containing the OTP in plain text as well as a parameterised link (containing the OTP and session token)
-
When the supervisor clicks this link, approval via OTP verification for root occurs
-
If authentication is successful, the supervisor sees a message saying OTP authentication successful, and in the meantime, the root user is redirected from the waiting page to the landing page
-
If authentication fails, root and the supervisor are shown a message on the page saying that the OTP authorisation failed
The only user who needs OTP authentication is root and approval can only be granted by supervisor.
My first approach had been to trigger OTP generation when root logs in, root gets redirected to a waiting page. Once the supervisor clicks the link, root is redirected to the landing page
All my code was working except for the part where root gets redirected from the waiting area to the landing page.
My current approach is to set root as inactive when the session expires/when root logs out. The idea is to make root’s is_active flag True only when the supervisor approves.
The challenge is that when an inactive user tries logging in, the form is not valid.
I am unsure of what to add to my else condition when ‘if form.is_valid’ evaluates to false: I want to identify the inactive user attempting to login, and if it is root, I want to trigger the OTP generation process, and once approved by the supervisor, activate root.
I am sharing the relevant parts of my code below:
custom_auth_backend.py:
from django.contrib.auth.backends import ModelBackend
from django.contrib.auth.models import User
from django.contrib.auth import get_user_model
import logging
from django.contrib import messages
# other required imports
logger = logging.getLogger(__name__)
def get_supervisor_email():
try:
supervisor_user = User.objects.get(username='supervisor')
return supervisor_user.email
except User.DoesNotExist as e:
# If 'supervisor' does not exist
raise e
def get_root_email():
try:
root_user = User.objects.get(username='root')
return root_user.email
except User.DoesNotExist as e:
# If 'root' does not exist
raise e
def generate_otp(request, user):
logger.debug('inside generate_otp method')
try:
supervisor_email = get_supervisor_email()
root_email = get_root_email()
logger.debug('inside the try block in generate_otp method')
session_token = signing.dumps({"user_id": user.id, "role": "root", "session_key":request.session.session_key})
logger.debug('session_token in generate_otp method generated')
otp_secret = pyotp.random_base32()
otp = pyotp.TOTP(otp_secret)
otp_code = otp.now()
logger.debug('otp_code generated from within generate_otp is %s', otp_code)
# Save OTP to the database
otp_obj, created = OTP.objects.get_or_create(user=request.user)
otp_obj.otp_secret = otp_secret
logger.debug('otp_secret is %s', otp_secret)
otp_obj.save()
logger.debug('otp saved to db, in generate_otp method')
# Prepare the verification link
current_site = get_current_site(request)
verification_link = f"http://{current_site}{reverse('verify_otp_from_link')}?otp={int(otp_code)}&session_token={quote(session_token)}"
logger.debug('verification link prepared, in generate_otp method')
# Prepare email
subject = 'OTP for Root Super User Login'
message = f'The OTP for login is: {otp_code}. Please provide this OTP to the superuser ({root_email}). Click this link to verify: {verification_link}'
from_email = settings.DEFAULT_FROM_EMAIL
recipient_list = [supervisor_email]
send_mail(subject, message, from_email, recipient_list)
logger.debug('Email message: %s %s', message, user.username)
messages.success(request, f'OTP has been shared with your supervisor on email at {supervisor_email}')
return render(request, 'login_redirect.html')
except Exception as e:
logger.error(f'An error occurred during OTP generation and email sending: {str(e)} %s', user.username)
messages.error(request, 'An error occurred during login. Please try again.')
return redirect('custom_login')
class CustomModelBackend(ModelBackend):
def authenticate(self, request, username=None, password=None, **kwargs):
User = get_user_model()
# Check if the user exists and is inactive
user = User.objects.filter(username=username).first()
if user and user.username.lower() == 'root':
if user.check_password(password) and not user.is_active:
generate_otp(request, user)
return None
In forms.py:
class CustomAuthenticationForm(AuthenticationForm):
def clean(self):
logger.debug('inside CustomAuthenticationForm\'s clean method')
cleaned_data = super().clean()
username = cleaned_data.get('username')
password = cleaned_data.get('password')
# Check if the username and password are valid
user = authenticate(username=username, password=password)
if user is None:
raise forms.ValidationError("Please enter a correct username and password.")
if not user.is_active:
if username.lower() not in ('root'):
raise forms.ValidationError("User is inactive. Please contact the administrator.")
return cleaned_data
In views.py:
def custom_login(request):
logger.debug('LOGIN VIEW CALLED %s', request.user.username)
form = CustomAuthenticationForm(data=request.POST)
if request.method == 'POST':
logger.debug('POST data: %s', request.POST)
logger.debug('form.is_valid %s', form.is_valid())
logger.debug('Form errors: %s', form.errors)
if form.is_valid():
username = form.cleaned_data['username']
password = form.cleaned_data['password']
user = authenticate(request, username=username, password=password)
if user is not None:
logger.debug('user is not None %s', user.username)
logger.debug('User: %s', user.username)
if user.username.lower() == 'supervisor':
# Supervisor should be redirected to 'landingpage'
logger.debug('Supervisor logging in %s', user.username)
login(request, user)
messages.success(request, 'Logged in successfully!')
logger.debug('Supervisor logged in successfully %s', user.username)
return redirect('landingpage')
elif user.username.lower() not in ('root', 'supervisor'):
# Non-superuser should be redirected to 'landingpage'
login(request, user)
messages.success(request, 'Logged in successfully!')
logger.debug('Non-superuser logged in successfully %s', user.username)
logger.debug('redirecting to landingpage %s', user.username)
return redirect('landingpage')
else:
logger.debug('user is None')
messages.error(request, 'Something went wrong. Please try again.')
return render(request, 'something_went_wrong.html')
else: #form is invalid, applicable when the inactive root user is logging in
pass
#unsure what code I should be adding here
else:
logger.debug('request method is not POST')
form = CustomAuthenticationForm()
return render(request, 'custom_login.html', {'form': form})
def verify_otp_from_link(request):
logger.debug('inside verify_otp_from_link %s', request.user.username)
session_token = request.GET.get('session_token')
int_otp = request.GET.get('otp')
if session_token:
try:
data = signing.loads(session_token)
logger.debug('session token loaded')
user_id = data.get("user_id")
logger.debug('user_id in question: %s', user_id)
role = data.get("role")
logger.debug('role from session token signing.loads: %s', role)
if role == "root" and user_id ==1:
logger.debug('role is equal to root and user id equals 1')
if int_otp is not None:
logger.debug('int_otp: %s', int_otp)
root_user = User.objects.get(pk=user_id)
# Redirect to the verify_otp view
otp = str(int_otp)
otp_param = f'otp={quote(otp)}'
session_token_param = f'session_token={quote(session_token)}'
redirect_url = reverse('verify_otp') + f'?{otp_param}&session_token={quote(session_token)}'
return redirect(redirect_url)
else:
logger.debug('int_otp is None')
# Handle the case where int_otp is None
messages.error(request, 'OTP is missing.')
return render(request, 'something_went_wrong.html')
except signing.BadSignature:
messages.error(request, 'Invalid or expired verification link. Please try again.')
return render(request, 'something_went_wrong.html')
else:
messages.error(request, 'You need to log in to access this content.')
return render(request, 'something_went_wrong.html')
return HttpResponseBadRequest("Invalid Request.")
def verify_otp(request, otp=None):
try:
session_token = request.GET.get('session_token')
data = signing.loads(session_token)
user_id = data.get("user_id")
role = data.get("role")
session_key = data.get("session_key")
root_user = User.objects.get(pk=user_id)
if root_user.is_active==False:
otp_from_link = request.GET.get('otp')
if otp_from_link is not None and otp_from_link != 'None':
otp_from_link = int(otp_from_link)
else:
logger.debug('if otp_from_link is not None and otp_from_link != \'None\' condition failed')
otp_obj = OTP.objects.filter(user__pk=request.session.get('user_id')).first()
logger.debug('otp_from_link: %s', otp_from_link)
logger.debug(f'otp_obj: {otp_obj}, otp_from_link: {otp_from_link}')
if otp_obj and int(otp_obj) == int(otp_from_link):
logger.debug('Inside the if otp_obj exists block')
otp = pyotp.TOTP(otp_obj.otp_secret)
if otp.verify(otp_from_link):
logger.debug('if otp.verify(otp_from_link)')
root_user.is_active = True
root_user.save()
#Fetch the session from the session key and log the user root in
session = Session.objects.get(session_key=session_key)
session[SESSION_KEY] = root_user.id
session[BACKEND_SESSION_KEY] = request.session[BACKEND_SESSION_KEY]
session.save()
return redirect('landingpage')
else:
logger.debug('condition if otp.verify(otp_from_link): evaluated to false')
messages.error(request, 'OTP authorization failed. Please try logging in again.')
return render(request, 'something_went_wrong.html')
else:
logger.debug('if otp_obj and otp_from_link is false')
logger.error('No OTP object found for user: {}'.root_user)
messages.error(request, 'No OTP found for verification.')
return render(request, 'something_went_wrong.html')
else:
logger.debug('root_user.is_active is True')
messages.error(request, 'OTP verification is not pending.')
return redirect('landingpage')
except signing.SignatureExpired:
messages.error(request, 'The verification link has expired. Please try again.')
return render(request, 'something_went_wrong.html')
except signing.BadSignature:
messages.error(request, 'Invalid or tampered verification link. Please try again.')
return render(request, 'something_went_wrong.html')
except Exception as e:
messages.error(request, 'Invalid or expired verification link. Please try again.')
return render(request, 'something_went_wrong.html')
@login_required
def login_redirect(request): #page equivalent of a waiting area while the supervisor is yet to approve the login attempt by root
logger.debug('login_redirect(request, user) %s', user.username)
return render(request, 'login_redirect.html')
@login_required(login_url='custom_login') #decorator to restrict access to home page to only those users who are logged in
def landingpage (request):
return render(request, 'main/landingpage.html')
I would deeply appreciate any guidance or help. Thank you!