diff --git a/api/asset.py b/api/asset.py index 6de8e132..f56f3f3d 100644 --- a/api/asset.py +++ b/api/asset.py @@ -138,7 +138,7 @@ async def upload_asset( # GCS client calls are synchronous and can block for large uploads. request_started_at = time.perf_counter() - uri, blob_name = await run_in_threadpool(gcs_upload, file, bucket) + uri, blob_name, _created = await run_in_threadpool(gcs_upload, file, bucket) if is_debug_timing_enabled(): logger.info( "asset upload request completed", @@ -255,7 +255,12 @@ async def upload_and_record_asset( ) # ── 4. Upload file to GCS (blocking I/O — run in thread pool) ──────────── - uri, blob_name = await run_in_threadpool(gcs_upload, file, bucket) + # `created` is True only when this request actually wrote the blob — when + # gcs_upload deduplicates against an existing hash-named object it is + # False, meaning the blob is potentially shared by other Assets. + uri, blob_name, blob_created_by_request = await run_in_threadpool( + gcs_upload, file, bucket + ) # ── 5. Return existing record for duplicate file + thing combinations ───── existing = check_asset_exists(session, blob_name, thing_id=thing_id) @@ -280,11 +285,66 @@ async def upload_and_record_asset( assoc.thing = thing assoc.asset = asset - session.add(asset) - session.add(assoc) - session.commit() - session.refresh(asset) + # If the write fails BEFORE commit, roll back. Only delete the blob if + # this request actually created it AND no Asset row references it after + # rollback; otherwise we would orphan another Thing's Asset that shares + # the same hash-named blob (gcs_upload deduplicates by content hash). + # session.refresh() is intentionally outside the cleanup block: it runs + # AFTER the commit succeeded, so a refresh failure must not delete the + # blob — the committed Asset row would then point at a missing object. + try: + session.add(asset) + session.add(assoc) + session.commit() + except Exception: + # Entire cleanup path is wrapped in one outer try/except so NOTHING + # in here (rollback, reference query, bucket.blob(), delete) can + # mask the original commit exception. Per-step try/excepts below + # produce finer-grained log messages. + try: + try: + session.rollback() + except Exception: + logger.exception( + "session.rollback() failed after asset commit failure; " + "original exception will still be re-raised" + ) + if blob_created_by_request: + # Reference check is best-effort: if it raises, do NOT + # delete the blob (we cannot confirm it is unreferenced). + try: + still_referenced = session.scalars( + select(Asset).where(Asset.storage_path == blob_name) + ).first() + except Exception: + logger.warning( + "Could not verify blob references; skipping cleanup for %s", + blob_name, + exc_info=True, + ) + still_referenced = object() # sentinel: assume referenced + if still_referenced is None: + try: + await run_in_threadpool(bucket.blob(blob_name).delete) + except Exception: + logger.warning( + "Failed to clean up uploaded blob after DB failure: %s", + blob_name, + exc_info=True, + ) + else: + logger.info( + "Skipping blob cleanup; another Asset still references %s", + blob_name, + ) + except Exception: + logger.exception( + "Unexpected error during asset upload cleanup; original " + "commit exception will still be re-raised" + ) + raise + session.refresh(asset) return asset diff --git a/services/asset_helper.py b/services/asset_helper.py index 51a4654f..0fb6df8d 100644 --- a/services/asset_helper.py +++ b/services/asset_helper.py @@ -24,7 +24,7 @@ def upload_and_associate( name: str, **asset_args, ) -> tuple[str, str]: - uri, blob_name = gcs_upload(ff, bucket) + uri, blob_name, _created = gcs_upload(ff, bucket) asset = Asset( name=name, storage_path=blob_name, diff --git a/services/gcs_helper.py b/services/gcs_helper.py index da9ce606..a8bf0988 100644 --- a/services/gcs_helper.py +++ b/services/gcs_helper.py @@ -141,6 +141,7 @@ def gcs_upload(file: UploadFile, bucket=None): blob_exists=eblob is not None, ) + created = False if not eblob: blob = bucket.blob(blob_name) file.file.seek(0) @@ -150,6 +151,7 @@ def gcs_upload(file: UploadFile, bucket=None): content_type=file.content_type, timeout=GCS_UPLOAD_TIMEOUT_SECS, ) + created = True _log_stage( "upload_blob", upload_blob_started_at, @@ -162,7 +164,7 @@ def gcs_upload(file: UploadFile, bucket=None): filename=file.filename, blob_name=blob_name, ) - return uri, blob_name + return uri, blob_name, created def gcs_remove(uri: str, bucket): diff --git a/tests/test_asset.py b/tests/test_asset.py index 1d9ece4c..d7ee0289 100644 --- a/tests/test_asset.py +++ b/tests/test_asset.py @@ -13,11 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== +import asyncio import io import logging import os from datetime import timezone -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest @@ -122,13 +123,14 @@ def test_gcs_upload_logs_stage_timings(caplog): with patch.dict(os.environ, {"API_DEBUG_TIMING": "true"}): with caplog.at_level(logging.INFO, logger="services.gcs_helper"): - uri, blob_name = gcs_helper.gcs_upload(upload, bucket) + uri, blob_name, created = gcs_helper.gcs_upload(upload, bucket) stage_logs = [ record for record in caplog.records if record.msg == "gcs stage timing" ] assert uri.endswith(blob_name) + assert created is True assert {record.stage for record in stage_logs} >= { "hash_file", "lookup_blob", @@ -150,9 +152,10 @@ def test_gcs_upload_skips_existing_blob(): }, )() - gcs_helper.gcs_upload(upload, bucket) + _uri, _blob_name, created = gcs_helper.gcs_upload(upload, bucket) assert bucket._blob.upload_calls == 0 + assert created is False def test_make_blob_name_and_uri_rewinds_file_after_hashing(): @@ -519,4 +522,133 @@ def test_upload_and_record_asset_file_too_large(water_well_thing): assert data["detail"][0]["type"] == "value_error" +def _run_upload_and_record_with_failing_commit(blob_created_by_request): + """ + Drive upload_and_record_asset with a mocked session whose commit() raises. + Returns (raised_exc, session_mock, bucket_mock, blob_mock). + """ + from api.asset import upload_and_record_asset + + commit_error = RuntimeError("simulated commit failure") + + session_mock = MagicMock() + session_mock.get.return_value = MagicMock(id=1) # Thing exists + session_mock.commit.side_effect = commit_error + # No Asset row references blob_name after rollback. + scalar_result = MagicMock() + scalar_result.first.return_value = None + session_mock.scalars.return_value = scalar_result + + blob_mock = MagicMock() + bucket_mock = MagicMock() + bucket_mock.blob.return_value = blob_mock + + file_mock = MagicMock() + file_mock.content_type = "image/png" + file_mock.filename = "x.png" + file_mock.size = 10 + + with ( + patch( + "services.gcs_helper.gcs_upload", + return_value=("uri://blob", "x_hash.png", blob_created_by_request), + ), + patch("services.gcs_helper.check_asset_exists", return_value=None), + ): + try: + asyncio.run( + upload_and_record_asset( + user={"name": "test", "sub": "1"}, + session=session_mock, + bucket=bucket_mock, + file=file_mock, + thing_id=1, + label=None, + name=None, + ) + ) + raised = None + except Exception as e: + raised = e + + return raised, commit_error, session_mock, bucket_mock, blob_mock + + +def test_upload_and_record_cleans_blob_when_created_and_db_fails(): + """ + DB commit failure path: rollback runs, blob.delete is called, original + commit exception is re-raised (cleanup must not mask it). + """ + raised, commit_error, session, bucket, blob = ( + _run_upload_and_record_with_failing_commit(blob_created_by_request=True) + ) + + assert raised is commit_error + session.rollback.assert_called_once() + bucket.blob.assert_called_with("x_hash.png") + blob.delete.assert_called_once() + + +def test_upload_and_record_skips_blob_delete_when_blob_was_preexisting(): + """ + When gcs_upload reports created=False (hash dedup hit a pre-existing + blob possibly shared with other Things' Assets), the cleanup path must + NOT delete the blob even though the DB write failed. + """ + raised, commit_error, session, bucket, blob = ( + _run_upload_and_record_with_failing_commit(blob_created_by_request=False) + ) + + assert raised is commit_error + session.rollback.assert_called_once() + blob.delete.assert_not_called() + + +def test_upload_and_record_reraises_original_exception_when_rollback_fails(): + """ + A rollback failure inside the cleanup handler must not replace the + original commit exception that the caller cares about. + """ + from api.asset import upload_and_record_asset + + commit_error = RuntimeError("simulated commit failure") + rollback_error = RuntimeError("rollback also failed") + + session_mock = MagicMock() + session_mock.get.return_value = MagicMock(id=1) + session_mock.commit.side_effect = commit_error + session_mock.rollback.side_effect = rollback_error + scalar_result = MagicMock() + scalar_result.first.return_value = None + session_mock.scalars.return_value = scalar_result + + bucket_mock = MagicMock() + file_mock = MagicMock() + file_mock.content_type = "image/png" + file_mock.filename = "x.png" + file_mock.size = 10 + + with ( + patch( + "services.gcs_helper.gcs_upload", + return_value=("uri://blob", "x_hash.png", True), + ), + patch("services.gcs_helper.check_asset_exists", return_value=None), + ): + with pytest.raises(RuntimeError) as exc_info: + asyncio.run( + upload_and_record_asset( + user={"name": "test", "sub": "1"}, + session=session_mock, + bucket=bucket_mock, + file=file_mock, + thing_id=1, + label=None, + name=None, + ) + ) + + assert exc_info.value is commit_error + + # ============= EOF =============================================