I’m sharing the solution here. It has a lot of “noise”, since I’m short on time right now, but I believe you will be able to pick up the idea.
Management command
# bid_capital_loan_service/users/management/commands/setup_bff_groups.py
from datetime import timedelta
from django.conf import settings
from django.core.cache import cache
from django.core.management.base import BaseCommand
from bid_capital_loan_service.users import roles
# Cache key under which the release version of the last completed group setup is stored.
LAST_SETUP_VERSION_CACHE_KEY = "users:bff-role:last-setup-version"
class Command(BaseCommand):
    """Management command that creates or updates the BFF front-end groups."""

    def handle(self, *args, **options):
        """Run the setup directly when DEBUG is on, otherwise go through the per-release guard."""
        if settings.DEBUG:
            return self._setup_groups()
        return self._do_setup_on_production(*args, **options)

    def _do_setup_on_production(self, *args, **options):
        """Set up the groups unless it was already done for the current release version.

        The release version of the last setup is kept in the cache for one day.
        """
        current_version = settings.SYSTEM_RELEASE_VERSION
        if current_version != cache.get(LAST_SETUP_VERSION_CACHE_KEY):
            self._setup_groups()
            one_day_in_seconds = timedelta(days=1).total_seconds()
            cache.set(LAST_SETUP_VERSION_CACHE_KEY, current_version, timeout=one_day_in_seconds)
            return

        self.stdout.write(f"Skipping setup for {current_version=}, already setup before", self.style.WARNING)

    def _setup_groups(self):
        """Announce the setup on stdout and delegate to the roles module."""
        self.stdout.write("Setting-up BFF groups")
        roles.setup_frontend_groups()
The roles module
Here is the magic: it uses static type annotations to drive runtime behavior.
(Again, there’s a lot of noise here; the function called by the management command is setup_frontend_groups.)
# bid_capital_loan_service/users/roles.py
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import Annotated
from typing import Any
from typing import Literal
from typing import get_args
import structlog
from django.contrib.auth.models import Permission
from django.db.models import Case
from django.db.models import IntegerField
from django.db.models import Q
from django.db.models import When
if TYPE_CHECKING:
from collections.abc import Generator
from users.models import Group
# this is the same from django.contrib.auth, but mypy thinks that it's not but cant import this at runtime here
else:
from django.contrib.auth.models import Group
class PermissionCTCodename:
    """The permission names a group defines for itself, each written as ``app_label.codename``."""

    def __init__(self, *perms: str):
        """Validate every name and store the distinct names as a set in ``self.perms``.

        Raises:
            ValueError: if a name lacks a ".", or has nothing before or after the first ".".
        """
        for candidate in perms:
            label, separator, name = candidate.partition(".")
            if label and separator and name:
                continue
            raise ValueError(
                r"Invalid permission name, must follow the format `{app_label}.{codename}`",
                "Received: ",
                repr(candidate),
            )
        self.perms = {*perms}

    def __repr__(self):
        return "{}({})".format(self.__class__.__name__, self.perms)
class InheritPermissionsFromGroups:
    """Names of the groups whose permissions a group inherits.

    Inheritance is transitive: when an inherited group itself inherits from other groups, those
    permissions are inherited as well.
    """

    def __init__(self, *group_names: str):
        # Kept as the tuple of names in the order they were given.
        self.group_names = group_names

    def __repr__(self):
        return "{}({})".format(self.__class__.__name__, self.group_names)
class FrontEndGroupPriority:
    """Priority of a front-end group; values closer to 0 take precedence."""

    # Largest accepted value, also used when no priority is given.
    MAX_PRIORITY = 99

    def __init__(self, priority: int = MAX_PRIORITY):
        """Store *priority*, which must lie between 0 and ``MAX_PRIORITY`` inclusive."""
        within_bounds = 0 <= priority <= self.MAX_PRIORITY
        assert within_bounds, "Not within valid priorities"
        self.priority = priority

    def __repr__(self):
        return "{}({})".format(self.__class__.__name__, self.priority)
