Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion backend/app/api/routes/v1/chat_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,11 @@ async def generate_stream() -> AsyncGenerator[str, None]:
yield format_sse_event({"type": "done", "message_id": assistant_message.id})
else:
logger.warning(f"No content received from Langflow for chat {chat_id}")
yield format_sse_event({"type": "done"})
yield format_sse_event({
"type": "error",
"error": "No response received from the AI service. "
"The query may have been rejected by the flow.",
})

except LangflowError as e:
logger.error(f"Langflow error in chat {chat_id}: {e.message}")
Expand Down
39 changes: 39 additions & 0 deletions backend/app/services/langflow/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,39 @@
SSE_DONE_MARKER = "[DONE]"


def extract_error_from_sse_data(data: dict) -> str | None:
"""
Extract an error message from an SSE data payload, if present.

Handles Langflow error event formats where the top-level "event" is "error":
- {"event": "error", "data": {"error": "..."}}
- {"event": "error", "data": {"message": "..."}}
- {"event": "error", "data": {"text": "..."}}
- {"event": "error", "data": "plain string error"}

Falls back to str(data) if none of the known keys are found.

Returns the error message string, or None if not an error event.
"""
event_type = data.get("event")
if event_type != "error":
return None

event_data = data.get("data", {})
if isinstance(event_data, str):
return event_data

if not isinstance(event_data, dict):
return str(event_data) if event_data is not None else "Unknown error"

return (
event_data.get("error")
or event_data.get("message")
or event_data.get("text")
or str(event_data)
)


def extract_chunk_from_sse_data(data: dict) -> str | None:
"""
Extract the text chunk from an SSE data payload.
Expand Down Expand Up @@ -404,6 +437,12 @@ async def chat_stream(

try:
data = json.loads(data_str)

# Check for error events from Langflow (e.g. rejected queries)
error_msg = extract_error_from_sse_data(data)
if error_msg:
raise LangflowError(error_msg)

chunk = extract_chunk_from_sse_data(data)
if chunk:
yield chunk
Expand Down
40 changes: 40 additions & 0 deletions backend/tests/api/routes/v1/test_chats.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,46 @@ def test_stream_message_langflow_error_still_saves_user_message(
).all()
assert len(assistant_messages) == 0

def test_stream_message_empty_response_returns_error(
self, client: TestClient, session: Session, test_chat: Chat, monkeypatch
):
"""Test that empty Langflow response yields SSE error event, not done."""
from app.services.langflow.mock_client import MockLangflowClient
from app.api.routes.v1 import chat_messages

# Create a mock client that returns empty content (simulates rejected query)
empty_client = MockLangflowClient(responses=[""], stream_delay=0)
monkeypatch.setattr(chat_messages, "get_langflow_client", lambda: empty_client)

response = client.post(
f"/api/v1/chats/{test_chat.id}/messages/stream",
json={"content": "UNSAFE_TEST"},
)

assert response.status_code == 200

# Parse SSE events
events = []
for line in response.text.split("\n\n"):
if line.startswith("data: "):
events.append(json.loads(line[6:]))

# Should get an error event, not a done event
error_events = [e for e in events if e.get("type") == "error"]
assert len(error_events) == 1
assert "No response received" in error_events[0]["error"]

# No done event should be present
done_events = [e for e in events if e.get("type") == "done"]
assert len(done_events) == 0

# No assistant message should be saved
assistant_msgs = session.query(ChatMessage).filter(
ChatMessage.chat_id == test_chat.id,
ChatMessage.role == "assistant"
).all()
assert len(assistant_msgs) == 0


@pytest.fixture
def other_user(session: Session) -> User:
Expand Down
79 changes: 79 additions & 0 deletions backend/tests/services/test_langflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
from unittest.mock import patch, MagicMock

from app.services.langflow.client import extract_chunk_from_sse_data, extract_error_from_sse_data
from app.services.langflow import (
LangflowClient,
LangflowError,
Expand Down Expand Up @@ -211,3 +212,81 @@ def test_is_langflow_configured_missing(self):
mock_settings.LANGFLOW_URL = None

assert is_langflow_configured() is False


class TestExtractErrorFromSseData:
"""Tests for extract_error_from_sse_data."""

def test_returns_none_for_token_event(self):
data = {"event": "token", "data": {"chunk": "hello"}}
assert extract_error_from_sse_data(data) is None

def test_returns_none_for_add_message_event(self):
data = {"event": "add_message", "data": {"text": "hi", "sender": "Machine"}}
assert extract_error_from_sse_data(data) is None

def test_returns_none_for_no_event(self):
data = {"chunk": "hello"}
assert extract_error_from_sse_data(data) is None

def test_extracts_error_field(self):
data = {"event": "error", "data": {"error": "Query rejected"}}
assert extract_error_from_sse_data(data) == "Query rejected"

def test_extracts_message_field(self):
data = {"event": "error", "data": {"message": "Not allowed"}}
assert extract_error_from_sse_data(data) == "Not allowed"

def test_extracts_text_field(self):
data = {"event": "error", "data": {"text": "Invalid input"}}
assert extract_error_from_sse_data(data) == "Invalid input"

def test_handles_string_data(self):
data = {"event": "error", "data": "Something went wrong"}
assert extract_error_from_sse_data(data) == "Something went wrong"

def test_falls_back_to_str_representation(self):
data = {"event": "error", "data": {"unexpected_key": "value"}}
result = extract_error_from_sse_data(data)
assert "unexpected_key" in result

def test_handles_none_data(self):
data = {"event": "error", "data": None}
assert extract_error_from_sse_data(data) == "Unknown error"

def test_handles_missing_data_key(self):
data = {"event": "error"}
result = extract_error_from_sse_data(data)
assert result == "{}"

def test_handles_integer_data(self):
data = {"event": "error", "data": 42}
assert extract_error_from_sse_data(data) == "42"


class TestExtractChunkFromSseData:
"""Tests for extract_chunk_from_sse_data."""

def test_token_event(self):
data = {"event": "token", "data": {"chunk": "hello"}}
assert extract_chunk_from_sse_data(data) == "hello"

def test_add_message_from_machine(self):
data = {"event": "add_message", "data": {"text": "response", "sender": "Machine"}}
assert extract_chunk_from_sse_data(data) == "response"

def test_add_message_from_ai(self):
data = {"event": "add_message", "data": {"text": "response", "sender": "AI"}}
assert extract_chunk_from_sse_data(data) == "response"

def test_add_message_from_user_ignored(self):
data = {"event": "add_message", "data": {"text": "input", "sender": "User"}}
assert extract_chunk_from_sse_data(data) is None

def test_direct_chunk(self):
data = {"chunk": "hello"}
assert extract_chunk_from_sse_data(data) == "hello"

def test_unknown_event_returns_none(self):
data = {"event": "unknown", "data": {"something": "else"}}
assert extract_chunk_from_sse_data(data) is None