From b0d30a1de1a9829815636fe57e76195040615e8a Mon Sep 17 00:00:00 2001 From: jross Date: Mon, 8 Jun 2026 11:23:53 -0600 Subject: [PATCH 1/5] fix: rollback and delete GCS blob if asset DB write fails If session.commit() (or .add/.refresh) raises after gcs_upload has already written the file, the bucket would retain an orphaned object with no Asset row pointing at it. Wrap the DB write in try/except: rollback the session and best-effort delete the blob, then re-raise. A failure to delete the blob is logged but does not mask the original exception. Co-Authored-By: Claude Opus 4.7 --- api/asset.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/api/asset.py b/api/asset.py index 6de8e132..d9ec7352 100644 --- a/api/asset.py +++ b/api/asset.py @@ -280,10 +280,24 @@ async def upload_and_record_asset( assoc.thing = thing assoc.asset = asset - session.add(asset) - session.add(assoc) - session.commit() - session.refresh(asset) + # If the DB write fails, roll back and best-effort delete the GCS blob so + # the bucket does not retain an object with no Asset row pointing at it. + try: + session.add(asset) + session.add(assoc) + session.commit() + session.refresh(asset) + except Exception: + session.rollback() + try: + await run_in_threadpool(bucket.blob(blob_name).delete) + except Exception: + logger.warning( + "Failed to clean up uploaded blob after DB failure: %s", + blob_name, + exc_info=True, + ) + raise return asset From 19246aa33acb7fdbc2f6e845b84bf9b790ac2f07 Mon Sep 17 00:00:00 2001 From: jross Date: Mon, 8 Jun 2026 11:33:31 -0600 Subject: [PATCH 2/5] fix: only delete blob on cleanup when this request created it and no Asset still references it gcs_upload deduplicates by content hash: when an object with the hash-named blob already exists, it skips the upload and returns the existing blob_name. The previous cleanup-on-failure path would delete that shared object, breaking Assets owned by other Things that point at the same hash-named blob. - gcs_upload now also returns a `created` flag indicating whether this call actually wrote the blob. All callers updated to unpack three values. - In upload_and_record_asset, on DB-write failure: only delete the blob when (a) this request created it AND (b) no Asset row references storage_path == blob_name after rollback. If another Asset still references it, log and leave it alone. Co-Authored-By: Claude Opus 4.7 --- api/asset.py | 41 ++++++++++++++++++++++++++++------------ services/asset_helper.py | 2 +- services/gcs_helper.py | 4 +++- tests/test_asset.py | 6 ++++-- 4 files changed, 37 insertions(+), 16 deletions(-) diff --git a/api/asset.py b/api/asset.py index d9ec7352..27742f86 100644 --- a/api/asset.py +++ b/api/asset.py @@ -138,7 +138,7 @@ async def upload_asset( # GCS client calls are synchronous and can block for large uploads. request_started_at = time.perf_counter() - uri, blob_name = await run_in_threadpool(gcs_upload, file, bucket) + uri, blob_name, _created = await run_in_threadpool(gcs_upload, file, bucket) if is_debug_timing_enabled(): logger.info( "asset upload request completed", @@ -255,7 +255,12 @@ async def upload_and_record_asset( ) # ── 4. Upload file to GCS (blocking I/O — run in thread pool) ──────────── - uri, blob_name = await run_in_threadpool(gcs_upload, file, bucket) + # `created` is True only when this request actually wrote the blob — when + # gcs_upload deduplicates against an existing hash-named object it is + # False, meaning the blob is potentially shared by other Assets. + uri, blob_name, blob_created_by_request = await run_in_threadpool( + gcs_upload, file, bucket + ) # ── 5. Return existing record for duplicate file + thing combinations ───── existing = check_asset_exists(session, blob_name, thing_id=thing_id) @@ -280,8 +285,10 @@ async def upload_and_record_asset( assoc.thing = thing assoc.asset = asset - # If the DB write fails, roll back and best-effort delete the GCS blob so - # the bucket does not retain an object with no Asset row pointing at it. + # If the DB write fails, roll back. Only delete the blob if this request + # actually created it AND no Asset row references it after rollback; + # otherwise we would orphan another Thing's Asset that shares the same + # hash-named blob (gcs_upload deduplicates by content hash). try: session.add(asset) session.add(assoc) @@ -289,14 +296,24 @@ async def upload_and_record_asset( session.refresh(asset) except Exception: session.rollback() - try: - await run_in_threadpool(bucket.blob(blob_name).delete) - except Exception: - logger.warning( - "Failed to clean up uploaded blob after DB failure: %s", - blob_name, - exc_info=True, - ) + if blob_created_by_request: + still_referenced = session.scalars( + select(Asset).where(Asset.storage_path == blob_name) + ).first() + if still_referenced is None: + try: + await run_in_threadpool(bucket.blob(blob_name).delete) + except Exception: + logger.warning( + "Failed to clean up uploaded blob after DB failure: %s", + blob_name, + exc_info=True, + ) + else: + logger.info( + "Skipping blob cleanup; another Asset still references %s", + blob_name, + ) raise return asset diff --git a/services/asset_helper.py b/services/asset_helper.py index 51a4654f..0fb6df8d 100644 --- a/services/asset_helper.py +++ b/services/asset_helper.py @@ -24,7 +24,7 @@ def upload_and_associate( name: str, **asset_args, ) -> tuple[str, str]: - uri, blob_name = gcs_upload(ff, bucket) + uri, blob_name, _created = gcs_upload(ff, bucket) asset = Asset( name=name, storage_path=blob_name, diff --git a/services/gcs_helper.py b/services/gcs_helper.py index da9ce606..a8bf0988 100644 --- a/services/gcs_helper.py +++ b/services/gcs_helper.py @@ -141,6 +141,7 @@ def gcs_upload(file: UploadFile, bucket=None): blob_exists=eblob is not None, ) + created = False if not eblob: blob = bucket.blob(blob_name) file.file.seek(0) @@ -150,6 +151,7 @@ def gcs_upload(file: UploadFile, bucket=None): content_type=file.content_type, timeout=GCS_UPLOAD_TIMEOUT_SECS, ) + created = True _log_stage( "upload_blob", upload_blob_started_at, @@ -162,7 +164,7 @@ def gcs_upload(file: UploadFile, bucket=None): filename=file.filename, blob_name=blob_name, ) - return uri, blob_name + return uri, blob_name, created def gcs_remove(uri: str, bucket): diff --git a/tests/test_asset.py b/tests/test_asset.py index 1d9ece4c..cbac1e0e 100644 --- a/tests/test_asset.py +++ b/tests/test_asset.py @@ -122,13 +122,14 @@ def test_gcs_upload_logs_stage_timings(caplog): with patch.dict(os.environ, {"API_DEBUG_TIMING": "true"}): with caplog.at_level(logging.INFO, logger="services.gcs_helper"): - uri, blob_name = gcs_helper.gcs_upload(upload, bucket) + uri, blob_name, created = gcs_helper.gcs_upload(upload, bucket) stage_logs = [ record for record in caplog.records if record.msg == "gcs stage timing" ] assert uri.endswith(blob_name) + assert created is True assert {record.stage for record in stage_logs} >= { "hash_file", "lookup_blob", @@ -150,9 +151,10 @@ def test_gcs_upload_skips_existing_blob(): }, )() - gcs_helper.gcs_upload(upload, bucket) + _uri, _blob_name, created = gcs_helper.gcs_upload(upload, bucket) assert bucket._blob.upload_calls == 0 + assert created is False def test_make_blob_name_and_uri_rewinds_file_after_hashing(): From a28a38c417a77db103dfe587a3245feeac811637 Mon Sep 17 00:00:00 2001 From: jross Date: Mon, 8 Jun 2026 11:36:17 -0600 Subject: [PATCH 3/5] fix: move session.refresh outside blob-cleanup try block session.refresh(asset) runs AFTER session.commit() has succeeded. Keeping it inside the cleanup except block meant a transient read failure during refresh would trigger blob deletion while the Asset row was already committed, leaving the DB pointing at a missing GCS object. Move refresh outside the try so cleanup is scoped to pre-commit failures only. Co-Authored-By: Claude Opus 4.7 --- api/asset.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/api/asset.py b/api/asset.py index 27742f86..68b500d5 100644 --- a/api/asset.py +++ b/api/asset.py @@ -285,15 +285,17 @@ async def upload_and_record_asset( assoc.thing = thing assoc.asset = asset - # If the DB write fails, roll back. Only delete the blob if this request - # actually created it AND no Asset row references it after rollback; - # otherwise we would orphan another Thing's Asset that shares the same - # hash-named blob (gcs_upload deduplicates by content hash). + # If the write fails BEFORE commit, roll back. Only delete the blob if + # this request actually created it AND no Asset row references it after + # rollback; otherwise we would orphan another Thing's Asset that shares + # the same hash-named blob (gcs_upload deduplicates by content hash). + # session.refresh() is intentionally outside the cleanup block: it runs + # AFTER the commit succeeded, so a refresh failure must not delete the + # blob — the committed Asset row would then point at a missing object. try: session.add(asset) session.add(assoc) session.commit() - session.refresh(asset) except Exception: session.rollback() if blob_created_by_request: @@ -316,6 +318,7 @@ async def upload_and_record_asset( ) raise + session.refresh(asset) return asset From 2408414ffc30f509a9f2fd6e15516dc23605c455 Mon Sep 17 00:00:00 2001 From: jross Date: Mon, 8 Jun 2026 11:37:48 -0600 Subject: [PATCH 4/5] fix: harden asset cleanup against rollback and reference-check failures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Wrap session.rollback() in its own try/except. A rollback failure is logged but no longer masks the original DB exception, which is re-raised at the end of the handler. - Wrap the reference-count query in its own try/except. If the SELECT itself fails after rollback, we cannot confirm the blob is unreferenced, so default to keeping it (sentinel object → not None). Better to leave a possible orphan blob than delete one another Asset still points at. Co-Authored-By: Claude Opus 4.7 --- api/asset.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/api/asset.py b/api/asset.py index 68b500d5..e6a80a0e 100644 --- a/api/asset.py +++ b/api/asset.py @@ -297,11 +297,30 @@ async def upload_and_record_asset( session.add(assoc) session.commit() except Exception: - session.rollback() + # Best-effort rollback. If rollback itself raises, log and continue + # so the original DB exception still propagates as the request error. + try: + session.rollback() + except Exception: + logger.exception( + "session.rollback() failed after asset commit failure; " + "original exception will still be re-raised" + ) if blob_created_by_request: - still_referenced = session.scalars( - select(Asset).where(Asset.storage_path == blob_name) - ).first() + # Reference check is itself best-effort: if it raises, do NOT + # delete the blob (we cannot confirm it is unreferenced), just + # log and leave it in place. + try: + still_referenced = session.scalars( + select(Asset).where(Asset.storage_path == blob_name) + ).first() + except Exception: + logger.warning( + "Could not verify blob references; skipping cleanup for %s", + blob_name, + exc_info=True, + ) + still_referenced = object() # sentinel: assume referenced if still_referenced is None: try: await run_in_threadpool(bucket.blob(blob_name).delete) From 6d452a88e64bf7d1104b49dc41bbd156a5bfe4b0 Mon Sep 17 00:00:00 2001 From: jross Date: Mon, 8 Jun 2026 11:42:19 -0600 Subject: [PATCH 5/5] test/fix: wrap entire asset-upload cleanup path and cover with tests Wrap the cleanup block in upload_and_record_asset in one outer try/except so any failure inside cleanup (rollback, reference query, bucket.blob(), delete) is logged but cannot mask the original commit exception that the request needs to surface. Add three direct-call tests in tests/test_asset.py that mock the session and bucket: - test_upload_and_record_cleans_blob_when_created_and_db_fails: asserts rollback runs, blob.delete is called, and the original commit exception is re-raised. - test_upload_and_record_skips_blob_delete_when_blob_was_preexisting: with gcs_upload reporting created=False (hash dedup hit), the cleanup must NOT delete the (possibly shared) blob. - test_upload_and_record_reraises_original_exception_when_rollback_fails: rollback raising must not replace the original commit exception. Co-Authored-By: Claude Opus 4.7 --- api/asset.py | 63 +++++++++++---------- tests/test_asset.py | 132 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 166 insertions(+), 29 deletions(-) diff --git a/api/asset.py b/api/asset.py index e6a80a0e..f56f3f3d 100644 --- a/api/asset.py +++ b/api/asset.py @@ -297,44 +297,51 @@ async def upload_and_record_asset( session.add(assoc) session.commit() except Exception: - # Best-effort rollback. If rollback itself raises, log and continue - # so the original DB exception still propagates as the request error. + # Entire cleanup path is wrapped in one outer try/except so NOTHING + # in here (rollback, reference query, bucket.blob(), delete) can + # mask the original commit exception. Per-step try/excepts below + # produce finer-grained log messages. try: - session.rollback() - except Exception: - logger.exception( - "session.rollback() failed after asset commit failure; " - "original exception will still be re-raised" - ) - if blob_created_by_request: - # Reference check is itself best-effort: if it raises, do NOT - # delete the blob (we cannot confirm it is unreferenced), just - # log and leave it in place. try: - still_referenced = session.scalars( - select(Asset).where(Asset.storage_path == blob_name) - ).first() + session.rollback() except Exception: - logger.warning( - "Could not verify blob references; skipping cleanup for %s", - blob_name, - exc_info=True, + logger.exception( + "session.rollback() failed after asset commit failure; " + "original exception will still be re-raised" ) - still_referenced = object() # sentinel: assume referenced - if still_referenced is None: + if blob_created_by_request: + # Reference check is best-effort: if it raises, do NOT + # delete the blob (we cannot confirm it is unreferenced). try: - await run_in_threadpool(bucket.blob(blob_name).delete) + still_referenced = session.scalars( + select(Asset).where(Asset.storage_path == blob_name) + ).first() except Exception: logger.warning( - "Failed to clean up uploaded blob after DB failure: %s", + "Could not verify blob references; skipping cleanup for %s", blob_name, exc_info=True, ) - else: - logger.info( - "Skipping blob cleanup; another Asset still references %s", - blob_name, - ) + still_referenced = object() # sentinel: assume referenced + if still_referenced is None: + try: + await run_in_threadpool(bucket.blob(blob_name).delete) + except Exception: + logger.warning( + "Failed to clean up uploaded blob after DB failure: %s", + blob_name, + exc_info=True, + ) + else: + logger.info( + "Skipping blob cleanup; another Asset still references %s", + blob_name, + ) + except Exception: + logger.exception( + "Unexpected error during asset upload cleanup; original " + "commit exception will still be re-raised" + ) raise session.refresh(asset) diff --git a/tests/test_asset.py b/tests/test_asset.py index cbac1e0e..d7ee0289 100644 --- a/tests/test_asset.py +++ b/tests/test_asset.py @@ -13,11 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== +import asyncio import io import logging import os from datetime import timezone -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest @@ -521,4 +522,133 @@ def test_upload_and_record_asset_file_too_large(water_well_thing): assert data["detail"][0]["type"] == "value_error" +def _run_upload_and_record_with_failing_commit(blob_created_by_request): + """ + Drive upload_and_record_asset with a mocked session whose commit() raises. + Returns (raised_exc, session_mock, bucket_mock, blob_mock). + """ + from api.asset import upload_and_record_asset + + commit_error = RuntimeError("simulated commit failure") + + session_mock = MagicMock() + session_mock.get.return_value = MagicMock(id=1) # Thing exists + session_mock.commit.side_effect = commit_error + # No Asset row references blob_name after rollback. + scalar_result = MagicMock() + scalar_result.first.return_value = None + session_mock.scalars.return_value = scalar_result + + blob_mock = MagicMock() + bucket_mock = MagicMock() + bucket_mock.blob.return_value = blob_mock + + file_mock = MagicMock() + file_mock.content_type = "image/png" + file_mock.filename = "x.png" + file_mock.size = 10 + + with ( + patch( + "services.gcs_helper.gcs_upload", + return_value=("uri://blob", "x_hash.png", blob_created_by_request), + ), + patch("services.gcs_helper.check_asset_exists", return_value=None), + ): + try: + asyncio.run( + upload_and_record_asset( + user={"name": "test", "sub": "1"}, + session=session_mock, + bucket=bucket_mock, + file=file_mock, + thing_id=1, + label=None, + name=None, + ) + ) + raised = None + except Exception as e: + raised = e + + return raised, commit_error, session_mock, bucket_mock, blob_mock + + +def test_upload_and_record_cleans_blob_when_created_and_db_fails(): + """ + DB commit failure path: rollback runs, blob.delete is called, original + commit exception is re-raised (cleanup must not mask it). + """ + raised, commit_error, session, bucket, blob = ( + _run_upload_and_record_with_failing_commit(blob_created_by_request=True) + ) + + assert raised is commit_error + session.rollback.assert_called_once() + bucket.blob.assert_called_with("x_hash.png") + blob.delete.assert_called_once() + + +def test_upload_and_record_skips_blob_delete_when_blob_was_preexisting(): + """ + When gcs_upload reports created=False (hash dedup hit a pre-existing + blob possibly shared with other Things' Assets), the cleanup path must + NOT delete the blob even though the DB write failed. + """ + raised, commit_error, session, bucket, blob = ( + _run_upload_and_record_with_failing_commit(blob_created_by_request=False) + ) + + assert raised is commit_error + session.rollback.assert_called_once() + blob.delete.assert_not_called() + + +def test_upload_and_record_reraises_original_exception_when_rollback_fails(): + """ + A rollback failure inside the cleanup handler must not replace the + original commit exception that the caller cares about. + """ + from api.asset import upload_and_record_asset + + commit_error = RuntimeError("simulated commit failure") + rollback_error = RuntimeError("rollback also failed") + + session_mock = MagicMock() + session_mock.get.return_value = MagicMock(id=1) + session_mock.commit.side_effect = commit_error + session_mock.rollback.side_effect = rollback_error + scalar_result = MagicMock() + scalar_result.first.return_value = None + session_mock.scalars.return_value = scalar_result + + bucket_mock = MagicMock() + file_mock = MagicMock() + file_mock.content_type = "image/png" + file_mock.filename = "x.png" + file_mock.size = 10 + + with ( + patch( + "services.gcs_helper.gcs_upload", + return_value=("uri://blob", "x_hash.png", True), + ), + patch("services.gcs_helper.check_asset_exists", return_value=None), + ): + with pytest.raises(RuntimeError) as exc_info: + asyncio.run( + upload_and_record_asset( + user={"name": "test", "sub": "1"}, + session=session_mock, + bucket=bucket_mock, + file=file_mock, + thing_id=1, + label=None, + name=None, + ) + ) + + assert exc_info.value is commit_error + + # ============= EOF =============================================