From 866e27f1e8a5b6d8a69726c574cd3084ffa84629 Mon Sep 17 00:00:00 2001 From: "John R. D'Orazio" Date: Mon, 27 Apr 2026 02:00:21 +0200 Subject: [PATCH] fix(worker): use json.dumps for redis.publish payloads (#111) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces 11 hand-rolled f-string JSON payloads in worker.py with json.dumps({...}). The f-string approach worked by accident for fields holding only UUIDs and ints, but the *_failed handlers interpolated str(e) directly — exception messages routinely contain quotes and newlines (e.g. `'foo' has no attribute "bar"\nstacktrace`), which produced unparseable JSON and broke any subscriber that does json.loads on the payload. Affected handlers (worker.py): - lint_started / lint_complete / lint_failed - normalization_started / _complete / _failed / _status (×2) - remote_check_started / _complete / _failed The newer index_*, consistency_*, and duplicates_* handlers were already using json.dumps; this brings the rest in line. Channel names, message types, and field names are unchanged so existing subscribers continue to work without modification. Adds test_lint_failure_payload_is_valid_json_with_quoted_error: invokes run_lint_task with a RuntimeError whose message contains both `"` and `\n`, then asserts the published lint_failed payload round-trips through json.loads. Locks in the regression. Closes #111 Co-Authored-By: Claude Opus 4.7 (1M context) --- ontokit/worker.py | 95 ++++++++++++++++++++++++++++++--------- tests/unit/test_worker.py | 45 +++++++++++++++++++ 2 files changed, 120 insertions(+), 20 deletions(-) diff --git a/ontokit/worker.py b/ontokit/worker.py index d5793ff..4c66a9d 100644 --- a/ontokit/worker.py +++ b/ontokit/worker.py @@ -251,7 +251,13 @@ async def run_lint_task( # Notify via pubsub that lint has started await redis.publish( LINT_UPDATES_CHANNEL, - f'{{"type": "lint_started", "project_id": "{project_id}", "run_id": "{run_id}"}}', + json.dumps( + { + "type": "lint_started", + "project_id": project_id, + "run_id": str(run_id), + } + ), ) # Load ontology from git or storage @@ -325,8 +331,14 @@ async def run_lint_task( # Notify via pubsub that lint is complete await redis.publish( LINT_UPDATES_CHANNEL, - f'{{"type": "lint_complete", "project_id": "{project_id}", ' - f'"run_id": "{run_id}", "issues_found": {len(lint_results)}}}', + json.dumps( + { + "type": "lint_complete", + "project_id": project_id, + "run_id": str(run_id), + "issues_found": len(lint_results), + } + ), ) return { @@ -348,8 +360,14 @@ async def run_lint_task( # Notify via pubsub that lint failed await redis.publish( LINT_UPDATES_CHANNEL, - f'{{"type": "lint_failed", "project_id": "{project_id}", ' - f'"run_id": "{run.id}", "error": "{str(e)}"}}', + json.dumps( + { + "type": "lint_failed", + "project_id": project_id, + "run_id": str(run.id), + "error": str(e), + } + ), ) raise @@ -395,8 +413,13 @@ async def check_normalization_status_task( # Publish status update via Redis await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_status", "project_id": "{project_id}", ' - f'"needs_normalization": {str(status["needs_normalization"]).lower()}}}', + json.dumps( + { + "type": "normalization_status", + "project_id": project_id, + "needs_normalization": status["needs_normalization"], + } + ), ) logger.info( @@ -455,8 +478,13 @@ async def run_normalization_task( # Notify start await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_started", "project_id": "{project_id}", ' - f'"dry_run": {str(dry_run).lower()}}}', + json.dumps( + { + "type": "normalization_started", + "project_id": project_id, + "dry_run": dry_run, + } + ), ) # Get project @@ -497,9 +525,15 @@ def __init__(self, uid: str | None, name: str | None, email: str | None): # Notify completion await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_complete", "project_id": "{project_id}", ' - f'"run_id": "{run.id}", "dry_run": {str(dry_run).lower()}, ' - f'"commit_hash": "{run.commit_hash or ""}"}}', + json.dumps( + { + "type": "normalization_complete", + "project_id": project_id, + "run_id": str(run.id), + "dry_run": dry_run, + "commit_hash": run.commit_hash or "", + } + ), ) return { @@ -516,8 +550,13 @@ def __init__(self, uid: str | None, name: str | None, email: str | None): # Notify failure await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_failed", "project_id": "{project_id}", ' - f'"error": "{str(e)}"}}', + json.dumps( + { + "type": "normalization_failed", + "project_id": project_id, + "error": str(e), + } + ), ) return { @@ -556,8 +595,13 @@ async def check_all_projects_normalization(ctx: dict[str, Any]) -> dict[str, Any # Publish status update await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_status", "project_id": "{project.id}", ' - f'"needs_normalization": true}}', + json.dumps( + { + "type": "normalization_status", + "project_id": str(project.id), + "needs_normalization": True, + } + ), ) except Exception as e: logger.warning(f"Failed to check normalization for project {project.id}: {e}") @@ -1021,7 +1065,7 @@ async def run_remote_check_task( # Notify start await redis.publish( REMOTE_SYNC_UPDATES_CHANNEL, - f'{{"type": "remote_check_started", "project_id": "{project_id}"}}', + json.dumps({"type": "remote_check_started", "project_id": project_id}), ) # Get a GitHub token — try the project's connected user first @@ -1111,8 +1155,13 @@ async def run_remote_check_task( # Notify completion await redis.publish( REMOTE_SYNC_UPDATES_CHANNEL, - f'{{"type": "remote_check_complete", "project_id": "{project_id}", ' - f'"has_changes": {str(has_changes).lower()}}}', + json.dumps( + { + "type": "remote_check_complete", + "project_id": project_id, + "has_changes": has_changes, + } + ), ) return { @@ -1148,7 +1197,13 @@ async def run_remote_check_task( # Notify failure await redis.publish( REMOTE_SYNC_UPDATES_CHANNEL, - f'{{"type": "remote_check_failed", "project_id": "{project_id}", "error": "{str(e)}"}}', + json.dumps( + { + "type": "remote_check_failed", + "project_id": project_id, + "error": str(e), + } + ), ) raise diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index 720e39e..94ae52c 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -1165,6 +1165,51 @@ async def test_lint_failure_updates_run_and_publishes( # Published: start + failed assert mock_ctx["redis"].publish.await_count >= 2 + @pytest.mark.asyncio + async def test_lint_failure_payload_is_valid_json_with_quoted_error( + self, mock_ctx: dict[str, Any], project_id: str + ) -> None: + """Regression for #111: error messages containing quotes/newlines must + produce valid JSON (json.dumps escapes), not the f-string concatenation + that previously produced unparseable payloads.""" + import json + + project = Mock() + project.source_file_path = "ontokit/test.ttl" + mock_result = Mock() + mock_result.scalar_one_or_none.return_value = project + mock_ctx["db"].execute.return_value = mock_result + + mock_run = MagicMock() + mock_run.id = uuid.uuid4() + + # Realistic Python error message with both " and \n — would have + # produced invalid JSON under the old f-string code. + nasty_msg = "'foo' has no attribute \"bar\"\nstacktrace line" + + with ( + patch("ontokit.worker.get_storage_service"), + patch("ontokit.worker.get_ontology_service") as mock_onto_svc, + patch("ontokit.worker.LintRun", return_value=mock_run), + ): + mock_onto_svc.return_value.load_from_storage = AsyncMock( + side_effect=RuntimeError(nasty_msg) + ) + + with pytest.raises(RuntimeError): + await run_lint_task(mock_ctx, project_id) + + # Find the lint_failed publish call and verify its payload parses. + failed_calls = [ + c.args + for c in mock_ctx["redis"].publish.await_args_list + if len(c.args) >= 2 and '"lint_failed"' in c.args[1] + ] + assert failed_calls, "expected a lint_failed publish" + payload = json.loads(failed_calls[-1][1]) # must not raise + assert payload["type"] == "lint_failed" + assert payload["error"] == nasty_msg + # --------------------------------------------------------------------------- # check_normalization_status_task – exception path