diff --git a/.agents/skills/test-python-binding/SKILL.md b/.agents/skills/test-python-binding/SKILL.md index 330eda60..b3081130 100644 --- a/.agents/skills/test-python-binding/SKILL.md +++ b/.agents/skills/test-python-binding/SKILL.md @@ -38,6 +38,13 @@ Use this skill when the change is primarily in `python/nemo_flow`, - The name of the mocked class should be prefixed with `mock`, not `fake`. - Prefer pytest fixtures over helper methods. - Do not repeat fixtures, if a fixture is needed in multiple test files, place it in a `conftest.py` file. +- When creating a fixture, follow this pattern: + ```python + @pytest.fixture(name="<fixture_name>"[, scope="<scope>"]) + def <fixture_name>_fixture() -> <return_type>: + ... + ``` + Only specify the scope argument when the value is something other than "function". - Prefer `pytest.mark.parametrize` over creating individual tests for different input types. diff --git a/.github/ci-path-filters.yml b/.github/ci-path-filters.yml index e6dcf43a..ec6a2767 100644 --- a/.github/ci-path-filters.yml +++ b/.github/ci-path-filters.yml @@ -67,6 +67,20 @@ python_package: - 'rust-toolchain.toml' - 'uv.lock' +python_integration_langchain: + # Includes LangGraph and DeepAgents integrations as well + - 'justfile' + - 'pyproject.toml' + - 'python/nemo_flow/integrations/deepagents/**' + - 'python/nemo_flow/integrations/langchain/**' + - 'python/nemo_flow/integrations/langgraph/**' + - 'python/tests/integrations/conftest.py' + - 'python/tests/integrations/deepagents_tests/**' + - 'python/tests/integrations/langchain_tests/**' + - 'python/tests/integrations/langgraph_tests/**' + - 'rust-toolchain.toml' + - 'uv.lock' + wasm_package: - 'Cargo.lock' - 'Cargo.toml' diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 29890de1..66608177 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -127,6 +127,7 @@ jobs: with: full_ci: ${{ needs.prepare.outputs.full_ci == 'true' }} base: ${{ needs.ci_changes.outputs.base }} + run_python_integration_langchain: ${{ 
needs.ci_changes.outputs.run_python_integration_langchain == 'true' }} ci_license_diff: name: License Diff @@ -203,7 +204,7 @@ jobs: name: Python needs: [prepare, ci_changes, ci_check] uses: ./.github/workflows/ci_python.yml - if: ${{ needs.ci_check.result == 'success' && needs.ci_changes.outputs.run_python == 'true' }} + if: ${{ needs.ci_check.result == 'success' && ( needs.ci_changes.outputs.run_python == 'true' || needs.ci_changes.outputs.run_python_integration_langchain == 'true' ) }} permissions: contents: read secrets: @@ -211,7 +212,8 @@ jobs: with: ref_type: ${{ github.ref_type }} ref_name: ${{ github.ref_name }} - run_package: ${{ needs.ci_changes.outputs.run_python_package == 'true' }} + run_package: ${{ ( needs.ci_changes.outputs.run_python == 'true' || needs.ci_changes.outputs.run_python_integration_langchain == 'true' ) }} + run_integration_langchain: ${{ needs.ci_changes.outputs.run_python_integration_langchain == 'true' }} ci_wasm: name: WebAssembly diff --git a/.github/workflows/ci_changes.yml b/.github/workflows/ci_changes.yml index 6785992a..ddf02859 100644 --- a/.github/workflows/ci_changes.yml +++ b/.github/workflows/ci_changes.yml @@ -48,6 +48,9 @@ on: run_python: description: 'Whether Python jobs should run' value: ${{ jobs.changes.outputs.run_python }} + run_python_integration_langchain: + description: 'Whether LangChain, LangGraph, and DeepAgents Python integration jobs should run' + value: ${{ jobs.changes.outputs.run_python_integration_langchain }} run_python_package: description: 'Whether Python packaging jobs should run' value: ${{ jobs.changes.outputs.run_python_package }} @@ -80,6 +83,7 @@ jobs: run_node_package: ${{ inputs.full_ci || steps.filter.outputs.ci == 'true' || steps.filter.outputs.node_package == 'true' }} run_openclaw: ${{ inputs.full_ci || steps.filter.outputs.ci == 'true' || steps.filter.outputs.shared == 'true' || steps.filter.outputs.node == 'true' || steps.filter.outputs.openclaw == 'true' }} run_python: ${{ 
inputs.full_ci || steps.filter.outputs.ci == 'true' || steps.filter.outputs.shared == 'true' || steps.filter.outputs.python == 'true' }} + run_python_integration_langchain: ${{ inputs.full_ci || steps.filter.outputs.ci == 'true' || steps.filter.outputs.shared == 'true' || steps.filter.outputs.python_integration_langchain == 'true' }} run_python_package: ${{ inputs.full_ci || steps.filter.outputs.ci == 'true' || steps.filter.outputs.python_package == 'true' }} run_rust: ${{ inputs.full_ci || steps.filter.outputs.ci == 'true' || steps.filter.outputs.shared == 'true' || steps.filter.outputs.rust == 'true' }} run_rust_package: ${{ inputs.full_ci || steps.filter.outputs.ci == 'true' || steps.filter.outputs.rust_package == 'true' }} diff --git a/.github/workflows/ci_check.yml b/.github/workflows/ci_check.yml index 131c5e6e..e11e78af 100644 --- a/.github/workflows/ci_check.yml +++ b/.github/workflows/ci_check.yml @@ -11,6 +11,11 @@ on: required: false default: false type: boolean + run_python_integration_langchain: + description: 'Whether LangChain, LangGraph, and DeepAgents Python integration checks may need optional extras' + required: false + default: false + type: boolean base: description: 'The comparison base used for filtered pre-commit checks' required: false @@ -104,12 +109,18 @@ jobs: FULL_CI: ${{ inputs.full_ci }} PRE_COMMIT_BASE: ${{ inputs.base }} PRE_COMMIT_HOME: ${{ runner.temp }}/.cache/pre-commit + PYTHON_INTEGRATION_LANGCHAIN: ${{ inputs.run_python_integration_langchain }} # The attribution hook syncs docs deps; do not apply repo warning policy to third-party Rust builds. 
RUSTFLAGS: "" run: | set -e uv tool install pre-commit==${{ steps.ci-config.outputs.pre_commit_version }} - uv sync --inexact --no-install-project --no-install-package nemo-flow --extra langchain --extra langgraph --extra deepagents + FLOW_CI_UV_SYNC_EXTRA_ARGS=() + if [[ "$PYTHON_INTEGRATION_LANGCHAIN" == "true" ]]; then + FLOW_CI_UV_SYNC_EXTRA_ARGS+=(--extra langchain --extra langgraph --extra deepagents) + fi + + uv sync --inexact --no-install-project --no-install-package nemo-flow "${FLOW_CI_UV_SYNC_EXTRA_ARGS[@]}" if [[ "$FULL_CI" == "true" || -z "$PRE_COMMIT_BASE" ]]; then pre-commit run --all-files --show-diff-on-failure else diff --git a/.github/workflows/ci_python.yml b/.github/workflows/ci_python.yml index ebbc4fd0..7242f1cb 100644 --- a/.github/workflows/ci_python.yml +++ b/.github/workflows/ci_python.yml @@ -19,6 +19,11 @@ on: required: false default: true type: boolean + run_integration_langchain: + description: 'Whether to run LangChain, LangGraph, and DeepAgents Python integration tests' + required: false + default: false + type: boolean secrets: CODECOV_TOKEN: required: false @@ -131,6 +136,11 @@ jobs: working-directory: ${{ env.NEMO_FLOW_CI_WORKSPACE }} run: just --set ci true --set output_dir "${{ github.workspace }}" test-python + - name: Run Python LangChain integration tests + if: ${{ inputs.run_integration_langchain }} + working-directory: ${{ env.NEMO_FLOW_CI_WORKSPACE }} + run: just --set ci true --set output_dir "${{ github.workspace }}" test-python-langchain + - name: Upload Python coverage to Codecov uses: codecov/codecov-action@57e3a136b779b570ffcdbf80b3bdc90e7fab3de2 # v6 with: diff --git a/justfile b/justfile index 18dcbae6..4ebd4a91 100644 --- a/justfile +++ b/justfile @@ -854,12 +854,12 @@ test-python: fi cargo test -p nemo-flow-python --lib fi - uv sync --inexact --no-install-project --no-install-package nemo-flow --extra langchain --extra langgraph --extra deepagents + uv sync --inexact --no-install-project 
--no-install-package nemo-flow activate_project_venv python_executable="$(project_python_executable)" use_project_python_source "$python_executable" "$python_executable" -m maturin develop --skip-install - "$python_executable" -m "${pytest_cmd[@]}" + "$python_executable" -m "${pytest_cmd[@]}" --ignore=python/tests/integrations if is_true "{{ ci }}" && [[ -n "$rust_coverage_out" ]]; then cargo llvm-cov report \ -p nemo-flow-python \ @@ -868,6 +868,21 @@ test-python: --output-path "$rust_coverage_out" fi +test-python-langchain: + #!/usr/bin/env bash + {{ bash_helpers }} + pytest_cmd=(pytest) + cd "$NEMO_FLOW_REPO_ROOT" + uv sync --inexact --no-install-project --no-install-package nemo-flow --extra langchain --extra langgraph --extra deepagents + activate_project_venv + python_executable="$(project_python_executable)" + use_project_python_source "$python_executable" + "$python_executable" -m maturin develop --skip-install + "$python_executable" -m "${pytest_cmd[@]}" \ + python/tests/integrations/deepagents_tests \ + python/tests/integrations/langchain_tests \ + python/tests/integrations/langgraph_tests + # --set [output_dir=] [ci=true|false] test-go: #!/usr/bin/env bash @@ -1012,7 +1027,7 @@ test-wasm: fi # --set [output_dir=] [ci=true|false] -test-all: test-rust test-python test-go test-node test-openclaw test-wasm +test-all: test-rust test-python test-python-langchain test-go test-node test-openclaw test-wasm # [version] or --set ref_name= set-version version="": diff --git a/pyproject.toml b/pyproject.toml index f9167b20..95b140ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -113,6 +113,10 @@ package = true testpaths = ["python/tests", "third_party/langgraph_tests"] asyncio_mode = "auto" +[tool.coverage.run] +# Exclude the optional integration modules from coverage, since their tests are not run by default +omit = ["python/nemo_flow/integrations/*"] + [tool.ty.analysis] # nemo_flow._native is a compiled Rust extension (built by maturin) that only # exists after `uv sync` / `pip 
install -e .`. Suppress unresolved-import for it. diff --git a/python/tests/integrations/conftest.py b/python/tests/integrations/conftest.py new file mode 100644 index 00000000..a83bf56e --- /dev/null +++ b/python/tests/integrations/conftest.py @@ -0,0 +1,45 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import types + +import pytest + + +@pytest.fixture(name="integration_langchain", scope="session") +def integration_langchain_fixture() -> types.ModuleType: + """ + Use for integration tests that require LangChain to be installed. + """ + try: + import langchain + + return langchain + except Exception: + pytest.skip(reason="langchain must be installed to run LangChain based tests") + + +@pytest.fixture(name="integration_langgraph", scope="session") +def integration_langgraph_fixture(integration_langchain: types.ModuleType) -> types.ModuleType: + """ + Use for integration tests that require LangGraph to be installed. + """ + try: + import langgraph + + return langgraph + except Exception: + pytest.skip(reason="langgraph must be installed to run LangGraph based tests") + + +@pytest.fixture(name="integration_deepagents", scope="session") +def integration_deepagents_fixture(integration_langgraph: types.ModuleType) -> types.ModuleType: + """ + Use for integration tests that require Deep Agents to be installed. + """ + try: + import deepagents + + return deepagents + except Exception: + pytest.skip(reason="deepagents must be installed to run Deep Agents based tests") diff --git a/python/tests/integrations/deepagents_tests/conftest.py b/python/tests/integrations/deepagents_tests/conftest.py new file mode 100644 index 00000000..f5f69fb5 --- /dev/null +++ b/python/tests/integrations/deepagents_tests/conftest.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +import types + +import pytest + + +@pytest.fixture(name="integration_deepagents", scope="session", autouse=True) +def integration_deepagents_fixture(integration_deepagents: types.ModuleType) -> types.ModuleType: + """ + Override the integration_deepagents fixture to make it autouse + """ + return integration_deepagents diff --git a/python/tests/integrations/deepagents/test_deepagents_integration.py b/python/tests/integrations/deepagents_tests/test_deepagents_integration.py similarity index 71% rename from python/tests/integrations/deepagents/test_deepagents_integration.py rename to python/tests/integrations/deepagents_tests/test_deepagents_integration.py index ea70cb01..ac34267f 100644 --- a/python/tests/integrations/deepagents/test_deepagents_integration.py +++ b/python/tests/integrations/deepagents_tests/test_deepagents_integration.py @@ -5,31 +5,46 @@ from __future__ import annotations +import types from pathlib import Path -from typing import Any, cast +from typing import TYPE_CHECKING, Any, cast from unittest.mock import MagicMock from uuid import uuid4 -from deepagents import create_deep_agent -from deepagents.backends import LocalShellBackend -from langchain_core.language_models.fake_chat_models import FakeMessagesListChatModel -from langchain_core.messages import AIMessage, ToolMessage -from langgraph.callbacks import GraphInterruptEvent, GraphResumeEvent -from langgraph.types import Interrupt +import pytest import nemo_flow -from nemo_flow.integrations.deepagents import ( - NemoFlowDeepAgentsCallbackHandler, - NemoFlowDeepAgentsMiddleware, - add_nemo_flow_integration, -) +if TYPE_CHECKING: + from langchain_core.language_models.fake_chat_models import FakeMessagesListChatModel -class _MockDeepAgentsChatModel(FakeMessagesListChatModel): - model: str = "mock-model" + import nemo_flow.integrations.deepagents as deepagents_integration - def bind_tools(self, _tools: Any, *_args: Any, **_kwargs: Any) -> 
_MockDeepAgentsChatModel: - return self + +@pytest.fixture(name="deepagents_integration_module", scope="session") +def deepagents_integration_module_fixture() -> types.ModuleType: + import nemo_flow.integrations.deepagents as deepagents_integration + + return deepagents_integration + + +@pytest.fixture(name="callback_handler") +def callback_handler_fixture( + deepagents_integration_module: types.ModuleType, +) -> deepagents_integration.NemoFlowDeepAgentsCallbackHandler: + return deepagents_integration_module.NemoFlowDeepAgentsCallbackHandler() + + +def _mock_deepagents_chat_model(responses: list[Any]) -> FakeMessagesListChatModel: + from langchain_core.language_models.fake_chat_models import FakeMessagesListChatModel + + class _MockDeepAgentsChatModel(FakeMessagesListChatModel): + model: str = "mock-model" + + def bind_tools(self, _tools: Any, *_args: Any, **_kwargs: Any) -> _MockDeepAgentsChatModel: + return self + + return _MockDeepAgentsChatModel(responses=responses) def _filter_mark_events(events: list[nemo_flow.Event]) -> list[nemo_flow.MarkEvent]: @@ -46,8 +61,11 @@ def _mark_metadata(mark: nemo_flow.MarkEvent) -> dict[str, Any]: return cast(dict[str, Any], mark.metadata) -def test_before_agent_emits_configuration_mark(subscribed_events: list[nemo_flow.Event]): - middleware = NemoFlowDeepAgentsMiddleware( +def test_before_agent_emits_configuration_mark( + subscribed_events: list[nemo_flow.Event], + deepagents_integration_module: types.ModuleType, +): + middleware = deepagents_integration_module.NemoFlowDeepAgentsMiddleware( agent_name="main-agent", skills=["/skills/research/"], subagents=[{"name": "researcher"}], @@ -65,8 +83,13 @@ def test_before_agent_emits_configuration_mark(subscribed_events: list[nemo_flow assert _mark_data(marks[0])["backend"] == "StateBackend" -def test_callback_handler_emits_human_in_the_loop_marks(subscribed_events: list[nemo_flow.Event]): - handler = NemoFlowDeepAgentsCallbackHandler() +def 
test_callback_handler_emits_human_in_the_loop_marks( + subscribed_events: list[nemo_flow.Event], + callback_handler: deepagents_integration.NemoFlowDeepAgentsCallbackHandler, +): + from langgraph.callbacks import GraphInterruptEvent, GraphResumeEvent + from langgraph.types import Interrupt + run_id = uuid4() hitl_request = { "action_requests": [ @@ -80,7 +103,7 @@ def test_callback_handler_emits_human_in_the_loop_marks(subscribed_events: list[ } with nemo_flow.scope.scope("request", nemo_flow.ScopeType.Agent): - handler.on_interrupt( + callback_handler.on_interrupt( GraphInterruptEvent( run_id=run_id, status="interrupt_after", @@ -89,7 +112,7 @@ def test_callback_handler_emits_human_in_the_loop_marks(subscribed_events: list[ interrupts=(Interrupt(hitl_request, id="interrupt-1"),), ) ) - handler.on_resume( + callback_handler.on_resume( GraphResumeEvent( run_id=run_id, status="pending", @@ -108,12 +131,17 @@ def test_callback_handler_emits_human_in_the_loop_marks(subscribed_events: list[ assert _mark_metadata(marks[1])["phase"] == "resume" -def test_callback_handler_falls_back_for_non_hitl_interrupt(subscribed_events: list[nemo_flow.Event]): - handler = NemoFlowDeepAgentsCallbackHandler() +def test_callback_handler_falls_back_for_non_hitl_interrupt( + subscribed_events: list[nemo_flow.Event], + callback_handler: deepagents_integration.NemoFlowDeepAgentsCallbackHandler, +): + from langgraph.callbacks import GraphInterruptEvent, GraphResumeEvent + from langgraph.types import Interrupt + run_id = uuid4() with nemo_flow.scope.scope("request", nemo_flow.ScopeType.Agent): - handler.on_interrupt( + callback_handler.on_interrupt( GraphInterruptEvent( run_id=run_id, status="interrupt_after", @@ -122,7 +150,7 @@ def test_callback_handler_falls_back_for_non_hitl_interrupt(subscribed_events: l interrupts=(Interrupt("custom pause", id="interrupt-1"),), ) ) - handler.on_resume( + callback_handler.on_resume( GraphResumeEvent( run_id=run_id, status="pending", @@ -137,10 +165,10 @@ 
def test_callback_handler_falls_back_for_non_hitl_interrupt(subscribed_events: l assert "deepagents_kind" not in _mark_metadata(marks[0]) -def test_add_nemo_flow_integration_preserves_backend(): +def test_add_nemo_flow_integration_preserves_backend(deepagents_integration_module: types.ModuleType): mock_backend = MagicMock(name="mock_backend") mock_compiled_subagent = MagicMock(name="mock_compiled_subagent") - kwargs = add_nemo_flow_integration( + kwargs = deepagents_integration_module.add_nemo_flow_integration( model="mock-model", name="main-agent", skills=["/skills/main/"], @@ -153,22 +181,32 @@ def test_add_nemo_flow_integration_preserves_backend(): ) assert kwargs["backend"] is mock_backend - assert any(isinstance(item, NemoFlowDeepAgentsMiddleware) for item in kwargs["middleware"]) - assert any(isinstance(item, NemoFlowDeepAgentsMiddleware) for item in kwargs["subagents"][0]["middleware"]) + assert any( + isinstance(item, deepagents_integration_module.NemoFlowDeepAgentsMiddleware) for item in kwargs["middleware"] + ) + assert any( + isinstance(item, deepagents_integration_module.NemoFlowDeepAgentsMiddleware) + for item in kwargs["subagents"][0]["middleware"] + ) assert kwargs["subagents"][1] is mock_compiled_subagent def test_e2e_agent( tmp_path: Path, subscribed_events: list[nemo_flow.Event], + deepagents_integration_module: types.ModuleType, ): + from deepagents import create_deep_agent + from deepagents.backends import LocalShellBackend + from langchain_core.messages import AIMessage, ToolMessage + reviewer_description = "Reviews filesystem work performed by the main agent." 
- reviewer_model = _MockDeepAgentsChatModel( + reviewer_model = _mock_deepagents_chat_model( responses=[ AIMessage(content="reviewer verified turtle"), ] ) - model = _MockDeepAgentsChatModel( + model = _mock_deepagents_chat_model( responses=[ AIMessage( content="", @@ -196,7 +234,7 @@ def test_e2e_agent( AIMessage(content="created turtle after reviewer verified turtle"), ] ) - kwargs = add_nemo_flow_integration( + kwargs = deepagents_integration_module.add_nemo_flow_integration( model=model, tools=[], name="main-agent", diff --git a/python/tests/integrations/langchain/test_middleware.py b/python/tests/integrations/langchain/test_middleware.py deleted file mode 100644 index b5b1a18f..00000000 --- a/python/tests/integrations/langchain/test_middleware.py +++ /dev/null @@ -1,291 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -"""Tests for the LangChain NeMo Flow middleware.""" - -from __future__ import annotations - -import asyncio -from typing import Any -from unittest.mock import AsyncMock, MagicMock - -import pytest -from langchain.agents import create_agent -from langchain.agents.middleware import ModelRequest, ModelResponse, ToolCallRequest -from langchain_core.language_models import BaseChatModel -from langchain_core.messages import AIMessage, HumanMessage, ToolMessage -from langchain_core.tools import tool - -import nemo_flow -from nemo_flow.codecs import AnthropicMessagesCodec, OpenAIChatCodec, OpenAIResponsesCodec -from nemo_flow.integrations.langchain import _serialization -from nemo_flow.integrations.langchain.middleware import NemoFlowMiddleware - -_DEFAULT_MOCK_RESPONSE_MSG = "nemo_flow unittest result" - - -def _mk_mock_model(returned_message: str | list[AIMessage] = _DEFAULT_MOCK_RESPONSE_MSG) -> MagicMock: - mock_model = MagicMock(spec=BaseChatModel) - mock_model.bind.return_value = mock_model - mock_model.bind_tools.return_value = mock_model - 
mock_model.model = "mock-model" - - if isinstance(returned_message, str): - msg = AIMessage(content=returned_message) - mock_model.invoke.return_value = msg - mock_model.ainvoke = AsyncMock(return_value=msg) - else: - mock_model.invoke.side_effect = list(returned_message) - mock_model.ainvoke = AsyncMock(side_effect=list(returned_message)) - - return mock_model - - -class RecordingMiddleware(NemoFlowMiddleware): - def __init__(self) -> None: - super().__init__() - self.calls: list[dict[str, Any]] = [] - - async def _llm_execute( - self, - model_name: str, - request: nemo_flow.LLMRequest, - codec: Any, - response_codec: Any, - func: Any, - ) -> Any: - self.calls.append( - { - "model_name": model_name, - "request": request, - "codec": codec, - "response_codec": response_codec, - } - ) - intercepted = nemo_flow.LLMRequest( - request.headers, - { - **request.content, - "model_settings": {"temperature": 0.25}, - }, - ) - return await func(intercepted) - - -def _model_request() -> ModelRequest[Any]: - mock_model = _mk_mock_model() - - return ModelRequest( - model=mock_model, - messages=[HumanMessage(content="hello")], - model_settings={"temperature": 1.0}, - ) - - -def _tool_call_request() -> ToolCallRequest: - return ToolCallRequest( - tool_call={"name": "lookup", "args": {"query": "original"}, "id": "call-1"}, - tool=None, - state={}, - runtime=MagicMock(), - ) - - -def test_wrap_model_call_routes_through_llm_execute() -> None: - middleware = RecordingMiddleware() - seen_request: ModelRequest[Any] | None = None - - def handler(request: ModelRequest[Any]) -> ModelResponse[Any]: - nonlocal seen_request - seen_request = request - return ModelResponse(result=[AIMessage(content="done")]) - - response = middleware.wrap_model_call(_model_request(), handler) - - assert response.result[0].content == "done" - assert seen_request is not None - assert seen_request.model_settings == {"temperature": 0.25} - assert middleware.calls[0]["model_name"] == "mock-model" - assert 
middleware.calls[0]["request"].content["model"] == "mock-model" - assert middleware.calls[0]["codec"] is None - assert middleware.calls[0]["response_codec"] is None - - -def test_awrap_model_call_routes_through_llm_execute() -> None: - middleware = RecordingMiddleware() - seen_request: ModelRequest[Any] | None = None - - async def handler(request: ModelRequest[Any]) -> ModelResponse[Any]: - nonlocal seen_request - seen_request = request - return ModelResponse(result=[AIMessage(content="done")]) - - response = asyncio.run(middleware.awrap_model_call(_model_request(), handler)) - - assert response.result[0].content == "done" - assert seen_request is not None - assert seen_request.model_settings == {"temperature": 0.25} - assert middleware.calls[0]["model_name"] == "mock-model" - assert middleware.calls[0]["request"].content["model"] == "mock-model" - assert middleware.calls[0]["codec"] is None - assert middleware.calls[0]["response_codec"] is None - - -def test_wrap_tool_call_routes_through_tool_execute(monkeypatch: pytest.MonkeyPatch) -> None: - middleware = NemoFlowMiddleware() - parent_handle = MagicMock() - seen_request: ToolCallRequest | None = None - - async def execute_side_effect(*, func: Any, **kwargs: Any) -> ToolMessage: - return func({"query": "intercepted"}) - - mock_tool_execute = AsyncMock(side_effect=execute_side_effect) - - def handler(request: ToolCallRequest) -> ToolMessage: - nonlocal seen_request - seen_request = request - return ToolMessage(content="done", tool_call_id=request.tool_call["id"]) - - monkeypatch.setattr(nemo_flow.scope, "get_handle", lambda: parent_handle) - monkeypatch.setattr(nemo_flow.typed, "tool_execute", mock_tool_execute) - - response = middleware.wrap_tool_call(_tool_call_request(), handler) - - assert response.content == "done" - assert seen_request is not None - assert seen_request.tool_call["args"] == {"query": "intercepted"} - mock_tool_execute.assert_awaited_once() - kwargs = mock_tool_execute.await_args.kwargs - 
assert kwargs["name"] == "lookup" - assert kwargs["args"] == {"query": "original"} - assert kwargs["handle"] is parent_handle - assert isinstance(kwargs["args_codec"], nemo_flow.typed.BestEffortAnyCodec) - assert isinstance(kwargs["result_codec"], nemo_flow.typed.BestEffortAnyCodec) - - -def test_awrap_tool_call_routes_through_tool_execute(monkeypatch: pytest.MonkeyPatch) -> None: - middleware = NemoFlowMiddleware() - parent_handle = MagicMock() - seen_request: ToolCallRequest | None = None - - async def execute_side_effect(*, func: Any, **kwargs: Any) -> ToolMessage: - return await func({"query": "intercepted"}) - - mock_tool_execute = AsyncMock(side_effect=execute_side_effect) - - async def handler(request: ToolCallRequest) -> ToolMessage: - nonlocal seen_request - seen_request = request - return ToolMessage(content="done", tool_call_id=request.tool_call["id"]) - - monkeypatch.setattr(nemo_flow.scope, "get_handle", lambda: parent_handle) - monkeypatch.setattr(nemo_flow.typed, "tool_execute", mock_tool_execute) - - response = asyncio.run(middleware.awrap_tool_call(_tool_call_request(), handler)) - - assert response.content == "done" - assert seen_request is not None - assert seen_request.tool_call["args"] == {"query": "intercepted"} - mock_tool_execute.assert_awaited_once() - kwargs = mock_tool_execute.await_args.kwargs - assert kwargs["name"] == "lookup" - assert kwargs["args"] == {"query": "original"} - assert kwargs["handle"] is parent_handle - assert isinstance(kwargs["args_codec"], nemo_flow.typed.BestEffortAnyCodec) - assert isinstance(kwargs["result_codec"], nemo_flow.typed.BestEffortAnyCodec) - - -def test_infer_codec_from_supported_model_classes(monkeypatch: pytest.MonkeyPatch) -> None: - class FakeChatAnthropic: - pass - - class FakeChatOpenAI: - def __init__(self, *, use_responses_api: bool = False) -> None: - self.use_responses_api = use_responses_api - - class FakeChatNVIDIA: - pass - - monkeypatch.setattr(_serialization, "ChatAnthropic", 
FakeChatAnthropic, raising=False) - monkeypatch.setattr(_serialization, "ChatOpenAI", FakeChatOpenAI, raising=False) - monkeypatch.setattr(_serialization, "ChatNVIDIA", FakeChatNVIDIA, raising=False) - monkeypatch.setattr(_serialization, "_HAS_ANTHROPIC", True) - monkeypatch.setattr(_serialization, "_HAS_OPENAI", True) - monkeypatch.setattr(_serialization, "_HAS_NVIDIA", True) - - assert isinstance(_serialization.infer_codec_from_model(FakeChatAnthropic()), AnthropicMessagesCodec) - assert isinstance(_serialization.infer_codec_from_model(FakeChatOpenAI()), OpenAIChatCodec) - assert isinstance( - _serialization.infer_codec_from_model(FakeChatOpenAI(use_responses_api=True)), - OpenAIResponsesCodec, - ) - assert isinstance(_serialization.infer_codec_from_model(FakeChatNVIDIA()), OpenAIChatCodec) - assert _serialization.infer_codec_from_model(object()) is None - - -@pytest.mark.parametrize("use_async", [False, True]) -def test_agent_integration(use_async: bool) -> None: - """An integration test to verify that the middleware correctly wraps a model call end-to-end.""" - model_responses = [ - AIMessage( - content="", - tool_calls=[ - { - "name": "get_weather", - "args": {"location": "San Francisco"}, - "id": "call-1", - } - ], - ), - AIMessage(content=_DEFAULT_MOCK_RESPONSE_MSG), - ] - - mock_model = _mk_mock_model(model_responses) - - @tool - def get_weather(location: str) -> str: - """Get the current weather for a location.""" - return f"The weather in {location} is sunny and 72 degrees." 
- - agent = create_agent(model=mock_model, tools=[get_weather], middleware=[NemoFlowMiddleware()]) - - input_payload = { - "messages": [ - { - "role": "user", - "content": "What is the weather in San Francisco?", - } - ] - } - - events = [] - expected_events = [ - "scope.start.langchain-request", - "scope.start.mock-model", - "scope.end.mock-model", - "scope.start.get_weather", - "scope.end.get_weather", - "scope.start.mock-model", - "scope.end.mock-model", - "scope.end.langchain-request", - ] - - def event_recorder(event) -> None: - events.append(f"{event.kind}.{event.scope_category}.{event.name}") - - nemo_flow.subscribers.register("event_recorder", event_recorder) - - try: - with nemo_flow.scope.scope("langchain-request", nemo_flow.ScopeType.Agent): - if use_async: - result = asyncio.run(agent.ainvoke(input_payload)) - else: - result = agent.invoke(input_payload) - finally: - nemo_flow.subscribers.deregister("event_recorder") - - assert any( - message.content == "The weather in San Francisco is sunny and 72 degrees." for message in result["messages"] - ) - assert result["messages"][-1].content == _DEFAULT_MOCK_RESPONSE_MSG - assert events == expected_events diff --git a/python/tests/integrations/langchain_tests/conftest.py b/python/tests/integrations/langchain_tests/conftest.py new file mode 100644 index 00000000..80511211 --- /dev/null +++ b/python/tests/integrations/langchain_tests/conftest.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +import types + +import pytest + + +@pytest.fixture(name="integration_langchain", scope="session", autouse=True) +def integration_langchain_fixture(integration_langchain: types.ModuleType) -> types.ModuleType: + """ + Override the integration_langchain fixture to make it autouse + """ + return integration_langchain diff --git a/python/tests/integrations/langchain/test_callbacks.py b/python/tests/integrations/langchain_tests/test_callbacks.py similarity index 84% rename from python/tests/integrations/langchain/test_callbacks.py rename to python/tests/integrations/langchain_tests/test_callbacks.py index 60700e1e..31fb4d23 100644 --- a/python/tests/integrations/langchain/test_callbacks.py +++ b/python/tests/integrations/langchain_tests/test_callbacks.py @@ -5,26 +5,25 @@ from __future__ import annotations -from types import SimpleNamespace +import types +import typing from unittest.mock import MagicMock from uuid import uuid4 import pytest -from langchain_core.messages import ToolMessage -from langgraph.types import Command -from nemo_flow.integrations.langchain import callbacks as callbacks_module -from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler +if typing.TYPE_CHECKING: + from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler def _make_mock_nemo_flow() -> MagicMock: """Build a minimal mock of the ``nemo_flow`` module.""" mock_nemo_flow = MagicMock(name="nemo_flow") - mock_nemo_flow.ScopeType = SimpleNamespace(Agent="Agent") + mock_nemo_flow.ScopeType = types.SimpleNamespace(Agent="Agent") - scope = SimpleNamespace() + scope = types.SimpleNamespace() scope.push = MagicMock( - side_effect=lambda name, scope_type, **kwargs: SimpleNamespace( + side_effect=lambda name, scope_type, **kwargs: types.SimpleNamespace( uuid=str(uuid4()), name=name, scope_type=scope_type, @@ -36,8 +35,16 @@ def _make_mock_nemo_flow() -> MagicMock: return mock_nemo_flow 
+@pytest.fixture(name="callbacks_module", scope="session") +def callbacks_module_fixture() -> types.ModuleType: + """Fixture to provide the callbacks module.""" + import nemo_flow.integrations.langchain.callbacks as callbacks_module + + return callbacks_module + + @pytest.fixture() -def mock_nemo_flow(monkeypatch: pytest.MonkeyPatch) -> MagicMock: +def mock_nemo_flow(monkeypatch: pytest.MonkeyPatch, callbacks_module: types.ModuleType) -> MagicMock: mock_nemo_flow = _make_mock_nemo_flow() monkeypatch.setattr(callbacks_module, "nemo_flow", mock_nemo_flow) return mock_nemo_flow @@ -45,6 +52,8 @@ def mock_nemo_flow(monkeypatch: pytest.MonkeyPatch) -> MagicMock: @pytest.fixture() def handler(mock_nemo_flow: MagicMock) -> NemoFlowCallbackHandler: + from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler + return NemoFlowCallbackHandler() @@ -121,6 +130,9 @@ def test_on_chain_error_pops_scope(self, handler: NemoFlowCallbackHandler, mock_ assert run_id not in handler._scope_handles def test_on_chain_end_prepares_command_outputs(self, handler: NemoFlowCallbackHandler, mock_nemo_flow: MagicMock): + from langchain_core.messages import ToolMessage + from langgraph.types import Command + run_id = uuid4() handler.on_chain_start( {"name": "MyChain"}, @@ -216,20 +228,26 @@ def test_name_fallback_to_id(self, handler: NemoFlowCallbackHandler, mock_nemo_f class TestGracefulNoOp: """Verify callbacks are silent if the module-level runtime is unavailable.""" - def test_no_nemo_flow_on_chain_start(self, monkeypatch: pytest.MonkeyPatch): + def test_no_nemo_flow_on_chain_start(self, monkeypatch: pytest.MonkeyPatch, callbacks_module: types.ModuleType): monkeypatch.setattr(callbacks_module, "nemo_flow", None) + from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler + handler = NemoFlowCallbackHandler() handler.on_chain_start({"name": "x"}, {}, run_id=uuid4()) - def test_no_nemo_flow_on_chain_end(self, monkeypatch: pytest.MonkeyPatch): + def 
test_no_nemo_flow_on_chain_end(self, monkeypatch: pytest.MonkeyPatch, callbacks_module: types.ModuleType): monkeypatch.setattr(callbacks_module, "nemo_flow", None) + from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler + handler = NemoFlowCallbackHandler() handler.on_chain_end({}, run_id=uuid4()) - def test_no_nemo_flow_on_chain_error(self, monkeypatch: pytest.MonkeyPatch): + def test_no_nemo_flow_on_chain_error(self, monkeypatch: pytest.MonkeyPatch, callbacks_module: types.ModuleType): monkeypatch.setattr(callbacks_module, "nemo_flow", None) + from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler + handler = NemoFlowCallbackHandler() handler.on_chain_error(RuntimeError("e"), run_id=uuid4()) @@ -238,9 +256,8 @@ def test_no_nemo_flow_on_chain_error(self, monkeypatch: pytest.MonkeyPatch): class TestErrorSwallowing: """Ensure NeMo Flow errors never propagate.""" - def test_scope_push_error_swallowed(self, mock_nemo_flow: MagicMock): + def test_scope_push_error_swallowed(self, handler: NemoFlowCallbackHandler, mock_nemo_flow: MagicMock): mock_nemo_flow.scope.push.side_effect = RuntimeError("nemo flow failure") - handler = NemoFlowCallbackHandler() handler.on_chain_start({"name": "x"}, {}, run_id=uuid4()) diff --git a/python/tests/integrations/langchain_tests/test_middleware.py b/python/tests/integrations/langchain_tests/test_middleware.py new file mode 100644 index 00000000..72fe9642 --- /dev/null +++ b/python/tests/integrations/langchain_tests/test_middleware.py @@ -0,0 +1,373 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Tests for the LangChain NeMo Flow middleware.""" + +from __future__ import annotations + +import asyncio +import inspect +from collections.abc import Awaitable, Callable +from typing import TYPE_CHECKING, Any, Protocol +from unittest.mock import AsyncMock, MagicMock + +import pytest + +import nemo_flow +from nemo_flow.codecs import AnthropicMessagesCodec, OpenAIChatCodec, OpenAIResponsesCodec + +if TYPE_CHECKING: + from langchain.agents.middleware import ModelRequest, ModelResponse, ToolCallRequest + from langchain_core.messages import AIMessage, ToolMessage + + from nemo_flow.integrations.langchain.middleware import NemoFlowMiddleware + +_DEFAULT_MOCK_RESPONSE_MSG = "nemo_flow unittest result" + + +@pytest.fixture(name="model_request_handler") +def model_request_handler_fixture() -> tuple[ + Callable[[ModelRequest[Any]], ModelResponse[Any]], dict[str, ModelRequest[Any]] +]: + from langchain.agents.middleware import ModelResponse + from langchain_core.messages import AIMessage + + seen_request: dict[str, ModelRequest[Any]] = {} + + def handler(request: ModelRequest[Any]) -> ModelResponse[Any]: + seen_request["request"] = request + return ModelResponse(result=[AIMessage(content="done")]) + + return handler, seen_request + + +@pytest.fixture(name="async_model_request_handler") +def async_model_request_handler_fixture( + model_request_handler: tuple[Callable[[ModelRequest[Any]], ModelResponse[Any]], dict[str, ModelRequest[Any]]], +) -> tuple[Callable[[ModelRequest[Any]], Awaitable[ModelResponse[Any]]], dict[str, ModelRequest[Any]]]: + (sync_handler, seen_request) = model_request_handler + + async def handler(request: ModelRequest[Any]) -> ModelResponse[Any]: + return sync_handler(request) + + return handler, seen_request + + +@pytest.fixture(name="tool_request_handler") +def tool_request_handler_fixture() -> tuple[Callable[[ToolCallRequest], ToolMessage], dict[str, ToolCallRequest]]: + from langchain_core.messages import 
ToolMessage + + seen_request: dict[str, ToolCallRequest] = {} + + def handler(request: ToolCallRequest) -> ToolMessage: + seen_request["request"] = request + return ToolMessage(content="done", tool_call_id=request.tool_call["id"]) + + return handler, seen_request + + +@pytest.fixture(name="async_tool_request_handler") +def async_tool_request_handler_fixture( + tool_request_handler: tuple[Callable[[ToolCallRequest], ToolMessage], dict[str, ToolCallRequest]], +) -> tuple[Callable[[ToolCallRequest], Awaitable[ToolMessage]], dict[str, ToolCallRequest]]: + (sync_handler, seen_request) = tool_request_handler + + async def handler(request: ToolCallRequest) -> ToolMessage: + return sync_handler(request) + + return handler, seen_request + + +@pytest.fixture(name="mock_tool_execute") +def mock_tool_execute_fixture() -> AsyncMock: + async def execute_side_effect(*, func: Any, **kwargs: Any) -> ToolMessage: + result = func({"query": "intercepted"}) + if inspect.isawaitable(result): + return await result + return result + + return AsyncMock(side_effect=execute_side_effect) + + +def _mk_mock_model(returned_message: str | list[AIMessage] = _DEFAULT_MOCK_RESPONSE_MSG) -> MagicMock: + from langchain_core.language_models import BaseChatModel + from langchain_core.messages import AIMessage + + mock_model = MagicMock(spec=BaseChatModel) + mock_model.bind.return_value = mock_model + mock_model.bind_tools.return_value = mock_model + mock_model.model = "mock-model" + + if isinstance(returned_message, str): + msg = AIMessage(content=returned_message) + mock_model.invoke.return_value = msg + mock_model.ainvoke = AsyncMock(return_value=msg) + else: + mock_model.invoke.side_effect = list(returned_message) + mock_model.ainvoke = AsyncMock(side_effect=list(returned_message)) + + return mock_model + + +@pytest.fixture(name="nemo_flow_middleware") +def nemo_flow_middleware_fixture() -> NemoFlowMiddleware: + from nemo_flow.integrations.langchain.middleware import NemoFlowMiddleware + + return 
NemoFlowMiddleware() + + +class RecordingMiddleware(Protocol): + calls: list[dict[str, Any]] + wrap_model_call: Callable + awrap_model_call: Callable + + +@pytest.fixture(name="recording_middleware") +def recording_middleware_fixture() -> RecordingMiddleware: + from nemo_flow.integrations.langchain.middleware import NemoFlowMiddleware + + class _RecordingMiddleware(NemoFlowMiddleware, RecordingMiddleware): + def __init__(self): + super().__init__() + self.calls: list[dict[str, Any]] = [] + + async def _llm_execute( + self, + model_name: str, + request: nemo_flow.LLMRequest, + codec: Any, + response_codec: Any, + func: Any, + ) -> Any: + self.calls.append( + { + "model_name": model_name, + "request": request, + "codec": codec, + "response_codec": response_codec, + } + ) + intercepted = nemo_flow.LLMRequest( + request.headers, + { + **request.content, + "model_settings": {"temperature": 0.25}, + }, + ) + return await func(intercepted) + + return _RecordingMiddleware() + + +@pytest.fixture(name="model_request") +def model_request_fixture() -> ModelRequest[Any]: + from langchain.agents.middleware import ModelRequest + from langchain_core.messages import HumanMessage + + mock_model = _mk_mock_model() + + return ModelRequest( + model=mock_model, + messages=[HumanMessage(content="hello")], + model_settings={"temperature": 1.0}, + ) + + +@pytest.fixture(name="tool_call_request") +def tool_call_request_fixture() -> ToolCallRequest: + from langchain.agents.middleware import ToolCallRequest + + return ToolCallRequest( + tool_call={"name": "lookup", "args": {"query": "original"}, "id": "call-1"}, + tool=None, + state={}, + runtime=MagicMock(), + ) + + +def test_wrap_model_call_routes_through_llm_execute( + model_request: ModelRequest[Any], + model_request_handler: tuple[Callable[[ModelRequest[Any]], ModelResponse[Any]], dict[str, ModelRequest[Any]]], + recording_middleware: RecordingMiddleware, +): + (handler, seen_request) = model_request_handler + + response = 
recording_middleware.wrap_model_call(model_request, handler) + + assert response.result[0].content == "done" + assert seen_request["request"].model_settings == {"temperature": 0.25} + assert recording_middleware.calls[0]["model_name"] == "mock-model" + assert recording_middleware.calls[0]["request"].content["model"] == "mock-model" + assert recording_middleware.calls[0]["codec"] is None + assert recording_middleware.calls[0]["response_codec"] is None + + +def test_awrap_model_call_routes_through_llm_execute( + model_request: ModelRequest[Any], + async_model_request_handler: tuple[ + Callable[[ModelRequest[Any]], Awaitable[ModelResponse[Any]]], dict[str, ModelRequest[Any]] + ], + recording_middleware: RecordingMiddleware, +): + (handler, seen_request) = async_model_request_handler + + response = asyncio.run(recording_middleware.awrap_model_call(model_request, handler)) + + assert response.result[0].content == "done" + assert seen_request["request"].model_settings == {"temperature": 0.25} + assert recording_middleware.calls[0]["model_name"] == "mock-model" + assert recording_middleware.calls[0]["request"].content["model"] == "mock-model" + assert recording_middleware.calls[0]["codec"] is None + assert recording_middleware.calls[0]["response_codec"] is None + + +def test_wrap_tool_call_routes_through_tool_execute( + monkeypatch: pytest.MonkeyPatch, + nemo_flow_middleware: NemoFlowMiddleware, + mock_tool_execute: AsyncMock, + tool_call_request: ToolCallRequest, + tool_request_handler: tuple[Callable[[ToolCallRequest], ToolMessage], dict[str, ToolCallRequest]], +): + (handler, seen_request) = tool_request_handler + parent_handle = MagicMock() + + monkeypatch.setattr(nemo_flow.scope, "get_handle", lambda: parent_handle) + monkeypatch.setattr(nemo_flow.typed, "tool_execute", mock_tool_execute) + + response = nemo_flow_middleware.wrap_tool_call(tool_call_request, handler) + + assert response.content == "done" + assert seen_request["request"].tool_call["args"] == {"query": 
"intercepted"} + mock_tool_execute.assert_awaited_once() + assert mock_tool_execute.await_args is not None + kwargs = mock_tool_execute.await_args.kwargs + assert kwargs["name"] == "lookup" + assert kwargs["args"] == {"query": "original"} + assert kwargs["handle"] is parent_handle + assert isinstance(kwargs["args_codec"], nemo_flow.typed.BestEffortAnyCodec) + assert isinstance(kwargs["result_codec"], nemo_flow.typed.BestEffortAnyCodec) + + +def test_awrap_tool_call_routes_through_tool_execute( + monkeypatch: pytest.MonkeyPatch, + nemo_flow_middleware: NemoFlowMiddleware, + mock_tool_execute: AsyncMock, + tool_call_request: ToolCallRequest, + async_tool_request_handler: tuple[Callable[[ToolCallRequest], Awaitable[ToolMessage]], dict[str, ToolCallRequest]], +): + parent_handle = MagicMock() + (handler, seen_request) = async_tool_request_handler + + monkeypatch.setattr(nemo_flow.scope, "get_handle", lambda: parent_handle) + monkeypatch.setattr(nemo_flow.typed, "tool_execute", mock_tool_execute) + + response = asyncio.run(nemo_flow_middleware.awrap_tool_call(tool_call_request, handler)) + + assert response.content == "done" + assert seen_request["request"].tool_call["args"] == {"query": "intercepted"} + mock_tool_execute.assert_awaited_once() + assert mock_tool_execute.await_args is not None + kwargs = mock_tool_execute.await_args.kwargs + assert kwargs["name"] == "lookup" + assert kwargs["args"] == {"query": "original"} + assert kwargs["handle"] is parent_handle + assert isinstance(kwargs["args_codec"], nemo_flow.typed.BestEffortAnyCodec) + assert isinstance(kwargs["result_codec"], nemo_flow.typed.BestEffortAnyCodec) + + +def test_infer_codec_from_supported_model_classes(monkeypatch: pytest.MonkeyPatch): + from nemo_flow.integrations.langchain import _serialization + + MockChatAnthropic = MagicMock(spec=type("MockChatAnthropic", (), {})) + MockChatOpenAI = MagicMock(spec=type("MockChatOpenAI", (), {})) + MockChatOpenAIResponses = 
MagicMock(spec=MockChatOpenAI.__class__) + MockChatOpenAIResponses.use_responses_api = True + MockChatNVIDIA = MagicMock(spec=type("MockChatNVIDIA", (), {})) + + monkeypatch.setattr(_serialization, "ChatAnthropic", MockChatAnthropic.__class__, raising=False) + monkeypatch.setattr(_serialization, "ChatOpenAI", MockChatOpenAI.__class__, raising=False) + monkeypatch.setattr(_serialization, "ChatNVIDIA", MockChatNVIDIA.__class__, raising=False) + monkeypatch.setattr(_serialization, "_HAS_ANTHROPIC", True) + monkeypatch.setattr(_serialization, "_HAS_OPENAI", True) + monkeypatch.setattr(_serialization, "_HAS_NVIDIA", True) + + assert isinstance(_serialization.infer_codec_from_model(MockChatAnthropic), AnthropicMessagesCodec) + assert isinstance(_serialization.infer_codec_from_model(MockChatOpenAI), OpenAIChatCodec) + assert isinstance( + _serialization.infer_codec_from_model(MockChatOpenAIResponses), + OpenAIResponsesCodec, + ) + assert isinstance(_serialization.infer_codec_from_model(MockChatNVIDIA), OpenAIChatCodec) + assert _serialization.infer_codec_from_model(object()) is None + + +@pytest.mark.parametrize("use_async", [False, True]) +def test_agent_integration(use_async: bool, nemo_flow_middleware: NemoFlowMiddleware): + """An integration test to verify that the middleware correctly wraps a model call end-to-end.""" + from langchain.agents import create_agent + from langchain_core.messages import AIMessage + from langchain_core.tools import tool + + model_responses = [ + AIMessage( + content="", + tool_calls=[ + { + "name": "get_weather", + "args": {"location": "San Francisco"}, + "id": "call-1", + } + ], + ), + AIMessage(content=_DEFAULT_MOCK_RESPONSE_MSG), + ] + + mock_model = _mk_mock_model(model_responses) + + @tool + def get_weather(location: str) -> str: + """Get the current weather for a location.""" + return f"The weather in {location} is sunny and 72 degrees." 
+ + agent = create_agent(model=mock_model, tools=[get_weather], middleware=[nemo_flow_middleware]) + + input_payload = { + "messages": [ + { + "role": "user", + "content": "What is the weather in San Francisco?", + } + ] + } + + events = [] + expected_events = [ + "scope.start.langchain-request", + "scope.start.mock-model", + "scope.end.mock-model", + "scope.start.get_weather", + "scope.end.get_weather", + "scope.start.mock-model", + "scope.end.mock-model", + "scope.end.langchain-request", + ] + + def event_recorder(event): + events.append(f"{event.kind}.{event.scope_category}.{event.name}") + + nemo_flow.subscribers.register("event_recorder", event_recorder) + + try: + with nemo_flow.scope.scope("langchain-request", nemo_flow.ScopeType.Agent): + if use_async: + result = asyncio.run(agent.ainvoke(input_payload)) + else: + result = agent.invoke(input_payload) + finally: + nemo_flow.subscribers.deregister("event_recorder") + + assert any( + message.content == "The weather in San Francisco is sunny and 72 degrees." for message in result["messages"] + ) + assert result["messages"][-1].content == _DEFAULT_MOCK_RESPONSE_MSG + assert events == expected_events diff --git a/python/tests/integrations/langgraph_tests/conftest.py b/python/tests/integrations/langgraph_tests/conftest.py new file mode 100644 index 00000000..9a7c1ade --- /dev/null +++ b/python/tests/integrations/langgraph_tests/conftest.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +import types + +import pytest + + +@pytest.fixture(name="integration_langgraph", scope="session", autouse=True) +def integration_langgraph_fixture(integration_langgraph: types.ModuleType) -> types.ModuleType: + """ + Override the integration_langgraph fixture to make it autouse + """ + return integration_langgraph diff --git a/python/tests/integrations/langgraph/test_langgraph_integration.py b/python/tests/integrations/langgraph_tests/test_langgraph_integration.py similarity index 67% rename from python/tests/integrations/langgraph/test_langgraph_integration.py rename to python/tests/integrations/langgraph_tests/test_langgraph_integration.py index 02e9443b..5b09bb6b 100644 --- a/python/tests/integrations/langgraph/test_langgraph_integration.py +++ b/python/tests/integrations/langgraph_tests/test_langgraph_integration.py @@ -10,18 +10,15 @@ from uuid import uuid4 import pytest -from langgraph.callbacks import GraphCallbackHandler, GraphInterruptEvent, GraphResumeEvent -from langgraph.graph import END, START, StateGraph -from langgraph.types import Interrupt from typing_extensions import TypedDict import nemo_flow -from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler as LangChainCallbackHandler -from nemo_flow.integrations.langgraph import NemoFlowCallbackHandler if TYPE_CHECKING: from langgraph.graph import CompiledStateGraph + from nemo_flow.integrations.langgraph import NemoFlowCallbackHandler + class State(TypedDict): value: int @@ -37,6 +34,8 @@ async def aincrement(state: State) -> State: def _build_graph(use_async: bool = False) -> CompiledStateGraph: + from langgraph.graph import END, START, StateGraph + # The cast here avoids a ty linting error builder = StateGraph(cast(Any, State)) if use_async: @@ -58,7 +57,14 @@ def async_graph_fixture() -> CompiledStateGraph: return _build_graph(use_async=True) -def events_to_strings(events: list[nemo_flow.Event]) -> list[str]: 
+@pytest.fixture(name="callback_handler") +def callback_handler_fixture() -> NemoFlowCallbackHandler: + from nemo_flow.integrations.langgraph import NemoFlowCallbackHandler + + return NemoFlowCallbackHandler() + + +def _events_to_strings(events: list[nemo_flow.Event]) -> list[str]: event_strings: list[str] = [] for event in events: @@ -70,48 +76,57 @@ def events_to_strings(events: list[nemo_flow.Event]) -> list[str]: return event_strings -def test_handler_type(): - handler = NemoFlowCallbackHandler() - assert isinstance(handler, LangChainCallbackHandler) - assert isinstance(handler, GraphCallbackHandler) +def test_handler_type(callback_handler: NemoFlowCallbackHandler): + from langgraph.callbacks import GraphCallbackHandler + + from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler as LangChainCallbackHandler + + assert isinstance(callback_handler, LangChainCallbackHandler) + assert isinstance(callback_handler, GraphCallbackHandler) class TestGraphCallbacks: - def __init__(self): - self._expected_events = [ - "scope.start.request", - "scope.start.LangGraph", - "scope.start.increment", - "scope.end.increment", - "scope.end.LangGraph", - "scope.end.request", - ] + _expected_events = [ + "scope.start.request", + "scope.start.LangGraph", + "scope.start.increment", + "scope.end.increment", + "scope.end.LangGraph", + "scope.end.request", + ] def test_sync( self, sync_graph: CompiledStateGraph, subscribed_events: list[nemo_flow.Event], + callback_handler: NemoFlowCallbackHandler, ): with nemo_flow.scope.scope("request", nemo_flow.ScopeType.Agent): - result = sync_graph.invoke({"value": 1}, config={"callbacks": [NemoFlowCallbackHandler()]}) + result = sync_graph.invoke({"value": 1}, config={"callbacks": [callback_handler]}) assert result == {"value": 2} - assert events_to_strings(subscribed_events) == self._expected_events + assert _events_to_strings(subscribed_events) == self._expected_events async def test_async( self, async_graph: 
CompiledStateGraph, subscribed_events: list[nemo_flow.Event], + callback_handler: NemoFlowCallbackHandler, ): with nemo_flow.scope.scope("request", nemo_flow.ScopeType.Agent): - result = await async_graph.ainvoke({"value": 1}, config={"callbacks": [NemoFlowCallbackHandler()]}) + result = await async_graph.ainvoke({"value": 1}, config={"callbacks": [callback_handler]}) assert result == {"value": 2} - assert events_to_strings(subscribed_events) == self._expected_events + assert _events_to_strings(subscribed_events) == self._expected_events + +def test_graph_lifecycle_callbacks_emit_marks( + subscribed_events: list[nemo_flow.Event], + callback_handler: NemoFlowCallbackHandler, +): + from langgraph.callbacks import GraphInterruptEvent, GraphResumeEvent + from langgraph.types import Interrupt -def test_graph_lifecycle_callbacks_emit_marks(subscribed_events: list[nemo_flow.Event]): - handler = NemoFlowCallbackHandler() run_id = uuid4() expected_event_strings = [ @@ -122,7 +137,7 @@ def test_graph_lifecycle_callbacks_emit_marks(subscribed_events: list[nemo_flow. ] with nemo_flow.scope.scope("request", nemo_flow.ScopeType.Agent): - handler.on_interrupt( + callback_handler.on_interrupt( GraphInterruptEvent( run_id=run_id, status="interrupt_after", @@ -132,7 +147,7 @@ def test_graph_lifecycle_callbacks_emit_marks(subscribed_events: list[nemo_flow. ) ) - handler.on_resume( + callback_handler.on_resume( GraphResumeEvent( run_id=run_id, status="pending", @@ -141,7 +156,7 @@ def test_graph_lifecycle_callbacks_emit_marks(subscribed_events: list[nemo_flow. ) ) - assert events_to_strings(subscribed_events) == expected_event_strings + assert _events_to_strings(subscribed_events) == expected_event_strings interrupt_event = subscribed_events[1] assert isinstance(interrupt_event, nemo_flow.MarkEvent)