From 86548d6663d38a9bdc0bc5b901164bf52b63b0d1 Mon Sep 17 00:00:00 2001 From: Chris Chase Date: Wed, 25 Feb 2026 16:19:26 -0500 Subject: [PATCH 1/2] fix: address security review findings - Restrict forwarded-allow-ips to 127.0.0.1 (was wildcard) - Require admin for GraphQL users/user queries - Remove admin password echo from deploy script - Bind MinIO S3 port to localhost only - Mask database error details in health check response - Support SSE type in MCP server import --- backend/Dockerfile | 2 +- backend/app/api/routes/v1/utils/health.py | 7 ++++++- backend/app/graphql_api/schema.py | 23 ++++++++--------------- scripts/deploy.sh | 4 +--- scripts/dev-langfuse.sh | 2 +- scripts/import_flows.py | 4 ++-- 6 files changed, 19 insertions(+), 23 deletions(-) diff --git a/backend/Dockerfile b/backend/Dockerfile index dbe3724..91f62e2 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -59,4 +59,4 @@ HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \ CMD curl -f http://localhost:8000/api/health || exit 1 # Run the application -CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--proxy-headers", "--forwarded-allow-ips", "*"] \ No newline at end of file +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--proxy-headers", "--forwarded-allow-ips", "127.0.0.1"] \ No newline at end of file diff --git a/backend/app/api/routes/v1/utils/health.py b/backend/app/api/routes/v1/utils/health.py index 6e24c48..e9f6ff9 100644 --- a/backend/app/api/routes/v1/utils/health.py +++ b/backend/app/api/routes/v1/utils/health.py @@ -1,8 +1,12 @@ +import logging + from fastapi import APIRouter from sqlmodel import select from app.api.deps import SessionDep +logger = logging.getLogger(__name__) + router = APIRouter() @@ -18,8 +22,9 @@ async def health_check(session: SessionDep): db_status = "healthy" db_message = "Database connection successful" except Exception as e: + logger.error(f"Database health check failed: {e}") db_status = "unhealthy" - db_message = f"Database connection failed: {str(e)}" + db_message = "Database connection failed" # Overall status is healthy only if database is healthy overall_status = "healthy" if db_status == "healthy" else "unhealthy" diff --git a/backend/app/graphql_api/schema.py b/backend/app/graphql_api/schema.py index 433b114..01fe6d7 100644 --- a/backend/app/graphql_api/schema.py +++ b/backend/app/graphql_api/schema.py @@ -114,15 +114,11 @@ def users( skip: int = 0, limit: int = 100, ) -> list[UserType]: - """Get a list of users with pagination. - - Args: - skip: Number of users to skip - limit: Maximum number of users to return + """Get a list of users with pagination. Requires admin access.""" + current_user = info.context.get("current_user") + if not current_user or not current_user.admin: + raise PermissionError("Admin access required") - Returns: - List of users - """ session: Session = info.context["session"] statement = select(User).offset(skip).limit(limit) @@ -131,14 +127,11 @@ def users( @strawberry.field def user(self, info: Info, id: int) -> UserType | None: - """Get a single user by ID. - - Args: - id: The user ID + """Get a single user by ID. Requires admin access.""" + current_user = info.context.get("current_user") + if not current_user or not current_user.admin: + raise PermissionError("Admin access required") - Returns: - The user if found, None otherwise - """ session: Session = info.context["session"] user = session.get(User, id) return UserType.from_orm(user) if user else None diff --git a/scripts/deploy.sh b/scripts/deploy.sh index 9219329..d25d88b 100755 --- a/scripts/deploy.sh +++ b/scripts/deploy.sh @@ -158,9 +158,7 @@ if ! oc get secret admin-credentials -n "$NAMESPACE" &> /dev/null; then --from-literal=email="$ADMIN_EMAIL" \ --from-literal=password="$ADMIN_PASS" \ -n "$NAMESPACE" - echo "Created admin-credentials secret" - echo " Email: $ADMIN_EMAIL" - echo " Password: $ADMIN_PASS" + echo "Created admin-credentials secret (use 'oc get secret admin-credentials -o yaml' to retrieve)" else echo "admin-credentials already exists" fi diff --git a/scripts/dev-langfuse.sh b/scripts/dev-langfuse.sh index 4e2648e..d9fe5fd 100755 --- a/scripts/dev-langfuse.sh +++ b/scripts/dev-langfuse.sh @@ -142,7 +142,7 @@ start_minio() { -e MINIO_ROOT_USER=$MINIO_ROOT_USER \ -e MINIO_ROOT_PASSWORD=$MINIO_ROOT_PASSWORD \ -v "${MINIO_DATA}:/data" \ - -p 9090:9000 \ + -p 127.0.0.1:9090:9000 \ -p 127.0.0.1:9091:9001 \ docker.io/minio/minio:latest \ server /data --console-address ":9001" diff --git a/scripts/import_flows.py b/scripts/import_flows.py index ea68d37..838f165 100644 --- a/scripts/import_flows.py +++ b/scripts/import_flows.py @@ -254,10 +254,10 @@ def _build_mcp_entry(server_config: dict, packages_dir: str = "/app/langflow/pac env.setdefault("PYTHONPATH", packages_dir) entry["env"] = env return entry - elif server_type == "http": + elif server_type in ("http", "sse"): url = server_config.get("url") if not url: - log_error(f"MCP server missing 'url' for http type") + log_error(f"MCP server missing 'url' for {server_type} type") return None return {"url": url} else: From df68dabfb2b689f0342866c54e60aabbf0667d66 Mon Sep 17 00:00:00 2001 From: Chris Chase Date: Wed, 25 Feb 2026 21:48:34 -0500 Subject: [PATCH 2/2] fix: SSE streaming timeouts and heartbeat keepalive - Add SSE heartbeat to chat_messages.py to prevent chunked connection drops through proxy layers (nginx, OAuth proxy, OpenShift Route) - Set nginx proxy_read_timeout/proxy_send_timeout to 300s for SSE - Add Route timeout annotation (300s) for OpenShift HAProxy - Add OAuth proxy UPSTREAM_TIMEOUT and FLUSH_INTERVAL settings - Fix XDG_STATE_HOME to writable path in LangFlow Helm values --- backend/app/api/routes/v1/chat_messages.py | 68 ++++++++++++++++++---- frontend/nginx.conf | 5 ++ helm/langflow/values-dev.yaml | 2 +- k8s/app/base/route.yaml | 2 + k8s/app/overlays/dev/oauth-proxy.env | 2 + 5 files changed, 67 insertions(+), 12 deletions(-) diff --git a/backend/app/api/routes/v1/chat_messages.py b/backend/app/api/routes/v1/chat_messages.py index 0864788..63c57a0 100644 --- a/backend/app/api/routes/v1/chat_messages.py +++ b/backend/app/api/routes/v1/chat_messages.py @@ -8,6 +8,7 @@ - Delete message """ +import asyncio import json import logging import traceback @@ -226,21 +227,66 @@ async def stream_message( tweaks = build_generic_tweaks(user_data=user_data, app_data=app_data) async def generate_stream() -> AsyncGenerator[str, None]: - """Generate SSE events from Langflow streaming response.""" + """Generate SSE events from Langflow streaming response. + + Sends periodic SSE heartbeat comments while waiting for LangFlow + to prevent proxies (nginx, OAuth proxy, OpenShift Route) from + closing the idle chunked connection. + """ client = get_langflow_client() accumulated_content = "" + # Send initial heartbeat immediately to establish the chunked stream + # (SSE comment — ignored by EventSource clients but keeps proxies alive) + yield ": heartbeat\n\n" + try: - # Stream from Langflow - async for chunk in client.chat_stream( - message=request.content, - session_id=str(chat_id), - flow_id=request.flow_id, - flow_name=request.flow_name, - tweaks=tweaks, - ): - accumulated_content += chunk - yield format_sse_event({"type": "content", "content": chunk}) + # Use an async queue so we can interleave heartbeats with real data + chunk_queue: asyncio.Queue[str | None] = asyncio.Queue() + + async def langflow_producer(): + """Read from LangFlow and put chunks on the queue.""" + try: + async for chunk in client.chat_stream( + message=request.content, + session_id=str(chat_id), + flow_id=request.flow_id, + flow_name=request.flow_name, + tweaks=tweaks, + ): + await chunk_queue.put(chunk) + finally: + await chunk_queue.put(None) # sentinel + + producer_task = asyncio.create_task(langflow_producer()) + + try: + while True: + try: + chunk = await asyncio.wait_for( + chunk_queue.get(), timeout=15.0 + ) + except asyncio.TimeoutError: + # No data from LangFlow yet — send heartbeat to keep alive + yield ": heartbeat\n\n" + continue + + if chunk is None: + break # stream finished + accumulated_content += chunk + yield format_sse_event({"type": "content", "content": chunk}) + finally: + if not producer_task.done(): + producer_task.cancel() + try: + await producer_task + except (asyncio.CancelledError, Exception): + pass + # Re-raise any exception from the producer + if producer_task.done() and not producer_task.cancelled(): + exc = producer_task.exception() + if exc: + raise exc # Save assistant message with accumulated content (only if not empty) if accumulated_content.strip(): diff --git a/frontend/nginx.conf b/frontend/nginx.conf index 53ef98c..8aadaa5 100644 --- a/frontend/nginx.conf +++ b/frontend/nginx.conf @@ -29,6 +29,11 @@ server { proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $http_x_forwarded_proto; proxy_cache_bypass $http_upgrade; + + # SSE streaming support — prevent default 60s proxy_read_timeout + proxy_read_timeout 300s; + proxy_send_timeout 300s; + proxy_buffering off; } # Handle client routing diff --git a/helm/langflow/values-dev.yaml b/helm/langflow/values-dev.yaml index bbdf56d..56dfd62 100644 --- a/helm/langflow/values-dev.yaml +++ b/helm/langflow/values-dev.yaml @@ -93,7 +93,7 @@ langflow: - name: GOOGLE_CLOUD_PROJECT value: "redhat-ai-analysis" - name: XDG_STATE_HOME - value: "/app" + value: "/tmp/langflow" - name: CONFIG_DIR value: "/app/src/config" - name: LFX_DEV diff --git a/k8s/app/base/route.yaml b/k8s/app/base/route.yaml index 9aff9ab..a797722 100644 --- a/k8s/app/base/route.yaml +++ b/k8s/app/base/route.yaml @@ -4,6 +4,8 @@ metadata: name: multi-agent-platform labels: app: multi-agent-platform + annotations: + haproxy.router.openshift.io/timeout: 300s spec: to: kind: Service diff --git a/k8s/app/overlays/dev/oauth-proxy.env b/k8s/app/overlays/dev/oauth-proxy.env index 4599150..d33a51f 100644 --- a/k8s/app/overlays/dev/oauth-proxy.env +++ b/k8s/app/overlays/dev/oauth-proxy.env @@ -11,6 +11,8 @@ OAUTH2_PROXY_PASS_ACCESS_TOKEN=true OAUTH2_PROXY_SET_XAUTHREQUEST=true OAUTH2_PROXY_SKIP_PROVIDER_BUTTON=true OAUTH2_PROXY_SKIP_AUTH_ROUTES=/api/v1/utils/health-check +OAUTH2_PROXY_UPSTREAM_TIMEOUT=300s +OAUTH2_PROXY_FLUSH_INTERVAL=200ms # Google Group restriction (requires Admin SDK setup -- see docs/AUTHENTICATION.md) # Steps: 1) Enable Admin SDK API in Google Cloud Console