Skip to content

feat: replace SSE streaming with job-based LangFlow V2 workflow#14

Closed
cfchase wants to merge 11 commits into
mainfrom
feature/langflow-jobs
Closed

feat: replace SSE streaming with job-based LangFlow V2 workflow#14
cfchase wants to merge 11 commits into
mainfrom
feature/langflow-jobs

Conversation

@cfchase
Copy link
Copy Markdown
Owner

@cfchase cfchase commented Mar 2, 2026

Summary

  • Replace SSE streaming with LangFlow V2 Workflow API's job-based async execution model
  • Add Job model and API endpoints for tracking long-running LangFlow executions
  • Frontend polls job status instead of consuming SSE streams
  • Upgrade LangFlow to 1.8.0 with V2 API support
  • Add integration tests for the full job lifecycle

Test plan

  • Run make test-backend to verify all backend tests pass
  • Run make test-frontend to verify frontend tests pass
  • Manual test: send a chat message and verify job polling completes
  • Verify job cancellation works via the stop button

🤖 Generated with Claude Code

- Job model with JobStatus enum and TERMINAL_STATUSES lifecycle tracking
- LangflowClient V2 methods: submit_workflow, get_workflow_status, stop_workflow, build_v2_inputs
- MockLangflowClient with matching V2 mock methods and resolve_flow_id
- LangflowClientProtocol updated with V2 method signatures
- ChatMessage reverse relationship to Job
- Alembic migration for job table with indexes
…e creation

- GET /v1/jobs/{id} with optional ?sync=true for LangFlow polling
- POST /v1/jobs/{id}/cancel to stop running jobs via V2 API
- POST /v1/chats/{id}/messages/job for background job-based AI processing
- Status sync maps LangFlow V2 statuses to our job lifecycle
- Ownership verification traverses job -> chat_message -> chat -> user
- Existing /stream endpoint preserved for backward compatibility
- Tests for job CRUD, sync, cancel, ownership, and message creation
- Add JobResponse type, TERMINAL_STATUSES, createJobMessage, getJob, cancelJob, getActiveJob to chatApi.ts
- Create useJobPolling hook with TanStack Query refetchInterval pattern (5s polling, stops on terminal)
- Add GET /chats/{id}/active-job backend endpoint for page refresh recovery
- handleSend uses createJobMessage instead of createStreamingMessage
- useJobPolling drives message lifecycle (loading -> completed/failed/cancelled)
- Typing indicator with animated dots via PatternFly Message isLoading prop
- Cancel button inline in typing indicator message area
- Tooltip shows status + elapsed time on hover
- Errors display as inline messages with expandable details and retry
- Page refresh recovery checks for active jobs on chat load
- Chat input disabled while job is in progress
- Request cancelled state preserved in chat history
- All 23 frontend tests pass with job-based flow
- Forward LANGFLOW_DEVELOPER_API_ENABLED and LANGFLOW_API_KEY in dev-langflow.sh
- Document V2 Workflow API variables in .env.example
- Add E2E job lifecycle tests (happy path, cancel, page refresh, error)
- Fix ChatMessageBase min_length=0 to allow placeholder assistant messages
- build_v2_inputs: use V2 flat format ("Chat Input.input_value")
  instead of V1-style dict (input_value, input_type, output_type)
- submit_workflow: wrap inputs as "inputs" key instead of spreading
- extract_result_text: parse V2 output structure (dict with component
  names as keys) instead of V1 nested arrays
- Update mock client to match V2 format for tests
- Pin all deployment configs to langflowai/langflow:1.8.0
- Add LANGFLOW_DEVELOPER_API_ENABLED=true to k8s, helm, and dev script
- Move LANGFLOW_DEVELOPER_API_ENABLED and LANGFLOW_API_KEY to
  COMMON_ENV_ARGS in dev-langflow.sh (works with both stock and
  custom images)
- Default LANGFLOW_VERSION to 1.8.0 in dev script
@cfchase
Copy link
Copy Markdown
Owner Author

cfchase commented Mar 2, 2026

Code review

Found 1 issue:

  1. Missing ondelete="CASCADE" on the Job.chat_message_id foreign key. The backend/CLAUDE.md documents cascade rules: "Join Tables: Use ondelete="CASCADE" on foreign keys" and "One-to-Many: CASCADE to owned children". Every other foreign key in the codebase follows this pattern (ChatMessage.chat_id, Chat.user_id, Item.owner_id, UserIntegration.user_id), but Job.chat_message_id omits it. Without CASCADE, deleting a ChatMessage will either raise a foreign key constraint violation or leave orphaned Job records. The migration file also needs the corresponding ondelete='CASCADE' on its ForeignKeyConstraint.

id: int | None = Field(default=None, primary_key=True)
chat_message_id: int = Field(
foreign_key="chat_message.id", nullable=False, index=True
)

sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(['chat_message_id'], ['chat_message.id'], ),
sa.PrimaryKeyConstraint('id')

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

cfchase added 3 commits March 2, 2026 11:42
Without CASCADE, deleting a ChatMessage leaves orphaned Job records
or raises a foreign key constraint violation. Aligns with the cascade
pattern used by all other foreign keys in the codebase.
Switch from auto_login/Bearer auth to x-api-key with
LANGFLOW_API_KEY_SOURCE=env for LangFlow 1.8.0 compatibility.
Fix job sync getting stuck in_progress when LangFlow V2 returns
HTTP 500 for failed jobs (JOB_FAILED code).
When a V2 workflow job completes, the result was stored in
job.result_content but never written back to the associated
ChatMessage.content field. The frontend reads messages from the
messages API, so responses appeared as empty bubbles.
@cfchase
Copy link
Copy Markdown
Owner Author

cfchase commented Mar 3, 2026

Closed: changes were squash merged directly to main in db8b39a.

@cfchase cfchase closed this Mar 3, 2026
@cfchase cfchase deleted the feature/langflow-jobs branch March 3, 2026 15:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant