Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ Each analysis generates an HTML report documenting annotation decisions, reviewe
<img width="1000" alt="CyteType HTML report showing cell type annotations marker genes" src="https://github.com/user-attachments/assets/e5373fdd-7173-42db-b863-76a1e8ecfe01" />


[View example report](https://prod.cytetype.nygen.io/report/e70e2883-7713-4121-94f2-5b57eabd1468?v=260303)
[View example report](https://cytetype.nygen.io/report/e70e2883-7713-4121-94f2-5b57eabd1468?v=260303)

---

Expand Down
2 changes: 1 addition & 1 deletion cytetype/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.19.3"
__version__ = "0.19.4"

import requests

Expand Down
48 changes: 27 additions & 21 deletions cytetype/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,21 +375,25 @@ def fetch_job_results(
def _sleep_with_spinner(
seconds: int,
progress: ProgressDisplay | None,
cluster_status: dict[str, str],
job_status: str,
) -> None:
"""Sleep for specified seconds while updating spinner animation.

Args:
seconds: Number of seconds to sleep
progress: ProgressDisplay instance (if showing progress)
cluster_status: Current cluster status for display
job_status: Current overall job status for display
"""
for _ in range(seconds * 2):
if progress:
progress.update(cluster_status)
progress.update(job_status)
time.sleep(0.5)


def _log_report_cta(report_url: str) -> None:
logger.info(f"\n[TRACK PROGRESS]\n{report_url}")


def wait_for_completion(
base_url: str,
auth_token: str | None,
Expand All @@ -401,26 +405,26 @@ def wait_for_completion(
"""Poll job until completion and return results."""
progress = ProgressDisplay() if show_progress else None
start_time = time.time()
report_url = f"{base_url.rstrip('/')}/report/{job_id}"

logger.info(f"CyteType job (id: {job_id}) submitted. Polling for results...")

# Initial delay
time.sleep(5)

# Show report URL
report_url = f"{base_url}/report/{job_id}"
logger.info(f"Report (updates automatically) available at: {report_url}")
logger.info("CyteType job submitted.")
logger.info(
"If network disconnects, the results can still be fetched:\n"
"If your session disconnects, results can still be fetched later with:\n"
"`results = annotator.get_results()`"
)
_log_report_cta(report_url)

# Initial delay
time.sleep(5)

consecutive_not_found = 0
job_status = "pending"
cluster_status: dict[str, str] = {}

while (time.time() - start_time) < timeout:
try:
status_data = get_job_status(base_url, auth_token, job_id)
job_status = status_data.get("jobStatus")
job_status = str(status_data.get("jobStatus") or "")
cluster_status = status_data.get("clusterStatus", {})

# Reset 404 counter on valid response
Expand All @@ -429,20 +433,21 @@ def wait_for_completion(

if job_status == "completed":
if progress:
progress.finalize(cluster_status)
progress.finalize("completed", cluster_status)
logger.success(f"Job {job_id} completed successfully.")
return fetch_job_results(base_url, auth_token, job_id)

elif job_status == "failed":
if progress:
progress.finalize(cluster_status)
progress.finalize("failed", cluster_status)
logger.info(f"Report:\n{report_url}")
raise JobFailedError(f"Job {job_id} failed")

elif job_status in ["processing", "pending"]:
logger.debug(
f"Job {job_id} status: {job_status}. Waiting {poll_interval}s..."
)
_sleep_with_spinner(poll_interval, progress, cluster_status)
_sleep_with_spinner(poll_interval, progress, job_status)

elif job_status == "not_found":
consecutive_not_found += 1
Expand All @@ -459,24 +464,25 @@ def wait_for_completion(
f"Status endpoint not ready for job {job_id}. "
f"Waiting {poll_interval}s..."
)
_sleep_with_spinner(poll_interval, progress, cluster_status)
_sleep_with_spinner(poll_interval, progress, job_status)

else:
logger.warning(f"Unknown job status: '{job_status}'. Continuing...")
_sleep_with_spinner(poll_interval, progress, cluster_status)
_sleep_with_spinner(poll_interval, progress, job_status)

except APIError:
# Let API errors (auth, etc.) bubble up immediately
if progress:
progress.finalize({})
progress.finalize()
raise
except Exception as e:
# Network errors - log and retry
logger.debug(f"Error during polling: {e}. Retrying...")
retry_interval = min(poll_interval, 5)
_sleep_with_spinner(retry_interval, progress, cluster_status)
_sleep_with_spinner(retry_interval, progress, job_status)

# Timeout reached
if progress:
progress.finalize({})
progress.finalize("timed_out")
logger.info(f"Report:\n{report_url}")
raise TimeoutError(f"Job {job_id} did not complete within {timeout}s")
226 changes: 138 additions & 88 deletions cytetype/api/progress.py
Original file line number Diff line number Diff line change
@@ -1,116 +1,166 @@
import sys
import time
from html import escape
from typing import Any, Callable, TextIO, cast


def _in_notebook() -> bool:
try:
from IPython import get_ipython
except ImportError:
return False

shell_getter = cast(Callable[[], Any | None], get_ipython)
shell = shell_getter()
return bool(shell and shell.__class__.__name__ == "ZMQInteractiveShell")


def _create_notebook_display_handle(message: str) -> Any | None:
try:
from IPython.display import display
except ImportError:
return None

_display: Callable[..., Any] = display
return _display(_render_notebook_message(message), display_id=True)


def _render_notebook_message(message: str) -> Any:
try:
from IPython.display import HTML
except ImportError:
return message

html_cls = cast(Callable[[str], Any], HTML)
return html_cls(
"<pre style='margin: 0; white-space: pre-wrap; font-family: monospace;'>"
f"{escape(message)}"
"</pre>"
)


class ProgressDisplay:
"""Manages terminal progress display during job polling."""

# Class constants
COLORS = {
"completed": "\033[92m",
"processing": "\033[93m",
"pending": "\033[94m",
"failed": "\033[91m",
"reset": "\033[0m",
}
SYMBOLS = {"completed": "✓", "processing": "⟳", "pending": "○", "failed": "✗"}
"""Manages terminal and notebook progress display during job polling."""

COLORS = {"failed": "\033[91m", "reset": "\033[0m"}
SPINNER_CHARS = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]

def __init__(self) -> None:
def __init__(self, stream: TextIO | None = None) -> None:
self.stream = stream or sys.stdout
self._interactive = bool(
hasattr(self.stream, "isatty") and self.stream.isatty()
)
self._use_notebook_display = not self._interactive and _in_notebook()
self._display_handle: Any | None = None
self._finalized = False
self._last_plain_status: str | None = None
self._start_time = time.monotonic()
self.spinner_frame = 0
self.last_status: dict[str, str] = {}

def update(self, cluster_status: dict[str, str]) -> None:
"""Update progress display with current cluster status."""
if not cluster_status:
def update(self, job_status: str) -> None:
"""Update progress display with the overall job status."""
if self._finalized:
return

# Always render to keep spinner animating
self._render(cluster_status, is_final=False)

# Track last status for potential future use
if cluster_status != self.last_status:
self.last_status = cluster_status.copy()
if self._interactive:
message = self._build_running_line(job_status)
print(f"\r{message}\033[K", end="", file=self.stream, flush=True)
elif self._use_notebook_display:
self._update_notebook_display(self._build_running_line(job_status))
else:
message = self._build_plain_line(job_status)
if message != self._last_plain_status:
print(message, file=self.stream, flush=True)
self._last_plain_status = message

# Always increment spinner to show activity
self.spinner_frame += 1

def finalize(self, cluster_status: dict[str, str]) -> None:
def finalize(
self,
final_status: str | None = None,
cluster_status: dict[str, str] | None = None,
) -> None:
"""Show final status and cleanup."""
if cluster_status:
self._render(cluster_status, is_final=True)
print() # Ensure newline

def _render(self, cluster_status: dict[str, str], is_final: bool) -> None:
"""Render status to terminal."""
status_counts = self._count_statuses(cluster_status)
progress_bar = self._build_progress_bar(cluster_status)
status_line = self._build_status_line(
progress_bar, status_counts, is_final=is_final
)
if self._finalized:
return
self._finalized = True

# Print status line
if is_final:
print(f"\r{status_line}{self.COLORS['reset']}")
sys.stdout.flush()
self._show_failed_clusters(cluster_status, status_counts["failed"])
else:
print(f"\r{status_line}{self.COLORS['reset']}", end="", flush=True)

def _count_statuses(self, cluster_status: dict[str, str]) -> dict[str, int]:
"""Count occurrences of each status."""
counts = {"completed": 0, "failed": 0}
for status in cluster_status.values():
counts[status] = counts.get(status, 0) + 1
return counts

def _build_progress_bar(self, cluster_status: dict[str, str]) -> str:
"""Build colored progress bar from cluster statuses."""
progress_units = []
for cluster_id in self._sorted_cluster_ids(cluster_status):
status = cluster_status[cluster_id]
color = self.COLORS.get(status, self.COLORS["reset"])
symbol = self.SYMBOLS.get(status, "?")
progress_units.append(f"{color}{symbol}{self.COLORS['reset']}")
return "".join(progress_units)

def _build_status_line(
self, progress_bar: str, counts: dict[str, int], is_final: bool
) -> str:
"""Build status line with progress bar and counts."""
total = sum(counts.values())
completed = counts["completed"]

if is_final:
status_line = f"[DONE] [{progress_bar}] {completed}/{total}"
if counts["failed"] > 0:
status_line += f" ({counts['failed']} failed)"
elif completed == total:
status_line += " completed"
if final_status is None:
if self._interactive:
print(file=self.stream, flush=True)
return

message = self._build_final_line(final_status)
if self._interactive:
print(f"\r{message}\033[K", file=self.stream, flush=True)
elif self._use_notebook_display:
self._update_notebook_display(message)
else:
spinner = self.SPINNER_CHARS[self.spinner_frame % len(self.SPINNER_CHARS)]
status_line = f"{spinner} [{progress_bar}] {completed}/{total} completed"
print(message, file=self.stream, flush=True)

if final_status == "failed" and cluster_status:
self._show_failed_clusters(cluster_status)

def _update_notebook_display(self, message: str) -> None:
"""Update a single notebook output cell instead of printing many lines."""
if self._display_handle is None:
self._display_handle = _create_notebook_display_handle(message)
if self._display_handle is None:
self._use_notebook_display = False
print(message, file=self.stream, flush=True)
return

return status_line
self._display_handle.update(_render_notebook_message(message))

def _show_failed_clusters(
self, cluster_status: dict[str, str], failed_count: int
) -> None:
"""Show details of failed clusters."""
if failed_count == 0:
return
def _build_running_line(self, job_status: str) -> str:
spinner = self.SPINNER_CHARS[self.spinner_frame % len(self.SPINNER_CHARS)]
elapsed = self._format_elapsed()
return f"{spinner} {self._status_message(job_status)} {elapsed} elapsed"

def _build_plain_line(self, job_status: str) -> str:
return self._status_message(job_status)

@staticmethod
def _build_final_line(final_status: str) -> str:
if final_status == "completed":
return "[DONE] CyteType job completed."
if final_status == "failed":
return "[FAILED] CyteType job failed."
if final_status == "timed_out":
return "[TIMEOUT] CyteType job timed out."
return "[STOPPED] CyteType job stopped."

@staticmethod
def _status_message(job_status: str) -> str:
if job_status == "pending":
return "CyteType job queued..."
if job_status == "processing":
return "CyteType job running..."
if job_status == "not_found":
return "Waiting for CyteType job to start..."
return "Waiting for CyteType results..."

def _format_elapsed(self) -> str:
elapsed = int(time.monotonic() - self._start_time)
minutes, seconds = divmod(elapsed, 60)
return f"{minutes:02d}:{seconds:02d}"

def _show_failed_clusters(self, cluster_status: dict[str, str]) -> None:
"""Show details of failed clusters."""
failed_details = []
for cluster_id in self._sorted_cluster_ids(cluster_status):
if cluster_status[cluster_id] == "failed":
color = self.COLORS["failed"]
symbol = self.SYMBOLS["failed"]
if cluster_status[cluster_id] != "failed":
continue

if self._interactive:
failed_details.append(
f"{color}{symbol} Cluster {cluster_id}{self.COLORS['reset']}"
f"{self.COLORS['failed']}✗ Cluster {cluster_id}{self.COLORS['reset']}"
)
else:
failed_details.append(f"✗ Cluster {cluster_id}")

# Group into lines of 4
for i in range(0, len(failed_details), 4):
print(f" {' | '.join(failed_details[i : i + 4])}")
print(f" {' | '.join(failed_details[i : i + 4])}", file=self.stream)

@staticmethod
def _sorted_cluster_ids(cluster_status: dict[str, str]) -> list[str]:
Expand Down
Loading
Loading