Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions services/hermes_platform_gateway/hermes-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ model:
api_key: ${VLLM_API_KEY}
api_mode: chat_completions

display:
busy_input_mode: interrupt

plugins:
enabled:
- hermes_platform_gateway
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,16 @@ def register(ctx) -> None:
# correlation IDs (author, interaction_id, turn_id, event_id).
_turn_lock = threading.Lock()
_current_turn: list[Any] = [None] # [BaseEvent | None]
_generating: list[bool] = [False] # [bool] — True while Hermes LLM loop is active

def _on_agent_start(event: Any) -> None:
text = _extract_user_text(event)
if not text:
logger.warning("platform-gateway: AGENT_START with no extractable text, skipping")
return
# Clear any stale stop flag from a previous turn that completed before
# the cancel signal arrived (race condition: cancel arrives after post_llm_call).
gateway._stop_requested.clear()
with _turn_lock:
_current_turn[0] = event
if not ctx.inject_message(text):
Expand All @@ -71,11 +75,19 @@ def _on_agent_start(event: Any) -> None:
from hermes_platform_gateway.client import GatewayConnection, build_ws_url
ws_url = build_ws_url(base_url)
headers = {"channel-id": channel_id, "space-id": space_id, "api-key": api_key}
def _on_stop() -> None:
    """Forward a platform cancellation to Hermes by injecting ``/stop``.

    Passed to ``GatewayConnection`` as the ``on_stop`` callback. The
    injection is skipped when the ``_generating`` flag is not set, i.e. the
    generation has already finished.
    """
    if not _generating[0]:
        logger.info("platform-gateway: stop signal received but generation already finished — skipping /stop")
        return
    logger.info("platform-gateway: stop signal — injecting /stop to interrupt generation")
    # The AGENT_START path treats a falsy return from inject_message as a
    # failure; do the same here so a dropped cancel request is visible in logs.
    if not ctx.inject_message("/stop"):
        logger.warning("platform-gateway: failed to inject /stop — generation may continue")

gateway = GatewayConnection(
ws_url=ws_url,
headers=headers,
timeout=timeout,
on_agent_start=_on_agent_start,
on_stop=_on_stop,
)
gateway.start()
except Exception as exc:
Expand Down Expand Up @@ -140,17 +152,53 @@ def _on_agent_start(event: Any) -> None:
# path). Instead we deliver one final AGENT_END carrying the full
# assistant text — same shape `domyn expose`'s Runtime uses.

_STOP_MESSAGE = "Response stopped as per user request."

def _on_interrupted(
    interrupted: bool = False,
    **_: Any,
) -> None:
    """Acknowledge a user-requested stop with a canned AGENT_END.

    This hook runs at the end of every run_conversation() call. When the run
    was interrupted and a stop had been requested, post_llm_call never fires,
    so the AGENT_END carrying ``_STOP_MESSAGE`` is sent from here instead.

    Args:
        interrupted: True when the conversation run was cut short.
        **_: Additional hook keyword arguments; ignored.
    """
    # Nothing to do unless the run was interrupted *and* a stop is pending.
    if not interrupted:
        return
    if not gateway._stop_requested.is_set():
        return

    from domyn_agents.core import BaseEvent, ExecutionEventType, Part

    # Take ownership of the active turn and reset the stop flag together.
    with _turn_lock:
        active_turn, _current_turn[0] = _current_turn[0], None
        gateway._stop_requested.clear()

    if active_turn is None:
        logger.warning("platform-gateway: interrupted but no active turn to acknowledge")
        return

    logger.info("platform-gateway: generation interrupted, sending canned AGENT_END for turn %s", active_turn.event_id)
    # Reuse the turn's correlation IDs for the acknowledgement.
    ack_event = BaseEvent(
        event_type=ExecutionEventType.AGENT_END,
        author=active_turn.author,
        event_id=active_turn.event_id,
        interaction_id=active_turn.interaction_id,
        turn_id=active_turn.turn_id,
        conversation_id=active_turn.conversation_id,
        content=[Part(text=_STOP_MESSAGE)],
    )
    gateway.send_event(ack_event)

