From 801e20fadced10a5d16ce65fc93e268e51b3d1b2 Mon Sep 17 00:00:00 2001 From: taohe Date: Fri, 5 Jun 2026 15:37:18 +0800 Subject: [PATCH] fix: keep failed tasks resumable; clearer timeout error; quieter Feishu logs Four independent backend fixes surfaced while debugging a Feishu reply that said "Task #14 not found or has no saved session". - taskboard.py: persist the agent session_id (claude session / codex thread_id) even when a run FAILS. It was only saved on success, so a failed task could never be resumed (replying in a Feishu/Slack/Telegram thread hit "no saved session"). Codex emits thread_id in the opening thread.started event, so even a started-then-failed run is recoverable. - taskboard.py: on timeout, use "Task timed out after Ns" as the task.error summary instead of letting _extract_error_summary surface an unrelated stderr line (e.g. codex's "Reading additional input from stdin..."). Also hoist the timed_out init before the try block so the failure branch can read it when Popen itself raises (CLI not found) before the timer is armed. - channels/feishu_channel.py: register no-op processors for the message_read and recalled receipts we subscribe to but don't act on, silencing the SDK's "processor not found, type: im.message.message_read_v1" ERROR per receipt. - channels/feishu_channel.py: set the "Lark" logger propagate=False so lines aren't emitted twice (its own handler + root basicConfig handler), and lower log_level DEBUG -> INFO to further cut noise. Tests: +5 via red-green TDD; full suite 824 passed, coverage 92.79%. Co-Authored-By: Claude Opus 4.8 (1M context) --- channels/feishu_channel.py | 15 +++++- taskboard.py | 39 ++++++++++----- tests/test_execute_task.py | 90 ++++++++++++++++++++++++++++++++++ tests/test_feishu_lifecycle.py | 69 ++++++++++++++++++++++++++ 4 files changed, 198 insertions(+), 15 deletions(-) diff --git a/channels/feishu_channel.py b/channels/feishu_channel.py index e65dfea..5c9628e 100644 --- a/channels/feishu_channel.py +++ b/channels/feishu_channel.py @@ -14,6 +14,7 @@ import base64 import json +import logging import re import threading import time @@ -385,12 +386,17 @@ def start(self) -> None: return try: + # The lark SDK's "Lark" logger ships its own stdout handler; stop it + # propagating to the root logger (configured via basicConfig in + # taskboard.py) so each line isn't emitted twice. + logging.getLogger("Lark").propagate = False + print("[Feishu] Building Lark client...") self._client = ( lark.Client.builder() .app_id(app_id) .app_secret(app_secret) - .log_level(lark.LogLevel.DEBUG) + .log_level(lark.LogLevel.INFO) .build() ) print("[Feishu] Lark client built successfully") @@ -402,6 +408,11 @@ def start(self) -> None: .register_p2_im_chat_member_bot_added_v1(self._on_bot_added) .register_p2_im_message_reaction_created_v1(self._on_reaction) .register_p2_im_message_reaction_deleted_v1(self._on_reaction) + # No-op processors for read-only receipts we subscribe to but + # don't act on — without these the SDK logs "processor not + # found, type: im.message.message_read_v1" on every receipt. + .register_p2_im_message_message_read_v1(lambda data: None) + .register_p2_im_message_recalled_v1(lambda data: None) .build() ) print("[Feishu] Event handler registered") @@ -411,7 +422,7 @@ def start(self) -> None: app_id, app_secret, event_handler=event_handler, - log_level=lark.LogLevel.DEBUG, + log_level=lark.LogLevel.INFO, ) print("[Feishu] WebSocket client created") diff --git a/taskboard.py b/taskboard.py index b69cb3a..0172ae3 100644 --- a/taskboard.py +++ b/taskboard.py @@ -3281,6 +3281,9 @@ def _is_safe_image_path(path: str) -> bool: cmd.extend(["--resume", task["session_id"]]) raw_stdout = "" raw_stderr = "" + # Initialized before the try so the failure branch can read it even when + # Popen itself raises (e.g. CLI not found) before the timer is armed. + timed_out = [False] try: timeout_secs = int(self.db.get_setting("timeout", "600")) start_time = time.time() @@ -3327,8 +3330,6 @@ def _read_stderr(): stderr_thread.start() # Timer that kills the entire process group if it exceeds the configured timeout - timed_out = [False] - def _kill(): timed_out[0] = True try: @@ -3509,21 +3510,33 @@ def _kill(): else: # Extract a clean, human-readable error summary for task.error and # notification channels. The full raw output is preserved in run_error. - error_summary = ( - self._extract_error_summary(raw_stderr, raw_stdout) - if (raw_stderr or raw_stdout) - else (output or "Unknown error") - ) + if timed_out[0]: + # The timeout IS the reason — don't let an unrelated stderr line + # (e.g. codex's "Reading additional input from stdin…") mask it. + error_summary = output + else: + error_summary = ( + self._extract_error_summary(raw_stderr, raw_stdout) + if (raw_stderr or raw_stdout) + else (output or "Unknown error") + ) + updates = { + "status": "failed", + "error": error_summary, + "last_run_at": datetime.now().isoformat(), + "run_count": new_count, + } + # Persist the conversation id even on failure so the task stays + # resumable (e.g. replying in a Feishu/Slack/Telegram thread to + # retry). Codex emits thread_id in the opening thread.started event, + # so even a started-then-failed run has one to recover. + if extracted_session_id: + updates["session_id"] = extracted_session_id self.db.finish_run_and_update_task( run_id, "failed", tid, - { - "status": "failed", - "error": error_summary, - "last_run_at": datetime.now().isoformat(), - "run_count": new_count, - }, + updates, run_error=output, raw_output=raw_output_stored, ) diff --git a/tests/test_execute_task.py b/tests/test_execute_task.py index a3ebb9b..51b5256 100644 --- a/tests/test_execute_task.py +++ b/tests/test_execute_task.py @@ -357,6 +357,64 @@ def test_execute_task_failure_sets_failed_with_error_summary(scheduler): assert runs[0]["status"] == "failed" +# ── _execute_task: failure still persists session_id (resumable retry) ─────── +def test_execute_task_codex_failure_still_persists_session_id(scheduler, monkeypatch): + """A codex run that emits ``thread.started`` then fails must still persist + the ``thread_id`` as ``session_id``. Otherwise a failed task cannot be + resumed (e.g. replying in the Feishu thread to retry) → "no saved session". + """ + db = scheduler.db + tid = db.add_task(Task(title="codex boom", prompt="x", working_dir=".", agent="codex")) + task = db.get_task(tid) + scheduler._active_tasks[tid] = object() + monkeypatch.setattr(scheduler, "_find_codex_generated_images", lambda *a, **k: []) + + stdout_lines = [ + json.dumps({"type": "thread.started", "thread_id": "thread-fail-1"}) + "\n", + ] + fake = FakePopen( + stdout_lines, + stderr_lines=["codex error: model overloaded\n"], + returncode=1, + ) + with patch.object(taskboard.subprocess, "Popen", return_value=fake): + scheduler._execute_task(task) + + refreshed = db.get_task(tid) + assert refreshed["status"] == "failed" + # The conversation id is preserved despite the failure → resumable. + assert refreshed["session_id"] == "thread-fail-1" + + +def test_execute_task_claude_failure_still_persists_session_id(scheduler): + """A claude run whose trailing ``result`` event carries a ``session_id`` + (e.g. ``error_during_execution``) must persist it on failure too. + """ + db = scheduler.db + tid = db.add_task(Task(title="claude boom", prompt="x", working_dir=".", agent="claude")) + task = db.get_task(tid) + scheduler._active_tasks[tid] = object() + + stdout_lines = [ + json.dumps({"type": "system", "subtype": "init", "session_id": "sess-fail-9"}) + "\n", + json.dumps( + { + "type": "result", + "subtype": "error_during_execution", + "session_id": "sess-fail-9", + } + ) + + "\n", + ] + fake = FakePopen(stdout_lines, stderr_lines=["boom\n"], returncode=1) + with patch.object(taskboard.subprocess, "Popen", return_value=fake): + scheduler._execute_task(task) + + refreshed = db.get_task(tid) + assert refreshed["status"] == "failed" + assert refreshed["session_id"] == "sess-fail-9" + + # ── _execute_task: missing CLI binary ──────────────────────────────────────── def test_execute_task_missing_binary_fails_with_install_hint(scheduler): db = scheduler.db @@ -522,6 +580,38 @@ def slow_lines(): assert "timed out" in (runs[0]["error"] or "") +def test_execute_task_timeout_error_summary_states_timeout_not_stderr(scheduler, monkeypatch): + """On timeout the human-readable ``task.error`` must say it timed out, not + surface an unrelated stderr line. Regression: codex prints "Reading + additional input from stdin…" to stderr, which ``_extract_error_summary`` + picked over the real "Task timed out after Ns" reason. + """ + db = scheduler.db + db.set_setting("timeout", "0") # kill timer (delay 0) fires immediately + tid = db.add_task(Task(title="slow review", prompt="hang", working_dir=".", agent="codex")) + task = db.get_task(tid) + scheduler._active_tasks[tid] = object() + monkeypatch.setattr(scheduler, "_find_codex_generated_images", lambda *a, **k: []) + + def slow_lines(): + time.sleep(0.2) # let the delay-0 timer fire mid-stream + yield json.dumps({"type": "thread.started", "thread_id": "t-1"}) + "\n" + + fake = FakePopen( + slow_lines(), + stderr_lines=["Reading additional input from stdin...\n"], + returncode=0, + ) + with patch.object(taskboard.subprocess, "Popen", return_value=fake): + scheduler._execute_task(task) + + refreshed = db.get_task(tid) + assert refreshed["status"] == "failed" + # error summary states the timeout, not the stderr noise + assert "timed out" in (refreshed["error"] or "") + assert "stdin" not in (refreshed["error"] or "") + + # ── _run_agent_command: heartbeat/skill-sweep invocation ───────────────────── def test_run_agent_command_claude_returns_result_text(scheduler): db = scheduler.db diff --git a/tests/test_feishu_lifecycle.py b/tests/test_feishu_lifecycle.py index 8f9895a..7ed9443 100644 --- a/tests/test_feishu_lifecycle.py +++ b/tests/test_feishu_lifecycle.py @@ -75,6 +75,8 @@ def test_start_wires_handlers_and_spawns_thread_without_connecting(self): "register_p2_im_chat_member_bot_added_v1", "register_p2_im_message_reaction_created_v1", "register_p2_im_message_reaction_deleted_v1", + "register_p2_im_message_message_read_v1", + "register_p2_im_message_recalled_v1", ): getattr(dispatcher_builder, name).return_value = dispatcher_builder built_handler = Mock() @@ -120,6 +122,73 @@ def start(self): assert started_threads[0].started is True ws_client.start.assert_not_called() + def test_start_registers_readonly_event_noops(self): + """message_read / recalled receipts must have (no-op) processors, else + the lark SDK logs 'processor not found, type: im.message.message_read_v1' + on every read receipt. + """ + with patch("channels.feishu_channel.FEISHU_AVAILABLE", True): + from channels.feishu_channel import FeishuChannel + + ch = FeishuChannel(Mock(), Mock(), Mock()) + ch.db.get_setting.side_effect = lambda key: { + "feishu_app_id": "cli_app_id_value", + "feishu_app_secret": "secret_value", + }.get(key, "") + + fake_lark = Mock() + built_client = Mock() + fake_lark.Client.builder.return_value.app_id.return_value.app_secret.return_value.log_level.return_value.build.return_value = built_client + dispatcher_builder = fake_lark.EventDispatcherHandler.builder.return_value + # every register call returns the builder so the fluent chain holds + for name in ( + "register_p2_im_message_receive_v1", + "register_p2_im_chat_member_bot_added_v1", + "register_p2_im_message_reaction_created_v1", + "register_p2_im_message_reaction_deleted_v1", + "register_p2_im_message_message_read_v1", + "register_p2_im_message_recalled_v1", + ): + getattr(dispatcher_builder, name).return_value = dispatcher_builder + dispatcher_builder.build.return_value = Mock() + fake_lark.ws.Client.return_value = Mock() + + with ( + patch("channels.feishu_channel.lark", fake_lark), + patch("channels.feishu_channel.threading.Thread", Mock()), + ): + ch.start() + + dispatcher_builder.register_p2_im_message_message_read_v1.assert_called_once() + dispatcher_builder.register_p2_im_message_recalled_v1.assert_called_once() + + def test_start_disables_lark_logger_propagation(self): + """The lark SDK's 'Lark' logger has its own handler; if it also + propagates to the root logger (basicConfig) every line prints twice. + start() must turn propagation off. + """ + import logging + + logging.getLogger("Lark").propagate = True # reset global state + + with patch("channels.feishu_channel.FEISHU_AVAILABLE", True): + from channels.feishu_channel import FeishuChannel + + ch = FeishuChannel(Mock(), Mock(), Mock()) + ch.db.get_setting.side_effect = lambda key: { + "feishu_app_id": "cli_app_id_value", + "feishu_app_secret": "secret_value", + }.get(key, "") + + fake_lark = Mock() + with ( + patch("channels.feishu_channel.lark", fake_lark), + patch("channels.feishu_channel.threading.Thread", Mock()), + ): + ch.start() + + assert logging.getLogger("Lark").propagate is False + def test_start_handles_build_exception(self): with patch("channels.feishu_channel.FEISHU_AVAILABLE", True): from channels.feishu_channel import FeishuChannel