Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions channels/feishu_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import base64
import json
import logging
import re
import threading
import time
Expand Down Expand Up @@ -385,12 +386,17 @@ def start(self) -> None:
return

try:
# The lark SDK's "Lark" logger ships its own stdout handler; stop it
# propagating to the root logger (configured via basicConfig in
# taskboard.py) so each line isn't emitted twice.
logging.getLogger("Lark").propagate = False

print("[Feishu] Building Lark client...")
self._client = (
lark.Client.builder()
.app_id(app_id)
.app_secret(app_secret)
.log_level(lark.LogLevel.DEBUG)
.log_level(lark.LogLevel.INFO)
.build()
)
print("[Feishu] Lark client built successfully")
Expand All @@ -402,6 +408,11 @@ def start(self) -> None:
.register_p2_im_chat_member_bot_added_v1(self._on_bot_added)
.register_p2_im_message_reaction_created_v1(self._on_reaction)
.register_p2_im_message_reaction_deleted_v1(self._on_reaction)
# No-op processors for read-only receipts we subscribe to but
# don't act on — without these the SDK logs "processor not
# found, type: im.message.message_read_v1" on every receipt.
.register_p2_im_message_message_read_v1(lambda data: None)
.register_p2_im_message_recalled_v1(lambda data: None)
.build()
)
print("[Feishu] Event handler registered")
Expand All @@ -411,7 +422,7 @@ def start(self) -> None:
app_id,
app_secret,
event_handler=event_handler,
log_level=lark.LogLevel.DEBUG,
log_level=lark.LogLevel.INFO,
)
print("[Feishu] WebSocket client created")

Expand Down
39 changes: 26 additions & 13 deletions taskboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3281,6 +3281,9 @@ def _is_safe_image_path(path: str) -> bool:
cmd.extend(["--resume", task["session_id"]])
raw_stdout = ""
raw_stderr = ""
# Initialized before the try so the failure branch can read it even when
# Popen itself raises (e.g. CLI not found) before the timer is armed.
timed_out = [False]
try:
timeout_secs = int(self.db.get_setting("timeout", "600"))
start_time = time.time()
Expand Down Expand Up @@ -3327,8 +3330,6 @@ def _read_stderr():
stderr_thread.start()

# Timer that kills the entire process group if it exceeds the configured timeout
timed_out = [False]

def _kill():
timed_out[0] = True
try:
Expand Down Expand Up @@ -3509,21 +3510,33 @@ def _kill():
else:
# Extract a clean, human-readable error summary for task.error and
# notification channels. The full raw output is preserved in run_error.
error_summary = (
self._extract_error_summary(raw_stderr, raw_stdout)
if (raw_stderr or raw_stdout)
else (output or "Unknown error")
)
if timed_out[0]:
# The timeout IS the reason — don't let an unrelated stderr line
# (e.g. codex's "Reading additional input from stdin…") mask it.
error_summary = output
else:
error_summary = (
self._extract_error_summary(raw_stderr, raw_stdout)
if (raw_stderr or raw_stdout)
else (output or "Unknown error")
)
updates = {
"status": "failed",
"error": error_summary,
"last_run_at": datetime.now().isoformat(),
"run_count": new_count,
}
# Persist the conversation id even on failure so the task stays
# resumable (e.g. replying in a Feishu/Slack/Telegram thread to
# retry). Codex emits thread_id in the opening thread.started event,
# so even a started-then-failed run has one to recover.
if extracted_session_id:
updates["session_id"] = extracted_session_id
self.db.finish_run_and_update_task(
run_id,
"failed",
tid,
{
"status": "failed",
"error": error_summary,
"last_run_at": datetime.now().isoformat(),
"run_count": new_count,
},
updates,
run_error=output,
raw_output=raw_output_stored,
)
Expand Down
90 changes: 90 additions & 0 deletions tests/test_execute_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,64 @@ def test_execute_task_failure_sets_failed_with_error_summary(scheduler):
assert runs[0]["status"] == "failed"


# ── _execute_task: failure still persists session_id (resumable retry) ───────
def test_execute_task_codex_failure_still_persists_session_id(scheduler, monkeypatch):
"""A codex run that emits ``thread.started`` then fails must still persist
the ``thread_id`` as ``session_id``. Otherwise a failed task cannot be
resumed (e.g. replying in the Feishu thread to retry) → "no saved session".
"""
db = scheduler.db
tid = db.add_task(Task(title="codex boom", prompt="x", working_dir=".", agent="codex"))
task = db.get_task(tid)
scheduler._active_tasks[tid] = object()
monkeypatch.setattr(scheduler, "_find_codex_generated_images", lambda *a, **k: [])

stdout_lines = [
json.dumps({"type": "thread.started", "thread_id": "thread-fail-1"}) + "\n",
]
fake = FakePopen(
stdout_lines,
stderr_lines=["codex error: model overloaded\n"],
returncode=1,
)
with patch.object(taskboard.subprocess, "Popen", return_value=fake):
scheduler._execute_task(task)

refreshed = db.get_task(tid)
assert refreshed["status"] == "failed"
# The conversation id is preserved despite the failure → resumable.
assert refreshed["session_id"] == "thread-fail-1"


def test_execute_task_claude_failure_still_persists_session_id(scheduler):
"""A claude run whose trailing ``result`` event carries a ``session_id``
(e.g. ``error_during_execution``) must persist it on failure too.
"""
db = scheduler.db
tid = db.add_task(Task(title="claude boom", prompt="x", working_dir=".", agent="claude"))
task = db.get_task(tid)
scheduler._active_tasks[tid] = object()

