Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 150 additions & 1 deletion api/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
import logging
import time

from fastapi import APIRouter, Depends, UploadFile, File
from fastapi import APIRouter, Depends, Form, UploadFile, File
from fastapi_pagination.ext.sqlalchemy import paginate
from sqlalchemy import select
from sqlalchemy.exc import ProgrammingError
from starlette.concurrency import run_in_threadpool
from starlette.status import (
HTTP_201_CREATED,
HTTP_204_NO_CONTENT,
HTTP_400_BAD_REQUEST,
HTTP_409_CONFLICT,
)

Expand All @@ -47,6 +48,24 @@
router = APIRouter(prefix="/asset", tags=["asset"])
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# File upload constraints
# ---------------------------------------------------------------------------

# MIME types accepted by the upload endpoint: raster images, PDF documents,
# and plain text. Anything else is rejected before the file is stored.
ALLOWED_MIME_TYPES = frozenset(
    (
        # images
        "image/jpeg",
        "image/png",
        "image/gif",
        "image/webp",
        "image/tiff",
        # documents
        "application/pdf",
        "text/plain",
    )
)

# Largest upload accepted, expressed in bytes.
MAX_UPLOAD_SIZE_BYTES = 250 * 1024**2  # 250 MB


def is_debug_timing_enabled() -> bool:
    """Return whether the ``API_DEBUG_TIMING`` flag is on.

    The flag is looked up through ``get_bool_env`` with a default of
    ``False``; the result is coerced to a plain ``bool``.
    """
    flag = get_bool_env("API_DEBUG_TIMING", False)
    return bool(flag)
Expand Down Expand Up @@ -139,6 +158,136 @@ async def upload_asset(
}


def _upload_error(status_code: int, loc: list, msg: str, error_input: dict):
    """Build the single-entry, Pydantic-style exception used by upload validation."""
    return PydanticStyleException(
        status_code=status_code,
        detail=[
            {
                "loc": loc,
                "msg": msg,
                "type": "value_error",
                "input": error_input,
            }
        ],
    )


