diff --git a/.agents/skills/test-python-binding/SKILL.md b/.agents/skills/test-python-binding/SKILL.md index 330eda60..b3081130 100644 --- a/.agents/skills/test-python-binding/SKILL.md +++ b/.agents/skills/test-python-binding/SKILL.md @@ -38,6 +38,13 @@ Use this skill when the change is primarily in `python/nemo_flow`, - The name of the mocked class should be prefixed with `mock`, not `fake`. - Prefer pytest fixtures over helper methods. - Do not repeat fixtures, if a fixture is needed in multiple test files, place it in a `conftest.py` file. +- When creating a fixture follow this pattern: + ```python + @pytest.fixture(name=""[, scope=""]) + def _fixture() -> : + ... + ``` + Only specify the scope argument when the value is something other than "function". - Prefer `pytest.mark.parametrize` over creating individual tests for different input types. diff --git a/python/tests/conftest.py b/python/tests/conftest.py index cc4b9794..922a5a6f 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -7,6 +7,7 @@ import typing from collections.abc import Iterator +import types from uuid import uuid4 import pytest diff --git a/python/tests/integrations/conftest.py b/python/tests/integrations/conftest.py new file mode 100644 index 00000000..e14da885 --- /dev/null +++ b/python/tests/integrations/conftest.py @@ -0,0 +1,42 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import types + +import pytest + + +@pytest.fixture(name="integration_langchain", scope='session') +def integration_langchain_fixture() -> types.ModuleType: + """ + Use for integration tests that require LangChain to be installed. + """ + try: + import langchain + return langchain + except Exception: + pytest.skip(reason="langchain must be installed to run LangChain based tests") + + +@pytest.fixture(name="integration_langgraph", scope='session') +def integration_langgraph_fixture(integration_langchain: types.ModuleType) -> types.ModuleType: + """ + Use for integration tests that require LangGraph to be installed. + """ + try: + import langgraph + return langgraph + except Exception: + pytest.skip(reason="langgraph must be installed to run LangGraph based tests") + + +@pytest.fixture(name="integration_deepagents", scope='session') +def integration_deepagents_fixture(integration_langgraph: types.ModuleType) -> types.ModuleType: + """ + Use for integration tests that require Deep Agents to be installed. + """ + try: + import deepagents + return deepagents + except Exception: + pytest.skip(reason="deepagents must be installed to run Deep Agents based tests") diff --git a/python/tests/integrations/deepagents/conftest.py b/python/tests/integrations/deepagents/conftest.py new file mode 100644 index 00000000..a50e6e5d --- /dev/null +++ b/python/tests/integrations/deepagents/conftest.py @@ -0,0 +1,13 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import types + +import pytest + +@pytest.fixture(name="integration_deepagents", scope='session', autouse=True) +def integration_deepagents_fixture(integration_deepagents: types.ModuleType) -> types.ModuleType: + """ + Override the integration_deepagents fixture to make it autouse + """ + yield integration_deepagents diff --git a/python/tests/integrations/langchain/test_middleware.py b/python/tests/integrations/langchain/test_middleware.py deleted file mode 100644 index b5b1a18f..00000000 --- a/python/tests/integrations/langchain/test_middleware.py +++ /dev/null @@ -1,291 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -"""Tests for the LangChain NeMo Flow middleware.""" - -from __future__ import annotations - -import asyncio -from typing import Any -from unittest.mock import AsyncMock, MagicMock - -import pytest -from langchain.agents import create_agent -from langchain.agents.middleware import ModelRequest, ModelResponse, ToolCallRequest -from langchain_core.language_models import BaseChatModel -from langchain_core.messages import AIMessage, HumanMessage, ToolMessage -from langchain_core.tools import tool - -import nemo_flow -from nemo_flow.codecs import AnthropicMessagesCodec, OpenAIChatCodec, OpenAIResponsesCodec -from nemo_flow.integrations.langchain import _serialization -from nemo_flow.integrations.langchain.middleware import NemoFlowMiddleware - -_DEFAULT_MOCK_RESPONSE_MSG = "nemo_flow unittest result" - - -def _mk_mock_model(returned_message: str | list[AIMessage] = _DEFAULT_MOCK_RESPONSE_MSG) -> MagicMock: - mock_model = MagicMock(spec=BaseChatModel) - mock_model.bind.return_value = mock_model - mock_model.bind_tools.return_value = mock_model - mock_model.model = "mock-model" - - if isinstance(returned_message, str): - msg = AIMessage(content=returned_message) - mock_model.invoke.return_value = msg - mock_model.ainvoke = AsyncMock(return_value=msg) - else: - mock_model.invoke.side_effect = list(returned_message) - mock_model.ainvoke = AsyncMock(side_effect=list(returned_message)) - - return mock_model - - -class RecordingMiddleware(NemoFlowMiddleware): - def __init__(self) -> None: - super().__init__() - self.calls: list[dict[str, Any]] = [] - - async def _llm_execute( - self, - model_name: str, - request: nemo_flow.LLMRequest, - codec: Any, - response_codec: Any, - func: Any, - ) -> Any: - self.calls.append( - { - "model_name": model_name, - "request": request, - "codec": codec, - "response_codec": response_codec, - } - ) - intercepted = nemo_flow.LLMRequest( - request.headers, - { - **request.content, - "model_settings": {"temperature": 0.25}, - }, - ) - return await func(intercepted) - - -def _model_request() -> ModelRequest[Any]: - mock_model = _mk_mock_model() - - return ModelRequest( - model=mock_model, - messages=[HumanMessage(content="hello")], - model_settings={"temperature": 1.0}, - ) - - -def _tool_call_request() -> ToolCallRequest: - return ToolCallRequest( - tool_call={"name": "lookup", "args": {"query": "original"}, "id": "call-1"}, - tool=None, - state={}, - runtime=MagicMock(), - ) - - -def test_wrap_model_call_routes_through_llm_execute() -> None: - middleware = RecordingMiddleware() - seen_request: ModelRequest[Any] | None = None - - def handler(request: ModelRequest[Any]) -> ModelResponse[Any]: - nonlocal seen_request - seen_request = request - return ModelResponse(result=[AIMessage(content="done")]) - - response = middleware.wrap_model_call(_model_request(), handler) - - assert response.result[0].content == "done" - assert seen_request is not None - assert seen_request.model_settings == {"temperature": 0.25} - assert middleware.calls[0]["model_name"] == "mock-model" - assert middleware.calls[0]["request"].content["model"] == "mock-model" - assert middleware.calls[0]["codec"] is None - assert middleware.calls[0]["response_codec"] is None - - -def test_awrap_model_call_routes_through_llm_execute() -> None: - middleware = RecordingMiddleware() - seen_request: ModelRequest[Any] | None = None - - async def handler(request: ModelRequest[Any]) -> ModelResponse[Any]: - nonlocal seen_request - seen_request = request - return ModelResponse(result=[AIMessage(content="done")]) - - response = asyncio.run(middleware.awrap_model_call(_model_request(), handler)) - - assert response.result[0].content == "done" - assert seen_request is not None - assert seen_request.model_settings == {"temperature": 0.25} - assert middleware.calls[0]["model_name"] == "mock-model" - assert middleware.calls[0]["request"].content["model"] == "mock-model" - assert middleware.calls[0]["codec"] is None - assert middleware.calls[0]["response_codec"] is None - - -def test_wrap_tool_call_routes_through_tool_execute(monkeypatch: pytest.MonkeyPatch) -> None: - middleware = NemoFlowMiddleware() - parent_handle = MagicMock() - seen_request: ToolCallRequest | None = None - - async def execute_side_effect(*, func: Any, **kwargs: Any) -> ToolMessage: - return func({"query": "intercepted"}) - - mock_tool_execute = AsyncMock(side_effect=execute_side_effect) - - def handler(request: ToolCallRequest) -> ToolMessage: - nonlocal seen_request - seen_request = request - return ToolMessage(content="done", tool_call_id=request.tool_call["id"]) - - monkeypatch.setattr(nemo_flow.scope, "get_handle", lambda: parent_handle) - monkeypatch.setattr(nemo_flow.typed, "tool_execute", mock_tool_execute) - - response = middleware.wrap_tool_call(_tool_call_request(), handler) - - assert response.content == "done" - assert seen_request is not None - assert seen_request.tool_call["args"] == {"query": "intercepted"} - mock_tool_execute.assert_awaited_once() - kwargs = mock_tool_execute.await_args.kwargs - assert kwargs["name"] == "lookup" - assert kwargs["args"] == {"query": "original"} - assert kwargs["handle"] is parent_handle - assert isinstance(kwargs["args_codec"], nemo_flow.typed.BestEffortAnyCodec) - assert isinstance(kwargs["result_codec"], nemo_flow.typed.BestEffortAnyCodec) - - -def test_awrap_tool_call_routes_through_tool_execute(monkeypatch: pytest.MonkeyPatch) -> None: - middleware = NemoFlowMiddleware() - parent_handle = MagicMock() - seen_request: ToolCallRequest | None = None - - async def execute_side_effect(*, func: Any, **kwargs: Any) -> ToolMessage: - return await func({"query": "intercepted"}) - - mock_tool_execute = AsyncMock(side_effect=execute_side_effect) - - async def handler(request: ToolCallRequest) -> ToolMessage: - nonlocal seen_request - seen_request = request - return ToolMessage(content="done", tool_call_id=request.tool_call["id"]) - - monkeypatch.setattr(nemo_flow.scope, "get_handle", lambda: parent_handle) - monkeypatch.setattr(nemo_flow.typed, "tool_execute", mock_tool_execute) - - response = asyncio.run(middleware.awrap_tool_call(_tool_call_request(), handler)) - - assert response.content == "done" - assert seen_request is not None - assert seen_request.tool_call["args"] == {"query": "intercepted"} - mock_tool_execute.assert_awaited_once() - kwargs = mock_tool_execute.await_args.kwargs - assert kwargs["name"] == "lookup" - assert kwargs["args"] == {"query": "original"} - assert kwargs["handle"] is parent_handle - assert isinstance(kwargs["args_codec"], nemo_flow.typed.BestEffortAnyCodec) - assert isinstance(kwargs["result_codec"], nemo_flow.typed.BestEffortAnyCodec) - - -def test_infer_codec_from_supported_model_classes(monkeypatch: pytest.MonkeyPatch) -> None: - class FakeChatAnthropic: - pass - - class FakeChatOpenAI: - def __init__(self, *, use_responses_api: bool = False) -> None: - self.use_responses_api = use_responses_api - - class FakeChatNVIDIA: - pass - - monkeypatch.setattr(_serialization, "ChatAnthropic", FakeChatAnthropic, raising=False) - monkeypatch.setattr(_serialization, "ChatOpenAI", FakeChatOpenAI, raising=False) - monkeypatch.setattr(_serialization, "ChatNVIDIA", FakeChatNVIDIA, raising=False) - monkeypatch.setattr(_serialization, "_HAS_ANTHROPIC", True) - monkeypatch.setattr(_serialization, "_HAS_OPENAI", True) - monkeypatch.setattr(_serialization, "_HAS_NVIDIA", True) - - assert isinstance(_serialization.infer_codec_from_model(FakeChatAnthropic()), AnthropicMessagesCodec) - assert isinstance(_serialization.infer_codec_from_model(FakeChatOpenAI()), OpenAIChatCodec) - assert isinstance( - _serialization.infer_codec_from_model(FakeChatOpenAI(use_responses_api=True)), - OpenAIResponsesCodec, - ) - assert isinstance(_serialization.infer_codec_from_model(FakeChatNVIDIA()), OpenAIChatCodec) - assert _serialization.infer_codec_from_model(object()) is None - - -@pytest.mark.parametrize("use_async", [False, True]) -def test_agent_integration(use_async: bool) -> None: - """An integration test to verify that the middleware correctly wraps a model call end-to-end.""" - model_responses = [ - AIMessage( - content="", - tool_calls=[ - { - "name": "get_weather", - "args": {"location": "San Francisco"}, - "id": "call-1", - } - ], - ), - AIMessage(content=_DEFAULT_MOCK_RESPONSE_MSG), - ] - - mock_model = _mk_mock_model(model_responses) - - @tool - def get_weather(location: str) -> str: - """Get the current weather for a location.""" - return f"The weather in {location} is sunny and 72 degrees." - - agent = create_agent(model=mock_model, tools=[get_weather], middleware=[NemoFlowMiddleware()]) - - input_payload = { - "messages": [ - { - "role": "user", - "content": "What is the weather in San Francisco?", - } - ] - } - - events = [] - expected_events = [ - "scope.start.langchain-request", - "scope.start.mock-model", - "scope.end.mock-model", - "scope.start.get_weather", - "scope.end.get_weather", - "scope.start.mock-model", - "scope.end.mock-model", - "scope.end.langchain-request", - ] - - def event_recorder(event) -> None: - events.append(f"{event.kind}.{event.scope_category}.{event.name}") - - nemo_flow.subscribers.register("event_recorder", event_recorder) - - try: - with nemo_flow.scope.scope("langchain-request", nemo_flow.ScopeType.Agent): - if use_async: - result = asyncio.run(agent.ainvoke(input_payload)) - else: - result = agent.invoke(input_payload) - finally: - nemo_flow.subscribers.deregister("event_recorder") - - assert any( - message.content == "The weather in San Francisco is sunny and 72 degrees." for message in result["messages"] - ) - assert result["messages"][-1].content == _DEFAULT_MOCK_RESPONSE_MSG - assert events == expected_events diff --git a/python/tests/integrations/langchain_tests/conftest.py b/python/tests/integrations/langchain_tests/conftest.py new file mode 100644 index 00000000..eb4f7d35 --- /dev/null +++ b/python/tests/integrations/langchain_tests/conftest.py @@ -0,0 +1,13 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import types + +import pytest + +@pytest.fixture(name="integration_langchain", scope='session', autouse=True) +def integration_langchain_fixture(integration_langchain: types.ModuleType) -> types.ModuleType: + """ + Override the integration_langchain fixture to make it autouse + """ + yield integration_langchain diff --git a/python/tests/integrations/langchain/test_callbacks.py b/python/tests/integrations/langchain_tests/test_callbacks.py similarity index 84% rename from python/tests/integrations/langchain/test_callbacks.py rename to python/tests/integrations/langchain_tests/test_callbacks.py index 60700e1e..168215df 100644 --- a/python/tests/integrations/langchain/test_callbacks.py +++ b/python/tests/integrations/langchain_tests/test_callbacks.py @@ -5,26 +5,26 @@ from __future__ import annotations -from types import SimpleNamespace +import types +import typing from unittest.mock import MagicMock from uuid import uuid4 import pytest -from langchain_core.messages import ToolMessage -from langgraph.types import Command -from nemo_flow.integrations.langchain import callbacks as callbacks_module -from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler + +if typing.TYPE_CHECKING: + from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler def _make_mock_nemo_flow() -> MagicMock: """Build a minimal mock of the ``nemo_flow`` module.""" mock_nemo_flow = MagicMock(name="nemo_flow") - mock_nemo_flow.ScopeType = SimpleNamespace(Agent="Agent") + mock_nemo_flow.ScopeType = types.SimpleNamespace(Agent="Agent") - scope = SimpleNamespace() + scope = types.SimpleNamespace() scope.push = MagicMock( - side_effect=lambda name, scope_type, **kwargs: SimpleNamespace( + side_effect=lambda name, scope_type, **kwargs: types.SimpleNamespace( uuid=str(uuid4()), name=name, scope_type=scope_type, @@ -35,9 +35,14 @@ def _make_mock_nemo_flow() -> MagicMock: mock_nemo_flow.scope = scope return mock_nemo_flow +@pytest.fixture(name="callbacks_module", scope="session") +def callbacks_module_fixture() -> types.ModuleType: + """Fixture to provide the callbacks module.""" + import nemo_flow.integrations.langchain.callbacks as callbacks_module + return callbacks_module @pytest.fixture() -def mock_nemo_flow(monkeypatch: pytest.MonkeyPatch) -> MagicMock: +def mock_nemo_flow(monkeypatch: pytest.MonkeyPatch, callbacks_module: types.ModuleType) -> MagicMock: mock_nemo_flow = _make_mock_nemo_flow() monkeypatch.setattr(callbacks_module, "nemo_flow", mock_nemo_flow) return mock_nemo_flow @@ -45,6 +50,7 @@ def mock_nemo_flow(monkeypatch: pytest.MonkeyPatch) -> MagicMock: @pytest.fixture() def handler(mock_nemo_flow: MagicMock) -> NemoFlowCallbackHandler: + from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler return NemoFlowCallbackHandler() @@ -121,6 +127,8 @@ def test_on_chain_error_pops_scope(self, handler: NemoFlowCallbackHandler, mock_ assert run_id not in handler._scope_handles def test_on_chain_end_prepares_command_outputs(self, handler: NemoFlowCallbackHandler, mock_nemo_flow: MagicMock): + from langgraph.types import Command + from langchain_core.messages import ToolMessage run_id = uuid4() handler.on_chain_start( {"name": "MyChain"}, @@ -216,20 +224,23 @@ def test_name_fallback_to_id(self, handler: NemoFlowCallbackHandler, mock_nemo_f class TestGracefulNoOp: """Verify callbacks are silent if the module-level runtime is unavailable.""" - def test_no_nemo_flow_on_chain_start(self, monkeypatch: pytest.MonkeyPatch): + def test_no_nemo_flow_on_chain_start(self, monkeypatch: pytest.MonkeyPatch, callbacks_module: types.ModuleType): monkeypatch.setattr(callbacks_module, "nemo_flow", None) + from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler handler = NemoFlowCallbackHandler() handler.on_chain_start({"name": "x"}, {}, run_id=uuid4()) - def test_no_nemo_flow_on_chain_end(self, monkeypatch: pytest.MonkeyPatch): + def test_no_nemo_flow_on_chain_end(self, monkeypatch: pytest.MonkeyPatch, callbacks_module: types.ModuleType): monkeypatch.setattr(callbacks_module, "nemo_flow", None) + from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler handler = NemoFlowCallbackHandler() handler.on_chain_end({}, run_id=uuid4()) - def test_no_nemo_flow_on_chain_error(self, monkeypatch: pytest.MonkeyPatch): + def test_no_nemo_flow_on_chain_error(self, monkeypatch: pytest.MonkeyPatch, callbacks_module: types.ModuleType): monkeypatch.setattr(callbacks_module, "nemo_flow", None) + from nemo_flow.integrations.langchain.callbacks import NemoFlowCallbackHandler handler = NemoFlowCallbackHandler() handler.on_chain_error(RuntimeError("e"), run_id=uuid4()) @@ -238,9 +249,8 @@ def test_no_nemo_flow_on_chain_error(self, monkeypatch: pytest.MonkeyPatch): class TestErrorSwallowing: """Ensure NeMo Flow errors never propagate.""" - def test_scope_push_error_swallowed(self, mock_nemo_flow: MagicMock): + def test_scope_push_error_swallowed(self, handler: NemoFlowCallbackHandler, mock_nemo_flow: MagicMock): mock_nemo_flow.scope.push.side_effect = RuntimeError("nemo flow failure") - handler = NemoFlowCallbackHandler() handler.on_chain_start({"name": "x"}, {}, run_id=uuid4()) diff --git a/python/tests/integrations/langchain_tests/test_middleware.py b/python/tests/integrations/langchain_tests/test_middleware.py new file mode 100644 index 00000000..98eb4c6d --- /dev/null +++ b/python/tests/integrations/langchain_tests/test_middleware.py @@ -0,0 +1,326 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for the LangChain NeMo Flow middleware.""" + +from __future__ import annotations + +import asyncio +import inspect +from collections.abc import Awaitable, Callable +from typing import Any, TYPE_CHECKING +from unittest.mock import AsyncMock, MagicMock + +import pytest + +import nemo_flow +from nemo_flow.codecs import AnthropicMessagesCodec, OpenAIChatCodec, OpenAIResponsesCodec + +if TYPE_CHECKING: + from langchain_core.messages import AIMessage + from langchain_core.messages import ToolMessage + from langchain.agents.middleware import ToolCallRequest + from langchain.agents.middleware import ModelRequest, ModelResponse + + from nemo_flow.integrations.langchain.middleware import NemoFlowMiddleware + +_DEFAULT_MOCK_RESPONSE_MSG = "nemo_flow unittest result" + +@pytest.fixture(name="model_request_handler") +def model_request_handler_fixture() -> tuple[Callable[[ModelRequest[Any]], ModelResponse[Any]], dict[str, ModelRequest[Any]]]: + from langchain_core.messages import AIMessage + from langchain.agents.middleware import ModelResponse + + seen_request: dict[str, ModelRequest[Any]] = {} + + def handler(request: ModelRequest[Any]) -> ModelResponse[Any]: + seen_request["request"] = request + return ModelResponse(result=[AIMessage(content="done")]) + + return handler, seen_request + +@pytest.fixture(name="async_model_request_handler") +def async_model_request_handler_fixture(model_request_handler: tuple[Callable[[ModelRequest[Any]], ModelResponse[Any]], dict[str, ModelRequest[Any]]]) -> tuple[Callable[[ModelRequest[Any]], Awaitable[ModelResponse[Any]]], dict[str, ModelRequest[Any]]]: + (sync_handler, seen_request) = model_request_handler + async def handler(request: ModelRequest[Any]) -> ModelResponse[Any]: + return sync_handler(request) + + return handler, seen_request + +@pytest.fixture(name="tool_request_handler") +def tool_request_handler_fixture() -> tuple[Callable[[ToolCallRequest], ToolMessage], dict[str, ToolCallRequest]]: + from langchain_core.messages import ToolMessage + seen_request: dict[str, ToolCallRequest] = {} + + def handler(request: ToolCallRequest) -> ToolMessage: + seen_request["request"] = request + return ToolMessage(content="done", tool_call_id=request.tool_call["id"]) + + return handler, seen_request + +@pytest.fixture(name="async_tool_request_handler") +def async_tool_request_handler_fixture(tool_request_handler: tuple[Callable[[ToolCallRequest], ToolMessage], dict[str, ToolCallRequest]]) -> tuple[Callable[[ToolCallRequest], Awaitable[ToolMessage]], dict[str, ToolCallRequest]]: + (sync_handler, seen_request) = tool_request_handler + async def handler(request: ToolCallRequest) -> ToolMessage: + return sync_handler(request) + + return handler, seen_request + +@pytest.fixture(name="mock_tool_execute") +def mock_tool_execute_fixture() -> AsyncMock: + async def execute_side_effect(*, func: Any, **kwargs: Any) -> ToolMessage: + result = func({"query": "intercepted"}) + if inspect.isawaitable(result): + return await result + return result + + return AsyncMock(side_effect=execute_side_effect) + + +def _mk_mock_model(returned_message: str | list[AIMessage] = _DEFAULT_MOCK_RESPONSE_MSG) -> MagicMock: + from langchain_core.language_models import BaseChatModel + from langchain_core.messages import AIMessage + + mock_model = MagicMock(spec=BaseChatModel) + mock_model.bind.return_value = mock_model + mock_model.bind_tools.return_value = mock_model + mock_model.model = "mock-model" + + if isinstance(returned_message, str): + msg = AIMessage(content=returned_message) + mock_model.invoke.return_value = msg + mock_model.ainvoke = AsyncMock(return_value=msg) + else: + mock_model.invoke.side_effect = list(returned_message) + mock_model.ainvoke = AsyncMock(side_effect=list(returned_message)) + + return mock_model + +@pytest.fixture(name="nemo_flow_middleware") +def nemo_flow_middleware_fixture() -> NemoFlowMiddleware: + from nemo_flow.integrations.langchain.middleware import NemoFlowMiddleware + return NemoFlowMiddleware() + +@pytest.fixture(name="recording_middleware") +def recording_middleware_fixture() -> NemoFlowMiddleware: + from nemo_flow.integrations.langchain.middleware import NemoFlowMiddleware + class RecordingMiddleware(NemoFlowMiddleware): + def __init__(self): + super().__init__() + self.calls: list[dict[str, Any]] = [] + + async def _llm_execute( + self, + model_name: str, + request: nemo_flow.LLMRequest, + codec: Any, + response_codec: Any, + func: Any, + ) -> Any: + self.calls.append( + { + "model_name": model_name, + "request": request, + "codec": codec, + "response_codec": response_codec, + } + ) + intercepted = nemo_flow.LLMRequest( + request.headers, + { + **request.content, + "model_settings": {"temperature": 0.25}, + }, + ) + return await func(intercepted) + + return RecordingMiddleware() + +@pytest.fixture(name="model_request") +def model_request_fixture() -> ModelRequest[Any]: + from langchain.agents.middleware import ModelRequest + from langchain_core.messages import HumanMessage + + mock_model = _mk_mock_model() + + return ModelRequest( + model=mock_model, + messages=[HumanMessage(content="hello")], + model_settings={"temperature": 1.0}, + ) + + +@pytest.fixture(name="tool_call_request") +def tool_call_request_fixture() -> ToolCallRequest: + from langchain.agents.middleware import ToolCallRequest + return ToolCallRequest( + tool_call={"name": "lookup", "args": {"query": "original"}, "id": "call-1"}, + tool=None, + state={}, + runtime=MagicMock(), + ) + + +def test_wrap_model_call_routes_through_llm_execute(model_request: ModelRequest[Any], model_request_handler: tuple[Callable[[ModelRequest[Any]], ModelResponse[Any]], dict[str, ModelRequest[Any]]], recording_middleware: NemoFlowMiddleware): + + (handler, seen_request) = model_request_handler + + response = recording_middleware.wrap_model_call(model_request, handler) + + assert response.result[0].content == "done" + assert seen_request["request"].model_settings == {"temperature": 0.25} + assert recording_middleware.calls[0]["model_name"] == "mock-model" + assert recording_middleware.calls[0]["request"].content["model"] == "mock-model" + assert recording_middleware.calls[0]["codec"] is None + assert recording_middleware.calls[0]["response_codec"] is None + + +def test_awrap_model_call_routes_through_llm_execute(model_request: ModelRequest[Any], async_model_request_handler: tuple[Callable[[ModelRequest[Any]], Awaitable[ModelResponse[Any]]], dict[str, ModelRequest[Any]]], recording_middleware: NemoFlowMiddleware): + (handler, seen_request) = async_model_request_handler + + response = asyncio.run(recording_middleware.awrap_model_call(model_request, handler)) + + assert response.result[0].content == "done" + assert seen_request["request"].model_settings == {"temperature": 0.25} + assert recording_middleware.calls[0]["model_name"] == "mock-model" + assert recording_middleware.calls[0]["request"].content["model"] == "mock-model" + assert recording_middleware.calls[0]["codec"] is None + assert recording_middleware.calls[0]["response_codec"] is None + + +def test_wrap_tool_call_routes_through_tool_execute(monkeypatch: pytest.MonkeyPatch, nemo_flow_middleware: NemoFlowMiddleware, mock_tool_execute: AsyncMock, tool_call_request: ToolCallRequest, tool_request_handler: tuple[Callable[[ToolCallRequest], ToolMessage], dict[str, ToolCallRequest]]): + (handler, seen_request) = tool_request_handler + parent_handle = MagicMock() + + monkeypatch.setattr(nemo_flow.scope, "get_handle", lambda: parent_handle) + monkeypatch.setattr(nemo_flow.typed, "tool_execute", mock_tool_execute) + + response = nemo_flow_middleware.wrap_tool_call(tool_call_request, handler) + + assert response.content == "done" + assert seen_request["request"].tool_call["args"] == {"query": "intercepted"} + mock_tool_execute.assert_awaited_once() + kwargs = mock_tool_execute.await_args.kwargs + assert kwargs["name"] == "lookup" + assert kwargs["args"] == {"query": "original"} + assert kwargs["handle"] is parent_handle + assert isinstance(kwargs["args_codec"], nemo_flow.typed.BestEffortAnyCodec) + assert isinstance(kwargs["result_codec"], nemo_flow.typed.BestEffortAnyCodec) + + +def test_awrap_tool_call_routes_through_tool_execute(monkeypatch: pytest.MonkeyPatch, nemo_flow_middleware: NemoFlowMiddleware, mock_tool_execute: AsyncMock, tool_call_request: ToolCallRequest, async_tool_request_handler: tuple[Callable[[ToolCallRequest], Awaitable[ToolMessage]], dict[str, ToolCallRequest]]): + parent_handle = MagicMock() + (handler, seen_request) = async_tool_request_handler + + monkeypatch.setattr(nemo_flow.scope, "get_handle", lambda: parent_handle) + monkeypatch.setattr(nemo_flow.typed, "tool_execute", mock_tool_execute) + + response = asyncio.run(nemo_flow_middleware.awrap_tool_call(tool_call_request, handler)) + + assert response.content == "done" + assert seen_request["request"].tool_call["args"] == {"query": "intercepted"} + mock_tool_execute.assert_awaited_once() + kwargs = mock_tool_execute.await_args.kwargs + assert kwargs["name"] == "lookup" + assert kwargs["args"] == {"query": "original"} + assert kwargs["handle"] is parent_handle + assert isinstance(kwargs["args_codec"], nemo_flow.typed.BestEffortAnyCodec) + assert isinstance(kwargs["result_codec"], nemo_flow.typed.BestEffortAnyCodec) + + +def test_infer_codec_from_supported_model_classes(monkeypatch: pytest.MonkeyPatch): + from nemo_flow.integrations.langchain import _serialization + + MockChatAnthropic = MagicMock(spec=type("MockChatAnthropic", (), {})) + MockChatOpenAI = MagicMock(spec=type("MockChatOpenAI", (), {})) + MockChatOpenAIResponses = MagicMock(spec=MockChatOpenAI.__class__) + MockChatOpenAIResponses.use_responses_api = True + MockChatNVIDIA = MagicMock(spec=type("MockChatNVIDIA", (), {})) + + monkeypatch.setattr(_serialization, "ChatAnthropic", MockChatAnthropic.__class__, raising=False) + monkeypatch.setattr(_serialization, "ChatOpenAI", MockChatOpenAI.__class__, raising=False) + monkeypatch.setattr(_serialization, "ChatNVIDIA", MockChatNVIDIA.__class__, raising=False) + monkeypatch.setattr(_serialization, "_HAS_ANTHROPIC", True) + monkeypatch.setattr(_serialization, "_HAS_OPENAI", True) + monkeypatch.setattr(_serialization, "_HAS_NVIDIA", True) + + assert isinstance(_serialization.infer_codec_from_model(MockChatAnthropic), AnthropicMessagesCodec) + assert isinstance(_serialization.infer_codec_from_model(MockChatOpenAI), OpenAIChatCodec) + assert isinstance( + _serialization.infer_codec_from_model(MockChatOpenAIResponses), + OpenAIResponsesCodec, + ) + assert isinstance(_serialization.infer_codec_from_model(MockChatNVIDIA), OpenAIChatCodec) + assert _serialization.infer_codec_from_model(object()) is None + + +@pytest.mark.parametrize("use_async", [False, True]) +def test_agent_integration(use_async: bool, nemo_flow_middleware: NemoFlowMiddleware): + """An integration test to verify that the middleware correctly wraps a model call end-to-end.""" + from langchain.agents import create_agent + from langchain_core.messages import AIMessage + from langchain_core.tools import tool + + model_responses = [ + AIMessage( + content="", + tool_calls=[ + { + "name": "get_weather", + "args": {"location": "San Francisco"}, + "id": "call-1", + } + ], + ), + AIMessage(content=_DEFAULT_MOCK_RESPONSE_MSG), + ] + + mock_model = _mk_mock_model(model_responses) + + @tool + def get_weather(location: str) -> str: + """Get the current weather for a location.""" + return f"The weather in {location} is sunny and 72 degrees." + + agent = create_agent(model=mock_model, tools=[get_weather], middleware=[nemo_flow_middleware]) + + input_payload = { + "messages": [ + { + "role": "user", + "content": "What is the weather in San Francisco?", + } + ] + } + + events = [] + expected_events = [ + "scope.start.langchain-request", + "scope.start.mock-model", + "scope.end.mock-model", + "scope.start.get_weather", + "scope.end.get_weather", + "scope.start.mock-model", + "scope.end.mock-model", + "scope.end.langchain-request", + ] + + def event_recorder(event): + events.append(f"{event.kind}.{event.scope_category}.{event.name}") + + nemo_flow.subscribers.register("event_recorder", event_recorder) + + try: + with nemo_flow.scope.scope("langchain-request", nemo_flow.ScopeType.Agent): + if use_async: + result = asyncio.run(agent.ainvoke(input_payload)) + else: + result = agent.invoke(input_payload) + finally: + nemo_flow.subscribers.deregister("event_recorder") + + assert any( + message.content == "The weather in San Francisco is sunny and 72 degrees." for message in result["messages"] + ) + assert result["messages"][-1].content == _DEFAULT_MOCK_RESPONSE_MSG + assert events == expected_events diff --git a/python/tests/integrations/langgraph/conftest.py b/python/tests/integrations/langgraph/conftest.py new file mode 100644 index 00000000..678734fa --- /dev/null +++ b/python/tests/integrations/langgraph/conftest.py @@ -0,0 +1,13 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import types + +import pytest + +@pytest.fixture(name="integration_langgraph", scope='session', autouse=True) +def integration_langgraph_fixture(integration_langgraph: types.ModuleType) -> types.ModuleType: + """ + Override the integration_langgraph fixture to make it autouse + """ + yield integration_langgraph