Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions src/conductor/client/automator/async_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import sys
import threading
import time
import traceback

Expand Down Expand Up @@ -40,6 +41,28 @@
)
)

# Exit code used when the poll-loop liveness watchdog restarts a wedged worker.
# Distinct, non-zero value so operators can recognise watchdog restarts and so
# TaskHandler's supervisor (restart_on_failure) treats it as a failure exit.
# The numeric value follows the BSD sysexits convention, where 70 is
# EX_SOFTWARE ("internal software error").
POLL_STALL_EXIT_CODE = 70 # EX_SOFTWARE


def _get_poll_stall_timeout_seconds() -> int:
    """Return the poll-loop stall timeout in whole seconds.

    This is the maximum time the poll loop may be silent before the watchdog
    restarts the worker process. ``0`` disables the watchdog.

    Override via env ``CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS``
    (default 300s). The default is intentionally generous: a healthy poll loop
    iterates within milliseconds even when every slot is busy, so a multi-minute
    silence means the event loop is wedged (a never-returning poll/update, or a
    blocking call that froze the loop) rather than legitimately busy.

    Parsing rules:
        * Fractional values are truncated toward zero (``"45.9"`` -> ``45``).
        * Negative values are clamped to ``0`` (watchdog disabled).
        * Unparseable or non-finite values (``"abc"``, ``""``, ``"nan"``,
          ``"inf"``) fall back to the default of 300.

    Returns:
        A non-negative number of seconds.
    """
    default_timeout = 300
    raw = os.getenv("CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS", "300")
    try:
        return max(0, int(float(raw)))
    except (TypeError, ValueError, OverflowError):
        # ValueError: not a number, or NaN passed to int().
        # OverflowError: int(float("inf")) / int(float("1e999")). Without this
        # a mistyped env value would crash the worker at construction time
        # instead of falling back to the default.
        return default_timeout


