Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,17 @@ Core modules:
- `app/agent/executor.py`: SQL execution engine (pandas helpers retained)
- `app/agent/analysis.py`: single-pass narrative from query + steps
- `app/agent/graph.py`: LangGraph orchestration
- `app/api/routes.py`: API surface
- `app/api/routes.py`: Shared API surface (health, uploads, inspections, **stateless** `POST /analyze`)
- `app/api/chat_routes.py`: **Primary product** chat API (`POST /chat`, conversation history)
- `ui/`: React + Vite frontend

### API: primary chat vs stateless analyze

| Path | Role |
|------|------|
| **`POST /chat`** (with JWT) | **Product path:** persists conversations, messages, and inspection snapshots; use for the React app and any integrated client. |
| **`POST /analyze`** (no auth) | **Debug / manual testing:** same analytics engine, but **no persistence** and inspection data only in server memory until restart. Marked **deprecated** in OpenAPI; do not treat it as a peer to `/chat`. |

## Repo Structure

```text
Expand Down Expand Up @@ -121,14 +129,16 @@ API endpoints:
- `GET /sample-questions`
- `POST /uploads`
- `GET /inspections/{inspection_id}`
- `POST /analyze`
- **`POST /chat`** — **primary:** authenticated analysis turn; persists thread + inspection snapshot
- `GET /conversations`, `GET /conversations/{id}` — list/load chat history (authenticated)
- `POST /analyze` — **deprecated / debug only:** stateless run (see table above)
- `POST /auth/signup` — create user (SQLite), returns JWT
- `POST /auth/login` — issue JWT
- `GET /auth/me` — current user (`Authorization: Bearer <token>`)

**Database:** On API startup the app creates SQLite tables if needed (no separate migration step for this demo). By default the DB file is `planera.db` in the project root (same directory as `requirements.txt`). Override with `DATABASE_PATH` in `.env`. Add a strong `JWT_SECRET_KEY` before any shared deployment; the repo default is for local dev only.

Example request:
Example (debug — stateless; prefer `/chat` with a JWT for real usage):

```bash
curl -X POST http://localhost:8000/analyze \
Expand Down
207 changes: 207 additions & 0 deletions app/api/chat_routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
"""User-scoped conversation and authenticated chat orchestration.

This module defines the **primary product HTTP API** for analysis turns: ``POST /chat`` persists
messages and inspection snapshots. Stateless ``POST /analyze`` (see ``app.api.routes``) exists only
for debugging and is not a substitute for these routes.
"""

from __future__ import annotations

from datetime import datetime, timezone

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import func, select
from sqlalchemy.orm import Session

from app.auth.deps import get_current_user
from app.config import get_settings
from app.db.session import get_db
from app.models.conversation import Conversation, Message
from app.models.user import User
from app.schemas import (
ChatAssistantMessagePublic,
ChatSubmitRequest,
ChatTurnResponse,
ConversationDetailResponse,
ConversationPublic,
ConversationsListResponse,
ConversationSummary,
MessagePublic,
)
from app.services.analysis_run import run_stored_analysis
from app.services.inspection_persistence import save_inspection_for_assistant_message
from app.utils.logging import get_logger


router = APIRouter(tags=["chat"])
logger = get_logger(__name__)


def _conversation_title_from_query(query: str, max_len: int = 56) -> str:
compact = " ".join(query.split())
if len(compact) <= max_len:
return compact
return f"{compact[: max_len - 3]}..."


def _preview_text(text: str | None, max_len: int = 120) -> str | None:
if text is None:
return None
single_line = " ".join(text.split())
if len(single_line) <= max_len:
return single_line
return f"{single_line[: max_len - 1]}…"


@router.get("/conversations", response_model=ConversationsListResponse)
def list_conversations(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> ConversationsListResponse:
last_id_sq = (
select(Message.conversation_id.label("cid"), func.max(Message.id).label("mid"))
.group_by(Message.conversation_id)
.subquery()
)
stmt = (
select(Conversation, Message.content)
.outerjoin(last_id_sq, last_id_sq.c.cid == Conversation.id)
.outerjoin(Message, Message.id == last_id_sq.c.mid)
.where(Conversation.user_id == current_user.id)
.order_by(Conversation.updated_at.desc(), Conversation.id.desc())
)
rows = db.execute(stmt).all()
items = [
ConversationSummary(
id=conv.id,
title=conv.title,
updated_at=conv.updated_at,
last_message_preview=_preview_text(last_content),
)
for conv, last_content in rows
]
return ConversationsListResponse(conversations=items)


@router.get("/conversations/{conversation_id}", response_model=ConversationDetailResponse)
def get_conversation(
conversation_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> ConversationDetailResponse:
conv = db.get(Conversation, conversation_id)
if conv is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail={"message": "Conversation not found."})
if conv.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail={"message": "Not allowed to access this conversation."})

