diff --git a/backend/app/alembic/versions/a1b2c3d4e5f6_add_job_table.py b/backend/app/alembic/versions/a1b2c3d4e5f6_add_job_table.py
new file mode 100644
index 0000000..b8b3b41
--- /dev/null
+++ b/backend/app/alembic/versions/a1b2c3d4e5f6_add_job_table.py
@@ -0,0 +1,45 @@
+"""add job table
+
+Revision ID: a1b2c3d4e5f6
+Revises: 146945cf3865
+Create Date: 2026-02-27 00:10:00.000000
+
+"""
+from alembic import op
+import sqlalchemy as sa
+import sqlmodel.sql.sqltypes
+
+
+# revision identifiers, used by Alembic.
+revision = 'a1b2c3d4e5f6'
+down_revision = '146945cf3865'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ op.create_table('job',
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('chat_message_id', sa.Integer(), nullable=False),
+ sa.Column('langflow_job_id', sqlmodel.sql.sqltypes.AutoString(length=255), nullable=True),
+ sa.Column('flow_id', sqlmodel.sql.sqltypes.AutoString(length=255), nullable=True),
+ sa.Column('status', sqlmodel.sql.sqltypes.AutoString(length=50), nullable=False),
+ sa.Column('error_message', sa.Text(), nullable=True),
+ sa.Column('result_content', sa.Text(), nullable=True),
+ sa.Column('started_at', sa.DateTime(), nullable=True),
+ sa.Column('completed_at', sa.DateTime(), nullable=True),
+ sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
+ sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
+ sa.ForeignKeyConstraint(['chat_message_id'], ['chat_message.id'], ondelete='CASCADE'),
+ sa.PrimaryKeyConstraint('id')
+ )
+ op.create_index(op.f('ix_job_chat_message_id'), 'job', ['chat_message_id'], unique=False)
+ op.create_index(op.f('ix_job_langflow_job_id'), 'job', ['langflow_job_id'], unique=False)
+ op.create_index(op.f('ix_job_status'), 'job', ['status'], unique=False)
+
+
+def downgrade():
+ op.drop_index(op.f('ix_job_status'), table_name='job')
+ op.drop_index(op.f('ix_job_langflow_job_id'), table_name='job')
+ op.drop_index(op.f('ix_job_chat_message_id'), table_name='job')
+ op.drop_table('job')
diff --git a/backend/app/api/routes/v1/chat_messages.py b/backend/app/api/routes/v1/chat_messages.py
index 63c57a0..98e1b15 100644
--- a/backend/app/api/routes/v1/chat_messages.py
+++ b/backend/app/api/routes/v1/chat_messages.py
@@ -28,6 +28,9 @@
ChatMessageCreate,
ChatMessagePublic,
ChatMessagesPublic,
+ Job,
+ JobPublic,
+ JobStatus,
Message,
)
from app.core.config import settings
@@ -131,6 +134,116 @@ def create_message(
return message
+class JobMessageRequest(BaseModel):
+ """Request body for job-based message endpoint."""
+ content: str
+ flow_id: str | None = None
+ flow_name: str | None = None
+
+
+@router.post("/job", response_model=JobPublic)
+async def create_job_message(
+ *,
+ session: SessionDep,
+ current_user: CurrentUser,
+ chat_id: int,
+ request: JobMessageRequest,
+) -> Any:
+ """
+ Send a message and create a background job for AI processing.
+
+ This endpoint:
+ 1. Saves the user message to the database
+ 2. Creates a placeholder assistant message (filled on job completion)
+ 3. Creates a Job record (status=pending)
+ 4. Submits to LangFlow V2 with background=true
+ 5. Updates Job with langflow_job_id and status=in_progress
+ 6. Returns JobPublic response (includes job id for frontend polling)
+ """
+ # Verify chat exists and user has access
+ chat = get_chat_with_permission(session, current_user, chat_id)
+ flow_name = request.flow_name or settings.LANGFLOW_DEFAULT_FLOW
+
+ # Save user message
+ user_message = ChatMessage(
+ chat_id=chat_id,
+ content=request.content,
+ role="user",
+ )
+ session.add(user_message)
+
+ # Create placeholder assistant message (content filled on job completion)
+ assistant_message = ChatMessage(
+ chat_id=chat_id,
+ content="",
+ role="assistant",
+ )
+ session.add(assistant_message)
+
+ # Update chat metadata
+ chat.updated_at = datetime.now(timezone.utc)
+ if not chat.flow_name and flow_name:
+ chat.flow_name = flow_name
+ session.add(chat)
+ session.commit()
+ session.refresh(user_message)
+ session.refresh(assistant_message)
+
+ # Resolve flow ID
+ client = get_langflow_client()
+ resolved_flow_id = await client.resolve_flow_id(
+ flow_id=request.flow_id, flow_name=flow_name
+ )
+ if not resolved_flow_id:
+ raise HTTPException(status_code=400, detail="No flow configured or found")
+
+ # Create job record
+ job = Job(
+ chat_message_id=assistant_message.id,
+ flow_id=resolved_flow_id,
+ status=JobStatus.PENDING.value,
+ )
+ session.add(job)
+ session.commit()
+ session.refresh(job)
+
+ # Build tweaks and submit to LangFlow V2
+ user_data = await build_user_settings_data(
+ session=session, user_id=current_user.id
+ )
+ app_data = build_app_settings_data()
+ tweaks = build_generic_tweaks(user_data=user_data, app_data=app_data)
+
+ try:
+ v2_inputs = client.build_v2_inputs(
+ message=request.content,
+ session_id=str(chat_id),
+ tweaks=tweaks,
+ )
+ lf_response = await client.submit_workflow(
+ flow_id=resolved_flow_id,
+ inputs=v2_inputs,
+ session_id=str(chat_id),
+ )
+ now = datetime.now(timezone.utc)
+ job.langflow_job_id = lf_response.get("job_id")
+ job.status = JobStatus.IN_PROGRESS.value
+ job.started_at = now
+ job.updated_at = now
+ except Exception as e:
+ logger.error(f"Failed to submit workflow: {e}")
+ now = datetime.now(timezone.utc)
+ job.status = JobStatus.FAILED.value
+ job.error_message = str(e)
+ job.completed_at = now
+ job.updated_at = now
+
+ session.add(job)
+ session.commit()
+ session.refresh(job)
+ return job
+
+
@router.delete("/{message_id}")
def delete_message(
session: SessionDep,
diff --git a/backend/app/api/routes/v1/chats.py b/backend/app/api/routes/v1/chats.py
index 9ef91fc..8f82cb1 100644
--- a/backend/app/api/routes/v1/chats.py
+++ b/backend/app/api/routes/v1/chats.py
@@ -7,6 +7,7 @@
- Create new chat
- Update chat
- Delete chat
+- Get active job for a chat (page refresh recovery)
"""
from datetime import datetime, timezone
@@ -19,10 +20,15 @@
from app.models import (
Chat,
ChatCreate,
+ ChatMessage,
ChatPublic,
ChatsPublic,
ChatUpdate,
+ Job,
+ JobPublic,
+ JobStatus,
Message,
+ TERMINAL_STATUSES,
)
router = APIRouter(prefix="/chats", tags=["chats"])
@@ -119,6 +125,37 @@ def update_chat(
return chat
+@router.get("/{id}/active-job", response_model=JobPublic)
+def get_active_job(
+ session: SessionDep, current_user: CurrentUser, id: int
+) -> Any:
+ """
+ Get the active (non-terminal) job for a chat, if any.
+
+ Used for page refresh recovery -- checks if there's an in-flight job
+ that the frontend should resume polling for.
+
+ Returns 404 if no active job exists.
+ """
+ chat = get_chat_with_permission(session, current_user, id)
+
+ # Find the latest non-terminal job for any message in this chat
+ terminal_values = {s.value for s in TERMINAL_STATUSES}
+ statement = (
+ select(Job)
+ .join(ChatMessage, Job.chat_message_id == ChatMessage.id)
+ .where(ChatMessage.chat_id == chat.id)
+ .where(Job.status.notin_(terminal_values))
+ .order_by(Job.id.desc())
+ .limit(1)
+ )
+ job = session.exec(statement).first()
+ if not job:
+ raise HTTPException(status_code=404, detail="No active job found")
+
+ return job
+
+
@router.delete("/{id}")
def delete_chat(session: SessionDep, current_user: CurrentUser, id: int) -> Message:
"""
diff --git a/backend/app/api/routes/v1/jobs.py b/backend/app/api/routes/v1/jobs.py
new file mode 100644
index 0000000..181d51a
--- /dev/null
+++ b/backend/app/api/routes/v1/jobs.py
@@ -0,0 +1,231 @@
+"""
+Job API endpoints for tracking long-running LangFlow executions.
+
+This module provides:
+- GET /jobs/{id} — Get job status with optional ?sync=true to poll LangFlow
+- POST /jobs/{id}/cancel — Cancel a running job
+"""
+
+import json
+import logging
+from datetime import datetime, timezone
+
+from fastapi import APIRouter, HTTPException
+
+from app.api.deps import CurrentUser, SessionDep
+from app.models import Chat, ChatMessage, Job, JobPublic, JobStatus, TERMINAL_STATUSES
+from app.services.langflow import get_langflow_client
+from app.services.langflow.client import LangflowError
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/jobs", tags=["jobs"])
+
+
+def verify_job_ownership(
+ job: Job, current_user: CurrentUser, session: SessionDep
+) -> None:
+ """
+ Verify the current user owns the chat associated with this job.
+
+ Traverses: job -> chat_message -> chat -> user_id
+ Admins can access any job.
+
+ Raises:
+ HTTPException: 403 if user does not own the job's chat
+ HTTPException: 404 if chat_message or chat not found
+ """
+ chat_message = session.get(ChatMessage, job.chat_message_id)
+ if not chat_message:
+ raise HTTPException(status_code=404, detail="Job's chat message not found")
+
+ chat = session.get(Chat, chat_message.chat_id)
+ if not chat:
+ raise HTTPException(status_code=404, detail="Job's chat not found")
+
+ if chat.user_id != current_user.id and not current_user.admin:
+ raise HTTPException(status_code=403, detail="Not enough permissions")
+
+
+def extract_result_text(outputs: dict) -> str:
+ """
+ Extract the chat output text from LangFlow V2 response outputs.
+
+ V2 outputs structure: {"Component Name": {"type": "message", "content": "...", ...}}
+ Looks for the first component with type "message" and returns its content.
+
+ Args:
+ outputs: The 'outputs' dict from V2 workflow status response
+
+ Returns:
+ The extracted text, or empty string if not found
+ """
+ try:
+ for component_name, component_output in outputs.items():
+ if isinstance(component_output, dict):
+ content = component_output.get("content")
+ if content is not None:
+ return str(content)
+ except (AttributeError, TypeError):
+ logger.warning("Failed to extract result text from V2 outputs")
+ return ""
+
+
+async def sync_job_status_with_langflow(
+ job: Job, session: SessionDep
+) -> None:
+ """
+ Sync job status with LangFlow V2 API. Skip for terminal states.
+
+ Called when GET /jobs/{id}?sync=true. Polls LangFlow for the latest
+ status and updates our database record.
+
+ Args:
+ job: The Job record to sync
+ session: Database session
+ """
+ if not job.langflow_job_id:
+ return
+ if job.status in {s.value for s in TERMINAL_STATUSES}:
+ return
+
+ try:
+ client = get_langflow_client()
+ lf_response = await client.get_workflow_status(job.langflow_job_id)
+ lf_status = lf_response.get("status", "").lower()
+
+ STATUS_MAP = {
+ "queued": JobStatus.PENDING,
+ "pending": JobStatus.PENDING,
+ "in_progress": JobStatus.IN_PROGRESS,
+ "running": JobStatus.IN_PROGRESS,
+ "completed": JobStatus.COMPLETED,
+ "success": JobStatus.COMPLETED,
+ "failed": JobStatus.FAILED,
+ "error": JobStatus.FAILED,
+ "cancelled": JobStatus.CANCELLED,
+ "canceled": JobStatus.CANCELLED,
+ "timed_out": JobStatus.TIMED_OUT,
+ }
+
+ new_status = STATUS_MAP.get(lf_status)
+ if new_status and new_status.value != job.status:
+ job.status = new_status.value
+ now = datetime.now(timezone.utc)
+
+ if new_status == JobStatus.IN_PROGRESS and not job.started_at:
+ job.started_at = now
+
+ if new_status == JobStatus.COMPLETED:
+ result_text = extract_result_text(
+ lf_response.get("outputs", {})
+ )
+ job.result_content = result_text
+ job.completed_at = now
+ # Write result back to the chat message so it appears in message history
+ if result_text and job.chat_message_id:
+ chat_message = session.get(ChatMessage, job.chat_message_id)
+ if chat_message:
+ chat_message.content = result_text
+ session.add(chat_message)
+ elif new_status in (JobStatus.FAILED, JobStatus.TIMED_OUT):
+ errors = lf_response.get("errors", [])
+ if errors and isinstance(errors[0], dict):
+ job.error_message = errors[0].get("error", "Unknown error")
+ else:
+ job.error_message = lf_response.get("error", "Unknown error")
+ job.completed_at = now
+
+ job.updated_at = now
+ session.add(job)
+ session.commit()
+ session.refresh(job)
+
+ except LangflowError as e:
+ # LangFlow V2 returns HTTP 500 for failed jobs with JOB_FAILED code.
+ # Parse the error to update job status instead of leaving it stuck.
+ if e.status_code == 500 and "JOB_FAILED" in e.message:
+ logger.info(f"Job {job.id} failed in LangFlow: {e.message}")
+ # Extract readable error from the JSON response
+ error_msg = "Job failed in LangFlow"
+ try:
+ detail = json.loads(
+ e.message.replace("Failed to get workflow status: ", "", 1)
+ )
+ error_msg = detail.get("detail", {}).get("message", error_msg)
+ except (json.JSONDecodeError, AttributeError):
+ pass
+ now = datetime.now(timezone.utc)
+ job.status = JobStatus.FAILED.value
+ job.error_message = error_msg
+ job.completed_at = now
+ job.updated_at = now
+ session.add(job)
+ session.commit()
+ session.refresh(job)
+ else:
+ logger.warning(f"LangFlow error syncing job {job.id}: {e}")
+ except Exception as e:
+ logger.warning(f"Error syncing job {job.id} with LangFlow: {e}")
+
+
+@router.get("/{job_id}", response_model=JobPublic)
+async def get_job(
+ job_id: int,
+ sync: bool = False,
+ session: SessionDep = None,
+ current_user: CurrentUser = None,
+) -> Job:
+ """
+ Get job status.
+
+ If sync=true, poll LangFlow V2 API first and update our record
+ before returning. This is the primary mechanism for frontend polling.
+ """
+ job = session.get(Job, job_id)
+ if not job:
+ raise HTTPException(status_code=404, detail="Job not found")
+
+ verify_job_ownership(job, current_user, session)
+
+ if sync:
+ await sync_job_status_with_langflow(job, session)
+
+ return job
+
+
+@router.post("/{job_id}/cancel", response_model=JobPublic)
+async def cancel_job(
+ job_id: int,
+ session: SessionDep = None,
+ current_user: CurrentUser = None,
+) -> Job:
+ """
+ Cancel a running job by calling LangFlow V2 stop endpoint.
+
+ Returns 400 if the job is already in a terminal state.
+ """
+ job = session.get(Job, job_id)
+ if not job:
+ raise HTTPException(status_code=404, detail="Job not found")
+
+ verify_job_ownership(job, current_user, session)
+
+ if job.status in {s.value for s in TERMINAL_STATUSES}:
+ raise HTTPException(status_code=400, detail="Job already in terminal state")
+
+ client = get_langflow_client()
+ if job.langflow_job_id:
+ try:
+ await client.stop_workflow(job.langflow_job_id)
+ except Exception as e:
+ logger.warning(f"Failed to stop LangFlow workflow for job {job.id}: {e}")
+
+ now = datetime.now(timezone.utc)
+ job.status = JobStatus.CANCELLED.value
+ job.completed_at = now
+ job.updated_at = now
+ session.add(job)
+ session.commit()
+ session.refresh(job)
+ return job
diff --git a/backend/app/api/routes/v1/router.py b/backend/app/api/routes/v1/router.py
index 733a57e..c101ada 100644
--- a/backend/app/api/routes/v1/router.py
+++ b/backend/app/api/routes/v1/router.py
@@ -6,6 +6,7 @@
from .chat_messages import router as chat_messages_router
from .flows import router as flows_router
from .integrations import router as integrations_router
+from .jobs import router as jobs_router
router = APIRouter()
router.include_router(utils_router, prefix="/utils")
@@ -15,3 +16,4 @@
router.include_router(chat_messages_router)
router.include_router(flows_router)
router.include_router(integrations_router)
+router.include_router(jobs_router)
diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py
index f02c216..5d5a086 100644
--- a/backend/app/models/__init__.py
+++ b/backend/app/models/__init__.py
@@ -69,6 +69,14 @@
ChatMessagesPublic,
)
+# Job models
+from app.models.job import (
+ Job,
+ JobPublic,
+ JobStatus,
+ TERMINAL_STATUSES,
+)
+
# Integration models
from app.models.user_integration import (
UserIntegration,
@@ -114,6 +122,11 @@
"ChatMessageCreate",
"ChatMessagePublic",
"ChatMessagesPublic",
+ # Job
+ "Job",
+ "JobPublic",
+ "JobStatus",
+ "TERMINAL_STATUSES",
# Integration
"UserIntegration",
"UserIntegrationPublic",
diff --git a/backend/app/models/chat_message.py b/backend/app/models/chat_message.py
index e01277a..7302b69 100644
--- a/backend/app/models/chat_message.py
+++ b/backend/app/models/chat_message.py
@@ -16,6 +16,7 @@
if TYPE_CHECKING:
from app.models.chat import Chat
+ from app.models.job import Job
# Valid roles for chat messages - type hint only (SQLModel can't use Literal for db columns)
@@ -24,8 +25,12 @@
class ChatMessageBase(SQLModel):
- """Shared properties for ChatMessage."""
- content: str = Field(min_length=1, max_length=10000)
+ """Shared properties for ChatMessage.
+
+ Note: content allows empty strings to support placeholder assistant
+ messages created by the job-based flow (filled on job completion).
+ """
+ content: str = Field(min_length=0, max_length=10000)
role: str = Field(max_length=20)
@field_validator("role")
@@ -38,8 +43,12 @@ def validate_role(cls, v: str) -> str:
class ChatMessageCreate(ChatMessageBase):
- """Properties to receive on message creation."""
- pass
+ """Properties to receive on message creation.
+
+ Requires non-empty content (unlike the base which allows empty
+ for placeholder assistant messages).
+ """
+ content: str = Field(min_length=1, max_length=10000)
class ChatMessage(ChatMessageBase, table=True):
@@ -53,8 +62,9 @@ class ChatMessage(ChatMessageBase, table=True):
sa_type=DateTime(timezone=True),
)
- # Relationship
+ # Relationships
chat: Optional["Chat"] = Relationship(back_populates="messages")
+ job: Optional["Job"] = Relationship(back_populates="chat_message")
class ChatMessagePublic(ChatMessageBase):
diff --git a/backend/app/models/job.py b/backend/app/models/job.py
new file mode 100644
index 0000000..9c97cb3
--- /dev/null
+++ b/backend/app/models/job.py
@@ -0,0 +1,86 @@
+"""
+Job model and related schemas.
+
+This module contains:
+- JobStatus enum for job lifecycle states
+- TERMINAL_STATUSES set for completed states
+- Job database model (table=True)
+- JobPublic: Output schema for API responses
+"""
+
+from datetime import datetime, timezone
+from enum import Enum
+from typing import TYPE_CHECKING, Optional
+
+from sqlalchemy import Column, DateTime, Text
+from sqlmodel import Field, Relationship, SQLModel
+
+if TYPE_CHECKING:
+ from app.models.chat_message import ChatMessage
+
+
+class JobStatus(str, Enum):
+ """Job lifecycle states."""
+ PENDING = "pending"
+ IN_PROGRESS = "in_progress"
+ COMPLETED = "completed"
+ FAILED = "failed"
+ CANCELLED = "cancelled"
+ TIMED_OUT = "timed_out"
+
+
+TERMINAL_STATUSES = {
+ JobStatus.COMPLETED,
+ JobStatus.FAILED,
+ JobStatus.CANCELLED,
+ JobStatus.TIMED_OUT,
+}
+
+
+class Job(SQLModel, table=True):
+ """Job database model for tracking long-running LangFlow executions."""
+ __tablename__ = "job"
+
+ id: int | None = Field(default=None, primary_key=True)
+ chat_message_id: int = Field(
+ foreign_key="chat_message.id", nullable=False, index=True, ondelete="CASCADE"
+ )
+ langflow_job_id: str | None = Field(
+ default=None, index=True, max_length=255
+ )
+ flow_id: str | None = Field(default=None, max_length=255)
+ status: str = Field(default=JobStatus.PENDING, max_length=50, index=True)
+ error_message: str | None = Field(
+ default=None, sa_column=Column("error_message", Text, nullable=True)
+ )
+ result_content: str | None = Field(
+ default=None, sa_column=Column("result_content", Text, nullable=True)
+ )
+ started_at: datetime | None = Field(default=None)
+ completed_at: datetime | None = Field(default=None)
+ created_at: datetime = Field(
+ default_factory=lambda: datetime.now(timezone.utc),
+ sa_type=DateTime(timezone=True),
+ )
+ updated_at: datetime = Field(
+ default_factory=lambda: datetime.now(timezone.utc),
+ sa_type=DateTime(timezone=True),
+ )
+
+ # Relationship back to ChatMessage
+ chat_message: Optional["ChatMessage"] = Relationship(back_populates="job")
+
+
+class JobPublic(SQLModel):
+ """Properties to return via API."""
+ id: int
+ chat_message_id: int
+ langflow_job_id: str | None
+ flow_id: str | None
+ status: str
+ error_message: str | None
+ result_content: str | None
+ started_at: datetime | None
+ completed_at: datetime | None
+ created_at: datetime
+ updated_at: datetime
diff --git a/backend/app/services/langflow/client.py b/backend/app/services/langflow/client.py
index a314250..b7baeeb 100644
--- a/backend/app/services/langflow/client.py
+++ b/backend/app/services/langflow/client.py
@@ -373,6 +373,155 @@ async def chat(
logger.error(f"Langflow connection error: {e}")
raise LangflowError(f"Failed to connect to Langflow: {str(e)}")
+ # --- V2 Workflow API Methods ---
+
+ def build_v2_inputs(
+ self,
+ message: str,
+ session_id: str | None = None,
+ tweaks: dict | None = None,
+ ) -> dict:
+ """
+ Build V2-compatible flat inputs from message and tweaks.
+
+ V2 uses flat format: "display_name.param_name": value
+ parse_flat_inputs() converts this to tweaks dict, then process_tweaks()
+ applies them to the graph (supports both component IDs and display names).
+
+ Args:
+ message: The user message
+ session_id: Optional session ID for conversation continuity
+ tweaks: Optional tweaks dict (display-name keyed, e.g.
+ {"User Settings": {"settings_data": {...}}})
+
+ Returns:
+ Dict with flat inputs for V2 WorkflowExecutionRequest.inputs
+ """
+ inputs: dict = {
+ "Chat Input.input_value": message,
+ }
+ if session_id:
+ inputs["Chat Input.session_id"] = session_id
+ if tweaks:
+ for component_name, params in tweaks.items():
+ if isinstance(params, dict):
+ for param_name, value in params.items():
+ inputs[f"{component_name}.{param_name}"] = value
+ return inputs
+
+ async def submit_workflow(
+ self,
+ flow_id: str,
+ inputs: dict,
+ session_id: str | None = None,
+ ) -> dict:
+ """
+ Submit workflow for background execution via V2 API.
+
+ Args:
+ flow_id: The flow ID to execute
+ inputs: V2-compatible inputs (from build_v2_inputs)
+ session_id: Optional session ID
+
+ Returns:
+ Dict with job_id, status, and other metadata from V2 API
+
+ Raises:
+ LangflowError: If the API call fails
+ """
+ url = f"{self.base_url}/api/v2/workflows"
+ payload = {
+ "flow_id": flow_id,
+ "background": True,
+ "inputs": inputs,
+ }
+
+ async with httpx.AsyncClient(timeout=CHAT_TIMEOUT) as client:
+ try:
+ response = await client.post(
+ url, json=payload, headers=self.headers
+ )
+ response.raise_for_status()
+ return response.json()
+ except httpx.HTTPStatusError as e:
+ logger.error(
+ f"Langflow V2 submit error: {e.response.status_code} - {e.response.text}"
+ )
+ raise LangflowError(
+ f"Failed to submit workflow: {e.response.text}",
+ status_code=e.response.status_code,
+ )
+ except httpx.RequestError as e:
+ logger.error(f"Langflow V2 connection error: {e}")
+ raise LangflowError(f"Failed to connect to Langflow V2: {str(e)}")
+
+ async def get_workflow_status(self, job_id: str) -> dict:
+ """
+ Get workflow status from V2 API.
+
+ Args:
+ job_id: The LangFlow job ID to check
+
+ Returns:
+ Dict with job_id, status, outputs, and other metadata
+
+ Raises:
+ LangflowError: If the API call fails
+ """
+ url = f"{self.base_url}/api/v2/workflows?job_id={job_id}"
+
+ async with httpx.AsyncClient(timeout=LIST_FLOWS_TIMEOUT) as client:
+ try:
+ response = await client.get(url, headers=self.headers)
+ response.raise_for_status()
+ return response.json()
+ except httpx.HTTPStatusError as e:
+ logger.error(
+ f"Langflow V2 status error: {e.response.status_code} - {e.response.text}"
+ )
+ raise LangflowError(
+ f"Failed to get workflow status: {e.response.text}",
+ status_code=e.response.status_code,
+ )
+ except httpx.RequestError as e:
+ logger.error(f"Langflow V2 connection error: {e}")
+ raise LangflowError(f"Failed to connect to Langflow V2: {str(e)}")
+
+ async def stop_workflow(self, job_id: str) -> dict:
+ """
+ Stop a running workflow via V2 API.
+
+ Args:
+ job_id: The LangFlow job ID to stop
+
+ Returns:
+ Dict with job_id and message from V2 API
+
+ Raises:
+ LangflowError: If the API call fails
+ """
+ url = f"{self.base_url}/api/v2/workflows/stop"
+ payload = {"job_id": job_id}
+
+ async with httpx.AsyncClient(timeout=LIST_FLOWS_TIMEOUT) as client:
+ try:
+ response = await client.post(
+ url, json=payload, headers=self.headers
+ )
+ response.raise_for_status()
+ return response.json()
+ except httpx.HTTPStatusError as e:
+ logger.error(
+ f"Langflow V2 stop error: {e.response.status_code} - {e.response.text}"
+ )
+ raise LangflowError(
+ f"Failed to stop workflow: {e.response.text}",
+ status_code=e.response.status_code,
+ )
+ except httpx.RequestError as e:
+ logger.error(f"Langflow V2 connection error: {e}")
+ raise LangflowError(f"Failed to connect to Langflow V2: {str(e)}")
+
async def chat_stream(
self,
message: str,
diff --git a/backend/app/services/langflow/mock_client.py b/backend/app/services/langflow/mock_client.py
index 27c1b15..40ec8d2 100644
--- a/backend/app/services/langflow/mock_client.py
+++ b/backend/app/services/langflow/mock_client.py
@@ -197,6 +197,103 @@ async def chat_stream(
logger.debug(f"[MOCK] Finished streaming response: {response[:50]}...")
+ # --- V2 Workflow API Mock Methods ---
+
+ async def resolve_flow_id(
+ self,
+ flow_id: str | None = None,
+ flow_name: str | None = None,
+ ) -> str | None:
+ """
+ Mock resolve_flow_id - returns the flow_id if provided,
+ otherwise returns a mock flow ID.
+ """
+ if self.simulate_error:
+ raise LangflowError(self.error_message, status_code=500)
+
+ if flow_id:
+ return flow_id
+ return "mock-flow-1"
+
+ def build_v2_inputs(
+ self,
+ message: str,
+ session_id: str | None = None,
+ tweaks: dict | None = None,
+ ) -> dict:
+ """Mock build_v2_inputs - mirrors LangflowClient."""
+ inputs: dict = {
+ "Chat Input.input_value": message,
+ }
+ if session_id:
+ inputs["Chat Input.session_id"] = session_id
+ if tweaks:
+ for component_name, params in tweaks.items():
+ if isinstance(params, dict):
+ for param_name, value in params.items():
+ inputs[f"{component_name}.{param_name}"] = value
+ return inputs
+
+ async def submit_workflow(
+ self,
+ flow_id: str,
+ inputs: dict,
+ session_id: str | None = None,
+ ) -> dict:
+ """
+ Mock submit_workflow - returns a mock job submission response.
+ """
+ self._record_call("submit_workflow", inputs.get("Chat Input.input_value", ""), session_id)
+ logger.debug(f"[MOCK] submit_workflow called: flow_id={flow_id}")
+
+ if self.simulate_error:
+ raise LangflowError(self.error_message, status_code=500)
+
+ await asyncio.sleep(0.05)
+ return {
+ "job_id": "mock-job-123",
+ "flow_id": flow_id,
+ "status": "queued",
+ }
+
+ async def get_workflow_status(self, job_id: str) -> dict:
+ """
+ Mock get_workflow_status - returns a completed job status.
+ """
+ logger.debug(f"[MOCK] get_workflow_status called: job_id={job_id}")
+
+ if self.simulate_error:
+ raise LangflowError(self.error_message, status_code=500)
+
+ response = self._get_next_response()
+ return {
+ "job_id": job_id,
+ "status": "completed",
+ "outputs": {
+ "Chat Output": {
+ "type": "message",
+ "component_id": "ChatOutput-mock",
+ "status": "completed",
+ "content": response,
+ "metadata": {"component_type": "ChatOutput"},
+ }
+ },
+ }
+
+ async def stop_workflow(self, job_id: str) -> dict:
+ """
+ Mock stop_workflow - returns a cancellation confirmation.
+ """
+ logger.debug(f"[MOCK] stop_workflow called: job_id={job_id}")
+
+ if self.simulate_error:
+ raise LangflowError(self.error_message, status_code=500)
+
+ return {
+ "job_id": job_id,
+ "message": "Job cancelled successfully.",
+ }
+
# Test helper methods
def get_call_history(self) -> list[dict]:
diff --git a/backend/app/services/protocols.py b/backend/app/services/protocols.py
index 31060aa..2700d0d 100644
--- a/backend/app/services/protocols.py
+++ b/backend/app/services/protocols.py
@@ -91,3 +91,58 @@ async def list_flows(self) -> list["Flow"]:
LangflowError: If the API call fails
"""
...
+
+ def build_v2_inputs(
+ self,
+ message: str,
+ session_id: str | None = None,
+ tweaks: dict | None = None,
+ ) -> dict:
+ """
+ Build V2-compatible inputs from message and tweaks.
+
+ Returns:
+ Dict with input_value, session_id, and tweaks for V2 payload
+ """
+ ...
+
+ async def submit_workflow(
+ self,
+ flow_id: str,
+ inputs: dict,
+ session_id: str | None = None,
+ ) -> dict:
+ """
+ Submit workflow for background execution via V2 API.
+
+ Returns:
+ Dict with job_id, status, and other metadata
+
+ Raises:
+ LangflowError: If the API call fails
+ """
+ ...
+
+ async def get_workflow_status(self, job_id: str) -> dict:
+ """
+ Get workflow status from V2 API.
+
+ Returns:
+ Dict with job_id, status, outputs, and other metadata
+
+ Raises:
+ LangflowError: If the API call fails
+ """
+ ...
+
+ async def stop_workflow(self, job_id: str) -> dict:
+ """
+ Stop a running workflow via V2 API.
+
+ Returns:
+ Dict with job_id and message
+
+ Raises:
+ LangflowError: If the API call fails
+ """
+ ...
diff --git a/backend/tests/api/routes/v1/test_chat_messages.py b/backend/tests/api/routes/v1/test_chat_messages.py
new file mode 100644
index 0000000..579e5cc
--- /dev/null
+++ b/backend/tests/api/routes/v1/test_chat_messages.py
@@ -0,0 +1,177 @@
+"""Tests for the job-based chat message endpoint."""
+
+import pytest
+from fastapi.testclient import TestClient
+from sqlmodel import Session
+
+from app.models import Chat, ChatMessage, Job, JobStatus, User
+
+
+@pytest.fixture
+def dev_user(session: Session) -> User:
+ """Create the dev-user that matches the local development user."""
+ user = User(
+ email="dev-user@example.com",
+ username="dev-user",
+ full_name="Development User",
+ )
+ session.add(user)
+ session.commit()
+ session.refresh(user)
+ return user
+
+
+@pytest.fixture
+def test_chat(session: Session, dev_user: User) -> Chat:
+ """Create a test chat owned by the dev user."""
+ chat = Chat(
+ title="Test Chat",
+ user_id=dev_user.id,
+ )
+ session.add(chat)
+ session.commit()
+ session.refresh(chat)
+ return chat
+
+
+class TestCreateJobMessage:
+ """Tests for POST /v1/chats/{id}/messages/job endpoint."""
+
+ def test_create_job_message(
+ self, client: TestClient, session: Session, test_chat: Chat
+ ):
+ """Test creating a job-based message returns JobPublic."""
+ response = client.post(
+ f"/api/v1/chats/{test_chat.id}/messages/job",
+ json={"content": "Hello, AI!"},
+ )
+ assert response.status_code == 200
+ data = response.json()
+
+ # Should return JobPublic
+ assert "id" in data
+ assert "status" in data
+ assert "chat_message_id" in data
+ assert data["status"] == "in_progress"
+ assert data["langflow_job_id"] == "mock-job-123"
+ assert data["flow_id"] == "mock-flow-1"
+
+ def test_create_job_message_saves_user_message(
+ self, client: TestClient, session: Session, test_chat: Chat
+ ):
+ """Test that user message is saved to database."""
+ client.post(
+ f"/api/v1/chats/{test_chat.id}/messages/job",
+ json={"content": "Test user message"},
+ )
+
+ # Check user message was saved
+ user_messages = (
+ session.query(ChatMessage)
+ .filter(
+ ChatMessage.chat_id == test_chat.id,
+ ChatMessage.role == "user",
+ )
+ .all()
+ )
+ assert len(user_messages) == 1
+ assert user_messages[0].content == "Test user message"
+
+ def test_create_job_message_creates_placeholder(
+ self, client: TestClient, session: Session, test_chat: Chat
+ ):
+ """Test that a placeholder assistant message is created."""
+ response = client.post(
+ f"/api/v1/chats/{test_chat.id}/messages/job",
+ json={"content": "Hello!"},
+ )
+ data = response.json()
+
+ # Check placeholder assistant message
+ assistant_msg = session.get(ChatMessage, data["chat_message_id"])
+ assert assistant_msg is not None
+ assert assistant_msg.role == "assistant"
+ assert assistant_msg.content == "" # placeholder
+
+ def test_create_job_message_creates_job_record(
+ self, client: TestClient, session: Session, test_chat: Chat
+ ):
+ """Test that a Job record is created and linked to the assistant message."""
+ response = client.post(
+ f"/api/v1/chats/{test_chat.id}/messages/job",
+ json={"content": "Hello!"},
+ )
+ data = response.json()
+ job_id = data["id"]
+
+ # Verify job exists in database
+ job = session.get(Job, job_id)
+ assert job is not None
+ assert job.status == JobStatus.IN_PROGRESS.value
+ assert job.langflow_job_id == "mock-job-123"
+
+ def test_create_job_message_sets_flow_name(
+ self, client: TestClient, session: Session, test_chat: Chat
+ ):
+ """Test that flow_name is locked on the chat."""
+ assert test_chat.flow_name is None
+
+ client.post(
+ f"/api/v1/chats/{test_chat.id}/messages/job",
+ json={"content": "Hello!", "flow_name": "test-flow"},
+ )
+
+ session.expire_all()
+ updated_chat = session.get(Chat, test_chat.id)
+ # The mock client resolve_flow_id returns "mock-flow-1" regardless,
+ # but the chat's flow_name should be set from the request
+ assert updated_chat.flow_name == "test-flow"
+
+ def test_create_job_message_chat_not_found(
+ self, client: TestClient, dev_user: User
+ ):
+ """Test creating a job message in non-existent chat returns 404."""
+ response = client.post(
+ "/api/v1/chats/99999/messages/job",
+ json={"content": "Hello!"},
+ )
+ assert response.status_code == 404
+
+ def test_create_job_message_submit_error_results_in_failed_job(
+ self, client: TestClient, session: Session, test_chat: Chat, monkeypatch
+ ):
+ """Test that LangFlow submission errors result in FAILED job status."""
+ from app.services.langflow.mock_client import MockLangflowClient
+ from app.api.routes.v1 import chat_messages
+
+ # Create a custom mock that only errors on submit_workflow (not resolve_flow_id)
+ class SubmitErrorClient(MockLangflowClient):
+ async def submit_workflow(self, flow_id, inputs, session_id=None):
+ from app.services.langflow.client import LangflowError
+ raise LangflowError("V2 submission failed", status_code=500)
+
+ error_client = SubmitErrorClient()
+ monkeypatch.setattr(
+ chat_messages, "get_langflow_client", lambda: error_client
+ )
+
+ response = client.post(
+ f"/api/v1/chats/{test_chat.id}/messages/job",
+ json={"content": "This will fail on submit"},
+ )
+
+ assert response.status_code == 200
+ data = response.json()
+ assert data["status"] == "failed"
+ assert "V2 submission failed" in data["error_message"]
+
+ def test_create_job_preserves_existing_stream(
+ self, client: TestClient, test_chat: Chat
+ ):
+ """Test that the existing /stream endpoint still works."""
+ response = client.post(
+ f"/api/v1/chats/{test_chat.id}/messages/stream",
+ json={"content": "Hello via stream!"},
+ )
+ assert response.status_code == 200
+ assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
diff --git a/backend/tests/api/routes/v1/test_jobs.py b/backend/tests/api/routes/v1/test_jobs.py
new file mode 100644
index 0000000..6062c13
--- /dev/null
+++ b/backend/tests/api/routes/v1/test_jobs.py
@@ -0,0 +1,207 @@
+"""Tests for the Job API endpoints."""
+
+import pytest
+from fastapi.testclient import TestClient
+from sqlmodel import Session
+
+from app.models import Chat, ChatMessage, Job, JobStatus, User
+
+
+@pytest.fixture
+def dev_user(session: Session) -> User:
+ """Create the dev-user that matches the local development user."""
+ user = User(
+ email="dev-user@example.com",
+ username="dev-user",
+ full_name="Development User",
+ )
+ session.add(user)
+ session.commit()
+ session.refresh(user)
+ return user
+
+
+@pytest.fixture
+def test_chat(session: Session, dev_user: User) -> Chat:
+ """Create a test chat owned by the dev user."""
+ chat = Chat(
+ title="Test Chat",
+ user_id=dev_user.id,
+ )
+ session.add(chat)
+ session.commit()
+ session.refresh(chat)
+ return chat
+
+
+@pytest.fixture
+def test_message(session: Session, test_chat: Chat) -> ChatMessage:
+ """Create a test assistant message linked to the chat."""
+ message = ChatMessage(
+ chat_id=test_chat.id,
+ content="",
+ role="assistant",
+ )
+ session.add(message)
+ session.commit()
+ session.refresh(message)
+ return message
+
+
+@pytest.fixture
+def test_job(session: Session, test_message: ChatMessage) -> Job:
+ """Create a test job in pending state."""
+ job = Job(
+ chat_message_id=test_message.id,
+ flow_id="mock-flow-1",
+ status=JobStatus.PENDING.value,
+ langflow_job_id="mock-lf-job-123",
+ )
+ session.add(job)
+ session.commit()
+ session.refresh(job)
+ return job
+
+
+@pytest.fixture
+def completed_job(session: Session, test_message: ChatMessage) -> Job:
+ """Create a test job in completed state."""
+ job = Job(
+ chat_message_id=test_message.id,
+ flow_id="mock-flow-1",
+ status=JobStatus.COMPLETED.value,
+ langflow_job_id="mock-lf-job-456",
+ result_content="Test result",
+ )
+ session.add(job)
+ session.commit()
+ session.refresh(job)
+ return job
+
+
+class TestGetJob:
+ """Tests for GET /v1/jobs/{id} endpoint."""
+
+ def test_get_job(self, client: TestClient, test_job: Job):
+ """Test getting a job by ID."""
+ response = client.get(f"/api/v1/jobs/{test_job.id}")
+ assert response.status_code == 200
+ data = response.json()
+ assert data["id"] == test_job.id
+ assert data["status"] == "pending"
+ assert data["langflow_job_id"] == "mock-lf-job-123"
+ assert data["flow_id"] == "mock-flow-1"
+
+ def test_get_job_not_found(self, client: TestClient, dev_user: User):
+ """Test getting a non-existent job returns 404."""
+ response = client.get("/api/v1/jobs/99999")
+ assert response.status_code == 404
+ assert "not found" in response.json()["detail"].lower()
+
+ def test_get_job_with_sync(self, client: TestClient, test_job: Job):
+ """Test getting a job with sync=true triggers LangFlow poll."""
+ response = client.get(f"/api/v1/jobs/{test_job.id}?sync=true")
+ assert response.status_code == 200
+ data = response.json()
+ # Mock client returns "completed" status, so after sync the job
+ # should be updated to completed
+ assert data["status"] == "completed"
+ assert data["result_content"] is not None
+
+ def test_get_job_sync_skips_terminal(
+ self, client: TestClient, completed_job: Job
+ ):
+ """Test that sync=true skips polling for terminal state jobs."""
+ response = client.get(f"/api/v1/jobs/{completed_job.id}?sync=true")
+ assert response.status_code == 200
+ data = response.json()
+ # Should still be completed (sync skipped)
+ assert data["status"] == "completed"
+
+ def test_get_job_without_sync(self, client: TestClient, test_job: Job):
+ """Test that without sync=true, the job status is returned as-is."""
+ response = client.get(f"/api/v1/jobs/{test_job.id}")
+ assert response.status_code == 200
+ data = response.json()
+ # Should remain pending since we didn't sync
+ assert data["status"] == "pending"
+
+
+class TestCancelJob:
+ """Tests for POST /v1/jobs/{id}/cancel endpoint."""
+
+ def test_cancel_pending_job(self, client: TestClient, test_job: Job):
+ """Test cancelling a pending job."""
+ response = client.post(f"/api/v1/jobs/{test_job.id}/cancel")
+ assert response.status_code == 200
+ data = response.json()
+ assert data["status"] == "cancelled"
+ assert data["completed_at"] is not None
+
+ def test_cancel_job_not_found(self, client: TestClient, dev_user: User):
+ """Test cancelling a non-existent job returns 404."""
+ response = client.post("/api/v1/jobs/99999/cancel")
+ assert response.status_code == 404
+
+ def test_cancel_completed_job_returns_400(
+ self, client: TestClient, completed_job: Job
+ ):
+ """Test that cancelling a completed job returns 400."""
+ response = client.post(f"/api/v1/jobs/{completed_job.id}/cancel")
+ assert response.status_code == 400
+ assert "terminal" in response.json()["detail"].lower()
+
+ def test_cancel_job_without_langflow_id(
+ self, client: TestClient, session: Session, test_message: ChatMessage
+ ):
+ """Test cancelling a job that has no langflow_job_id (still pending)."""
+ job = Job(
+ chat_message_id=test_message.id,
+ flow_id="mock-flow-1",
+ status=JobStatus.PENDING.value,
+ langflow_job_id=None,
+ )
+ session.add(job)
+ session.commit()
+ session.refresh(job)
+
+ response = client.post(f"/api/v1/jobs/{job.id}/cancel")
+ assert response.status_code == 200
+ data = response.json()
+ assert data["status"] == "cancelled"
+
+
+class TestJobOwnership:
+ """Tests for job ownership verification."""
+
+ def test_get_job_forbidden_for_other_user(
+ self, session: Session, test_job: Job
+ ):
+ """Test that a user cannot access another user's job."""
+ from app.api.deps import get_current_user, get_db
+ from app.main import app
+
+ # Create another user
+ other_user = User(
+ email="other@example.com",
+ username="other-user",
+ full_name="Other User",
+ )
+ session.add(other_user)
+ session.commit()
+ session.refresh(other_user)
+
+ def get_other_user():
+ return other_user
+
+ def get_session_override():
+ return session
+
+ app.dependency_overrides[get_current_user] = get_other_user
+ app.dependency_overrides[get_db] = get_session_override
+
+ other_client = TestClient(app)
+ response = other_client.get(f"/api/v1/jobs/{test_job.id}")
+ assert response.status_code == 403
+
+ app.dependency_overrides.clear()
diff --git a/backend/tests/api/test_job_lifecycle.py b/backend/tests/api/test_job_lifecycle.py
new file mode 100644
index 0000000..a69283a
--- /dev/null
+++ b/backend/tests/api/test_job_lifecycle.py
@@ -0,0 +1,266 @@
+"""
+End-to-end job lifecycle integration tests.
+
+Tests the complete flow: message send -> job creation -> LangFlow background
+execution -> result appears in chat. Uses MockLangflowClient.
+"""
+
+import pytest
+from fastapi.testclient import TestClient
+from sqlmodel import Session
+
+from app.models import Chat, User
+
+
+@pytest.fixture
+def dev_user(session: Session) -> User:
+ """Create the dev-user that matches the local development user."""
+ user = User(
+ email="dev-user@example.com",
+ username="dev-user",
+ full_name="Development User",
+ )
+ session.add(user)
+ session.commit()
+ session.refresh(user)
+ return user
+
+
+@pytest.fixture
+def test_chat(session: Session, dev_user: User) -> Chat:
+ """Create a test chat owned by the dev user."""
+ chat = Chat(
+ title="Lifecycle Test Chat",
+ user_id=dev_user.id,
+ )
+ session.add(chat)
+ session.commit()
+ session.refresh(chat)
+ return chat
+
+
+def create_test_chat(client: TestClient) -> dict:
+ """Helper to create a chat via the API."""
+ response = client.post(
+ "/api/v1/chats/",
+ json={"title": "E2E Test Chat"},
+ )
+ assert response.status_code == 200
+ return response.json()
+
+
+class TestJobLifecycleHappyPath:
+ """E2E: user sends message, job is created, LangFlow executes, result in chat."""
+
+ def test_full_lifecycle(
+ self, client: TestClient, session: Session, test_chat: Chat
+ ):
+ """
+ Complete happy path: message -> job created -> poll shows completed
+ -> result in chat messages.
+ """
+ chat_id = test_chat.id
+
+ # 1. Send message via job endpoint
+ response = client.post(
+ f"/api/v1/chats/{chat_id}/messages/job",
+ json={"content": "What is enterprise architecture?"},
+ )
+ assert response.status_code == 200
+ job = response.json()
+ assert job["status"] in ("pending", "in_progress")
+ job_id = job["id"]
+ assert job["flow_id"] is not None
+
+ # 2. Poll for job status with sync (mock client returns completed)
+ response = client.get(f"/api/v1/jobs/{job_id}?sync=true")
+ assert response.status_code == 200
+ job = response.json()
+ assert job["status"] == "completed"
+ assert job["result_content"] is not None
+ assert len(job["result_content"]) > 0
+
+ # 3. Verify messages were created in the chat
+ response = client.get(f"/api/v1/chats/{chat_id}/messages/")
+ assert response.status_code == 200
+ messages = response.json()
+ assert messages["count"] >= 2 # user message + assistant message
+
+ # Find the user and assistant messages
+ roles = [m["role"] for m in messages["data"]]
+ assert "user" in roles
+ assert "assistant" in roles
+
+ def test_job_response_includes_required_fields(
+ self, client: TestClient, test_chat: Chat
+ ):
+ """Verify job response contains all fields needed by frontend polling."""
+ response = client.post(
+ f"/api/v1/chats/{test_chat.id}/messages/job",
+ json={"content": "Test query"},
+ )
+ data = response.json()
+ assert "id" in data
+ assert "status" in data
+ assert "chat_message_id" in data
+ assert "langflow_job_id" in data
+ assert "flow_id" in data
+ assert "created_at" in data
+
+
+class TestJobCancellation:
+ """E2E: user cancels an in-progress job, cancellation reflected in chat."""
+
+ def test_cancel_in_progress_job(
+ self, client: TestClient, session: Session, test_chat: Chat
+ ):
+ """Create a job, cancel it, verify status is cancelled."""
+ chat_id = test_chat.id
+
+ # Create job
+ response = client.post(
+ f"/api/v1/chats/{chat_id}/messages/job",
+ json={"content": "Long running query"},
+ )
+ assert response.status_code == 200
+ job_id = response.json()["id"]
+
+ # Cancel the job
+ response = client.post(f"/api/v1/jobs/{job_id}/cancel")
+ assert response.status_code == 200
+ cancelled = response.json()
+ assert cancelled["status"] == "cancelled"
+ assert cancelled["completed_at"] is not None
+
+ def test_cancel_already_terminal_returns_400(
+ self, client: TestClient, session: Session, test_chat: Chat
+ ):
+ """Cancelling a completed job returns 400 (already terminal)."""
+ chat_id = test_chat.id
+
+ # Create job
+ response = client.post(
+ f"/api/v1/chats/{chat_id}/messages/job",
+ json={"content": "Quick query"},
+ )
+ job_id = response.json()["id"]
+
+ # Sync to complete the job (mock returns completed)
+ client.get(f"/api/v1/jobs/{job_id}?sync=true")
+
+ # Try to cancel the completed job
+ response = client.post(f"/api/v1/jobs/{job_id}/cancel")
+ assert response.status_code == 400
+ assert "terminal" in response.json()["detail"].lower()
+
+
+class TestPageRefreshRecovery:
+ """E2E: page refresh during active job resumes polling and shows result."""
+
+ def test_active_job_endpoint_returns_in_flight_job(
+ self, client: TestClient, session: Session, test_chat: Chat
+ ):
+ """
+ After creating a job, GET /chats/{id}/active-job returns the
+ in-flight job for page refresh recovery.
+ """
+ chat_id = test_chat.id
+
+ # Create job (mock returns in_progress status since submit succeeds)
+ response = client.post(
+ f"/api/v1/chats/{chat_id}/messages/job",
+ json={"content": "In-flight query"},
+ )
+ assert response.status_code == 200
+ job_id = response.json()["id"]
+
+ # Check for active job (simulates page refresh)
+ response = client.get(f"/api/v1/chats/{chat_id}/active-job")
+ assert response.status_code == 200
+ active_job = response.json()
+ assert active_job["id"] == job_id
+ assert active_job["status"] in ("pending", "in_progress")
+
+ def test_no_active_job_after_completion(
+ self, client: TestClient, session: Session, test_chat: Chat
+ ):
+ """After a job completes, active-job returns 404."""
+ chat_id = test_chat.id
+
+ # Create and complete a job
+ response = client.post(
+ f"/api/v1/chats/{chat_id}/messages/job",
+ json={"content": "Quick query"},
+ )
+ job_id = response.json()["id"]
+
+ # Sync to complete
+ client.get(f"/api/v1/jobs/{job_id}?sync=true")
+
+ # No active job should exist
+ response = client.get(f"/api/v1/chats/{chat_id}/active-job")
+ assert response.status_code == 404
+
+
+class TestJobErrorHandling:
+ """E2E: failed job shows error with expandable details and retry."""
+
+ def test_job_not_found(self, client: TestClient, dev_user: User):
+ """GET /jobs/99999 returns 404."""
+ response = client.get("/api/v1/jobs/99999")
+ assert response.status_code == 404
+
+ def test_submit_error_creates_failed_job(
+ self, client: TestClient, session: Session, test_chat: Chat, monkeypatch
+ ):
+ """
+ When LangFlow submission fails, the job is created with status=failed
+ and error_message is populated.
+ """
+ from app.services.langflow.mock_client import MockLangflowClient
+ from app.services.langflow.client import LangflowError
+ from app.api.routes.v1 import chat_messages
+
+ class SubmitErrorClient(MockLangflowClient):
+ async def submit_workflow(self, flow_id, inputs, session_id=None):
+ raise LangflowError("V2 workflow submission failed", status_code=500)
+
+ monkeypatch.setattr(
+ chat_messages, "get_langflow_client", lambda: SubmitErrorClient()
+ )
+
+ response = client.post(
+ f"/api/v1/chats/{test_chat.id}/messages/job",
+ json={"content": "This will fail"},
+ )
+ assert response.status_code == 200
+ data = response.json()
+ assert data["status"] == "failed"
+ assert "V2 workflow submission failed" in data["error_message"]
+
+ def test_failed_job_preserves_messages(
+ self, client: TestClient, session: Session, test_chat: Chat, monkeypatch
+ ):
+ """Even when a job fails, user and assistant messages are preserved."""
+ from app.services.langflow.mock_client import MockLangflowClient
+ from app.services.langflow.client import LangflowError
+ from app.api.routes.v1 import chat_messages
+
+ class SubmitErrorClient(MockLangflowClient):
+ async def submit_workflow(self, flow_id, inputs, session_id=None):
+ raise LangflowError("Submit failed", status_code=500)
+
+ monkeypatch.setattr(
+ chat_messages, "get_langflow_client", lambda: SubmitErrorClient()
+ )
+
+ client.post(
+ f"/api/v1/chats/{test_chat.id}/messages/job",
+ json={"content": "Error query"},
+ )
+
+ # Messages should still exist
+ response = client.get(f"/api/v1/chats/{test_chat.id}/messages/")
+ assert response.status_code == 200
+ messages = response.json()
+ assert messages["count"] >= 2 # user + placeholder assistant
diff --git a/backend/uv.lock b/backend/uv.lock
index da989bf..1b95ebe 100644
--- a/backend/uv.lock
+++ b/backend/uv.lock
@@ -380,18 +380,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/2c/e1/e6716421ea10d38022b952c159d5161ca1193197fb744506875fbb87ea7b/iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760", size = 6050, upload-time = "2025-03-19T20:10:01.071Z" },
]
-[[package]]
-name = "jinja2"
-version = "3.1.6"
-source = { registry = "https://pypi.org/simple" }
-dependencies = [
- { name = "markupsafe" },
-]
-sdist = { url = "https://files.pythonhosted.org/packages/df/bf/f7da0350254c0ed7c72f3e33cef02e048281fec7ecec5f032d4aac52226b/jinja2-3.1.6.tar.gz", hash = "sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d", size = 245115, upload-time = "2025-03-05T20:05:02.478Z" }
-wheels = [
- { url = "https://files.pythonhosted.org/packages/62/a1/3d680cbfd5f4b8f15abc1d571870c5fc3e594bb582bc3b64ea099db13e56/jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67", size = 134899, upload-time = "2025-03-05T20:05:00.369Z" },
-]
-
[[package]]
name = "lia-web"
version = "0.2.3"
@@ -503,7 +491,6 @@ dependencies = [
{ name = "psycopg", extra = ["binary"] },
{ name = "pydantic-settings" },
{ name = "python-dotenv" },
- { name = "sqladmin" },
{ name = "sqlmodel" },
{ name = "sse-starlette" },
{ name = "strawberry-graphql", extra = ["fastapi"] },
@@ -538,7 +525,6 @@ requires-dist = [
{ name = "pytest", marker = "extra == 'dev'", specifier = "==8.4.1" },
{ name = "pytest-asyncio", marker = "extra == 'dev'", specifier = "==1.0.0" },
{ name = "python-dotenv", specifier = "==1.1.1" },
- { name = "sqladmin", specifier = ">=0.22.0" },
{ name = "sqlmodel", specifier = ">=0.0.21,<1.0.0" },
{ name = "sse-starlette", specifier = ">=2.2.1" },
{ name = "strawberry-graphql", extras = ["fastapi"], specifier = ">=0.245.0" },
@@ -904,22 +890,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e9/44/75a9c9421471a6c4805dbf2356f7c181a29c1879239abab1ea2cc8f38b40/sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2", size = 10235, upload-time = "2024-02-25T23:20:01.196Z" },
]
-[[package]]
-name = "sqladmin"
-version = "0.22.0"
-source = { registry = "https://pypi.org/simple" }
-dependencies = [
- { name = "jinja2" },
- { name = "python-multipart" },
- { name = "sqlalchemy" },
- { name = "starlette" },
- { name = "wtforms" },
-]
-sdist = { url = "https://files.pythonhosted.org/packages/2c/ac/526bb3ff2dd94fbf8442bccb49ef40aa360045add19d4fbffcb43995e67a/sqladmin-0.22.0.tar.gz", hash = "sha256:4ea904d97e4d030edb68fb0681330b4d963f422442a64bee487fdc46119b3729", size = 1429937, upload-time = "2025-11-24T12:52:59.285Z" }
-wheels = [
- { url = "https://files.pythonhosted.org/packages/f2/b4/ab78c7d7b13bd3f90d6d8a106c5ad12bf7a738f89eb0241b24ad8efe5d1e/sqladmin-0.22.0-py3-none-any.whl", hash = "sha256:f2fb11165a70601a97f71956104b47da2c432db49b0d7966dc65e9e6343887d3", size = 1445514, upload-time = "2025-11-24T12:53:00.511Z" },
-]
-
[[package]]
name = "sqlalchemy"
version = "2.0.44"
@@ -1233,15 +1203,3 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/1b/6c/c65773d6cab416a64d191d6ee8a8b1c68a09970ea6909d16965d26bfed1e/websockets-15.0.1-cp313-cp313-win_amd64.whl", hash = "sha256:e09473f095a819042ecb2ab9465aee615bd9c2028e4ef7d933600a8401c79561", size = 176837, upload-time = "2025-03-05T20:02:55.237Z" },
{ url = "https://files.pythonhosted.org/packages/fa/a8/5b41e0da817d64113292ab1f8247140aac61cbf6cfd085d6a0fa77f4984f/websockets-15.0.1-py3-none-any.whl", hash = "sha256:f7a866fbc1e97b5c617ee4116daaa09b722101d4a3c170c787450ba409f9736f", size = 169743, upload-time = "2025-03-05T20:03:39.41Z" },
]
-
-[[package]]
-name = "wtforms"
-version = "3.1.2"
-source = { registry = "https://pypi.org/simple" }
-dependencies = [
- { name = "markupsafe" },
-]
-sdist = { url = "https://files.pythonhosted.org/packages/6a/c7/96d10183c3470f1836846f7b9527d6cb0b6c2226ebca40f36fa29f23de60/wtforms-3.1.2.tar.gz", hash = "sha256:f8d76180d7239c94c6322f7990ae1216dae3659b7aa1cee94b6318bdffb474b9", size = 134705, upload-time = "2024-01-06T07:52:41.075Z" }
-wheels = [
- { url = "https://files.pythonhosted.org/packages/18/19/c3232f35e24dccfad372e9f341c4f3a1166ae7c66e4e1351a9467c921cc1/wtforms-3.1.2-py3-none-any.whl", hash = "sha256:bf831c042829c8cdbad74c27575098d541d039b1faa74c771545ecac916f2c07", size = 145961, upload-time = "2024-01-06T07:52:43.023Z" },
-]
diff --git a/config/local/.env.example b/config/local/.env.example
index eae274d..2e61489 100644
--- a/config/local/.env.example
+++ b/config/local/.env.example
@@ -42,6 +42,15 @@ GOOGLE_CLIENT_SECRET=
LANGFLOW_URL=http://localhost:7860
LANGFLOW_DEFAULT_FLOW=enterprise-agent-multiuser
+# LangFlow API Authentication
+# Shared API key between backend and LangFlow container.
+# LangFlow validates via LANGFLOW_API_KEY_SOURCE=env (set on the container).
+LANGFLOW_API_KEY=dev-langflow-api-key
+LANGFLOW_API_KEY_SOURCE=env
+
+# LangFlow V2 Workflow API (for background job execution)
+LANGFLOW_DEVELOPER_API_ENABLED=true
+
# ========================================
# Database Credentials (optional - defaults work for local dev)
# ========================================
diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md
index a21db48..c57559f 100644
--- a/docs/ARCHITECTURE.md
+++ b/docs/ARCHITECTURE.md
@@ -242,7 +242,7 @@ created_at TIMESTAMP
|-----------|-------|------|-----------|
| frontend | app-frontend | 8080 | 256Mi / 200m |
| backend | app-backend | 8000 | 512Mi / 500m |
-| langflow | langflowai/langflow:1.7.1 | 7860 | 2Gi / 1000m |
+| langflow | langflowai/langflow:1.8.0.rc0 | 7860 | 2Gi / 1000m |
| langfuse | langfuse/langfuse:latest | 3000 | 1Gi / 500m |
| mlflow | ghcr.io/mlflow/mlflow:v2.16.0 | 5000 | 1Gi / 500m |
| postgres | postgres:15-alpine | 5432 | 512Mi / 500m |
diff --git a/frontend/src/app/Chat/Chat.css b/frontend/src/app/Chat/Chat.css
index 97ca192..12c111d 100644
--- a/frontend/src/app/Chat/Chat.css
+++ b/frontend/src/app/Chat/Chat.css
@@ -4,3 +4,30 @@
right: 20px;
z-index: 1000;
}
+
+/* Cancel button positioning in the loading message */
+.pf-chatbot__job-cancel-area {
+ padding-top: var(--pf-t--global--spacer--xs, 4px);
+}
+
+/* Tooltip wrapper for job status -- allows hover target on loading message */
+.pf-chatbot__job-tooltip-wrapper {
+ display: contents;
+}
+
+/* Error details formatting */
+.pf-chatbot__error-details {
+ padding-top: var(--pf-t--global--spacer--sm, 8px);
+}
+
+.pf-chatbot__error-pre {
+ font-size: var(--pf-t--global--font--size--xs, 0.75rem);
+ white-space: pre-wrap;
+ word-break: break-word;
+ max-height: 200px;
+ overflow-y: auto;
+ padding: var(--pf-t--global--spacer--sm, 8px);
+ background-color: var(--pf-t--global--background--color--secondary--default, #f0f0f0);
+ border-radius: var(--pf-t--global--border--radius--small, 3px);
+ margin: 0;
+}
diff --git a/frontend/src/app/Chat/Chat.test.tsx b/frontend/src/app/Chat/Chat.test.tsx
index b531135..14e823c 100644
--- a/frontend/src/app/Chat/Chat.test.tsx
+++ b/frontend/src/app/Chat/Chat.test.tsx
@@ -5,6 +5,7 @@ import { Chat } from './Chat';
import { ChatAPI } from './chatApi';
import { BrowserRouter } from 'react-router-dom';
import { AppProvider } from '@app/contexts/AppContext';
+import { QueryClient, QueryClientProvider } from '@tanstack/react-query';
// Mock the ChatAPI
vi.mock('./chatApi', () => ({
@@ -16,10 +17,20 @@ vi.mock('./chatApi', () => ({
updateChat: vi.fn(),
deleteChat: vi.fn(),
getMessages: vi.fn(),
+ createJobMessage: vi.fn(),
+ getJob: vi.fn(),
+ cancelJob: vi.fn(),
+ getActiveJob: vi.fn(),
+ // Legacy streaming kept for backward compatibility
createStreamingMessage: vi.fn(),
},
}));
+// Mock the useJobPolling hook
+vi.mock('./useJobPolling', () => ({
+ useJobPolling: vi.fn().mockReturnValue({ data: null }),
+}));
+
// Mock the image imports
vi.mock('@app/images/user-avatar.svg', () => ({ default: 'user-avatar.svg' }));
vi.mock('@app/images/ai-logo-transparent.svg', () => ({ default: 'ai-logo.svg' }));
@@ -43,17 +54,42 @@ const MOCK_MESSAGES = [
{ id: 2, chat_id: 1, content: 'Hi there!', role: 'assistant', created_at: '2024-01-01T10:00:01' },
];
+const MOCK_JOB_RESPONSE = {
+ id: 1,
+ chat_message_id: 3,
+ langflow_job_id: 'lf-job-123',
+ flow_id: 'flow-1',
+ status: 'in_progress' as const,
+ error_message: null,
+ result_content: null,
+ started_at: '2024-01-01T10:00:00',
+ completed_at: null,
+ created_at: '2024-01-01T10:00:00',
+ updated_at: '2024-01-01T10:00:00',
+};
+
// =============================================================================
// Test Utilities
// =============================================================================
+function createQueryClient() {
+ return new QueryClient({
+ defaultOptions: {
+ queries: { retry: false },
+ },
+ });
+}
+
function renderChat() {
+ const queryClient = createQueryClient();
return render(
-
-
-
-
-
+
+
+
+
+
+
+
);
}
@@ -61,6 +97,7 @@ function setupDefaultMocks(): void {
vi.mocked(ChatAPI.getChats).mockResolvedValue({ data: MOCK_CHATS, count: 2 });
vi.mocked(ChatAPI.getFlows).mockResolvedValue({ data: MOCK_FLOWS, count: 2 });
vi.mocked(ChatAPI.getMessages).mockResolvedValue({ data: MOCK_MESSAGES, count: 2 });
+ vi.mocked(ChatAPI.getActiveJob).mockResolvedValue(null);
}
describe('Chat component', () => {
@@ -135,20 +172,8 @@ describe('Chat component', () => {
});
});
- test('should call createStreamingMessage when sending a message', async () => {
- const mockClose = vi.fn();
-
- vi.mocked(ChatAPI.createStreamingMessage).mockImplementation(
- (_chatId, _content, onMessage, _onError, onComplete) => {
- // Simulate immediate completion
- setTimeout(() => {
- onMessage({ type: 'content', content: 'Response' });
- onMessage({ type: 'done' });
- onComplete?.();
- }, 10);
- return { close: mockClose };
- }
- );
+ test('should call createJobMessage when sending a message', async () => {
+ vi.mocked(ChatAPI.createJobMessage).mockResolvedValue(MOCK_JOB_RESPONSE);
renderChat();
@@ -156,15 +181,14 @@ describe('Chat component', () => {
expect(ChatAPI.getChats).toHaveBeenCalled();
});
- // Verify the streaming API is properly mocked
- expect(ChatAPI.createStreamingMessage).toBeDefined();
+ // Verify the job API is properly mocked
+ expect(ChatAPI.createJobMessage).toBeDefined();
});
- test('should provide close function for stopping streams', async () => {
- const mockClose = vi.fn();
-
- vi.mocked(ChatAPI.createStreamingMessage).mockImplementation(() => {
- return { close: mockClose };
+ test('should provide cancelJob function for stopping jobs', async () => {
+ vi.mocked(ChatAPI.cancelJob).mockResolvedValue({
+ ...MOCK_JOB_RESPONSE,
+ status: 'cancelled',
});
renderChat();
@@ -173,11 +197,9 @@ describe('Chat component', () => {
expect(ChatAPI.getChats).toHaveBeenCalled();
});
- // Verify the streaming API returns a close function
- const result = ChatAPI.createStreamingMessage(1, 'test', () => {}, () => {}, () => {});
- expect(result.close).toBeDefined();
- result.close();
- expect(mockClose).toHaveBeenCalled();
+ // Verify the cancel API returns cancelled status
+ const result = await ChatAPI.cancelJob(1);
+ expect(result.status).toBe('cancelled');
});
test('should call deleteChat API when delete is triggered', async () => {
@@ -207,16 +229,9 @@ describe('Chat component', () => {
expect(screen.getByText('Research Assistant')).toBeVisible();
});
- test('should display actual error message from SSE error event', async () => {
- vi.mocked(ChatAPI.getMessages).mockResolvedValue({ data: [], count: 0 });
-
- vi.mocked(ChatAPI.createStreamingMessage).mockImplementation(
- (_chatId, _content, onMessage) => {
- setTimeout(() => {
- onMessage({ type: 'error', error: 'Failed to connect to Langflow: Connection refused' });
- }, 10);
- return { close: vi.fn() };
- }
+ test('should handle job creation failure', async () => {
+ vi.mocked(ChatAPI.createJobMessage).mockRejectedValue(
+ new Error('Failed to connect to Langflow: Connection refused')
);
renderChat();
@@ -225,30 +240,18 @@ describe('Chat component', () => {
expect(ChatAPI.getChats).toHaveBeenCalled();
});
- // Simulate sending by calling the streaming mock directly and checking the onMessage callback
- const onMessage = vi.fn();
- ChatAPI.createStreamingMessage(1, 'test', onMessage);
-
- await waitFor(() => {
- expect(onMessage).toHaveBeenCalledWith({
- type: 'error',
- error: 'Failed to connect to Langflow: Connection refused',
- });
- });
+ // Verify the job API handles errors
+ await expect(ChatAPI.createJobMessage(1, 'test')).rejects.toThrow(
+ 'Failed to connect to Langflow: Connection refused'
+ );
});
- test('should pass partial content and error separately when error occurs mid-stream', async () => {
- vi.mocked(ChatAPI.getMessages).mockResolvedValue({ data: [], count: 0 });
-
- vi.mocked(ChatAPI.createStreamingMessage).mockImplementation(
- (_chatId, _content, onMessage) => {
- setTimeout(() => {
- onMessage({ type: 'content', content: 'Here is the beginning' });
- onMessage({ type: 'error', error: 'Langflow streaming error: 500' });
- }, 10);
- return { close: vi.fn() };
- }
- );
+ test('should handle immediate job failure status', async () => {
+ vi.mocked(ChatAPI.createJobMessage).mockResolvedValue({
+ ...MOCK_JOB_RESPONSE,
+ status: 'failed',
+ error_message: 'No flow configured or found',
+ });
renderChat();
@@ -256,45 +259,32 @@ describe('Chat component', () => {
expect(ChatAPI.getChats).toHaveBeenCalled();
});
- // Verify the streaming API delivers both content and error events
- const events: Array<{ type: string; content?: string; error?: string }> = [];
- ChatAPI.createStreamingMessage(1, 'test', (event) => events.push(event));
-
- await waitFor(() => {
- expect(events).toHaveLength(2);
- expect(events[0]).toEqual({ type: 'content', content: 'Here is the beginning' });
- expect(events[1]).toEqual({ type: 'error', error: 'Langflow streaming error: 500' });
- });
+ // Verify the API returns failed status
+ const result = await ChatAPI.createJobMessage(1, 'test');
+ expect(result.status).toBe('failed');
+ expect(result.error_message).toBe('No flow configured or found');
});
- test('should pass network error message through to error handler', async () => {
- vi.mocked(ChatAPI.getMessages).mockResolvedValue({ data: [], count: 0 });
-
- vi.mocked(ChatAPI.createStreamingMessage).mockImplementation(
- (_chatId, _content, _onMessage, onError) => {
- setTimeout(() => {
- onError?.(new Error('Missing integration: jira. Please connect the service in Settings.'));
- }, 10);
- return { close: vi.fn() };
- }
- );
+ test('should check for active jobs on chat load for page refresh recovery', async () => {
+ vi.mocked(ChatAPI.getActiveJob).mockResolvedValue(null);
renderChat();
await waitFor(() => {
- expect(ChatAPI.getChats).toHaveBeenCalled();
+ expect(ChatAPI.getActiveJob).toHaveBeenCalledWith(1);
});
+ });
- // Verify the onError callback receives the actual error message
- const onError = vi.fn();
- ChatAPI.createStreamingMessage(1, 'test', vi.fn(), onError);
+ test('should recover in-flight job on page refresh', async () => {
+ vi.mocked(ChatAPI.getActiveJob).mockResolvedValue(MOCK_JOB_RESPONSE);
+
+ renderChat();
await waitFor(() => {
- expect(onError).toHaveBeenCalledWith(
- expect.objectContaining({
- message: 'Missing integration: jira. Please connect the service in Settings.',
- })
- );
+ expect(ChatAPI.getActiveJob).toHaveBeenCalledWith(1);
});
+
+ // The active job should be detected and polling should resume
+ expect(ChatAPI.getActiveJob).toHaveBeenCalled();
});
});
diff --git a/frontend/src/app/Chat/Chat.tsx b/frontend/src/app/Chat/Chat.tsx
index 3e08679..e8092bd 100644
--- a/frontend/src/app/Chat/Chat.tsx
+++ b/frontend/src/app/Chat/Chat.tsx
@@ -7,9 +7,11 @@ import {
Dropdown,
DropdownItem,
DropdownList,
+ ExpandableSection,
MenuToggle,
MenuToggleElement,
PageSection,
+ Tooltip,
} from '@patternfly/react-core';
import {
Chatbot,
@@ -31,7 +33,8 @@ import {
} from '@patternfly/chatbot';
import { ArrowDownIcon, TrashIcon } from '@patternfly/react-icons';
-import { ChatAPI, Chat as ChatType, ChatMessage, StreamingEvent, Flow } from './chatApi';
+import { ChatAPI, Chat as ChatType, ChatMessage, Flow } from './chatApi';
+import { useJobPolling } from './useJobPolling';
import userAvatar from '@app/images/user-avatar.svg';
import aiLogo from '@app/images/ai-logo-transparent.svg';
@@ -53,6 +56,12 @@ function convertMessageToProps(msg: ChatMessage): MessageProps {
};
}
+/** Format elapsed seconds as human-readable string. */
+function formatElapsed(seconds: number): string {
+ if (seconds < 60) return `${seconds}s`;
+ return `${Math.floor(seconds / 60)}m ${seconds % 60}s`;
+}
+
function Chat(): React.ReactElement {
// Chat list state
const [chats, setChats] = React.useState([]);
@@ -80,12 +89,33 @@ function Chat(): React.ReactElement {
// Operation error state (for displaying errors to user)
const [operationError, setOperationError] = React.useState(null);
+ // Job state
+ const [activeJobId, setActiveJobId] = React.useState(null);
+ const [jobStartTime, setJobStartTime] = React.useState(null);
+ const [elapsedSeconds, setElapsedSeconds] = React.useState(0);
+ const loadingBotMessageIdRef = React.useRef(null);
+ const originalMessageTextRef = React.useRef('');
+
+ // Use the polling hook
+ const { data: jobData } = useJobPolling(activeJobId);
+
const historyRef = React.useRef(null);
- const streamControllerRef = React.useRef<{ close: () => void } | null>(null);
const messageBoxRef = React.useRef(null);
const [userScrolledUp, setUserScrolledUp] = React.useState(false);
const scrollDetectionTimeoutRef = React.useRef(null);
+ // Elapsed time timer
+ React.useEffect(() => {
+ if (!jobStartTime || !activeJobId) {
+ setElapsedSeconds(0);
+ return;
+ }
+ const interval = setInterval(() => {
+ setElapsedSeconds(Math.round((Date.now() - jobStartTime.getTime()) / 1000));
+ }, 1000);
+ return () => clearInterval(interval);
+ }, [jobStartTime, activeJobId]);
+
// Scroll utility functions
const scrollToBottom = React.useCallback(() => {
if (messageBoxRef.current?.scrollToBottom) {
@@ -140,7 +170,7 @@ function Chat(): React.ReactElement {
}
};
- // Load messages and restore flow when chat changes
+ // Load messages and check for active jobs when chat changes
React.useEffect(() => {
setLastError(null);
setErrorMessages(new Map());
@@ -149,13 +179,17 @@ function Chat(): React.ReactElement {
if (chat?.flow_name) {
setSelectedFlowName(chat.flow_name);
}
- // Skip loading messages if we're currently sending — handleSend manages
+ // Skip loading messages if we're currently sending -- handleSend manages
// messages directly and loadMessages would overwrite the loading indicator.
if (!isSending) {
loadMessages(selectedChatId);
+ // Page refresh recovery: check for active jobs
+ checkForActiveJob(selectedChatId);
}
} else {
setMessages([]);
+ setActiveJobId(null);
+ setJobStartTime(null);
}
}, [selectedChatId]);
@@ -191,6 +225,131 @@ function Chat(): React.ReactElement {
}
};
+ /**
+ * Page refresh recovery: check if there's an active (non-terminal) job for this chat.
+ * If found, resume polling and show the typing indicator.
+ */
+ const checkForActiveJob = async (chatId: number) => {
+ try {
+ const activeJob = await ChatAPI.getActiveJob(chatId);
+ if (activeJob) {
+ // Resume polling for the active job
+ setActiveJobId(activeJob.id);
+ setJobStartTime(activeJob.started_at ? new Date(activeJob.started_at) : new Date());
+ setIsSending(true);
+
+ // Find the placeholder assistant message and show it as loading
+ const botMsgId = `recovered-bot-${activeJob.chat_message_id}`;
+ loadingBotMessageIdRef.current = botMsgId;
+
+ // Add a loading indicator for the recovered job
+ setMessages((prev) => {
+ // Check if the last message is the empty placeholder
+ const lastMsg = prev[prev.length - 1];
+ if (lastMsg && lastMsg.role === 'bot' && !lastMsg.content) {
+ // Replace the empty placeholder with a loading indicator
+ return prev.map((msg, i) =>
+ i === prev.length - 1
+ ? { ...msg, id: botMsgId, isLoading: true }
+ : msg
+ );
+ }
+ // Otherwise add a loading message
+ return [
+ ...prev,
+ {
+ id: botMsgId,
+ role: 'bot' as const,
+ content: '',
+ name: 'Assistant',
+ avatar: aiLogo,
+ timestamp: new Date().toLocaleString(),
+ isLoading: true,
+ },
+ ];
+ });
+ }
+ } catch {
+ // No active job -- expected for most chats
+ }
+ };
+
+ // React to job status changes
+ React.useEffect(() => {
+ if (!jobData || !activeJobId) return;
+ const botMessageId = loadingBotMessageIdRef.current;
+ if (!botMessageId) return;
+
+ if (jobData.status === 'completed' && jobData.result_content) {
+ // Update the bot message with the result
+ setMessages((prev) =>
+ prev.map((msg) =>
+ msg.id === botMessageId
+ ? { ...msg, content: jobData.result_content!, isLoading: false }
+ : msg
+ )
+ );
+ // Reload messages to get server-side IDs
+ if (selectedChatId) {
+ loadMessages(selectedChatId);
+ }
+ setIsSending(false);
+ setActiveJobId(null);
+ setJobStartTime(null);
+ loadingBotMessageIdRef.current = null;
+ setAnnouncement(`Assistant: ${jobData.result_content}`);
+
+ // Update chat title on first real message (only user+loading = length 2)
+ if (messages.length <= 2 && selectedChatId) {
+ const originalText = originalMessageTextRef.current;
+ if (originalText) {
+ const title = originalText.slice(0, 50) + (originalText.length > 50 ? '...' : '');
+ ChatAPI.updateChat(selectedChatId, { title }).then(() => loadChats());
+ const sendFlowName = isFlowLocked ? selectedChat?.flow_name : selectedFlowName;
+ if (sendFlowName) {
+ setChats((prev) =>
+ prev.map((c) =>
+ c.id === selectedChatId ? { ...c, flow_name: sendFlowName } : c
+ )
+ );
+ }
+ }
+ }
+ } else if (jobData.status === 'failed' || jobData.status === 'timed_out') {
+ const errorText = jobData.error_message || 'An unknown error occurred.';
+ setMessages((prev) =>
+ prev.map((msg) =>
+ msg.id === botMessageId
+ ? { ...msg, content: '', isLoading: false }
+ : msg
+ )
+ );
+ setErrorMessages((prev) => new Map(prev).set(botMessageId, errorText));
+ setLastError({
+ message: originalMessageTextRef.current,
+ chatId: selectedChatId!,
+ botMessageId,
+ errorText,
+ });
+ setIsSending(false);
+ setActiveJobId(null);
+ setJobStartTime(null);
+ loadingBotMessageIdRef.current = null;
+ } else if (jobData.status === 'cancelled') {
+ setMessages((prev) =>
+ prev.map((msg) =>
+ msg.id === botMessageId
+ ? { ...msg, content: 'Request cancelled', isLoading: false }
+ : msg
+ )
+ );
+ setIsSending(false);
+ setActiveJobId(null);
+ setJobStartTime(null);
+ loadingBotMessageIdRef.current = null;
+ }
+ }, [jobData]);
+
const handleNewChat = async () => {
setOperationError(null);
try {
@@ -261,7 +420,10 @@ function Chat(): React.ReactElement {
setIsSending(true);
setLastError(null);
+ setActiveJobId(null);
+ setJobStartTime(new Date());
setUserScrolledUp(false);
+ originalMessageTextRef.current = messageText;
const timestamp = new Date().toLocaleString();
const isRetry = !!retryMessageText;
@@ -278,6 +440,7 @@ function Chat(): React.ReactElement {
// Add loading bot message
const botMessageId = `bot-${Date.now()}`;
+ loadingBotMessageIdRef.current = botMessageId;
const loadingBotMessage: MessageProps = {
id: botMessageId,
role: 'bot',
@@ -308,102 +471,72 @@ function Chat(): React.ReactElement {
setAnnouncement(`Message from You: ${messageText}. Assistant is thinking...`);
setTimeout(() => scrollToBottom(), 50);
- let accumulatedContent = '';
-
- const streamController = ChatAPI.createStreamingMessage(
- chatId,
- messageText,
- (event: StreamingEvent) => {
- if (event.type === 'content' && event.content) {
- accumulatedContent += event.content;
- // Update the bot message with accumulated content
- setMessages((prev) =>
- prev.map((msg) =>
- msg.id === botMessageId
- ? { ...msg, content: accumulatedContent, isLoading: false }
- : msg
- )
- );
- } else if (event.type === 'done') {
- streamControllerRef.current = null;
- // Reload to get the saved message IDs
- loadMessages(chatId);
- setIsSending(false);
- setAnnouncement(`Assistant: ${accumulatedContent}`);
-
- // Update chat title and lock flow on first message
- if (messages.length === 0) {
- const title = messageText.slice(0, 50) + (messageText.length > 50 ? '...' : '');
- ChatAPI.updateChat(chatId, { title }).then(() => loadChats());
- // Update local state so dropdown locks immediately
- setChats((prev) =>
- prev.map((c) =>
- c.id === chatId ? { ...c, flow_name: sendFlowName } : c
- )
- );
- }
- } else if (event.type === 'error') {
- const errorText = event.error || 'An unknown error occurred.';
- streamControllerRef.current = null;
- setMessages((prev) =>
- prev.map((msg) =>
- msg.id === botMessageId
- ? { ...msg, content: accumulatedContent || '', isLoading: false }
- : msg
- )
- );
- setErrorMessages((prev) => new Map(prev).set(botMessageId, errorText));
- setLastError({
- message: messageText,
- chatId: chatId,
- botMessageId,
- errorText,
- });
- setIsSending(false);
- }
- },
- (err) => {
- console.error('Streaming error:', err);
- const errorText = err.message || 'An unknown error occurred.';
- streamControllerRef.current = null;
+ try {
+ const jobResponse = await ChatAPI.createJobMessage(
+ chatId, messageText, sendFlowName || undefined
+ );
+ setActiveJobId(jobResponse.id);
+ // If the job already failed immediately (e.g., flow resolution error)
+ if (jobResponse.status === 'failed') {
+ const errorText = jobResponse.error_message || 'Failed to submit request.';
setMessages((prev) =>
prev.map((msg) =>
msg.id === botMessageId
- ? { ...msg, content: accumulatedContent || '', isLoading: false }
+ ? { ...msg, content: '', isLoading: false }
: msg
)
);
setErrorMessages((prev) => new Map(prev).set(botMessageId, errorText));
- setLastError({
- message: messageText,
- chatId: chatId,
- botMessageId,
- errorText,
- });
+ setLastError({ message: messageText, chatId, botMessageId, errorText });
setIsSending(false);
- },
- () => {
- streamControllerRef.current = null;
- setIsSending(false);
- },
- sendFlowName || undefined
- );
-
- streamControllerRef.current = streamController;
+ setActiveJobId(null);
+ setJobStartTime(null);
+ loadingBotMessageIdRef.current = null;
+ }
+ } catch (err) {
+ // Handle immediate submission failure
+ console.error('Failed to create job:', err);
+ const errorText = err instanceof Error ? err.message : 'Failed to submit request.';
+ setMessages((prev) =>
+ prev.map((msg) =>
+ msg.id === botMessageId
+ ? { ...msg, content: '', isLoading: false }
+ : msg
+ )
+ );
+ setErrorMessages((prev) => new Map(prev).set(botMessageId, errorText));
+ setLastError({ message: messageText, chatId, botMessageId, errorText });
+ setIsSending(false);
+ setActiveJobId(null);
+ setJobStartTime(null);
+ loadingBotMessageIdRef.current = null;
+ }
};
- const handleStopStreaming = () => {
- if (streamControllerRef.current) {
- streamControllerRef.current.close();
- streamControllerRef.current = null;
+ const handleCancelJob = async () => {
+ if (activeJobId) {
+ try {
+ await ChatAPI.cancelJob(activeJobId);
+ // The useEffect watching jobData will handle the cancelled state on next poll
+ } catch (err) {
+ console.error('Failed to cancel job:', err);
+ // Force local cancellation state
+ const botMessageId = loadingBotMessageIdRef.current;
+ if (botMessageId) {
+ setMessages((prev) =>
+ prev.map((msg) =>
+ msg.id === botMessageId
+ ? { ...msg, content: 'Request cancelled', isLoading: false }
+ : msg
+ )
+ );
+ }
+ setIsSending(false);
+ setActiveJobId(null);
+ setJobStartTime(null);
+ loadingBotMessageIdRef.current = null;
+ }
}
- setIsSending(false);
- // Update any loading message to show it was stopped
- setMessages((prev) =>
- prev.map((msg) =>
- msg.isLoading ? { ...msg, content: msg.content || '(Stopped)', isLoading: false } : msg
- )
- );
};
const handleRetry = () => {
@@ -420,6 +553,14 @@ function Chat(): React.ReactElement {
const effectiveFlowName = isFlowLocked ? selectedChat.flow_name : selectedFlowName;
const isFlowAvailable = !!effectiveFlowName && flows.some((f) => f.name === effectiveFlowName);
+ // Derive job status text for tooltip
+ const jobStatusText = React.useMemo(() => {
+ if (!activeJobId || !jobData) return '';
+ const statusLabel = jobData.status === 'in_progress' ? 'Running' :
+ jobData.status === 'pending' ? 'Pending' : jobData.status;
+ return `${statusLabel} - ${formatElapsed(elapsedSeconds)}`;
+ }, [activeJobId, jobData, elapsedSeconds]);
+
// Build conversations for the drawer
const conversations: Conversation[] = chats.map((chat) => ({
id: chat.id.toString(),
@@ -535,6 +676,7 @@ function Chat(): React.ReactElement {
const hasPartialContent = hasError && !!message.content;
const canRetry = hasError && lastError && lastError.chatId === selectedChatId;
const showCopyAction = message.role === 'bot' && !message.isLoading && !hasError;
+ const isActiveJobMessage = message.isLoading && !!activeJobId;
const retryLink = canRetry ? (
@@ -542,10 +684,53 @@ function Chat(): React.ReactElement {
) : undefined;
- return (
+ // Build extra content for active job loading messages (cancel button)
+ // and for error messages with expandable details
+ let extraContent: MessageProps['extraContent'] = undefined;
+
+ if (isActiveJobMessage) {
+ // Cancel button inline in typing indicator message area
+ extraContent = {
+ afterMainContent: (
+
+
+
+ ),
+ };
+ } else if (hasError) {
+ // Error display with expandable details and retry
+ extraContent = {
+ afterMainContent: (
+
+ ),
+ };
+ }
+
+ // Wrap active job messages in a tooltip showing status + elapsed time
+ const messageElement = (
- ),
- },
- }
- : {})}
+ extraContent={extraContent}
/>
);
+
+ if (isActiveJobMessage && jobStatusText) {
+ return (
+
+
+ {messageElement}
+
+
+ );
+ }
+
+ return messageElement;
})}
@@ -597,7 +771,7 @@ function Chat(): React.ReactElement {
onSendMessage={handleSend}
isSendButtonDisabled={isSending || !isFlowAvailable}
hasStopButton={isSending}
- handleStopButton={handleStopStreaming}
+ handleStopButton={handleCancelJob}
/>
>
@@ -606,6 +780,6 @@ function Chat(): React.ReactElement {
);
-};
+}
export { Chat };
diff --git a/frontend/src/app/Chat/chatApi.ts b/frontend/src/app/Chat/chatApi.ts
index fe4e4d9..8583d79 100644
--- a/frontend/src/app/Chat/chatApi.ts
+++ b/frontend/src/app/Chat/chatApi.ts
@@ -64,6 +64,29 @@ export interface FlowsResponse {
default_flow?: string;
}
+// =============================================================================
+// Job Types
+// =============================================================================
+
+export type JobStatus = 'pending' | 'in_progress' | 'completed' | 'failed' | 'cancelled' | 'timed_out';
+
+export interface JobResponse {
+ id: number;
+ chat_message_id: number;
+ langflow_job_id: string | null;
+ flow_id: string | null;
+ status: JobStatus;
+ error_message: string | null;
+ result_content: string | null;
+ started_at: string | null;
+ completed_at: string | null;
+ created_at: string;
+ updated_at: string;
+}
+
+/** Terminal statuses -- polling stops when job reaches one of these. */
+export const TERMINAL_STATUSES: JobStatus[] = ['completed', 'failed', 'cancelled', 'timed_out'];
+
// =============================================================================
// Chat CRUD
// =============================================================================
@@ -110,7 +133,66 @@ export const ChatAPI = {
},
// ===========================================================================
- // Streaming
+ // Jobs
+ // ===========================================================================
+
+ /**
+ * Send a message and create a background job for AI processing.
+ *
+ * Returns the Job record. The frontend then polls GET /jobs/{id}?sync=true
+ * until the job reaches a terminal state.
+ */
+ async createJobMessage(chatId: number, content: string, flowName?: string): Promise {
+ const body = flowName ? { content, flow_name: flowName } : { content };
+ const response = await apiClient.post(
+ `${API_BASE}/${chatId}/messages/job`,
+ body
+ );
+ return response.data;
+ },
+
+ /**
+ * Get job status, optionally syncing with LangFlow first.
+ *
+ * When sync=true, the backend polls LangFlow V2 API before returning.
+ */
+ async getJob(jobId: number, sync: boolean = false): Promise {
+ const params = sync ? '?sync=true' : '';
+ const response = await apiClient.get(`/v1/jobs/${jobId}${params}`);
+ return response.data;
+ },
+
+ /**
+ * Cancel a running job.
+ *
+ * Calls LangFlow V2 stop endpoint and updates job status to cancelled.
+ */
+ async cancelJob(jobId: number): Promise {
+ const response = await apiClient.post(`/v1/jobs/${jobId}/cancel`);
+ return response.data;
+ },
+
+ /**
+ * Get the active (non-terminal) job for a chat, if any.
+ *
+ * Used for page refresh recovery -- checks if there's an in-flight job
+ * that the frontend should resume polling for.
+ * Returns null (via 404) if no active job exists.
+ */
+ async getActiveJob(chatId: number): Promise {
+ try {
+ const response = await apiClient.get(
+ `${API_BASE}/${chatId}/active-job`
+ );
+ return response.data;
+ } catch {
+ // 404 means no active job -- expected behavior
+ return null;
+ }
+ },
+
+ // ===========================================================================
+ // Streaming (legacy, kept for backward compatibility)
// ===========================================================================
/**
diff --git a/frontend/src/app/Chat/useJobPolling.ts b/frontend/src/app/Chat/useJobPolling.ts
new file mode 100644
index 0000000..e626140
--- /dev/null
+++ b/frontend/src/app/Chat/useJobPolling.ts
@@ -0,0 +1,31 @@
+/**
+ * TanStack Query hook for polling job status.
+ *
+ * Polls GET /jobs/{id}?sync=true every 5s while the job is active.
+ * Stops polling automatically when the job reaches a terminal state
+ * (completed, failed, cancelled, timed_out).
+ *
+ * Based on the tang-web-app useJobsByBranch pattern with refetchInterval callback.
+ */
+
+import { useQuery } from '@tanstack/react-query';
+import { ChatAPI, JobResponse, TERMINAL_STATUSES } from './chatApi';
+
+export function useJobPolling(jobId: number | null) {
+ return useQuery({
+ queryKey: ['job', jobId],
+ queryFn: async () => {
+ // Always sync with LangFlow on poll (backend skips sync for terminal states)
+ return ChatAPI.getJob(jobId!, true);
+ },
+ enabled: !!jobId,
+ refetchInterval: (query) => {
+ const job = query.state.data;
+ if (!job) return 5000;
+ const isTerminal = TERMINAL_STATUSES.includes(job.status);
+ return isTerminal ? false : 5000;
+ },
+ // Just under 5s to prevent extra refetches between polls
+ staleTime: 4000,
+ });
+}
diff --git a/helm/langflow/values-dev.yaml b/helm/langflow/values-dev.yaml
index 56dfd62..3b43a14 100644
--- a/helm/langflow/values-dev.yaml
+++ b/helm/langflow/values-dev.yaml
@@ -31,7 +31,7 @@ langflow:
# Custom image with redhat_agents baked in (built via GitLab CI)
repository: quay.io/cfchase/agents-python-langflow
imagePullPolicy: Always
- tag: "latest"
+ tag: "1.8.0.rc0"
service:
type: ClusterIP
port: 7860
@@ -87,6 +87,9 @@ langflow:
value: "INFO"
- name: LANGFLOW_UPDATE_STARTER_PROJECTS
value: "false"
+ # V2 Workflow API (required for job-based execution)
+ - name: LANGFLOW_DEVELOPER_API_ENABLED
+ value: "true"
# Custom image settings for redhat_agents
- name: LANGFLOW_LAZY_LOAD_COMPONENTS
value: "false"
diff --git a/k8s/langflow/base/deployment.yaml b/k8s/langflow/base/deployment.yaml
index d709974..4d77c97 100644
--- a/k8s/langflow/base/deployment.yaml
+++ b/k8s/langflow/base/deployment.yaml
@@ -48,7 +48,7 @@ spec:
- ALL
containers:
- name: langflow
- image: docker.io/langflowai/langflow:latest
+ image: docker.io/langflowai/langflow:1.8.0.rc0
imagePullPolicy: Always
ports:
- containerPort: 7860
@@ -84,6 +84,9 @@ spec:
value: "INFO"
- name: LANGFLOW_UPDATE_STARTER_PROJECTS
value: "false"
+ # V2 Workflow API (required for job-based execution)
+ - name: LANGFLOW_DEVELOPER_API_ENABLED
+ value: "true"
volumeMounts:
# All persistent data uses single PVC with subPath
- name: langflow-data
diff --git a/k8s/langflow/base/kustomization.yaml b/k8s/langflow/base/kustomization.yaml
index 7f30158..7f84df1 100644
--- a/k8s/langflow/base/kustomization.yaml
+++ b/k8s/langflow/base/kustomization.yaml
@@ -17,4 +17,4 @@ labels:
images:
- name: docker.io/langflowai/langflow
- newTag: latest
+ newTag: "1.8.0.rc0"
diff --git a/makefiles/services.mk b/makefiles/services.mk
index 38c3177..555e707 100644
--- a/makefiles/services.mk
+++ b/makefiles/services.mk
@@ -21,7 +21,7 @@ langflow-logs: ## Show LangFlow logs
@./scripts/dev-langflow.sh logs
langflow-import: ## Import flows from configured sources into LangFlow
- @uv run --with requests --with pyyaml python scripts/import_flows.py config/local/flow-sources.yaml
+ @set -a && . config/local/.env && set +a && uv run --with requests --with pyyaml python scripts/import_flows.py config/local/flow-sources.yaml
langflow-import-cluster: ## Import flows into cluster LangFlow (via port-forward)
@./scripts/langflow-import-cluster.sh $(or $(ENV),dev)
diff --git a/scripts/dev-langflow.sh b/scripts/dev-langflow.sh
index d88c341..ff2186f 100755
--- a/scripts/dev-langflow.sh
+++ b/scripts/dev-langflow.sh
@@ -14,7 +14,7 @@ source "$SCRIPT_DIR/lib/common.sh"
init_container_tool || exit 1
# Configuration
-LANGFLOW_VERSION="${LANGFLOW_VERSION:-latest}"
+LANGFLOW_VERSION="${LANGFLOW_VERSION:-1.8.0.rc0}"
LANGFLOW_IMAGE="${LANGFLOW_IMAGE:-docker.io/langflowai/langflow:${LANGFLOW_VERSION}}"
CONTAINER_NAME="app-langflow-dev"
LANGFLOW_PORT="${LANGFLOW_PORT:-7860}"
@@ -95,6 +95,10 @@ case "$1" in
-e LANGFUSE_HOST="${LANGFUSE_HOST:-http://${DB_HOST}:${LANGFUSE_WEB_PORT:-3000}}"
-e TZ="${TZ:-UTC}"
-e LANGFLOW_LAZY_LOAD_COMPONENTS=false
+ # V2 Workflow API (Phase 9: job model for long-running flows)
+ -e LANGFLOW_DEVELOPER_API_ENABLED=${LANGFLOW_DEVELOPER_API_ENABLED:-true}
+ ${LANGFLOW_API_KEY:+-e LANGFLOW_API_KEY=$LANGFLOW_API_KEY}
+ ${LANGFLOW_API_KEY_SOURCE:+-e LANGFLOW_API_KEY_SOURCE=$LANGFLOW_API_KEY_SOURCE}
)
# Detect host DNS servers (needed for VPN-resolved internal hostnames)
@@ -128,6 +132,7 @@ case "$1" in
-e GRANITE_GUARDIAN_ENDPOINT="${GRANITE_GUARDIAN_ENDPOINT:-}"
-e GRANITE_GUARDIAN_API_KEY="${GRANITE_GUARDIAN_API_KEY:-}"
-e GRANITE_CA_BUNDLE="${GRANITE_CA_BUNDLE:-}"
+ ${LANGFLOW_API_KEY:+-e LANGFLOW_API_KEY=$LANGFLOW_API_KEY}
)
CUSTOM_VOL_ARGS=(
@@ -279,10 +284,13 @@ case "$1" in
echo " LANGFLOW_DB - Database name (default: langflow)"
echo ""
echo "Custom image env vars (forwarded when LANGFLOW_IMAGE is set):"
- echo " GOOGLE_CLOUD_PROJECT - GCP project for Vertex AI"
- echo " GRANITE_GUARDIAN_ENDPOINT - Granite Guardian API endpoint"
- echo " GRANITE_GUARDIAN_API_KEY - Granite Guardian API key"
- echo " GRANITE_CA_BUNDLE - Custom CA bundle path"
+ echo " GOOGLE_CLOUD_PROJECT - GCP project for Vertex AI"
+ echo " GRANITE_GUARDIAN_ENDPOINT - Granite Guardian API endpoint"
+ echo " GRANITE_GUARDIAN_API_KEY - Granite Guardian API key"
+ echo " GRANITE_CA_BUNDLE - Custom CA bundle path"
+ echo " LANGFLOW_DEVELOPER_API_ENABLED - Enable V2 Workflow API (default: true)"
+ echo " LANGFLOW_API_KEY - Shared API key for backend-to-LangFlow auth"
+ echo " LANGFLOW_API_KEY_SOURCE - Set to 'env' to validate API key from env var"
echo ""
echo "Prerequisites:"
echo " PostgreSQL must be running: make db-start"
diff --git a/scripts/import_flows.py b/scripts/import_flows.py
index 838f165..31fd727 100644
--- a/scripts/import_flows.py
+++ b/scripts/import_flows.py
@@ -8,9 +8,7 @@
Environment variables:
LANGFLOW_URL - LangFlow API URL (default: http://localhost:7860)
- LANGFLOW_USER - LangFlow username (default: dev@localhost.local)
- LANGFLOW_PASSWORD - LangFlow password (default: devpassword123)
- LANGFLOW_API_KEY - LangFlow API key (optional, overrides user/password)
+ LANGFLOW_API_KEY - LangFlow API key (required, validated via x-api-key header)
FLOW_SOURCE_PATH - Simple mode: single local path to import
GITHUB_FLOW_TOKEN - Token for private git repos
@@ -38,8 +36,6 @@
PROJECT_ROOT = Path(__file__).parent.parent
DEFAULT_CONFIG = PROJECT_ROOT / "config" / "local" / "flow-sources.yaml"
LANGFLOW_URL = os.environ.get("LANGFLOW_URL", "http://localhost:7860")
-LANGFLOW_USER = os.environ.get("LANGFLOW_USER", "dev@localhost.local")
-LANGFLOW_PASSWORD = os.environ.get("LANGFLOW_PASSWORD", "devpassword123")
CACHE_DIR = Path(os.environ.get("FLOW_CACHE_DIR", "/tmp/flow-cache"))
# Component installation paths (relative to project root)
@@ -67,8 +63,8 @@
"metadata.google.internal", # GCP metadata
}
-# Global access token
-ACCESS_TOKEN: str | None = None
+# Global API key for LangFlow authentication
+API_KEY: str | None = os.environ.get("LANGFLOW_API_KEY")
# Cache for project name -> ID lookups
PROJECT_CACHE: dict[str, str] = {}
@@ -539,36 +535,34 @@ def install_components(source: dict) -> bool:
def authenticate() -> bool:
- """Authenticate with LangFlow and get access token."""
- global ACCESS_TOKEN
-
- # Check for API key first
- api_key = os.environ.get("LANGFLOW_API_KEY")
- if api_key:
- ACCESS_TOKEN = api_key
- log_info("Using API key from LANGFLOW_API_KEY")
- return True
+ """Verify LangFlow API key authentication.
- log_info(f"Authenticating as {LANGFLOW_USER}...")
+ Uses the LANGFLOW_API_KEY env var with x-api-key header.
+ LangFlow must have LANGFLOW_API_KEY_SOURCE=env set to validate
+ API keys from environment variables.
+ """
+ if not API_KEY:
+ log_error(
+ "LANGFLOW_API_KEY not set. "
+ "Set it in config/local/.env or export LANGFLOW_API_KEY=..."
+ )
+ return False
+
+ # Verify the API key works
resp = request_with_retry(
- "POST",
- f"{LANGFLOW_URL}/api/v1/login",
- data={"username": LANGFLOW_USER, "password": LANGFLOW_PASSWORD},
+ "GET",
+ f"{LANGFLOW_URL}/api/v1/flows/",
+ headers={"x-api-key": API_KEY},
timeout=10,
)
- if resp is None:
- return False
+ if resp is not None and resp.ok:
+ log_info("Authentication successful (API key)")
+ return True
- if resp.ok:
- try:
- data = resp.json()
- ACCESS_TOKEN = data.get("access_token")
- if ACCESS_TOKEN:
- log_info("Authentication successful")
- return True
- except json.JSONDecodeError:
- pass
- log_error(f"Authentication failed: {resp.text[:200]}")
+ detail = ""
+ if resp is not None:
+ detail = f": {resp.text[:200]}"
+ log_error(f"API key authentication failed{detail}")
return False
@@ -579,8 +573,8 @@ def list_all_flows() -> list[dict] | None:
Returns None on error.
"""
headers = {}
- if ACCESS_TOKEN:
- headers["Authorization"] = f"Bearer {ACCESS_TOKEN}"
+ if API_KEY:
+ headers["x-api-key"] = API_KEY
resp = request_with_retry(
"GET",
@@ -610,8 +604,8 @@ def delete_flow(flow_id: str) -> bool:
Returns True if deleted successfully.
"""
headers = {"Content-Type": "application/json"}
- if ACCESS_TOKEN:
- headers["Authorization"] = f"Bearer {ACCESS_TOKEN}"
+ if API_KEY:
+ headers["x-api-key"] = API_KEY
resp = request_with_retry(
"DELETE",
@@ -650,8 +644,8 @@ def find_flow_by_name(flows: list[dict], name: str, project_id: str | None = Non
def create_project(project_name: str) -> str | None:
"""Create a new project and return its ID."""
headers = {"Content-Type": "application/json"}
- if ACCESS_TOKEN:
- headers["Authorization"] = f"Bearer {ACCESS_TOKEN}"
+ if API_KEY:
+ headers["x-api-key"] = API_KEY
resp = request_with_retry(
"POST",
@@ -684,8 +678,8 @@ def get_project_id(project_name: str, create_if_missing: bool = True) -> str | N
return PROJECT_CACHE[project_name]
headers = {}
- if ACCESS_TOKEN:
- headers["Authorization"] = f"Bearer {ACCESS_TOKEN}"
+ if API_KEY:
+ headers["x-api-key"] = API_KEY
resp = request_with_retry(
"GET",
@@ -752,8 +746,8 @@ def import_flow_data(
same project, it is deleted before importing the new version.
"""
headers = {"Content-Type": "application/json"}
- if ACCESS_TOKEN:
- headers["Authorization"] = f"Bearer {ACCESS_TOKEN}"
+ if API_KEY:
+ headers["x-api-key"] = API_KEY
# Check for existing flow and delete if found
flows = list_all_flows()