Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions backend/app/alembic/versions/a1b2c3d4e5f6_add_job_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""add job table

Revision ID: a1b2c3d4e5f6
Revises: 146945cf3865
Create Date: 2026-02-27 00:10:00.000000

"""
from alembic import op
import sqlalchemy as sa
import sqlmodel.sql.sqltypes


# revision identifiers, used by Alembic.
revision = 'a1b2c3d4e5f6'
down_revision = '146945cf3865'
branch_labels = None
depends_on = None


def upgrade():
op.create_table('job',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('chat_message_id', sa.Integer(), nullable=False),
sa.Column('langflow_job_id', sqlmodel.sql.sqltypes.AutoString(length=255), nullable=True),
sa.Column('flow_id', sqlmodel.sql.sqltypes.AutoString(length=255), nullable=True),
sa.Column('status', sqlmodel.sql.sqltypes.AutoString(length=50), nullable=False),
sa.Column('error_message', sa.Text(), nullable=True),
sa.Column('result_content', sa.Text(), nullable=True),
sa.Column('started_at', sa.DateTime(), nullable=True),
sa.Column('completed_at', sa.DateTime(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(['chat_message_id'], ['chat_message.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_job_chat_message_id'), 'job', ['chat_message_id'], unique=False)
op.create_index(op.f('ix_job_langflow_job_id'), 'job', ['langflow_job_id'], unique=False)
op.create_index(op.f('ix_job_status'), 'job', ['status'], unique=False)


def downgrade():
op.drop_index(op.f('ix_job_status'), table_name='job')
op.drop_index(op.f('ix_job_langflow_job_id'), table_name='job')
op.drop_index(op.f('ix_job_chat_message_id'), table_name='job')
op.drop_table('job')
113 changes: 113 additions & 0 deletions backend/app/api/routes/v1/chat_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
ChatMessageCreate,
ChatMessagePublic,
ChatMessagesPublic,
Job,
JobPublic,
JobStatus,
Message,
)
from app.core.config import settings
Expand Down Expand Up @@ -131,6 +134,116 @@ def create_message(
return message


class JobMessageRequest(BaseModel):
"""Request body for job-based message endpoint."""
content: str
flow_id: str | None = None
flow_name: str | None = None


@router.post("/job", response_model=JobPublic)
async def create_job_message(
*,
session: SessionDep,
current_user: CurrentUser,
chat_id: int,
request: JobMessageRequest,
) -> Any:
"""
Send a message and create a background job for AI processing.

This endpoint:
1. Saves the user message to the database
2. Creates a placeholder assistant message (filled on job completion)
3. Creates a Job record (status=pending)
4. Submits to LangFlow V2 with background=true
5. Updates Job with langflow_job_id and status=in_progress
6. Returns JobPublic response (includes job id for frontend polling)
"""
# Verify chat exists and user has access
chat = get_chat_with_permission(session, current_user, chat_id)
flow_name = request.flow_name or settings.LANGFLOW_DEFAULT_FLOW

# Save user message
user_message = ChatMessage(
chat_id=chat_id,
content=request.content,
role="user",
)
session.add(user_message)

# Create placeholder assistant message (content filled on job completion)
assistant_message = ChatMessage(
chat_id=chat_id,
content="",
role="assistant",
)
session.add(assistant_message)

# Update chat metadata
chat.updated_at = datetime.now(timezone.utc)
if not chat.flow_name and flow_name:
chat.flow_name = flow_name
session.add(chat)
session.commit()
session.refresh(user_message)
session.refresh(assistant_message)

# Resolve flow ID
client = get_langflow_client()
resolved_flow_id = await client.resolve_flow_id(
flow_id=request.flow_id, flow_name=flow_name
)
if not resolved_flow_id:
raise HTTPException(status_code=400, detail="No flow configured or found")

# Create job record
job = Job(
chat_message_id=assistant_message.id,
flow_id=resolved_flow_id,
status=JobStatus.PENDING.value,
)
session.add(job)
session.commit()
session.refresh(job)

# Build tweaks and submit to LangFlow V2
user_data = await build_user_settings_data(
session=session, user_id=current_user.id
)
app_data = build_app_settings_data()
tweaks = build_generic_tweaks(user_data=user_data, app_data=app_data)

try:
v2_inputs = client.build_v2_inputs(
message=request.content,
session_id=str(chat_id),
tweaks=tweaks,
)
lf_response = await client.submit_workflow(
flow_id=resolved_flow_id,
inputs=v2_inputs,
session_id=str(chat_id),
)
now = datetime.now(timezone.utc)
job.langflow_job_id = lf_response.get("job_id")
job.status = JobStatus.IN_PROGRESS.value
job.started_at = now
job.updated_at = now
except Exception as e:
logger.error(f"Failed to submit workflow: {e}")
now = datetime.now(timezone.utc)
job.status = JobStatus.FAILED.value
job.error_message = str(e)
job.completed_at = now
job.updated_at = now

session.add(job)
session.commit()
session.refresh(job)
return job


@router.delete("/{message_id}")
def delete_message(
session: SessionDep,
Expand Down
37 changes: 37 additions & 0 deletions backend/app/api/routes/v1/chats.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Create new chat
- Update chat
- Delete chat
- Get active job for a chat (page refresh recovery)
"""

from datetime import datetime, timezone
Expand All @@ -19,10 +20,15 @@
from app.models import (
Chat,
ChatCreate,
ChatMessage,
ChatPublic,
ChatsPublic,
ChatUpdate,
Job,
JobPublic,
JobStatus,
Message,
TERMINAL_STATUSES,
)

router = APIRouter(prefix="/chats", tags=["chats"])
Expand Down Expand Up @@ -119,6 +125,37 @@ def update_chat(
return chat


@router.get("/{id}/active-job", response_model=JobPublic)
def get_active_job(
session: SessionDep, current_user: CurrentUser, id: int
) -> Any:
"""
Get the active (non-terminal) job for a chat, if any.

Used for page refresh recovery -- checks if there's an in-flight job
that the frontend should resume polling for.

Returns 404 if no active job exists.
"""
chat = get_chat_with_permission(session, current_user, id)

# Find the latest non-terminal job for any message in this chat
terminal_values = {s.value for s in TERMINAL_STATUSES}
statement = (
select(Job)
.join(ChatMessage, Job.chat_message_id == ChatMessage.id)
.where(ChatMessage.chat_id == chat.id)
.where(Job.status.notin_(terminal_values))
.order_by(Job.id.desc())
.limit(1)
)
job = session.exec(statement).first()
if not job:
raise HTTPException(status_code=404, detail="No active job found")

return job


@router.delete("/{id}")
def delete_chat(session: SessionDep, current_user: CurrentUser, id: int) -> Message:
"""
Expand Down
Loading