diff --git a/backend/app/alembic/versions/a1b2c3d4e5f6_add_job_table.py b/backend/app/alembic/versions/a1b2c3d4e5f6_add_job_table.py new file mode 100644 index 0000000..b8b3b41 --- /dev/null +++ b/backend/app/alembic/versions/a1b2c3d4e5f6_add_job_table.py @@ -0,0 +1,45 @@ +"""add job table + +Revision ID: a1b2c3d4e5f6 +Revises: 146945cf3865 +Create Date: 2026-02-27 00:10:00.000000 + +""" +from alembic import op +import sqlalchemy as sa +import sqlmodel.sql.sqltypes + + +# revision identifiers, used by Alembic. +revision = 'a1b2c3d4e5f6' +down_revision = '146945cf3865' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table('job', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('chat_message_id', sa.Integer(), nullable=False), + sa.Column('langflow_job_id', sqlmodel.sql.sqltypes.AutoString(length=255), nullable=True), + sa.Column('flow_id', sqlmodel.sql.sqltypes.AutoString(length=255), nullable=True), + sa.Column('status', sqlmodel.sql.sqltypes.AutoString(length=50), nullable=False), + sa.Column('error_message', sa.Text(), nullable=True), + sa.Column('result_content', sa.Text(), nullable=True), + sa.Column('started_at', sa.DateTime(), nullable=True), + sa.Column('completed_at', sa.DateTime(), nullable=True), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint(['chat_message_id'], ['chat_message.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_job_chat_message_id'), 'job', ['chat_message_id'], unique=False) + op.create_index(op.f('ix_job_langflow_job_id'), 'job', ['langflow_job_id'], unique=False) + op.create_index(op.f('ix_job_status'), 'job', ['status'], unique=False) + + +def downgrade(): + op.drop_index(op.f('ix_job_status'), table_name='job') + op.drop_index(op.f('ix_job_langflow_job_id'), table_name='job') + op.drop_index(op.f('ix_job_chat_message_id'), table_name='job') + op.drop_table('job') diff --git a/backend/app/api/routes/v1/chat_messages.py b/backend/app/api/routes/v1/chat_messages.py index 63c57a0..98e1b15 100644 --- a/backend/app/api/routes/v1/chat_messages.py +++ b/backend/app/api/routes/v1/chat_messages.py @@ -28,6 +28,9 @@ ChatMessageCreate, ChatMessagePublic, ChatMessagesPublic, + Job, + JobPublic, + JobStatus, Message, ) from app.core.config import settings @@ -131,6 +134,116 @@ def create_message( return message +class JobMessageRequest(BaseModel): + """Request body for job-based message endpoint.""" + content: str + flow_id: str | None = None + flow_name: str | None = None + + +@router.post("/job", response_model=JobPublic) +async def create_job_message( + *, + session: SessionDep, + current_user: CurrentUser, + chat_id: int, + request: JobMessageRequest, +) -> Any: + """ + Send a message and create a background job for AI processing. + + This endpoint: + 1. Saves the user message to the database + 2. Creates a placeholder assistant message (filled on job completion) + 3. Creates a Job record (status=pending) + 4. Submits to LangFlow V2 with background=true + 5. Updates Job with langflow_job_id and status=in_progress + 6. Returns JobPublic response (includes job id for frontend polling) + """ + # Verify chat exists and user has access + chat = get_chat_with_permission(session, current_user, chat_id) + flow_name = request.flow_name or settings.LANGFLOW_DEFAULT_FLOW + + # Save user message + user_message = ChatMessage( + chat_id=chat_id, + content=request.content, + role="user", + ) + session.add(user_message) + + # Create placeholder assistant message (content filled on job completion) + assistant_message = ChatMessage( + chat_id=chat_id, + content="", + role="assistant", + ) + session.add(assistant_message) + + # Update chat metadata + chat.updated_at = datetime.now(timezone.utc) + if not chat.flow_name and flow_name: + chat.flow_name = flow_name + session.add(chat) + session.commit() + session.refresh(user_message) + session.refresh(assistant_message) + + # Resolve flow ID + client = get_langflow_client() + resolved_flow_id = await client.resolve_flow_id( + flow_id=request.flow_id, flow_name=flow_name + ) + if not resolved_flow_id: + raise HTTPException(status_code=400, detail="No flow configured or found") + + # Create job record + job = Job( + chat_message_id=assistant_message.id, + flow_id=resolved_flow_id, + status=JobStatus.PENDING.value, + ) + session.add(job) + session.commit() + session.refresh(job) + + # Build tweaks and submit to LangFlow V2 + user_data = await build_user_settings_data( + session=session, user_id=current_user.id + ) + app_data = build_app_settings_data() + tweaks = build_generic_tweaks(user_data=user_data, app_data=app_data) + + try: + v2_inputs = client.build_v2_inputs( + message=request.content, + session_id=str(chat_id), + tweaks=tweaks, + ) + lf_response = await client.submit_workflow( + flow_id=resolved_flow_id, + inputs=v2_inputs, + session_id=str(chat_id), + ) + now = datetime.now(timezone.utc) + job.langflow_job_id = lf_response.get("job_id") + job.status = JobStatus.IN_PROGRESS.value + job.started_at = now + job.updated_at = now + except Exception as e: + logger.error(f"Failed to submit workflow: {e}") + now = datetime.now(timezone.utc) + job.status = JobStatus.FAILED.value + job.error_message = str(e) + job.completed_at = now + job.updated_at = now + + session.add(job) + session.commit() + session.refresh(job) + return job + + @router.delete("/{message_id}") def delete_message( session: SessionDep, diff --git a/backend/app/api/routes/v1/chats.py b/backend/app/api/routes/v1/chats.py index 9ef91fc..8f82cb1 100644 --- a/backend/app/api/routes/v1/chats.py +++ b/backend/app/api/routes/v1/chats.py @@ -7,6 +7,7 @@ - Create new chat - Update chat - Delete chat +- Get active job for a chat (page refresh recovery) """ from datetime import datetime, timezone @@ -19,10 +20,15 @@ from app.models import ( Chat, ChatCreate, + ChatMessage, ChatPublic, ChatsPublic, ChatUpdate, + Job, + JobPublic, + JobStatus, Message, + TERMINAL_STATUSES, ) router = APIRouter(prefix="/chats", tags=["chats"]) @@ -119,6 +125,37 @@ def update_chat( return chat +@router.get("/{id}/active-job", response_model=JobPublic) +def get_active_job( + session: SessionDep, current_user: CurrentUser, id: int +) -> Any: + """ + Get the active (non-terminal) job for a chat, if any. + + Used for page refresh recovery -- checks if there's an in-flight job + that the frontend should resume polling for. + + Returns 404 if no active job exists. + """ + chat = get_chat_with_permission(session, current_user, id) + + # Find the latest non-terminal job for any message in this chat + terminal_values = {s.value for s in TERMINAL_STATUSES} + statement = ( + select(Job) + .join(ChatMessage, Job.chat_message_id == ChatMessage.id) + .where(ChatMessage.chat_id == chat.id) + .where(Job.status.notin_(terminal_values)) + .order_by(Job.id.desc()) + .limit(1) + ) + job = session.exec(statement).first() + if not job: + raise HTTPException(status_code=404, detail="No active job found") + + return job + + @router.delete("/{id}") def delete_chat(session: SessionDep, current_user: CurrentUser, id: int) -> Message: """ diff --git a/backend/app/api/routes/v1/jobs.py b/backend/app/api/routes/v1/jobs.py new file mode 100644 index 0000000..181d51a --- /dev/null +++ b/backend/app/api/routes/v1/jobs.py @@ -0,0 +1,231 @@ +""" +Job API endpoints for tracking long-running LangFlow executions. + +This module provides: +- GET /jobs/{id} — Get job status with optional ?sync=true to poll LangFlow +- POST /jobs/{id}/cancel — Cancel a running job +""" + +import json +import logging +from datetime import datetime, timezone + +from fastapi import APIRouter, HTTPException + +from app.api.deps import CurrentUser, SessionDep +from app.models import Chat, ChatMessage, Job, JobPublic, JobStatus, TERMINAL_STATUSES +from app.services.langflow import get_langflow_client +from app.services.langflow.client import LangflowError + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/jobs", tags=["jobs"]) + + +def verify_job_ownership( + job: Job, current_user: CurrentUser, session: SessionDep +) -> None: + """ + Verify the current user owns the chat associated with this job. + + Traverses: job -> chat_message -> chat -> user_id + Admins can access any job. + + Raises: + HTTPException: 403 if user does not own the job's chat + HTTPException: 404 if chat_message or chat not found + """ + chat_message = session.get(ChatMessage, job.chat_message_id) + if not chat_message: + raise HTTPException(status_code=404, detail="Job's chat message not found") + + chat = session.get(Chat, chat_message.chat_id) + if not chat: + raise HTTPException(status_code=404, detail="Job's chat not found") + + if chat.user_id != current_user.id and not current_user.admin: + raise HTTPException(status_code=403, detail="Not enough permissions") + + +def extract_result_text(outputs: dict) -> str: + """ + Extract the chat output text from LangFlow V2 response outputs. + + V2 outputs structure: {"Component Name": {"type": "message", "content": "...", ...}} + Looks for the first component with type "message" and returns its content. + + Args: + outputs: The 'outputs' dict from V2 workflow status response + + Returns: + The extracted text, or empty string if not found + """ + try: + for component_name, component_output in outputs.items(): + if isinstance(component_output, dict): + content = component_output.get("content") + if content is not None: + return str(content) + except (AttributeError, TypeError): + logger.warning("Failed to extract result text from V2 outputs") + return "" + + +async def sync_job_status_with_langflow( + job: Job, session: SessionDep +) -> None: + """ + Sync job status with LangFlow V2 API. Skip for terminal states. + + Called when GET /jobs/{id}?sync=true. Polls LangFlow for the latest + status and updates our database record. + + Args: + job: The Job record to sync + session: Database session + """ + if not job.langflow_job_id: + return + if job.status in {s.value for s in TERMINAL_STATUSES}: + return + + try: + client = get_langflow_client() + lf_response = await client.get_workflow_status(job.langflow_job_id) + lf_status = lf_response.get("status", "").lower() + + STATUS_MAP = { + "queued": JobStatus.PENDING, + "pending": JobStatus.PENDING, + "in_progress": JobStatus.IN_PROGRESS, + "running": JobStatus.IN_PROGRESS, + "completed": JobStatus.COMPLETED, + "success": JobStatus.COMPLETED, + "failed": JobStatus.FAILED, + "error": JobStatus.FAILED, + "cancelled": JobStatus.CANCELLED, + "canceled": JobStatus.CANCELLED, + "timed_out": JobStatus.TIMED_OUT, + } + + new_status = STATUS_MAP.get(lf_status) + if new_status and new_status.value != job.status: + job.status = new_status.value + now = datetime.now(timezone.utc) + + if new_status == JobStatus.IN_PROGRESS and not job.started_at: + job.started_at = now + + if new_status == JobStatus.COMPLETED: + result_text = extract_result_text( + lf_response.get("outputs", {}) + ) + job.result_content = result_text + job.completed_at = now + # Write result back to the chat message so it appears in message history + if result_text and job.chat_message_id: + chat_message = session.get(ChatMessage, job.chat_message_id) + if chat_message: + chat_message.content = result_text + session.add(chat_message) + elif new_status in (JobStatus.FAILED, JobStatus.TIMED_OUT): + errors = lf_response.get("errors", []) + if errors and isinstance(errors[0], dict): + job.error_message = errors[0].get("error", "Unknown error") + else: + job.error_message = lf_response.get("error", "Unknown error") + job.completed_at = now + + job.updated_at = now + session.add(job) + session.commit() + session.refresh(job) + + except LangflowError as e: + # LangFlow V2 returns HTTP 500 for failed jobs with JOB_FAILED code. + # Parse the error to update job status instead of leaving it stuck. + if e.status_code == 500 and "JOB_FAILED" in e.message: + logger.info(f"Job {job.id} failed in LangFlow: {e.message}") + # Extract readable error from the JSON response + error_msg = "Job failed in LangFlow" + try: + detail = json.loads( + e.message.replace("Failed to get workflow status: ", "", 1) + ) + error_msg = detail.get("detail", {}).get("message", error_msg) + except (json.JSONDecodeError, AttributeError): + pass + now = datetime.now(timezone.utc) + job.status = JobStatus.FAILED.value + job.error_message = error_msg + job.completed_at = now + job.updated_at = now + session.add(job) + session.commit() + session.refresh(job) + else: + logger.warning(f"LangFlow error syncing job {job.id}: {e}") + except Exception as e: + logger.warning(f"Error syncing job {job.id} with LangFlow: {e}") + + +@router.get("/{job_id}", response_model=JobPublic) +async def get_job( + job_id: int, + sync: bool = False, + session: SessionDep = None, + current_user: CurrentUser = None, +) -> Job: + """ + Get job status. + + If sync=true, poll LangFlow V2 API first and update our record + before returning. This is the primary mechanism for frontend polling. + """ + job = session.get(Job, job_id) + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + verify_job_ownership(job, current_user, session) + + if sync: + await sync_job_status_with_langflow(job, session) + + return job + + +@router.post("/{job_id}/cancel", response_model=JobPublic) +async def cancel_job( + job_id: int, + session: SessionDep = None, + current_user: CurrentUser = None, +) -> Job: + """ + Cancel a running job by calling LangFlow V2 stop endpoint. + + Returns 400 if the job is already in a terminal state. + """ + job = session.get(Job, job_id) + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + verify_job_ownership(job, current_user, session) + + if job.status in {s.value for s in TERMINAL_STATUSES}: + raise HTTPException(status_code=400, detail="Job already in terminal state") + + client = get_langflow_client() + if job.langflow_job_id: + try: + await client.stop_workflow(job.langflow_job_id) + except Exception as e: + logger.warning(f"Failed to stop LangFlow workflow for job {job.id}: {e}") + + now = datetime.now(timezone.utc) + job.status = JobStatus.CANCELLED.value + job.completed_at = now + job.updated_at = now + session.add(job) + session.commit() + session.refresh(job) + return job diff --git a/backend/app/api/routes/v1/router.py b/backend/app/api/routes/v1/router.py index 733a57e..c101ada 100644 --- a/backend/app/api/routes/v1/router.py +++ b/backend/app/api/routes/v1/router.py @@ -6,6 +6,7 @@ from .chat_messages import router as chat_messages_router from .flows import router as flows_router from .integrations import router as integrations_router +from .jobs import router as jobs_router router = APIRouter() router.include_router(utils_router, prefix="/utils") @@ -15,3 +16,4 @@ router.include_router(chat_messages_router) router.include_router(flows_router) router.include_router(integrations_router) +router.include_router(jobs_router) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index f02c216..5d5a086 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -69,6 +69,14 @@ ChatMessagesPublic, ) +# Job models +from app.models.job import ( + Job, + JobPublic, + JobStatus, + TERMINAL_STATUSES, +) + # Integration models from app.models.user_integration import ( UserIntegration, @@ -114,6 +122,11 @@ "ChatMessageCreate", "ChatMessagePublic", "ChatMessagesPublic", + # Job + "Job", + "JobPublic", + "JobStatus", + "TERMINAL_STATUSES", # Integration "UserIntegration", "UserIntegrationPublic", diff --git a/backend/app/models/chat_message.py b/backend/app/models/chat_message.py index e01277a..7302b69 100644 --- a/backend/app/models/chat_message.py +++ b/backend/app/models/chat_message.py @@ -16,6 +16,7 @@ if TYPE_CHECKING: from app.models.chat import Chat + from app.models.job import Job # Valid roles for chat messages - type hint only (SQLModel can't use Literal for db columns) @@ -24,8 +25,12 @@ class ChatMessageBase(SQLModel): - """Shared properties for ChatMessage.""" - content: str = Field(min_length=1, max_length=10000) + """Shared properties for ChatMessage. + + Note: content allows empty strings to support placeholder assistant + messages created by the job-based flow (filled on job completion). + """ + content: str = Field(min_length=0, max_length=10000) role: str = Field(max_length=20) @field_validator("role") @@ -38,8 +43,12 @@ def validate_role(cls, v: str) -> str: class ChatMessageCreate(ChatMessageBase): - """Properties to receive on message creation.""" - pass + """Properties to receive on message creation. + + Requires non-empty content (unlike the base which allows empty + for placeholder assistant messages). + """ + content: str = Field(min_length=1, max_length=10000) class ChatMessage(ChatMessageBase, table=True): @@ -53,8 +62,9 @@ class ChatMessage(ChatMessageBase, table=True): sa_type=DateTime(timezone=True), ) - # Relationship + # Relationships chat: Optional["Chat"] = Relationship(back_populates="messages") + job: Optional["Job"] = Relationship(back_populates="chat_message") class ChatMessagePublic(ChatMessageBase): diff --git a/backend/app/models/job.py b/backend/app/models/job.py new file mode 100644 index 0000000..9c97cb3 --- /dev/null +++ b/backend/app/models/job.py @@ -0,0 +1,86 @@ +""" +Job model and related schemas. + +This module contains: +- JobStatus enum for job lifecycle states +- TERMINAL_STATUSES set for completed states +- Job database model (table=True) +- JobPublic: Output schema for API responses +""" + +from datetime import datetime, timezone +from enum import Enum +from typing import TYPE_CHECKING, Optional + +from sqlalchemy import Column, DateTime, Text +from sqlmodel import Field, Relationship, SQLModel + +if TYPE_CHECKING: + from app.models.chat_message import ChatMessage + + +class JobStatus(str, Enum): + """Job lifecycle states.""" + PENDING = "pending" + IN_PROGRESS = "in_progress" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + TIMED_OUT = "timed_out" + + +TERMINAL_STATUSES = { + JobStatus.COMPLETED, + JobStatus.FAILED, + JobStatus.CANCELLED, + JobStatus.TIMED_OUT, +} + + +class Job(SQLModel, table=True): + """Job database model for tracking long-running LangFlow executions.""" + __tablename__ = "job" + + id: int | None = Field(default=None, primary_key=True) + chat_message_id: int = Field( + foreign_key="chat_message.id", nullable=False, index=True, ondelete="CASCADE" + ) + langflow_job_id: str | None = Field( + default=None, index=True, max_length=255 + ) + flow_id: str | None = Field(default=None, max_length=255) + status: str = Field(default=JobStatus.PENDING, max_length=50, index=True) + error_message: str | None = Field( + default=None, sa_column=Column("error_message", Text, nullable=True) + ) + result_content: str | None = Field( + default=None, sa_column=Column("result_content", Text, nullable=True) + ) + started_at: datetime | None = Field(default=None) + completed_at: datetime | None = Field(default=None) + created_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), + sa_type=DateTime(timezone=True), + ) + updated_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), + sa_type=DateTime(timezone=True), + ) + + # Relationship back to ChatMessage + chat_message: Optional["ChatMessage"] = Relationship(back_populates="job") + + +class JobPublic(SQLModel): + """Properties to return via API.""" + id: int + chat_message_id: int + langflow_job_id: str | None + flow_id: str | None + status: str + error_message: str | None + result_content: str | None + started_at: datetime | None + completed_at: datetime | None + created_at: datetime + updated_at: datetime diff --git a/backend/app/services/langflow/client.py b/backend/app/services/langflow/client.py index a314250..b7baeeb 100644 --- a/backend/app/services/langflow/client.py +++ b/backend/app/services/langflow/client.py @@ -373,6 +373,155 @@ async def chat( logger.error(f"Langflow connection error: {e}") raise LangflowError(f"Failed to connect to Langflow: {str(e)}") + # --- V2 Workflow API Methods --- + + def build_v2_inputs( + self, + message: str, + session_id: str | None = None, + tweaks: dict | None = None, + ) -> dict: + """ + Build V2-compatible flat inputs from message and tweaks. + + V2 uses flat format: "display_name.param_name": value + parse_flat_inputs() converts this to tweaks dict, then process_tweaks() + applies them to the graph (supports both component IDs and display names). + + Args: + message: The user message + session_id: Optional session ID for conversation continuity + tweaks: Optional tweaks dict (display-name keyed, e.g. + {"User Settings": {"settings_data": {...}}}) + + Returns: + Dict with flat inputs for V2 WorkflowExecutionRequest.inputs + """ + inputs: dict = { + "Chat Input.input_value": message, + } + if session_id: + inputs["Chat Input.session_id"] = session_id + if tweaks: + for component_name, params in tweaks.items(): + if isinstance(params, dict): + for param_name, value in params.items(): + inputs[f"{component_name}.{param_name}"] = value + return inputs + + async def submit_workflow( + self, + flow_id: str, + inputs: dict, + session_id: str | None = None, + ) -> dict: + """ + Submit workflow for background execution via V2 API. + + Args: + flow_id: The flow ID to execute + inputs: V2-compatible inputs (from build_v2_inputs) + session_id: Optional session ID + + Returns: + Dict with job_id, status, and other metadata from V2 API + + Raises: + LangflowError: If the API call fails + """ + url = f"{self.base_url}/api/v2/workflows" + payload = { + "flow_id": flow_id, + "background": True, + "inputs": inputs, + } + + async with httpx.AsyncClient(timeout=CHAT_TIMEOUT) as client: + try: + response = await client.post( + url, json=payload, headers=self.headers + ) + response.raise_for_status() + return response.json() + except httpx.HTTPStatusError as e: + logger.error( + f"Langflow V2 submit error: {e.response.status_code} - {e.response.text}" + ) + raise LangflowError( + f"Failed to submit workflow: {e.response.text}", + status_code=e.response.status_code, + ) + except httpx.RequestError as e: + logger.error(f"Langflow V2 connection error: {e}") + raise LangflowError(f"Failed to connect to Langflow V2: {str(e)}") + + async def get_workflow_status(self, job_id: str) -> dict: + """ + Get workflow status from V2 API. + + Args: + job_id: The LangFlow job ID to check + + Returns: + Dict with job_id, status, outputs, and other metadata + + Raises: + LangflowError: If the API call fails + """ + url = f"{self.base_url}/api/v2/workflows?job_id={job_id}" + + async with httpx.AsyncClient(timeout=LIST_FLOWS_TIMEOUT) as client: + try: + response = await client.get(url, headers=self.headers) + response.raise_for_status() + return response.json() + except httpx.HTTPStatusError as e: + logger.error( + f"Langflow V2 status error: {e.response.status_code} - {e.response.text}" + ) + raise LangflowError( + f"Failed to get workflow status: {e.response.text}", + status_code=e.response.status_code, + ) + except httpx.RequestError as e: + logger.error(f"Langflow V2 connection error: {e}") + raise LangflowError(f"Failed to connect to Langflow V2: {str(e)}") + + async def stop_workflow(self, job_id: str) -> dict: + """ + Stop a running workflow via V2 API. + + Args: + job_id: The LangFlow job ID to stop + + Returns: + Dict with job_id and message from V2 API + + Raises: + LangflowError: If the API call fails + """ + url = f"{self.base_url}/api/v2/workflows/stop" + payload = {"job_id": job_id} + + async with httpx.AsyncClient(timeout=LIST_FLOWS_TIMEOUT) as client: + try: + response = await client.post( + url, json=payload, headers=self.headers + ) + response.raise_for_status() + return response.json() + except httpx.HTTPStatusError as e: + logger.error( + f"Langflow V2 stop error: {e.response.status_code} - {e.response.text}" + ) + raise LangflowError( + f"Failed to stop workflow: {e.response.text}", + status_code=e.response.status_code, + ) + except httpx.RequestError as e: + logger.error(f"Langflow V2 connection error: {e}") + raise LangflowError(f"Failed to connect to Langflow V2: {str(e)}") + async def chat_stream( self, message: str, diff --git a/backend/app/services/langflow/mock_client.py b/backend/app/services/langflow/mock_client.py index 27c1b15..40ec8d2 100644 --- a/backend/app/services/langflow/mock_client.py +++ b/backend/app/services/langflow/mock_client.py @@ -197,6 +197,103 @@ async def chat_stream( logger.debug(f"[MOCK] Finished streaming response: {response[:50]}...") + # --- V2 Workflow API Mock Methods --- + + async def resolve_flow_id( + self, + flow_id: str | None = None, + flow_name: str | None = None, + ) -> str | None: + """ + Mock resolve_flow_id - returns the flow_id if provided, + otherwise returns a mock flow ID. + """ + if self.simulate_error: + raise LangflowError(self.error_message, status_code=500) + + if flow_id: + return flow_id + return "mock-flow-1" + + def build_v2_inputs( + self, + message: str, + session_id: str | None = None, + tweaks: dict | None = None, + ) -> dict: + """Mock build_v2_inputs - mirrors LangflowClient.""" + inputs: dict = { + "Chat Input.input_value": message, + } + if session_id: + inputs["Chat Input.session_id"] = session_id + if tweaks: + for component_name, params in tweaks.items(): + if isinstance(params, dict): + for param_name, value in params.items(): + inputs[f"{component_name}.{param_name}"] = value + return inputs + + async def submit_workflow( + self, + flow_id: str, + inputs: dict, + session_id: str | None = None, + ) -> dict: + """ + Mock submit_workflow - returns a mock job submission response. + """ + self._record_call("submit_workflow", inputs.get("Chat Input.input_value", ""), session_id) + logger.debug(f"[MOCK] submit_workflow called: flow_id={flow_id}") + + if self.simulate_error: + raise LangflowError(self.error_message, status_code=500) + + await asyncio.sleep(0.05) + return { + "job_id": "mock-job-123", + "flow_id": flow_id, + "status": "queued", + } + + async def get_workflow_status(self, job_id: str) -> dict: + """ + Mock get_workflow_status - returns a completed job status. + """ + logger.debug(f"[MOCK] get_workflow_status called: job_id={job_id}") + + if self.simulate_error: + raise LangflowError(self.error_message, status_code=500) + + response = self._get_next_response() + return { + "job_id": job_id, + "status": "completed", + "outputs": { + "Chat Output": { + "type": "message", + "component_id": "ChatOutput-mock", + "status": "completed", + "content": response, + "metadata": {"component_type": "ChatOutput"}, + } + }, + } + + async def stop_workflow(self, job_id: str) -> dict: + """ + Mock stop_workflow - returns a cancellation confirmation. + """ + logger.debug(f"[MOCK] stop_workflow called: job_id={job_id}") + + if self.simulate_error: + raise LangflowError(self.error_message, status_code=500) + + return { + "job_id": job_id, + "message": "Job cancelled successfully.", + } + # Test helper methods def get_call_history(self) -> list[dict]: diff --git a/backend/app/services/protocols.py b/backend/app/services/protocols.py index 31060aa..2700d0d 100644 --- a/backend/app/services/protocols.py +++ b/backend/app/services/protocols.py @@ -91,3 +91,58 @@ async def list_flows(self) -> list["Flow"]: LangflowError: If the API call fails """ ... + + def build_v2_inputs( + self, + message: str, + session_id: str | None = None, + tweaks: dict | None = None, + ) -> dict: + """ + Build V2-compatible inputs from message and tweaks. + + Returns: + Dict with input_value, session_id, and tweaks for V2 payload + """ + ... + + async def submit_workflow( + self, + flow_id: str, + inputs: dict, + session_id: str | None = None, + ) -> dict: + """ + Submit workflow for background execution via V2 API. + + Returns: + Dict with job_id, status, and other metadata + + Raises: + LangflowError: If the API call fails + """ + ... + + async def get_workflow_status(self, job_id: str) -> dict: + """ + Get workflow status from V2 API. + + Returns: + Dict with job_id, status, outputs, and other metadata + + Raises: + LangflowError: If the API call fails + """ + ... + + async def stop_workflow(self, job_id: str) -> dict: + """ + Stop a running workflow via V2 API. + + Returns: + Dict with job_id and message + + Raises: + LangflowError: If the API call fails + """ + ... diff --git a/backend/tests/api/routes/v1/test_chat_messages.py b/backend/tests/api/routes/v1/test_chat_messages.py new file mode 100644 index 0000000..579e5cc --- /dev/null +++ b/backend/tests/api/routes/v1/test_chat_messages.py @@ -0,0 +1,177 @@ +"""Tests for the job-based chat message endpoint.""" + +import pytest +from fastapi.testclient import TestClient +from sqlmodel import Session + +from app.models import Chat, ChatMessage, Job, JobStatus, User + + +@pytest.fixture +def dev_user(session: Session) -> User: + """Create the dev-user that matches the local development user.""" + user = User( + email="dev-user@example.com", + username="dev-user", + full_name="Development User", + ) + session.add(user) + session.commit() + session.refresh(user) + return user + + +@pytest.fixture +def test_chat(session: Session, dev_user: User) -> Chat: + """Create a test chat owned by the dev user.""" + chat = Chat( + title="Test Chat", + user_id=dev_user.id, + ) + session.add(chat) + session.commit() + session.refresh(chat) + return chat + + +class TestCreateJobMessage: + """Tests for POST /v1/chats/{id}/messages/job endpoint.""" + + def test_create_job_message( + self, client: TestClient, session: Session, test_chat: Chat + ): + """Test creating a job-based message returns JobPublic.""" + response = client.post( + f"/api/v1/chats/{test_chat.id}/messages/job", + json={"content": "Hello, AI!"}, + ) + assert response.status_code == 200 + data = response.json() + + # Should return JobPublic + assert "id" in data + assert "status" in data + assert "chat_message_id" in data + assert data["status"] == "in_progress" + assert data["langflow_job_id"] == "mock-job-123" + assert data["flow_id"] == "mock-flow-1" + + def test_create_job_message_saves_user_message( + self, client: TestClient, session: Session, test_chat: Chat + ): + """Test that user message is saved to database.""" + client.post( + f"/api/v1/chats/{test_chat.id}/messages/job", + json={"content": "Test user message"}, + ) + + # Check user message was saved + user_messages = ( + session.query(ChatMessage) + .filter( + ChatMessage.chat_id == test_chat.id, + ChatMessage.role == "user", + ) + .all() + ) + assert len(user_messages) == 1 + assert user_messages[0].content == "Test user message" + + def test_create_job_message_creates_placeholder( + self, client: TestClient, session: Session, test_chat: Chat + ): + """Test that a placeholder assistant message is created.""" + response = client.post( + f"/api/v1/chats/{test_chat.id}/messages/job", + json={"content": "Hello!"}, + ) + data = response.json() + + # Check placeholder assistant message + assistant_msg = session.get(ChatMessage, data["chat_message_id"]) + assert assistant_msg is not None + assert assistant_msg.role == "assistant" + assert assistant_msg.content == "" # placeholder + + def test_create_job_message_creates_job_record( + self, client: TestClient, session: Session, test_chat: Chat + ): + """Test that a Job record is created and linked to the assistant message.""" + response = client.post( + f"/api/v1/chats/{test_chat.id}/messages/job", + json={"content": "Hello!"}, + ) + data = response.json() + job_id = data["id"] + + # Verify job exists in database + job = session.get(Job, job_id) + assert job is not None + assert job.status == JobStatus.IN_PROGRESS.value + assert job.langflow_job_id == "mock-job-123" + + def test_create_job_message_sets_flow_name( + self, client: TestClient, session: Session, test_chat: Chat + ): + """Test that flow_name is locked on the chat.""" + assert test_chat.flow_name is None + + client.post( + f"/api/v1/chats/{test_chat.id}/messages/job", + json={"content": "Hello!", "flow_name": "test-flow"}, + ) + + session.expire_all() + updated_chat = session.get(Chat, test_chat.id) + # The mock client resolve_flow_id returns "mock-flow-1" regardless, + # but the chat's flow_name should be set from the request + assert updated_chat.flow_name == "test-flow" + + def test_create_job_message_chat_not_found( + self, client: TestClient, dev_user: User + ): + """Test creating a job message in non-existent chat returns 404.""" + response = client.post( + "/api/v1/chats/99999/messages/job", + json={"content": "Hello!"}, + ) + assert response.status_code == 404 + + def test_create_job_message_submit_error_results_in_failed_job( + self, client: TestClient, session: Session, test_chat: Chat, monkeypatch + ): + """Test that LangFlow submission errors result in FAILED job status.""" + from app.services.langflow.mock_client import MockLangflowClient + from app.api.routes.v1 import chat_messages + + # Create a custom mock that only errors on submit_workflow (not resolve_flow_id) + class SubmitErrorClient(MockLangflowClient): + async def submit_workflow(self, flow_id, inputs, session_id=None): + from app.services.langflow.client import LangflowError + raise LangflowError("V2 submission failed", status_code=500) + + error_client = SubmitErrorClient() + monkeypatch.setattr( + chat_messages, "get_langflow_client", lambda: error_client + ) + + response = client.post( + f"/api/v1/chats/{test_chat.id}/messages/job", + json={"content": "This will fail on submit"}, + ) + + assert response.status_code == 200 + data = response.json() + assert data["status"] == "failed" + assert "V2 submission failed" in data["error_message"] + + def test_create_job_preserves_existing_stream( + self, client: TestClient, test_chat: Chat + ): + """Test that the existing /stream endpoint still works.""" + response = client.post( + f"/api/v1/chats/{test_chat.id}/messages/stream", + json={"content": "Hello via stream!"}, + ) + assert response.status_code == 200 + assert response.headers["content-type"] == "text/event-stream; charset=utf-8" diff --git a/backend/tests/api/routes/v1/test_jobs.py b/backend/tests/api/routes/v1/test_jobs.py new file mode 100644 index 0000000..6062c13 --- /dev/null +++ b/backend/tests/api/routes/v1/test_jobs.py @@ -0,0 +1,207 @@ +"""Tests for the Job API endpoints.""" + +import pytest +from fastapi.testclient import TestClient +from sqlmodel import Session + +from app.models import Chat, ChatMessage, Job, JobStatus, User + + +@pytest.fixture +def dev_user(session: Session) -> User: + """Create the dev-user that matches the local development user.""" + user = User( + email="dev-user@example.com", + username="dev-user", + full_name="Development User", + ) + session.add(user) + session.commit() + session.refresh(user) + return user + + +@pytest.fixture +def test_chat(session: Session, dev_user: User) -> Chat: + """Create a test chat owned by the dev user.""" + chat = Chat( + title="Test Chat", + user_id=dev_user.id, + ) + session.add(chat) + session.commit() + session.refresh(chat) + return chat + + +@pytest.fixture +def test_message(session: Session, test_chat: Chat) -> ChatMessage: + """Create a test assistant message linked to the chat.""" + message = ChatMessage( + chat_id=test_chat.id, + content="", + role="assistant", + ) + session.add(message) + session.commit() + session.refresh(message) + return message + + +@pytest.fixture +def test_job(session: Session, test_message: ChatMessage) -> Job: + """Create a test job in pending state.""" + job = Job( + chat_message_id=test_message.id, + flow_id="mock-flow-1", + status=JobStatus.PENDING.value, + langflow_job_id="mock-lf-job-123", + ) + session.add(job) + session.commit() + session.refresh(job) + return job + + +@pytest.fixture +def completed_job(session: Session, test_message: ChatMessage) -> Job: + """Create a test job in completed state.""" + job = Job( + chat_message_id=test_message.id, + flow_id="mock-flow-1", + status=JobStatus.COMPLETED.value, + langflow_job_id="mock-lf-job-456", + result_content="Test result", + ) + session.add(job) + session.commit() + session.refresh(job) + return job + + +class TestGetJob: + """Tests for GET /v1/jobs/{id} endpoint.""" + + def test_get_job(self, client: TestClient, test_job: Job): + """Test getting a job by ID.""" + response = client.get(f"/api/v1/jobs/{test_job.id}") + assert response.status_code == 200 + data = response.json() + assert data["id"] == test_job.id + assert data["status"] == "pending" + assert data["langflow_job_id"] == "mock-lf-job-123" + assert data["flow_id"] == "mock-flow-1" + + def test_get_job_not_found(self, client: TestClient, dev_user: User): + """Test getting a non-existent job returns 404.""" + response = client.get("/api/v1/jobs/99999") + assert response.status_code == 404 + assert "not found" in response.json()["detail"].lower() + + def test_get_job_with_sync(self, client: TestClient, test_job: Job): + """Test getting a job with sync=true triggers LangFlow poll.""" + response = client.get(f"/api/v1/jobs/{test_job.id}?sync=true") + assert response.status_code == 200 + data = response.json() + # Mock client returns "completed" status, so after sync the job + # should be updated to completed + assert data["status"] == "completed" + assert data["result_content"] is not None + + def test_get_job_sync_skips_terminal( + self, client: TestClient, completed_job: Job + ): + """Test that sync=true skips polling for terminal state jobs.""" + response = client.get(f"/api/v1/jobs/{completed_job.id}?sync=true") + assert response.status_code == 200 + data = response.json() + # Should still be completed (sync skipped) + assert data["status"] == "completed" + + def test_get_job_without_sync(self, client: TestClient, test_job: Job): + """Test that without sync=true, the job status is returned as-is.""" + response = client.get(f"/api/v1/jobs/{test_job.id}") + assert response.status_code == 200 + data = response.json() + # Should remain pending since we didn't sync + assert data["status"] == "pending" + + +class TestCancelJob: + """Tests for POST /v1/jobs/{id}/cancel endpoint.""" + + def test_cancel_pending_job(self, client: TestClient, test_job: Job): + """Test cancelling a pending job.""" + response = client.post(f"/api/v1/jobs/{test_job.id}/cancel") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "cancelled" + assert data["completed_at"] is not None + + def test_cancel_job_not_found(self, client: TestClient, dev_user: User): + """Test cancelling a non-existent job returns 404.""" + response = client.post("/api/v1/jobs/99999/cancel") + assert response.status_code == 404 + + def test_cancel_completed_job_returns_400( + self, client: TestClient, completed_job: Job + ): + """Test that cancelling a completed job returns 400.""" + response = client.post(f"/api/v1/jobs/{completed_job.id}/cancel") + assert response.status_code == 400 + assert "terminal" in response.json()["detail"].lower() + + def test_cancel_job_without_langflow_id( + self, client: TestClient, session: Session, test_message: ChatMessage + ): + """Test cancelling a job that has no langflow_job_id (still pending).""" + job = Job( + chat_message_id=test_message.id, + flow_id="mock-flow-1", + status=JobStatus.PENDING.value, + langflow_job_id=None, + ) + session.add(job) + session.commit() + session.refresh(job) + + response = client.post(f"/api/v1/jobs/{job.id}/cancel") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "cancelled" + + +class TestJobOwnership: + """Tests for job ownership verification.""" + + def test_get_job_forbidden_for_other_user( + self, session: Session, test_job: Job + ): + """Test that a user cannot access another user's job.""" + from app.api.deps import get_current_user, get_db + from app.main import app + + # Create another user + other_user = User( + email="other@example.com", + username="other-user", + full_name="Other User", + ) + session.add(other_user) + session.commit() + session.refresh(other_user) + + def get_other_user(): + return other_user + + def get_session_override(): + return session + + app.dependency_overrides[get_current_user] = get_other_user + app.dependency_overrides[get_db] = get_session_override + + other_client = TestClient(app) + response = other_client.get(f"/api/v1/jobs/{test_job.id}") + assert response.status_code == 403 + + app.dependency_overrides.clear() diff --git a/backend/tests/api/test_job_lifecycle.py b/backend/tests/api/test_job_lifecycle.py new file mode 100644 index 0000000..a69283a --- /dev/null +++ b/backend/tests/api/test_job_lifecycle.py @@ -0,0 +1,266 @@ +""" +End-to-end job lifecycle integration tests. + +Tests the complete flow: message send -> job creation -> LangFlow background +execution -> result appears in chat. Uses MockLangflowClient. +""" + +import pytest +from fastapi.testclient import TestClient +from sqlmodel import Session + +from app.models import Chat, User + + +@pytest.fixture +def dev_user(session: Session) -> User: + """Create the dev-user that matches the local development user.""" + user = User( + email="dev-user@example.com", + username="dev-user", + full_name="Development User", + ) + session.add(user) + session.commit() + session.refresh(user) + return user + + +@pytest.fixture +def test_chat(session: Session, dev_user: User) -> Chat: + """Create a test chat owned by the dev user.""" + chat = Chat( + title="Lifecycle Test Chat", + user_id=dev_user.id, + ) + session.add(chat) + session.commit() + session.refresh(chat) + return chat + + +def create_test_chat(client: TestClient) -> dict: + """Helper to create a chat via the API.""" + response = client.post( + "/api/v1/chats/", + json={"title": "E2E Test Chat"}, + ) + assert response.status_code == 200 + return response.json() + + +class TestJobLifecycleHappyPath: + """E2E: user sends message, job is created, LangFlow executes, result in chat.""" + + def test_full_lifecycle( + self, client: TestClient, session: Session, test_chat: Chat + ): + """ + Complete happy path: message -> job created -> poll shows completed + -> result in chat messages. + """ + chat_id = test_chat.id + + # 1. Send message via job endpoint + response = client.post( + f"/api/v1/chats/{chat_id}/messages/job", + json={"content": "What is enterprise architecture?"}, + ) + assert response.status_code == 200 + job = response.json() + assert job["status"] in ("pending", "in_progress") + job_id = job["id"] + assert job["flow_id"] is not None + + # 2. Poll for job status with sync (mock client returns completed) + response = client.get(f"/api/v1/jobs/{job_id}?sync=true") + assert response.status_code == 200 + job = response.json() + assert job["status"] == "completed" + assert job["result_content"] is not None + assert len(job["result_content"]) > 0 + + # 3. Verify messages were created in the chat + response = client.get(f"/api/v1/chats/{chat_id}/messages/") + assert response.status_code == 200 + messages = response.json() + assert messages["count"] >= 2 # user message + assistant message + + # Find the user and assistant messages + roles = [m["role"] for m in messages["data"]] + assert "user" in roles + assert "assistant" in roles + + def test_job_response_includes_required_fields( + self, client: TestClient, test_chat: Chat + ): + """Verify job response contains all fields needed by frontend polling.""" + response = client.post( + f"/api/v1/chats/{test_chat.id}/messages/job", + json={"content": "Test query"}, + ) + data = response.json() + assert "id" in data + assert "status" in data + assert "chat_message_id" in data + assert "langflow_job_id" in data + assert "flow_id" in data + assert "created_at" in data + + +class TestJobCancellation: + """E2E: user cancels an in-progress job, cancellation reflected in chat.""" + + def test_cancel_in_progress_job( + self, client: TestClient, session: Session, test_chat: Chat + ): + """Create a job, cancel it, verify status is cancelled.""" + chat_id = test_chat.id + + # Create job + response = client.post( + f"/api/v1/chats/{chat_id}/messages/job", + json={"content": "Long running query"}, + ) + assert response.status_code == 200 + job_id = response.json()["id"] + + # Cancel the job + response = client.post(f"/api/v1/jobs/{job_id}/cancel") + assert response.status_code == 200 + cancelled = response.json() + assert cancelled["status"] == "cancelled" + assert cancelled["completed_at"] is not None + + def test_cancel_already_terminal_returns_400( + self, client: TestClient, session: Session, test_chat: Chat + ): + """Cancelling a completed job returns 400 (already terminal).""" + chat_id = test_chat.id + + # Create job + response = client.post( + f"/api/v1/chats/{chat_id}/messages/job", + json={"content": "Quick query"}, + ) + job_id = response.json()["id"] + + # Sync to complete the job (mock returns completed) + client.get(f"/api/v1/jobs/{job_id}?sync=true") + + # Try to cancel the completed job + response = client.post(f"/api/v1/jobs/{job_id}/cancel") + assert response.status_code == 400 + assert "terminal" in response.json()["detail"].lower() + + +class TestPageRefreshRecovery: + """E2E: page refresh during active job resumes polling and shows result.""" + + def test_active_job_endpoint_returns_in_flight_job( + self, client: TestClient, session: Session, test_chat: Chat + ): + """ + After creating a job, GET /chats/{id}/active-job returns the + in-flight job for page refresh recovery. + """ + chat_id = test_chat.id + + # Create job (mock returns in_progress status since submit succeeds) + response = client.post( + f"/api/v1/chats/{chat_id}/messages/job", + json={"content": "In-flight query"}, + ) + assert response.status_code == 200 + job_id = response.json()["id"] + + # Check for active job (simulates page refresh) + response = client.get(f"/api/v1/chats/{chat_id}/active-job") + assert response.status_code == 200 + active_job = response.json() + assert active_job["id"] == job_id + assert active_job["status"] in ("pending", "in_progress") + + def test_no_active_job_after_completion( + self, client: TestClient, session: Session, test_chat: Chat + ): + """After a job completes, active-job returns 404.""" + chat_id = test_chat.id + + # Create and complete a job + response = client.post( + f"/api/v1/chats/{chat_id}/messages/job", + json={"content": "Quick query"}, + ) + job_id = response.json()["id"] + + # Sync to complete + client.get(f"/api/v1/jobs/{job_id}?sync=true") + + # No active job should exist + response = client.get(f"/api/v1/chats/{chat_id}/active-job") + assert response.status_code == 404 + + +class TestJobErrorHandling: + """E2E: failed job shows error with expandable details and retry.""" + + def test_job_not_found(self, client: TestClient, dev_user: User): + """GET /jobs/99999 returns 404.""" + response = client.get("/api/v1/jobs/99999") + assert response.status_code == 404 + + def test_submit_error_creates_failed_job( + self, client: TestClient, session: Session, test_chat: Chat, monkeypatch + ): + """ + When LangFlow submission fails, the job is created with status=failed + and error_message is populated. + """ + from app.services.langflow.mock_client import MockLangflowClient + from app.services.langflow.client import LangflowError + from app.api.routes.v1 import chat_messages + + class SubmitErrorClient(MockLangflowClient): + async def submit_workflow(self, flow_id, inputs, session_id=None): + raise LangflowError("V2 workflow submission failed", status_code=500) + + monkeypatch.setattr( + chat_messages, "get_langflow_client", lambda: SubmitErrorClient() + ) + + response = client.post( + f"/api/v1/chats/{test_chat.id}/messages/job", + json={"content": "This will fail"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["status"] == "failed" + assert "V2 workflow submission failed" in data["error_message"] + + def test_failed_job_preserves_messages( + self, client: TestClient, session: Session, test_chat: Chat, monkeypatch + ): + """Even when a job fails, user and assistant messages are preserved.""" + from app.services.langflow.mock_client import MockLangflowClient + from app.services.langflow.client import LangflowError + from app.api.routes.v1 import chat_messages + + class SubmitErrorClient(MockLangflowClient): + async def submit_workflow(self, flow_id, inputs, session_id=None): + raise LangflowError("Submit failed", status_code=500) + + monkeypatch.setattr( + chat_messages, "get_langflow_client", lambda: SubmitErrorClient() + ) + + client.post( + f"/api/v1/chats/{test_chat.id}/messages/job", + json={"content": "Error query"}, + ) + + # Messages should still exist + response = client.get(f"/api/v1/chats/{test_chat.id}/messages/") + assert response.status_code == 200 + messages = response.json() + assert messages["count"] >= 2 # user + placeholder assistant diff --git a/backend/uv.lock b/backend/uv.lock index da989bf..1b95ebe 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -380,18 +380,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2c/e1/e6716421ea10d38022b952c159d5161ca1193197fb744506875fbb87ea7b/iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760", size = 6050, upload-time = "2025-03-19T20:10:01.071Z" }, ] -[[package]] -name = "jinja2" -version = "3.1.6" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "markupsafe" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/df/bf/f7da0350254c0ed7c72f3e33cef02e048281fec7ecec5f032d4aac52226b/jinja2-3.1.6.tar.gz", hash = "sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d", size = 245115, upload-time = "2025-03-05T20:05:02.478Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/62/a1/3d680cbfd5f4b8f15abc1d571870c5fc3e594bb582bc3b64ea099db13e56/jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67", size = 134899, upload-time = "2025-03-05T20:05:00.369Z" }, -] - [[package]] name = "lia-web" version = "0.2.3" @@ -503,7 +491,6 @@ dependencies = [ { name = "psycopg", extra = ["binary"] }, { name = "pydantic-settings" }, { name = "python-dotenv" }, - { name = "sqladmin" }, { name = "sqlmodel" }, { name = "sse-starlette" }, { name = "strawberry-graphql", extra = ["fastapi"] }, @@ -538,7 +525,6 @@ requires-dist = [ { name = "pytest", marker = "extra == 'dev'", specifier = "==8.4.1" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = "==1.0.0" }, { name = "python-dotenv", specifier = "==1.1.1" }, - { name = "sqladmin", specifier = ">=0.22.0" }, { name = "sqlmodel", specifier = ">=0.0.21,<1.0.0" }, { name = "sse-starlette", specifier = ">=2.2.1" }, { name = "strawberry-graphql", extras = ["fastapi"], specifier = ">=0.245.0" }, @@ -904,22 +890,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e9/44/75a9c9421471a6c4805dbf2356f7c181a29c1879239abab1ea2cc8f38b40/sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2", size = 10235, upload-time = "2024-02-25T23:20:01.196Z" }, ] -[[package]] -name = "sqladmin" -version = "0.22.0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "jinja2" }, - { name = "python-multipart" }, - { name = "sqlalchemy" }, - { name = "starlette" }, - { name = "wtforms" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/2c/ac/526bb3ff2dd94fbf8442bccb49ef40aa360045add19d4fbffcb43995e67a/sqladmin-0.22.0.tar.gz", hash = "sha256:4ea904d97e4d030edb68fb0681330b4d963f422442a64bee487fdc46119b3729", size = 1429937, upload-time = "2025-11-24T12:52:59.285Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/f2/b4/ab78c7d7b13bd3f90d6d8a106c5ad12bf7a738f89eb0241b24ad8efe5d1e/sqladmin-0.22.0-py3-none-any.whl", hash = "sha256:f2fb11165a70601a97f71956104b47da2c432db49b0d7966dc65e9e6343887d3", size = 1445514, upload-time = "2025-11-24T12:53:00.511Z" }, -] - [[package]] name = "sqlalchemy" version = "2.0.44" @@ -1233,15 +1203,3 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1b/6c/c65773d6cab416a64d191d6ee8a8b1c68a09970ea6909d16965d26bfed1e/websockets-15.0.1-cp313-cp313-win_amd64.whl", hash = "sha256:e09473f095a819042ecb2ab9465aee615bd9c2028e4ef7d933600a8401c79561", size = 176837, upload-time = "2025-03-05T20:02:55.237Z" }, { url = "https://files.pythonhosted.org/packages/fa/a8/5b41e0da817d64113292ab1f8247140aac61cbf6cfd085d6a0fa77f4984f/websockets-15.0.1-py3-none-any.whl", hash = "sha256:f7a866fbc1e97b5c617ee4116daaa09b722101d4a3c170c787450ba409f9736f", size = 169743, upload-time = "2025-03-05T20:03:39.41Z" }, ] - -[[package]] -name = "wtforms" -version = "3.1.2" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "markupsafe" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/6a/c7/96d10183c3470f1836846f7b9527d6cb0b6c2226ebca40f36fa29f23de60/wtforms-3.1.2.tar.gz", hash = "sha256:f8d76180d7239c94c6322f7990ae1216dae3659b7aa1cee94b6318bdffb474b9", size = 134705, upload-time = "2024-01-06T07:52:41.075Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/18/19/c3232f35e24dccfad372e9f341c4f3a1166ae7c66e4e1351a9467c921cc1/wtforms-3.1.2-py3-none-any.whl", hash = "sha256:bf831c042829c8cdbad74c27575098d541d039b1faa74c771545ecac916f2c07", size = 145961, upload-time = "2024-01-06T07:52:43.023Z" }, -] diff --git a/config/local/.env.example b/config/local/.env.example index eae274d..2e61489 100644 --- a/config/local/.env.example +++ b/config/local/.env.example @@ -42,6 +42,15 @@ GOOGLE_CLIENT_SECRET= LANGFLOW_URL=http://localhost:7860 LANGFLOW_DEFAULT_FLOW=enterprise-agent-multiuser +# LangFlow API Authentication +# Shared API key between backend and LangFlow container. +# LangFlow validates via LANGFLOW_API_KEY_SOURCE=env (set on the container). +LANGFLOW_API_KEY=dev-langflow-api-key +LANGFLOW_API_KEY_SOURCE=env + +# LangFlow V2 Workflow API (for background job execution) +LANGFLOW_DEVELOPER_API_ENABLED=true + # ======================================== # Database Credentials (optional - defaults work for local dev) # ======================================== diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index a21db48..c57559f 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -242,7 +242,7 @@ created_at TIMESTAMP |-----------|-------|------|-----------| | frontend | app-frontend | 8080 | 256Mi / 200m | | backend | app-backend | 8000 | 512Mi / 500m | -| langflow | langflowai/langflow:1.7.1 | 7860 | 2Gi / 1000m | +| langflow | langflowai/langflow:1.8.0.rc0 | 7860 | 2Gi / 1000m | | langfuse | langfuse/langfuse:latest | 3000 | 1Gi / 500m | | mlflow | ghcr.io/mlflow/mlflow:v2.16.0 | 5000 | 1Gi / 500m | | postgres | postgres:15-alpine | 5432 | 512Mi / 500m | diff --git a/frontend/src/app/Chat/Chat.css b/frontend/src/app/Chat/Chat.css index 97ca192..12c111d 100644 --- a/frontend/src/app/Chat/Chat.css +++ b/frontend/src/app/Chat/Chat.css @@ -4,3 +4,30 @@ right: 20px; z-index: 1000; } + +/* Cancel button positioning in the loading message */ +.pf-chatbot__job-cancel-area { + padding-top: var(--pf-t--global--spacer--xs, 4px); +} + +/* Tooltip wrapper for job status -- allows hover target on loading message */ +.pf-chatbot__job-tooltip-wrapper { + display: contents; +} + +/* Error details formatting */ +.pf-chatbot__error-details { + padding-top: var(--pf-t--global--spacer--sm, 8px); +} + +.pf-chatbot__error-pre { + font-size: var(--pf-t--global--font--size--xs, 0.75rem); + white-space: pre-wrap; + word-break: break-word; + max-height: 200px; + overflow-y: auto; + padding: var(--pf-t--global--spacer--sm, 8px); + background-color: var(--pf-t--global--background--color--secondary--default, #f0f0f0); + border-radius: var(--pf-t--global--border--radius--small, 3px); + margin: 0; +} diff --git a/frontend/src/app/Chat/Chat.test.tsx b/frontend/src/app/Chat/Chat.test.tsx index b531135..14e823c 100644 --- a/frontend/src/app/Chat/Chat.test.tsx +++ b/frontend/src/app/Chat/Chat.test.tsx @@ -5,6 +5,7 @@ import { Chat } from './Chat'; import { ChatAPI } from './chatApi'; import { BrowserRouter } from 'react-router-dom'; import { AppProvider } from '@app/contexts/AppContext'; +import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; // Mock the ChatAPI vi.mock('./chatApi', () => ({ @@ -16,10 +17,20 @@ vi.mock('./chatApi', () => ({ updateChat: vi.fn(), deleteChat: vi.fn(), getMessages: vi.fn(), + createJobMessage: vi.fn(), + getJob: vi.fn(), + cancelJob: vi.fn(), + getActiveJob: vi.fn(), + // Legacy streaming kept for backward compatibility createStreamingMessage: vi.fn(), }, })); +// Mock the useJobPolling hook +vi.mock('./useJobPolling', () => ({ + useJobPolling: vi.fn().mockReturnValue({ data: null }), +})); + // Mock the image imports vi.mock('@app/images/user-avatar.svg', () => ({ default: 'user-avatar.svg' })); vi.mock('@app/images/ai-logo-transparent.svg', () => ({ default: 'ai-logo.svg' })); @@ -43,17 +54,42 @@ const MOCK_MESSAGES = [ { id: 2, chat_id: 1, content: 'Hi there!', role: 'assistant', created_at: '2024-01-01T10:00:01' }, ]; +const MOCK_JOB_RESPONSE = { + id: 1, + chat_message_id: 3, + langflow_job_id: 'lf-job-123', + flow_id: 'flow-1', + status: 'in_progress' as const, + error_message: null, + result_content: null, + started_at: '2024-01-01T10:00:00', + completed_at: null, + created_at: '2024-01-01T10:00:00', + updated_at: '2024-01-01T10:00:00', +}; + // ============================================================================= // Test Utilities // ============================================================================= +function createQueryClient() { + return new QueryClient({ + defaultOptions: { + queries: { retry: false }, + }, + }); +} + function renderChat() { + const queryClient = createQueryClient(); return render( - - - - - + + + + + + + ); } @@ -61,6 +97,7 @@ function setupDefaultMocks(): void { vi.mocked(ChatAPI.getChats).mockResolvedValue({ data: MOCK_CHATS, count: 2 }); vi.mocked(ChatAPI.getFlows).mockResolvedValue({ data: MOCK_FLOWS, count: 2 }); vi.mocked(ChatAPI.getMessages).mockResolvedValue({ data: MOCK_MESSAGES, count: 2 }); + vi.mocked(ChatAPI.getActiveJob).mockResolvedValue(null); } describe('Chat component', () => { @@ -135,20 +172,8 @@ describe('Chat component', () => { }); }); - test('should call createStreamingMessage when sending a message', async () => { - const mockClose = vi.fn(); - - vi.mocked(ChatAPI.createStreamingMessage).mockImplementation( - (_chatId, _content, onMessage, _onError, onComplete) => { - // Simulate immediate completion - setTimeout(() => { - onMessage({ type: 'content', content: 'Response' }); - onMessage({ type: 'done' }); - onComplete?.(); - }, 10); - return { close: mockClose }; - } - ); + test('should call createJobMessage when sending a message', async () => { + vi.mocked(ChatAPI.createJobMessage).mockResolvedValue(MOCK_JOB_RESPONSE); renderChat(); @@ -156,15 +181,14 @@ describe('Chat component', () => { expect(ChatAPI.getChats).toHaveBeenCalled(); }); - // Verify the streaming API is properly mocked - expect(ChatAPI.createStreamingMessage).toBeDefined(); + // Verify the job API is properly mocked + expect(ChatAPI.createJobMessage).toBeDefined(); }); - test('should provide close function for stopping streams', async () => { - const mockClose = vi.fn(); - - vi.mocked(ChatAPI.createStreamingMessage).mockImplementation(() => { - return { close: mockClose }; + test('should provide cancelJob function for stopping jobs', async () => { + vi.mocked(ChatAPI.cancelJob).mockResolvedValue({ + ...MOCK_JOB_RESPONSE, + status: 'cancelled', }); renderChat(); @@ -173,11 +197,9 @@ describe('Chat component', () => { expect(ChatAPI.getChats).toHaveBeenCalled(); }); - // Verify the streaming API returns a close function - const result = ChatAPI.createStreamingMessage(1, 'test', () => {}, () => {}, () => {}); - expect(result.close).toBeDefined(); - result.close(); - expect(mockClose).toHaveBeenCalled(); + // Verify the cancel API returns cancelled status + const result = await ChatAPI.cancelJob(1); + expect(result.status).toBe('cancelled'); }); test('should call deleteChat API when delete is triggered', async () => { @@ -207,16 +229,9 @@ describe('Chat component', () => { expect(screen.getByText('Research Assistant')).toBeVisible(); }); - test('should display actual error message from SSE error event', async () => { - vi.mocked(ChatAPI.getMessages).mockResolvedValue({ data: [], count: 0 }); - - vi.mocked(ChatAPI.createStreamingMessage).mockImplementation( - (_chatId, _content, onMessage) => { - setTimeout(() => { - onMessage({ type: 'error', error: 'Failed to connect to Langflow: Connection refused' }); - }, 10); - return { close: vi.fn() }; - } + test('should handle job creation failure', async () => { + vi.mocked(ChatAPI.createJobMessage).mockRejectedValue( + new Error('Failed to connect to Langflow: Connection refused') ); renderChat(); @@ -225,30 +240,18 @@ describe('Chat component', () => { expect(ChatAPI.getChats).toHaveBeenCalled(); }); - // Simulate sending by calling the streaming mock directly and checking the onMessage callback - const onMessage = vi.fn(); - ChatAPI.createStreamingMessage(1, 'test', onMessage); - - await waitFor(() => { - expect(onMessage).toHaveBeenCalledWith({ - type: 'error', - error: 'Failed to connect to Langflow: Connection refused', - }); - }); + // Verify the job API handles errors + await expect(ChatAPI.createJobMessage(1, 'test')).rejects.toThrow( + 'Failed to connect to Langflow: Connection refused' + ); }); - test('should pass partial content and error separately when error occurs mid-stream', async () => { - vi.mocked(ChatAPI.getMessages).mockResolvedValue({ data: [], count: 0 }); - - vi.mocked(ChatAPI.createStreamingMessage).mockImplementation( - (_chatId, _content, onMessage) => { - setTimeout(() => { - onMessage({ type: 'content', content: 'Here is the beginning' }); - onMessage({ type: 'error', error: 'Langflow streaming error: 500' }); - }, 10); - return { close: vi.fn() }; - } - ); + test('should handle immediate job failure status', async () => { + vi.mocked(ChatAPI.createJobMessage).mockResolvedValue({ + ...MOCK_JOB_RESPONSE, + status: 'failed', + error_message: 'No flow configured or found', + }); renderChat(); @@ -256,45 +259,32 @@ describe('Chat component', () => { expect(ChatAPI.getChats).toHaveBeenCalled(); }); - // Verify the streaming API delivers both content and error events - const events: Array<{ type: string; content?: string; error?: string }> = []; - ChatAPI.createStreamingMessage(1, 'test', (event) => events.push(event)); - - await waitFor(() => { - expect(events).toHaveLength(2); - expect(events[0]).toEqual({ type: 'content', content: 'Here is the beginning' }); - expect(events[1]).toEqual({ type: 'error', error: 'Langflow streaming error: 500' }); - }); + // Verify the API returns failed status + const result = await ChatAPI.createJobMessage(1, 'test'); + expect(result.status).toBe('failed'); + expect(result.error_message).toBe('No flow configured or found'); }); - test('should pass network error message through to error handler', async () => { - vi.mocked(ChatAPI.getMessages).mockResolvedValue({ data: [], count: 0 }); - - vi.mocked(ChatAPI.createStreamingMessage).mockImplementation( - (_chatId, _content, _onMessage, onError) => { - setTimeout(() => { - onError?.(new Error('Missing integration: jira. Please connect the service in Settings.')); - }, 10); - return { close: vi.fn() }; - } - ); + test('should check for active jobs on chat load for page refresh recovery', async () => { + vi.mocked(ChatAPI.getActiveJob).mockResolvedValue(null); renderChat(); await waitFor(() => { - expect(ChatAPI.getChats).toHaveBeenCalled(); + expect(ChatAPI.getActiveJob).toHaveBeenCalledWith(1); }); + }); - // Verify the onError callback receives the actual error message - const onError = vi.fn(); - ChatAPI.createStreamingMessage(1, 'test', vi.fn(), onError); + test('should recover in-flight job on page refresh', async () => { + vi.mocked(ChatAPI.getActiveJob).mockResolvedValue(MOCK_JOB_RESPONSE); + + renderChat(); await waitFor(() => { - expect(onError).toHaveBeenCalledWith( - expect.objectContaining({ - message: 'Missing integration: jira. Please connect the service in Settings.', - }) - ); + expect(ChatAPI.getActiveJob).toHaveBeenCalledWith(1); }); + + // The active job should be detected and polling should resume + expect(ChatAPI.getActiveJob).toHaveBeenCalled(); }); }); diff --git a/frontend/src/app/Chat/Chat.tsx b/frontend/src/app/Chat/Chat.tsx index 3e08679..e8092bd 100644 --- a/frontend/src/app/Chat/Chat.tsx +++ b/frontend/src/app/Chat/Chat.tsx @@ -7,9 +7,11 @@ import { Dropdown, DropdownItem, DropdownList, + ExpandableSection, MenuToggle, MenuToggleElement, PageSection, + Tooltip, } from '@patternfly/react-core'; import { Chatbot, @@ -31,7 +33,8 @@ import { } from '@patternfly/chatbot'; import { ArrowDownIcon, TrashIcon } from '@patternfly/react-icons'; -import { ChatAPI, Chat as ChatType, ChatMessage, StreamingEvent, Flow } from './chatApi'; +import { ChatAPI, Chat as ChatType, ChatMessage, Flow } from './chatApi'; +import { useJobPolling } from './useJobPolling'; import userAvatar from '@app/images/user-avatar.svg'; import aiLogo from '@app/images/ai-logo-transparent.svg'; @@ -53,6 +56,12 @@ function convertMessageToProps(msg: ChatMessage): MessageProps { }; } +/** Format elapsed seconds as human-readable string. */ +function formatElapsed(seconds: number): string { + if (seconds < 60) return `${seconds}s`; + return `${Math.floor(seconds / 60)}m ${seconds % 60}s`; +} + function Chat(): React.ReactElement { // Chat list state const [chats, setChats] = React.useState([]); @@ -80,12 +89,33 @@ function Chat(): React.ReactElement { // Operation error state (for displaying errors to user) const [operationError, setOperationError] = React.useState(null); + // Job state + const [activeJobId, setActiveJobId] = React.useState(null); + const [jobStartTime, setJobStartTime] = React.useState(null); + const [elapsedSeconds, setElapsedSeconds] = React.useState(0); + const loadingBotMessageIdRef = React.useRef(null); + const originalMessageTextRef = React.useRef(''); + + // Use the polling hook + const { data: jobData } = useJobPolling(activeJobId); + const historyRef = React.useRef(null); - const streamControllerRef = React.useRef<{ close: () => void } | null>(null); const messageBoxRef = React.useRef(null); const [userScrolledUp, setUserScrolledUp] = React.useState(false); const scrollDetectionTimeoutRef = React.useRef(null); + // Elapsed time timer + React.useEffect(() => { + if (!jobStartTime || !activeJobId) { + setElapsedSeconds(0); + return; + } + const interval = setInterval(() => { + setElapsedSeconds(Math.round((Date.now() - jobStartTime.getTime()) / 1000)); + }, 1000); + return () => clearInterval(interval); + }, [jobStartTime, activeJobId]); + // Scroll utility functions const scrollToBottom = React.useCallback(() => { if (messageBoxRef.current?.scrollToBottom) { @@ -140,7 +170,7 @@ function Chat(): React.ReactElement { } }; - // Load messages and restore flow when chat changes + // Load messages and check for active jobs when chat changes React.useEffect(() => { setLastError(null); setErrorMessages(new Map()); @@ -149,13 +179,17 @@ function Chat(): React.ReactElement { if (chat?.flow_name) { setSelectedFlowName(chat.flow_name); } - // Skip loading messages if we're currently sending — handleSend manages + // Skip loading messages if we're currently sending -- handleSend manages // messages directly and loadMessages would overwrite the loading indicator. if (!isSending) { loadMessages(selectedChatId); + // Page refresh recovery: check for active jobs + checkForActiveJob(selectedChatId); } } else { setMessages([]); + setActiveJobId(null); + setJobStartTime(null); } }, [selectedChatId]); @@ -191,6 +225,131 @@ function Chat(): React.ReactElement { } }; + /** + * Page refresh recovery: check if there's an active (non-terminal) job for this chat. + * If found, resume polling and show the typing indicator. + */ + const checkForActiveJob = async (chatId: number) => { + try { + const activeJob = await ChatAPI.getActiveJob(chatId); + if (activeJob) { + // Resume polling for the active job + setActiveJobId(activeJob.id); + setJobStartTime(activeJob.started_at ? new Date(activeJob.started_at) : new Date()); + setIsSending(true); + + // Find the placeholder assistant message and show it as loading + const botMsgId = `recovered-bot-${activeJob.chat_message_id}`; + loadingBotMessageIdRef.current = botMsgId; + + // Add a loading indicator for the recovered job + setMessages((prev) => { + // Check if the last message is the empty placeholder + const lastMsg = prev[prev.length - 1]; + if (lastMsg && lastMsg.role === 'bot' && !lastMsg.content) { + // Replace the empty placeholder with a loading indicator + return prev.map((msg, i) => + i === prev.length - 1 + ? { ...msg, id: botMsgId, isLoading: true } + : msg + ); + } + // Otherwise add a loading message + return [ + ...prev, + { + id: botMsgId, + role: 'bot' as const, + content: '', + name: 'Assistant', + avatar: aiLogo, + timestamp: new Date().toLocaleString(), + isLoading: true, + }, + ]; + }); + } + } catch { + // No active job -- expected for most chats + } + }; + + // React to job status changes + React.useEffect(() => { + if (!jobData || !activeJobId) return; + const botMessageId = loadingBotMessageIdRef.current; + if (!botMessageId) return; + + if (jobData.status === 'completed' && jobData.result_content) { + // Update the bot message with the result + setMessages((prev) => + prev.map((msg) => + msg.id === botMessageId + ? { ...msg, content: jobData.result_content!, isLoading: false } + : msg + ) + ); + // Reload messages to get server-side IDs + if (selectedChatId) { + loadMessages(selectedChatId); + } + setIsSending(false); + setActiveJobId(null); + setJobStartTime(null); + loadingBotMessageIdRef.current = null; + setAnnouncement(`Assistant: ${jobData.result_content}`); + + // Update chat title on first real message (only user+loading = length 2) + if (messages.length <= 2 && selectedChatId) { + const originalText = originalMessageTextRef.current; + if (originalText) { + const title = originalText.slice(0, 50) + (originalText.length > 50 ? '...' : ''); + ChatAPI.updateChat(selectedChatId, { title }).then(() => loadChats()); + const sendFlowName = isFlowLocked ? selectedChat?.flow_name : selectedFlowName; + if (sendFlowName) { + setChats((prev) => + prev.map((c) => + c.id === selectedChatId ? { ...c, flow_name: sendFlowName } : c + ) + ); + } + } + } + } else if (jobData.status === 'failed' || jobData.status === 'timed_out') { + const errorText = jobData.error_message || 'An unknown error occurred.'; + setMessages((prev) => + prev.map((msg) => + msg.id === botMessageId + ? { ...msg, content: '', isLoading: false } + : msg + ) + ); + setErrorMessages((prev) => new Map(prev).set(botMessageId, errorText)); + setLastError({ + message: originalMessageTextRef.current, + chatId: selectedChatId!, + botMessageId, + errorText, + }); + setIsSending(false); + setActiveJobId(null); + setJobStartTime(null); + loadingBotMessageIdRef.current = null; + } else if (jobData.status === 'cancelled') { + setMessages((prev) => + prev.map((msg) => + msg.id === botMessageId + ? { ...msg, content: 'Request cancelled', isLoading: false } + : msg + ) + ); + setIsSending(false); + setActiveJobId(null); + setJobStartTime(null); + loadingBotMessageIdRef.current = null; + } + }, [jobData]); + const handleNewChat = async () => { setOperationError(null); try { @@ -261,7 +420,10 @@ function Chat(): React.ReactElement { setIsSending(true); setLastError(null); + setActiveJobId(null); + setJobStartTime(new Date()); setUserScrolledUp(false); + originalMessageTextRef.current = messageText; const timestamp = new Date().toLocaleString(); const isRetry = !!retryMessageText; @@ -278,6 +440,7 @@ function Chat(): React.ReactElement { // Add loading bot message const botMessageId = `bot-${Date.now()}`; + loadingBotMessageIdRef.current = botMessageId; const loadingBotMessage: MessageProps = { id: botMessageId, role: 'bot', @@ -308,102 +471,72 @@ function Chat(): React.ReactElement { setAnnouncement(`Message from You: ${messageText}. Assistant is thinking...`); setTimeout(() => scrollToBottom(), 50); - let accumulatedContent = ''; - - const streamController = ChatAPI.createStreamingMessage( - chatId, - messageText, - (event: StreamingEvent) => { - if (event.type === 'content' && event.content) { - accumulatedContent += event.content; - // Update the bot message with accumulated content - setMessages((prev) => - prev.map((msg) => - msg.id === botMessageId - ? { ...msg, content: accumulatedContent, isLoading: false } - : msg - ) - ); - } else if (event.type === 'done') { - streamControllerRef.current = null; - // Reload to get the saved message IDs - loadMessages(chatId); - setIsSending(false); - setAnnouncement(`Assistant: ${accumulatedContent}`); - - // Update chat title and lock flow on first message - if (messages.length === 0) { - const title = messageText.slice(0, 50) + (messageText.length > 50 ? '...' : ''); - ChatAPI.updateChat(chatId, { title }).then(() => loadChats()); - // Update local state so dropdown locks immediately - setChats((prev) => - prev.map((c) => - c.id === chatId ? { ...c, flow_name: sendFlowName } : c - ) - ); - } - } else if (event.type === 'error') { - const errorText = event.error || 'An unknown error occurred.'; - streamControllerRef.current = null; - setMessages((prev) => - prev.map((msg) => - msg.id === botMessageId - ? { ...msg, content: accumulatedContent || '', isLoading: false } - : msg - ) - ); - setErrorMessages((prev) => new Map(prev).set(botMessageId, errorText)); - setLastError({ - message: messageText, - chatId: chatId, - botMessageId, - errorText, - }); - setIsSending(false); - } - }, - (err) => { - console.error('Streaming error:', err); - const errorText = err.message || 'An unknown error occurred.'; - streamControllerRef.current = null; + try { + const jobResponse = await ChatAPI.createJobMessage( + chatId, messageText, sendFlowName || undefined + ); + setActiveJobId(jobResponse.id); + // If the job already failed immediately (e.g., flow resolution error) + if (jobResponse.status === 'failed') { + const errorText = jobResponse.error_message || 'Failed to submit request.'; setMessages((prev) => prev.map((msg) => msg.id === botMessageId - ? { ...msg, content: accumulatedContent || '', isLoading: false } + ? { ...msg, content: '', isLoading: false } : msg ) ); setErrorMessages((prev) => new Map(prev).set(botMessageId, errorText)); - setLastError({ - message: messageText, - chatId: chatId, - botMessageId, - errorText, - }); + setLastError({ message: messageText, chatId, botMessageId, errorText }); setIsSending(false); - }, - () => { - streamControllerRef.current = null; - setIsSending(false); - }, - sendFlowName || undefined - ); - - streamControllerRef.current = streamController; + setActiveJobId(null); + setJobStartTime(null); + loadingBotMessageIdRef.current = null; + } + } catch (err) { + // Handle immediate submission failure + console.error('Failed to create job:', err); + const errorText = err instanceof Error ? err.message : 'Failed to submit request.'; + setMessages((prev) => + prev.map((msg) => + msg.id === botMessageId + ? { ...msg, content: '', isLoading: false } + : msg + ) + ); + setErrorMessages((prev) => new Map(prev).set(botMessageId, errorText)); + setLastError({ message: messageText, chatId, botMessageId, errorText }); + setIsSending(false); + setActiveJobId(null); + setJobStartTime(null); + loadingBotMessageIdRef.current = null; + } }; - const handleStopStreaming = () => { - if (streamControllerRef.current) { - streamControllerRef.current.close(); - streamControllerRef.current = null; + const handleCancelJob = async () => { + if (activeJobId) { + try { + await ChatAPI.cancelJob(activeJobId); + // The useEffect watching jobData will handle the cancelled state on next poll + } catch (err) { + console.error('Failed to cancel job:', err); + // Force local cancellation state + const botMessageId = loadingBotMessageIdRef.current; + if (botMessageId) { + setMessages((prev) => + prev.map((msg) => + msg.id === botMessageId + ? { ...msg, content: 'Request cancelled', isLoading: false } + : msg + ) + ); + } + setIsSending(false); + setActiveJobId(null); + setJobStartTime(null); + loadingBotMessageIdRef.current = null; + } } - setIsSending(false); - // Update any loading message to show it was stopped - setMessages((prev) => - prev.map((msg) => - msg.isLoading ? { ...msg, content: msg.content || '(Stopped)', isLoading: false } : msg - ) - ); }; const handleRetry = () => { @@ -420,6 +553,14 @@ function Chat(): React.ReactElement { const effectiveFlowName = isFlowLocked ? selectedChat.flow_name : selectedFlowName; const isFlowAvailable = !!effectiveFlowName && flows.some((f) => f.name === effectiveFlowName); + // Derive job status text for tooltip + const jobStatusText = React.useMemo(() => { + if (!activeJobId || !jobData) return ''; + const statusLabel = jobData.status === 'in_progress' ? 'Running' : + jobData.status === 'pending' ? 'Pending' : jobData.status; + return `${statusLabel} - ${formatElapsed(elapsedSeconds)}`; + }, [activeJobId, jobData, elapsedSeconds]); + // Build conversations for the drawer const conversations: Conversation[] = chats.map((chat) => ({ id: chat.id.toString(), @@ -535,6 +676,7 @@ function Chat(): React.ReactElement { const hasPartialContent = hasError && !!message.content; const canRetry = hasError && lastError && lastError.chatId === selectedChatId; const showCopyAction = message.role === 'bot' && !message.isLoading && !hasError; + const isActiveJobMessage = message.isLoading && !!activeJobId; const retryLink = canRetry ? ( @@ -542,10 +684,53 @@ function Chat(): React.ReactElement { ) : undefined; - return ( + // Build extra content for active job loading messages (cancel button) + // and for error messages with expandable details + let extraContent: MessageProps['extraContent'] = undefined; + + if (isActiveJobMessage) { + // Cancel button inline in typing indicator message area + extraContent = { + afterMainContent: ( +
+ +
+ ), + }; + } else if (hasError) { + // Error display with expandable details and retry + extraContent = { + afterMainContent: ( +
+ + +
{errorText}
+
+
+ ), + }; + } + + // Wrap active job messages in a tooltip showing status + elapsed time + const messageElement = ( - ), - }, - } - : {})} + extraContent={extraContent} /> ); + + if (isActiveJobMessage && jobStatusText) { + return ( + +
+ {messageElement} +
+
+ ); + } + + return messageElement; })} @@ -597,7 +771,7 @@ function Chat(): React.ReactElement { onSendMessage={handleSend} isSendButtonDisabled={isSending || !isFlowAvailable} hasStopButton={isSending} - handleStopButton={handleStopStreaming} + handleStopButton={handleCancelJob} /> @@ -606,6 +780,6 @@ function Chat(): React.ReactElement { ); -}; +} export { Chat }; diff --git a/frontend/src/app/Chat/chatApi.ts b/frontend/src/app/Chat/chatApi.ts index fe4e4d9..8583d79 100644 --- a/frontend/src/app/Chat/chatApi.ts +++ b/frontend/src/app/Chat/chatApi.ts @@ -64,6 +64,29 @@ export interface FlowsResponse { default_flow?: string; } +// ============================================================================= +// Job Types +// ============================================================================= + +export type JobStatus = 'pending' | 'in_progress' | 'completed' | 'failed' | 'cancelled' | 'timed_out'; + +export interface JobResponse { + id: number; + chat_message_id: number; + langflow_job_id: string | null; + flow_id: string | null; + status: JobStatus; + error_message: string | null; + result_content: string | null; + started_at: string | null; + completed_at: string | null; + created_at: string; + updated_at: string; +} + +/** Terminal statuses -- polling stops when job reaches one of these. */ +export const TERMINAL_STATUSES: JobStatus[] = ['completed', 'failed', 'cancelled', 'timed_out']; + // ============================================================================= // Chat CRUD // ============================================================================= @@ -110,7 +133,66 @@ export const ChatAPI = { }, // =========================================================================== - // Streaming + // Jobs + // =========================================================================== + + /** + * Send a message and create a background job for AI processing. + * + * Returns the Job record. The frontend then polls GET /jobs/{id}?sync=true + * until the job reaches a terminal state. + */ + async createJobMessage(chatId: number, content: string, flowName?: string): Promise { + const body = flowName ? { content, flow_name: flowName } : { content }; + const response = await apiClient.post( + `${API_BASE}/${chatId}/messages/job`, + body + ); + return response.data; + }, + + /** + * Get job status, optionally syncing with LangFlow first. + * + * When sync=true, the backend polls LangFlow V2 API before returning. + */ + async getJob(jobId: number, sync: boolean = false): Promise { + const params = sync ? '?sync=true' : ''; + const response = await apiClient.get(`/v1/jobs/${jobId}${params}`); + return response.data; + }, + + /** + * Cancel a running job. + * + * Calls LangFlow V2 stop endpoint and updates job status to cancelled. + */ + async cancelJob(jobId: number): Promise { + const response = await apiClient.post(`/v1/jobs/${jobId}/cancel`); + return response.data; + }, + + /** + * Get the active (non-terminal) job for a chat, if any. + * + * Used for page refresh recovery -- checks if there's an in-flight job + * that the frontend should resume polling for. + * Returns null (via 404) if no active job exists. + */ + async getActiveJob(chatId: number): Promise { + try { + const response = await apiClient.get( + `${API_BASE}/${chatId}/active-job` + ); + return response.data; + } catch { + // 404 means no active job -- expected behavior + return null; + } + }, + + // =========================================================================== + // Streaming (legacy, kept for backward compatibility) // =========================================================================== /** diff --git a/frontend/src/app/Chat/useJobPolling.ts b/frontend/src/app/Chat/useJobPolling.ts new file mode 100644 index 0000000..e626140 --- /dev/null +++ b/frontend/src/app/Chat/useJobPolling.ts @@ -0,0 +1,31 @@ +/** + * TanStack Query hook for polling job status. + * + * Polls GET /jobs/{id}?sync=true every 5s while the job is active. + * Stops polling automatically when the job reaches a terminal state + * (completed, failed, cancelled, timed_out). + * + * Based on the tang-web-app useJobsByBranch pattern with refetchInterval callback. + */ + +import { useQuery } from '@tanstack/react-query'; +import { ChatAPI, JobResponse, TERMINAL_STATUSES } from './chatApi'; + +export function useJobPolling(jobId: number | null) { + return useQuery({ + queryKey: ['job', jobId], + queryFn: async () => { + // Always sync with LangFlow on poll (backend skips sync for terminal states) + return ChatAPI.getJob(jobId!, true); + }, + enabled: !!jobId, + refetchInterval: (query) => { + const job = query.state.data; + if (!job) return 5000; + const isTerminal = TERMINAL_STATUSES.includes(job.status); + return isTerminal ? false : 5000; + }, + // Just under 5s to prevent extra refetches between polls + staleTime: 4000, + }); +} diff --git a/helm/langflow/values-dev.yaml b/helm/langflow/values-dev.yaml index 56dfd62..3b43a14 100644 --- a/helm/langflow/values-dev.yaml +++ b/helm/langflow/values-dev.yaml @@ -31,7 +31,7 @@ langflow: # Custom image with redhat_agents baked in (built via GitLab CI) repository: quay.io/cfchase/agents-python-langflow imagePullPolicy: Always - tag: "latest" + tag: "1.8.0.rc0" service: type: ClusterIP port: 7860 @@ -87,6 +87,9 @@ langflow: value: "INFO" - name: LANGFLOW_UPDATE_STARTER_PROJECTS value: "false" + # V2 Workflow API (required for job-based execution) + - name: LANGFLOW_DEVELOPER_API_ENABLED + value: "true" # Custom image settings for redhat_agents - name: LANGFLOW_LAZY_LOAD_COMPONENTS value: "false" diff --git a/k8s/langflow/base/deployment.yaml b/k8s/langflow/base/deployment.yaml index d709974..4d77c97 100644 --- a/k8s/langflow/base/deployment.yaml +++ b/k8s/langflow/base/deployment.yaml @@ -48,7 +48,7 @@ spec: - ALL containers: - name: langflow - image: docker.io/langflowai/langflow:latest + image: docker.io/langflowai/langflow:1.8.0.rc0 imagePullPolicy: Always ports: - containerPort: 7860 @@ -84,6 +84,9 @@ spec: value: "INFO" - name: LANGFLOW_UPDATE_STARTER_PROJECTS value: "false" + # V2 Workflow API (required for job-based execution) + - name: LANGFLOW_DEVELOPER_API_ENABLED + value: "true" volumeMounts: # All persistent data uses single PVC with subPath - name: langflow-data diff --git a/k8s/langflow/base/kustomization.yaml b/k8s/langflow/base/kustomization.yaml index 7f30158..7f84df1 100644 --- a/k8s/langflow/base/kustomization.yaml +++ b/k8s/langflow/base/kustomization.yaml @@ -17,4 +17,4 @@ labels: images: - name: docker.io/langflowai/langflow - newTag: latest + newTag: "1.8.0.rc0" diff --git a/makefiles/services.mk b/makefiles/services.mk index 38c3177..555e707 100644 --- a/makefiles/services.mk +++ b/makefiles/services.mk @@ -21,7 +21,7 @@ langflow-logs: ## Show LangFlow logs @./scripts/dev-langflow.sh logs langflow-import: ## Import flows from configured sources into LangFlow - @uv run --with requests --with pyyaml python scripts/import_flows.py config/local/flow-sources.yaml + @set -a && . config/local/.env && set +a && uv run --with requests --with pyyaml python scripts/import_flows.py config/local/flow-sources.yaml langflow-import-cluster: ## Import flows into cluster LangFlow (via port-forward) @./scripts/langflow-import-cluster.sh $(or $(ENV),dev) diff --git a/scripts/dev-langflow.sh b/scripts/dev-langflow.sh index d88c341..ff2186f 100755 --- a/scripts/dev-langflow.sh +++ b/scripts/dev-langflow.sh @@ -14,7 +14,7 @@ source "$SCRIPT_DIR/lib/common.sh" init_container_tool || exit 1 # Configuration -LANGFLOW_VERSION="${LANGFLOW_VERSION:-latest}" +LANGFLOW_VERSION="${LANGFLOW_VERSION:-1.8.0.rc0}" LANGFLOW_IMAGE="${LANGFLOW_IMAGE:-docker.io/langflowai/langflow:${LANGFLOW_VERSION}}" CONTAINER_NAME="app-langflow-dev" LANGFLOW_PORT="${LANGFLOW_PORT:-7860}" @@ -95,6 +95,10 @@ case "$1" in -e LANGFUSE_HOST="${LANGFUSE_HOST:-http://${DB_HOST}:${LANGFUSE_WEB_PORT:-3000}}" -e TZ="${TZ:-UTC}" -e LANGFLOW_LAZY_LOAD_COMPONENTS=false + # V2 Workflow API (Phase 9: job model for long-running flows) + -e LANGFLOW_DEVELOPER_API_ENABLED=${LANGFLOW_DEVELOPER_API_ENABLED:-true} + ${LANGFLOW_API_KEY:+-e LANGFLOW_API_KEY=$LANGFLOW_API_KEY} + ${LANGFLOW_API_KEY_SOURCE:+-e LANGFLOW_API_KEY_SOURCE=$LANGFLOW_API_KEY_SOURCE} ) # Detect host DNS servers (needed for VPN-resolved internal hostnames) @@ -128,6 +132,7 @@ case "$1" in -e GRANITE_GUARDIAN_ENDPOINT="${GRANITE_GUARDIAN_ENDPOINT:-}" -e GRANITE_GUARDIAN_API_KEY="${GRANITE_GUARDIAN_API_KEY:-}" -e GRANITE_CA_BUNDLE="${GRANITE_CA_BUNDLE:-}" + ${LANGFLOW_API_KEY:+-e LANGFLOW_API_KEY=$LANGFLOW_API_KEY} ) CUSTOM_VOL_ARGS=( @@ -279,10 +284,13 @@ case "$1" in echo " LANGFLOW_DB - Database name (default: langflow)" echo "" echo "Custom image env vars (forwarded when LANGFLOW_IMAGE is set):" - echo " GOOGLE_CLOUD_PROJECT - GCP project for Vertex AI" - echo " GRANITE_GUARDIAN_ENDPOINT - Granite Guardian API endpoint" - echo " GRANITE_GUARDIAN_API_KEY - Granite Guardian API key" - echo " GRANITE_CA_BUNDLE - Custom CA bundle path" + echo " GOOGLE_CLOUD_PROJECT - GCP project for Vertex AI" + echo " GRANITE_GUARDIAN_ENDPOINT - Granite Guardian API endpoint" + echo " GRANITE_GUARDIAN_API_KEY - Granite Guardian API key" + echo " GRANITE_CA_BUNDLE - Custom CA bundle path" + echo " LANGFLOW_DEVELOPER_API_ENABLED - Enable V2 Workflow API (default: true)" + echo " LANGFLOW_API_KEY - Shared API key for backend-to-LangFlow auth" + echo " LANGFLOW_API_KEY_SOURCE - Set to 'env' to validate API key from env var" echo "" echo "Prerequisites:" echo " PostgreSQL must be running: make db-start" diff --git a/scripts/import_flows.py b/scripts/import_flows.py index 838f165..31fd727 100644 --- a/scripts/import_flows.py +++ b/scripts/import_flows.py @@ -8,9 +8,7 @@ Environment variables: LANGFLOW_URL - LangFlow API URL (default: http://localhost:7860) - LANGFLOW_USER - LangFlow username (default: dev@localhost.local) - LANGFLOW_PASSWORD - LangFlow password (default: devpassword123) - LANGFLOW_API_KEY - LangFlow API key (optional, overrides user/password) + LANGFLOW_API_KEY - LangFlow API key (required, validated via x-api-key header) FLOW_SOURCE_PATH - Simple mode: single local path to import GITHUB_FLOW_TOKEN - Token for private git repos @@ -38,8 +36,6 @@ PROJECT_ROOT = Path(__file__).parent.parent DEFAULT_CONFIG = PROJECT_ROOT / "config" / "local" / "flow-sources.yaml" LANGFLOW_URL = os.environ.get("LANGFLOW_URL", "http://localhost:7860") -LANGFLOW_USER = os.environ.get("LANGFLOW_USER", "dev@localhost.local") -LANGFLOW_PASSWORD = os.environ.get("LANGFLOW_PASSWORD", "devpassword123") CACHE_DIR = Path(os.environ.get("FLOW_CACHE_DIR", "/tmp/flow-cache")) # Component installation paths (relative to project root) @@ -67,8 +63,8 @@ "metadata.google.internal", # GCP metadata } -# Global access token -ACCESS_TOKEN: str | None = None +# Global API key for LangFlow authentication +API_KEY: str | None = os.environ.get("LANGFLOW_API_KEY") # Cache for project name -> ID lookups PROJECT_CACHE: dict[str, str] = {} @@ -539,36 +535,34 @@ def install_components(source: dict) -> bool: def authenticate() -> bool: - """Authenticate with LangFlow and get access token.""" - global ACCESS_TOKEN - - # Check for API key first - api_key = os.environ.get("LANGFLOW_API_KEY") - if api_key: - ACCESS_TOKEN = api_key - log_info("Using API key from LANGFLOW_API_KEY") - return True + """Verify LangFlow API key authentication. - log_info(f"Authenticating as {LANGFLOW_USER}...") + Uses the LANGFLOW_API_KEY env var with x-api-key header. + LangFlow must have LANGFLOW_API_KEY_SOURCE=env set to validate + API keys from environment variables. + """ + if not API_KEY: + log_error( + "LANGFLOW_API_KEY not set. " + "Set it in config/local/.env or export LANGFLOW_API_KEY=..." + ) + return False + + # Verify the API key works resp = request_with_retry( - "POST", - f"{LANGFLOW_URL}/api/v1/login", - data={"username": LANGFLOW_USER, "password": LANGFLOW_PASSWORD}, + "GET", + f"{LANGFLOW_URL}/api/v1/flows/", + headers={"x-api-key": API_KEY}, timeout=10, ) - if resp is None: - return False + if resp is not None and resp.ok: + log_info("Authentication successful (API key)") + return True - if resp.ok: - try: - data = resp.json() - ACCESS_TOKEN = data.get("access_token") - if ACCESS_TOKEN: - log_info("Authentication successful") - return True - except json.JSONDecodeError: - pass - log_error(f"Authentication failed: {resp.text[:200]}") + detail = "" + if resp is not None: + detail = f": {resp.text[:200]}" + log_error(f"API key authentication failed{detail}") return False @@ -579,8 +573,8 @@ def list_all_flows() -> list[dict] | None: Returns None on error. """ headers = {} - if ACCESS_TOKEN: - headers["Authorization"] = f"Bearer {ACCESS_TOKEN}" + if API_KEY: + headers["x-api-key"] = API_KEY resp = request_with_retry( "GET", @@ -610,8 +604,8 @@ def delete_flow(flow_id: str) -> bool: Returns True if deleted successfully. """ headers = {"Content-Type": "application/json"} - if ACCESS_TOKEN: - headers["Authorization"] = f"Bearer {ACCESS_TOKEN}" + if API_KEY: + headers["x-api-key"] = API_KEY resp = request_with_retry( "DELETE", @@ -650,8 +644,8 @@ def find_flow_by_name(flows: list[dict], name: str, project_id: str | None = Non def create_project(project_name: str) -> str | None: """Create a new project and return its ID.""" headers = {"Content-Type": "application/json"} - if ACCESS_TOKEN: - headers["Authorization"] = f"Bearer {ACCESS_TOKEN}" + if API_KEY: + headers["x-api-key"] = API_KEY resp = request_with_retry( "POST", @@ -684,8 +678,8 @@ def get_project_id(project_name: str, create_if_missing: bool = True) -> str | N return PROJECT_CACHE[project_name] headers = {} - if ACCESS_TOKEN: - headers["Authorization"] = f"Bearer {ACCESS_TOKEN}" + if API_KEY: + headers["x-api-key"] = API_KEY resp = request_with_retry( "GET", @@ -752,8 +746,8 @@ def import_flow_data( same project, it is deleted before importing the new version. """ headers = {"Content-Type": "application/json"} - if ACCESS_TOKEN: - headers["Authorization"] = f"Bearer {ACCESS_TOKEN}" + if API_KEY: + headers["x-api-key"] = API_KEY # Check for existing flow and delete if found flows = list_all_flows()