Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 75 additions & 20 deletions ontokit/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,13 @@ async def run_lint_task(
# Notify via pubsub that lint has started
await redis.publish(
LINT_UPDATES_CHANNEL,
f'{{"type": "lint_started", "project_id": "{project_id}", "run_id": "{run_id}"}}',
json.dumps(
{
"type": "lint_started",
"project_id": project_id,
"run_id": str(run_id),
}
),
)

# Load ontology from git or storage
Expand Down Expand Up @@ -325,8 +331,14 @@ async def run_lint_task(
# Notify via pubsub that lint is complete
await redis.publish(
LINT_UPDATES_CHANNEL,
f'{{"type": "lint_complete", "project_id": "{project_id}", '
f'"run_id": "{run_id}", "issues_found": {len(lint_results)}}}',
json.dumps(
{
"type": "lint_complete",
"project_id": project_id,
"run_id": str(run_id),
"issues_found": len(lint_results),
}
),
)

return {
Expand All @@ -348,8 +360,14 @@ async def run_lint_task(
# Notify via pubsub that lint failed
await redis.publish(
LINT_UPDATES_CHANNEL,
f'{{"type": "lint_failed", "project_id": "{project_id}", '
f'"run_id": "{run.id}", "error": "{str(e)}"}}',
json.dumps(
{
"type": "lint_failed",
"project_id": project_id,
"run_id": str(run.id),
"error": str(e),
}
),
)

raise
Expand Down Expand Up @@ -395,8 +413,13 @@ async def check_normalization_status_task(
# Publish status update via Redis
await redis.publish(
NORMALIZATION_UPDATES_CHANNEL,
f'{{"type": "normalization_status", "project_id": "{project_id}", '
f'"needs_normalization": {str(status["needs_normalization"]).lower()}}}',
json.dumps(
{
"type": "normalization_status",
"project_id": project_id,
"needs_normalization": status["needs_normalization"],
}
),
)

logger.info(
Expand Down Expand Up @@ -455,8 +478,13 @@ async def run_normalization_task(
# Notify start
await redis.publish(
NORMALIZATION_UPDATES_CHANNEL,
f'{{"type": "normalization_started", "project_id": "{project_id}", '
f'"dry_run": {str(dry_run).lower()}}}',
json.dumps(
{
"type": "normalization_started",
"project_id": project_id,
"dry_run": dry_run,
}
),
)

# Get project
Expand Down Expand Up @@ -497,9 +525,15 @@ def __init__(self, uid: str | None, name: str | None, email: str | None):
# Notify completion
await redis.publish(
NORMALIZATION_UPDATES_CHANNEL,
f'{{"type": "normalization_complete", "project_id": "{project_id}", '
f'"run_id": "{run.id}", "dry_run": {str(dry_run).lower()}, '
f'"commit_hash": "{run.commit_hash or ""}"}}',
json.dumps(
{
"type": "normalization_complete",
"project_id": project_id,
"run_id": str(run.id),
"dry_run": dry_run,
"commit_hash": run.commit_hash or "",
}
),
)

return {
Expand All @@ -516,8 +550,13 @@ def __init__(self, uid: str | None, name: str | None, email: str | None):
# Notify failure
await redis.publish(
NORMALIZATION_UPDATES_CHANNEL,
f'{{"type": "normalization_failed", "project_id": "{project_id}", '
f'"error": "{str(e)}"}}',
json.dumps(
{
"type": "normalization_failed",
"project_id": project_id,
"error": str(e),
}
),
)

return {
Expand Down Expand Up @@ -556,8 +595,13 @@ async def check_all_projects_normalization(ctx: dict[str, Any]) -> dict[str, Any
# Publish status update
await redis.publish(
NORMALIZATION_UPDATES_CHANNEL,
f'{{"type": "normalization_status", "project_id": "{project.id}", '
f'"needs_normalization": true}}',
json.dumps(
{
"type": "normalization_status",
"project_id": str(project.id),
"needs_normalization": True,
}
),
)
except Exception as e:
logger.warning(f"Failed to check normalization for project {project.id}: {e}")
Expand Down Expand Up @@ -1021,7 +1065,7 @@ async def run_remote_check_task(
# Notify start
await redis.publish(
REMOTE_SYNC_UPDATES_CHANNEL,
f'{{"type": "remote_check_started", "project_id": "{project_id}"}}',
json.dumps({"type": "remote_check_started", "project_id": project_id}),
)

# Get a GitHub token — try the project's connected user first
Expand Down Expand Up @@ -1111,8 +1155,13 @@ async def run_remote_check_task(
# Notify completion
await redis.publish(
REMOTE_SYNC_UPDATES_CHANNEL,
f'{{"type": "remote_check_complete", "project_id": "{project_id}", '
f'"has_changes": {str(has_changes).lower()}}}',
json.dumps(
{
"type": "remote_check_complete",
"project_id": project_id,
"has_changes": has_changes,
}
),
)

return {
Expand Down Expand Up @@ -1148,7 +1197,13 @@ async def run_remote_check_task(
# Notify failure
await redis.publish(
REMOTE_SYNC_UPDATES_CHANNEL,
f'{{"type": "remote_check_failed", "project_id": "{project_id}", "error": "{str(e)}"}}',
json.dumps(
{
"type": "remote_check_failed",
"project_id": project_id,
"error": str(e),
}
),
)

raise
Expand Down
45 changes: 45 additions & 0 deletions tests/unit/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,51 @@ async def test_lint_failure_updates_run_and_publishes(
# Published: start + failed
assert mock_ctx["redis"].publish.await_count >= 2

@pytest.mark.asyncio
async def test_lint_failure_payload_is_valid_json_with_quoted_error(
self, mock_ctx: dict[str, Any], project_id: str
) -> None:
"""Regression for #111: error messages containing quotes/newlines must
produce valid JSON (json.dumps escapes), not the f-string concatenation
that previously produced unparseable payloads."""
import json

project = Mock()
project.source_file_path = "ontokit/test.ttl"
mock_result = Mock()
mock_result.scalar_one_or_none.return_value = project
mock_ctx["db"].execute.return_value = mock_result

mock_run = MagicMock()
mock_run.id = uuid.uuid4()

# Realistic Python error message with both " and \n — would have
# produced invalid JSON under the old f-string code.
nasty_msg = "'foo' has no attribute \"bar\"\nstacktrace line"

with (
patch("ontokit.worker.get_storage_service"),
patch("ontokit.worker.get_ontology_service") as mock_onto_svc,
patch("ontokit.worker.LintRun", return_value=mock_run),
):
mock_onto_svc.return_value.load_from_storage = AsyncMock(
side_effect=RuntimeError(nasty_msg)
)

with pytest.raises(RuntimeError):
await run_lint_task(mock_ctx, project_id)

# Find the lint_failed publish call and verify its payload parses.
failed_calls = [
c.args
for c in mock_ctx["redis"].publish.await_args_list
if len(c.args) >= 2 and '"lint_failed"' in c.args[1]
]
assert failed_calls, "expected a lint_failed publish"
payload = json.loads(failed_calls[-1][1]) # must not raise
assert payload["type"] == "lint_failed"
assert payload["error"] == nasty_msg


# ---------------------------------------------------------------------------
# check_normalization_status_task – exception path
Expand Down
Loading