diff --git a/.gitignore b/.gitignore index f32c2b0..8d5e149 100644 --- a/.gitignore +++ b/.gitignore @@ -86,3 +86,4 @@ helm/*/secrets-*.yaml # Playwright MCP .playwright-mcp/ .planning/ +app.db diff --git a/backend/app/api/routes/v1/chat_messages.py b/backend/app/api/routes/v1/chat_messages.py index 7e9082a..0864788 100644 --- a/backend/app/api/routes/v1/chat_messages.py +++ b/backend/app/api/routes/v1/chat_messages.py @@ -34,8 +34,6 @@ build_app_settings_data, build_generic_tweaks, build_user_settings_data, - get_required_services_for_flow, - MissingTokenError, ) from app.services.langflow import LangflowError, get_langflow_client @@ -215,22 +213,11 @@ async def stream_message( session.commit() session.refresh(user_message) - # Get required OAuth services for this flow - required_services = get_required_services_for_flow(flow_name) if flow_name else [] - - # Build user data with OAuth tokens - try: - user_data = await build_user_settings_data( - session=session, - user_id=current_user.id, - services=required_services, - ) - except MissingTokenError as e: - raise HTTPException( - status_code=400, - detail=f"Missing integration: {e.service_name}. " - "Please connect the service in Settings.", - ) + # Build user data with all available OAuth tokens + user_data = await build_user_settings_data( + session=session, + user_id=current_user.id, + ) # Build app data (feature flags, config) app_data = build_app_settings_data() diff --git a/backend/app/services/flow_token_injection.py b/backend/app/services/flow_token_injection.py index fdc5a26..d410919 100644 --- a/backend/app/services/flow_token_injection.py +++ b/backend/app/services/flow_token_injection.py @@ -2,7 +2,8 @@ Flow token injection service for Langflow integration. Injects user OAuth tokens into Langflow flow tweaks via the generic -UserSettings component. This decouples the backend from flow internals. +UserSettings component. 
The platform is flow-agnostic: it always injects +all available tokens and lets flows take what they need. Architecture: - API keys (OPENAI_API_KEY, etc.) -> langflow.env global variables @@ -13,66 +14,52 @@ import logging from sqlmodel import Session +from app.crud.integration import get_user_integrations from app.services.token_refresh import get_valid_token logger = logging.getLogger(__name__) -class MissingTokenError(Exception): - """Error raised when a required service token is missing.""" - - def __init__(self, service_name: str, message: str | None = None): - self.service_name = service_name - default_msg = f"Missing or expired token for service: {service_name}" - super().__init__(message or default_msg) - - async def build_user_settings_data( *, session: Session, user_id: int, - services: list[str] | None = None, ) -> dict: """ - Build user settings dict with OAuth tokens for specified services. + Build user settings dict with all available OAuth tokens. + + Discovers the user's connected integrations and injects valid tokens + for each. Flows opt in by reading the tokens they need from + UserSettings.data — the platform does not need to know which flows + use which services. Args: session: Database session user_id: User ID - services: List of service names to include tokens for. - If None, includes all available tokens. 
Returns: Dict suitable for UserSettings.data injection - Raises: - MissingTokenError: If a required service token is not available - Example: user_data = await build_user_settings_data( - session=session, - user_id=user.id, - services=["google_drive"], + session=session, user_id=user.id, ) - # Result: {"google_drive_token": "...", "user_id": 123} + # Result: {"google_drive_token": "...", "dataverse_token": "...", "user_id": 123} """ user_data: dict = {"user_id": user_id} - # Default services to check if none specified - if services is None: - services = ["google_drive", "dataverse"] + integrations = get_user_integrations(session=session, user_id=user_id) - for service_name in services: + for integration in integrations: token = await get_valid_token( session=session, user_id=user_id, - service_name=service_name, + service_name=integration.service_name, ) if token is not None: - # Use consistent key naming: {service_name}_token - user_data[f"{service_name}_token"] = token - logger.debug("Added token for %s to user settings", service_name) + user_data[f"{integration.service_name}_token"] = token + logger.debug("Added token for %s to user settings", integration.service_name) return user_data @@ -126,31 +113,3 @@ def build_generic_tweaks( tweaks["App Settings"] = {"settings_data": app_data} return tweaks - - -def get_required_services_for_flow(flow_name: str) -> list[str]: - """ - Get list of OAuth services required by a flow. - - This is the minimal configuration - just which services need tokens. - The injection target is always UserSettings.data. 
- - Args: - flow_name: Name of the Langflow flow - - Returns: - List of service names (e.g., ["google_drive", "dataverse"]) - """ - # Map flow names to required OAuth services - # The injection target is always UserSettings.data - flow_services: dict[str, list[str]] = { - # Enterprise agent needs Google Drive - "enterprise-agent": ["google_drive"], - # Test flows for validation - "test-google-drive": ["google_drive"], - "test-dataverse": ["dataverse"], - # Flows that need both - "multi-source-rag": ["google_drive", "dataverse"], - } - - return flow_services.get(flow_name, []) diff --git a/backend/app/services/langflow/client.py b/backend/app/services/langflow/client.py index ffc31e5..a314250 100644 --- a/backend/app/services/langflow/client.py +++ b/backend/app/services/langflow/client.py @@ -18,7 +18,10 @@ # HTTP timeout constants (seconds) LIST_FLOWS_TIMEOUT = 30.0 -CHAT_TIMEOUT = 120.0 +# Granular chat timeout: short connect/write/pool, long read for streaming. +# LangFlow flows can run 5-30+ minutes for deep research tasks. 
+# TODO: rearchitect for long-running tasks (async job queue + polling) +CHAT_TIMEOUT = httpx.Timeout(connect=30.0, read=1800.0, write=30.0, pool=30.0) # SSE constants SSE_DATA_PREFIX = "data: " diff --git a/backend/app/services/token_refresh.py b/backend/app/services/token_refresh.py index d034c64..3c5c6f9 100644 --- a/backend/app/services/token_refresh.py +++ b/backend/app/services/token_refresh.py @@ -15,6 +15,7 @@ import logging from datetime import datetime, timedelta, timezone +from cryptography.fernet import InvalidToken from sqlmodel import Session from app.crud.integration import ( @@ -139,7 +140,15 @@ async def get_valid_token( ) # Return the decrypted access token - tokens = get_decrypted_tokens(integration) + try: + tokens = get_decrypted_tokens(integration) + except InvalidToken: + logger.error( + "Failed to decrypt token for %s user %s (encryption key mismatch)", + service_name, + user_id, + ) + return None return tokens["access_token"] diff --git a/backend/tests/services/test_flow_token_injection.py b/backend/tests/services/test_flow_token_injection.py index edf1d53..bd7dc39 100644 --- a/backend/tests/services/test_flow_token_injection.py +++ b/backend/tests/services/test_flow_token_injection.py @@ -5,7 +5,6 @@ - build_user_settings_data: OAuth tokens via UserSettings component - build_app_settings_data: App context via AppSettings component - build_generic_tweaks: Tweak dict assembly for Langflow -- get_required_services_for_flow: Flow-to-services mapping """ import pytest @@ -17,8 +16,6 @@ build_app_settings_data, build_generic_tweaks, build_user_settings_data, - get_required_services_for_flow, - MissingTokenError, ) @@ -26,8 +23,8 @@ class TestBuildUserSettingsData: """Tests for building user settings with OAuth tokens.""" @pytest.mark.asyncio - async def test_with_valid_token_single_service(self, session: Session): - """Builds user data with token for a single service.""" + async def test_with_single_integration(self, session: Session): + 
"""Injects token for a single connected service.""" from app.crud.integration import create_or_update_integration user = User(email="test@example.com", username="testuser") @@ -47,15 +44,14 @@ async def test_with_valid_token_single_service(self, session: Session): user_data = await build_user_settings_data( session=session, user_id=user.id, - services=["google_drive"], ) assert user_data["user_id"] == user.id assert user_data["google_drive_token"] == "google-access-token" @pytest.mark.asyncio - async def test_with_multiple_services(self, session: Session): - """Builds user data with tokens for multiple services.""" + async def test_with_multiple_integrations(self, session: Session): + """Injects tokens for all connected services.""" from app.crud.integration import create_or_update_integration user = User(email="test@example.com", username="testuser") @@ -81,7 +77,6 @@ async def test_with_multiple_services(self, session: Session): user_data = await build_user_settings_data( session=session, user_id=user.id, - services=["google_drive", "dataverse"], ) assert user_data["user_id"] == user.id @@ -89,67 +84,73 @@ async def test_with_multiple_services(self, session: Session): assert user_data["dataverse_token"] == "dataverse-token" @pytest.mark.asyncio - async def test_with_no_services_defaults_to_all(self, session: Session): - """When services is None, defaults to checking all known services.""" + async def test_with_no_integrations(self, session: Session): + """Returns only user_id when no services are connected.""" user = User(email="test@example.com", username="testuser") session.add(user) session.commit() session.refresh(user) - # No integrations created — get_valid_token returns None for each user_data = await build_user_settings_data( session=session, user_id=user.id, - services=None, ) - # Should still have user_id, but no tokens (none connected) assert user_data["user_id"] == user.id assert "google_drive_token" not in user_data assert "dataverse_token" not in 
user_data @pytest.mark.asyncio - async def test_with_empty_services_list(self, session: Session): - """Empty list means no services needed — returns only user_id.""" - user = User(email="test@example.com", username="testuser") - session.add(user) - session.commit() - session.refresh(user) - - user_data = await build_user_settings_data( - session=session, - user_id=user.id, - services=[], - ) - - assert user_data == {"user_id": user.id} + async def test_expired_token_omitted(self, session: Session): + """Expired tokens are omitted (get_valid_token returns None).""" + from app.crud.integration import create_or_update_integration - @pytest.mark.asyncio - async def test_missing_integration_skips_gracefully(self, session: Session): - """When a service has no integration, its token is omitted (not error).""" user = User(email="test@example.com", username="testuser") session.add(user) session.commit() session.refresh(user) - # No google_drive integration created - user_data = await build_user_settings_data( + # Create an expired integration (expires_in=0 makes it already expired) + create_or_update_integration( session=session, user_id=user.id, - services=["google_drive"], + service_name="google_drive", + access_token="expired-token", + expires_in=0, ) + # Mock get_valid_token to return None (simulating expired + refresh failure) + with patch( + "app.services.flow_token_injection.get_valid_token", + new_callable=AsyncMock, + return_value=None, + ): + user_data = await build_user_settings_data( + session=session, + user_id=user.id, + ) + assert user_data["user_id"] == user.id assert "google_drive_token" not in user_data @pytest.mark.asyncio async def test_with_refreshed_token(self, session: Session): """Uses refreshed token value when get_valid_token refreshes.""" + from app.crud.integration import create_or_update_integration + user = User(email="test@example.com", username="testuser") session.add(user) session.commit() session.refresh(user) + 
create_or_update_integration( + session=session, + user_id=user.id, + service_name="google_drive", + access_token="old-token", + expires_in=3600, + ) + with patch( "app.services.flow_token_injection.get_valid_token", new_callable=AsyncMock, @@ -158,7 +159,6 @@ async def test_with_refreshed_token(self, session: Session): user_data = await build_user_settings_data( session=session, user_id=user.id, - services=["google_drive"], ) assert user_data["google_drive_token"] == "refreshed-token" @@ -224,30 +224,3 @@ def test_with_empty_dicts(self): """Empty dicts are falsy — no tweaks generated.""" tweaks = build_generic_tweaks(user_data={}, app_data={}) assert tweaks == {} - - -class TestGetRequiredServicesForFlow: - """Tests for flow-to-services mapping.""" - - def test_known_flow_returns_services(self): - """Known flow returns its required OAuth services.""" - services = get_required_services_for_flow("enterprise-agent") - assert services == ["google_drive"] - - def test_unknown_flow_returns_empty(self): - """Unknown flow returns empty list (no OAuth needed).""" - services = get_required_services_for_flow("unknown-flow-name") - assert services == [] - - def test_return_type_is_list(self): - """Return type is always list[str].""" - services = get_required_services_for_flow("enterprise-agent") - assert isinstance(services, list) - for s in services: - assert isinstance(s, str) - - def test_multi_service_flow(self): - """Flow requiring multiple services returns all of them.""" - services = get_required_services_for_flow("multi-source-rag") - assert "google_drive" in services - assert "dataverse" in services diff --git a/docs/OAUTH_INTEGRATIONS.md b/docs/OAUTH_INTEGRATIONS.md index 2b4e931..f45a96c 100644 --- a/docs/OAUTH_INTEGRATIONS.md +++ b/docs/OAUTH_INTEGRATIONS.md @@ -158,17 +158,12 @@ tweaks = await build_flow_tweaks( await client.chat(message, tweaks=tweaks) ``` -### Handling Missing Tokens +### Missing Tokens -```python -from app.services.flow_token_injection import 
build_flow_tweaks, MissingTokenError - -try: - tweaks = await build_flow_tweaks(...) -except MissingTokenError as e: - # User needs to connect the service - return {"error": f"Please connect {e.service_name} first"} -``` +The platform injects all available tokens automatically. If a user hasn't +connected a service, or its stored token cannot be refreshed or decrypted, that token is simply omitted from `UserSettings.data`. +Flows should handle missing tokens gracefully (e.g., skip document search +if no `google_drive_token` is present). ## Frontend Integration diff --git a/scripts/dev-langflow.sh b/scripts/dev-langflow.sh index b0a490b..d88c341 100755 --- a/scripts/dev-langflow.sh +++ b/scripts/dev-langflow.sh @@ -15,6 +15,7 @@ init_container_tool || exit 1 # Configuration LANGFLOW_VERSION="${LANGFLOW_VERSION:-latest}" +LANGFLOW_IMAGE="${LANGFLOW_IMAGE:-docker.io/langflowai/langflow:${LANGFLOW_VERSION}}" CONTAINER_NAME="app-langflow-dev" LANGFLOW_PORT="${LANGFLOW_PORT:-7860}" PROJECT_ROOT="${SCRIPT_DIR}/.." @@ -76,34 +77,97 @@ case "$1" in # Generate a secret key to avoid file permission issues SECRET_KEY=$(python3 -c "import secrets; print(secrets.token_urlsafe(32))" 2>/dev/null || openssl rand -base64 32) - $CONTAINER_TOOL run -d \ - --name $CONTAINER_NAME \ - -e LANGFLOW_DATABASE_URL="$DATABASE_URL" \ - -e LANGFLOW_CONFIG_DIR=/app/langflow \ - -e LANGFLOW_SECRET_KEY="$SECRET_KEY" \ - -e LANGFLOW_AUTO_LOGIN=true \ - -e LANGFLOW_SKIP_AUTH_AUTO_LOGIN=true \ - -e LANGFLOW_LOG_LEVEL=info \ - -e LANGFLOW_PORT=7860 \ - -e LANGFLOW_COMPONENTS_PATH=/app/langflow/components \ - -e PYTHONPATH=/app/langflow/packages \ - -e LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT="OPENAI_API_KEY,GEMINI_API_KEY,ANTHROPIC_API_KEY,OLLAMA_BASE_URL" \ - -e LANGFLOW_FALLBACK_TO_ENV_VAR=true \ - -e OPENAI_API_KEY="${OPENAI_API_KEY:-}" \ - -e GEMINI_API_KEY="${GEMINI_API_KEY:-}" \ - -e ANTHROPIC_API_KEY="${ANTHROPIC_API_KEY:-}" \ - -e OLLAMA_BASE_URL="${OLLAMA_BASE_URL:-http://host.containers.internal:11434}" \ - -e 
LANGFUSE_SECRET_KEY="${LANGFUSE_SECRET_KEY:-sk-dev-secret-key}" \ - -e LANGFUSE_PUBLIC_KEY="${LANGFUSE_PUBLIC_KEY:-pk-dev-public-key}" \ - -e LANGFUSE_HOST="${LANGFUSE_HOST:-http://${DB_HOST}:${LANGFUSE_WEB_PORT:-3000}}" \ - -e TZ="${TZ:-UTC}" \ - -p $LANGFLOW_PORT:7860 \ - -v "${DATA_DIR}:/app/langflow" \ - -v "${DATA_DIR}/components:/app/langflow/components:z" \ - -v "${DATA_DIR}/packages:/app/langflow/packages:z" \ - --add-host=host.docker.internal:host-gateway \ - --add-host=host.containers.internal:host-gateway \ - docker.io/langflowai/langflow:$LANGFLOW_VERSION + # Build env var and volume args (shared between stock and custom images) + COMMON_ENV_ARGS=( + -e LANGFLOW_SECRET_KEY="$SECRET_KEY" + -e LANGFLOW_AUTO_LOGIN=true + -e LANGFLOW_SKIP_AUTH_AUTO_LOGIN=true + -e LANGFLOW_LOG_LEVEL=info + -e LANGFLOW_PORT=7860 + -e LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT="OPENAI_API_KEY,GEMINI_API_KEY,ANTHROPIC_API_KEY,OLLAMA_BASE_URL" + -e LANGFLOW_FALLBACK_TO_ENV_VAR=true + -e OPENAI_API_KEY="${OPENAI_API_KEY:-}" + -e GEMINI_API_KEY="${GEMINI_API_KEY:-}" + -e ANTHROPIC_API_KEY="${ANTHROPIC_API_KEY:-}" + -e OLLAMA_BASE_URL="${OLLAMA_BASE_URL:-http://host.containers.internal:11434}" + -e LANGFUSE_SECRET_KEY="${LANGFUSE_SECRET_KEY:-sk-dev-secret-key}" + -e LANGFUSE_PUBLIC_KEY="${LANGFUSE_PUBLIC_KEY:-pk-dev-public-key}" + -e LANGFUSE_HOST="${LANGFUSE_HOST:-http://${DB_HOST}:${LANGFUSE_WEB_PORT:-3000}}" + -e TZ="${TZ:-UTC}" + -e LANGFLOW_LAZY_LOAD_COMPONENTS=false + ) + + # Detect host DNS servers (needed for VPN-resolved internal hostnames) + DNS_ARGS=() + for dns in $(scutil --dns 2>/dev/null | awk '/nameserver\[0\]/{print $3}' | head -3); do + DNS_ARGS+=(--dns "$dns") + done + + COMMON_RUN_ARGS=( + -p $LANGFLOW_PORT:7860 + --add-host=host.docker.internal:host-gateway + --add-host=host.containers.internal:host-gateway + "${DNS_ARGS[@]}" + ) + + # Determine if using custom image (not stock langflowai) + if [[ "$LANGFLOW_IMAGE" != docker.io/langflowai/* ]]; then + # Custom 
image (e.g., agents-python custom build) + # NOTE: Custom images use /app/langflow for LangFlow source code + # (editable install), so data is mounted at /data/langflow instead. + # Uses same PostgreSQL and DATA_DIR conventions as stock image. + log_info "Using custom LangFlow image: $LANGFLOW_IMAGE" + + CUSTOM_ENV_ARGS=( + -e LANGFLOW_DATABASE_URL="$DATABASE_URL" + -e LANGFLOW_CONFIG_DIR=/data/langflow + -e PYTHONPATH=/data/langflow/packages + -e GOOGLE_CLOUD_PROJECT="${GOOGLE_CLOUD_PROJECT:-}" + -e GOOGLE_CLIENT_ID="${GOOGLE_CLIENT_ID:-}" + -e GOOGLE_CLIENT_SECRET="${GOOGLE_CLIENT_SECRET:-}" + -e GRANITE_GUARDIAN_ENDPOINT="${GRANITE_GUARDIAN_ENDPOINT:-}" + -e GRANITE_GUARDIAN_API_KEY="${GRANITE_GUARDIAN_API_KEY:-}" + -e GRANITE_CA_BUNDLE="${GRANITE_CA_BUNDLE:-}" + ) + + CUSTOM_VOL_ARGS=( + -v "${DATA_DIR}:/data/langflow" + -v "${DATA_DIR}/components:/data/langflow/components:z" + -v "${DATA_DIR}/packages:/data/langflow/packages:z" + ) + + # Mount platform components into image's component path + if [ -d "${DATA_DIR}/components/platform" ]; then + CUSTOM_VOL_ARGS+=(-v "${DATA_DIR}/components/platform:/app/components/platform:ro") + fi + + # Mount gcloud credentials for Vertex AI auth (harmless if dir doesn't exist) + if [ -d "${HOME}/.config/gcloud" ]; then + CUSTOM_VOL_ARGS+=(-v "${HOME}/.config/gcloud:/root/.config/gcloud:ro") + fi + + $CONTAINER_TOOL run -d \ + --name $CONTAINER_NAME \ + "${COMMON_ENV_ARGS[@]}" \ + "${CUSTOM_ENV_ARGS[@]}" \ + "${COMMON_RUN_ARGS[@]}" \ + "${CUSTOM_VOL_ARGS[@]}" \ + "$LANGFLOW_IMAGE" + else + # Stock LangFlow image (uses PostgreSQL for shared state) + $CONTAINER_TOOL run -d \ + --name $CONTAINER_NAME \ + "${COMMON_ENV_ARGS[@]}" \ + -e LANGFLOW_DATABASE_URL="$DATABASE_URL" \ + -e LANGFLOW_CONFIG_DIR=/app/langflow \ + -e LANGFLOW_COMPONENTS_PATH=/app/langflow/components \ + -e PYTHONPATH=/app/langflow/packages \ + "${COMMON_RUN_ARGS[@]}" \ + -v "${DATA_DIR}:/app/langflow" \ + -v "${DATA_DIR}/components:/app/langflow/components:z" \ 
+ -v "${DATA_DIR}/packages:/app/langflow/packages:z" \ + "$LANGFLOW_IMAGE" + fi fi log_info "Waiting for LangFlow to be ready..." @@ -207,11 +271,19 @@ case "$1" in echo "" echo "Environment variables:" echo " LANGFLOW_VERSION - LangFlow version (default: latest)" + echo " LANGFLOW_IMAGE - Custom LangFlow image (overrides LANGFLOW_VERSION)" + echo " Example: LANGFLOW_IMAGE=langflow-langflow:latest" echo " LANGFLOW_PORT - LangFlow port (default: 7860)" echo " POSTGRES_USER - Database user (default: app)" echo " POSTGRES_PASSWORD - Database password (default: changethis)" echo " LANGFLOW_DB - Database name (default: langflow)" echo "" + echo "Custom image env vars (forwarded when LANGFLOW_IMAGE is set):" + echo " GOOGLE_CLOUD_PROJECT - GCP project for Vertex AI" + echo " GRANITE_GUARDIAN_ENDPOINT - Granite Guardian API endpoint" + echo " GRANITE_GUARDIAN_API_KEY - Granite Guardian API key" + echo " GRANITE_CA_BUNDLE - Custom CA bundle path" + echo "" echo "Prerequisites:" echo " PostgreSQL must be running: make db-start" exit 1 diff --git a/scripts/dev-oauth.sh b/scripts/dev-oauth.sh index 61a4baf..12d8e05 100755 --- a/scripts/dev-oauth.sh +++ b/scripts/dev-oauth.sh @@ -96,6 +96,7 @@ start_oauth() { -e OAUTH2_PROXY_PASS_USER_HEADERS="true" \ -e OAUTH2_PROXY_SET_XAUTHREQUEST="true" \ -e OAUTH2_PROXY_SKIP_PROVIDER_BUTTON="true" \ + -e OAUTH2_PROXY_UPSTREAM_TIMEOUT="1800s" \ -e OAUTH2_PROXY_REDIRECT_URL="http://localhost:${OAUTH_PORT}/oauth2/callback" \ $OAUTH_PROXY_IMAGE