From 95460802f4434af7fcdb54e7516973282a875c2a Mon Sep 17 00:00:00 2001 From: Tony Narlock Date: Sun, 19 Apr 2026 12:57:05 -0500 Subject: [PATCH] fix(wait_for_tools): unblock event loop via asyncio.to_thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit wait_for_channel and signal_channel were sync `def` handlers running blocking subprocess.run(timeout=N) on FastMCP's event loop — for the duration of the wait the server couldn't service other tool calls, MCP pings, or client cancellations, and clients whose stdio keepalive was shorter than the timeout would disconnect mid-wait. Port to async def + asyncio.to_thread, matching the pattern already used by wait_for_text / wait_for_content_change (added in 0.1.0a2 via 0a408fe). Swap @handle_tool_errors → @handle_tool_errors_async on both. The narrow subprocess.* except blocks and the async decorator's Exception-only catch mean asyncio.CancelledError now propagates through naturally — tested as a regression guard. Also convert the one existing caller of wait_for_channel in tests/test_pane_tools.py to asyncio.run() so mypy stays clean. Closes #18 --- CHANGES | 8 ++ src/libtmux_mcp/tools/wait_for_tools.py | 29 +++-- tests/test_pane_tools.py | 7 +- tests/test_wait_for_tools.py | 145 ++++++++++++++++++++---- 4 files changed, 161 insertions(+), 28 deletions(-) diff --git a/CHANGES b/CHANGES index ffd8b2a..47b347c 100644 --- a/CHANGES +++ b/CHANGES @@ -26,6 +26,14 @@ _Notes on upcoming releases will be added here_ `_caller_is_on_server` (the same socket-scoped comparator used by the self-kill guard) via the new `_compute_is_caller` helper (#22, fixes #19). +- {tooliconl}`wait-for-channel` and {tooliconl}`signal-channel` no + longer block the FastMCP event loop. Both were sync `def` handlers + running `subprocess.run(timeout=N)` on the main loop — for the + duration of the wait the server could not service other tool calls, + MCP pings, or client cancellations. Ported to `async def` + + {func}`asyncio.to_thread`, matching the pattern already used by + {tooliconl}`wait-for-text` / {tooliconl}`wait-for-content-change` + (#21, fixes #18). ## libtmux-mcp 0.1.0a2 (2026-04-19) diff --git a/src/libtmux_mcp/tools/wait_for_tools.py b/src/libtmux_mcp/tools/wait_for_tools.py index 5e2e089..9d238a8 100644 --- a/src/libtmux_mcp/tools/wait_for_tools.py +++ b/src/libtmux_mcp/tools/wait_for_tools.py @@ -24,6 +24,7 @@ from __future__ import annotations +import asyncio import re import subprocess import typing as t @@ -35,7 +36,7 @@ TAG_MUTATING, _get_server, _tmux_argv, - handle_tool_errors, + handle_tool_errors_async, ) if t.TYPE_CHECKING: @@ -96,8 +97,8 @@ def _validate_channel_name(name: str) -> str: return name -@handle_tool_errors -def wait_for_channel( +@handle_tool_errors_async +async def wait_for_channel( channel: str, timeout: float = 30.0, socket_name: str | None = None, @@ -143,7 +144,15 @@ def wait_for_channel( cname = _validate_channel_name(channel) argv = _tmux_argv(server, "wait-for", cname) try: - subprocess.run(argv, check=True, capture_output=True, timeout=timeout) + # FastMCP direct-awaits async tools on its event loop. ``tmux + # wait-for`` blocks for the full timeout (up to 30 s by default) + # so the synchronous ``subprocess.run`` must run off the loop — + # otherwise no other tool call, MCP ping, or cancellation can + # be serviced for the duration of the wait. Mirror the pattern + # used by :func:`~libtmux_mcp.tools.pane_tools.wait.wait_for_text`. + await asyncio.to_thread( + subprocess.run, argv, check=True, capture_output=True, timeout=timeout + ) except subprocess.TimeoutExpired as e: msg = f"wait-for timeout: channel {cname!r} was not signalled within {timeout}s" raise ToolError(msg) from e @@ -154,8 +163,8 @@ def wait_for_channel( return f"Channel {cname!r} was signalled" -@handle_tool_errors -def signal_channel( +@handle_tool_errors_async +async def signal_channel( channel: str, socket_name: str | None = None, ) -> str: @@ -180,8 +189,12 @@ def signal_channel( cname = _validate_channel_name(channel) argv = _tmux_argv(server, "wait-for", "-S", cname) try: - subprocess.run( - argv, check=True, capture_output=True, timeout=_SIGNAL_TIMEOUT_SECONDS + await asyncio.to_thread( + subprocess.run, + argv, + check=True, + capture_output=True, + timeout=_SIGNAL_TIMEOUT_SECONDS, ) except subprocess.TimeoutExpired as e: msg = ( diff --git a/tests/test_pane_tools.py b/tests/test_pane_tools.py index b86f839..c42b105 100644 --- a/tests/test_pane_tools.py +++ b/tests/test_pane_tools.py @@ -742,6 +742,7 @@ def test_search_panes_per_pane_matched_lines_cap( ``capture_pane`` (command-line plus output-line for each), well past the truncation threshold of three. """ + import asyncio import uuid from libtmux_mcp.tools.wait_for_tools import wait_for_channel @@ -753,7 +754,11 @@ def test_search_panes_per_pane_matched_lines_cap( f"tmux wait-for -S {channel}" ) mcp_pane.send_keys(payload, enter=True) - wait_for_channel(channel=channel, timeout=5.0, socket_name=mcp_server.socket_name) + asyncio.run( + wait_for_channel( + channel=channel, timeout=5.0, socket_name=mcp_server.socket_name + ) + ) result = search_panes( pattern=marker, diff --git a/tests/test_wait_for_tools.py b/tests/test_wait_for_tools.py index daf3f3c..c086c50 100644 --- a/tests/test_wait_for_tools.py +++ b/tests/test_wait_for_tools.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import threading import time import typing as t @@ -38,6 +39,20 @@ def test_validate_channel_name_rejects_invalid(name: str) -> None: _validate_channel_name(name) +def test_channel_tools_are_coroutines() -> None: + """Both tools must be ``async def`` so FastMCP awaits them. + + Regression guard for tmux-python/libtmux-mcp#18: sync ``def`` tools + were direct-called on FastMCP's event loop and the internal + ``subprocess.run`` blocked stdio for the full timeout window. The + fix converts both to ``async def`` + ``asyncio.to_thread``; this + assertion pins the async surface so a silent revert doesn't sneak + through. + """ + assert asyncio.iscoroutinefunction(wait_for_channel) + assert asyncio.iscoroutinefunction(signal_channel) + + @pytest.mark.usefixtures("mcp_session") def test_signal_channel_no_waiter_is_noop(mcp_server: Server) -> None: """``tmux wait-for -S`` on an unwaited channel returns successfully. @@ -47,9 +62,11 @@ def test_signal_channel_no_waiter_is_noop(mcp_server: Server) -> None: unstarted Server instance, so ``mcp_session`` is what actually boots the tmux process. """ - result = signal_channel( - channel="wf_test_noop", - socket_name=mcp_server.socket_name, + result = asyncio.run( + signal_channel( + channel="wf_test_noop", + socket_name=mcp_server.socket_name, + ) ) assert "signalled" in result @@ -61,15 +78,17 @@ def test_wait_for_channel_returns_when_signalled(mcp_server: Server) -> None: def _signal_after_delay() -> None: time.sleep(0.3) - signal_channel(channel=channel, socket_name=mcp_server.socket_name) + asyncio.run(signal_channel(channel=channel, socket_name=mcp_server.socket_name)) thread = threading.Thread(target=_signal_after_delay) thread.start() try: - result = wait_for_channel( - channel=channel, - timeout=5.0, - socket_name=mcp_server.socket_name, + result = asyncio.run( + wait_for_channel( + channel=channel, + timeout=5.0, + socket_name=mcp_server.socket_name, + ) ) assert "signalled" in result finally: @@ -81,10 +100,12 @@ def test_wait_for_channel_times_out(mcp_server: Server) -> None: """Unsignalled channel raises a timeout ``ToolError`` within the cap.""" start = time.monotonic() with pytest.raises(ToolError, match="wait-for timeout"): - wait_for_channel( - channel="wf_timeout_test", - timeout=0.5, - socket_name=mcp_server.socket_name, + asyncio.run( + wait_for_channel( + channel="wf_timeout_test", + timeout=0.5, + socket_name=mcp_server.socket_name, + ) ) elapsed = time.monotonic() - start # Allow generous slack for tmux subprocess spawn overhead. @@ -94,17 +115,103 @@ def test_wait_for_channel_times_out(mcp_server: Server) -> None: def test_wait_for_channel_rejects_invalid_name(mcp_server: Server) -> None: """Invalid channel names are rejected before spawning tmux.""" with pytest.raises(ToolError, match="Invalid channel name"): - wait_for_channel( - channel="has space", - timeout=1.0, - socket_name=mcp_server.socket_name, + asyncio.run( + wait_for_channel( + channel="has space", + timeout=1.0, + socket_name=mcp_server.socket_name, + ) ) def test_signal_channel_rejects_invalid_name(mcp_server: Server) -> None: """Invalid channel names are rejected before spawning tmux.""" with pytest.raises(ToolError, match="Invalid channel name"): - signal_channel( - channel="has/slash", - socket_name=mcp_server.socket_name, + asyncio.run( + signal_channel( + channel="has/slash", + socket_name=mcp_server.socket_name, + ) ) + + +@pytest.mark.usefixtures("mcp_session") +def test_wait_for_channel_does_not_block_event_loop(mcp_server: Server) -> None: + """Concurrent coroutines must make progress while the wait is pending. + + Regression guard for tmux-python/libtmux-mcp#18. Before the fix, + ``subprocess.run`` blocked the FastMCP event loop for the full + timeout; the ticker below would advance only between poll iterations + (which there aren't any of — the subprocess is a single blocking + call). With ``asyncio.to_thread`` the ticker must fire many times + while the tmux subprocess waits for its signal. + + Discriminator: the wait is set to 0.5 s on an unsignalled channel. + The ticker samples at 10 ms. With the fix we expect ≥ 20 ticks + (500 ms / 10 ms = 50 nominal, halved to guard against CI jitter); + without the fix we expect 0 — the event loop is pinned in + ``subprocess.run`` until it times out. + """ + + async def _drive() -> int: + ticks = 0 + stop = asyncio.Event() + + async def _ticker() -> None: + nonlocal ticks + while not stop.is_set(): + ticks += 1 + await asyncio.sleep(0.01) + + async def _waiter() -> None: + try: + with pytest.raises(ToolError, match="wait-for timeout"): + await wait_for_channel( + channel="wf_evtloop_test", + timeout=0.5, + socket_name=mcp_server.socket_name, + ) + finally: + stop.set() + + await asyncio.gather(_ticker(), _waiter()) + return ticks + + ticks = asyncio.run(_drive()) + assert ticks >= 20, ( + f"ticker advanced only {ticks} times — wait_for_channel is blocking " + f"the event loop instead of running the subprocess in a thread" + ) + + +@pytest.mark.usefixtures("mcp_session") +def test_wait_for_channel_propagates_cancellation(mcp_server: Server) -> None: + """``wait_for_channel`` raises ``CancelledError`` (not ``ToolError``). + + MCP cancellation semantics: when a client cancels an in-flight tool + call, the awaiting ``asyncio.Task`` receives ``CancelledError``. + ``handle_tool_errors_async`` catches ``Exception`` (not + ``BaseException``), and the function's narrow ``subprocess.*`` + except-blocks cannot swallow ``CancelledError`` either — so the + cancellation propagates through the decorator naturally. This test + locks that contract in so a future broadening of the catch + (e.g. ``except BaseException``) trips immediately. + + Uses ``task.cancel()`` rather than ``asyncio.wait_for`` so the + raised exception is the inner ``CancelledError`` directly. + """ + + async def _runner() -> None: + task = asyncio.create_task( + wait_for_channel( + channel="wf_cancel_test", + timeout=10.0, + socket_name=mcp_server.socket_name, + ) + ) + await asyncio.sleep(0.1) # let the to_thread handoff start + task.cancel() + await task + + with pytest.raises(asyncio.CancelledError): + asyncio.run(_runner())