class UserManager(BaseUserManager):
def create_user(
self,
user_name,
email,
first_name,
last_name,
emp_code,
phone_number,
role_tag,
branch_id,
branch_name,
user_type,
password=None,
created_by=None,
):
if not email:
raise ValueError("Users must have an email address")
user = self.model(
email=self.normalize_email(email),
user_name=user_name,
first_name=first_name,
last_name=last_name,
emp_code=emp_code,
phone_number=phone_number,
role_tag=role_tag,
branch_id=branch_id,
branch_name=branch_name,
user_type=user_type,
created_by=created_by,
)
if password:
user.set_password(password)
else:
raise ValueError("Password must be provided")
# Save user to database
user.save(using=self._db)
return user
def create_superuser(
self,
user_name,
email,
first_name,
last_name,
emp_code,
phone_number,
role_tag,
branch_id,
branch_name,
user_type,
password=None,
created_by=None,
):
user = self.create_user(
user_name=user_name,
email=email,
first_name=first_name,
last_name=last_name,
emp_code=emp_code,
phone_number=phone_number,
role_tag=role_tag,
branch_id=branch_id,
branch_name=branch_name,
user_type=user_type,
password=password,
created_by=created_by,
)
user.is_admin = True
user.is_active = True
user.save(using=self._db)
return user
class User(AbstractBaseUser):
USER_TYPE_CHOICES = [
("maker", "Maker"),
("checker", "Checker"),
("mis_view", "MIS View"),
("admin", "Admin"),
]
first_name = models.CharField(max_length=50)
last_name = models.CharField(max_length=50)
user_name = models.CharField(max_length=50, unique=True)
emp_code = models.CharField(max_length=50)
email = models.EmailField(unique=True)
phone_number = models.CharField(max_length=15)
role_tag = models.CharField(max_length=50)
is_active = models.BooleanField(default=True)
branch_id = models.IntegerField()
branch_name = models.CharField(max_length=50)
deleted_by_admin_id = models.IntegerField(null=True, blank=True)
user_type = models.CharField(max_length=20, choices=USER_TYPE_CHOICES)
created_by = models.IntegerField()
updated_by = models.IntegerField()
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
is_admin = models.BooleanField(default=False)
objects = UserManager()
USERNAME_FIELD = "email"
REQUIRED_FIELDS = [
"user_name",
"first_name",
"last_name",
"emp_code",
"phone_number",
"role_tag",
"branch_id",
"branch_name",
"user_type",
]
def __str__(self):
return f"{self.user_name} ({self.first_name} {self.last_name})"
@property
def is_staff(self):
return self.is_admin
def soft_delete(self, admin_user):
"""Mark the user as inactive and record the admin's ID."""
if admin_user.is_admin:
self.is_active = False
self.deleted_by_admin_id = admin_user.id
self.save()
def delete(self, *args, **kwargs):
"""Override delete to prevent actual deletion."""
raise Exception("Deletion is not allowed. Use soft_delete instead.")
def save(self, *args, **kwargs):
if self.password:
self.set_password(self.password)
super(User, self).save(*args, **kwargs)
this is my model for User
to login
def login(self, *args, **kwargs):
user_obj = self.user_dao.get_user(
email=kwargs["validated_data"]["email"], is_active=True
)
if not user_obj:
return self.request_response_handler.return_response(
{
"detail": f"{kwargs['validated_data']['email']} this email not found!"
},
status_code=status.HTTP_400_BAD_REQUEST,
)
if user_obj.check_password(kwargs["validated_data"]["password"]):
token_obj = self.token_dao.get_token(user_id=user_obj)
if not token_obj:
token_obj = self.token_dao.create(user_id=user_obj)
token_obj.set_access_key()
token_obj.set_refresh_key()
return self.request_response_handler.return_response(
{"access_token": token_obj.access_key},
status_code=status.HTTP_200_OK,
)
elif token_obj.is_token_expired():
token_obj.set_access_key()
token_obj.set_refresh_key()
response = self.request_response_handler.return_response(
{"access_token": token_obj.access_key},
status_code=status.HTTP_200_OK,
)
# Set the refresh token as HttpOnly cookie
response.set_cookie(
key="refresh_token",
value=token_obj.refresh_key,
httponly=True, # HttpOnly cookie for security
secure=settings.SECURE_SSL_REDIRECT, # Use secure cookies in production
samesite="Lax", # Prevents CSRF, use 'Strict' if compatible
max_age=14
* 24
* 60
* 60, # Set to your refresh token's expiration time
)
return response
return self.request_response_handler.return_response(
{"access_token": token_obj.access_key}, status_code=status.HTTP_200_OK
)
else:
return self.request_response_handler.return_response(
{"error": "Invalid credentials"},
status_code=status.HTTP_401_UNAUTHORIZED,
)