I have the following code that sends requests to check JWT token, then authorize user and return authorized session with Access token, Refresh Token and Session ID.
@csrf_exempt
def new_login_view(request, *args, **kwargs):
def convert_data(req):
data = {
"email": req.data['username'],
"password": req.data['password'],
}
try:
data["language"] = request.LANGUAGE_CODE
except:
data["language"] = request.POST.get('language', 'en')
return data
if request.user.is_authenticated and not request.META.get('HTTP_X_AVOID_COOKIES'):
return HttpResponseRedirect(request.GET.get(KEY_NEXT, '/'))
if request.method == 'POST':
request_data = convert_data(request)
# request to Accounts API to check if user exists
response = send_service_request(EnumAuthUrls.sign_in.value,
json_data=request_data,
original_response=True)
if isinstance(response, dict):
return JsonResponse(response)
if response.status_code == 200:
tok_ac = response.headers.get(HEADER_ACCESS_KEY)
tok_ref = response.headers.get(HEADER_REFRESH_KEY)
# checking JWT token
user = ApiAuthenticationBackend().authenticate(request, tok_ac)
# creates session
data = login_session(request, response, user)
data['user_id'] = request.user.id
data['account_id'] = request.user.profile.account_id
data['balance'] = request.user.balance
if request.META.get('HTTP_X_AVOID_COOKIES'):
return JsonResponse(data)
response = AuthResponse(
data=data,
ssid=request.session.session_key,
access_token=tok_ac,
refresh_token=tok_ref,
)
return response
else:
return ErrorApiResponse(response.json())
service = urllib.parse.quote_plus(request.build_absolute_uri())
return HttpResponseRedirect(settings.ACCOUNTS_URL + f'login/?service={service}')
Here’s the code of class ApiAuthenitcationBackend
:
class ApiAuthenticationBackend(HttpBearer):
def authenticate(self, request, token):
try:
JWTHandler().verify_token(token)
user_data = JWTHandler.get_subject(token=token)
query = Q(email=user_data['email'])
try:
user = User.objects.select_related('profile').get(query)
if 'is_staff' in user_data and not user_data['is_staff']:
del user_data['is_staff']
if 'is_superuser' in user_data and not user_data['is_superuser']:
del user_data['is_superuser']
# renew user fields
for key, value in user_data.items():
setattr(user, key, value)
user.save()
except User.DoesNotExist:
try:
del user_data['subscriptions']
except:
pass
user = User.objects.create(**user_data)
profile = user.profile
if profile.account_id != user_data['id']:
profile.account_id = user_data['id']
profile.save()
return user
Here’s the code of login_session
fucntion:
def login_session(request: HttpRequest, response: HttpResponse, user):
request.user = user
request.session.create()
base_data = response.json().get(KEY_DATA)
return request.user.serialize(request, base_data, token=True)
And here’s the class AuthResponse
that is eventually based on HttpResponse
:
class AuthResponse(SuccessResponse):
def __init__(self, data={}, ssid='', access_token: str = '', refresh_token: str = '', **kwargs):
super().__init__(data, **kwargs)
if ssid:
logger.debug(f'Setting {settings.SESSION_COOKIE_NAME}: {ssid}')
self.set_cookie(key=settings.SESSION_COOKIE_NAME,
value=ssid)
if access_token:
self.set_cookie(key=settings.ACCESS_KEY_COOKIE_NAME,
value=access_token)
if refresh_token:
self.set_cookie(key=settings.REFRESH_KEY_COOKIE_NAME,
value=refresh_token)
The problem is that looks everything good on the browser side, I get all needed cookies (access token, refresh token and session id) however after trying logging in I get redirected to the main page.
There was problem in the beginning with setting cookies, but then I found out that I should not use SESSION_COOKIE_DOMAIN
if it’s local. Thus all cookies came up without problem but it didn’t resolve situation with authorization.
While setting cookies with self.set_cookie()
I tried to use secure=True
, samesite='Lax'
, httponly=True
, and all other parameters but it didn’t help.
Does anyone knows what else I can try in order to fix it?