diff --git a/services/hermes_platform_gateway/hermes-config.yaml b/services/hermes_platform_gateway/hermes-config.yaml index 27e9822..a03ec2c 100644 --- a/services/hermes_platform_gateway/hermes-config.yaml +++ b/services/hermes_platform_gateway/hermes-config.yaml @@ -7,6 +7,9 @@ model: api_key: ${VLLM_API_KEY} api_mode: chat_completions +display: + busy_input_mode: interrupt + plugins: enabled: - hermes_platform_gateway diff --git a/services/hermes_platform_gateway/plugins/hermes-platform-gateway/hermes_platform_gateway/__init__.py b/services/hermes_platform_gateway/plugins/hermes-platform-gateway/hermes_platform_gateway/__init__.py index 63e0c85..cb6acad 100644 --- a/services/hermes_platform_gateway/plugins/hermes-platform-gateway/hermes_platform_gateway/__init__.py +++ b/services/hermes_platform_gateway/plugins/hermes-platform-gateway/hermes_platform_gateway/__init__.py @@ -54,12 +54,16 @@ def register(ctx) -> None: # correlation IDs (author, interaction_id, turn_id, event_id). _turn_lock = threading.Lock() _current_turn: list[Any] = [None] # [BaseEvent | None] + _generating: list[bool] = [False] # [bool] — True while Hermes LLM loop is active def _on_agent_start(event: Any) -> None: text = _extract_user_text(event) if not text: logger.warning("platform-gateway: AGENT_START with no extractable text, skipping") return + # Clear any stale stop flag from a previous turn that completed before + # the cancel signal arrived (race condition: cancel arrives after post_llm_call). + gateway._stop_requested.clear() with _turn_lock: _current_turn[0] = event if not ctx.inject_message(text): @@ -71,11 +75,19 @@ def _on_agent_start(event: Any) -> None: from hermes_platform_gateway.client import GatewayConnection, build_ws_url ws_url = build_ws_url(base_url) headers = {"channel-id": channel_id, "space-id": space_id, "api-key": api_key} + def _on_stop() -> None: + if not _generating[0]: + logger.info("platform-gateway: stop signal received but generation already finished — skipping /stop") + return + logger.info("platform-gateway: stop signal — injecting /stop to interrupt generation") + ctx.inject_message("/stop") + gateway = GatewayConnection( ws_url=ws_url, headers=headers, timeout=timeout, on_agent_start=_on_agent_start, + on_stop=_on_stop, ) gateway.start() except Exception as exc: @@ -140,17 +152,53 @@ def _on_agent_start(event: Any) -> None: # path). Instead we deliver one final AGENT_END carrying the full # assistant text — same shape `domyn expose`'s Runtime uses. + _STOP_MESSAGE = "Response stopped as per user request." + + def _on_interrupted( + interrupted: bool = False, + **_: Any, + ) -> None: + # Fires at the end of every run_conversation() call. When interrupted=True + # and a stop was requested, post_llm_call never fires so we send the + # canned AGENT_END here. + if not interrupted or not gateway._stop_requested.is_set(): + return + from domyn_agents.core import BaseEvent, ExecutionEventType, Part + with _turn_lock: + turn = _current_turn[0] + _current_turn[0] = None + gateway._stop_requested.clear() + if turn is None: + logger.warning("platform-gateway: interrupted but no active turn to acknowledge") + return + logger.info("platform-gateway: generation interrupted, sending canned AGENT_END for turn %s", turn.event_id) + event = BaseEvent( + event_type=ExecutionEventType.AGENT_END, + author=turn.author, + event_id=turn.event_id, + interaction_id=turn.interaction_id, + turn_id=turn.turn_id, + conversation_id=turn.conversation_id, + content=[Part(text=_STOP_MESSAGE)], + ) + gateway.send_event(event) + def _on_turn_complete( assistant_response: str = "", - session_id: str | None = None, **_: Any, ) -> None: + # post_llm_call fires only when generation completed normally (no /stop). + # If a stop arrived after generation already finished (race condition), + # _stop_requested is still set — handle it here as a fallback. from domyn_agents.core import BaseEvent, ExecutionEventType, Part with _turn_lock: turn = _current_turn[0] _current_turn[0] = None if turn is None: return + if gateway._stop_requested.is_set(): + gateway._stop_requested.clear() + assistant_response = _STOP_MESSAGE event = BaseEvent( event_type=ExecutionEventType.AGENT_END, author=turn.author, @@ -163,7 +211,16 @@ def _on_turn_complete( gateway.send_event(event) logger.debug("platform-gateway: sent AGENT_END for turn %s", turn.event_id) + def _on_generation_start(**_: Any) -> None: + _generating[0] = True + + def _on_generation_end(**_: Any) -> None: + _generating[0] = False + + ctx.register_hook("on_session_end", _on_interrupted) ctx.register_hook("post_llm_call", _on_turn_complete) + ctx.register_hook("pre_llm_call", _on_generation_start) + ctx.register_hook("on_session_end", _on_generation_end) def _make_handler( diff --git a/services/hermes_platform_gateway/plugins/hermes-platform-gateway/hermes_platform_gateway/client.py b/services/hermes_platform_gateway/plugins/hermes-platform-gateway/hermes_platform_gateway/client.py index 8b252b7..13d2f9f 100644 --- a/services/hermes_platform_gateway/plugins/hermes-platform-gateway/hermes_platform_gateway/client.py +++ b/services/hermes_platform_gateway/plugins/hermes-platform-gateway/hermes_platform_gateway/client.py @@ -108,16 +108,19 @@ def __init__( headers: dict[str, str], timeout: float = 120.0, on_agent_start: Callable[["BaseEvent"], None] | None = None, + on_stop: Callable[[], None] | None = None, ) -> None: self._ws_url = ws_url self._headers = headers self._timeout = timeout self._on_agent_start = on_agent_start + self._on_stop = on_stop self._pending: dict[str, concurrent.futures.Future] = {} self._lock = threading.Lock() self._loop: asyncio.AbstractEventLoop | None = None self._ws: Any = None self._ready = threading.Event() + self._stop_requested = threading.Event() def start(self) -> None: """Spawn the background WebSocket thread and wait up to 15s for first connect.""" @@ -174,6 +177,7 @@ async def _receive_loop(self, ws: Any) -> None: continue if event.event_type == ExecutionEventType.AGENT_START: + self._stop_requested.clear() if self._on_agent_start is not None: try: self._on_agent_start(event) @@ -181,6 +185,23 @@ async def _receive_loop(self, ws: Any) -> None: logger.warning("platform-gateway: on_agent_start callback failed - %s", exc) continue + if event.event_type == ExecutionEventType.AGENT_END: + # Only treat it as a cancellation signal if it carries the explicit + # user_cancelled error code. This avoids false triggers from echoed + # AGENT_ENDs that the plugin itself sent (normal turn completions). + if getattr(event, "error_code", None) == "user_cancelled": + logger.info( + "platform-gateway: received cancellation signal from platform — stopping current turn" + ) + self._stop_requested.set() + self._fail_pending("Execution cancelled by user") + if self._on_stop is not None: + try: + self._on_stop() + except Exception as exc: + logger.warning("platform-gateway: on_stop callback failed - %s", exc) + continue + if event.event_type not in ( ExecutionEventType.TOOL_END, ExecutionEventType.TOOL_ERROR, diff --git a/services/langgraph_agent_example/agent_expose.py b/services/langgraph_agent_example/agent_expose.py index caca5ee..c74669f 100644 --- a/services/langgraph_agent_example/agent_expose.py +++ b/services/langgraph_agent_example/agent_expose.py @@ -46,8 +46,10 @@ def add_numbers(a: float, b: float) -> float: @tool -def multiply_numbers(a: float, b: float) -> float: +async def multiply_numbers(a: float, b: float) -> float: """Multiply two numbers together.""" + import asyncio + await asyncio.sleep(10) return a * b