class AsyncTaskRunner:
"""
Expand Down Expand Up @@ -117,6 +140,18 @@ def __init__(
self._tracked_task_ids = set() # Local set for cleanup on shutdown
self._sync_task_client = None # Created after fork for LeaseManager heartbeats

# Poll-loop liveness watchdog. The event loop runs poll + execute +
# update on a single thread; if a poll/update await never returns (e.g.
# a stale keep-alive connection silently dropped by a proxy/LB) or a
# blocking call freezes the loop, polling stops silently. TaskHandler's
# supervisor only restarts DEAD processes (is_alive()==False), so a
# wedged-but-alive worker stays stuck until a manual restart. The
# watchdog (a daemon thread, so a frozen loop can't block it) detects a
# stalled poll loop and exits the process so the supervisor restarts it.
self._poll_stall_timeout = _get_poll_stall_timeout_seconds()
self._last_loop_activity = time.monotonic()
self._watchdog_thread = None

async def run(self) -> None:
"""Main async loop - runs continuously in single event loop."""
if self.configuration is not None:
Expand Down Expand Up @@ -149,6 +184,10 @@ async def run(self) -> None:
# Create semaphore in the event loop (must be created within the loop)
self._semaphore = asyncio.Semaphore(self._max_workers)

# Start the poll-loop liveness watchdog (runs in this worker subprocess)
self._last_loop_activity = time.monotonic()
self.__start_poll_watchdog()

# Log worker configuration with correct PID (after fork)
task_name = self.worker.get_task_definition_name()
config_summary = get_worker_config_oneline(task_name, self._resolved_config)
Expand Down Expand Up @@ -177,6 +216,62 @@ async def stop(self) -> None:
"""Signal the runner to stop gracefully."""
self._shutdown = True

# -- Poll-loop liveness watchdog -------------------------------------------

def __start_poll_watchdog(self) -> None:
    """Launch the watchdog as a daemon thread.

    Does nothing when the stall timeout is non-positive (watchdog disabled)
    or when a previously started watchdog thread is still alive.
    """
    stall_timeout = self._poll_stall_timeout
    if stall_timeout <= 0:
        logger.debug("Poll-loop watchdog disabled (stall timeout <= 0)")
        return

    current = self._watchdog_thread
    if current is not None and current.is_alive():
        # Already running; never start a second watchdog for this runner.
        return

    thread_name = f"poll-watchdog-{self.worker.get_task_definition_name()}"
    watchdog = threading.Thread(
        target=self.__poll_watchdog_loop,
        name=thread_name,
        daemon=True,
    )
    self._watchdog_thread = watchdog
    watchdog.start()

    logger.info(
        "Poll-loop watchdog started for '%s' (stall_timeout=%ss)",
        self.worker.get_task_definition_name(),
        stall_timeout,
    )

def __poll_watchdog_loop(self) -> None:
    """Body of the watchdog thread: sleep, then check for a stall, repeat.

    The check interval is one fifth of the stall timeout, bounded to the
    range [1s, 30s], so detection is timely without busy-spinning. The loop
    ends once ``self._shutdown`` is observed to be true.
    """
    interval = self._poll_stall_timeout / 5.0
    if interval > 30.0:
        interval = 30.0
    if interval < 1.0:
        interval = 1.0

    while True:
        if self._shutdown:
            break
        time.sleep(interval)
        self._check_poll_stall()

def _check_poll_stall(self) -> bool:
    """Exit the process when the poll loop has been silent for too long.

    Kept separate from the watchdog's sleep loop so the decision can be
    unit-tested directly. The exit goes through ``os._exit`` so a wedged
    event loop can't intercept it.

    Returns:
        False when the watchdog is disabled, the runner is shutting down, or
        the loop was active within the stall timeout. True only after a stall
        was detected, which is reachable only if ``os._exit`` returns.
    """
    stall_timeout = self._poll_stall_timeout
    watchdog_active = stall_timeout > 0 and not self._shutdown
    if not watchdog_active:
        return False

    silent_for = time.monotonic() - self._last_loop_activity
    if silent_for >= stall_timeout:
        logger.critical(
            "Poll loop for '%s' stalled %.0fs (>= %ss): the event loop is wedged "
            "and not polling. Exiting (code %d) so the supervisor restarts this "
            "worker. If this recurs, capture a stack dump (py-spy dump --pid <pid>) "
            "to find the blocked call.",
            self.worker.get_task_definition_name(),
            silent_for,
            stall_timeout,
            POLL_STALL_EXIT_CODE,
        )
        os._exit(POLL_STALL_EXIT_CODE)
        return True  # pragma: no cover - process has exited

    return False

# --------------------------------------------------------------------------

async def _cleanup(self) -> None:
"""Clean up async resources."""
logger.debug("Cleaning up AsyncTaskRunner resources...")
Expand Down Expand Up @@ -462,6 +557,10 @@ async def __async_register_task_definition(self) -> None:

async def run_once(self) -> None:
"""Execute one iteration of the polling loop (async version)."""
# Liveness heartbeat for the watchdog: a healthy loop reaches this every
# iteration (within ms, even at full capacity). A stale value means the
# previous iteration is wedged on an await that never returned.
self._last_loop_activity = time.monotonic()
try:
# No need for manual cleanup - tasks remove themselves via add_done_callback
# Just check capacity directly
Expand Down
95 changes: 95 additions & 0 deletions src/conductor/client/automator/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@
)
)

# Exit code used when the poll-loop liveness watchdog restarts a wedged worker.
# Distinct, non-zero value so operators can recognise watchdog restarts and so
# TaskHandler's supervisor (restart_on_failure) treats it as a failure exit.
# The numeric value follows the BSD sysexits convention, where 70 is
# EX_SOFTWARE ("internal software error").
POLL_STALL_EXIT_CODE = 70 # EX_SOFTWARE


def _get_poll_stall_timeout_seconds() -> int:
    """Return the poll-loop stall timeout in whole seconds.

    This is the maximum time the poll loop may be silent before the watchdog
    restarts the worker process. ``0`` disables the watchdog.

    Override via env ``CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS``
    (default 300s). The default is intentionally generous: a healthy poll loop
    iterates within milliseconds even when every slot is busy, so a multi-minute
    silence means the loop is wedged on a call that never returned rather than
    legitimately busy.

    Parsing rules:
        * Fractional values are truncated toward zero (``"45.9"`` -> ``45``).
        * Negative values are clamped to ``0`` (watchdog disabled).
        * Unparseable or non-finite values (``"abc"``, ``""``, ``"nan"``,
          ``"inf"``) fall back to the default of 300.

    Returns:
        A non-negative number of seconds.
    """
    default_timeout = 300
    raw = os.getenv("CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS", "300")
    try:
        return max(0, int(float(raw)))
    except (TypeError, ValueError, OverflowError):
        # ValueError: not a number, or NaN passed to int().
        # OverflowError: int(float("inf")) / int(float("1e999")). Without this
        # a mistyped env value would crash the worker at construction time
        # instead of falling back to the default.
        return default_timeout


class TaskRunner:
def __init__(
Expand Down Expand Up @@ -116,6 +138,17 @@ def __init__(
self._tracked_task_ids = set() # Local set for cleanup on shutdown
self._tracked_task_ids_lock = threading.Lock()

# Poll-loop liveness watchdog. If the poll thread wedges on a call that
# never returns (e.g. a stale keep-alive connection silently dropped by
# a proxy/LB), polling stops silently. TaskHandler's supervisor only
# restarts DEAD processes (is_alive()==False), so a wedged-but-alive
# worker stays stuck until a manual restart. The watchdog (a daemon
# thread) detects a stalled poll loop and exits the process so the
# supervisor restarts it.
self._poll_stall_timeout = _get_poll_stall_timeout_seconds()
self._last_loop_activity = time.monotonic()
self._watchdog_thread = None

def run(self) -> None:
if self.configuration is not None:
self.configuration.apply_logging_config()
Expand All @@ -139,6 +172,10 @@ def run(self) -> None:
self.worker.get_polling_interval_in_seconds()
)

# Start the poll-loop liveness watchdog (runs in this worker subprocess)
self._last_loop_activity = time.monotonic()
self.__start_poll_watchdog()

try:
while not self._shutdown:
self.run_once()
Expand All @@ -150,6 +187,60 @@ def stop(self) -> None:
"""Signal the runner to stop gracefully."""
self._shutdown = True

# -- Poll-loop liveness watchdog -------------------------------------------

def __start_poll_watchdog(self) -> None:
    """Spawn the daemon watchdog thread unless disabled or already running."""
    timeout_s = self._poll_stall_timeout
    if not timeout_s > 0:
        logger.debug("Poll-loop watchdog disabled (stall timeout <= 0)")
        return

    running = self._watchdog_thread
    if running is not None and running.is_alive():
        # A live watchdog already covers this runner.
        return

    label = f"poll-watchdog-{self.worker.get_task_definition_name()}"
    self._watchdog_thread = threading.Thread(
        target=self.__poll_watchdog_loop, name=label, daemon=True
    )
    self._watchdog_thread.start()

    logger.info(
        "Poll-loop watchdog started for '%s' (stall_timeout=%ss)",
        self.worker.get_task_definition_name(),
        timeout_s,
    )

def __poll_watchdog_loop(self) -> None:
    """Watchdog thread body: periodically run the stall check until shutdown.

    The period is one fifth of the stall timeout, kept within [1s, 30s].
    """
    upper_bounded = min(self._poll_stall_timeout / 5.0, 30.0)
    period = max(1.0, upper_bounded)
    keep_running = not self._shutdown
    while keep_running:
        time.sleep(period)
        self._check_poll_stall()
        keep_running = not self._shutdown

def _check_poll_stall(self) -> bool:
    """Exit the process when the poll loop has been silent for too long.

    Split out of the watchdog's sleep loop so it can be unit-tested on its
    own. The exit goes through ``os._exit`` so a wedged loop can't intercept
    it.

    Returns:
        False when the watchdog is disabled, the runner is shutting down, or
        the loop was active within the stall timeout. True only after a stall
        was detected, which is reachable only if ``os._exit`` returns.
    """
    limit = self._poll_stall_timeout
    if limit <= 0:
        return False
    if self._shutdown:
        return False

    quiet_seconds = time.monotonic() - self._last_loop_activity
    stalled = not (quiet_seconds < limit)
    if not stalled:
        return False

    task_name = self.worker.get_task_definition_name()
    logger.critical(
        "Poll loop for '%s' stalled %.0fs (>= %ss): the poll loop is wedged "
        "and not polling. Exiting (code %d) so the supervisor restarts this "
        "worker. If this recurs, capture a stack dump (py-spy dump --pid <pid>) "
        "to find the blocked call.",
        task_name,
        quiet_seconds,
        limit,
        POLL_STALL_EXIT_CODE,
    )
    os._exit(POLL_STALL_EXIT_CODE)
    return True  # pragma: no cover - process has exited

# --------------------------------------------------------------------------

def _cleanup(self) -> None:
"""Clean up resources - called on exit."""
logger.debug("Cleaning up TaskRunner resources...")
Expand Down Expand Up @@ -432,6 +523,10 @@ def __register_task_definition(self) -> None:
logger.warning(f"Failed to register task definition for {task_name}: {e}")

def run_once(self) -> None:
# Liveness heartbeat for the watchdog: a healthy loop reaches this every
# iteration (within ms, even at full capacity). A stale value means the
# previous iteration is wedged on a call that never returned.
self._last_loop_activity = time.monotonic()
try:
# Check completed async tasks first (non-blocking)
self.__check_completed_async_tasks()
Expand Down
116 changes: 116 additions & 0 deletions tests/unit/automator/test_poll_stall_watchdog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""Unit tests for the poll-loop liveness watchdog on TaskRunner / AsyncTaskRunner.

The watchdog restarts a worker whose poll loop has gone silent (a wedged event
loop / poll thread that TaskHandler's is_alive()-only supervisor cannot detect).
We test the decision logic and env parsing; os._exit is patched so the test
process survives.
"""
import asyncio
import os
import time
import unittest
from unittest.mock import patch

