Supervisor approval of root superuser

In my Django project with two superusers called root and supervisor, I’m trying to implement the following flow:

  • When root logs into the application, he is directed to a ‘waiting for authorisation’ page

  • In the meantime, an email is sent to the supervisor containing the OTP in plain text as well as a parameterised link (containing the OTP and session token)

  • When the supervisor clicks this link, approval via OTP verification for root occurs

  • If authentication is successful, the supervisor sees a message saying OTP authentication successful, and in the meantime, the root user is redirected from the waiting page to the landing page

  • If authentication fails, root and the supervisor are shown a message on the page saying that the OTP authorisation failed

The only user who needs OTP authentication is root and approval can only be granted by supervisor.

My first approach was to trigger OTP generation when root logs in and redirect root to a waiting page. Once the supervisor clicks the link, root should be redirected to the landing page.
All my code was working except for the part where root gets redirected from the waiting page to the landing page.
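One likely reason the redirect never happens: once the waiting page has been served, the server cannot push a redirect to root's browser when the supervisor clicks the link. The page has to ask for the outcome itself, for example by polling a small status endpoint. The sketch below models only the state such an endpoint would report. ApprovalStatus, ApprovalStore, next_url_for and the URLs are hypothetical names, and the in-memory dict stands in for a database table or cache.

```python
from enum import Enum


class ApprovalStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    FAILED = "failed"


class ApprovalStore:
    """In-memory stand-in for wherever the approval state is persisted."""

    def __init__(self):
        self._status = {}

    def start(self, user_id):
        # Called when root submits valid credentials and the OTP email is sent.
        self._status[user_id] = ApprovalStatus.PENDING

    def resolve(self, user_id, otp_ok):
        # Called from the view behind the supervisor's link.
        self._status[user_id] = (
            ApprovalStatus.APPROVED if otp_ok else ApprovalStatus.FAILED
        )

    def status(self, user_id):
        # An unknown user is treated as failed rather than pending.
        return self._status.get(user_id, ApprovalStatus.FAILED)


def next_url_for(status):
    """Where the waiting page should go after a poll; None means keep waiting."""
    if status is ApprovalStatus.APPROVED:
        return "/landingpage/"  # hypothetical URL
    if status is ApprovalStatus.FAILED:
        return "/login/"  # hypothetical URL
    return None


store = ApprovalStore()
store.start(user_id=1)
print(next_url_for(store.status(1)))  # None: still waiting
store.resolve(user_id=1, otp_ok=True)
print(next_url_for(store.status(1)))  # /landingpage/
```

In a real project the waiting page would call a view that returns this status (for instance as JSON) every few seconds and navigate when the answer is no longer pending.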

My current approach is to set root as inactive when the session expires/when root logs out. The idea is to make root’s is_active flag True only when the supervisor approves.

The challenge is that when an inactive user tries logging in, the form is not valid.

I am unsure of what to add to my else condition when ‘if form.is_valid’ evaluates to false: I want to identify the inactive user attempting to login, and if it is root, I want to trigger the OTP generation process, and once approved by the supervisor, activate root.
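A sketch of the check the else branch could make. It relies only on attributes that Django's User model provides (username, is_active, check_password). The function is_pending_root and the StubUser class are hypothetical and exist only so the example runs without a database; in the view, the user object would come from a lookup on the submitted username.

```python
def is_pending_root(user, raw_password):
    """True when `user` is the inactive root account and the password matches."""
    if user is None:
        return False
    if user.username.lower() != "root":
        return False
    # Check the password directly: with the default ModelBackend,
    # authenticate() returns None for inactive users, so it cannot tell us this.
    return (not user.is_active) and user.check_password(raw_password)


class StubUser:
    """Test double using the same attribute names as django.contrib.auth's User."""

    def __init__(self, username, password, is_active):
        self.username = username
        self._password = password
        self.is_active = is_active

    def check_password(self, raw_password):
        # Real Django compares password hashes; plain equality is enough here.
        return raw_password == self._password


print(is_pending_root(StubUser("root", "s3cret", False), "s3cret"))  # True
print(is_pending_root(StubUser("root", "s3cret", False), "wrong"))   # False
print(is_pending_root(StubUser("alice", "s3cret", False), "s3cret")) # False
```

Only when this returns True would the view start the OTP process; every other invalid submission would fall through to re-rendering the login form with its errors.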

I am sharing the relevant parts of my code below:

custom_auth_backend.py:

from django.contrib.auth.backends import ModelBackend
from django.contrib.auth.models import User
from django.contrib.auth import get_user_model
import logging
from django.contrib import messages
# other required imports

logger = logging.getLogger(__name__)

def get_supervisor_email():
    # Raises User.DoesNotExist if 'supervisor' does not exist
    supervisor_user = User.objects.get(username='supervisor')
    return supervisor_user.email

def get_root_email():
    # Raises User.DoesNotExist if 'root' does not exist
    root_user = User.objects.get(username='root')
    return root_user.email
	
def generate_otp(request, user):
    logger.debug('inside generate_otp method')

    try:
        supervisor_email = get_supervisor_email()
        root_email = get_root_email()
        logger.debug('inside the try block in generate_otp method')
        session_token = signing.dumps({"user_id": user.id, "role": "root", "session_key":request.session.session_key})
        logger.debug('session_token in generate_otp method generated')

        otp_secret = pyotp.random_base32()
        otp = pyotp.TOTP(otp_secret)
        otp_code = otp.now()
        logger.debug('otp_code generated from within generate_otp is %s', otp_code)
        
        # Save OTP to the database
        # Use the user passed in: request.user is still anonymous while root is not logged in
        otp_obj, created = OTP.objects.get_or_create(user=user)
        otp_obj.otp_secret = otp_secret
        logger.debug('otp_secret is %s', otp_secret)
        otp_obj.save()
        logger.debug('otp saved to db, in generate_otp method')
        
        # Prepare the verification link
        current_site = get_current_site(request)
        # Keep otp_code as a string: int() would drop any leading zeros
        verification_link = f"http://{current_site}{reverse('verify_otp_from_link')}?otp={otp_code}&session_token={quote(session_token)}"
        logger.debug('verification link prepared, in generate_otp method')
        
        # Prepare email
        subject = 'OTP for Root Super User Login'
        message = f'The OTP for login is: {otp_code}. Please provide this OTP to the superuser ({root_email}). Click this link to verify: {verification_link}'
        from_email = settings.DEFAULT_FROM_EMAIL
        recipient_list = [supervisor_email]
        send_mail(subject, message, from_email, recipient_list)
        logger.debug('Email message: %s %s', message, user.username)
        messages.success(request, f'OTP has been shared with your supervisor on email at {supervisor_email}')
        return render(request, 'login_redirect.html')
        
    except Exception as e:
        logger.error(f'An error occurred during OTP generation and email sending: {str(e)} %s', user.username)
        messages.error(request, 'An error occurred during login. Please try again.')
        return redirect('custom_login')



class CustomModelBackend(ModelBackend):
    def authenticate(self, request, username=None, password=None, **kwargs):
        User = get_user_model()

        # Check if the user exists and is inactive
        user = User.objects.filter(username=username).first()
        if user and user.username.lower() == 'root':
            if user.check_password(password) and not user.is_active:
                generate_otp(request, user)

        return None
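A point about the OTP itself: pyotp.TOTP uses a 30-second time step by default, and verify() with its default valid_window=0 accepts only the code for the current step, so a code that travels by email may no longer verify by the time the supervisor clicks the link. The stdlib-only sketch below reimplements the RFC 6238 calculation (it is not pyotp's code) to show the effect; totp_at and verify are hypothetical helper names. It also shows why the code should stay a string when it is put into a URL.

```python
import base64
import hashlib
import hmac
import struct

STEP = 30  # seconds per time step (the RFC 6238 default)


def totp_at(secret_b32, unix_time, digits=6):
    """Return the TOTP code for `unix_time` as a zero-padded string."""
    key = base64.b32decode(secret_b32, casefold=True)
    counter = struct.pack(">Q", int(unix_time) // STEP)
    digest = hmac.new(key, counter, hashlib.sha1).digest()
    offset = digest[-1] & 0x0F  # dynamic truncation (RFC 4226)
    number = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(number % 10 ** digits).zfill(digits)


def verify(secret_b32, code, unix_time, window=0):
    """Accept `code` if it matches any step within +/- `window` steps."""
    return any(
        hmac.compare_digest(str(code), totp_at(secret_b32, unix_time + i * STEP))
        for i in range(-window, window + 1)
    )


# RFC 4226/6238 test secret: the ASCII bytes "12345678901234567890".
SECRET = base64.b32encode(b"12345678901234567890").decode()

issued_at = 59
code = totp_at(SECRET, issued_at)
print(code)                                             # 287082 (RFC test vector)

print(verify(SECRET, code, issued_at))                  # True: same time step
print(verify(SECRET, code, issued_at + 120))            # False: two minutes later
print(verify(SECRET, code, issued_at + 120, window=4))  # True: wider window

# Leading zeros are lost when a code is converted to int.
print(str(int("012345")))                               # 12345
```

With pyotp, the analogous options would be a larger interval when constructing the TOTP object or a larger valid_window when calling verify(); which one fits depends on how long the supervisor is expected to take.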

In forms.py:

class CustomAuthenticationForm(AuthenticationForm):
    def clean(self):
        logger.debug('inside CustomAuthenticationForm\'s clean method')
        cleaned_data = super().clean()
        username = cleaned_data.get('username')
        password = cleaned_data.get('password')
        
        # Check if the username and password are valid
        user = authenticate(username=username, password=password)
        
        if user is None:
            raise forms.ValidationError("Please enter a correct username and password.")
            
        if not user.is_active:
            if username.lower() != 'root':  # ('root') is a plain string, not a tuple
                raise forms.ValidationError("User is inactive. Please contact the administrator.")
        
        return cleaned_data
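The username checks in this project rely on tuple membership, as the login view does with ('root', 'supervisor'). One Python detail worth keeping in mind with that pattern: a single name in parentheses is a plain string, so the in operator performs a substring test instead of an element lookup. A short demonstration (the values are made up):

```python
not_a_tuple = ('root')   # parentheses alone do not make a tuple
real_tuple = ('root',)   # the trailing comma does

print(type(not_a_tuple).__name__)  # str
print(type(real_tuple).__name__)   # tuple

print('oo' in not_a_tuple)         # True: substring match
print('oo' in real_tuple)          # False: element match
```

For a single name, a direct comparison with == or != avoids the problem altogether.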

In views.py:

def custom_login(request):
    logger.debug('LOGIN VIEW CALLED %s', request.user.username)
    form = CustomAuthenticationForm(data=request.POST)

    if request.method == 'POST':
        logger.debug('POST data: %s', request.POST)
        logger.debug('form.is_valid %s', form.is_valid())
        logger.debug('Form errors: %s', form.errors)

        if form.is_valid():
            username = form.cleaned_data['username']
            password = form.cleaned_data['password']
            user = authenticate(request, username=username, password=password)
            
            if user is not None:
                logger.debug('user is not None %s', user.username)
                logger.debug('User: %s', user.username)
                
                if user.username.lower() == 'supervisor':
                    # Supervisor should be redirected to 'landingpage'
                    logger.debug('Supervisor logging in %s', user.username)
                    login(request, user)
                    messages.success(request, 'Logged in successfully!')
                    logger.debug('Supervisor logged in successfully %s', user.username)
                    return redirect('landingpage')
                        
                elif user.username.lower() not in ('root', 'supervisor'):
                    # Non-superuser should be redirected to 'landingpage'
                    login(request, user)
                    messages.success(request, 'Logged in successfully!')
                    logger.debug('Non-superuser logged in successfully %s', user.username)
                    logger.debug('redirecting to landingpage %s', user.username)
                    return redirect('landingpage')
                        
            else:
                logger.debug('user is None')
                messages.error(request, 'Something went wrong. Please try again.')
                return render(request, 'something_went_wrong.html')
				
        else:  # form is invalid, applicable when the inactive root user is logging in
            pass
            # unsure what code I should be adding here
                   
    else:
        logger.debug('request method is not POST')
        form = CustomAuthenticationForm()
        
    return render(request, 'custom_login.html', {'form': form})


def verify_otp_from_link(request):

    logger.debug('inside verify_otp_from_link %s', request.user.username)
    session_token = request.GET.get('session_token')
    int_otp = request.GET.get('otp')
    
    if session_token:
        try:
            
            data = signing.loads(session_token)
            logger.debug('session token loaded')
            
            user_id = data.get("user_id")
            logger.debug('user_id in question: %s', user_id)
            
            role = data.get("role")
            logger.debug('role from session token signing.loads: %s', role)
            
            if role == "root" and user_id ==1:
                logger.debug('role is equal to root and user id equals 1')
                
                if int_otp is not None:
                    logger.debug('int_otp: %s', int_otp)
                    root_user = User.objects.get(pk=user_id)

                    # Redirect to the verify_otp view
                    otp_param = f'otp={quote(str(int_otp))}'
                    session_token_param = f'session_token={quote(session_token)}'
                    redirect_url = f"{reverse('verify_otp')}?{otp_param}&{session_token_param}"
                    return redirect(redirect_url)
                    
                else:
                    logger.debug('int_otp is None')
                    # Handle the case where int_otp is None
                    messages.error(request, 'OTP is missing.')
                    return render(request, 'something_went_wrong.html')
                    
        except signing.BadSignature:
            messages.error(request, 'Invalid or expired verification link. Please try again.')
            return render(request, 'something_went_wrong.html')
   
    else:
        messages.error(request, 'You need to log in to access this content.')
        return render(request, 'something_went_wrong.html')
    
    return HttpResponseBadRequest("Invalid Request.")
    
 
def verify_otp(request, otp=None):
    
    try:
        session_token = request.GET.get('session_token')
        data = signing.loads(session_token)
        user_id = data.get("user_id")
        role = data.get("role")
        session_key = data.get("session_key")
        
        root_user = User.objects.get(pk=user_id)
        
        if not root_user.is_active:
            otp_from_link = request.GET.get('otp')
            if otp_from_link in (None, 'None'):
                otp_from_link = None
                logger.debug('otp is missing from the verification link')
            else:
                # Keep the OTP as a string and restore any leading zeros
                # (pyotp codes are 6 digits by default)
                otp_from_link = otp_from_link.zfill(6)

            # Look the OTP up by the user id from the signed token;
            # the supervisor's session does not hold root's id
            otp_obj = OTP.objects.filter(user__pk=user_id).first()
            logger.debug('otp_obj: %s, otp_from_link: %s', otp_obj, otp_from_link)

            if otp_obj and otp_from_link is not None:
                logger.debug('Inside the if otp_obj exists block')
                otp = pyotp.TOTP(otp_obj.otp_secret)
                
                if otp.verify(otp_from_link):
                    logger.debug('if otp.verify(otp_from_link)')
                    root_user.is_active = True
                    root_user.save()
                    
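                    # Load root's session by its key and log root in. A Session model
                    # instance does not support item assignment, so use SessionStore
                    # (from django.contrib.sessions.backends.db import SessionStore).
                    session = SessionStore(session_key=session_key)
                    session[SESSION_KEY] = str(root_user.pk)
                    session[BACKEND_SESSION_KEY] = request.session[BACKEND_SESSION_KEY]
                    # Without the auth hash, Django drops the session's user on the
                    # next request (from django.contrib.auth import HASH_SESSION_KEY).
                    session[HASH_SESSION_KEY] = root_user.get_session_auth_hash()
                    session.save()
                    return redirect('landingpage')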
                    
                else:
                    logger.debug('condition if otp.verify(otp_from_link): evaluated to false')
                    messages.error(request, 'OTP authorization failed. Please try logging in again.')
                    return render(request, 'something_went_wrong.html')
                    
            else:
                logger.debug('if otp_obj and otp_from_link is false')
                logger.error('No OTP object found for user: %s', root_user)
                messages.error(request, 'No OTP found for verification.')
                return render(request, 'something_went_wrong.html')
        
        else:
            logger.debug('root_user.is_active is True')
            messages.error(request, 'OTP verification is not pending.')
            return redirect('landingpage')
            
    except signing.SignatureExpired:
        messages.error(request, 'The verification link has expired. Please try again.')
        return render(request, 'something_went_wrong.html')
        
    except signing.BadSignature:
        messages.error(request, 'Invalid or tampered verification link. Please try again.')
        return render(request, 'something_went_wrong.html')
        
    except Exception as e:
        messages.error(request, 'Invalid or expired verification link. Please try again.')
        return render(request, 'something_went_wrong.html')


@login_required
def login_redirect(request): #page equivalent of a waiting area while the supervisor is yet to approve the login attempt by root
    logger.debug('login_redirect called by %s', request.user.username)
    return render(request, 'login_redirect.html')


@login_required(login_url='custom_login') #decorator to restrict access to home page to only those users who are logged in
def landingpage (request):
    return render(request, 'main/landingpage.html')
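A note on the except signing.SignatureExpired branch in verify_otp: Django's signing.loads() raises that exception only when a max_age argument is passed, so as written the link never expires. The stdlib sketch below is not Django's implementation. It is a simplified stand-in (dumps, loads, SECRET and the two exception classes are hypothetical) that illustrates what an age limit adds to a signed token.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET = b"demo-secret"  # hypothetical key; Django would use settings.SECRET_KEY


class BadSignature(Exception):
    pass


class SignatureExpired(BadSignature):
    pass


def _sign(body):
    return hmac.new(SECRET, body.encode(), hashlib.sha256).hexdigest()


def dumps(payload, now=None):
    """Serialize `payload` with an issue timestamp and append an HMAC."""
    issued = int(time.time() if now is None else now)
    raw = json.dumps({"payload": payload, "issued": issued}).encode()
    body = base64.urlsafe_b64encode(raw).decode()
    return f"{body}:{_sign(body)}"


def loads(token, max_age=None, now=None):
    """Verify the HMAC; enforce an age limit only when max_age is given."""
    body, _, signature = token.rpartition(":")
    if not hmac.compare_digest(signature, _sign(body)):
        raise BadSignature("signature does not match")
    data = json.loads(base64.urlsafe_b64decode(body.encode()))
    age = (time.time() if now is None else now) - data["issued"]
    if max_age is not None and age > max_age:
        raise SignatureExpired(f"token is {age:.0f}s old")
    return data["payload"]


token = dumps({"user_id": 1, "role": "root"}, now=1_000)

# One hour later: accepted without max_age, rejected with a 10-minute limit.
print(loads(token, now=4_600))
try:
    loads(token, max_age=600, now=4_600)
except SignatureExpired as exc:
    print("expired:", exc)
```

In the Django views, the equivalent change would be passing max_age (in seconds) to signing.loads().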

I would deeply appreciate any guidance or help. Thank you!

Are you sure the else branch being executed is the one belonging to if form.is_valid(): and not the one belonging to if user is not None:?
If you set your user's is_active to False, then the line user = authenticate(request, username=username, password=password) will always return None, so the else branch of if user is not None: is the one that will be executed.