Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 80 additions & 20 deletions ontokit/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,13 @@ async def run_lint_task(
# Notify via pubsub that lint has started
await redis.publish(
LINT_UPDATES_CHANNEL,
f'{{"type": "lint_started", "project_id": "{project_id}", "run_id": "{run_id}"}}',
json.dumps(
{
"type": "lint_started",
"project_id": project_id,
"run_id": str(run_id),
}
),
)

# Load ontology from storage
Expand Down Expand Up @@ -246,8 +252,14 @@ async def run_lint_task(
# Notify via pubsub that lint is complete
await redis.publish(
LINT_UPDATES_CHANNEL,
f'{{"type": "lint_complete", "project_id": "{project_id}", '
f'"run_id": "{run_id}", "issues_found": {len(lint_results)}}}',
json.dumps(
{
"type": "lint_complete",
"project_id": project_id,
"run_id": str(run_id),
"issues_found": len(lint_results),
}
),
)

return {
Expand All @@ -269,8 +281,14 @@ async def run_lint_task(
# Notify via pubsub that lint failed
await redis.publish(
LINT_UPDATES_CHANNEL,
f'{{"type": "lint_failed", "project_id": "{project_id}", '
f'"run_id": "{run.id}", "error": "{str(e)}"}}',
json.dumps(
{
"type": "lint_failed",
"project_id": project_id,
"run_id": str(run.id),
"error": str(e),
}
),
)

raise
Expand Down Expand Up @@ -316,8 +334,13 @@ async def check_normalization_status_task(
# Publish status update via Redis
await redis.publish(
NORMALIZATION_UPDATES_CHANNEL,
f'{{"type": "normalization_status", "project_id": "{project_id}", '
f'"needs_normalization": {str(status["needs_normalization"]).lower()}}}',
json.dumps(
{
"type": "normalization_status",
"project_id": project_id,
"needs_normalization": status["needs_normalization"],
}
),
)

logger.info(
Expand Down Expand Up @@ -376,8 +399,13 @@ async def run_normalization_task(
# Notify start
await redis.publish(
NORMALIZATION_UPDATES_CHANNEL,
f'{{"type": "normalization_started", "project_id": "{project_id}", '
f'"dry_run": {str(dry_run).lower()}}}',
json.dumps(
{
"type": "normalization_started",
"project_id": project_id,
"dry_run": dry_run,
}
),
)

# Get project
Expand Down Expand Up @@ -418,9 +446,15 @@ def __init__(self, uid: str | None, name: str | None, email: str | None):
# Notify completion
await redis.publish(
NORMALIZATION_UPDATES_CHANNEL,
f'{{"type": "normalization_complete", "project_id": "{project_id}", '
f'"run_id": "{run.id}", "dry_run": {str(dry_run).lower()}, '
f'"commit_hash": "{run.commit_hash or ""}"}}',
json.dumps(
{
"type": "normalization_complete",
"project_id": project_id,
"run_id": str(run.id),
"dry_run": dry_run,
"commit_hash": run.commit_hash or "",
}
),
)

return {
Expand All @@ -437,8 +471,13 @@ def __init__(self, uid: str | None, name: str | None, email: str | None):
# Notify failure
await redis.publish(
NORMALIZATION_UPDATES_CHANNEL,
f'{{"type": "normalization_failed", "project_id": "{project_id}", '
f'"error": "{str(e)}"}}',
json.dumps(
{
"type": "normalization_failed",
"project_id": project_id,
"error": str(e),
}
),
)

return {
Expand Down Expand Up @@ -477,8 +516,13 @@ async def check_all_projects_normalization(ctx: dict[str, Any]) -> dict[str, Any
# Publish status update
await redis.publish(
NORMALIZATION_UPDATES_CHANNEL,
f'{{"type": "normalization_status", "project_id": "{project.id}", '
f'"needs_normalization": true}}',
json.dumps(
{
"type": "normalization_status",
"project_id": str(project.id),
"needs_normalization": True,
}
),
)
except Exception as e:
logger.warning(f"Failed to check normalization for project {project.id}: {e}")
Expand Down Expand Up @@ -699,7 +743,12 @@ async def run_remote_check_task(
# Notify start
await redis.publish(
REMOTE_SYNC_UPDATES_CHANNEL,
f'{{"type": "remote_check_started", "project_id": "{project_id}"}}',
json.dumps(
{
"type": "remote_check_started",
"project_id": project_id,
}
),
)

# Get a GitHub token — try the project's connected user first
Expand Down Expand Up @@ -789,8 +838,13 @@ async def run_remote_check_task(
# Notify completion
await redis.publish(
REMOTE_SYNC_UPDATES_CHANNEL,
f'{{"type": "remote_check_complete", "project_id": "{project_id}", '
f'"has_changes": {str(has_changes).lower()}}}',
json.dumps(
{
"type": "remote_check_complete",
"project_id": project_id,
"has_changes": has_changes,
}
),
)

return {
Expand Down Expand Up @@ -826,7 +880,13 @@ async def run_remote_check_task(
# Notify failure
await redis.publish(
REMOTE_SYNC_UPDATES_CHANNEL,
f'{{"type": "remote_check_failed", "project_id": "{project_id}", "error": "{str(e)}"}}',
json.dumps(
{
"type": "remote_check_failed",
"project_id": project_id,
"error": str(e),
}
),
)

raise
Expand Down
Loading