From ef5f1ebba700060fbf51f3bb470990d1079b48f8 Mon Sep 17 00:00:00 2001 From: "John R. D'Orazio" Date: Tue, 28 Apr 2026 07:55:10 +0200 Subject: [PATCH] fix(worker): use json.dumps for redis.publish payloads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ARQ worker built Redis pubsub payloads via f-string-formatted JSON. This was fragile, and outright buggy in three places — `lint_failed`, `normalization_failed`, and `remote_check_failed` interpolated `str(e)` directly, so any exception message containing a `"`, `\`, or newline produced invalid JSON for subscribers calling `json.loads(payload)`. Replace every f-string JSON payload with `json.dumps({...})` (json was already imported). Schema is preserved exactly: same `type` values, same field names and order, UUIDs as strings, booleans as real booleans (json.dumps emits `true`/`false`, replacing the prior `str(x).lower()` workaround), ints as ints. Subscribers are unaffected. Sites converted (11 total): - lint_started, lint_complete, lint_failed - normalization_status (status check task), normalization_started, normalization_complete, normalization_failed, normalization_status (per-project loop in cron) - remote_check_started, remote_check_complete, remote_check_failed Closes #111 Co-Authored-By: Claude Opus 4.7 (1M context) --- ontokit/worker.py | 100 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 80 insertions(+), 20 deletions(-) diff --git a/ontokit/worker.py b/ontokit/worker.py index c8562469..86dc2ba9 100644 --- a/ontokit/worker.py +++ b/ontokit/worker.py @@ -202,7 +202,13 @@ async def run_lint_task( # Notify via pubsub that lint has started await redis.publish( LINT_UPDATES_CHANNEL, - f'{{"type": "lint_started", "project_id": "{project_id}", "run_id": "{run_id}"}}', + json.dumps( + { + "type": "lint_started", + "project_id": project_id, + "run_id": str(run_id), + } + ), ) # Load ontology from storage @@ -246,8 +252,14 @@ async def run_lint_task( # Notify via pubsub that lint is complete await redis.publish( LINT_UPDATES_CHANNEL, - f'{{"type": "lint_complete", "project_id": "{project_id}", ' - f'"run_id": "{run_id}", "issues_found": {len(lint_results)}}}', + json.dumps( + { + "type": "lint_complete", + "project_id": project_id, + "run_id": str(run_id), + "issues_found": len(lint_results), + } + ), ) return { @@ -269,8 +281,14 @@ async def run_lint_task( # Notify via pubsub that lint failed await redis.publish( LINT_UPDATES_CHANNEL, - f'{{"type": "lint_failed", "project_id": "{project_id}", ' - f'"run_id": "{run.id}", "error": "{str(e)}"}}', + json.dumps( + { + "type": "lint_failed", + "project_id": project_id, + "run_id": str(run.id), + "error": str(e), + } + ), ) raise @@ -316,8 +334,13 @@ async def check_normalization_status_task( # Publish status update via Redis await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_status", "project_id": "{project_id}", ' - f'"needs_normalization": {str(status["needs_normalization"]).lower()}}}', + json.dumps( + { + "type": "normalization_status", + "project_id": project_id, + "needs_normalization": status["needs_normalization"], + } + ), ) logger.info( @@ -376,8 +399,13 @@ async def run_normalization_task( # Notify start await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_started", "project_id": "{project_id}", ' - f'"dry_run": {str(dry_run).lower()}}}', + json.dumps( + { + "type": "normalization_started", + "project_id": project_id, + "dry_run": dry_run, + } + ), ) # Get project @@ -418,9 +446,15 @@ def __init__(self, uid: str | None, name: str | None, email: str | None): # Notify completion await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_complete", "project_id": "{project_id}", ' - f'"run_id": "{run.id}", "dry_run": {str(dry_run).lower()}, ' - f'"commit_hash": "{run.commit_hash or ""}"}}', + json.dumps( + { + "type": "normalization_complete", + "project_id": project_id, + "run_id": str(run.id), + "dry_run": dry_run, + "commit_hash": run.commit_hash or "", + } + ), ) return { @@ -437,8 +471,13 @@ def __init__(self, uid: str | None, name: str | None, email: str | None): # Notify failure await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_failed", "project_id": "{project_id}", ' - f'"error": "{str(e)}"}}', + json.dumps( + { + "type": "normalization_failed", + "project_id": project_id, + "error": str(e), + } + ), ) return { @@ -477,8 +516,13 @@ async def check_all_projects_normalization(ctx: dict[str, Any]) -> dict[str, Any # Publish status update await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_status", "project_id": "{project.id}", ' - f'"needs_normalization": true}}', + json.dumps( + { + "type": "normalization_status", + "project_id": str(project.id), + "needs_normalization": True, + } + ), ) except Exception as e: logger.warning(f"Failed to check normalization for project {project.id}: {e}") @@ -699,7 +743,12 @@ async def run_remote_check_task( # Notify start await redis.publish( REMOTE_SYNC_UPDATES_CHANNEL, - f'{{"type": "remote_check_started", "project_id": "{project_id}"}}', + json.dumps( + { + "type": "remote_check_started", + "project_id": project_id, + } + ), ) # Get a GitHub token — try the project's connected user first @@ -789,8 +838,13 @@ async def run_remote_check_task( # Notify completion await redis.publish( REMOTE_SYNC_UPDATES_CHANNEL, - f'{{"type": "remote_check_complete", "project_id": "{project_id}", ' - f'"has_changes": {str(has_changes).lower()}}}', + json.dumps( + { + "type": "remote_check_complete", + "project_id": project_id, + "has_changes": has_changes, + } + ), ) return { @@ -826,7 +880,13 @@ async def run_remote_check_task( # Notify failure await redis.publish( REMOTE_SYNC_UPDATES_CHANNEL, - f'{{"type": "remote_check_failed", "project_id": "{project_id}", "error": "{str(e)}"}}', + json.dumps( + { + "type": "remote_check_failed", + "project_id": project_id, + "error": str(e), + } + ), ) raise