# Declarative registry of the front-end groups, expressed as a union of `Annotated` types.
# Each member is `Annotated[Literal[<group name>], <metadata>...]`, where the metadata may hold:
#   - PermissionCTCodename: the permissions defined directly on the group,
#   - InheritPermissionsFromGroups: the groups whose permissions are inherited (recursively),
#   - FrontEndGroupPriority: the group priority (closer to 0 means higher precedence).
# The helper functions in this module read these entries at runtime through `typing.get_args`.
FrontEndGroupDefinition = (
Annotated[
Literal["[Front] Operador (treinamento)"],
PermissionCTCodename(
"fgts_loan.simulate_fgts_loan",
"fgts_loan.view_own_simulated_fgts_loan",
),
FrontEndGroupPriority(90),
]
| Annotated[
Literal["[Front] Operador (Sem visão IA)"],
PermissionCTCodename(
"fgts_loan.submit_proposal",
"fgts_loan.change_own_proposal",
"users.view_own_operations",
"bff.view_dashboards",
),
InheritPermissionsFromGroups("[Front] Operador (treinamento)"),
FrontEndGroupPriority(85),
]
| Annotated[
Literal["[Front] Operador"],
PermissionCTCodename(
"users.view_ai_operations",
),
InheritPermissionsFromGroups("[Front] Operador (Sem visão IA)"),
FrontEndGroupPriority(80),
]
| Annotated[
Literal["[Front] Operador (back-office)"],
PermissionCTCodename(
"fgts_loan.assign_proposal_to_another_user",
),
InheritPermissionsFromGroups("[Front] Operador"),
FrontEndGroupPriority(75),
]
| Annotated[
Literal["[Front] Supervisor"],
PermissionCTCodename(
"users.create_operator",
"users.view_other_operators_operations",
"fgts_loan.view_loansimulation",
"fgts_loan.change_loanproposal",
"fgts_loan.view_operational_report",
"referrals.view_referrallink",
"referrals.add_referrallink",
"referrals.change_referrallink",
"referrals.view_loanproposalreferral",
"users.control_product_access",
),
InheritPermissionsFromGroups("[Front] Operador"),
FrontEndGroupPriority(70),
]
| Annotated[
Literal["[Front] Supervisor (back-office)"],
PermissionCTCodename(
"fgts_loan.assign_proposal_to_another_user",
),
InheritPermissionsFromGroups("[Front] Supervisor"),
FrontEndGroupPriority(65),
]
| Annotated[
Literal["[Front] Financeiro"],
PermissionCTCodename(
"comissions.view_resellercomissionpaymentprocesspayment",
"comissions.view_resellercomissionpaymentprocess",
"comissions.view_loanproposalcomission",
),
InheritPermissionsFromGroups("[Front] Supervisor (back-office)"),
FrontEndGroupPriority(60),
]
| Annotated[
Literal["[Front] Administrador"],
PermissionCTCodename(
"reseller.add_reseller",
"reseller.change_reseller",
"reseller.view_reseller",
"reseller.add_resellerselfhiringdomain",
"users.view_user",
"users.change_user",
"users.add_user",
"bff.define_reseller_target_production",
"bff.change_theme",
"bff.add_theme",
),
InheritPermissionsFromGroups("[Front] Financeiro"),
FrontEndGroupPriority(0),
]
)
def _get_metadata_by_type[T](definition, t: type[T]) -> T:
metadata = getattr(definition, "__metadata__", ())
return next((meta for meta in metadata if isinstance(meta, t)), t())
def _get_group_definition_by_name(group_name: str):
    """Return the ``Annotated`` member of ``FrontEndGroupDefinition`` whose literal name is *group_name*.

    Raises:
        ValueError: if no member carries that name.
    """
    matching = (
        candidate
        for candidate in get_args(FrontEndGroupDefinition)
        # First argument of the Annotated member is the Literal; its first argument is the name.
        if group_name == get_args(get_args(candidate)[0])[0]
    )
    found = next(matching, None)
    if found is None:
        raise ValueError("No group definition with name", group_name)
    return found
