From 462ba03672f85c3f155dd2797c31edff38f0c6de Mon Sep 17 00:00:00 2001 From: wailbentafat Date: Wed, 25 Mar 2026 23:07:51 +0100 Subject: [PATCH 01/10] feat:seperate repsonsabilites for single face detection --- app/core/config.py | 3 +- app/service/face_match.py | 124 ++++++++ app/service/single_face_match.py | 301 ------------------ app/service/users.py | 8 + app/worker/single_face_match/main.py | 156 +++++++++ app/worker/single_face_match/worker.py | 85 ----- db/generated/devices.py | 64 ++-- db/generated/models.py | 7 +- db/generated/photo_faces.py | 138 ++++++++ db/generated/user.py | 151 ++++----- db/queries/photo_faces.sql | 62 ++++ db/queries/user.sql | 8 + .../versions/4dd6658b9f83_merge_heads.py | 2 - .../versions/5b6615c9ab1d_merge_heads.py | 2 - 14 files changed, 600 insertions(+), 511 deletions(-) create mode 100644 app/service/face_match.py delete mode 100644 app/service/single_face_match.py create mode 100644 app/worker/single_face_match/main.py delete mode 100644 app/worker/single_face_match/worker.py diff --git a/app/core/config.py b/app/core/config.py index 55d0841..103fe2a 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -81,7 +81,8 @@ def _parse_debug(cls, value): # type: ignore[no-untyped-def] return False if lowered in {"true", "1", "yes"}: return True - return value + return value + settings = Settings() # type: ignore diff --git a/app/service/face_match.py b/app/service/face_match.py new file mode 100644 index 0000000..9f6dbf8 --- /dev/null +++ b/app/service/face_match.py @@ -0,0 +1,124 @@ +from __future__ import annotations + +import json +from uuid import UUID + +from sqlalchemy.exc import DBAPIError, SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncConnection + +from app.core.logger import logger +from app.schema.dto.single_face_match import BBoxPayload, SingleFaceMatchJob +from app.service.user_match import ClosestUserMatch, UserMatchService +from app.service.user_notification import UserNotificationService +from app.service.users import AuthService +from db.generated import photo_faces as photo_face_queries + + +class SingleFaceMatchService: + def __init__( + self, + *, + conn: AsyncConnection, + photo_face_querier: photo_face_queries.AsyncQuerier, + user_service: AuthService, + user_notification_service: UserNotificationService, + ) -> None: + self.conn = conn + self.photo_face_querier = photo_face_querier + self.user_service = user_service + self.user_notification_service = user_notification_service + + async def process_detected_face( + self, + job: SingleFaceMatchJob, + embedding: list[float], + bbox: BBoxPayload | None, + ) -> None: # noqa: C901 + if not job.image_ref: + logger.warning("Missing image_ref in event payload for photo %s", job.photo_id) + return + + embedding_literal = self._vector_literal(embedding) + bbox_payload = self._serialize_bbox(bbox) + + created_face_match_id: UUID | None = None + matched_user: ClosestUserMatch | None = None + + try: + async with self.conn.begin(): + if not await self.photo_exists(job.photo_id): + logger.warning("Photo not found: %s", job.photo_id) + return + + if await self.match_exists_for_photo(job.photo_id): + logger.info("Photo %s already matched; skipping", job.photo_id) + return + + matched_user = await self.user_service.find_closest_user( + embedding_literal=embedding_literal, + ) + if matched_user is None: + logger.info("No user embeddings available for matching") + return + + params = photo_face_queries.PhotoFacesEnsureFaceMatchParams( + photo_id=job.photo_id, + face_index=job.face_index, + column_3=embedding_literal, + bbox=bbox_payload, + user_id=matched_user.user_id, + confidence=matched_user.distance, + ) + result = await self.photo_face_querier.photo_faces_ensure_face_match(params) + if result is None: + logger.warning("Failed to ensure face match for photo %s", job.photo_id) + return + + if result.face_match_id is None: + logger.info("Match already exists for photo %s; skipping", job.photo_id) + else: + created_face_match_id = result.face_match_id + logger.info( + "Inserted face match %s for photo %s", + created_face_match_id, + job.photo_id, + ) + except (DBAPIError, SQLAlchemyError) as exc: + logger.warning("DB write failed for photo %s: %s", job.photo_id, exc) + return + except MemoryError: + logger.error("Out of memory while matching photo %s", job.photo_id) + return + + if created_face_match_id and matched_user is not None: + await self.user_notification_service.create_notification( + user_id=matched_user.user_id, + type="face_match", + payload={ + "photo_id": str(job.photo_id), + "confidence": matched_user.distance, + }, + ) + + async def photo_exists(self, photo_id: UUID) -> bool: + row = await self.photo_face_querier.photo_faces_photo_exists(id=photo_id) + return row is not None + + async def match_exists_for_photo(self, photo_id: UUID) -> bool: + row = await self.photo_face_querier.photo_faces_match_exists_for_photo( + photo_id=photo_id, + ) + return row is not None + + @staticmethod + def _vector_literal(embedding: list[float]) -> str: + return "[" + ", ".join(str(x) for x in embedding) + "]" + + @staticmethod + def _serialize_bbox(bbox: BBoxPayload | None) -> str | None: + if bbox is None: + return None + return json.dumps( + {"x1": bbox.x1, "y1": bbox.y1, "x2": bbox.x2, "y2": bbox.y2} + ) +*** End Patch diff --git a/app/service/single_face_match.py b/app/service/single_face_match.py deleted file mode 100644 index 30676c1..0000000 --- a/app/service/single_face_match.py +++ /dev/null @@ -1,301 +0,0 @@ -from __future__ import annotations - -import asyncio -import json -from dataclasses import dataclass -from uuid import UUID - -import sqlalchemy -import sqlalchemy.ext.asyncio - -from app.core.constant import MINIO_URL_PREFIX -from app.core.config import settings -from app.core.logger import logger -from sqlalchemy.exc import DBAPIError, SQLAlchemyError -from app.infra.minio import Bucket, IMAGES_BUCKET_NAME -from app.service.face_embedding import FaceEmbeddingService, FaceImagePayload -from app.schema.dto.single_face_match import BBoxPayload, SingleFaceMatchJob -from db.generated import photo_faces as photo_face_queries -from db.generated import models - - -@dataclass(frozen=True) -class ClosestUserMatch: - user_id: UUID - distance: float - - -PHOTO_EXISTS = """ -SELECT 1 -FROM photos -WHERE id = :photo_id -""" - -GET_CLOSEST_USER = """ -SELECT id, (face_embedding <=> CAST(:embedding AS vector)) AS distance -FROM users -WHERE face_embedding IS NOT NULL -ORDER BY distance ASC -LIMIT 1 -""" - -INSERT_FACE_MATCH = """ -INSERT INTO face_matches (photo_face_id, user_id, confidence) -VALUES (:photo_face_id, :user_id, :confidence) -RETURNING id -""" - -CHECK_MATCH_FOR_PHOTO = """ -SELECT 1 -FROM face_matches fm -JOIN photo_faces pf ON pf.id = fm.photo_face_id -WHERE pf.photo_id = :photo_id -LIMIT 1 -""" - -CHECK_MATCH_FOR_PHOTO_FACE = """ -SELECT 1 -FROM face_matches -WHERE photo_face_id = :photo_face_id -LIMIT 1 -""" - - - -class SingleFaceMatchService: - def __init__( - self, - *, - conn: sqlalchemy.ext.asyncio.AsyncConnection, - face_embedding_service: FaceEmbeddingService, - photo_face_querier: photo_face_queries.AsyncQuerier, - ) -> None: - self.conn = conn - self.face_embedding_service = face_embedding_service - self.photo_face_querier = photo_face_querier - - async def process_job(self, job: SingleFaceMatchJob) -> None: # noqa: C901 - if job.faces_detected is not None and job.faces_detected != 1: - logger.info( - "Skipping photo %s: faces_detected=%s (single-face worker)", - job.photo_id, - job.faces_detected, - ) - return - - if not job.image_ref: - logger.warning("Missing image_ref in event payload for photo %s", job.photo_id) - return - - if not await self._photo_exists(job.photo_id): - logger.warning("Photo not found: %s", job.photo_id) - return - if await self._match_exists_for_photo(job.photo_id): - logger.info("Photo %s already matched; skipping", job.photo_id) - return - - embedding, bbox = await self._resolve_embedding(job) - if embedding is None: - return - - try: - photo_face = await self._upsert_photo_face( - photo_id=job.photo_id, - face_index=job.face_index, - embedding=embedding, - bbox=bbox, - ) - if photo_face is None: - logger.warning("Failed to upsert photo_face for photo %s", job.photo_id) - return - await self._commit_best_effort() - except (DBAPIError, SQLAlchemyError) as exc: - await self._rollback_best_effort() - logger.warning("DB write failed for photo %s: %s", job.photo_id, exc) - return - except MemoryError: - logger.error("Out of memory while processing photo %s", job.photo_id) - return - - match = await self._find_closest_user(embedding) - if match is None: - logger.info("No user embeddings available for matching") - return - - if await self._match_exists_for_photo_face(photo_face.id): - logger.info("Match already exists for photo_face %s; skipping", photo_face.id) - return - - try: - await self._insert_face_match( - photo_face_id=photo_face.id, - user_id=match.user_id, - confidence=match.distance, - ) - await self._commit_best_effort() - except (DBAPIError, SQLAlchemyError) as exc: - await self._rollback_best_effort() - logger.warning("Failed to insert face match for photo %s: %s", job.photo_id, exc) - return - except MemoryError: - logger.error("Out of memory while matching photo %s", job.photo_id) - return - - async def _photo_exists(self, photo_id: UUID) -> bool: - row = (await self.conn.execute( - sqlalchemy.text(PHOTO_EXISTS), - {"photo_id": photo_id}, - )).first() - return row is not None - - async def _resolve_embedding( - self, - job: SingleFaceMatchJob, - ) -> tuple[list[float] | None, BBoxPayload | None]: - try: - payload = await self._load_payload(job) - except Exception as exc: - logger.warning("Failed to load image payload for photo %s: %s", job.photo_id, exc) - return None, None - - try: - faces = await self.face_embedding_service.detect_faces(payload) - except Exception as exc: - logger.warning("Face detection failed for photo %s: %s", job.photo_id, exc) - return None, None - - if len(faces) != 1: - logger.info( - "Skipping photo %s: detected %s faces (single-face worker)", - job.photo_id, - len(faces), - ) - return None, None - - face = faces[0] - bbox = BBoxPayload( - x1=float(face.bbox[0]), - y1=float(face.bbox[1]), - x2=float(face.bbox[2]), - y2=float(face.bbox[3]), - ) - return face.embedding, bbox - - async def _load_payload(self, job: SingleFaceMatchJob) -> FaceImagePayload: - if not job.image_ref: - raise ValueError("Missing image_ref in event payload") - - bucket_name, object_name = self._parse_minio_ref(job.image_ref) - bucket = Bucket(bucket_name, "") - last_exc: Exception | None = None - for attempt in range(1, settings.MINIO_RETRY_ATTEMPTS + 1): - try: - data, filename, content_type = await bucket.get(object_name) - return FaceImagePayload( - filename=filename, - content_type=content_type, - bytes=data, - ) - except Exception as exc: - last_exc = exc - logger.warning( - "MinIO fetch failed for %s (attempt %s/%s): %s", - object_name, - attempt, - settings.MINIO_RETRY_ATTEMPTS, - exc, - ) - if attempt < settings.MINIO_RETRY_ATTEMPTS: - await asyncio.sleep(settings.MINIO_RETRY_BASE_SECONDS * attempt) - assert last_exc is not None - raise last_exc - - async def _upsert_photo_face( - self, - *, - photo_id: UUID, - face_index: int, - embedding: list[float], - bbox: BBoxPayload | None, - ) -> models.PhotoFace | None: - embedding_literal = self._vector_literal(embedding) - bbox_payload = None - if bbox is not None: - bbox_payload = json.dumps( - {"x1": bbox.x1, "y1": bbox.y1, "x2": bbox.x2, "y2": bbox.y2} - ) - return await self.photo_face_querier.upsert_photo_face( - photo_id=photo_id, - face_index=face_index, - dollar_3=embedding_literal, - bbox=bbox_payload, - ) - - async def _find_closest_user( - self, - embedding: list[float], - ) -> ClosestUserMatch | None: - embedding_literal = self._vector_literal(embedding) - row = (await self.conn.execute( - sqlalchemy.text(GET_CLOSEST_USER), - {"embedding": embedding_literal}, - )).first() - if row is None: - return None - return ClosestUserMatch(user_id=row[0], distance=float(row[1])) - - async def _insert_face_match( - self, - *, - photo_face_id: UUID, - user_id: UUID, - confidence: float, - ) -> None: - await self.conn.execute( - sqlalchemy.text(INSERT_FACE_MATCH), - { - "photo_face_id": photo_face_id, - "user_id": user_id, - "confidence": confidence, - }, - ) - - async def _match_exists_for_photo(self, photo_id: UUID) -> bool: - row = (await self.conn.execute( - sqlalchemy.text(CHECK_MATCH_FOR_PHOTO), - {"photo_id": photo_id}, - )).first() - return row is not None - - async def _match_exists_for_photo_face(self, photo_face_id: UUID) -> bool: - row = (await self.conn.execute( - sqlalchemy.text(CHECK_MATCH_FOR_PHOTO_FACE), - {"photo_face_id": photo_face_id}, - )).first() - return row is not None - - async def _commit_best_effort(self) -> None: - try: - await self.conn.commit() - except Exception: - pass - - async def _rollback_best_effort(self) -> None: - try: - await self.conn.rollback() - except Exception: - pass - - @staticmethod - def _vector_literal(embedding: list[float]) -> str: - return "[" + ", ".join(str(x) for x in embedding) + "]" - - @staticmethod - def _parse_minio_ref(image_ref: str) -> tuple[str, str]: - if image_ref.startswith(MINIO_URL_PREFIX): - raw = image_ref[len(MINIO_URL_PREFIX) :] - parts = raw.split("/", 1) - if len(parts) != 2 or not parts[0] or not parts[1]: - raise ValueError("Invalid MinIO image_ref format") - return parts[0], parts[1] - return IMAGES_BUCKET_NAME, image_ref diff --git a/app/service/users.py b/app/service/users.py index 8ab3f81..8700878 100644 --- a/app/service/users.py +++ b/app/service/users.py @@ -350,3 +350,11 @@ async def unblock_user(self, *, user_id: uuid.UUID) -> User: except Exception as exc: logger.error("Failed to unblock user: %s", exc) raise DBException.handle(exc) + + async def find_closest_user(self, *, embedding_literal: str) -> ClosestUserMatch | None: + row = await self.user_querier.find_closest_user_by_embedding( + dollar_1=embedding_literal, + ) + if row is None or row.distance is None: + return None + return ClosestUserMatch(user_id=row.id, distance=float(row.distance)) diff --git a/app/worker/single_face_match/main.py b/app/worker/single_face_match/main.py new file mode 100644 index 0000000..d47744b --- /dev/null +++ b/app/worker/single_face_match/main.py @@ -0,0 +1,156 @@ +from __future__ import annotations + +import asyncio + +from app.container import Container +from app.core.config import settings +from app.core.constant import MINIO_URL_PREFIX +from app.core.logger import logger +from app.infra.database import engine +from app.infra.minio import Bucket, IMAGES_BUCKET_NAME, init_minio_client +from app.infra.nats import NatsClient, NatsSubjects +from app.infra.redis import RedisClient +from app.schema.dto.single_face_match import BBoxPayload, SingleFaceMatchJob +from app.service.face_embedding import FaceEmbeddingService, FaceImagePayload +from app.service.face_match import SingleFaceMatchService + + +class SingleFaceMatchWorker: + def __init__(self, service: SingleFaceMatchService, face_embedding_service: FaceEmbeddingService) -> None: + self.service = service + self.face_embedding_service = face_embedding_service + + async def handle_message(self, data: bytes) -> None: + try: + job = SingleFaceMatchJob.model_validate_json(data) + except Exception as exc: + logger.warning("Failed to parse single face match job: %s", exc) + return + + try: + payload = await self._load_payload(job) + except Exception as exc: + logger.warning("Failed to load image payload for photo %s: %s", job.photo_id, exc) + return + + try: + faces = await self.face_embedding_service.detect_faces(payload) + except Exception as exc: + logger.warning("Face detection failed for photo %s: %s", job.photo_id, exc) + return + + if len(faces) != 1: + logger.info( + "Skipping photo %s: detected %s faces (single-face worker)", + job.photo_id, + len(faces), + ) + return + + face = faces[0] + bbox_payload = BBoxPayload( + x1=float(face.bbox[0]), + y1=float(face.bbox[1]), + x2=float(face.bbox[2]), + y2=float(face.bbox[3]), + ) + + try: + await self.service.process_detected_face(job, face.embedding, bbox_payload) + except Exception as exc: + logger.exception("Failed to process single face match job: %s", exc) + return + + async def _load_payload(self, job: SingleFaceMatchJob) -> FaceImagePayload: + if not job.image_ref: + raise ValueError("Missing image_ref in event payload") + + bucket_name, object_name = self._parse_minio_ref(job.image_ref) + bucket = Bucket(bucket_name, "") + last_exc: Exception | None = None + for attempt in range(1, settings.MINIO_RETRY_ATTEMPTS + 1): + try: + data, filename, content_type = await bucket.get(object_name) + return FaceImagePayload( + filename=filename, + content_type=content_type, + bytes=data, + ) + except Exception as exc: + last_exc = exc + logger.warning( + "MinIO fetch failed for %s (attempt %s/%s): %s", + object_name, + attempt, + settings.MINIO_RETRY_ATTEMPTS, + exc, + ) + if attempt < settings.MINIO_RETRY_ATTEMPTS: + await asyncio.sleep(settings.MINIO_RETRY_BASE_SECONDS * attempt) + assert last_exc is not None + raise last_exc + + @staticmethod + def _parse_minio_ref(image_ref: str) -> tuple[str, str]: + if image_ref.startswith(MINIO_URL_PREFIX): + raw = image_ref[len(MINIO_URL_PREFIX) :] + parts = raw.split("/", 1) + if len(parts) != 2 or not parts[0] or not parts[1]: + raise ValueError("Invalid MinIO image_ref format") + return parts[0], parts[1] + return IMAGES_BUCKET_NAME, image_ref + + +async def run_worker() -> None: + await init_minio_client( + minio_host=settings.MINIO_HOST, + minio_port=settings.MINIO_API_PORT, + minio_root_user=settings.MINIO_ROOT_USER, + minio_root_password=settings.MINIO_ROOT_PASSWORD, + ) + RedisClient( + host=settings.REDIS_HOST, + port=settings.REDIS_PORT, + password=settings.REDIS_PASSWORD, + ) + + async with engine.connect() as conn: + container = Container(conn) + service = SingleFaceMatchService( + conn=conn, + photo_face_querier=container.photo_face_querier, + user_match_service=container.user_match_service, + user_notification_service=container.user_notifications_service, + ) + worker = SingleFaceMatchWorker(service, container.face_embedding_service) + + await NatsClient.js_subscribe( + subject=NatsSubjects.SINGLE_FACE_MATCH_REQUESTED, + callback=worker.handle_message, + stream_name=settings.NATS_SINGLE_FACE_MATCH_STREAM, + durable_name=settings.NATS_SINGLE_FACE_MATCH_DURABLE, + ) + + logger.info("SingleFaceMatchWorker subscribed; waiting for jobs") + try: + await asyncio.Event().wait() + finally: + await _close_minio() + await NatsClient.close() + + +async def _close_minio() -> None: + client = getattr(Bucket, "client", None) + if client is None: + return + close_session = getattr(client, "close_session", None) + if close_session is None: + return + try: + await close_session() + except Exception: + pass + + +if __name__ == "__main__": + asyncio.run(run_worker()) diff --git a/app/worker/single_face_match/worker.py b/app/worker/single_face_match/worker.py deleted file mode 100644 index fa8ef37..0000000 --- a/app/worker/single_face_match/worker.py +++ /dev/null @@ -1,85 +0,0 @@ -from __future__ import annotations - -import asyncio - -from app.container import Container -from app.core.config import settings -from app.core.logger import logger -from app.infra.database import engine -from app.infra.minio import Bucket, init_minio_client -from app.infra.nats import NatsClient, NatsSubjects -from app.infra.redis import RedisClient -from app.schema.dto.single_face_match import SingleFaceMatchJob -from app.service.single_face_match import SingleFaceMatchService - - -class SingleFaceMatchWorker: - def __init__(self, service: SingleFaceMatchService) -> None: - self.service = service - - async def handle_message(self, data: bytes) -> None: - try: - job = SingleFaceMatchJob.model_validate_json(data) - except Exception as exc: - logger.warning("Failed to parse single face match job: %s", exc) - return - - try: - await self.service.process_job(job) - except Exception as exc: - logger.exception("Failed to process single face match job: %s", exc) - return - - -async def run_worker() -> None: - await init_minio_client( - minio_host=settings.MINIO_HOST, - minio_port=settings.MINIO_API_PORT, - minio_root_user=settings.MINIO_ROOT_USER, - minio_root_password=settings.MINIO_ROOT_PASSWORD, - ) - RedisClient( - host=settings.REDIS_HOST, - port=settings.REDIS_PORT, - password=settings.REDIS_PASSWORD, - ) - - async with engine.connect() as conn: - container = Container(conn) - service = SingleFaceMatchService( - conn=conn, - face_embedding_service=container.face_embedding_service, - photo_face_querier=container.photo_face_querier, - ) - worker = SingleFaceMatchWorker(service) - - await NatsClient.js_subscribe( - subject=NatsSubjects.SINGLE_FACE_MATCH_REQUESTED, - callback=worker.handle_message, - stream_name=settings.NATS_SINGLE_FACE_MATCH_STREAM, - durable_name=settings.NATS_SINGLE_FACE_MATCH_DURABLE, - ) - - logger.info("SingleFaceMatchWorker subscribed; waiting for jobs") - try: - await asyncio.Event().wait() - finally: - await _close_minio() - await NatsClient.close() - - -async def _close_minio() -> None: - client = getattr(Bucket, "client", None) - if client is None: - return - close_session = getattr(client, "close_session", None) - if close_session is None: - return - try: - await close_session() - except Exception: - pass - - -if __name__ == "__main__": - asyncio.run(run_worker()) diff --git a/db/generated/devices.py b/db/generated/devices.py index 2514ff9..fb2e064 100644 --- a/db/generated/devices.py +++ b/db/generated/devices.py @@ -37,7 +37,7 @@ ) VALUES ( COALESCE(:p1, uuid_generate_v4()), :p2, :p3, :p4, :p5 ) -RETURNING id, user_id, device_name, device_type, push_token, totp_secret, is_active, is_invalid_token, is_2fa_enabled, last_active, created_at +RETURNING id, user_id, device_name, device_type, totp_secret, is_2fa_enabled, last_active, created_at, push_token, is_active, is_invalid_token """ @@ -68,13 +68,13 @@ class CreateDeviceParams: GET_DEVICE__BY_ID = """-- name: get_device__by_id \\:one -SELECT id, user_id, device_name, device_type, push_token, totp_secret, is_active, is_invalid_token, is_2fa_enabled, last_active, created_at from user_devices +SELECT id, user_id, device_name, device_type, totp_secret, is_2fa_enabled, last_active, created_at, push_token, is_active, is_invalid_token from user_devices WHERE id =:p1 """ LIST_USER_DEVICES = """-- name: list_user_devices \\:many -SELECT id, user_id, device_name, device_type, push_token, totp_secret, is_active, is_invalid_token, is_2fa_enabled, last_active, created_at +SELECT id, user_id, device_name, device_type, totp_secret, is_2fa_enabled, last_active, created_at, push_token, is_active, is_invalid_token FROM user_devices WHERE user_id = :p1 ORDER BY last_active DESC @@ -112,7 +112,7 @@ class CreateDeviceParams: is_invalid_token = FALSE WHERE id = :p1 AND user_id = :p3 -RETURNING id, user_id, device_name, device_type, push_token, totp_secret, is_active, is_invalid_token, is_2fa_enabled, last_active, created_at +RETURNING id, user_id, device_name, device_type, totp_secret, is_2fa_enabled, last_active, created_at, push_token, is_active, is_invalid_token """ @@ -144,13 +144,13 @@ async def create_device(self, arg: CreateDeviceParams) -> Optional[models.UserDe user_id=row[1], device_name=row[2], device_type=row[3], - push_token=row[4], - totp_secret=row[5], - is_active=row[6], - is_invalid_token=row[7], - is_2fa_enabled=row[8], - last_active=row[9], - created_at=row[10], + totp_secret=row[4], + is_2fa_enabled=row[5], + last_active=row[6], + created_at=row[7], + push_token=row[8], + is_active=row[9], + is_invalid_token=row[10], ) async def deactivate_device(self, *, id: uuid.UUID, user_id: uuid.UUID) -> None: @@ -168,13 +168,13 @@ async def get_device__by_id(self, *, id: uuid.UUID) -> Optional[models.UserDevic user_id=row[1], device_name=row[2], device_type=row[3], - push_token=row[4], - totp_secret=row[5], - is_active=row[6], - is_invalid_token=row[7], - is_2fa_enabled=row[8], - last_active=row[9], - created_at=row[10], + totp_secret=row[4], + is_2fa_enabled=row[5], + last_active=row[6], + created_at=row[7], + push_token=row[8], + is_active=row[9], + is_invalid_token=row[10], ) async def list_user_devices(self, *, user_id: uuid.UUID) -> AsyncIterator[models.UserDevice]: @@ -185,13 +185,13 @@ async def list_user_devices(self, *, user_id: uuid.UUID) -> AsyncIterator[models user_id=row[1], device_name=row[2], device_type=row[3], - push_token=row[4], - totp_secret=row[5], - is_active=row[6], - is_invalid_token=row[7], - is_2fa_enabled=row[8], - last_active=row[9], - created_at=row[10], + totp_secret=row[4], + is_2fa_enabled=row[5], + last_active=row[6], + created_at=row[7], + push_token=row[8], + is_active=row[9], + is_invalid_token=row[10], ) async def mark_device_token_invalid(self, *, push_token: Optional[str]) -> None: @@ -212,11 +212,11 @@ async def update_device_push_token(self, *, id: uuid.UUID, push_token: Optional[ user_id=row[1], device_name=row[2], device_type=row[3], - push_token=row[4], - totp_secret=row[5], - is_active=row[6], - is_invalid_token=row[7], - is_2fa_enabled=row[8], - last_active=row[9], - created_at=row[10], + totp_secret=row[4], + is_2fa_enabled=row[5], + last_active=row[6], + created_at=row[7], + push_token=row[8], + is_active=row[9], + is_invalid_token=row[10], ) diff --git a/db/generated/models.py b/db/generated/models.py index f54caf5..b0dcb01 100644 --- a/db/generated/models.py +++ b/db/generated/models.py @@ -221,7 +221,6 @@ class User: updated_at: datetime.datetime display_name: Optional[str] face_embedding: Optional[Any] - blocked: bool deleted_at: Optional[datetime.datetime] blocked: bool @@ -232,13 +231,13 @@ class UserDevice: user_id: uuid.UUID device_name: Optional[str] device_type: Optional[str] - push_token: Optional[str] totp_secret: Optional[str] - is_active: bool - is_invalid_token: bool is_2fa_enabled: bool last_active: datetime.datetime created_at: datetime.datetime + push_token: Optional[str] + is_active: bool + is_invalid_token: bool @dataclasses.dataclass() diff --git a/db/generated/photo_faces.py b/db/generated/photo_faces.py index 09d76f1..58f83be 100644 --- a/db/generated/photo_faces.py +++ b/db/generated/photo_faces.py @@ -2,6 +2,7 @@ # versions: # sqlc v1.30.0 # source: photo_faces.sql +import dataclasses from typing import Any, Optional import uuid @@ -11,6 +12,100 @@ from db.generated import models +PHOTO_FACES_ENSURE_FACE_MATCH = """-- name: photo_faces_ensure_face_match \\:one +WITH upserted_photo_face AS ( + INSERT INTO photo_faces ( + photo_id, + face_index, + embedding, + bbox + ) VALUES ( + :p1, + :p2, + CAST(:p3 AS vector), + :p4 + ) ON CONFLICT (photo_id, face_index) + DO UPDATE SET embedding = EXCLUDED.embedding, + bbox = EXCLUDED.bbox + RETURNING id, photo_id +), +existing_match AS ( + SELECT 1 + FROM face_matches fm + JOIN photo_faces pf ON pf.id = fm.photo_face_id + WHERE pf.photo_id = :p1 + LIMIT 1 +), +inserted_match AS ( + INSERT INTO face_matches (photo_face_id, user_id, confidence) + SELECT upserted_photo_face.id, :p5, :p6 + WHERE NOT EXISTS (SELECT 1 FROM existing_match) + RETURNING id +) +SELECT upserted_photo_face.id AS photo_face_id, + inserted_match.id AS face_match_id +FROM upserted_photo_face +LEFT JOIN inserted_match ON TRUE +""" + + +@dataclasses.dataclass() +class PhotoFacesEnsureFaceMatchParams: + photo_id: uuid.UUID + face_index: int + column_3: Any + bbox: Optional[str] + user_id: uuid.UUID + confidence: float + + +@dataclasses.dataclass() +class PhotoFacesEnsureFaceMatchRow: + photo_face_id: uuid.UUID + face_match_id: Optional[uuid.UUID] + + +PHOTO_FACES_FIND_CLOSEST_USER = """-- name: photo_faces_find_closest_user \\:one +SELECT id, + (face_embedding <=> CAST(:p1 AS vector)) AS distance +FROM users +WHERE face_embedding IS NOT NULL +ORDER BY distance ASC +LIMIT 1 +""" + + +@dataclasses.dataclass() +class PhotoFacesFindClosestUserRow: + id: uuid.UUID + distance: Optional[Any] + + +PHOTO_FACES_MATCH_EXISTS_FOR_PHOTO = """-- name: photo_faces_match_exists_for_photo \\:one +SELECT 1 +FROM face_matches fm +JOIN photo_faces pf ON pf.id = fm.photo_face_id +WHERE pf.photo_id = :p1 +LIMIT 1 +""" + + +PHOTO_FACES_MATCH_EXISTS_FOR_PHOTO_FACE = """-- name: photo_faces_match_exists_for_photo_face \\:one +SELECT 1 +FROM face_matches +WHERE photo_face_id = :p1 +LIMIT 1 +""" + + +PHOTO_FACES_PHOTO_EXISTS = """-- name: photo_faces_photo_exists \\:one +SELECT 1 +FROM photos +WHERE id = :p1 +LIMIT 1 +""" + + UPSERT_PHOTO_FACE = """-- name: upsert_photo_face \\:one INSERT INTO photo_faces ( photo_id, @@ -31,6 +126,49 @@ class AsyncQuerier: def __init__(self, conn: sqlalchemy.ext.asyncio.AsyncConnection): self._conn = conn + async def photo_faces_ensure_face_match(self, arg: PhotoFacesEnsureFaceMatchParams) -> Optional[PhotoFacesEnsureFaceMatchRow]: + row = (await self._conn.execute(sqlalchemy.text(PHOTO_FACES_ENSURE_FACE_MATCH), { + "p1": arg.photo_id, + "p2": arg.face_index, + "p3": arg.column_3, + "p4": arg.bbox, + "p5": arg.user_id, + "p6": arg.confidence, + })).first() + if row is None: + return None + return PhotoFacesEnsureFaceMatchRow( + photo_face_id=row[0], + face_match_id=row[1], + ) + + async def photo_faces_find_closest_user(self, *, dollar_1: Any) -> Optional[PhotoFacesFindClosestUserRow]: + row = (await self._conn.execute(sqlalchemy.text(PHOTO_FACES_FIND_CLOSEST_USER), {"p1": dollar_1})).first() + if row is None: + return None + return PhotoFacesFindClosestUserRow( + id=row[0], + distance=row[1], + ) + + async def photo_faces_match_exists_for_photo(self, *, photo_id: uuid.UUID) -> Optional[int]: + row = (await self._conn.execute(sqlalchemy.text(PHOTO_FACES_MATCH_EXISTS_FOR_PHOTO), {"p1": photo_id})).first() + if row is None: + return None + return row[0] + + async def photo_faces_match_exists_for_photo_face(self, *, photo_face_id: uuid.UUID) -> Optional[int]: + row = (await self._conn.execute(sqlalchemy.text(PHOTO_FACES_MATCH_EXISTS_FOR_PHOTO_FACE), {"p1": photo_face_id})).first() + if row is None: + return None + return row[0] + + async def photo_faces_photo_exists(self, *, id: uuid.UUID) -> Optional[int]: + row = (await self._conn.execute(sqlalchemy.text(PHOTO_FACES_PHOTO_EXISTS), {"p1": id})).first() + if row is None: + return None + return row[0] + async def upsert_photo_face(self, *, photo_id: uuid.UUID, face_index: int, dollar_3: Any, bbox: Optional[str]) -> Optional[models.PhotoFace]: row = (await self._conn.execute(sqlalchemy.text(UPSERT_PHOTO_FACE), { "p1": photo_id, diff --git a/db/generated/user.py b/db/generated/user.py index e812192..a5ee595 100644 --- a/db/generated/user.py +++ b/db/generated/user.py @@ -2,6 +2,7 @@ # versions: # sqlc v1.30.0 # source: user.sql +import dataclasses from typing import Any, AsyncIterator, Optional import uuid @@ -14,7 +15,7 @@ CREATE_USER = """-- name: create_user \\:one INSERT INTO users (email, hashed_password) VALUES (:p1, :p2) -RETURNING id, email, hashed_password, created_at, updated_at, display_name, face_embedding, blocked, deleted_at +RETURNING id, email, hashed_password, created_at, updated_at, display_name, face_embedding, deleted_at, blocked """ @@ -24,22 +25,38 @@ """ +FIND_CLOSEST_USER_BY_EMBEDDING = """-- name: find_closest_user_by_embedding \\:one +SELECT id, + (face_embedding <=> :p1\\:\\:vector) AS distance +FROM users +WHERE face_embedding IS NOT NULL +ORDER BY distance ASC +LIMIT 1 +""" + + +@dataclasses.dataclass() +class FindClosestUserByEmbeddingRow: + id: uuid.UUID + distance: Optional[Any] + + GET_USER_BY_EMAIL = """-- name: get_user_by_email \\:one -SELECT id, email, hashed_password, created_at, updated_at, display_name, face_embedding, blocked, deleted_at +SELECT id, email, hashed_password, created_at, updated_at, display_name, face_embedding, deleted_at, blocked FROM users WHERE email = :p1 """ GET_USER_BY_ID = """-- name: get_user_by_id \\:one -SELECT id, email, hashed_password, created_at, updated_at, display_name, face_embedding, blocked, deleted_at +SELECT id, email, hashed_password, created_at, updated_at, display_name, face_embedding, deleted_at, blocked FROM users WHERE id = :p1 """ LIST_USERS = """-- name: list_users \\:many -SELECT id, email, hashed_password, created_at, updated_at, display_name, face_embedding, blocked, deleted_at +SELECT id, email, hashed_password, created_at, updated_at, display_name, face_embedding, deleted_at, blocked FROM users ORDER BY created_at DESC LIMIT :p1 OFFSET :p2 @@ -51,7 +68,7 @@ SET blocked = :p1, updated_at = NOW() WHERE id = :p2 -RETURNING id, email, hashed_password, created_at, updated_at, display_name, face_embedding, blocked, deleted_at +RETURNING id, email, hashed_password, created_at, updated_at, display_name, face_embedding, deleted_at, blocked """ @@ -60,7 +77,7 @@ SET face_embedding = :p1\\:\\:vector, updated_at = NOW() WHERE id = :p2 -RETURNING id, email, hashed_password, created_at, updated_at, display_name, face_embedding, blocked, deleted_at +RETURNING id, email, hashed_password, created_at, updated_at, display_name, face_embedding, deleted_at, blocked """ @@ -71,7 +88,7 @@ blocked = COALESCE(:p3, blocked), updated_at = NOW() WHERE id = :p4 -RETURNING id, email, hashed_password, created_at, updated_at, display_name, face_embedding, blocked, deleted_at +RETURNING id, email, hashed_password, created_at, updated_at, display_name, face_embedding, deleted_at, blocked """ @@ -80,7 +97,7 @@ SET hashed_password = :p1, updated_at = NOW() WHERE id = :p2 -RETURNING id, email, hashed_password, created_at, updated_at, display_name, face_embedding, blocked, deleted_at +RETURNING id, email, hashed_password, created_at, updated_at, display_name, face_embedding, deleted_at, blocked """ @@ -88,14 +105,8 @@ class AsyncQuerier: def __init__(self, conn: sqlalchemy.ext.asyncio.AsyncConnection): self._conn = conn - async def create_user( - self, *, email: str, hashed_password: Optional[str] - ) -> Optional[models.User]: - row = ( - await self._conn.execute( - sqlalchemy.text(CREATE_USER), {"p1": email, "p2": hashed_password} - ) - ).first() + async def create_user(self, *, email: str, hashed_password: Optional[str]) -> Optional[models.User]: + row = (await self._conn.execute(sqlalchemy.text(CREATE_USER), {"p1": email, "p2": hashed_password})).first() if row is None: return None return models.User( @@ -106,19 +117,24 @@ async def create_user( updated_at=row[4], display_name=row[5], face_embedding=row[6], - blocked=row[7], - deleted_at=row[8], + deleted_at=row[7], + blocked=row[8], ) async def delete_user(self, *, id: uuid.UUID) -> None: await self._conn.execute(sqlalchemy.text(DELETE_USER), {"p1": id}) + async def find_closest_user_by_embedding(self, *, dollar_1: Any) -> Optional[FindClosestUserByEmbeddingRow]: + row = (await self._conn.execute(sqlalchemy.text(FIND_CLOSEST_USER_BY_EMBEDDING), {"p1": dollar_1})).first() + if row is None: + return None + return FindClosestUserByEmbeddingRow( + id=row[0], + distance=row[1], + ) + async def get_user_by_email(self, *, email: str) -> Optional[models.User]: - row = ( - await self._conn.execute( - sqlalchemy.text(GET_USER_BY_EMAIL), {"p1": email} - ) - ).first() + row = (await self._conn.execute(sqlalchemy.text(GET_USER_BY_EMAIL), {"p1": email})).first() if row is None: return None return models.User( @@ -129,14 +145,12 @@ async def get_user_by_email(self, *, email: str) -> Optional[models.User]: updated_at=row[4], display_name=row[5], face_embedding=row[6], - blocked=row[7], - deleted_at=row[8], + deleted_at=row[7], + blocked=row[8], ) async def get_user_by_id(self, *, id: uuid.UUID) -> Optional[models.User]: - row = ( - await self._conn.execute(sqlalchemy.text(GET_USER_BY_ID), {"p1": id}) - ).first() + row = (await self._conn.execute(sqlalchemy.text(GET_USER_BY_ID), {"p1": id})).first() if row is None: return None return models.User( @@ -147,16 +161,12 @@ async def get_user_by_id(self, *, id: uuid.UUID) -> Optional[models.User]: updated_at=row[4], display_name=row[5], face_embedding=row[6], - blocked=row[7], - deleted_at=row[8], + deleted_at=row[7], + blocked=row[8], ) - async def list_users( - self, *, limit: int, offset: int - ) -> AsyncIterator[models.User]: - result = await self._conn.stream( - sqlalchemy.text(LIST_USERS), {"p1": limit, "p2": offset} - ) + async def list_users(self, *, limit: int, offset: int) -> AsyncIterator[models.User]: + result = await self._conn.stream(sqlalchemy.text(LIST_USERS), {"p1": limit, "p2": offset}) async for row in result: yield models.User( id=row[0], @@ -166,16 +176,12 @@ async def list_users( updated_at=row[4], display_name=row[5], face_embedding=row[6], - blocked=row[7], - deleted_at=row[8], + deleted_at=row[7], + blocked=row[8], ) async def set_user_blocked(self, *, blocked: bool, id: uuid.UUID) -> Optional[models.User]: - row = ( - await self._conn.execute( - sqlalchemy.text(SET_USER_BLOCKED), {"p1": blocked, "p2": id} - ) - ).first() + row = (await self._conn.execute(sqlalchemy.text(SET_USER_BLOCKED), {"p1": blocked, "p2": id})).first() if row is None: return None return models.User( @@ -186,16 +192,12 @@ async def set_user_blocked(self, *, blocked: bool, id: uuid.UUID) -> Optional[mo updated_at=row[4], display_name=row[5], face_embedding=row[6], - blocked=row[7], - deleted_at=row[8], + deleted_at=row[7], + blocked=row[8], ) async def set_user_embedding(self, *, dollar_1: Any, id: uuid.UUID) -> Optional[models.User]: - row = ( - await self._conn.execute( - sqlalchemy.text(SET_USER_EMBEDDING), {"p1": dollar_1, "p2": id} - ) - ).first() + row = (await self._conn.execute(sqlalchemy.text(SET_USER_EMBEDDING), {"p1": dollar_1, "p2": id})).first() if row is None: return None return models.User( @@ -206,29 +208,17 @@ async def set_user_embedding(self, *, dollar_1: Any, id: uuid.UUID) -> Optional[ updated_at=row[4], display_name=row[5], face_embedding=row[6], - blocked=row[7], - deleted_at=row[8], + deleted_at=row[7], + blocked=row[8], ) - async def update_user( - self, - *, - email: str, - display_name: Optional[str], - blocked: bool, - id: uuid.UUID, - ) -> Optional[models.User]: - row = ( - await self._conn.execute( - sqlalchemy.text(UPDATE_USER), - { - "p1": email, - "p2": display_name, - "p3": blocked, - "p4": id, - }, - ) - ).first() + async def update_user(self, *, email: str, display_name: Optional[str], blocked: bool, id: uuid.UUID) -> Optional[models.User]: + row = (await self._conn.execute(sqlalchemy.text(UPDATE_USER), { + "p1": email, + "p2": display_name, + "p3": blocked, + "p4": id, + })).first() if row is None: return None return models.User( @@ -239,19 +229,12 @@ async def update_user( updated_at=row[4], display_name=row[5], face_embedding=row[6], - blocked=row[7], - deleted_at=row[8], + deleted_at=row[7], + blocked=row[8], ) - async def update_user_password( - self, *, hashed_password: Optional[str], id: uuid.UUID - ) -> Optional[models.User]: - row = ( - await self._conn.execute( - sqlalchemy.text(UPDATE_USER_PASSWORD), - {"p1": hashed_password, "p2": id}, - ) - ).first() + async def update_user_password(self, *, hashed_password: Optional[str], id: uuid.UUID) -> Optional[models.User]: + row = (await self._conn.execute(sqlalchemy.text(UPDATE_USER_PASSWORD), {"p1": hashed_password, "p2": id})).first() if row is None: return None return models.User( @@ -262,6 +245,6 @@ async def update_user_password( updated_at=row[4], display_name=row[5], face_embedding=row[6], - blocked=row[7], - deleted_at=row[8], + deleted_at=row[7], + blocked=row[8], ) diff --git a/db/queries/photo_faces.sql b/db/queries/photo_faces.sql index de3ffbb..3380700 100644 --- a/db/queries/photo_faces.sql +++ b/db/queries/photo_faces.sql @@ -11,3 +11,65 @@ ON CONFLICT (photo_id, face_index) DO UPDATE SET embedding = EXCLUDED.embedding, bbox = EXCLUDED.bbox RETURNING *; + +-- name: PhotoFacesPhotoExists :one +SELECT 1 +FROM photos +WHERE id = $1 +LIMIT 1; + +-- name: PhotoFacesMatchExistsForPhoto :one +SELECT 1 +FROM face_matches fm +JOIN photo_faces pf ON pf.id = fm.photo_face_id +WHERE pf.photo_id = $1 +LIMIT 1; + +-- name: PhotoFacesMatchExistsForPhotoFace :one +SELECT 1 +FROM face_matches +WHERE photo_face_id = $1 +LIMIT 1; + +-- name: PhotoFacesFindClosestUser :one +SELECT id, + (face_embedding <=> CAST($1 AS vector)) AS distance +FROM users +WHERE face_embedding IS NOT NULL +ORDER BY distance ASC +LIMIT 1; + +-- name: PhotoFacesEnsureFaceMatch :one +WITH upserted_photo_face AS ( + INSERT INTO photo_faces ( + photo_id, + face_index, + embedding, + bbox + ) VALUES ( + $1, + $2, + CAST($3 AS vector), + $4 + ) ON CONFLICT (photo_id, face_index) + DO UPDATE SET embedding = EXCLUDED.embedding, + bbox = EXCLUDED.bbox + RETURNING id, photo_id +), +existing_match AS ( + SELECT 1 + FROM face_matches fm + JOIN photo_faces pf ON pf.id = fm.photo_face_id + WHERE pf.photo_id = $1 + LIMIT 1 +), +inserted_match AS ( + INSERT INTO face_matches (photo_face_id, user_id, confidence) + SELECT upserted_photo_face.id, $5, $6 + WHERE NOT EXISTS (SELECT 1 FROM existing_match) + RETURNING id +) +SELECT upserted_photo_face.id AS photo_face_id, + inserted_match.id AS face_match_id +FROM upserted_photo_face +LEFT JOIN inserted_match ON TRUE; diff --git a/db/queries/user.sql b/db/queries/user.sql index bc3fdd8..f940d45 100644 --- a/db/queries/user.sql +++ b/db/queries/user.sql @@ -52,3 +52,11 @@ SET face_embedding = $1::vector, updated_at = NOW() WHERE id = $2 RETURNING *; + +-- name: FindClosestUserByEmbedding :one +SELECT id, + (face_embedding <=> $1::vector) AS distance +FROM users +WHERE face_embedding IS NOT NULL +ORDER BY distance ASC +LIMIT 1; diff --git a/migrations/versions/4dd6658b9f83_merge_heads.py b/migrations/versions/4dd6658b9f83_merge_heads.py index b63cff0..e7c5aea 100644 --- a/migrations/versions/4dd6658b9f83_merge_heads.py +++ b/migrations/versions/4dd6658b9f83_merge_heads.py @@ -7,8 +7,6 @@ """ from typing import Sequence, Union -from alembic import op -import sqlalchemy as sa # revision identifiers, used by Alembic. diff --git a/migrations/versions/5b6615c9ab1d_merge_heads.py b/migrations/versions/5b6615c9ab1d_merge_heads.py index cea5228..0451545 100644 --- a/migrations/versions/5b6615c9ab1d_merge_heads.py +++ b/migrations/versions/5b6615c9ab1d_merge_heads.py @@ -7,8 +7,6 @@ """ from typing import Sequence, Union -from alembic import op -import sqlalchemy as sa # revision identifiers, used by Alembic. From 3bdcc3af0f93ee5498d316c076e8b924867a9d43 Mon Sep 17 00:00:00 2001 From: wailbentafat Date: Wed, 25 Mar 2026 23:17:30 +0100 Subject: [PATCH 02/10] refactor the schema folder add intenral file --- app/schema/{ => internal}/notification.py | 0 .../{dto => internal}/single_face_match.py | 7 +++++++ app/schema/{dto/staff => internal}/uploads.py | 0 app/schema/request/staff/uploads.py | 2 +- app/service/face_match.py | 19 +++++++++---------- app/service/upload_requests.py | 2 +- app/service/user_match.py | 19 +++++++++++++++++++ app/service/user_notification.py | 2 +- app/worker/notification/firebase.py | 2 +- app/worker/notification/notification_queue.py | 2 +- app/worker/notification/settings.py | 2 +- app/worker/single_face_match/main.py | 4 ++-- 12 files changed, 43 insertions(+), 18 deletions(-) rename app/schema/{ => internal}/notification.py (100%) rename app/schema/{dto => internal}/single_face_match.py (83%) rename app/schema/{dto/staff => internal}/uploads.py (100%) create mode 100644 app/service/user_match.py diff --git a/app/schema/notification.py b/app/schema/internal/notification.py similarity index 100% rename from app/schema/notification.py rename to app/schema/internal/notification.py diff --git a/app/schema/dto/single_face_match.py b/app/schema/internal/single_face_match.py similarity index 83% rename from app/schema/dto/single_face_match.py rename to app/schema/internal/single_face_match.py index e691808..a5ebe67 100644 --- a/app/schema/dto/single_face_match.py +++ b/app/schema/internal/single_face_match.py @@ -23,3 +23,10 @@ class SingleFaceMatchJob(BaseModel): submitted_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) model_config = {"extra": "allow"} + + +class ClosestUserMatch(BaseModel): + user_id: UUID + distance: float + + model_config = {"extra": "forbid"} diff --git a/app/schema/dto/staff/uploads.py b/app/schema/internal/uploads.py similarity index 100% rename from app/schema/dto/staff/uploads.py rename to app/schema/internal/uploads.py diff --git a/app/schema/request/staff/uploads.py b/app/schema/request/staff/uploads.py index ed50627..b1fb68f 100644 --- a/app/schema/request/staff/uploads.py +++ b/app/schema/request/staff/uploads.py @@ -4,7 +4,7 @@ from pydantic import BaseModel, Field, field_validator from uuid import UUID -from app.schema.dto.staff.uploads import UploadPhotoInput +from app.schema.internal.uploads import UploadPhotoInput MAX_UPLOAD_BATCH_SIZE = 20 diff --git a/app/service/face_match.py b/app/service/face_match.py index 9f6dbf8..749a574 100644 --- a/app/service/face_match.py +++ b/app/service/face_match.py @@ -8,9 +8,9 @@ from app.core.logger import logger from app.schema.dto.single_face_match import BBoxPayload, SingleFaceMatchJob -from app.service.user_match import ClosestUserMatch, UserMatchService +from app.schema.internal.single_face_match import ClosestUserMatch +from app.service.user_match import UserMatchService from app.service.user_notification import UserNotificationService -from app.service.users import AuthService from db.generated import photo_faces as photo_face_queries @@ -20,12 +20,12 @@ def __init__( *, conn: AsyncConnection, photo_face_querier: photo_face_queries.AsyncQuerier, - user_service: AuthService, + user_match_service: UserMatchService, user_notification_service: UserNotificationService, ) -> None: self.conn = conn self.photo_face_querier = photo_face_querier - self.user_service = user_service + self.user_match_service = user_match_service self.user_notification_service = user_notification_service async def process_detected_face( @@ -46,15 +46,15 @@ async def process_detected_face( try: async with self.conn.begin(): - if not await self.photo_exists(job.photo_id): + if not await self._photo_exists(job.photo_id): logger.warning("Photo not found: %s", job.photo_id) return - if await self.match_exists_for_photo(job.photo_id): + if await self._match_exists_for_photo(job.photo_id): logger.info("Photo %s already matched; skipping", job.photo_id) return - matched_user = await self.user_service.find_closest_user( + matched_user = await self.user_match_service.find_closest_user( embedding_literal=embedding_literal, ) if matched_user is None: @@ -100,11 +100,11 @@ async def process_detected_face( }, ) - async def photo_exists(self, photo_id: UUID) -> bool: + async def _photo_exists(self, photo_id: UUID) -> bool: row = await self.photo_face_querier.photo_faces_photo_exists(id=photo_id) return row is not None - async def match_exists_for_photo(self, photo_id: UUID) -> bool: + async def _match_exists_for_photo(self, photo_id: UUID) -> bool: row = await self.photo_face_querier.photo_faces_match_exists_for_photo( photo_id=photo_id, ) @@ -121,4 +121,3 @@ def _serialize_bbox(bbox: BBoxPayload | None) -> str | None: return json.dumps( {"x1": bbox.x1, "y1": bbox.y1, "x2": bbox.x2, "y2": bbox.y2} ) -*** End Patch diff --git a/app/service/upload_requests.py b/app/service/upload_requests.py index 13759dd..b42eced 100644 --- a/app/service/upload_requests.py +++ b/app/service/upload_requests.py @@ -11,7 +11,7 @@ from app.infra.nats import NatsClient, NatsSubjects from sqlalchemy.exc import IntegrityError -from app.schema.dto.staff.uploads import UploadPhotoInput +from app.schema.internal.uploads import UploadPhotoInput from app.service.staged_upload_storage import PreviewObject, StagedUploadStorageService from app.service.staff_drive import StaffDriveService from app.service.staff_notifications import StaffNotificationsService diff --git a/app/service/user_match.py b/app/service/user_match.py new file mode 100644 index 0000000..f6ce94d --- /dev/null +++ b/app/service/user_match.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from uuid import UUID + +from db.generated import user as user_queries +from app.schema.internal.single_face_match import ClosestUserMatch + + +class UserMatchService: + def __init__(self, *, user_querier: user_queries.AsyncQuerier) -> None: + self.user_querier = user_querier + + async def find_closest_user(self, *, embedding_literal: str) -> ClosestUserMatch | None: + row = await self.user_querier.find_closest_user_by_embedding( + dollar_1=embedding_literal, + ) + if row is None or row.distance is None: + return None + return ClosestUserMatch(user_id=row.id, distance=float(row.distance)) diff --git a/app/service/user_notification.py b/app/service/user_notification.py index 1d73971..a218b50 100644 --- a/app/service/user_notification.py +++ b/app/service/user_notification.py @@ -2,7 +2,7 @@ import uuid from app.core.exceptions import AppException -from app.schema.notification import UnifiedNotification +from app.schema.internal.notification import UnifiedNotification from app.worker.notification.notification_queue import NotificationQueue from db.generated import notifications as notification_queries from db.generated.models import Notification diff --git a/app/worker/notification/firebase.py b/app/worker/notification/firebase.py index ffc2360..356de0e 100644 --- a/app/worker/notification/firebase.py +++ b/app/worker/notification/firebase.py @@ -8,7 +8,7 @@ from app.core.config import settings from app.core.logger import logger -from app.schema.notification import UnifiedNotification +from app.schema.internal.notification import UnifiedNotification INVALID_TOKEN_CODES = { diff --git a/app/worker/notification/notification_queue.py b/app/worker/notification/notification_queue.py index bf1589d..463de93 100644 --- a/app/worker/notification/notification_queue.py +++ b/app/worker/notification/notification_queue.py @@ -1,7 +1,7 @@ from typing import Sequence from pydantic import BaseModel, ConfigDict, Field from app.infra.nats import NatsClient -from app.schema.notification import NotificationPriority, PRIORITY_ORDER, UnifiedNotification +from app.schema.internal.notification import NotificationPriority, PRIORITY_ORDER, UnifiedNotification from app.worker.notification.settings import NotificationWorkerSettings diff --git a/app/worker/notification/settings.py b/app/worker/notification/settings.py index 4d23dd7..0c25f32 100644 --- a/app/worker/notification/settings.py +++ b/app/worker/notification/settings.py @@ -5,7 +5,7 @@ from pydantic import Field from pydantic_settings import BaseSettings -from app.schema.notification import NotificationPriority, PRIORITY_ORDER +from app.schema.internal.notification import NotificationPriority, PRIORITY_ORDER class NotificationWorkerSettings(BaseSettings): diff --git a/app/worker/single_face_match/main.py b/app/worker/single_face_match/main.py index d47744b..505597f 100644 --- a/app/worker/single_face_match/main.py +++ b/app/worker/single_face_match/main.py @@ -10,7 +10,7 @@ from app.infra.minio import Bucket, IMAGES_BUCKET_NAME, init_minio_client from app.infra.nats import NatsClient, NatsSubjects from app.infra.redis import RedisClient -from app.schema.dto.single_face_match import BBoxPayload, SingleFaceMatchJob +from app.schema.internal.single_face_match import BBoxPayload, SingleFaceMatchJob from app.service.face_embedding import FaceEmbeddingService, FaceImagePayload from app.service.face_match import SingleFaceMatchService @@ -119,7 +119,7 @@ async def run_worker() -> None: service = SingleFaceMatchService( conn=conn, photo_face_querier=container.photo_face_querier, - user_match_service=container.user_match_service, + user_service=container.auth_service, user_notification_service=container.user_notifications_service, ) worker = SingleFaceMatchWorker(service, container.face_embedding_service) From 43d78848722c193070c034f446ba3099a3af7a2f Mon Sep 17 00:00:00 2001 From: wailbentafat Date: Wed, 25 Mar 2026 23:21:42 +0100 Subject: [PATCH 03/10] refacor dto and logique of the service detection --- app/infra/nats.py | 2 +- app/service/face_embedding.py | 2 +- app/service/face_match.py | 10 ++++------ app/service/user_match.py | 19 ------------------- app/service/users.py | 1 + app/worker/single_face_match/main.py | 2 +- 6 files changed, 8 insertions(+), 28 deletions(-) delete mode 100644 app/service/user_match.py diff --git a/app/infra/nats.py b/app/infra/nats.py index 5f01b55..e691c36 100644 --- a/app/infra/nats.py +++ b/app/infra/nats.py @@ -130,7 +130,7 @@ async def ensure_stream(*, stream_name: str, subjects: list[str]) -> None: try: await js.stream_info(stream_name) except NotFoundError: - await js.add_stream( + await js.add_stream( # type: ignore name=stream_name, config=StreamConfig( name=stream_name, diff --git a/app/service/face_embedding.py b/app/service/face_embedding.py index 71295e3..e5333a0 100644 --- a/app/service/face_embedding.py +++ b/app/service/face_embedding.py @@ -187,7 +187,7 @@ async def compute_event_embedding( image = self._decode_image(payload) image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - faces: list[FaceStub] = await asyncio.to_thread( + faces: list[FaceStub] = await asyncio.to_thread( # type: ignore self.face_embedding.model.get, image_rgb # type: ignore ) diff --git a/app/service/face_match.py b/app/service/face_match.py index 749a574..3f04d44 100644 --- a/app/service/face_match.py +++ b/app/service/face_match.py @@ -7,10 +7,9 @@ from sqlalchemy.ext.asyncio import AsyncConnection from app.core.logger import logger -from app.schema.dto.single_face_match import BBoxPayload, SingleFaceMatchJob -from app.schema.internal.single_face_match import ClosestUserMatch -from app.service.user_match import UserMatchService +from app.schema.internal.single_face_match import BBoxPayload, ClosestUserMatch, SingleFaceMatchJob from app.service.user_notification import UserNotificationService +from app.service.users import AuthService from db.generated import photo_faces as photo_face_queries @@ -20,7 +19,7 @@ def __init__( *, conn: AsyncConnection, photo_face_querier: photo_face_queries.AsyncQuerier, - user_match_service: UserMatchService, + user_match_service: AuthService, user_notification_service: UserNotificationService, ) -> None: self.conn = conn @@ -90,13 +89,12 @@ async def process_detected_face( logger.error("Out of memory while matching photo %s", job.photo_id) return - if created_face_match_id and matched_user is not None: + if created_face_match_id : await self.user_notification_service.create_notification( user_id=matched_user.user_id, type="face_match", payload={ "photo_id": str(job.photo_id), - "confidence": matched_user.distance, }, ) diff --git a/app/service/user_match.py b/app/service/user_match.py deleted file mode 100644 index f6ce94d..0000000 --- a/app/service/user_match.py +++ /dev/null @@ -1,19 +0,0 @@ -from __future__ import annotations - -from uuid import UUID - -from db.generated import user as user_queries -from app.schema.internal.single_face_match import ClosestUserMatch - - -class UserMatchService: - def __init__(self, *, user_querier: user_queries.AsyncQuerier) -> None: - self.user_querier = user_querier - - async def find_closest_user(self, *, embedding_literal: str) -> ClosestUserMatch | None: - row = await self.user_querier.find_closest_user_by_embedding( - dollar_1=embedding_literal, - ) - if row is None or row.distance is None: - return None - return ClosestUserMatch(user_id=row.id, distance=float(row.distance)) diff --git a/app/service/users.py b/app/service/users.py index 8700878..f5329e8 100644 --- a/app/service/users.py +++ b/app/service/users.py @@ -22,6 +22,7 @@ from db.generated.models import User, UserDevice from app.core.logger import logger from app.service.face_embedding import FaceImagePayload, FaceEmbeddingService +from app.schema.internal.single_face_match import ClosestUserMatch class AuthService: diff --git a/app/worker/single_face_match/main.py b/app/worker/single_face_match/main.py index 505597f..f505e18 100644 --- a/app/worker/single_face_match/main.py +++ b/app/worker/single_face_match/main.py @@ -119,7 +119,7 @@ async def run_worker() -> None: service = SingleFaceMatchService( conn=conn, photo_face_querier=container.photo_face_querier, - user_service=container.auth_service, + user_match_service=container.auth_service, user_notification_service=container.user_notifications_service, ) worker = SingleFaceMatchWorker(service, container.face_embedding_service) From 4691baf9cf63d93388bcd77e0afba9e477e9586e Mon Sep 17 00:00:00 2001 From: wailbentafat Date: Wed, 25 Mar 2026 23:24:26 +0100 Subject: [PATCH 04/10] rename fucntion to follow convention and add readbailite --- app/service/face_match.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/service/face_match.py b/app/service/face_match.py index 3f04d44..e463ec9 100644 --- a/app/service/face_match.py +++ b/app/service/face_match.py @@ -45,7 +45,7 @@ async def process_detected_face( try: async with self.conn.begin(): - if not await self._photo_exists(job.photo_id): + if not await self.Check_photo_exists(job.photo_id): logger.warning("Photo not found: %s", job.photo_id) return @@ -98,7 +98,7 @@ async def process_detected_face( }, ) - async def _photo_exists(self, photo_id: UUID) -> bool: + async def Check_photo_exists(self, photo_id: UUID) -> bool: row = await self.photo_face_querier.photo_faces_photo_exists(id=photo_id) return row is not None From b366f8048ee3d989a89703ef95e36ef078d106a9 Mon Sep 17 00:00:00 2001 From: bouhamza abderrahmane Date: Thu, 26 Mar 2026 03:32:54 +0100 Subject: [PATCH 05/10] refactor(sql): replace SELECT * with explicit columns --- db/queries/devices.sql | 53 +++++++++++++-- db/queries/eventParticipant.sql | 8 ++- db/queries/events.sql | 66 +++++++++++++++++-- db/queries/photo_faces.sql | 9 ++- db/queries/photos.sql | 11 +++- db/queries/session.sql | 24 ++++++- db/queries/staff_drive_connections.sql | 30 ++++++++- db/queries/staff_notifications.sql | 24 ++++++- db/queries/stuff_user.sql | 56 ++++++++++++++-- db/queries/upload_request_photos.sql | 90 ++++++++++++++++++++++++-- db/queries/upload_requests.sql | 60 +++++++++++++++-- db/queries/user.sql | 88 ++++++++++++++++++++++--- 12 files changed, 470 insertions(+), 49 deletions(-) diff --git a/db/queries/devices.sql b/db/queries/devices.sql index 2b512d8..0f4e133 100644 --- a/db/queries/devices.sql +++ b/db/queries/devices.sql @@ -8,10 +8,32 @@ INSERT INTO user_devices ( ) VALUES ( COALESCE($1, uuid_generate_v4()), $2, $3, $4, $5 ) -RETURNING *; +RETURNING + id, + user_id, + device_name, + device_type, + totp_secret, + is_2fa_enabled, + last_active, + created_at, + push_token, + is_active, + is_invalid_token; -- name: ListUserDevices :many -SELECT * +SELECT + id, + user_id, + device_name, + device_type, + totp_secret, + is_2fa_enabled, + last_active, + created_at, + push_token, + is_active, + is_invalid_token FROM user_devices WHERE user_id = $1 ORDER BY last_active DESC; @@ -35,7 +57,19 @@ AND user_id = $2 AND is_2fa_enabled = FALSE; -- name: Get_device_By_id :one -SELECT * from user_devices +SELECT + id, + user_id, + device_name, + device_type, + totp_secret, + is_2fa_enabled, + last_active, + created_at, + push_token, + is_active, + is_invalid_token +from user_devices WHERE id =$1; -- name: Count_User_Devices :one @@ -51,7 +85,18 @@ SET is_invalid_token = FALSE WHERE id = $1 AND user_id = $3 -RETURNING *; +RETURNING + id, + user_id, + device_name, + device_type, + totp_secret, + is_2fa_enabled, + last_active, + created_at, + push_token, + is_active, + is_invalid_token; -- name: ActivateDevice :exec UPDATE user_devices diff --git a/db/queries/eventParticipant.sql b/db/queries/eventParticipant.sql index 9203c83..7868009 100644 --- a/db/queries/eventParticipant.sql +++ b/db/queries/eventParticipant.sql @@ -2,7 +2,11 @@ -- Records when a user scans a QR code to join an event INSERT INTO event_participants (event_id, user_id) VALUES ($1, $2) -RETURNING *; +RETURNING + id, + event_id, + user_id, + joined_at; -- name: GetUserEvents :many -- Retrieves all events a specific user has successfully joined @@ -76,4 +80,4 @@ WHERE event_id = $1; SELECT EXISTS ( SELECT 1 FROM event_participants WHERE event_id = $1 AND user_id = $2 -); \ No newline at end of file +); diff --git a/db/queries/events.sql b/db/queries/events.sql index e7a7fd8..d5572a8 100644 --- a/db/queries/events.sql +++ b/db/queries/events.sql @@ -1,23 +1,67 @@ -- name: CreateEvent :one INSERT INTO events (name, event_code, event_date, status, created_by) VALUES ($1, $2, $3, $4, $5) -RETURNING *; +RETURNING + id, + name, + event_code, + event_date, + status, + created_by, + created_at, + archived_at; -- name: GetEventById :one -SELECT * FROM events +SELECT + id, + name, + event_code, + event_date, + status, + created_by, + created_at, + archived_at +FROM events WHERE id = $1; -- name: GetEventByCode :one -SELECT * FROM events +SELECT + id, + name, + event_code, + event_date, + status, + created_by, + created_at, + archived_at +FROM events WHERE event_code = $1; -- name: GetEventsByName :many -SELECT * FROM events +SELECT + id, + name, + event_code, + event_date, + status, + created_by, + created_at, + archived_at +FROM events WHERE name ILIKE '%' || $1 || '%' ORDER BY event_date DESC; -- name: ListEvents :many -SELECT * FROM events +SELECT + id, + name, + event_code, + event_date, + status, + created_by, + created_at, + archived_at +FROM events WHERE -- Filter by Status (Optional) (sqlc.narg('status')::event_status IS NULL OR status = sqlc.narg('status')) @@ -44,8 +88,16 @@ UPDATE events SET status = $2, archived_at = CASE WHEN $2 = 'archived'::event_status THEN NOW() ELSE archived_at END WHERE id = $1 -RETURNING *; +RETURNING + id, + name, + event_code, + event_date, + status, + created_by, + created_at, + archived_at; -- name: DeleteEvent :exec DELETE FROM events -WHERE id = $1; \ No newline at end of file +WHERE id = $1; diff --git a/db/queries/photo_faces.sql b/db/queries/photo_faces.sql index 3380700..0c3d148 100644 --- a/db/queries/photo_faces.sql +++ b/db/queries/photo_faces.sql @@ -10,7 +10,13 @@ INSERT INTO photo_faces ( ON CONFLICT (photo_id, face_index) DO UPDATE SET embedding = EXCLUDED.embedding, bbox = EXCLUDED.bbox -RETURNING *; +RETURNING + id, + photo_id, + face_index, + embedding, + bbox, + created_at; -- name: PhotoFacesPhotoExists :one SELECT 1 @@ -40,6 +46,7 @@ ORDER BY distance ASC LIMIT 1; -- name: PhotoFacesEnsureFaceMatch :one +-- Ensures one face match per photo by inserting a face_match only if none exist. WITH upserted_photo_face AS ( INSERT INTO photo_faces ( photo_id, diff --git a/db/queries/photos.sql b/db/queries/photos.sql index 34e7c34..cc38105 100644 --- a/db/queries/photos.sql +++ b/db/queries/photos.sql @@ -8,4 +8,13 @@ INSERT INTO photos ( ) VALUES ( $1, $2, $3, $4, $5 ) -RETURNING *; +RETURNING + id, + event_id, + uploaded_by, + storage_key, + taken_at, + day_number, + visibility, + status, + created_at; diff --git a/db/queries/session.sql b/db/queries/session.sql index b22911e..ac69946 100644 --- a/db/queries/session.sql +++ b/db/queries/session.sql @@ -19,17 +19,35 @@ RETURNING created_at; -- name: GetSessionByDevice :one -SELECT * +SELECT + id, + user_id, + device_id, + created_at, + last_active, + expires_at FROM user_sessions WHERE device_id = $1; -- name: GetSessionByID :one -SELECT * +SELECT + id, + user_id, + device_id, + created_at, + last_active, + expires_at FROM user_sessions WHERE id = $1; -- name: ListSessionsByUser :many -SELECT * +SELECT + id, + user_id, + device_id, + created_at, + last_active, + expires_at FROM user_sessions WHERE user_id = $1; diff --git a/db/queries/staff_drive_connections.sql b/db/queries/staff_drive_connections.sql index fe12528..c20b6e4 100644 --- a/db/queries/staff_drive_connections.sql +++ b/db/queries/staff_drive_connections.sql @@ -25,10 +25,36 @@ DO UPDATE SET connected_at = NOW(), revoked_at = NULL, updated_at = NOW() -RETURNING *; +RETURNING + id, + staff_user_id, + provider, + google_email, + google_account_id, + access_token, + refresh_token, + token_expires_at, + scopes, + connected_at, + revoked_at, + created_at, + updated_at; -- name: GetActiveStaffDriveConnectionByStaffUserID :one -SELECT * +SELECT + id, + staff_user_id, + provider, + google_email, + google_account_id, + access_token, + refresh_token, + token_expires_at, + scopes, + connected_at, + revoked_at, + created_at, + updated_at FROM staff_drive_connections WHERE staff_user_id = $1 AND provider = $2 diff --git a/db/queries/staff_notifications.sql b/db/queries/staff_notifications.sql index 74070b8..d07d52d 100644 --- a/db/queries/staff_notifications.sql +++ b/db/queries/staff_notifications.sql @@ -6,10 +6,22 @@ INSERT INTO staff_notifications ( ) VALUES ( $1, $2, $3 ) -RETURNING *; +RETURNING + id, + staff_user_id, + type, + payload, + read_at, + created_at; -- name: ListStaffNotificationsByStaffUserID :many -SELECT * +SELECT + id, + staff_user_id, + type, + payload, + read_at, + created_at FROM staff_notifications WHERE staff_user_id = $1 ORDER BY created_at DESC; @@ -20,4 +32,10 @@ SET read_at = NOW() WHERE id = $1 AND staff_user_id = $2 AND read_at IS NULL -RETURNING *; +RETURNING + id, + staff_user_id, + type, + payload, + read_at, + created_at; diff --git a/db/queries/stuff_user.sql b/db/queries/stuff_user.sql index 777109d..b0dbdf0 100644 --- a/db/queries/stuff_user.sql +++ b/db/queries/stuff_user.sql @@ -1,25 +1,55 @@ -- name: CreateAdmin :one INSERT INTO staff_users (email, password, role) VALUES ($1, $2, 'admin') -RETURNING *; +RETURNING + id, + email, + role, + created_at, + updated_at, + password; -- name: CreateMulti :one INSERT INTO staff_users (email, password, role) VALUES ($1, $2, $3) -RETURNING *; +RETURNING + id, + email, + role, + created_at, + updated_at, + password; -- name: GetStaffUserByID :one -SELECT * +SELECT + id, + email, + role, + created_at, + updated_at, + password FROM staff_users WHERE id = $1; -- name: GetStaffUserByEmail :one -SELECT * +SELECT + id, + email, + role, + created_at, + updated_at, + password FROM staff_users WHERE email = $1; -- name: ListStaffUsers :many -SELECT * +SELECT + id, + email, + role, + created_at, + updated_at, + password FROM staff_users WHERE (COALESCE($1, '') = '' OR email ILIKE '%' || $1 || '%') AND (COALESCE($2, '') = '' OR role::text = $2) @@ -35,9 +65,21 @@ LIMIT $5 OFFSET $6; UPDATE staff_users SET email = $2, role = $3, updated_at = NOW() WHERE id = $1 -RETURNING *; +RETURNING + id, + email, + role, + created_at, + updated_at, + password; -- name: DeleteStaffUser :one DELETE FROM staff_users WHERE id = $1 -RETURNING *; +RETURNING + id, + email, + role, + created_at, + updated_at, + password; diff --git a/db/queries/upload_request_photos.sql b/db/queries/upload_request_photos.sql index bab46b5..1787bc4 100644 --- a/db/queries/upload_request_photos.sql +++ b/db/queries/upload_request_photos.sql @@ -13,22 +13,74 @@ INSERT INTO upload_request_photos ( ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 ) -RETURNING *; +RETURNING + id, + upload_request_id, + drive_file_id, + file_name, + mime_type, + size_bytes, + staging_storage_key, + final_storage_key, + taken_at, + day_number, + visibility, + status, + created_at; -- name: ListUploadRequestPhotosByUploadRequestID :many -SELECT * +SELECT + id, + upload_request_id, + drive_file_id, + file_name, + mime_type, + size_bytes, + staging_storage_key, + final_storage_key, + taken_at, + day_number, + visibility, + status, + created_at FROM upload_request_photos WHERE upload_request_id = $1 ORDER BY created_at ASC; -- name: ListUploadRequestPhotosByUploadRequestIDs :many -SELECT * +SELECT + id, + upload_request_id, + drive_file_id, + file_name, + mime_type, + size_bytes, + staging_storage_key, + final_storage_key, + taken_at, + day_number, + visibility, + status, + created_at FROM upload_request_photos WHERE upload_request_id = ANY($1::uuid[]) ORDER BY created_at ASC; -- name: GetUploadRequestPhotoByID :one -SELECT * +SELECT + id, + upload_request_id, + drive_file_id, + file_name, + mime_type, + size_bytes, + staging_storage_key, + final_storage_key, + taken_at, + day_number, + visibility, + status, + created_at FROM upload_request_photos WHERE id = $1; @@ -37,13 +89,39 @@ UPDATE upload_request_photos SET status = $2, final_storage_key = $3 WHERE id = $1 -RETURNING *; +RETURNING + id, + upload_request_id, + drive_file_id, + file_name, + mime_type, + size_bytes, + staging_storage_key, + final_storage_key, + taken_at, + day_number, + visibility, + status, + created_at; -- name: UpdateUploadRequestPhotoStatusByUploadRequestID :many UPDATE upload_request_photos SET status = $2 WHERE upload_request_id = $1 -RETURNING *; +RETURNING + id, + upload_request_id, + drive_file_id, + file_name, + mime_type, + size_bytes, + staging_storage_key, + final_storage_key, + taken_at, + day_number, + visibility, + status, + created_at; -- name: DeleteUploadRequestPhotosByUploadRequestID :exec DELETE FROM upload_request_photos diff --git a/db/queries/upload_requests.sql b/db/queries/upload_requests.sql index c95fcad..c9f1362 100644 --- a/db/queries/upload_requests.sql +++ b/db/queries/upload_requests.sql @@ -7,15 +7,45 @@ INSERT INTO upload_requests ( ) VALUES ( $1, $2, $3, $4 ) -RETURNING *; +RETURNING + id, + event_id, + drive_file_id, + requested_by, + approved_by, + status, + created_at, + approved_at, + photo_count, + rejection_reason; -- name: GetUploadRequestByID :one -SELECT * +SELECT + id, + event_id, + drive_file_id, + requested_by, + approved_by, + status, + created_at, + approved_at, + photo_count, + rejection_reason FROM upload_requests WHERE id = $1; -- name: ListUploadRequests :many -SELECT * +SELECT + id, + event_id, + drive_file_id, + requested_by, + approved_by, + status, + created_at, + approved_at, + photo_count, + rejection_reason FROM upload_requests WHERE requested_by = $1::uuid AND status = COALESCE(sqlc.narg('p2')::upload_request_status, status) @@ -29,7 +59,17 @@ SET status = 'approved', rejection_reason = NULL WHERE id = $1 AND status = 'pending' -RETURNING *; +RETURNING + id, + event_id, + drive_file_id, + requested_by, + approved_by, + status, + created_at, + approved_at, + photo_count, + rejection_reason; -- name: RejectUploadRequest :one UPDATE upload_requests @@ -39,4 +79,14 @@ SET status = 'rejected', rejection_reason = $3 WHERE id = $1 AND status = 'pending' -RETURNING *; +RETURNING + id, + event_id, + drive_file_id, + requested_by, + approved_by, + status, + created_at, + approved_at, + photo_count, + rejection_reason; diff --git a/db/queries/user.sql b/db/queries/user.sql index f940d45..aad962b 100644 --- a/db/queries/user.sql +++ b/db/queries/user.sql @@ -1,15 +1,42 @@ -- name: CreateUser :one INSERT INTO users (email, hashed_password) VALUES ($1, $2) -RETURNING *; +RETURNING + id, + email, + hashed_password, + created_at, + updated_at, + display_name, + face_embedding, + deleted_at, + blocked; -- name: GetUserByID :one -SELECT * +SELECT + id, + email, + hashed_password, + created_at, + updated_at, + display_name, + face_embedding, + deleted_at, + blocked FROM users WHERE id = $1; -- name: GetUserByEmail :one -SELECT * +SELECT + id, + email, + hashed_password, + created_at, + updated_at, + display_name, + face_embedding, + deleted_at, + blocked FROM users WHERE email = $1; @@ -18,7 +45,16 @@ UPDATE users SET hashed_password = $1, updated_at = NOW() WHERE id = $2 -RETURNING *; +RETURNING + id, + email, + hashed_password, + created_at, + updated_at, + display_name, + face_embedding, + deleted_at, + blocked; -- name: UpdateUser :one UPDATE users @@ -27,21 +63,48 @@ SET email = COALESCE($1, email), blocked = COALESCE($3, blocked), updated_at = NOW() WHERE id = $4 -RETURNING *; +RETURNING + id, + email, + hashed_password, + created_at, + updated_at, + display_name, + face_embedding, + deleted_at, + blocked; -- name: SetUserBlocked :one UPDATE users SET blocked = $1, updated_at = NOW() WHERE id = $2 -RETURNING *; +RETURNING + id, + email, + hashed_password, + created_at, + updated_at, + display_name, + face_embedding, + deleted_at, + blocked; -- name: DeleteUser :exec DELETE FROM users WHERE id = $1; -- name: ListUsers :many -SELECT * +SELECT + id, + email, + hashed_password, + created_at, + updated_at, + display_name, + face_embedding, + deleted_at, + blocked FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2; @@ -51,7 +114,16 @@ UPDATE users SET face_embedding = $1::vector, updated_at = NOW() WHERE id = $2 -RETURNING *; +RETURNING + id, + email, + hashed_password, + created_at, + updated_at, + display_name, + face_embedding, + deleted_at, + blocked; -- name: FindClosestUserByEmbedding :one SELECT id, From f5fb65c164f44dd077bca1b77604185eb09f4749 Mon Sep 17 00:00:00 2001 From: bouhamza abderrahmane Date: Thu, 26 Mar 2026 03:33:36 +0100 Subject: [PATCH 06/10] refactor(auth): standardize naming in security/auth flows --- app/core/securite.py | 26 ++++++-------------------- app/router/mobile/enrollement.py | 2 +- app/service/users.py | 15 ++++++++------- 3 files changed, 15 insertions(+), 28 deletions(-) diff --git a/app/core/securite.py b/app/core/securite.py index af9db00..3b00c4a 100644 --- a/app/core/securite.py +++ b/app/core/securite.py @@ -30,7 +30,7 @@ def verify_password(password: str, hashed: str) -> bool: return result -def Get_expiry_time() -> int: +def get_expiry_time() -> int: if settings.environment == "dev": expiry = 60 * 60 * 24 * 7 else: @@ -38,11 +38,11 @@ def Get_expiry_time() -> int: return expiry -def create_acces_mobile_token(session_id: str) -> str: +def create_access_mobile_token(session_id: str) -> str: payload: dict[str, Any] = { "session_id": session_id, "exp": int( - (datetime.now(timezone.utc) + timedelta(seconds=Get_expiry_time())).timestamp() + (datetime.now(timezone.utc) + timedelta(seconds=get_expiry_time())).timestamp() ), } return jwt.encode(payload, key=settings.jwt_secret, algorithm=settings.jwt_algorithm) @@ -62,7 +62,7 @@ def create_refresh_mobile_token(session_id: str) -> str: payload: dict[str, Any] = { "session_id": session_id, "exp": int( - (datetime.now(timezone.utc) + timedelta(seconds=Get_expiry_time() * 4)).timestamp() + (datetime.now(timezone.utc) + timedelta(seconds=get_expiry_time() * 4)).timestamp() ), } return jwt.encode(payload, key=settings.jwt_secret, algorithm=settings.jwt_algorithm) @@ -91,19 +91,6 @@ def verify_totp_token_with_window(secret: str, token: str, valid_window: int = 8 totp = pyotp.TOTP(secret) return totp.verify(token, valid_window=valid_window) - -def generate_Acces_token_stuff(user_id: str, role: str) -> str: - payload: dict[str, Any] = { - "user_id": user_id, - "role": role, - "exp": int( - (datetime.now(timezone.utc) + timedelta(seconds=Get_expiry_time())).timestamp() - ), - } - return jwt.encode(payload, key=settings.jwt_secret, algorithm=settings.jwt_algorithm) - - - # class EmbeddingCrypto: # _key: bytes = base64.b64decode(settings.FACE_ENCRYPTION_KEY) # _aes: AESGCM = AESGCM(_key) @@ -126,7 +113,6 @@ def generate_Acces_token_stuff(user_id: str, role: str) -> str: # return np.frombuffer(data, dtype=np.float32) - class StaffJWTPayload(BaseModel): sub: str role: str @@ -147,7 +133,7 @@ def create_access_staff_token(staff_id: str, role: str) -> str: role=role, type="access", exp=int( - (datetime.now(timezone.utc) + timedelta(seconds=Get_expiry_time())).timestamp() + (datetime.now(timezone.utc) + timedelta(seconds=get_expiry_time())).timestamp() ), ) return jwt.encode( @@ -166,7 +152,7 @@ def create_refresh_staff_token(staff_id: str, role: str) -> str: role=role, type="refresh", exp=int( - (datetime.now(timezone.utc) + timedelta(seconds=Get_expiry_time() * 4)).timestamp() + (datetime.now(timezone.utc) + timedelta(seconds=get_expiry_time() * 4)).timestamp() ), ) return jwt.encode( diff --git a/app/router/mobile/enrollement.py b/app/router/mobile/enrollement.py index 1a5f652..7994da9 100644 --- a/app/router/mobile/enrollement.py +++ b/app/router/mobile/enrollement.py @@ -69,7 +69,7 @@ async def enroll_face( image_payloads.append(payload) - return await container.auth_service.add_embbed_user( + return await container.auth_service.add_embedded_user( user.user_id, image_payloads, ) diff --git a/app/service/users.py b/app/service/users.py index f5329e8..992da81 100644 --- a/app/service/users.py +++ b/app/service/users.py @@ -5,10 +5,10 @@ from app.core.securite import ( hash_password, verify_password, - create_acces_mobile_token, + create_access_mobile_token, create_refresh_mobile_token, decode_refresh_mobile_token, - Get_expiry_time, + get_expiry_time, ) from app.core import constant from app.core.config import settings @@ -26,6 +26,7 @@ class AuthService: + """Authentication and user account service for mobile flows.""" user_querier: user_queries.AsyncQuerier device_querier: device_queries.AsyncQuerier session_querier: session_queries.AsyncQuerier @@ -135,9 +136,9 @@ async def mobile_register_login( session_key, str(session.id), expire=AuthService.REDIS_SESSION_TTL ) - access_token = create_acces_mobile_token(str(session.id)) + access_token = create_access_mobile_token(str(session.id)) refresh_token = create_refresh_mobile_token(str(session.id)) - expiry = Get_expiry_time() + expiry = get_expiry_time() logger.info("created session %s for user %s", session.id, user_id) return MobileAuthResponse( @@ -172,9 +173,9 @@ async def refresh_token( if user.blocked: raise AppException.forbidden("User is blocked") - new_access_token = create_acces_mobile_token(session_id) + new_access_token = create_access_mobile_token(session_id) new_refresh_token = create_refresh_mobile_token(session_id) - expiry = Get_expiry_time() + expiry = get_expiry_time() return MobileAuthResponse( access_token=new_access_token, @@ -193,7 +194,7 @@ async def logout( await redis.delete(session_key) return {"message": "Logged out successfully"} - async def add_embbed_user( + async def add_embedded_user( self, user_id: uuid.UUID, image_payloads: list[FaceImagePayload], From 065afc5e10c100400a80555e7386691ebe5697d5 Mon Sep 17 00:00:00 2001 From: bouhamza abderrahmane Date: Thu, 26 Mar 2026 03:54:01 +0100 Subject: [PATCH 07/10] refactor(face): streamline matching flow and logging --- app/service/face_embedding.py | 13 ++++++++++--- app/service/face_match.py | 8 +++++--- app/worker/single_face_match/main.py | 28 ++++++---------------------- 3 files changed, 21 insertions(+), 28 deletions(-) diff --git a/app/service/face_embedding.py b/app/service/face_embedding.py index e5333a0..e2e401c 100644 --- a/app/service/face_embedding.py +++ b/app/service/face_embedding.py @@ -9,6 +9,7 @@ from insightface.app import FaceAnalysis # type: ignore[import-untyped] from app.core.config import settings from app.core.exceptions import AppException +from app.core.logger import logger BBox = tuple[int, int, int, int] @@ -36,6 +37,7 @@ class DetectedFace: class FaceEmbedding: + """Thin wrapper around InsightFace to load, initialize, and embed faces.""" def __init__( self, model_name: str | None = None, @@ -69,7 +71,7 @@ def load_model(self) -> None: name=self.model_name, providers=list(self.providers), ) - print("[FaceEmbedding] model loaded!") + logger.info("FaceEmbedding model loaded") def init_model(self) -> None: if self.model is None: @@ -80,7 +82,7 @@ def init_model(self) -> None: self.model.prepare(ctx_id=self.ctx_id, det_size=self.det_size) # type: ignore self._initialized = True - print("[FaceEmbedding] model initialized") + logger.info("FaceEmbedding model initialized") def prepare(self) -> None: self.load_model() @@ -126,6 +128,7 @@ def embed(self, image: np.ndarray, bboxes: Sequence[BBox]) -> list[float]: class FaceEmbeddingService: + """Service layer for face embedding workflows.""" def __init__(self, face_embedding: FaceEmbedding | None = None) -> None: self.face_embedding = face_embedding or FaceEmbedding() self.face_embedding.prepare() @@ -198,7 +201,11 @@ async def compute_event_embedding( ] except Exception as e: - print(f"[FaceEmbeddingService] Skipping {payload['filename']}: {e}") + logger.warning( + "Skipping %s due to face embedding error: %s", + payload["filename"], + e, + ) results[payload["filename"]] = [] return results diff --git a/app/service/face_match.py b/app/service/face_match.py index e463ec9..079a4e5 100644 --- a/app/service/face_match.py +++ b/app/service/face_match.py @@ -14,6 +14,8 @@ class SingleFaceMatchService: + """Coordinates face-match persistence and user notifications for a single photo.""" + def __init__( self, *, @@ -45,7 +47,7 @@ async def process_detected_face( try: async with self.conn.begin(): - if not await self.Check_photo_exists(job.photo_id): + if not await self._photo_exists(job.photo_id): logger.warning("Photo not found: %s", job.photo_id) return @@ -89,7 +91,7 @@ async def process_detected_face( logger.error("Out of memory while matching photo %s", job.photo_id) return - if created_face_match_id : + if created_face_match_id: await self.user_notification_service.create_notification( user_id=matched_user.user_id, type="face_match", @@ -98,7 +100,7 @@ async def process_detected_face( }, ) - async def Check_photo_exists(self, photo_id: UUID) -> bool: + async def _photo_exists(self, photo_id: UUID) -> bool: row = await self.photo_face_querier.photo_faces_photo_exists(id=photo_id) return row is not None diff --git a/app/worker/single_face_match/main.py b/app/worker/single_face_match/main.py index f505e18..d5cc284 100644 --- a/app/worker/single_face_match/main.py +++ b/app/worker/single_face_match/main.py @@ -67,28 +67,12 @@ async def _load_payload(self, job: SingleFaceMatchJob) -> FaceImagePayload: bucket_name, object_name = self._parse_minio_ref(job.image_ref) bucket = Bucket(bucket_name, "") - last_exc: Exception | None = None - for attempt in range(1, settings.MINIO_RETRY_ATTEMPTS + 1): - try: - data, filename, content_type = await bucket.get(object_name) - return FaceImagePayload( - filename=filename, - content_type=content_type, - bytes=data, - ) - except Exception as exc: - last_exc = exc - logger.warning( - "MinIO fetch failed for %s (attempt %s/%s): %s", - object_name, - attempt, - settings.MINIO_RETRY_ATTEMPTS, - exc, - ) - if attempt < settings.MINIO_RETRY_ATTEMPTS: - await asyncio.sleep(settings.MINIO_RETRY_BASE_SECONDS * attempt) - assert last_exc is not None - raise last_exc + data, filename, content_type = await bucket.get(object_name) + return FaceImagePayload( + filename=filename, + content_type=content_type, + bytes=data, + ) @staticmethod def _parse_minio_ref(image_ref: str) -> tuple[str, str]: From a785d036726d2865125e677293c6aa4bf54b28da Mon Sep 17 00:00:00 2001 From: bouhamza abderrahmane Date: Thu, 26 Mar 2026 04:13:36 +0100 Subject: [PATCH 08/10] refactor(config): move hardcoded values to settings --- app/core/config.py | 18 +++- app/core/securite.py | 4 +- app/infra/minio.py | 161 ++++++++++++++++++++-------- app/main.py | 19 ++-- app/router/web/auth.py | 3 +- app/service/session.py | 10 +- app/service/staff_user.py | 34 +++--- app/worker/notification/main.py | 5 +- app/worker/notification/settings.py | 1 + 9 files changed, 181 insertions(+), 74 deletions(-) diff --git a/app/core/config.py b/app/core/config.py index 103fe2a..8885ca2 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -4,8 +4,10 @@ class Settings(BaseSettings): app_name: str = "multAI" + API_VERSION: str = "1.0.0" environment: str = "dev" debug: bool = True + CORS_ALLOW_ORIGINS: list[str] = ["*"] # Redis REDIS_PORT: int @@ -28,6 +30,9 @@ class Settings(BaseSettings): MINIO_HOST: str MINIO_RETRY_ATTEMPTS: int = 3 MINIO_RETRY_BASE_SECONDS: float = 0.5 + MINIO_INIT_MAX_RETRIES: int = 5 + MINIO_INIT_RETRY_BASE_SECONDS: float = 2.0 + MINIO_PART_SIZE_BYTES: int = 10 * 1024 * 1024 # PostgreSQL POSTGRES_USER: str @@ -40,6 +45,7 @@ class Settings(BaseSettings): MOBILE_SESSION_LIMIT: int = 3 MOBILE_SESSION_TTL_SECONDS: int = 180 MOBILE_SESSION_DAYS: int = 7 + SESSION_REDIS_TTL_SECONDS: int = 60 * 60 * 5 # Admin list defaults ADMIN_USERS_DEFAULT_LIMIT: int = 20 ADMIN_USERS_MAX_LIMIT: int = 100 @@ -48,6 +54,9 @@ class Settings(BaseSettings): jwt_algorithm: str = "HS256" encryption_key: str totp_issuer: str = "multAI" + ACCESS_TOKEN_EXPIRY_DEV_SECONDS: int = 60 * 60 * 24 * 7 + ACCESS_TOKEN_EXPIRY_PROD_SECONDS: int = 60 * 60 * 24 + STAFF_AUTH_COOKIE_MAX_AGE_SECONDS: int = 60 * 60 * 24 * 7 # Face embedding model FACE_EMBEDDING_MODEL_NAME: str = "buffalo_l" @@ -82,7 +91,14 @@ def _parse_debug(cls, value): # type: ignore[no-untyped-def] if lowered in {"true", "1", "yes"}: return True return value - + + @field_validator("CORS_ALLOW_ORIGINS", mode="before") + @classmethod + def _parse_cors_allow_origins(cls, value): # type: ignore[no-untyped-def] + if isinstance(value, str): + return [item.strip() for item in value.split(",") if item.strip()] + return value + settings = Settings() # type: ignore diff --git a/app/core/securite.py b/app/core/securite.py index 3b00c4a..9f2a10c 100644 --- a/app/core/securite.py +++ b/app/core/securite.py @@ -32,9 +32,9 @@ def verify_password(password: str, hashed: str) -> bool: def get_expiry_time() -> int: if settings.environment == "dev": - expiry = 60 * 60 * 24 * 7 + expiry = settings.ACCESS_TOKEN_EXPIRY_DEV_SECONDS else: - expiry = 60 * 60 * 24 + expiry = settings.ACCESS_TOKEN_EXPIRY_PROD_SECONDS return expiry diff --git a/app/infra/minio.py b/app/infra/minio.py index e6249da..848ffcd 100644 --- a/app/infra/minio.py +++ b/app/infra/minio.py @@ -1,12 +1,16 @@ +import asyncio import io import random import string import uuid +from typing import Awaitable, Callable, TypeVar from fastapi import UploadFile from miniopy_async.commonconfig import CopySource from miniopy_async.error import S3Error from miniopy_async.api import Minio +from app.core.config import settings +from app.core.logger import logger from app.core.utils import check_extension from app.core.exceptions import AppException from app.core.constant import ( @@ -22,9 +26,41 @@ DOCUMENTS_BUCKET_NAME = CORE_DOCUMENTS_BUCKET_NAME WA_SIM_BUCKET_NAME = CORE_WA_SIM_BUCKET_NAME +T = TypeVar("T") + + +async def _with_retries(op_name: str, func: Callable[[], Awaitable[T]]) -> T: + attempts = max(1, settings.MINIO_RETRY_ATTEMPTS) + base_delay = settings.MINIO_RETRY_BASE_SECONDS + last_exc: Exception | None = None + + for attempt in range(1, attempts + 1): + try: + return await func() + except S3Error as exc: + if exc.code in {"NoSuchKey", "NoSuchBucket"}: + raise + last_exc = exc + except Exception as exc: + last_exc = exc + + logger.warning( + "MinIO %s failed (attempt %s/%s): %s", + op_name, + attempt, + attempts, + last_exc, + ) + if attempt < attempts: + await asyncio.sleep(base_delay * attempt) + + assert last_exc is not None + raise last_exc + async def init_minio_client( minio_host: str, minio_port: int, minio_root_user: str, minio_root_password: str ) -> None: + """Initialize MinIO client and ensure buckets exist.""" Bucket.client = Minio( f"{minio_host}:{minio_port}", access_key=minio_root_user, @@ -33,10 +69,15 @@ async def init_minio_client( ) for bucket_name in [IMAGES_BUCKET_NAME, DOCUMENTS_BUCKET_NAME, WA_SIM_BUCKET_NAME]: - if not await Bucket.client.bucket_exists(bucket_name): - await Bucket.client.make_bucket(bucket_name) + async def _ensure_bucket() -> None: + if not await Bucket.client.bucket_exists(bucket_name): + await Bucket.client.make_bucket(bucket_name) + + await _with_retries("ensure_bucket", _ensure_bucket) + class Bucket: + """Bucket helper with retry-aware operations.""" bucket_name: str file_prefix: str client: Minio @@ -60,44 +101,72 @@ async def put(self, file: UploadFile, object_name: str | None = None) -> str: if file.filename is None: file.filename = object_name - await self.client.put_object( - bucket_name=self.bucket_name, - object_name=self._object_path(object_name), - data=file.file, - length=-1, - part_size=10 * 1024 * 1024, - content_type=file.content_type, - metadata={ - "filename": file.filename, - }, - ) - return object_name + attempts = max(1, settings.MINIO_RETRY_ATTEMPTS) + base_delay = settings.MINIO_RETRY_BASE_SECONDS + last_exc: Exception | None = None + + for attempt in range(1, attempts + 1): + try: + if hasattr(file.file, "seek"): + file.file.seek(0) + await self.client.put_object( + bucket_name=self.bucket_name, + object_name=self._object_path(object_name), + data=file.file, + length=-1, + part_size=settings.MINIO_PART_SIZE_BYTES, + content_type=file.content_type, + metadata={ + "filename": file.filename, + }, + ) + return object_name + except Exception as exc: + last_exc = exc + logger.warning( + "MinIO put failed for %s (attempt %s/%s): %s", + object_name, + attempt, + attempts, + exc, + ) + if attempt < attempts: + await asyncio.sleep(base_delay * attempt) + + assert last_exc is not None + raise last_exc async def get(self, object_name: str) -> tuple[bytes, str, str]: try: - res = await self.client.get_object( - bucket_name=self.bucket_name, - object_name=self._object_path(object_name), + res = await _with_retries( + "get_object", + lambda: self.client.get_object( + bucket_name=self.bucket_name, + object_name=self._object_path(object_name), + ), ) except S3Error as e: if e.code == "NoSuchKey": raise AppException.not_found("File not found") else: raise e - - data = await res.read() - content_type = ( - res.content_type if res.content_type else DEFAULT_CONTENT_TYPE - ) - filename = res.headers.get("x-amz-meta-filename", f"{object_name}") - - res.close() - return (data, filename, content_type) + try: + data = await res.read() + content_type = ( + res.content_type if res.content_type else DEFAULT_CONTENT_TYPE + ) + filename = res.headers.get("x-amz-meta-filename", f"{object_name}") + return (data, filename, content_type) + finally: + res.close() async def delete(self, object_name: str) -> None: - await self.client.remove_object( - bucket_name=self.bucket_name, - object_name=self._object_path(object_name), + await _with_retries( + "remove_object", + lambda: self.client.remove_object( + bucket_name=self.bucket_name, + object_name=self._object_path(object_name), + ), ) async def put_bytes( @@ -108,24 +177,30 @@ async def put_bytes( content_type: str, filename: str | None = None, ) -> str: - await self.client.put_object( - bucket_name=self.bucket_name, - object_name=self._object_path(object_name), - data=io.BytesIO(data), - length=len(data), - part_size=10 * 1024 * 1024, - content_type=content_type, - metadata={"filename": filename or object_name}, + await _with_retries( + "put_object_bytes", + lambda: self.client.put_object( + bucket_name=self.bucket_name, + object_name=self._object_path(object_name), + data=io.BytesIO(data), + length=len(data), + part_size=settings.MINIO_PART_SIZE_BYTES, + content_type=content_type, + metadata={"filename": filename or object_name}, + ), ) return object_name async def copy(self, *, source_object_name: str, target_object_name: str) -> str: - await self.client.copy_object( - bucket_name=self.bucket_name, - object_name=self._object_path(target_object_name), - source=CopySource( - self.bucket_name, - self._object_path(source_object_name), + await _with_retries( + "copy_object", + lambda: self.client.copy_object( + bucket_name=self.bucket_name, + object_name=self._object_path(target_object_name), + source=CopySource( + self.bucket_name, + self._object_path(source_object_name), + ), ), ) return target_object_name diff --git a/app/main.py b/app/main.py index 26c4965..4660ec7 100644 --- a/app/main.py +++ b/app/main.py @@ -46,12 +46,10 @@ async def dispatch( -MAX_RETRIES = 5 -RETRY_DELAY = 2 # seconds @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: - for attempt in range(1, MAX_RETRIES + 1): + for attempt in range(1, settings.MINIO_INIT_MAX_RETRIES + 1): try: await init_minio_client( minio_host=settings.MINIO_HOST, @@ -61,10 +59,15 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: ) break except Exception as e: - print(f"[MINIO] Attempt {attempt} failed: {e}") - if attempt == MAX_RETRIES: + logger.warning( + "MinIO init attempt %s/%s failed: %s", + attempt, + settings.MINIO_INIT_MAX_RETRIES, + e, + ) + if attempt == settings.MINIO_INIT_MAX_RETRIES: raise RuntimeError("Cannot connect to MinIO after multiple attempts") from e - await asyncio.sleep(RETRY_DELAY) + await asyncio.sleep(settings.MINIO_INIT_RETRY_BASE_SECONDS * attempt) RedisClient( host=settings.REDIS_HOST, @@ -85,7 +88,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: app = FastAPI( title="multAI API", description="Mobile and Web API for multAI", - version="1.0.0", + version=settings.API_VERSION, lifespan=lifespan, ) @@ -93,7 +96,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: app.add_middleware( CORSMiddleware, - allow_origins=["*"], + allow_origins=settings.CORS_ALLOW_ORIGINS, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], diff --git a/app/router/web/auth.py b/app/router/web/auth.py index c167440..d36d053 100644 --- a/app/router/web/auth.py +++ b/app/router/web/auth.py @@ -2,6 +2,7 @@ from app.container import Container, get_container from fastapi import Response +from app.core.config import settings from app.deps.cookie_auth import get_current_staff_user from app.schema.request.web.auth import WebAuthRequest from app.schema.response.web.auth import WebAuthResponse @@ -26,7 +27,7 @@ async def admin_login( httponly=True, secure=True, samesite="strict", - max_age=60 * 60 * 24 * 7, + max_age=settings.STAFF_AUTH_COOKIE_MAX_AGE_SECONDS, ) return authResponse diff --git a/app/service/session.py b/app/service/session.py index e441fc9..297fca8 100644 --- a/app/service/session.py +++ b/app/service/session.py @@ -6,6 +6,7 @@ from datetime import datetime,timedelta,timezone from app.infra.redis import RedisClient from app.core.constant import RedisKey +from app.core.config import settings from db.generated.session import UpsertSessionRow class SessionRedis(BaseModel): @@ -31,7 +32,8 @@ async def create_session(user_id:uuid.UUID,device_id:uuid.UUID)->UpsertSessionRo session = await SessionService.session_querier.upsert_session( user_id=user_id, device_id=device_id, - expires_at=datetime.now(timezone.utc) + timedelta(days=7), + expires_at=datetime.now(timezone.utc) + + timedelta(days=settings.MOBILE_SESSION_DAYS), ) if session is None : raise AppException.internal_error("session creation failed ") @@ -45,7 +47,7 @@ async def create_session(user_id:uuid.UUID,device_id:uuid.UUID)->UpsertSessionRo last_active=session.last_active, expires_at=session.expires_at, ).model_dump_json(), - expire=60*60*5, + expire=settings.SESSION_REDIS_TTL_SECONDS, nx=True ) if not result: @@ -96,7 +98,7 @@ async def check_session( last_active=session_info.last_active, expires_at=session_info.expires_at, ).model_dump_json(), - expire=60 * 60 * 5, + expire=settings.SESSION_REDIS_TTL_SECONDS, nx=False, ) @@ -116,7 +118,7 @@ async def check_session( last_active=session.last_active, expires_at=session.expires_at, ).model_dump_json(), - expire=60 * 60 * 5, + expire=settings.SESSION_REDIS_TTL_SECONDS, nx=True, ) diff --git a/app/service/staff_user.py b/app/service/staff_user.py index 6241818..bc74921 100644 --- a/app/service/staff_user.py +++ b/app/service/staff_user.py @@ -1,12 +1,13 @@ from sqlalchemy.exc import SQLAlchemyError +from fastapi import HTTPException from app.core.logger import logger from typing import Literal, Optional import uuid from app.core.exceptions import AppException, DBException, DBExceptionImpl -from app.core.securite import create_access_staff_token, hash_password, verify_password +from app.core.securite import create_access_staff_token, hash_password, verify_password from app.schema.response.web.auth import WebAuthResponse from db.generated import stuff_user as staff_queries from db.generated.stuff_user import ListStaffUsersParams @@ -14,6 +15,7 @@ class StaffUserService: + """Service layer for staff user management and authentication.""" staff_user_querier: staff_queries.AsyncQuerier def init(self, staff_user_querier: staff_queries.AsyncQuerier) -> None: @@ -45,7 +47,6 @@ async def create_staff_user( logger.error("Failed to create staff user: %s", exc) raise DBException.handle(exc) - async def update_staff_user( self, *, id: uuid.UUID, email: Optional[str], role: StaffRole ) -> StaffUser: @@ -56,6 +57,11 @@ async def update_staff_user( if user is None: raise AppException.not_found("Staff user not found") return user + except HTTPException: + raise + except SQLAlchemyError as exc: + logger.error("Database error updating staff user: %s", exc) + raise DBExceptionImpl.handle(exc) except Exception as exc: logger.error("Failed to update staff user: %s", exc) raise DBException.handle(exc) @@ -66,7 +72,13 @@ async def delete_staff_user(self, *, id: uuid.UUID) -> StaffUser: if user is None: raise AppException.not_found("Staff user not found") return user + except HTTPException: + raise + except SQLAlchemyError as exc: + logger.error("Database error deleting staff user: %s", exc) + raise DBExceptionImpl.handle(exc) except Exception as exc: + logger.error("Failed to delete staff user: %s", exc) raise DBException.handle(exc) async def list_staff_users( @@ -98,18 +110,21 @@ async def list_staff_users( async for user in self.staff_user_querier.list_staff_users(params): result.append(user) return result + except SQLAlchemyError as exc: + logger.error("Database error listing staff users: %s", exc) + raise DBExceptionImpl.handle(exc) except Exception as exc: logger.error("Failed to list staff users: %s", exc) raise DBException.handle(exc) - async def admin_login( self, email: str, password: str, ) -> WebAuthResponse: - print("hello") - staff: StaffUser | None = await self.staff_user_querier.get_staff_user_by_email(email=email) + staff: StaffUser | None = await self.staff_user_querier.get_staff_user_by_email( + email=email + ) if staff is None or not verify_password(password, staff.password): logger.info("admin login failed for email %s", email) raise AppException.unauthorized("Invalid email or password") @@ -125,12 +140,3 @@ async def admin_login( user_id=staff.id, role=staff.role, ) - - async def Get_stuff_user( - self, - stuff_id:uuid.UUID - )->StaffUser: - stuff:StaffUser|None = await self.staff_user_querier.get_staff_user_by_id(id=stuff_id) - if stuff is None: - raise AppException.not_found("user not found ") - return stuff diff --git a/app/worker/notification/main.py b/app/worker/notification/main.py index 3f4711d..8ea6136 100644 --- a/app/worker/notification/main.py +++ b/app/worker/notification/main.py @@ -74,7 +74,10 @@ async def retry( if not notification.tokens: return - delay = min(NotifSetting.BASE_RETRY_DELAY * (2 ** attempts), 60) + delay = min( + NotifSetting.BASE_RETRY_DELAY * (2 ** attempts), + NotifSetting.MAX_RETRY_DELAY_SECONDS, + ) await asyncio.sleep(delay) await queue.enqueue_notification(notification, attempts=attempts) diff --git a/app/worker/notification/settings.py b/app/worker/notification/settings.py index 0c25f32..9d38bbe 100644 --- a/app/worker/notification/settings.py +++ b/app/worker/notification/settings.py @@ -21,6 +21,7 @@ class NotificationWorkerSettings(BaseSettings): firebase_credentials_path: str | None = Field(None) MAX_SEND_ATTEMPTS = 5 BASE_RETRY_DELAY = 2 + MAX_RETRY_DELAY_SECONDS = 60 TTL_SECONDS = 30 * 24 * 3600 CONCURRENCY = 10 RATE_LIMIT = 50 From 702b58cb2798dd064c0f9970b4b6ec18f870ce38 Mon Sep 17 00:00:00 2001 From: bouhamza abderrahmane Date: Thu, 26 Mar 2026 11:45:50 +0100 Subject: [PATCH 09/10] feat: update app logic and database --- app/core/securite.py | 6 ++-- app/deps/cookie_auth.py | 4 --- app/schema/response/web/auth.py | 7 ++-- app/service/upload_requests.py | 46 +++++++++++++-------------- app/service/users.py | 7 ++-- app/worker/audit/__init__.py | 6 ++-- app/worker/notification/settings.py | 23 +++++++------- db/generated/photo_faces.py | 1 + db/queries/photo_faces.sql | 1 + migrations/sql/up/add-audit-table.sql | 21 +++++++----- 10 files changed, 59 insertions(+), 63 deletions(-) diff --git a/app/core/securite.py b/app/core/securite.py index 9f2a10c..756c264 100644 --- a/app/core/securite.py +++ b/app/core/securite.py @@ -2,7 +2,7 @@ from typing import Any, Literal import jwt from passlib.context import CryptContext -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict import pyotp from app.core.config import settings from app.core.exceptions import AppException @@ -119,9 +119,7 @@ class StaffJWTPayload(BaseModel): type: Literal["access", "refresh"] exp: int - class Config: - frozen = True - # immutable class + model_config = ConfigDict(frozen=True) def create_access_staff_token(staff_id: str, role: str) -> str: diff --git a/app/deps/cookie_auth.py b/app/deps/cookie_auth.py index 749870b..bf814d5 100644 --- a/app/deps/cookie_auth.py +++ b/app/deps/cookie_auth.py @@ -7,13 +7,9 @@ from db.generated.models import StaffRole, StaffUser from app.core.securite import decode_staff_token - - def _role_value(role: object) -> str: return getattr(role, "value", str(role)) - - async def get_current_staff_user( container: Annotated[Container, Depends(get_container)], token: Annotated[str | None, Cookie(alias="access_token")] = None, diff --git a/app/schema/response/web/auth.py b/app/schema/response/web/auth.py index f1fefed..5281986 100644 --- a/app/schema/response/web/auth.py +++ b/app/schema/response/web/auth.py @@ -1,15 +1,14 @@ import uuid -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict class WebAuthResponse(BaseModel): + model_config = ConfigDict(frozen=True) access_token: str user_id: uuid.UUID role: str - - class Config: - from_attributes = True + model_config = ConfigDict(frozen=True, from_attributes=True) diff --git a/app/service/upload_requests.py b/app/service/upload_requests.py index b42eced..b16c185 100644 --- a/app/service/upload_requests.py +++ b/app/service/upload_requests.py @@ -198,29 +198,6 @@ async def _create_staged_photo( return created_photo - def _ensure_request_access( - self, - *, - current_staff_user: StaffUser, - upload_request: UploadRequest, - ) -> None: - if upload_request.requested_by == current_staff_user.id: - return - if self._role_value(current_staff_user.role) == StaffRole.MULTI_TEAM_LEAD.value: - return - raise AppException.forbidden("You are not allowed to access this upload request") - - async def _publish_event( - self, - *, - subject: NatsSubjects, - payload: dict[str, object], - ) -> None: - try: - await NatsClient.publish(subject, json.dumps(payload).encode("utf-8")) - except Exception as exc: - logger.warning("Failed to publish upload request event %s: %s", subject.value, exc) - async def get_request_details( self, *, @@ -497,3 +474,26 @@ async def reject_request( ) await self._delete_staging_objects_best_effort(staged_photos) return UploadRequestDetails(request=upload_request, photos=rejected_photos) + + def _ensure_request_access( + self, + *, + current_staff_user: StaffUser, + upload_request: UploadRequest, + ) -> None: + if upload_request.requested_by == current_staff_user.id: + return + if self._role_value(current_staff_user.role) == StaffRole.MULTI_TEAM_LEAD.value: + return + raise AppException.forbidden("You are not allowed to access this upload request") + + async def _publish_event( + self, + *, + subject: NatsSubjects, + payload: dict[str, object], + ) -> None: + try: + await NatsClient.publish(subject, json.dumps(payload).encode("utf-8")) + except Exception as exc: + logger.warning("Failed to publish upload request event %s: %s", subject.value, exc) diff --git a/app/service/users.py b/app/service/users.py index 992da81..4a0c340 100644 --- a/app/service/users.py +++ b/app/service/users.py @@ -19,7 +19,7 @@ from db.generated import user as user_queries from db.generated import devices as device_queries from db.generated import session as session_queries -from db.generated.models import User, UserDevice +from db.generated.models import User from app.core.logger import logger from app.service.face_embedding import FaceImagePayload, FaceEmbeddingService from app.schema.internal.single_face_match import ClosestUserMatch @@ -49,7 +49,7 @@ async def _ensure_device_for_login( self, user_id: uuid.UUID, req: MobileAuthRequest, - ) -> UserDevice: + ) -> None: existing_device = await self.device_querier.get_device__by_id(id=req.device_id) if existing_device: @@ -61,7 +61,7 @@ async def _ensure_device_for_login( ) if not existing_device.is_active: await self.device_querier.activate_device(id=req.device_id, user_id=user_id) - return existing_device + return device = await self.device_querier.create_device( arg=device_queries.CreateDeviceParams( @@ -74,7 +74,6 @@ async def _ensure_device_for_login( ) if not device: raise AppException.internal_error("Failed to create device") - return device async def mobile_register_login( self, diff --git a/app/worker/audit/__init__.py b/app/worker/audit/__init__.py index dcd57fc..f266311 100644 --- a/app/worker/audit/__init__.py +++ b/app/worker/audit/__init__.py @@ -1,6 +1,4 @@ -"""Audit worker package exports.""" +"""Audit worker package.""" from __future__ import annotations -from .main import main # noqa: F401 - -__all__ = ["main"] +__all__: list[str] = [] diff --git a/app/worker/notification/settings.py b/app/worker/notification/settings.py index 9d38bbe..36a9116 100644 --- a/app/worker/notification/settings.py +++ b/app/worker/notification/settings.py @@ -1,8 +1,8 @@ from __future__ import annotations -from typing import Sequence +from typing import Sequence, ClassVar -from pydantic import Field +from pydantic import Field, ConfigDict from pydantic_settings import BaseSettings from app.schema.internal.notification import NotificationPriority, PRIORITY_ORDER @@ -19,16 +19,15 @@ class NotificationWorkerSettings(BaseSettings): nats_user: str = Field("") nats_password: str = Field("") firebase_credentials_path: str | None = Field(None) - MAX_SEND_ATTEMPTS = 5 - BASE_RETRY_DELAY = 2 - MAX_RETRY_DELAY_SECONDS = 60 - TTL_SECONDS = 30 * 24 * 3600 - CONCURRENCY = 10 - RATE_LIMIT = 50 - RATE_PERIOD = 1.0 - - class Config: - env_prefix = "NOTIFICATIONS_" + MAX_SEND_ATTEMPTS: ClassVar[int] = 5 + BASE_RETRY_DELAY: ClassVar[int] = 2 + MAX_RETRY_DELAY_SECONDS: ClassVar[int] = 60 + TTL_SECONDS: ClassVar[int] = 30 * 24 * 3600 + CONCURRENCY: ClassVar[int] = 10 + RATE_LIMIT: ClassVar[int] = 50 + RATE_PERIOD: ClassVar[float] = 1.0 + + model_config = ConfigDict(env_prefix="NOTIFICATIONS_") def subject_for(self, priority: NotificationPriority) -> str: return f"{self.subject_prefix}.{priority.value}" diff --git a/db/generated/photo_faces.py b/db/generated/photo_faces.py index 58f83be..3ed989b 100644 --- a/db/generated/photo_faces.py +++ b/db/generated/photo_faces.py @@ -39,6 +39,7 @@ inserted_match AS ( INSERT INTO face_matches (photo_face_id, user_id, confidence) SELECT upserted_photo_face.id, :p5, :p6 + FROM upserted_photo_face WHERE NOT EXISTS (SELECT 1 FROM existing_match) RETURNING id ) diff --git a/db/queries/photo_faces.sql b/db/queries/photo_faces.sql index 0c3d148..23dca1b 100644 --- a/db/queries/photo_faces.sql +++ b/db/queries/photo_faces.sql @@ -73,6 +73,7 @@ existing_match AS ( inserted_match AS ( INSERT INTO face_matches (photo_face_id, user_id, confidence) SELECT upserted_photo_face.id, $5, $6 + FROM upserted_photo_face WHERE NOT EXISTS (SELECT 1 FROM existing_match) RETURNING id ) diff --git a/migrations/sql/up/add-audit-table.sql b/migrations/sql/up/add-audit-table.sql index 430f043..1e81b84 100644 --- a/migrations/sql/up/add-audit-table.sql +++ b/migrations/sql/up/add-audit-table.sql @@ -1,11 +1,16 @@ -CREATE TYPE IF NOT EXISTS public.audit_event_type AS ENUM ( - 'user.signup', - 'user.login', - 'user.logout', - 'upload_request.created', - 'upload_request.approved', - 'upload_request.rejected' -); +DO $$ +BEGIN + CREATE TYPE public.audit_event_type AS ENUM ( + 'user.signup', + 'user.login', + 'user.logout', + 'upload_request.created', + 'upload_request.approved', + 'upload_request.rejected' + ); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; CREATE TABLE IF NOT EXISTS public.audit_events ( id uuid DEFAULT public.uuid_generate_v4() NOT NULL, From 919f3ff86f8feda5e039aa921c2233dee83cc60a Mon Sep 17 00:00:00 2001 From: bouhamza abderrahmane Date: Thu, 26 Mar 2026 14:27:43 +0100 Subject: [PATCH 10/10] refactor(single-face-worker): inline helpers and remove unused embedding code --- app/service/face_embedding.py | 78 ---------------------------- app/worker/single_face_match/main.py | 44 +++++++--------- 2 files changed, 19 insertions(+), 103 deletions(-) diff --git a/app/service/face_embedding.py b/app/service/face_embedding.py index e2e401c..ecae3e7 100644 --- a/app/service/face_embedding.py +++ b/app/service/face_embedding.py @@ -12,9 +12,6 @@ from app.core.logger import logger -BBox = tuple[int, int, int, int] - - class FaceImagePayload(TypedDict): filename: str content_type: str @@ -88,44 +85,6 @@ def prepare(self) -> None: self.load_model() self.init_model() - def embed(self, image: np.ndarray, bboxes: Sequence[BBox]) -> list[float]: - if not bboxes: - raise ValueError("No faces to embed") - - if self.model is None or not self._initialized: - raise RuntimeError("Model not ready. Call `prepare()` first.") - - image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - - faces: list[FaceStub] = self.model.get(image_rgb) # type: ignore - - if not faces: - raise ValueError("No faces detected by the model") - - x1, y1, x2, y2 = bboxes[0] # type: ignore - target_cx = (x1 + x2) / 2 - target_cy = (y1 + y2) / 2 - - best_face: Optional[FaceStub] = None - best_dist = float("inf") - - for face in faces: - fx1, fy1, fx2, fy2 = face.bbox - cx = (fx1 + fx2) / 2 - cy = (fy1 + fy2) / 2 - - dist = np.sqrt((cx - target_cx) ** 2 + (cy - target_cy) ** 2) - - if dist < best_dist: - best_dist = dist - best_face = face - - if best_face is None or best_face.embedding is None: - raise ValueError("Failed to generate embedding for selected face") - - embedding = best_face.embedding.flatten() - return embedding.tolist() - class FaceEmbeddingService: """Service layer for face embedding workflows.""" @@ -173,43 +132,6 @@ async def compute_average_embedding( return averaged.astype(float).tolist() - async def compute_event_embedding( - self, - payloads: Sequence[FaceImagePayload], - ) -> dict[str, list[list[float]]]: - - if not payloads: - raise AppException.bad_request( - "At least one image is required" - ) - - results: dict[str, list[list[float]]] = {} - - for payload in payloads: - try: - image = self._decode_image(payload) - image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - - faces: list[FaceStub] = await asyncio.to_thread( # type: ignore - self.face_embedding.model.get, image_rgb # type: ignore - ) - - results[payload["filename"]] = [ - face.embedding.flatten().tolist() - for face in faces - if face.embedding is not None - ] - - except Exception as e: - logger.warning( - "Skipping %s due to face embedding error: %s", - payload["filename"], - e, - ) - results[payload["filename"]] = [] - - return results - async def detect_faces( self, payload: FaceImagePayload, diff --git a/app/worker/single_face_match/main.py b/app/worker/single_face_match/main.py index d5cc284..a58b589 100644 --- a/app/worker/single_face_match/main.py +++ b/app/worker/single_face_match/main.py @@ -28,7 +28,25 @@ async def handle_message(self, data: bytes) -> None: return try: - payload = await self._load_payload(job) + if not job.image_ref: + raise ValueError("Missing image_ref in event payload") + + if job.image_ref.startswith(MINIO_URL_PREFIX): + raw = job.image_ref[len(MINIO_URL_PREFIX) :] + parts = raw.split("/", 1) + if len(parts) != 2 or not parts[0] or not parts[1]: + raise ValueError("Invalid MinIO image_ref format") + bucket_name, object_name = parts[0], parts[1] + else: + bucket_name, object_name = IMAGES_BUCKET_NAME, job.image_ref + + bucket = Bucket(bucket_name, "") + data, filename, content_type = await bucket.get(object_name) + payload = FaceImagePayload( + filename=filename, + content_type=content_type, + bytes=data, + ) except Exception as exc: logger.warning("Failed to load image payload for photo %s: %s", job.photo_id, exc) return @@ -61,30 +79,6 @@ async def handle_message(self, data: bytes) -> None: logger.exception("Failed to process single face match job: %s", exc) return - async def _load_payload(self, job: SingleFaceMatchJob) -> FaceImagePayload: - if not job.image_ref: - raise ValueError("Missing image_ref in event payload") - - bucket_name, object_name = self._parse_minio_ref(job.image_ref) - bucket = Bucket(bucket_name, "") - data, filename, content_type = await bucket.get(object_name) - return FaceImagePayload( - filename=filename, - content_type=content_type, - bytes=data, - ) - - @staticmethod - def _parse_minio_ref(image_ref: str) -> tuple[str, str]: - if image_ref.startswith(MINIO_URL_PREFIX): - raw = image_ref[len(MINIO_URL_PREFIX) :] - parts = raw.split("/", 1) - if len(parts) != 2 or not parts[0] or not parts[1]: - raise ValueError("Invalid MinIO image_ref format") - return parts[0], parts[1] - return IMAGES_BUCKET_NAME, image_ref - - async def run_worker() -> None: await init_minio_client( minio_host=settings.MINIO_HOST,