From 1da42107ea57119b3e003503691171a64bc70afd Mon Sep 17 00:00:00 2001
From: Manan Bhatt
Date: Fri, 12 Jun 2026 14:30:45 +0530
Subject: [PATCH] Add poll-loop liveness watchdog to auto-restart wedged workers

Workers occasionally stop polling and stay stopped until manually
restarted. The poll loop runs poll + execute + update on one thread (a
single event loop for async workers); if a poll/update call never returns
(e.g. a stale keep-alive connection silently dropped by a proxy/LB/NAT,
which never reaches the server so server metrics stay clean) or a blocking
call freezes the loop, polling halts with no error.

TaskHandler already supervises and restarts worker processes, but only
when is_alive()==False (task_handler.py). A wedged-but-alive process is
invisible to it, so the worker stays stuck until an operator restarts it.

This adds a liveness watchdog to both TaskRunner and AsyncTaskRunner:
run_once() records a monotonic heartbeat each iteration (a healthy loop
reaches it within ms even at full capacity), and a daemon thread (so a
frozen loop can't block it) exits the process via os._exit when the loop
has been silent past CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS (default
300s, 0 to disable). The existing supervisor then restarts it, turning a
permanent stall into a few-second blip.

This is a defense-in-depth backstop for the symptom, not a root-cause fix:
it auto-recovers a stalled worker regardless of why it wedged, and its
critical log points operators at capturing a stack dump
(py-spy dump --pid PID) to diagnose the underlying blocked call.

Co-Authored-By: Claude Opus 4.8 (1M context) --- .../client/automator/async_task_runner.py | 99 +++++++++++++++ src/conductor/client/automator/task_runner.py | 95 ++++++++++++++ .../automator/test_poll_stall_watchdog.py | 116 ++++++++++++++++++ 3 files changed, 310 insertions(+) create mode 100644 tests/unit/automator/test_poll_stall_watchdog.py diff --git a/src/conductor/client/automator/async_task_runner.py b/src/conductor/client/automator/async_task_runner.py index ba3e3653..4c8affb6 100644 --- a/src/conductor/client/automator/async_task_runner.py +++ b/src/conductor/client/automator/async_task_runner.py @@ -3,6 +3,7 @@ import logging import os import sys +import threading import time import traceback @@ -40,6 +41,28 @@ ) ) +# Exit code used when the poll-loop liveness watchdog restarts a wedged worker. +# Distinct, non-zero value so operators can recognise watchdog restarts and so +# TaskHandler's supervisor (restart_on_failure) treats it as a failure exit. +POLL_STALL_EXIT_CODE = 70 # EX_SOFTWARE + + +def _get_poll_stall_timeout_seconds() -> int: + """Max seconds the poll loop may be silent before the watchdog restarts the + worker process. ``0`` disables the watchdog. + + Override via env ``CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS`` + (default 300s). The default is intentionally generous: a healthy poll loop + iterates within milliseconds even when every slot is busy, so a multi-minute + silence means the event loop is wedged (a never-returning poll/update, or a + blocking call that froze the loop) rather than legitimately busy. + """ + raw = os.getenv("CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS", "300") + try: + return max(0, int(float(raw))) + except (TypeError, ValueError): + return 300 + class AsyncTaskRunner: """ @@ -117,6 +140,18 @@ def __init__( self._tracked_task_ids = set() # Local set for cleanup on shutdown self._sync_task_client = None # Created after fork for LeaseManager heartbeats + # Poll-loop liveness watchdog. 
The event loop runs poll + execute + + # update on a single thread; if a poll/update await never returns (e.g. + # a stale keep-alive connection silently dropped by a proxy/LB) or a + # blocking call freezes the loop, polling stops silently. TaskHandler's + # supervisor only restarts DEAD processes (is_alive()==False), so a + # wedged-but-alive worker stays stuck until a manual restart. The + # watchdog (a daemon thread, so a frozen loop can't block it) detects a + # stalled poll loop and exits the process so the supervisor restarts it. + self._poll_stall_timeout = _get_poll_stall_timeout_seconds() + self._last_loop_activity = time.monotonic() + self._watchdog_thread = None + async def run(self) -> None: """Main async loop - runs continuously in single event loop.""" if self.configuration is not None: @@ -149,6 +184,10 @@ async def run(self) -> None: # Create semaphore in the event loop (must be created within the loop) self._semaphore = asyncio.Semaphore(self._max_workers) + # Start the poll-loop liveness watchdog (runs in this worker subprocess) + self._last_loop_activity = time.monotonic() + self.__start_poll_watchdog() + # Log worker configuration with correct PID (after fork) task_name = self.worker.get_task_definition_name() config_summary = get_worker_config_oneline(task_name, self._resolved_config) @@ -177,6 +216,62 @@ async def stop(self) -> None: """Signal the runner to stop gracefully.""" self._shutdown = True + # -- Poll-loop liveness watchdog ------------------------------------------- + + def __start_poll_watchdog(self) -> None: + """Start the daemon watchdog thread (no-op if disabled).""" + if self._poll_stall_timeout <= 0: + logger.debug("Poll-loop watchdog disabled (stall timeout <= 0)") + return + if self._watchdog_thread is not None and self._watchdog_thread.is_alive(): + return + self._watchdog_thread = threading.Thread( + target=self.__poll_watchdog_loop, + name=f"poll-watchdog-{self.worker.get_task_definition_name()}", + daemon=True, + ) + 
self._watchdog_thread.start() + logger.info( + "Poll-loop watchdog started for '%s' (stall_timeout=%ss)", + self.worker.get_task_definition_name(), + self._poll_stall_timeout, + ) + + def __poll_watchdog_loop(self) -> None: + # Check a few times per stall window, but at least every second and at + # most every 30s, so detection is timely without busy-spinning. + check_interval = max(1.0, min(self._poll_stall_timeout / 5.0, 30.0)) + while not self._shutdown: + time.sleep(check_interval) + self._check_poll_stall() + + def _check_poll_stall(self) -> bool: + """Restart the process if the poll loop has been silent too long. + + Returns True if a stall was detected (after which the process exits). + Extracted from the watchdog loop so it can be unit-tested without the + sleep loop. Calls ``os._exit`` so a wedged event loop can't intercept it. + """ + if self._poll_stall_timeout <= 0 or self._shutdown: + return False + idle = time.monotonic() - self._last_loop_activity + if idle < self._poll_stall_timeout: + return False + logger.critical( + "Poll loop for '%s' stalled %.0fs (>= %ss): the event loop is wedged " + "and not polling. Exiting (code %d) so the supervisor restarts this " + "worker. If this recurs, capture a stack dump (py-spy dump --pid ) " + "to find the blocked call.", + self.worker.get_task_definition_name(), + idle, + self._poll_stall_timeout, + POLL_STALL_EXIT_CODE, + ) + os._exit(POLL_STALL_EXIT_CODE) + return True # pragma: no cover - process has exited + + # -------------------------------------------------------------------------- + async def _cleanup(self) -> None: """Clean up async resources.""" logger.debug("Cleaning up AsyncTaskRunner resources...") @@ -462,6 +557,10 @@ async def __async_register_task_definition(self) -> None: async def run_once(self) -> None: """Execute one iteration of the polling loop (async version).""" + # Liveness heartbeat for the watchdog: a healthy loop reaches this every + # iteration (within ms, even at full capacity). 
A stale value means the + # previous iteration is wedged on an await that never returned. + self._last_loop_activity = time.monotonic() try: # No need for manual cleanup - tasks remove themselves via add_done_callback # Just check capacity directly diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py index af566de1..66f00b80 100644 --- a/src/conductor/client/automator/task_runner.py +++ b/src/conductor/client/automator/task_runner.py @@ -44,6 +44,28 @@ ) ) +# Exit code used when the poll-loop liveness watchdog restarts a wedged worker. +# Distinct, non-zero value so operators can recognise watchdog restarts and so +# TaskHandler's supervisor (restart_on_failure) treats it as a failure exit. +POLL_STALL_EXIT_CODE = 70 # EX_SOFTWARE + + +def _get_poll_stall_timeout_seconds() -> int: + """Max seconds the poll loop may be silent before the watchdog restarts the + worker process. ``0`` disables the watchdog. + + Override via env ``CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS`` + (default 300s). The default is intentionally generous: a healthy poll loop + iterates within milliseconds even when every slot is busy, so a multi-minute + silence means the loop is wedged on a call that never returned rather than + legitimately busy. + """ + raw = os.getenv("CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS", "300") + try: + return max(0, int(float(raw))) + except (TypeError, ValueError): + return 300 + class TaskRunner: def __init__( @@ -116,6 +138,17 @@ def __init__( self._tracked_task_ids = set() # Local set for cleanup on shutdown self._tracked_task_ids_lock = threading.Lock() + # Poll-loop liveness watchdog. If the poll thread wedges on a call that + # never returns (e.g. a stale keep-alive connection silently dropped by + # a proxy/LB), polling stops silently. TaskHandler's supervisor only + # restarts DEAD processes (is_alive()==False), so a wedged-but-alive + # worker stays stuck until a manual restart. 
The watchdog (a daemon + # thread) detects a stalled poll loop and exits the process so the + # supervisor restarts it. + self._poll_stall_timeout = _get_poll_stall_timeout_seconds() + self._last_loop_activity = time.monotonic() + self._watchdog_thread = None + def run(self) -> None: if self.configuration is not None: self.configuration.apply_logging_config() @@ -139,6 +172,10 @@ def run(self) -> None: self.worker.get_polling_interval_in_seconds() ) + # Start the poll-loop liveness watchdog (runs in this worker subprocess) + self._last_loop_activity = time.monotonic() + self.__start_poll_watchdog() + try: while not self._shutdown: self.run_once() @@ -150,6 +187,60 @@ def stop(self) -> None: """Signal the runner to stop gracefully.""" self._shutdown = True + # -- Poll-loop liveness watchdog ------------------------------------------- + + def __start_poll_watchdog(self) -> None: + """Start the daemon watchdog thread (no-op if disabled).""" + if self._poll_stall_timeout <= 0: + logger.debug("Poll-loop watchdog disabled (stall timeout <= 0)") + return + if self._watchdog_thread is not None and self._watchdog_thread.is_alive(): + return + self._watchdog_thread = threading.Thread( + target=self.__poll_watchdog_loop, + name=f"poll-watchdog-{self.worker.get_task_definition_name()}", + daemon=True, + ) + self._watchdog_thread.start() + logger.info( + "Poll-loop watchdog started for '%s' (stall_timeout=%ss)", + self.worker.get_task_definition_name(), + self._poll_stall_timeout, + ) + + def __poll_watchdog_loop(self) -> None: + check_interval = max(1.0, min(self._poll_stall_timeout / 5.0, 30.0)) + while not self._shutdown: + time.sleep(check_interval) + self._check_poll_stall() + + def _check_poll_stall(self) -> bool: + """Restart the process if the poll loop has been silent too long. + + Returns True if a stall was detected (after which the process exits). + Extracted from the watchdog loop so it can be unit-tested without the + sleep loop. 
Calls ``os._exit`` so a wedged loop can't intercept it. + """ + if self._poll_stall_timeout <= 0 or self._shutdown: + return False + idle = time.monotonic() - self._last_loop_activity + if idle < self._poll_stall_timeout: + return False + logger.critical( + "Poll loop for '%s' stalled %.0fs (>= %ss): the poll loop is wedged " + "and not polling. Exiting (code %d) so the supervisor restarts this " + "worker. If this recurs, capture a stack dump (py-spy dump --pid ) " + "to find the blocked call.", + self.worker.get_task_definition_name(), + idle, + self._poll_stall_timeout, + POLL_STALL_EXIT_CODE, + ) + os._exit(POLL_STALL_EXIT_CODE) + return True # pragma: no cover - process has exited + + # -------------------------------------------------------------------------- + def _cleanup(self) -> None: """Clean up resources - called on exit.""" logger.debug("Cleaning up TaskRunner resources...") @@ -432,6 +523,10 @@ def __register_task_definition(self) -> None: logger.warning(f"Failed to register task definition for {task_name}: {e}") def run_once(self) -> None: + # Liveness heartbeat for the watchdog: a healthy loop reaches this every + # iteration (within ms, even at full capacity). A stale value means the + # previous iteration is wedged on a call that never returned. + self._last_loop_activity = time.monotonic() try: # Check completed async tasks first (non-blocking) self.__check_completed_async_tasks() diff --git a/tests/unit/automator/test_poll_stall_watchdog.py b/tests/unit/automator/test_poll_stall_watchdog.py new file mode 100644 index 00000000..db141b03 --- /dev/null +++ b/tests/unit/automator/test_poll_stall_watchdog.py @@ -0,0 +1,116 @@ +"""Unit tests for the poll-loop liveness watchdog on TaskRunner / AsyncTaskRunner. + +The watchdog restarts a worker whose poll loop has gone silent (a wedged event +loop / poll thread that TaskHandler's is_alive()-only supervisor cannot detect). 
+We test the decision logic and env parsing; os._exit is patched so the test +process survives. +""" +import asyncio +import os +import time +import unittest +from unittest.mock import patch + +from conductor.client.automator import async_task_runner as atr +from conductor.client.automator import task_runner as tr +from conductor.client.automator.async_task_runner import AsyncTaskRunner +from conductor.client.automator.task_runner import TaskRunner +from conductor.client.configuration.configuration import Configuration +from conductor.client.worker.worker import Worker + + +def _async_worker(): + async def fn() -> dict: + await asyncio.sleep(0) + return {} + return Worker(task_definition_name="wd_async", execute_function=fn, thread_count=2) + + +def _sync_worker(): + def fn() -> dict: + return {} + return Worker(task_definition_name="wd_sync", execute_function=fn, thread_count=2) + + +class TestPollStallTimeoutEnv(unittest.TestCase): + def setUp(self): + self.original_env = os.environ.copy() + + def tearDown(self): + os.environ.clear() + os.environ.update(self.original_env) + + def test_default_is_300(self): + os.environ.pop("CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS", None) + self.assertEqual(atr._get_poll_stall_timeout_seconds(), 300) + self.assertEqual(tr._get_poll_stall_timeout_seconds(), 300) + + def test_custom_value(self): + os.environ["CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS"] = "45" + self.assertEqual(atr._get_poll_stall_timeout_seconds(), 45) + + def test_zero_disables(self): + os.environ["CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS"] = "0" + self.assertEqual(atr._get_poll_stall_timeout_seconds(), 0) + + def test_invalid_falls_back_to_default(self): + os.environ["CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS"] = "not-a-number" + self.assertEqual(atr._get_poll_stall_timeout_seconds(), 300) + + def test_negative_clamped_to_zero(self): + os.environ["CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS"] = "-5" + 
self.assertEqual(atr._get_poll_stall_timeout_seconds(), 0) + + +class _WatchdogChecks: + """Shared assertions; subclasses provide a freshly-built runner.""" + + def _make_runner(self): # pragma: no cover - overridden + raise NotImplementedError + + def test_fresh_loop_does_not_exit(self): + r = self._make_runner() + r._poll_stall_timeout = 10 + r._last_loop_activity = time.monotonic() + with patch.object(os, "_exit") as mock_exit: + self.assertFalse(r._check_poll_stall()) + mock_exit.assert_not_called() + + def test_stalled_loop_exits_with_code(self): + r = self._make_runner() + r._poll_stall_timeout = 5 + r._last_loop_activity = time.monotonic() - 60 # silent for 60s + with patch.object(os, "_exit") as mock_exit: + r._check_poll_stall() + mock_exit.assert_called_once_with(70) + + def test_disabled_never_exits_even_when_stale(self): + r = self._make_runner() + r._poll_stall_timeout = 0 # disabled + r._last_loop_activity = time.monotonic() - 99999 + with patch.object(os, "_exit") as mock_exit: + self.assertFalse(r._check_poll_stall()) + mock_exit.assert_not_called() + + def test_shutdown_suppresses_exit(self): + r = self._make_runner() + r._poll_stall_timeout = 5 + r._last_loop_activity = time.monotonic() - 60 + r._shutdown = True + with patch.object(os, "_exit") as mock_exit: + self.assertFalse(r._check_poll_stall()) + mock_exit.assert_not_called() + + +class TestAsyncWatchdog(_WatchdogChecks, unittest.TestCase): + def _make_runner(self): + return AsyncTaskRunner(worker=_async_worker(), configuration=Configuration()) + + +class TestSyncWatchdog(_WatchdogChecks, unittest.TestCase): + def _make_runner(self): + return TaskRunner(worker=_sync_worker(), configuration=Configuration()) + + +if __name__ == "__main__": + unittest.main()