From 3f9bcbde4b2ae96c73b3a6f4eae404962ea72eb4 Mon Sep 17 00:00:00 2001 From: Chris Chase Date: Fri, 20 Feb 2026 12:21:45 -0500 Subject: [PATCH] fix: surface Langflow error and empty-response messages to users When a Langflow flow rejects a query or returns no content, the UI now displays an error message instead of silently showing nothing. This adds extract_error_from_sse_data() to detect Langflow SSE error events during streaming, and changes the empty-response path from a silent "done" to an explicit error event. --- backend/app/api/routes/v1/chat_messages.py | 6 +- backend/app/services/langflow/client.py | 39 +++++++++++ backend/tests/api/routes/v1/test_chats.py | 40 +++++++++++ backend/tests/services/test_langflow.py | 79 ++++++++++++++++++++++ 4 files changed, 163 insertions(+), 1 deletion(-) diff --git a/backend/app/api/routes/v1/chat_messages.py b/backend/app/api/routes/v1/chat_messages.py index 9dc28b1..7e9082a 100644 --- a/backend/app/api/routes/v1/chat_messages.py +++ b/backend/app/api/routes/v1/chat_messages.py @@ -269,7 +269,11 @@ async def generate_stream() -> AsyncGenerator[str, None]: yield format_sse_event({"type": "done", "message_id": assistant_message.id}) else: logger.warning(f"No content received from Langflow for chat {chat_id}") - yield format_sse_event({"type": "done"}) + yield format_sse_event({ + "type": "error", + "error": "No response received from the AI service. " + "The query may have been rejected by the flow.", + }) except LangflowError as e: logger.error(f"Langflow error in chat {chat_id}: {e.message}") diff --git a/backend/app/services/langflow/client.py b/backend/app/services/langflow/client.py index 642d1e1..ffc31e5 100644 --- a/backend/app/services/langflow/client.py +++ b/backend/app/services/langflow/client.py @@ -25,6 +25,39 @@ SSE_DONE_MARKER = "[DONE]" +def extract_error_from_sse_data(data: dict) -> str | None: + """ + Extract an error message from an SSE data payload, if present. + + Handles Langflow error event formats where the top-level "event" is "error": + - {"event": "error", "data": {"error": "..."}} + - {"event": "error", "data": {"message": "..."}} + - {"event": "error", "data": {"text": "..."}} + - {"event": "error", "data": "plain string error"} + + Falls back to str(data) if none of the known keys are found. + + Returns the error message string, or None if not an error event. + """ + event_type = data.get("event") + if event_type != "error": + return None + + event_data = data.get("data", {}) + if isinstance(event_data, str): + return event_data + + if not isinstance(event_data, dict): + return str(event_data) if event_data is not None else "Unknown error" + + return ( + event_data.get("error") + or event_data.get("message") + or event_data.get("text") + or str(event_data) + ) + + def extract_chunk_from_sse_data(data: dict) -> str | None: """ Extract the text chunk from an SSE data payload. @@ -404,6 +437,12 @@ async def chat_stream( try: data = json.loads(data_str) + + # Check for error events from Langflow (e.g. rejected queries) + error_msg = extract_error_from_sse_data(data) + if error_msg: + raise LangflowError(error_msg) + chunk = extract_chunk_from_sse_data(data) if chunk: yield chunk diff --git a/backend/tests/api/routes/v1/test_chats.py b/backend/tests/api/routes/v1/test_chats.py index 90091eb..a762398 100644 --- a/backend/tests/api/routes/v1/test_chats.py +++ b/backend/tests/api/routes/v1/test_chats.py @@ -473,6 +473,46 @@ def test_stream_message_langflow_error_still_saves_user_message( ).all() assert len(assistant_messages) == 0 + def test_stream_message_empty_response_returns_error( + self, client: TestClient, session: Session, test_chat: Chat, monkeypatch + ): + """Test that empty Langflow response yields SSE error event, not done.""" + from app.services.langflow.mock_client import MockLangflowClient + from app.api.routes.v1 import chat_messages + + # Create a mock client that returns empty content (simulates rejected query) + empty_client = MockLangflowClient(responses=[""], stream_delay=0) + monkeypatch.setattr(chat_messages, "get_langflow_client", lambda: empty_client) + + response = client.post( + f"/api/v1/chats/{test_chat.id}/messages/stream", + json={"content": "UNSAFE_TEST"}, + ) + + assert response.status_code == 200 + + # Parse SSE events + events = [] + for line in response.text.split("\n\n"): + if line.startswith("data: "): + events.append(json.loads(line[6:])) + + # Should get an error event, not a done event + error_events = [e for e in events if e.get("type") == "error"] + assert len(error_events) == 1 + assert "No response received" in error_events[0]["error"] + + # No done event should be present + done_events = [e for e in events if e.get("type") == "done"] + assert len(done_events) == 0 + + # No assistant message should be saved + assistant_msgs = session.query(ChatMessage).filter( + ChatMessage.chat_id == test_chat.id, + ChatMessage.role == "assistant" + ).all() + assert len(assistant_msgs) == 0 + @pytest.fixture def other_user(session: Session) -> User: diff --git a/backend/tests/services/test_langflow.py b/backend/tests/services/test_langflow.py index 99fda0d..119997f 100644 --- a/backend/tests/services/test_langflow.py +++ b/backend/tests/services/test_langflow.py @@ -3,6 +3,7 @@ import pytest from unittest.mock import patch, MagicMock +from app.services.langflow.client import extract_chunk_from_sse_data, extract_error_from_sse_data from app.services.langflow import ( LangflowClient, LangflowError, @@ -211,3 +212,81 @@ def test_is_langflow_configured_missing(self): mock_settings.LANGFLOW_URL = None assert is_langflow_configured() is False + + +class TestExtractErrorFromSseData: + """Tests for extract_error_from_sse_data.""" + + def test_returns_none_for_token_event(self): + data = {"event": "token", "data": {"chunk": "hello"}} + assert extract_error_from_sse_data(data) is None + + def test_returns_none_for_add_message_event(self): + data = {"event": "add_message", "data": {"text": "hi", "sender": "Machine"}} + assert extract_error_from_sse_data(data) is None + + def test_returns_none_for_no_event(self): + data = {"chunk": "hello"} + assert extract_error_from_sse_data(data) is None + + def test_extracts_error_field(self): + data = {"event": "error", "data": {"error": "Query rejected"}} + assert extract_error_from_sse_data(data) == "Query rejected" + + def test_extracts_message_field(self): + data = {"event": "error", "data": {"message": "Not allowed"}} + assert extract_error_from_sse_data(data) == "Not allowed" + + def test_extracts_text_field(self): + data = {"event": "error", "data": {"text": "Invalid input"}} + assert extract_error_from_sse_data(data) == "Invalid input" + + def test_handles_string_data(self): + data = {"event": "error", "data": "Something went wrong"} + assert extract_error_from_sse_data(data) == "Something went wrong" + + def test_falls_back_to_str_representation(self): + data = {"event": "error", "data": {"unexpected_key": "value"}} + result = extract_error_from_sse_data(data) + assert "unexpected_key" in result + + def test_handles_none_data(self): + data = {"event": "error", "data": None} + assert extract_error_from_sse_data(data) == "Unknown error" + + def test_handles_missing_data_key(self): + data = {"event": "error"} + result = extract_error_from_sse_data(data) + assert result == "{}" + + def test_handles_integer_data(self): + data = {"event": "error", "data": 42} + assert extract_error_from_sse_data(data) == "42" + + +class TestExtractChunkFromSseData: + """Tests for extract_chunk_from_sse_data.""" + + def test_token_event(self): + data = {"event": "token", "data": {"chunk": "hello"}} + assert extract_chunk_from_sse_data(data) == "hello" + + def test_add_message_from_machine(self): + data = {"event": "add_message", "data": {"text": "response", "sender": "Machine"}} + assert extract_chunk_from_sse_data(data) == "response" + + def test_add_message_from_ai(self): + data = {"event": "add_message", "data": {"text": "response", "sender": "AI"}} + assert extract_chunk_from_sse_data(data) == "response" + + def test_add_message_from_user_ignored(self): + data = {"event": "add_message", "data": {"text": "input", "sender": "User"}} + assert extract_chunk_from_sse_data(data) is None + + def test_direct_chunk(self): + data = {"chunk": "hello"} + assert extract_chunk_from_sse_data(data) == "hello" + + def test_unknown_event_returns_none(self): + data = {"event": "unknown", "data": {"something": "else"}} + assert extract_chunk_from_sse_data(data) is None