def _get_recursive_groups_permissions(groups: tuple[str, ...]):
    """Return the union of the permissions of *groups* and of every group they inherit from.

    Each name is resolved to its definition; the definition's own permissions are collected and
    the groups it inherits from are processed recursively.
    """
    collected: set[str] = set()
    for name in groups:
        definition = _get_group_definition_by_name(name)
        own = _get_metadata_by_type(definition, PermissionCTCodename)
        parents = _get_metadata_by_type(definition, InheritPermissionsFromGroups)
        collected |= own.perms | _get_recursive_groups_permissions(parents.group_names)
    return collected
def _get_group_names_with_recursive_perms() -> Generator[tuple[str, set[str]]]:
    """Yield ``(group_name, permissions)`` for every defined front-end group.

    ``permissions`` is a new set holding the group's own permissions plus everything inherited
    (recursively) from the groups named in its ``InheritPermissionsFromGroups`` metadata.

    Raises:
        AssertionError: if a group defines no permission of its own.
    """
    for group_name, definition in _get_group_names_definitions():
        own_perms = _get_metadata_by_type(definition, t=PermissionCTCodename).perms
        assert len(own_perms), "Expected at least one Permission to be defined on the group"
        inherited_groups = _get_metadata_by_type(definition, InheritPermissionsFromGroups)
        inherited_perms = _get_recursive_groups_permissions(inherited_groups.group_names)
        # Build a new set: an in-place `|=` on `own_perms` would mutate the set stored on the
        # module-level group definition, leaking inherited permissions into it.
        yield group_name, own_perms | inherited_perms
def _get_group_names_definitions() -> Generator[tuple[FrontEndGroupDefinition, Any]]:
    """Yield ``(group_name, definition)`` for every member of ``FrontEndGroupDefinition``.

    The name is the first argument of the ``Literal`` that is itself the first argument of the
    ``Annotated`` member.
    """
    yield from (
        (get_args(get_args(annotated_member)[0])[0], annotated_member)
        for annotated_member in get_args(FrontEndGroupDefinition)
    )
def setup_frontend_groups():
    """Create or update every defined front-end group with its required permissions."""
    for name, _definition in _get_group_names_definitions():
        setup_group(group_name=name)
# Module-level structlog logger; setup_group derives a per-group bound logger from it.
_logger: structlog.typing.FilteringBoundLogger = structlog.get_logger(__name__)
def setup_group(group_name: FrontEndGroupDefinition) -> Group:
    """Create or update a group with its defined permissions.

    Can be called many times for the same group: it only adds the defined permissions (own and
    inherited) to the group's permissions m2m, and it does not remove custom permissions added
    manually (e.g. through the admin).

    Args:
        group_name: name of one of the groups declared in ``FrontEndGroupDefinition``.

    Returns:
        The created or already existing ``Group``.

    Raises:
        ValueError: if *group_name* is not a defined group, if the group resolves to no
            permissions, or if any defined permission does not exist in the database.
    """
    logger = _logger.new(group_name=group_name)
    logger.debug("Setting-up group")

    perms = next((p for n, p in _get_group_names_with_recursive_perms() if n == group_name), None)
    if perms is None:
        # Same error as _get_group_definition_by_name, instead of leaking a bare StopIteration.
        raise ValueError("No group definition with name", group_name)
    if not perms:
        # An empty Q() filter would match every Permission row and grant all of them to the group.
        raise ValueError("No permissions defined for group", group_name)
    logger.debug("Got perms", perms=perms)

    group, created = Group.objects.get_or_create(name=group_name)
    if not created:
        logger.debug(f"Group {group_name} already existed")

    perm_filter = Q()
    for perm in perms:
        # Split on the first "." only, matching the validation done by PermissionCTCodename.
        app_label, _, codename = perm.partition(".")
        perm_filter |= Q(content_type__app_label=app_label, codename=codename)

    permissions = Permission.objects.filter(perm_filter).select_related("content_type")
    db_perms = {f"{p.content_type.app_label}.{p.codename}" for p in permissions}
    missing_perms = perms.difference(db_perms)
    if missing_perms:
        logger.warning("One or more permissions were not found", missing_perms=missing_perms)
        raise ValueError("Missing permissions")

    group.permissions.add(*permissions)
    return group