from conductor.client.automator import async_task_runner as atr
from conductor.client.automator import task_runner as tr
from conductor.client.automator.async_task_runner import AsyncTaskRunner
from conductor.client.automator.task_runner import TaskRunner
from conductor.client.configuration.configuration import Configuration
from conductor.client.worker.worker import Worker


def _async_worker():
    """Build a minimal Worker backed by a coroutine function (task ``wd_async``)."""

    async def fn() -> dict:
        # Yield to the event loop once so this is a genuine coroutine.
        await asyncio.sleep(0)
        return {}

    worker = Worker(
        task_definition_name="wd_async",
        execute_function=fn,
        thread_count=2,
    )
    return worker


def _sync_worker():
    """Build a minimal Worker backed by a plain function (task ``wd_sync``)."""

    def fn() -> dict:
        return {}

    worker = Worker(
        task_definition_name="wd_sync",
        execute_function=fn,
        thread_count=2,
    )
    return worker


class TestPollStallTimeoutEnv(unittest.TestCase):
    """Env parsing of the stall timeout, checked against BOTH runner modules.

    ``async_task_runner`` and ``task_runner`` each carry their own copy of
    ``_get_poll_stall_timeout_seconds``; every case asserts both so the two
    copies cannot silently drift apart.
    """

    ENV_VAR = "CONDUCTOR_WORKER_POLL_STALL_TIMEOUT_SECONDS"

    def setUp(self):
        # Snapshot the environment so each test can mutate it freely.
        self.original_env = os.environ.copy()

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.original_env)

    def _assert_timeout(self, expected):
        """Assert the parsed timeout equals ``expected`` in both modules."""
        for module in (atr, tr):
            with self.subTest(module=module.__name__):
                self.assertEqual(module._get_poll_stall_timeout_seconds(), expected)

    def test_default_is_300(self):
        os.environ.pop(self.ENV_VAR, None)
        self._assert_timeout(300)

    def test_custom_value(self):
        os.environ[self.ENV_VAR] = "45"
        self._assert_timeout(45)

    def test_zero_disables(self):
        os.environ[self.ENV_VAR] = "0"
        self._assert_timeout(0)

    def test_invalid_falls_back_to_default(self):
        os.environ[self.ENV_VAR] = "not-a-number"
        self._assert_timeout(300)

    def test_negative_clamped_to_zero(self):
        os.environ[self.ENV_VAR] = "-5"
        self._assert_timeout(0)


