Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/api/health || exit 1

# Run the application
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--proxy-headers", "--forwarded-allow-ips", "*"]
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--proxy-headers", "--forwarded-allow-ips", "127.0.0.1"]
68 changes: 57 additions & 11 deletions backend/app/api/routes/v1/chat_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Delete message
"""

import asyncio
import json
import logging
import traceback
Expand Down Expand Up @@ -226,21 +227,66 @@ async def stream_message(
tweaks = build_generic_tweaks(user_data=user_data, app_data=app_data)

async def generate_stream() -> AsyncGenerator[str, None]:
"""Generate SSE events from Langflow streaming response."""
"""Generate SSE events from Langflow streaming response.

Sends periodic SSE heartbeat comments while waiting for LangFlow
to prevent proxies (nginx, OAuth proxy, OpenShift Route) from
closing the idle chunked connection.
"""
client = get_langflow_client()
accumulated_content = ""

# Send initial heartbeat immediately to establish the chunked stream
# (SSE comment — ignored by EventSource clients but keeps proxies alive)
yield ": heartbeat\n\n"

try:
# Stream from Langflow
async for chunk in client.chat_stream(
message=request.content,
session_id=str(chat_id),
flow_id=request.flow_id,
flow_name=request.flow_name,
tweaks=tweaks,
):
accumulated_content += chunk
yield format_sse_event({"type": "content", "content": chunk})
# Use an async queue so we can interleave heartbeats with real data
chunk_queue: asyncio.Queue[str | None] = asyncio.Queue()

async def langflow_producer():
"""Read from LangFlow and put chunks on the queue."""
try:
async for chunk in client.chat_stream(
message=request.content,
session_id=str(chat_id),
flow_id=request.flow_id,
flow_name=request.flow_name,
tweaks=tweaks,
):
await chunk_queue.put(chunk)
finally:
await chunk_queue.put(None) # sentinel

producer_task = asyncio.create_task(langflow_producer())

try:
while True:
try:
chunk = await asyncio.wait_for(
chunk_queue.get(), timeout=15.0
)
except asyncio.TimeoutError:
# No data from LangFlow yet — send heartbeat to keep alive
yield ": heartbeat\n\n"
continue

if chunk is None:
break # stream finished
accumulated_content += chunk
yield format_sse_event({"type": "content", "content": chunk})
finally:
if not producer_task.done():
producer_task.cancel()
try:
await producer_task
except (asyncio.CancelledError, Exception):
pass
# Re-raise any exception from the producer
if producer_task.done() and not producer_task.cancelled():
exc = producer_task.exception()
if exc:
raise exc

# Save assistant message with accumulated content (only if not empty)
if accumulated_content.strip():
Expand Down
7 changes: 6 additions & 1 deletion backend/app/api/routes/v1/utils/health.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import logging

from fastapi import APIRouter
from sqlmodel import select

from app.api.deps import SessionDep

logger = logging.getLogger(__name__)

router = APIRouter()


Expand All @@ -18,8 +22,9 @@ async def health_check(session: SessionDep):
db_status = "healthy"
db_message = "Database connection successful"
except Exception as e:
logger.error(f"Database health check failed: {e}")
db_status = "unhealthy"
db_message = f"Database connection failed: {str(e)}"
db_message = "Database connection failed"

# Overall status is healthy only if database is healthy
overall_status = "healthy" if db_status == "healthy" else "unhealthy"
Expand Down
23 changes: 8 additions & 15 deletions backend/app/graphql_api/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,11 @@ def users(
skip: int = 0,
limit: int = 100,
) -> list[UserType]:
"""Get a list of users with pagination.

Args:
skip: Number of users to skip
limit: Maximum number of users to return
"""Get a list of users with pagination. Requires admin access."""
current_user = info.context.get("current_user")
if not current_user or not current_user.admin:
raise PermissionError("Admin access required")

