Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 97 additions & 1 deletion backend/src/cms_backend/processors/book.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from uuid import UUID

from sqlalchemy.orm import Session as ORMSession

from cms_backend import logger
from cms_backend.db.models import Book, Title
from cms_backend.db.book import create_book_location
from cms_backend.db.models import Book, Title, TitleWarehousePath
from cms_backend.db.title import get_title_by_name_and_producer_or_none
from cms_backend.utils.datetime import getnow
from cms_backend.utils.filename import compute_target_filename


def check_book_qa(book: Book) -> bool:
Expand Down Expand Up @@ -72,3 +76,95 @@ def get_matching_title(session: ORMSession, book: Book) -> Title | None:
logger.exception(f"Failed to get matching title for {book.id}")
book.status = "errored"
return None


def _current_locations_match_targets(
    book: Book,
    target_locations: list[tuple[UUID, str]],
) -> bool:
    """Tell whether the book's current locations equal the target locations.

    Only locations whose status is "current" are taken into account; locations
    with any other status are ignored. Both sides are compared as sets of
    (warehouse_path_id, filename) pairs, so ordering and duplicates in
    ``target_locations`` do not matter.

    Args:
        book: The book whose locations are inspected
        target_locations: (warehouse_path_id, filename) pairs describing where
            the book is expected to be

    Returns:
        True when the current locations and the targets are the same set,
        False if either side has a pair the other lacks
    """
    occupied: set[tuple[UUID, str]] = set()
    for location in book.locations:
        # Anything that is not a current location is irrelevant here
        if location.status != "current":
            continue
        occupied.add((location.warehouse_path_id, location.filename))

    # Strict equality: no missing and no extra location is tolerated
    return occupied == set(target_locations)


def create_book_target_locations(
    session: ORMSession,
    book: Book,
    target_warehouse_paths: list[TitleWarehousePath],
) -> None:
    """Record where a book should be stored, unless it is already there.

    A single target filename is computed for the book and paired with every
    given warehouse path. When the book's current locations are exactly that
    set of pairs, nothing is created and an event is logged on the book
    instead. In every other case one "target" location is created per
    warehouse path.

    Args:
        session: SQLAlchemy session
        book: Book for which target locations are wanted
        target_warehouse_paths: TitleWarehousePath objects listing the
            warehouse paths the book should end up in

    Raises:
        Exception: If the book has no name or no date

    Side effects:
        - Appends an event to the book when it is already at all targets
        - Creates one BookLocation with status "target" per warehouse path
          otherwise
    """
    # Name and date are both inputs of the filename computation below
    if not book.name:
        raise Exception("book name is missing or invalid")
    if not book.date:
        raise Exception("book date is missing or invalid")

    # One filename shared by all targets; the book id is handed over along
    # with the naming fields
    filename = compute_target_filename(
        session,
        name=book.name,
        flavour=book.flavour,
        date=book.date,
        book_id=book.id,
    )

    path_ids = [path.warehouse_path_id for path in target_warehouse_paths]
    expected = [(path_id, filename) for path_id in path_ids]

    if not _current_locations_match_targets(book, expected):
        # At least one difference: declare a target for every warehouse path
        for path_id in path_ids:
            create_book_location(
                session=session,
                book=book,
                warehouse_path_id=path_id,
                filename=filename,
                status="target",
            )
        return

    # Already in place everywhere: leave a trace and create nothing
    note = (
        f"{getnow()}: book already at all target locations, skipping target "
        "creation"
    )
    book.events.append(note)
34 changes: 10 additions & 24 deletions backend/src/cms_backend/processors/title.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
from sqlalchemy.orm import Session as OrmSession

from cms_backend import logger
from cms_backend.db.book import create_book_location
from cms_backend.db.models import Book, Title, TitleWarehousePath
from cms_backend.processors.book import create_book_target_locations
from cms_backend.utils.datetime import getnow
from cms_backend.utils.filename import compute_target_filename


def add_book_to_title(session: OrmSession, book: Book, title: Title):
Expand All @@ -18,16 +17,14 @@ def add_book_to_title(session: OrmSession, book: Book, title: Title):
if not book.date:
raise Exception("book date is missing or invalid")

name = book.name

title.books.append(book)
book.events.append(f"{getnow()}: book added to title {title.id}")
title.events.append(f"{getnow()}: book {book.id} added to title")
book.status = "processed"

if name and title.name != name:
title.events.append(f"{getnow()}: updating title name to {name}")
title.name = name
if title.name != book.name:
title.events.append(f"{getnow()}: updating title name to {book.name}")
title.name = book.name

# Update title producer display fields from book
if title.producer_display_name != book.producer_display_name:
Expand All @@ -44,14 +41,6 @@ def add_book_to_title(session: OrmSession, book: Book, title: Title):
)
title.producer_display_url = book.producer_display_url

# Compute target filename once for this book
target_filename = compute_target_filename(
session,
name=name,
flavour=book.flavour,
date=book.date,
)

# Determine which warehouse paths to use based on title.in_prod
path_type = "prod" if title.in_prod else "dev"

Expand All @@ -62,15 +51,12 @@ def add_book_to_title(session: OrmSession, book: Book, title: Title):
)
target_warehouse_paths = session.scalars(stmt).all()

# Create target locations for each applicable warehouse path
for title_warehouse_path in target_warehouse_paths:
create_book_location(
session=session,
book=book,
warehouse_path_id=title_warehouse_path.warehouse_path_id,
filename=target_filename,
status="target",
)
# Create target locations if not already at expected locations
create_book_target_locations(
session=session,
book=book,
target_warehouse_paths=list(target_warehouse_paths),
)

except Exception as exc:
book.events.append(
Expand Down
17 changes: 15 additions & 2 deletions backend/src/cms_backend/utils/filename.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Utilities for computing and managing book target filenames."""

from uuid import UUID

from sqlalchemy import select
from sqlalchemy.orm import Session as OrmSession

Expand Down Expand Up @@ -55,7 +57,12 @@ def get_next_suffix(current_suffix: str) -> str:


def compute_target_filename(
session: OrmSession, *, name: str, flavour: str | None, date: str
session: OrmSession,
*,
name: str,
flavour: str | None,
date: str,
book_id: UUID | None = None,
) -> str:
"""
Compute target filename: {name}[_{flavour}]_{period}[suffix]
Expand All @@ -67,7 +74,8 @@ def compute_target_filename(
- YYYY-MMaa, YYYY-MMab, ... (multiple letters)

Finds the last suffix already in use and generates the next one.
Queries ALL book locations (any status) with filenames starting with base pattern.
Queries ALL book locations (any status) with filenames starting with base pattern,
excluding locations from the current book to avoid self-collision.

Important edge cases:
- Books with same name but different flavours (including no flavour) never collide
Expand All @@ -79,6 +87,7 @@ def compute_target_filename(
name: Book name
flavour: Book flavour (optional)
date: Book date (format: YYYY-MM-DD)
book_id: ID of the book being processed (to exclude its own locations)

Returns:
Target filename including .zim extension
Expand Down Expand Up @@ -107,9 +116,13 @@ def compute_target_filename(

# Query all locations where filename starts with this pattern
# Check ALL locations regardless of status (current or target)
# Exclude the current book's own locations to avoid self-collision
stmt = select(BookLocation.filename).where(
BookLocation.filename.like(f"{base_pattern}%")
)
if book_id is not None:
stmt = stmt.where(BookLocation.book_id != book_id)

existing_filenames = list(session.scalars(stmt).all())

# If no existing files, use base pattern (no suffix)
Expand Down
Loading