stdout_lines = [
json.dumps({"type": "system", "subtype": "init", "session_id": "sess-fail-9"}) + "\n",
json.dumps(
{
"type": "result",
"subtype": "error_during_execution",
"session_id": "sess-fail-9",
}
)
+ "\n",
]
fake = FakePopen(stdout_lines, stderr_lines=["boom\n"], returncode=1)
with patch.object(taskboard.subprocess, "Popen", return_value=fake):
scheduler._execute_task(task)

refreshed = db.get_task(tid)
assert refreshed["status"] == "failed"
assert refreshed["session_id"] == "sess-fail-9"


# ── _execute_task: missing CLI binary ────────────────────────────────────────
def test_execute_task_missing_binary_fails_with_install_hint(scheduler):
db = scheduler.db
Expand Down Expand Up @@ -522,6 +580,38 @@ def slow_lines():
assert "timed out" in (runs[0]["error"] or "")


def test_execute_task_timeout_error_summary_states_timeout_not_stderr(scheduler, monkeypatch):
"""On timeout the human-readable ``task.error`` must say it timed out, not
surface an unrelated stderr line. Regression: codex prints "Reading
additional input from stdin…" to stderr, which ``_extract_error_summary``
picked over the real "Task timed out after Ns" reason.
"""
db = scheduler.db
db.set_setting("timeout", "0") # kill timer (delay 0) fires immediately
tid = db.add_task(Task(title="slow review", prompt="hang", working_dir=".", agent="codex"))
task = db.get_task(tid)
scheduler._active_tasks[tid] = object()
monkeypatch.setattr(scheduler, "_find_codex_generated_images", lambda *a, **k: [])

def slow_lines():
time.sleep(0.2) # let the delay-0 timer fire mid-stream
yield json.dumps({"type": "thread.started", "thread_id": "t-1"}) + "\n"

fake = FakePopen(
slow_lines(),
stderr_lines=["Reading additional input from stdin...\n"],
returncode=0,
)
with patch.object(taskboard.subprocess, "Popen", return_value=fake):
scheduler._execute_task(task)

refreshed = db.get_task(tid)
assert refreshed["status"] == "failed"
# error summary states the timeout, not the stderr noise
assert "timed out" in (refreshed["error"] or "")
assert "stdin" not in (refreshed["error"] or "")


# ── _run_agent_command: heartbeat/skill-sweep invocation ─────────────────────
def test_run_agent_command_claude_returns_result_text(scheduler):
db = scheduler.db
Expand Down
69 changes: 69 additions & 0 deletions tests/test_feishu_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ def test_start_wires_handlers_and_spawns_thread_without_connecting(self):
"register_p2_im_chat_member_bot_added_v1",
"register_p2_im_message_reaction_created_v1",
"register_p2_im_message_reaction_deleted_v1",
"register_p2_im_message_message_read_v1",
"register_p2_im_message_recalled_v1",
):
getattr(dispatcher_builder, name).return_value = dispatcher_builder
built_handler = Mock()
Expand Down Expand Up @@ -120,6 +122,73 @@ def start(self):
assert started_threads[0].started is True
ws_client.start.assert_not_called()

def test_start_registers_readonly_event_noops(self):
"""message_read / recalled receipts must have (no-op) processors, else
the lark SDK logs 'processor not found, type: im.message.message_read_v1'
on every read receipt.
"""
with patch("channels.feishu_channel.FEISHU_AVAILABLE", True):
from channels.feishu_channel import FeishuChannel

ch = FeishuChannel(Mock(), Mock(), Mock())
ch.db.get_setting.side_effect = lambda key: {
"feishu_app_id": "cli_app_id_value",
"feishu_app_secret": "secret_value",
}.get(key, "")

fake_lark = Mock()
built_client = Mock()
fake_lark.Client.builder.return_value.app_id.return_value.app_secret.return_value.log_level.return_value.build.return_value = built_client
dispatcher_builder = fake_lark.EventDispatcherHandler.builder.return_value
# every register call returns the builder so the fluent chain holds
for name in (
"register_p2_im_message_receive_v1",
"register_p2_im_chat_member_bot_added_v1",
"register_p2_im_message_reaction_created_v1",
"register_p2_im_message_reaction_deleted_v1",
"register_p2_im_message_message_read_v1",
"register_p2_im_message_recalled_v1",
):
getattr(dispatcher_builder, name).return_value = dispatcher_builder
dispatcher_builder.build.return_value = Mock()
fake_lark.ws.Client.return_value = Mock()

with (
patch("channels.feishu_channel.lark", fake_lark),
patch("channels.feishu_channel.threading.Thread", Mock()),
):
ch.start()

dispatcher_builder.register_p2_im_message_message_read_v1.assert_called_once()
dispatcher_builder.register_p2_im_message_recalled_v1.assert_called_once()

def test_start_disables_lark_logger_propagation(self):
"""The lark SDK's 'Lark' logger has its own handler; if it also
propagates to the root logger (basicConfig) every line prints twice.
start() must turn propagation off.
"""
import logging

logging.getLogger("Lark").propagate = True # reset global state

with patch("channels.feishu_channel.FEISHU_AVAILABLE", True):
from channels.feishu_channel import FeishuChannel

ch = FeishuChannel(Mock(), Mock(), Mock())
ch.db.get_setting.side_effect = lambda key: {
"feishu_app_id": "cli_app_id_value",
"feishu_app_secret": "secret_value",
}.get(key, "")

fake_lark = Mock()
with (
patch("channels.feishu_channel.lark", fake_lark),
patch("channels.feishu_channel.threading.Thread", Mock()),
):
ch.start()

assert logging.getLogger("Lark").propagate is False

def test_start_handles_build_exception(self):
with patch("channels.feishu_channel.FEISHU_AVAILABLE", True):
from channels.feishu_channel import FeishuChannel
Expand Down
Loading