Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 66 additions & 6 deletions api/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async def upload_asset(

# GCS client calls are synchronous and can block for large uploads.
request_started_at = time.perf_counter()
uri, blob_name = await run_in_threadpool(gcs_upload, file, bucket)
uri, blob_name, _created = await run_in_threadpool(gcs_upload, file, bucket)
if is_debug_timing_enabled():
logger.info(
"asset upload request completed",
Expand Down Expand Up @@ -255,7 +255,12 @@ async def upload_and_record_asset(
)

# ── 4. Upload file to GCS (blocking I/O — run in thread pool) ────────────
uri, blob_name = await run_in_threadpool(gcs_upload, file, bucket)
# `created` is True only when this request actually wrote the blob — when
# gcs_upload deduplicates against an existing hash-named object it is
# False, meaning the blob is potentially shared by other Assets.
uri, blob_name, blob_created_by_request = await run_in_threadpool(
gcs_upload, file, bucket
)

# ── 5. Return existing record for duplicate file + thing combinations ─────
existing = check_asset_exists(session, blob_name, thing_id=thing_id)
Expand All @@ -280,11 +285,66 @@ async def upload_and_record_asset(
assoc.thing = thing
assoc.asset = asset

session.add(asset)
session.add(assoc)
session.commit()
session.refresh(asset)
# If the write fails BEFORE commit, roll back. Only delete the blob if
# this request actually created it AND no Asset row references it after
# rollback; otherwise we would orphan another Thing's Asset that shares
# the same hash-named blob (gcs_upload deduplicates by content hash).
# session.refresh() is intentionally outside the cleanup block: it runs
# AFTER the commit succeeded, so a refresh failure must not delete the
# blob — the committed Asset row would then point at a missing object.
try:
session.add(asset)
session.add(assoc)
session.commit()
except Exception:
Comment thread
jirhiker marked this conversation as resolved.
# Entire cleanup path is wrapped in one outer try/except so NOTHING
# in here (rollback, reference query, bucket.blob(), delete) can
# mask the original commit exception. Per-step try/excepts below
# produce finer-grained log messages.
try:
try:
session.rollback()
except Exception:
logger.exception(
"session.rollback() failed after asset commit failure; "
"original exception will still be re-raised"
)
if blob_created_by_request:
# Reference check is best-effort: if it raises, do NOT
# delete the blob (we cannot confirm it is unreferenced).
try:
still_referenced = session.scalars(
select(Asset).where(Asset.storage_path == blob_name)
).first()
except Exception:
logger.warning(
"Could not verify blob references; skipping cleanup for %s",
blob_name,
exc_info=True,
)
still_referenced = object() # sentinel: assume referenced
if still_referenced is None:
try:
await run_in_threadpool(bucket.blob(blob_name).delete)
except Exception:
logger.warning(
"Failed to clean up uploaded blob after DB failure: %s",
blob_name,
exc_info=True,
)
else:
logger.info(
"Skipping blob cleanup; another Asset still references %s",
blob_name,
)
Comment on lines +315 to +339
except Exception:
logger.exception(
"Unexpected error during asset upload cleanup; original "
"commit exception will still be re-raised"
)
raise
Comment thread
jirhiker marked this conversation as resolved.
Comment thread
jirhiker marked this conversation as resolved.

session.refresh(asset)
return asset


Expand Down
2 changes: 1 addition & 1 deletion services/asset_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def upload_and_associate(
name: str,
**asset_args,
) -> tuple[str, str]:
uri, blob_name = gcs_upload(ff, bucket)
uri, blob_name, _created = gcs_upload(ff, bucket)
asset = Asset(
name=name,
storage_path=blob_name,
Expand Down
4 changes: 3 additions & 1 deletion services/gcs_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def gcs_upload(file: UploadFile, bucket=None):
blob_exists=eblob is not None,
)

created = False
if not eblob:
blob = bucket.blob(blob_name)
file.file.seek(0)
Expand All @@ -150,6 +151,7 @@ def gcs_upload(file: UploadFile, bucket=None):
content_type=file.content_type,
timeout=GCS_UPLOAD_TIMEOUT_SECS,
)
created = True
_log_stage(
"upload_blob",
upload_blob_started_at,
Expand All @@ -162,7 +164,7 @@ def gcs_upload(file: UploadFile, bucket=None):
filename=file.filename,
blob_name=blob_name,
)
return uri, blob_name
return uri, blob_name, created


def gcs_remove(uri: str, bucket):
Expand Down
138 changes: 135 additions & 3 deletions tests/test_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
import asyncio
import io
import logging
import os
from datetime import timezone
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest

Expand Down Expand Up @@ -122,13 +123,14 @@ def test_gcs_upload_logs_stage_timings(caplog):

with patch.dict(os.environ, {"API_DEBUG_TIMING": "true"}):
with caplog.at_level(logging.INFO, logger="services.gcs_helper"):
uri, blob_name = gcs_helper.gcs_upload(upload, bucket)
uri, blob_name, created = gcs_helper.gcs_upload(upload, bucket)

stage_logs = [
record for record in caplog.records if record.msg == "gcs stage timing"
]

assert uri.endswith(blob_name)
assert created is True
assert {record.stage for record in stage_logs} >= {
"hash_file",
"lookup_blob",
Expand All @@ -150,9 +152,10 @@ def test_gcs_upload_skips_existing_blob():
},
)()

gcs_helper.gcs_upload(upload, bucket)
_uri, _blob_name, created = gcs_helper.gcs_upload(upload, bucket)

assert bucket._blob.upload_calls == 0
assert created is False


def test_make_blob_name_and_uri_rewinds_file_after_hashing():
Expand Down Expand Up @@ -519,4 +522,133 @@ def test_upload_and_record_asset_file_too_large(water_well_thing):
assert data["detail"][0]["type"] == "value_error"


def _run_upload_and_record_with_failing_commit(blob_created_by_request):
"""
Drive upload_and_record_asset with a mocked session whose commit() raises.
Returns (raised_exc, session_mock, bucket_mock, blob_mock).
"""
Comment on lines +526 to +529
from api.asset import upload_and_record_asset

commit_error = RuntimeError("simulated commit failure")

session_mock = MagicMock()
session_mock.get.return_value = MagicMock(id=1) # Thing exists
Comment on lines +534 to +535
session_mock.commit.side_effect = commit_error
# No Asset row references blob_name after rollback.
scalar_result = MagicMock()
scalar_result.first.return_value = None
session_mock.scalars.return_value = scalar_result

blob_mock = MagicMock()
bucket_mock = MagicMock()
bucket_mock.blob.return_value = blob_mock

file_mock = MagicMock()
file_mock.content_type = "image/png"
file_mock.filename = "x.png"
file_mock.size = 10

with (
patch(
"services.gcs_helper.gcs_upload",
return_value=("uri://blob", "x_hash.png", blob_created_by_request),
),
patch("services.gcs_helper.check_asset_exists", return_value=None),
):
try:
asyncio.run(
upload_and_record_asset(
user={"name": "test", "sub": "1"},
session=session_mock,
bucket=bucket_mock,
file=file_mock,
thing_id=1,
label=None,
name=None,
)
)
raised = None
except Exception as e:
raised = e

return raised, commit_error, session_mock, bucket_mock, blob_mock


def test_upload_and_record_cleans_blob_when_created_and_db_fails():
"""
DB commit failure path: rollback runs, blob.delete is called, original
commit exception is re-raised (cleanup must not mask it).
"""
Comment on lines +577 to +581
raised, commit_error, session, bucket, blob = (
_run_upload_and_record_with_failing_commit(blob_created_by_request=True)
)

assert raised is commit_error
session.rollback.assert_called_once()
bucket.blob.assert_called_with("x_hash.png")
blob.delete.assert_called_once()


def test_upload_and_record_skips_blob_delete_when_blob_was_preexisting():
"""
When gcs_upload reports created=False (hash dedup hit a pre-existing
blob possibly shared with other Things' Assets), the cleanup path must
NOT delete the blob even though the DB write failed.
"""
raised, commit_error, session, bucket, blob = (
_run_upload_and_record_with_failing_commit(blob_created_by_request=False)
)

assert raised is commit_error
session.rollback.assert_called_once()
blob.delete.assert_not_called()


def test_upload_and_record_reraises_original_exception_when_rollback_fails():
"""
A rollback failure inside the cleanup handler must not replace the
original commit exception that the caller cares about.
"""
from api.asset import upload_and_record_asset

commit_error = RuntimeError("simulated commit failure")
rollback_error = RuntimeError("rollback also failed")

session_mock = MagicMock()
session_mock.get.return_value = MagicMock(id=1)
Comment on lines +617 to +618
session_mock.commit.side_effect = commit_error
session_mock.rollback.side_effect = rollback_error
scalar_result = MagicMock()
scalar_result.first.return_value = None
session_mock.scalars.return_value = scalar_result

bucket_mock = MagicMock()
file_mock = MagicMock()
file_mock.content_type = "image/png"
file_mock.filename = "x.png"
file_mock.size = 10

with (
patch(
"services.gcs_helper.gcs_upload",
return_value=("uri://blob", "x_hash.png", True),
),
patch("services.gcs_helper.check_asset_exists", return_value=None),
):
with pytest.raises(RuntimeError) as exc_info:
asyncio.run(
upload_and_record_asset(
user={"name": "test", "sub": "1"},
session=session_mock,
bucket=bucket_mock,
file=file_mock,
thing_id=1,
label=None,
name=None,
)
)

assert exc_info.value is commit_error


# ============= EOF =============================================
Loading