@router.post("/upload-and-record", status_code=HTTP_201_CREATED)
async def upload_and_record_asset(
    user: admin_dependency,
    session: session_dependency,
    bucket=Depends(get_storage_bucket),
    file: UploadFile = File(...),
    thing_id: int = Form(...),
    label: str | None = Form(None),
    name: str | None = Form(None),
) -> AssetResponse:
    """
    Upload a digital asset to GCS and record it in the database in one step.

    Accepts a multipart/form-data request containing the file and optional
    metadata. The file type, the file size, and the existence of the target
    Thing are all validated *before* anything is sent to storage, so a
    rejected request never leaves an orphaned blob in the bucket. If the same
    file has already been uploaded for the same Thing, the existing record is
    returned instead of creating a duplicate.

    Args:
        user: Authenticated admin user performing the upload.
        session: Active database session.
        bucket: GCS storage bucket resolved via dependency injection.
        file: The file to upload. Accepted MIME types: JPEG, PNG, GIF, WebP,
            TIFF (images); PDF (documents); plain text. Max size: 250 MB.
        thing_id: ID of the Thing (e.g. a well) this asset belongs to.
        label: Optional human-readable label for the asset.
        name: Optional asset name. Defaults to the uploaded filename.

    Returns:
        AssetResponse: The newly created (or pre-existing duplicate) asset
        record, including its database ID, GCS URI, and storage path.

    Raises:
        400 Bad Request: File MIME type is not in the allowed set, or the
            file size exceeds 250 MB.
        409 Conflict: No Thing with the given thing_id exists.
    """
    from services.gcs_helper import gcs_upload, check_asset_exists

    # ── 1. Validate file type ────────────────────────────────────────────────
    if file.content_type not in ALLOWED_MIME_TYPES:
        raise _upload_error(
            HTTP_400_BAD_REQUEST,
            ["file"],
            (
                f"Unsupported file type '{file.content_type}'. "
                f"Allowed types: {', '.join(sorted(ALLOWED_MIME_TYPES))}."
            ),
            {"content_type": file.content_type},
        )

    # ── 2. Validate file size ────────────────────────────────────────────────
    # file.size is set by FastAPI during multipart parsing.
    # Fall back to seeking when unavailable (e.g. streaming clients).
    file_size = file.size
    if file_size is None:
        file.file.seek(0, 2)  # whence=2: jump to end of stream
        file_size = file.file.tell()
        file.file.seek(0)  # rewind so the upload reads from the start

    if file_size > MAX_UPLOAD_SIZE_BYTES:
        raise _upload_error(
            HTTP_400_BAD_REQUEST,
            ["file"],
            (
                f"File size {file_size} bytes exceeds the maximum "
                f"upload size of {MAX_UPLOAD_SIZE_BYTES} bytes (250 MB)."
            ),
            {"size": file_size},
        )

    # ── 3. Validate the Thing exists ─────────────────────────────────────────
    # Checked before the upload so an unknown thing_id cannot leave an
    # orphaned object in the bucket.
    thing = session.get(Thing, thing_id)
    if thing is None:
        raise _upload_error(
            HTTP_409_CONFLICT,
            ["body", "thing_id"],
            f"Thing with ID {thing_id} not found.",
            {"thing_id": thing_id},
        )

    # ── 4. Upload file to GCS (blocking I/O — run in thread pool) ────────────
    uri, blob_name = await run_in_threadpool(gcs_upload, file, bucket)

    # ── 5. Return existing record for duplicate file + thing combinations ─────
    existing = check_asset_exists(session, blob_name, thing_id=thing_id)
    if existing:
        return existing

    # ── 6. Persist the Asset record ───────────────────────────────────────────
    asset = Asset(
        name=name or file.filename,
        label=label,
        storage_path=blob_name,
        storage_service="gcs",
        mime_type=file.content_type,
        size=file_size,
        uri=uri,
    )
    audit_add(user, asset)

    # ── 7. Link the Asset to the Thing ───────────────────────────────────────
    assoc = AssetThingAssociation()
    audit_add(user, assoc)
    assoc.thing = thing
    assoc.asset = asset

    session.add(asset)
    session.add(assoc)
    session.commit()
    # Refresh so database-generated fields are populated on the response.
    session.refresh(asset)

    return asset


