Skip to content
32 changes: 32 additions & 0 deletions api/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,35 @@ class Permissions(int, Enum):
CREATE_FEEDBACK = 8388608
UPDATE_FEEDBACK = 16777216
DELETE_FEEDBACK = 33554432


class CorePermissions(int, Enum):
UPDATE_USER = 1
DELETE_USER = 2
CREATE_DIVISION = 4
UPDATE_DIVISION = 8
DELETE_DIVISION = 16
CREATE_ROLE = 32 ## make it a feature permission
UPDATE_ROLE = 64 ## make it a feature permission
DELETE_ROLE = 128 ## make it a feature permission


class FeaturePermissions(int, Enum):
CREATE_ASSIGNMENT = 1
UPDATE_ASSIGNMENT = 2
DELETE_ASSIGNMENT = 4
CREATE_ANNOUNCEMENT = 8
UPDATE_ANNOUNCEMENT = 16
DELETE_ANNOUNCEMENT = 32
CREATE_MEETING = 64
UPDATE_MEETING = 128
DELETE_MEETING = 256
CREATE_SUBMISSION = 512
UPDATE_SUBMISSION = 1024
DELETE_SUBMISSION = 2048
CREATE_EXCUSE = 4096
UPDATE_EXCUSE = 8192
DELETE_EXCUSE = 16384
CREATE_FEEDBACK = 32768
UPDATE_FEEDBACK = 65536
DELETE_FEEDBACK = 131072
46 changes: 46 additions & 0 deletions api/crud/core/core_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from typing import TypeVar, List, Callable

from fastapi import HTTPException
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Session, Query
from starlette import status

from ...const import CorePermissions, FeaturePermissions
from ...db.models import UserModel

T = TypeVar("T")
Expand All @@ -30,6 +32,34 @@ def get_db_all(cls, db: Session, attribute: str, value: int | str) -> List[T]:
def get_db_dump(cls, db: Session) -> List[T]:
return db.query(cls.db_model).all()

# Function to filter the table based on keyword arguments
@classmethod
def filter_table(cls, db: Session, **kwargs) -> T | None:

query = db.query(cls.db_model)

for attr, value in kwargs.items():
query = query.filter(getattr(cls.db_model, attr) == value)

try:
results = query.first()
return results
except NoResultFound:
return []

@classmethod
def filter_all(cls, db: Session, **kwargs) -> List[T]:
query = db.query(cls.db_model)

for attr, value in kwargs.items():
query = query.filter(getattr(cls.db_model, attr) == value)

try:
results = query.all()
return results
except NoResultFound:
return []

@classmethod
def create(cls, db: Session, **kwargs) -> T:
new_model = cls.db_model(**kwargs)
Expand Down Expand Up @@ -62,3 +92,19 @@ def db_update(cls, model_instance, **kwargs) -> None:
"""this method is used to update a model instance without committing to the database"""
for key, value in kwargs.items():
setattr(model_instance, key, value)

@classmethod
def _calculate_core_permission(cls, request_permissions: dict) -> int:
total_request: int = 0
for permission in CorePermissions:
if request_permissions[permission.name]:
total_request = total_request | permission.value
return total_request

@classmethod
def _calculate_feature_permission(cls, request_permissions: dict) -> int:
total_request: int = 0
for permission in FeaturePermissions:
if request_permissions[permission.name]:
total_request = total_request | permission.value
return total_request
36 changes: 24 additions & 12 deletions api/crud/core/role.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,24 @@
from starlette import status

from .core_base import CoreBase
from ...const import Permissions
from .division import Division
from ...db.models import RoleModel
from ...db.models.division_model import DivisionModel


class Role(CoreBase):
db_model = RoleModel

@classmethod
def create(cls, db: Session, **kwargs) -> RoleModel:
cls.check_role_exists(db, kwargs.get("name"))
cls.check_role_exists(db, kwargs.get("name"), kwargs.get("division_id"))
division_exists = Division.get_db_first(db, "id", kwargs.get("division_id"))
if not division_exists:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="division does not exist")

return super().create(db, name=kwargs["name"],
permissions=cls._calculate_total_permission(kwargs["permissions"]))
division_id=kwargs["division_id"],
permissions=cls._calculate_feature_permission(kwargs["permissions"]))