msg_rows = db.execute(
select(Message)
.where(Message.conversation_id == conv.id)
.order_by(Message.created_at.asc(), Message.id.asc())
).scalars()

return ConversationDetailResponse(
conversation=ConversationPublic.model_validate(conv),
messages=[MessagePublic.model_validate(m) for m in msg_rows],
)


@router.post(
"/chat",
response_model=ChatTurnResponse,
summary="Submit one chat turn (primary product path)",
description=(
"Authenticated endpoint: saves the user message, runs the analytics workflow, stores the "
"assistant reply and links an inspection snapshot when present. "
"Use this for all normal app traffic instead of ``POST /analyze``."
),
)
def chat_turn(
body: ChatSubmitRequest,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> ChatTurnResponse:
now = datetime.now(timezone.utc)
if body.conversation_id is None:
conv = Conversation(
user_id=current_user.id,
title=_conversation_title_from_query(body.query),
created_at=now,
updated_at=now,
)
db.add(conv)
db.flush()
else:
conv = db.get(Conversation, body.conversation_id)
if conv is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail={"message": "Conversation not found."})
if conv.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={"message": "Not allowed to access this conversation."},
)

user_msg = Message(conversation_id=conv.id, role="user", content=body.query, created_at=now)
db.add(user_msg)
db.flush()

try:
analysis_run = run_stored_analysis(body.query)
analysis_result = analysis_run.response
inspection_payload = analysis_run.inspection
except Exception as exc: # pragma: no cover - defensive API fallback
db.rollback()
logger.exception("Chat analyze failed", extra={"query": body.query, "conversation_id": conv.id})
settings = get_settings()
detail: dict[str, str] = {
"message": "The analysis could not complete successfully. Inspect the server logs and retry.",
}
if settings.app_env.lower() != "production":
detail["error"] = str(exc)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=detail) from exc

assistant_meta: dict = {
"trace": [e.model_dump() for e in analysis_result.trace],
"executed_steps": [s.model_dump() for s in analysis_result.executed_steps],
"errors": [err.model_dump() for err in analysis_result.errors],
"inspection_id": analysis_result.inspection_id,
}
assistant_msg = Message(
conversation_id=conv.id,
role="assistant",
content=analysis_result.analysis,
metadata_json=assistant_meta,
created_at=datetime.now(timezone.utc),
)
db.add(assistant_msg)
db.flush()
if analysis_result.inspection_id:
save_inspection_for_assistant_message(
db,
inspection_id=analysis_result.inspection_id,
payload=inspection_payload,
conversation_id=conv.id,
message_id=assistant_msg.id,
)
conv.updated_at = assistant_msg.created_at
db.commit()
db.refresh(conv)
db.refresh(assistant_msg)

return ChatTurnResponse(
conversation=ConversationPublic.model_validate(conv),
assistant_message=ChatAssistantMessagePublic(
id=assistant_msg.id,
role="assistant",
content=assistant_msg.content,
created_at=assistant_msg.created_at,
status="ready",
metadata_json=assistant_msg.metadata_json,
),
analysis=analysis_result.analysis,
trace=analysis_result.trace,
executed_steps=analysis_result.executed_steps,
errors=analysis_result.errors,
inspection_id=analysis_result.inspection_id,
)
78 changes: 55 additions & 23 deletions app/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@

from __future__ import annotations

from fastapi import APIRouter, File, HTTPException, UploadFile, status
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
from sqlalchemy.orm import Session

from app.agent.graph import run_analysis
from app.api.workspace import get_inspection, profile_upload, store_inspection
from app.api.workspace import get_inspection, profile_upload
from app.auth.deps import get_current_user_optional
from app.config import get_settings
from app.schemas import AnalyzeRequest, AnalyzeResponse, HealthResponse, InspectionResponse, SampleQuestionsResponse, UploadResponse
from app.db.session import get_db
from app.models.conversation import Conversation
from app.models.inspection_snapshot import InspectionSnapshot
from app.models.user import User
from app.schemas import AnalyzeRequest, AnalyzeResponse, HealthResponse, InspectionData, InspectionResponse, SampleQuestionsResponse, UploadResponse
from app.services.analysis_run import run_stored_analysis
from app.utils.constants import SAMPLE_QUESTIONS
from app.utils.logging import get_logger