@router.post("", status_code=HTTP_201_CREATED)
async def add_asset(
user: admin_dependency,
Expand Down
4 changes: 3 additions & 1 deletion services/audit_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

def audit_add(user: dict, obj: DeclarativeBase) -> None:
    """Stamp ``obj`` with the creating user's identity.

    Copies ``user["sub"]`` to ``obj.created_by_id`` and ``user["name"]`` to
    ``obj.created_by_name``. Does nothing when ``user`` is falsy or is not a
    dict.

    Args:
        user: Claims dict for the authenticated user; may be a non-dict
            placeholder when authentication is disabled.
        obj: ORM object to stamp with the creator fields.
    """
    # see note in "AuditMixin"
    # Guard against non-dict values: when AUTHENTIK_DISABLE_AUTHENTICATION=1
    # the auth dependency returns True instead of a claims dict.
    if user and isinstance(user, dict):
        obj.created_by_id = user["sub"]
        obj.created_by_name = user["name"]

Expand Down
135 changes: 135 additions & 0 deletions tests/test_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,4 +384,139 @@ def test_remove_asset(second_asset):
assert response.status_code == 204


# UPLOAD-AND-RECORD tests ====================================================


def test_upload_and_record_asset(water_well_thing):
    """
    Happy path — valid image uploaded with a known thing_id.

    Expects a 201 response with a fully populated AssetResponse that links
    the new Asset to the given Thing. The created record is removed in a
    ``finally`` block so the database is left in the same state even when
    one of the field assertions fails.
    """
    path = "tests/data/riochama.png"

    with open(path, "rb") as f:
        response = client.post(
            "/asset/upload-and-record",
            data={"thing_id": water_well_thing.id, "label": "Well photo"},
            files={"file": ("riochama.png", f, "image/png")},
        )

    assert response.status_code == 201
    data = response.json()

    # The id is needed for cleanup, so check it before entering the try block.
    assert "id" in data

    try:
        assert "uri" in data
        assert "storage_path" in data
        assert data["name"] == "riochama.png"
        assert data["label"] == "Well photo"
        assert data["mime_type"] == "image/png"
        assert data["storage_service"] == "gcs"
        assert data["size"] > 0
    finally:
        # Always remove the created record, even on assertion failure.
        cleanup_post_test(Asset, data["id"])


def test_upload_and_record_asset_duplicate_returns_existing(water_well_thing):
    """
    Uploading the same file to the same Thing twice must not create a duplicate.

    The second call should return the asset created by the first call (same id)
    and respond with 201 rather than an error. The record created by the first
    call is removed in a ``finally`` block so a failure in the second request
    does not leak it into later tests.
    """
    path = "tests/data/riochama.png"

    with open(path, "rb") as f:
        first = client.post(
            "/asset/upload-and-record",
            data={"thing_id": water_well_thing.id},
            files={"file": ("riochama.png", f, "image/png")},
        )
    assert first.status_code == 201
    first_id = first.json()["id"]

    try:
        with open(path, "rb") as f:
            second = client.post(
                "/asset/upload-and-record",
                data={"thing_id": water_well_thing.id},
                files={"file": ("riochama.png", f, "image/png")},
            )
        assert second.status_code == 201
        assert second.json()["id"] == first_id
    finally:
        # Always remove the first record, even if the duplicate check fails.
        cleanup_post_test(Asset, first_id)


def test_upload_and_record_asset_bad_thing_id():
    """
    An unknown thing_id must be rejected with 409 Conflict.

    The single error entry has to name the offending field and echo the
    supplied value back, so the caller can show a meaningful message.
    """
    bad_thing_id = 99999

    with open("tests/data/riochama.png", "rb") as image:
        response = client.post(
            "/asset/upload-and-record",
            data={"thing_id": bad_thing_id},
            files={"file": ("riochama.png", image, "image/png")},
        )

    assert response.status_code == 409

    error = response.json()["detail"][0]
    assert error["loc"] == ["body", "thing_id"]
    assert error["msg"] == f"Thing with ID {bad_thing_id} not found."
    assert error["type"] == "value_error"
    assert error["input"] == {"thing_id": bad_thing_id}


def test_upload_and_record_asset_invalid_file_type(water_well_thing):
    """
    A declared MIME type outside ALLOWED_MIME_TYPES must yield 400 Bad Request.

    The PNG fixture is sent with a ``video/mp4`` content type; the error entry
    must point at the file and echo the rejected content type.
    """
    declared_type = "video/mp4"

    with open("tests/data/riochama.png", "rb") as image:
        response = client.post(
            "/asset/upload-and-record",
            data={"thing_id": water_well_thing.id},
            files={"file": ("riochama.png", image, declared_type)},
        )

    assert response.status_code == 400

    error = response.json()["detail"][0]
    assert error["loc"] == ["file"]
    assert "video/mp4" in error["msg"]
    assert error["type"] == "value_error"
    assert error["input"] == {"content_type": "video/mp4"}


def test_upload_and_record_asset_file_too_large(water_well_thing):
    """
    An upload larger than MAX_UPLOAD_SIZE_BYTES must yield 400 Bad Request.

    The limit is patched down to 5 bytes for the duration of the request, so
    the ordinary PNG fixture trips it without needing a 250 MB payload.
    """
    with (
        patch("api.asset.MAX_UPLOAD_SIZE_BYTES", 5),
        open("tests/data/riochama.png", "rb") as image,
    ):
        response = client.post(
            "/asset/upload-and-record",
            data={"thing_id": water_well_thing.id},
            files={"file": ("riochama.png", image, "image/png")},
        )

    assert response.status_code == 400

    error = response.json()["detail"][0]
    assert error["loc"] == ["file"]
    assert "exceeds" in error["msg"]
    assert error["type"] == "value_error"


# ============= EOF =============================================
Loading