class _WatchdogChecks:
    """Shared assertions; subclasses provide a freshly-built runner.

    Mixed into ``unittest.TestCase`` subclasses. ``os._exit`` is patched in
    every test so a detected stall does not kill the test process.
    """

    def _make_runner(self):  # pragma: no cover - overridden
        raise NotImplementedError

    def test_fresh_loop_does_not_exit(self):
        r = self._make_runner()
        r._poll_stall_timeout = 10
        r._last_loop_activity = time.monotonic()
        with patch.object(os, "_exit") as mock_exit:
            self.assertFalse(r._check_poll_stall())
        mock_exit.assert_not_called()

    def test_stalled_loop_exits_with_code(self):
        r = self._make_runner()
        r._poll_stall_timeout = 5
        r._last_loop_activity = time.monotonic() - 60  # silent for 60s
        with patch.object(os, "_exit") as mock_exit:
            # With os._exit patched out the method falls through to its
            # return statement, so the "stall detected" result is observable.
            self.assertTrue(r._check_poll_stall())
        mock_exit.assert_called_once_with(70)

    def test_disabled_never_exits_even_when_stale(self):
        r = self._make_runner()
        r._poll_stall_timeout = 0  # disabled
        r._last_loop_activity = time.monotonic() - 99999
        with patch.object(os, "_exit") as mock_exit:
            self.assertFalse(r._check_poll_stall())
        mock_exit.assert_not_called()

    def test_shutdown_suppresses_exit(self):
        r = self._make_runner()
        r._poll_stall_timeout = 5
        r._last_loop_activity = time.monotonic() - 60
        r._shutdown = True
        with patch.object(os, "_exit") as mock_exit:
            self.assertFalse(r._check_poll_stall())
        mock_exit.assert_not_called()


class TestAsyncWatchdog(_WatchdogChecks, unittest.TestCase):
    """Runs the shared watchdog checks against ``AsyncTaskRunner``."""

    def _make_runner(self):
        worker = _async_worker()
        config = Configuration()
        return AsyncTaskRunner(worker=worker, configuration=config)


class TestSyncWatchdog(_WatchdogChecks, unittest.TestCase):
    """Runs the shared watchdog checks against ``TaskRunner``."""

    def _make_runner(self):
        worker = _sync_worker()
        config = Configuration()
        return TaskRunner(worker=worker, configuration=config)


# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
Loading