Expand Down Expand Up @@ -44,35 +50,61 @@ async def upload_dataset(file: UploadFile = File(...)) -> UploadResponse:


@router.get("/inspections/{inspection_id}", response_model=InspectionResponse)
def inspection_details(inspection_id: str) -> InspectionResponse:
"""Return a stored inspection payload for the requested analysis."""
def inspection_details(
inspection_id: str,
db: Session = Depends(get_db),
current_user: User | None = Depends(get_current_user_optional),
) -> InspectionResponse:
"""Return inspection detail.

Prefer rows loaded from ``inspection_snapshots`` (written by ``POST /chat``); those require auth.
Otherwise falls back to the in-memory store populated by a stateless ``POST /analyze`` run in
the same server process (debug/demo).
"""

row = db.get(InspectionSnapshot, inspection_id)
if row is not None:
if current_user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail={"message": "Authentication required to view this inspection."},
headers={"WWW-Authenticate": "Bearer"},
)
conv = db.get(Conversation, row.conversation_id)
if conv is None or conv.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={"message": "Not allowed to view this inspection."},
)
inspection = InspectionData.model_validate(row.payload_json)
return InspectionResponse(inspection=inspection, fallback=False)

inspection = get_inspection(inspection_id)
if inspection is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail={"message": "Inspection not found."})
return InspectionResponse(inspection=inspection, fallback=False)


@router.post("/analyze", response_model=AnalyzeResponse)
@router.post(
"/analyze",
response_model=AnalyzeResponse,
tags=["debug"],
deprecated=True,
summary="Stateless analysis (debug / manual testing only)",
description=(
"**Not the primary product API.** For normal use, authenticated clients should call "
"`POST /chat`, which persists conversations, messages, and inspection snapshots.\n\n"
"This endpoint runs the same analytics pipeline without auth, without database persistence, "
"and keeps the inspection payload only in process memory (lost on restart). Use it for "
"local debugging, Swagger/Postman checks, and quick stateless demos.\n\n"
"Response shape aligns with the analysis/trace/steps portion of `POST /chat`."
),
)
def analyze(request: AnalyzeRequest) -> AnalyzeResponse:
"""Execute the analytics workflow for a user query."""
"""Run analytics without persistence (see OpenAPI ``description`` — prefer ``POST /chat`` for product flows)."""

try:
state = run_analysis(request.query)
base_response = AnalyzeResponse(
analysis=state["analysis"],
trace=state.get("trace", []),
executed_steps=state.get("executed_steps", []),
errors=state.get("errors", []),
)
inspection_id = store_inspection(request.query, base_response)
return AnalyzeResponse(
analysis=base_response.analysis,
trace=base_response.trace,
executed_steps=base_response.executed_steps,
errors=base_response.errors,
inspection_id=inspection_id,
)
return run_stored_analysis(request.query).response
except Exception as exc: # pragma: no cover - defensive API fallback
logger.exception("Analyze request failed", extra={"query": request.query})
settings = get_settings()
Expand Down
6 changes: 3 additions & 3 deletions app/api/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ def profile_upload(filename: str, content: bytes) -> UploadedAsset:
return asset


def store_inspection(prompt: str, response: AnalyzeResponse) -> str:
"""Build and store an inspection payload for later retrieval."""
def store_inspection(prompt: str, response: AnalyzeResponse) -> tuple[str, InspectionData]:
"""Build and store an inspection payload in memory for same-session retrieval."""

inspection = _build_inspection(_short_id("inspect"), prompt, response)
with _STORE_LOCK:
_INSPECTIONS[inspection.id] = inspection
return inspection.id
return inspection.id, inspection


def get_inspection(inspection_id: str) -> InspectionData | None:
Expand Down
21 changes: 21 additions & 0 deletions app/auth/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,24 @@ def get_current_user(
headers={"WWW-Authenticate": "Bearer"},
)
return user


def get_current_user_optional(
creds: HTTPAuthorizationCredentials | None = Depends(_bearer),
db: Session = Depends(get_db),
) -> User | None:
"""Same as `get_current_user` but returns None when unauthenticated (no 401)."""

if creds is None or creds.scheme.lower() != "bearer":
return None
try:
payload = decode_access_token(creds.credentials)
user_id_str: str | None = payload.get("sub")
if user_id_str is None:
return None
user_id = int(user_id_str)
except Exception:
return None

user = db.get(User, user_id)
return user
Loading
Loading