diff --git a/ontokit/worker.py b/ontokit/worker.py index c8562469..86dc2ba9 100644 --- a/ontokit/worker.py +++ b/ontokit/worker.py @@ -202,7 +202,13 @@ async def run_lint_task( # Notify via pubsub that lint has started await redis.publish( LINT_UPDATES_CHANNEL, - f'{{"type": "lint_started", "project_id": "{project_id}", "run_id": "{run_id}"}}', + json.dumps( + { + "type": "lint_started", + "project_id": project_id, + "run_id": str(run_id), + } + ), ) # Load ontology from storage @@ -246,8 +252,14 @@ async def run_lint_task( # Notify via pubsub that lint is complete await redis.publish( LINT_UPDATES_CHANNEL, - f'{{"type": "lint_complete", "project_id": "{project_id}", ' - f'"run_id": "{run_id}", "issues_found": {len(lint_results)}}}', + json.dumps( + { + "type": "lint_complete", + "project_id": project_id, + "run_id": str(run_id), + "issues_found": len(lint_results), + } + ), ) return { @@ -269,8 +281,14 @@ async def run_lint_task( # Notify via pubsub that lint failed await redis.publish( LINT_UPDATES_CHANNEL, - f'{{"type": "lint_failed", "project_id": "{project_id}", ' - f'"run_id": "{run.id}", "error": "{str(e)}"}}', + json.dumps( + { + "type": "lint_failed", + "project_id": project_id, + "run_id": str(run.id), + "error": str(e), + } + ), ) raise @@ -316,8 +334,13 @@ async def check_normalization_status_task( # Publish status update via Redis await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_status", "project_id": "{project_id}", ' - f'"needs_normalization": {str(status["needs_normalization"]).lower()}}}', + json.dumps( + { + "type": "normalization_status", + "project_id": project_id, + "needs_normalization": status["needs_normalization"], + } + ), ) logger.info( @@ -376,8 +399,13 @@ async def run_normalization_task( # Notify start await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_started", "project_id": "{project_id}", ' - f'"dry_run": {str(dry_run).lower()}}}', + json.dumps( + { + "type": "normalization_started", + "project_id": project_id, + "dry_run": dry_run, + } + ), ) # Get project @@ -418,9 +446,15 @@ def __init__(self, uid: str | None, name: str | None, email: str | None): # Notify completion await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_complete", "project_id": "{project_id}", ' - f'"run_id": "{run.id}", "dry_run": {str(dry_run).lower()}, ' - f'"commit_hash": "{run.commit_hash or ""}"}}', + json.dumps( + { + "type": "normalization_complete", + "project_id": project_id, + "run_id": str(run.id), + "dry_run": dry_run, + "commit_hash": run.commit_hash or "", + } + ), ) return { @@ -437,8 +471,13 @@ def __init__(self, uid: str | None, name: str | None, email: str | None): # Notify failure await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_failed", "project_id": "{project_id}", ' - f'"error": "{str(e)}"}}', + json.dumps( + { + "type": "normalization_failed", + "project_id": project_id, + "error": str(e), + } + ), ) return { @@ -477,8 +516,13 @@ async def check_all_projects_normalization(ctx: dict[str, Any]) -> dict[str, Any # Publish status update await redis.publish( NORMALIZATION_UPDATES_CHANNEL, - f'{{"type": "normalization_status", "project_id": "{project.id}", ' - f'"needs_normalization": true}}', + json.dumps( + { + "type": "normalization_status", + "project_id": str(project.id), + "needs_normalization": True, + } + ), ) except Exception as e: logger.warning(f"Failed to check normalization for project {project.id}: {e}") @@ -699,7 +743,12 @@ async def run_remote_check_task( # Notify start await redis.publish( REMOTE_SYNC_UPDATES_CHANNEL, - f'{{"type": "remote_check_started", "project_id": "{project_id}"}}', + json.dumps( + { + "type": "remote_check_started", + "project_id": project_id, + } + ), ) # Get a GitHub token — try the project's connected user first @@ -789,8 +838,13 @@ async def run_remote_check_task( # Notify completion await redis.publish( REMOTE_SYNC_UPDATES_CHANNEL, - f'{{"type": "remote_check_complete", "project_id": "{project_id}", ' - f'"has_changes": {str(has_changes).lower()}}}', + json.dumps( + { + "type": "remote_check_complete", + "project_id": project_id, + "has_changes": has_changes, + } + ), ) return { @@ -826,7 +880,13 @@ async def run_remote_check_task( # Notify failure await redis.publish( REMOTE_SYNC_UPDATES_CHANNEL, - f'{{"type": "remote_check_failed", "project_id": "{project_id}", "error": "{str(e)}"}}', + json.dumps( + { + "type": "remote_check_failed", + "project_id": project_id, + "error": str(e), + } + ), ) raise