Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,4 @@ helm/*/secrets-*.yaml
# Playwright MCP
.playwright-mcp/
.planning/
app.db
23 changes: 5 additions & 18 deletions backend/app/api/routes/v1/chat_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
build_app_settings_data,
build_generic_tweaks,
build_user_settings_data,
get_required_services_for_flow,
MissingTokenError,
)
from app.services.langflow import LangflowError, get_langflow_client

Expand Down Expand Up @@ -215,22 +213,11 @@ async def stream_message(
session.commit()
session.refresh(user_message)

# Get required OAuth services for this flow
required_services = get_required_services_for_flow(flow_name) if flow_name else []

# Build user data with OAuth tokens
try:
user_data = await build_user_settings_data(
session=session,
user_id=current_user.id,
services=required_services,
)
except MissingTokenError as e:
raise HTTPException(
status_code=400,
detail=f"Missing integration: {e.service_name}. "
"Please connect the service in Settings.",
)
# Build user data with all available OAuth tokens
user_data = await build_user_settings_data(
session=session,
user_id=current_user.id,
)

# Build app data (feature flags, config)
app_data = build_app_settings_data()
Expand Down
73 changes: 16 additions & 57 deletions backend/app/services/flow_token_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
Flow token injection service for Langflow integration.

Injects user OAuth tokens into Langflow flow tweaks via the generic
UserSettings component. This decouples the backend from flow internals.
UserSettings component. The platform is flow-agnostic: it always injects
all available tokens and lets flows take what they need.

Architecture:
- API keys (OPENAI_API_KEY, etc.) -> langflow.env global variables
Expand All @@ -13,66 +14,52 @@
import logging
from sqlmodel import Session

from app.crud.integration import get_user_integrations
from app.services.token_refresh import get_valid_token

logger = logging.getLogger(__name__)


class MissingTokenError(Exception):
"""Error raised when a required service token is missing."""

def __init__(self, service_name: str, message: str | None = None):
self.service_name = service_name
default_msg = f"Missing or expired token for service: {service_name}"
super().__init__(message or default_msg)


async def build_user_settings_data(
*,
session: Session,
user_id: int,
services: list[str] | None = None,
) -> dict:
"""
Build user settings dict with OAuth tokens for specified services.
Build user settings dict with all available OAuth tokens.

Discovers the user's connected integrations and injects valid tokens
for each. Flows opt in by reading the tokens they need from
UserSettings.data — the platform does not need to know which flows
use which services.

Args:
session: Database session
user_id: User ID
services: List of service names to include tokens for.
If None, includes all available tokens.

Returns:
Dict suitable for UserSettings.data injection

Raises:
MissingTokenError: If a required service token is not available

Example:
user_data = await build_user_settings_data(
session=session,
user_id=user.id,
services=["google_drive"],
session=session, user_id=user.id,
)
# Result: {"google_drive_token": "...", "user_id": 123}
# Result: {"google_drive_token": "...", "dataverse_token": "...", "user_id": 123}
"""
user_data: dict = {"user_id": user_id}

# Default services to check if none specified
if services is None:
services = ["google_drive", "dataverse"]
integrations = get_user_integrations(session=session, user_id=user_id)

for service_name in services:
for integration in integrations:
token = await get_valid_token(
session=session,
user_id=user_id,
service_name=service_name,
service_name=integration.service_name,
)

if token is not None:
# Use consistent key naming: {service_name}_token
user_data[f"{service_name}_token"] = token
logger.debug("Added token for %s to user settings", service_name)
user_data[f"{integration.service_name}_token"] = token
logger.debug("Added token for %s to user settings", integration.service_name)

return user_data

Expand Down Expand Up @@ -126,31 +113,3 @@ def build_generic_tweaks(
tweaks["App Settings"] = {"settings_data": app_data}

return tweaks


def get_required_services_for_flow(flow_name: str) -> list[str]:
"""
Get list of OAuth services required by a flow.

This is the minimal configuration - just which services need tokens.
The injection target is always UserSettings.data.

Args:
flow_name: Name of the Langflow flow

Returns:
List of service names (e.g., ["google_drive", "dataverse"])
"""
# Map flow names to required OAuth services
# The injection target is always UserSettings.data
flow_services: dict[str, list[str]] = {
# Enterprise agent needs Google Drive
"enterprise-agent": ["google_drive"],
# Test flows for validation
"test-google-drive": ["google_drive"],
"test-dataverse": ["dataverse"],
# Flows that need both
"multi-source-rag": ["google_drive", "dataverse"],
}

return flow_services.get(flow_name, [])
5 changes: 4 additions & 1 deletion backend/app/services/langflow/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

# HTTP timeout constants (seconds)
LIST_FLOWS_TIMEOUT = 30.0
CHAT_TIMEOUT = 120.0
# Granular chat timeout: short connect/write/pool, long read for streaming.
# LangFlow flows can run 5-30+ minutes for deep research tasks.
# TODO: rearchitect for long-running tasks (async job queue + polling)
CHAT_TIMEOUT = httpx.Timeout(connect=30.0, read=1800.0, write=30.0, pool=30.0)

# SSE constants
SSE_DATA_PREFIX = "data: "
Expand Down
11 changes: 10 additions & 1 deletion backend/app/services/token_refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import logging
from datetime import datetime, timedelta, timezone
from cryptography.fernet import InvalidToken
from sqlmodel import Session

from app.crud.integration import (
Expand Down Expand Up @@ -139,7 +140,15 @@ async def get_valid_token(
)

# Return the decrypted access token
tokens = get_decrypted_tokens(integration)
try:
tokens = get_decrypted_tokens(integration)
except InvalidToken:
logger.error(
"Failed to decrypt token for %s user %s (encryption key mismatch)",
service_name,
user_id,
)
return None
return tokens["access_token"]


Expand Down
97 changes: 35 additions & 62 deletions backend/tests/services/test_flow_token_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
- build_user_settings_data: OAuth tokens via UserSettings component
- build_app_settings_data: App context via AppSettings component
- build_generic_tweaks: Tweak dict assembly for Langflow
- get_required_services_for_flow: Flow-to-services mapping
"""

import pytest
Expand All @@ -17,17 +16,15 @@
build_app_settings_data,
build_generic_tweaks,
build_user_settings_data,
get_required_services_for_flow,
MissingTokenError,
)


class TestBuildUserSettingsData:
"""Tests for building user settings with OAuth tokens."""

@pytest.mark.asyncio
async def test_with_valid_token_single_service(self, session: Session):
"""Builds user data with token for a single service."""
async def test_with_single_integration(self, session: Session):
"""Injects token for a single connected service."""
from app.crud.integration import create_or_update_integration

user = User(email="test@example.com", username="testuser")
Expand All @@ -47,15 +44,14 @@ async def test_with_valid_token_single_service(self, session: Session):
user_data = await build_user_settings_data(
session=session,
user_id=user.id,
services=["google_drive"],
)

assert user_data["user_id"] == user.id
assert user_data["google_drive_token"] == "google-access-token"

@pytest.mark.asyncio
async def test_with_multiple_services(self, session: Session):
"""Builds user data with tokens for multiple services."""
async def test_with_multiple_integrations(self, session: Session):
"""Injects tokens for all connected services."""
from app.crud.integration import create_or_update_integration

user = User(email="test@example.com", username="testuser")
Expand All @@ -81,75 +77,80 @@ async def test_with_multiple_services(self, session: Session):
user_data = await build_user_settings_data(
session=session,
user_id=user.id,
services=["google_drive", "dataverse"],
)

assert user_data["user_id"] == user.id
assert user_data["google_drive_token"] == "google-token"
assert user_data["dataverse_token"] == "dataverse-token"

@pytest.mark.asyncio
async def test_with_no_services_defaults_to_all(self, session: Session):
"""When services is None, defaults to checking all known services."""
async def test_with_no_integrations(self, session: Session):
"""Returns only user_id when no services are connected."""
user = User(email="test@example.com", username="testuser")
session.add(user)
session.commit()
session.refresh(user)

# No integrations created — get_valid_token returns None for each
user_data = await build_user_settings_data(
session=session,
user_id=user.id,
services=None,
)

# Should still have user_id, but no tokens (none connected)
assert user_data["user_id"] == user.id
assert "google_drive_token" not in user_data
assert "dataverse_token" not in user_data

@pytest.mark.asyncio
async def test_with_empty_services_list(self, session: Session):
"""Empty list means no services needed — returns only user_id."""
user = User(email="test@example.com", username="testuser")
session.add(user)
session.commit()
session.refresh(user)

user_data = await build_user_settings_data(
session=session,
user_id=user.id,
services=[],
)

assert user_data == {"user_id": user.id}
async def test_expired_token_omitted(self, session: Session):
"""Expired tokens are omitted (get_valid_token returns None)."""
from app.crud.integration import create_or_update_integration

@pytest.mark.asyncio
async def test_missing_integration_skips_gracefully(self, session: Session):
"""When a service has no integration, its token is omitted (not error)."""
user = User(email="test@example.com", username="testuser")
session.add(user)
session.commit()
session.refresh(user)

# No google_drive integration created
user_data = await build_user_settings_data(
# Create an expired integration (expires_in=0 makes it already expired)
create_or_update_integration(
session=session,
user_id=user.id,
services=["google_drive"],
service_name="google_drive",
access_token="expired-token",
expires_in=0,
)

# Mock get_valid_token to return None (simulating expired + refresh failure)
with patch(
"app.services.flow_token_injection.get_valid_token",
new_callable=AsyncMock,
return_value=None,
):
user_data = await build_user_settings_data(
session=session,
user_id=user.id,
)

assert user_data["user_id"] == user.id
assert "google_drive_token" not in user_data

@pytest.mark.asyncio
async def test_with_refreshed_token(self, session: Session):
"""Uses refreshed token value when get_valid_token refreshes."""
from app.crud.integration import create_or_update_integration

user = User(email="test@example.com", username="testuser")
session.add(user)
session.commit()
session.refresh(user)

create_or_update_integration(
session=session,
user_id=user.id,
service_name="google_drive",
access_token="old-token",
expires_in=3600,
)

with patch(
"app.services.flow_token_injection.get_valid_token",
new_callable=AsyncMock,
Expand All @@ -158,7 +159,6 @@ async def test_with_refreshed_token(self, session: Session):
user_data = await build_user_settings_data(
session=session,
user_id=user.id,
services=["google_drive"],
)

assert user_data["google_drive_token"] == "refreshed-token"
Expand Down Expand Up @@ -224,30 +224,3 @@ def test_with_empty_dicts(self):
"""Empty dicts are falsy — no tweaks generated."""
tweaks = build_generic_tweaks(user_data={}, app_data={})
assert tweaks == {}


class TestGetRequiredServicesForFlow:
"""Tests for flow-to-services mapping."""

def test_known_flow_returns_services(self):
"""Known flow returns its required OAuth services."""
services = get_required_services_for_flow("enterprise-agent")
assert services == ["google_drive"]

def test_unknown_flow_returns_empty(self):
"""Unknown flow returns empty list (no OAuth needed)."""
services = get_required_services_for_flow("unknown-flow-name")
assert services == []

def test_return_type_is_list(self):
"""Return type is always list[str]."""
services = get_required_services_for_flow("enterprise-agent")
assert isinstance(services, list)
for s in services:
assert isinstance(s, str)

def test_multi_service_flow(self):
"""Flow requiring multiple services returns all of them."""
services = get_required_services_for_flow("multi-source-rag")
assert "google_drive" in services
assert "dataverse" in services
Loading
Loading