Returns:
List of users
"""
session: Session = info.context["session"]

statement = select(User).offset(skip).limit(limit)
Expand All @@ -131,14 +127,11 @@ def users(

@strawberry.field
def user(self, info: Info, id: int) -> UserType | None:
"""Get a single user by ID.

Args:
id: The user ID
"""Get a single user by ID. Requires admin access."""
current_user = info.context.get("current_user")
if not current_user or not current_user.admin:
raise PermissionError("Admin access required")

Returns:
The user if found, None otherwise
"""
session: Session = info.context["session"]
user = session.get(User, id)
return UserType.from_orm(user) if user else None
Expand Down
5 changes: 5 additions & 0 deletions frontend/nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ server {
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $http_x_forwarded_proto;
proxy_cache_bypass $http_upgrade;

# SSE streaming support — prevent default 60s proxy_read_timeout
proxy_read_timeout 300s;
proxy_send_timeout 300s;
proxy_buffering off;
}

# Handle client routing
Expand Down
2 changes: 1 addition & 1 deletion helm/langflow/values-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ langflow:
- name: GOOGLE_CLOUD_PROJECT
value: "redhat-ai-analysis"
- name: XDG_STATE_HOME
value: "/app"
value: "/tmp/langflow"
- name: CONFIG_DIR
value: "/app/src/config"
- name: LFX_DEV
Expand Down
2 changes: 2 additions & 0 deletions k8s/app/base/route.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ metadata:
name: multi-agent-platform
labels:
app: multi-agent-platform
annotations:
haproxy.router.openshift.io/timeout: 300s
spec:
to:
kind: Service
Expand Down
2 changes: 2 additions & 0 deletions k8s/app/overlays/dev/oauth-proxy.env
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ OAUTH2_PROXY_PASS_ACCESS_TOKEN=true
OAUTH2_PROXY_SET_XAUTHREQUEST=true
OAUTH2_PROXY_SKIP_PROVIDER_BUTTON=true
OAUTH2_PROXY_SKIP_AUTH_ROUTES=/api/v1/utils/health-check
OAUTH2_PROXY_UPSTREAM_TIMEOUT=300s
OAUTH2_PROXY_FLUSH_INTERVAL=200ms

# Google Group restriction (requires Admin SDK setup -- see docs/AUTHENTICATION.md)
# Steps: 1) Enable Admin SDK API in Google Cloud Console
Expand Down
4 changes: 1 addition & 3 deletions scripts/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,7 @@ if ! oc get secret admin-credentials -n "$NAMESPACE" &> /dev/null; then
--from-literal=email="$ADMIN_EMAIL" \
--from-literal=password="$ADMIN_PASS" \
-n "$NAMESPACE"
echo "Created admin-credentials secret"
echo " Email: $ADMIN_EMAIL"
echo " Password: $ADMIN_PASS"
echo "Created admin-credentials secret (use 'oc get secret admin-credentials -o yaml' to retrieve)"
else
echo "admin-credentials already exists"
fi
Expand Down
2 changes: 1 addition & 1 deletion scripts/dev-langfuse.sh
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ start_minio() {
-e MINIO_ROOT_USER=$MINIO_ROOT_USER \
-e MINIO_ROOT_PASSWORD=$MINIO_ROOT_PASSWORD \
-v "${MINIO_DATA}:/data" \
-p 9090:9000 \
-p 127.0.0.1:9090:9000 \
-p 127.0.0.1:9091:9001 \
docker.io/minio/minio:latest \
server /data --console-address ":9001"
Expand Down
4 changes: 2 additions & 2 deletions scripts/import_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,10 @@ def _build_mcp_entry(server_config: dict, packages_dir: str = "/app/langflow/pac
env.setdefault("PYTHONPATH", packages_dir)
entry["env"] = env
return entry
elif server_type == "http":
elif server_type in ("http", "sse"):
url = server_config.get("url")
if not url:
log_error(f"MCP server missing 'url' for http type")
log_error(f"MCP server missing 'url' for {server_type} type")
return None
return {"url": url}
else:
Expand Down