@classmethod
def update(cls, model_id: int, db: Session, **kwargs) -> RoleModel:
Expand All @@ -24,22 +30,28 @@ def update(cls, model_id: int, db: Session, **kwargs) -> RoleModel:
if role_found and role_found.id != model_id:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="role already exists")
model.name = kwargs["name"]
model.permissions = cls._calculate_total_permission(kwargs["permissions"])
model.permissions = cls._calculate_feature_permission(kwargs["permissions"])
db.commit()
db.refresh(model)
return model
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"{cls.__name__.lower()} not found")

@classmethod
def check_role_exists(cls, db: Session, name: str) -> None:
def check_role_exists(cls, db: Session, name: str, division_id: int) -> None:
role = cls.get_db_first(db, "name", name)
if role:
if role and role.division_id == division_id:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="role already exists")

# @classmethod
# def _calculate_total_permission(cls, request_permissions: dict) -> int:
# total_request: int = 0
# for permission in Permissions:
# if request_permissions[permission.name]:
# total_request = total_request | permission.value
# return total_request

@classmethod
def _calculate_total_permission(cls, request_permissions: dict) -> int:
total_request: int = 0
for permission in Permissions:
if request_permissions[permission.name]:
total_request = total_request | permission.value
return total_request
def check_division_exists(cls, db, division_id: int) -> None:
checkDivision = db.query(DivisionModel).filter_by(id=division_id).first() is not None
if not checkDivision:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="division not found")
26 changes: 25 additions & 1 deletion api/crud/core/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from starlette import status

from .core_base import CoreBase
from .role import Role
from .userrole import UserRole
from ...db.models import UserModel


Expand All @@ -18,4 +20,26 @@ def get_db_username_or_email(cls, db: Session, username: str) -> UserModel | Non
def validate_username(cls, db: Session, username: str) -> None:
existing_user = cls.get_db_first(db, "username", username)
if existing_user:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="username is taken")
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="username is taken")

@classmethod
def edit_user_permissions(cls, db: Session, user_id: int, **kwargs) -> None:
user = cls.get_db_first(db, "id", user_id)
if user:
user.permissions = cls._calculate_core_permission(kwargs)
db.commit()
db.refresh(user)
return user
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="user not found")

@classmethod
def assign_user_role(cls, db: Session, user_id: int, role_id: int) -> UserRole:
user = cls.get_db_first(db, "id", user_id)
if user:
role = Role.get_db_first(db, "id", role_id)
if role:
if UserRole.filter_table(db, user_id=user_id, role_id=role_id):
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="user already has this role")
return UserRole.create(db=db, user_id=user_id, role_id=role_id)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="role not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="user not found")
16 changes: 16 additions & 0 deletions api/crud/core/userrole.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from sqlalchemy.orm import Session

from api.crud.core.core_base import CoreBase

from ...db.models import UserRoleModel


class UserRole(CoreBase):
db_model = UserRoleModel

@classmethod
def delete(cls, db: Session, user_id: int, role_id: int) -> dict:
# Use UserRoleModel for database operations
db.query(cls.db_model).filter_by(user_id=user_id, role_id=role_id).delete()
db.commit()
return {"message": "record deleted successfully"}
70 changes: 0 additions & 70 deletions api/crud/core/userroledivision.py

This file was deleted.

49 changes: 49 additions & 0 deletions api/crud/core/userroledivisionpermission.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from fastapi import HTTPException
from sqlalchemy.orm import Session
from starlette import status

from .core_base import CoreBase
from .division import Division
from .role import Role
from .user import User
from ...db.models import UserModel, DivisionModel, RoleModel, \
UserRoleDivisionPermissionModel


class UserRoleDivisionPermission(CoreBase):
db_model = UserRoleDivisionPermissionModel

@classmethod
def create(cls, db: Session, user_id: int, division_id: int, role_id: int,
**kwargs) -> UserRoleDivisionPermissionModel:
user = User.get_db_first(db, "id", user_id)
division = Division.get_db_first(db, "id", division_id)

if cls.filter_table(db, user_id=user_id, division_id=division_id, role_id=role_id):
raise HTTPException(status_code=status.HTTP_409_CONFLICT,
detail="User already in this and have the same role")

if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

if not division:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Division not found")

return super().create(
db, user_id=user_id, division_id=division_id, role_id=role_id,
permissions=cls._calculate_feature_permission(kwargs)
)

@classmethod
def delete(cls, db: Session, user_id: int, division_id: int, role_id: int | None, ) -> dict:
record = db.query(cls.db_model).filter(
(cls.db_model.user_id == user_id) &
(cls.db_model.division_id == division_id)
& (cls.db_model.role_id == role_id)
).first()
if record:
db.delete(record)
db.commit()

return {"message": "record deleted successfully"}
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="record not found")
Loading