def _on_turn_complete(
assistant_response: str = "",
session_id: str | None = None,
**_: Any,
) -> None:
# post_llm_call fires only when generation completed normally (no /stop).
# If a stop arrived after generation already finished (race condition),
# _stop_requested is still set — handle it here as a fallback.
from domyn_agents.core import BaseEvent, ExecutionEventType, Part
with _turn_lock:
turn = _current_turn[0]
_current_turn[0] = None
if turn is None:
return
if gateway._stop_requested.is_set():
gateway._stop_requested.clear()
assistant_response = _STOP_MESSAGE
event = BaseEvent(
event_type=ExecutionEventType.AGENT_END,
author=turn.author,
Expand All @@ -163,7 +211,16 @@ def _on_turn_complete(
gateway.send_event(event)
logger.debug("platform-gateway: sent AGENT_END for turn %s", turn.event_id)

def _on_generation_start(**_: Any) -> None:
    """Set the ``_generating`` flag; hook keyword arguments are ignored."""
    # _generating is a one-element list so this closure can mutate the flag.
    _generating[0] = True

def _on_generation_end(**_: Any) -> None:
    """Clear the ``_generating`` flag; hook keyword arguments are ignored."""
    # _generating is a one-element list so this closure can mutate the flag.
    _generating[0] = False

# Wire the plugin callbacks into the Hermes hooks. "on_session_end" gets two
# handlers: _on_interrupted and _on_generation_end.
# NOTE(review): whether handlers for the same hook run in registration order
# is not visible here — confirm if the ordering matters.
ctx.register_hook("on_session_end", _on_interrupted)
ctx.register_hook("post_llm_call", _on_turn_complete)
ctx.register_hook("pre_llm_call", _on_generation_start)
ctx.register_hook("on_session_end", _on_generation_end)


def _make_handler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,19 @@ def __init__(
headers: dict[str, str],
timeout: float = 120.0,
on_agent_start: Callable[["BaseEvent"], None] | None = None,
on_stop: Callable[[], None] | None = None,
) -> None:
self._ws_url = ws_url
self._headers = headers
self._timeout = timeout
self._on_agent_start = on_agent_start
self._on_stop = on_stop
self._pending: dict[str, concurrent.futures.Future] = {}
self._lock = threading.Lock()
self._loop: asyncio.AbstractEventLoop | None = None
self._ws: Any = None
self._ready = threading.Event()
self._stop_requested = threading.Event()

def start(self) -> None:
"""Spawn the background WebSocket thread and wait up to 15s for first connect."""
Expand Down Expand Up @@ -174,13 +177,31 @@ async def _receive_loop(self, ws: Any) -> None:
continue

if event.event_type == ExecutionEventType.AGENT_START:
self._stop_requested.clear()
if self._on_agent_start is not None:
try:
self._on_agent_start(event)
except Exception as exc:
logger.warning("platform-gateway: on_agent_start callback failed - %s", exc)
continue

if event.event_type == ExecutionEventType.AGENT_END:
# Only treat it as a cancellation signal if it carries the explicit
# user_cancelled error code. This avoids false triggers from echoed
# AGENT_ENDs that the plugin itself sent (normal turn completions).
if getattr(event, "error_code", None) == "user_cancelled":
logger.info(
"platform-gateway: received cancellation signal from platform — stopping current turn"
)
self._stop_requested.set()
self._fail_pending("Execution cancelled by user")
if self._on_stop is not None:
try:
self._on_stop()
except Exception as exc:
logger.warning("platform-gateway: on_stop callback failed - %s", exc)
continue

if event.event_type not in (
ExecutionEventType.TOOL_END,
ExecutionEventType.TOOL_ERROR,
Expand Down
4 changes: 3 additions & 1 deletion services/langgraph_agent_example/agent_expose.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ def add_numbers(a: float, b: float) -> float:


@tool
async def multiply_numbers(a: float, b: float) -> float:
    """Multiply two numbers together."""
    import asyncio

    # Fixed 10-second pause before returning the product.
    # NOTE(review): presumably a deliberate delay so a long-running tool call
    # can be cancelled mid-flight — confirm before shipping.
    await asyncio.sleep(10)
    return a * b


Expand Down