An AI-powered travel itinerary planning service for Australian destinations. The system uses a structured Agno workflow that combines CAG (Cache-Augmented Generation for intent classification) and RAG (Retrieval-Augmented Generation for factual enrichment), with database-driven prompt templates and full SSE streaming.
POST /generate
│
▼
FastAPI Service
│ creates session_id, validates request
▼
Agno Workflow
1. intake — validate + normalise input fields
2. cag_match — embed intent, cosine match against itinerary_categories
3. retrieval_decision — confidence >= 0.85 → skip RAG, else retrieve
4. rag_retrieve — pgvector top-5 similarity search on source_documents
5. generate — resolve DB prompt template, call LLM
6. validate_output — Pydantic schema check, retry once on failure
│
▼
SSE stream (one structured event per step)
Each SSE event has the form:
{
"session_id": "...",
"step": "cag_match",
"status": "completed",
"timestamp": "...",
"data": {},
"error": null
}

| Layer | Technology |
|---|---|
| API | FastAPI + sse-starlette |
| Workflow | Agno (Step, Condition) |
| Knowledge base | Agno Knowledge + PgVector (travel_docs table) |
| Database | PostgreSQL 16 + pgvector |
| ORM / migrations | SQLAlchemy 2 + Alembic |
| LLM | Gemini 2.0 Flash (google-genai) |
| Embeddings | gemini-embedding-001 (Google, 768 dims) |
| Validation | Pydantic v2 |
- Python 3.11+
- Docker Desktop
1. Clone and copy env file
git clone <repo-url>
cd Agno_Travel_Agent
cp .env.example .env
# fill in GEMINI_API_KEY in .env

2. Install dependencies

python scripts/dev.py install

3. Start PostgreSQL

python scripts/dev.py up

4. Apply migrations

python scripts/dev.py migrate

5. Seed the database

python scripts/dev.py seed

6. Start the app

PYTHONPATH=src uvicorn app.main:app --reload # macOS / Linux
$env:PYTHONPATH="src"; uvicorn app.main:app --reload # Windows PowerShell

All variables are documented in .env.example.
| Variable | Purpose |
|---|---|
| `POSTGRES_USER` | PostgreSQL username |
| `POSTGRES_PASSWORD` | PostgreSQL password |
| `POSTGRES_DB` | Database name |
| `POSTGRES_PORT` | Host port to expose (default 5432) |
| `DATABASE_URL` | Full SQLAlchemy / Alembic DSN |
| `GEMINI_API_KEY` | Google Gemini API key |
| `LLM_MODEL` | Gemini model for generation (e.g. gemini-2.0-flash) |
| `EMBEDDING_MODEL` | Embedding model — must match seed and cag_match (default: gemini-embedding-001) |
| `APP_HOST` | Uvicorn bind host |
| `APP_PORT` | Uvicorn bind port |
All commands work on Windows and Unix via the Python task runner:
python scripts/dev.py up # start PostgreSQL
python scripts/dev.py down # stop PostgreSQL
python scripts/dev.py install # pip install -e ".[dev]"
python scripts/dev.py migrate # alembic upgrade head
python scripts/dev.py seed # seed categories, prompt templates, source docs
python scripts/dev.py count-rows # print row counts + embedding coverage for seeded tables
python scripts/dev.py check-health # GET /health → 200
python scripts/dev.py test # pytest
python scripts/dev.py verify # up → migrate → uvicorn → health + pytest
python scripts/dev.py verify-schema # up → migrate → db-schema tests (16 checks)

On macOS/Linux you can also use make <target> as a shorthand.
The app serves a single-page frontend at GET / from src/app/static/index.html — plain HTML + vanilla JS, no build step required.
# Windows PowerShell
$env:PYTHONPATH="src"; python.exe -m uvicorn app.main:app --reload
# macOS / Linux
PYTHONPATH=src uvicorn app.main:app --reload

Then open http://localhost:8000 in your browser.
1. Use a golden preset button
Three preset buttons pre-fill the form with the same inputs used in test_golden.py:
| Button | Destination | Days | Budget | Style | Expected path |
|---|---|---|---|---|---|
| Family trip | Sydney | 5 | moderate | family | High-confidence → no RAG |
| Budget trip | Melbourne | 3 | budget | budget | High-confidence → no RAG |
| Low-confidence | Sydney | 4 | moderate | family | Low-confidence → RAG triggered |
Click a preset, then click Generate Itinerary.
2. Watch the SSE step panels
Each workflow step appears as a panel as it arrives in real time:
- Green left border = `completed`
- Red left border = `error`
- The `generate` step panel highlights `prompt_template_key` and `prompt_template_version` in amber
3. Check the Trace Log table
After the stream finishes the page automatically calls GET /traces/{session_id} and renders all trace rows in a table showing step, status, latency, template version, and degraded mode.
4. Try the optional template version override
Enter a version number in the Template version field before submitting to test a specific prompt template version (equivalent to ?template_version=N on the API).
To start with a clean trace table:
TRUNCATE TABLE trace_events RESTART IDENTITY;

Via Docker:

docker exec -it <postgres-container> psql -U <user> -d <dbname> -c "TRUNCATE TABLE trace_events RESTART IDENTITY;"

This is safe for manual UI exploration. Automated tests mock the DB and are unaffected.
The POST /generate endpoint is the public entry point for the workflow. It validates the request, runs the Agno workflow, and streams all step events as Server-Sent Events.
POST /generate?template_version=2
Content-Type: application/json
{
"destination": "Sydney",
"trip_length": 5,
"budget": "moderate",
"travel_style": "family"
}

All four body fields are required. Missing or invalid fields return 422 Unprocessable Entity — never a 500.
| Field | Type | Values | Notes |
|---|---|---|---|
| `destination` | string | free text | normalised to lowercase |
| `trip_length` | integer | ≥ 1 | number of days |
| `budget` | enum | `"budget"`, `"moderate"`, `"luxury"` | any other value → 422 |
| `travel_style` | string | free text | used for CAG semantic matching |
Optional query parameter:

| Parameter | Type | Default | Effect |
|---|---|---|---|
| `template_version` | integer | none | fetch a specific prompt template version instead of the active one |
When template_version is omitted the active template (is_active=true) is used. Pass an explicit version to reproduce a past run or test a specific template version.
HTTP/1.1 200 OK
Content-Type: text/event-stream
data: {"session_id": "...", "step": "intake", "status": "completed", ...}
data: {"session_id": "...", "step": "cag_match", "status": "completed", ...}
data: {"session_id": "...", "step": "retrieval_decision", "status": "completed", ...}
data: {"session_id": "...", "step": "generate", "status": "completed", ...}
data: {"session_id": "...", "step": "validate_output", "status": "completed", ...}- Each
dataline is a JSON-serialised SSE event matching DESIGN.md section 9. session_idis generated per request (uuid4) and is identical across all events.- The
rag_retrieveevent is present only whenconfidence_score < 0.85. - Events arrive in workflow order:
intake → cag_match → retrieval_decision → (rag_retrieve) → generate → validate_output.
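As a quick way to exercise the stream outside the browser, the events can be read with any streaming HTTP client. A minimal sketch using httpx, which is not a project dependency and is used here purely for illustration:

```python
# Sketch: consume the POST /generate SSE stream and print one line per step event.
# httpx is an assumption for illustration; any streaming HTTP client works the same way.
import json
import httpx

payload = {
    "destination": "Sydney",
    "trip_length": 5,
    "budget": "moderate",
    "travel_style": "family",
}

with httpx.stream("POST", "http://localhost:8000/generate", json=payload, timeout=120) as response:
    for line in response.iter_lines():
        if line.startswith("data:"):
            event = json.loads(line[len("data:"):].strip())
            print(event["step"], event["status"])
```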
POST /generate
├── FastAPI validates TravelRequest body → 422 on bad input
├── generate session_id = uuid4()
├── _run_workflow(session_id, request)
│ ├── travel_workflow.run(additional_data={session_id, destination, ...})
│ │ stdout captured via contextlib.redirect_stdout to collect
│ │ the retrieval_decision evaluator's side-effect print()
│ ├── _collect_step_result_events(result.step_results)
│ │ └── parses intake, cag_match, (rag_retrieve), generate, validate_output
│ ├── parse retrieval_decision event from captured stdout
│ └── sort all events by _STEP_ORDER → [0..5]
└── EventSourceResponse(event_generator())
└── yields {"data": json.dumps(event)} for each event
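Putting the diagram into code, the handler shape looks roughly like this. TravelRequest and _run_workflow stand in for the project's real modules and are simplified so the sketch runs on its own; it is an outline of the flow above, not the exact implementation.

```python
# Sketch of the POST /generate handler flow shown above (simplified stand-ins,
# not the project's actual code).
import json
import uuid

from fastapi import FastAPI
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse

app = FastAPI()

class TravelRequest(BaseModel):  # simplified stand-in for the real intake model
    destination: str
    trip_length: int
    budget: str
    travel_style: str

def _run_workflow(session_id: str, request: TravelRequest) -> list[dict]:
    # Placeholder: the real function runs the Agno workflow, captures the
    # retrieval_decision print() output, and returns events sorted by _STEP_ORDER.
    return [{"session_id": session_id, "step": "intake", "status": "completed"}]

@app.post("/generate")
async def generate(request: TravelRequest, template_version: int | None = None):
    session_id = str(uuid.uuid4())  # one uuid4 shared by every event in the stream
    events = _run_workflow(session_id, request)

    async def event_generator():
        for event in events:
            yield {"data": json.dumps(event)}

    return EventSourceResponse(event_generator())
```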
# Active template (default)
curl -N -X POST http://localhost:8000/generate \
-H "Content-Type: application/json" \
-d '{"destination": "Sydney", "trip_length": 5, "budget": "moderate", "travel_style": "family"}'
# Specific template version
curl -N -X POST "http://localhost:8000/generate?template_version=2" \
-H "Content-Type: application/json" \
-d '{"destination": "Sydney", "trip_length": 5, "budget": "moderate", "travel_style": "family"}'Returns all trace_events rows for a session as a JSON array ordered by created_at.
GET /traces/abc-123-def

200 response:
[
{
"session_id": "abc-123-def",
"step_name": "intake",
"status": "completed",
"latency_ms": 4.2,
"prompt_template_key": null,
"prompt_template_version": null,
"degraded_mode": null,
"error_type": null,
"created_at": "..."
},
{
"session_id": "abc-123-def",
"step_name": "generate",
"status": "completed",
"latency_ms": 2341.8,
"prompt_template_key": "high_confidence_minimal_context",
"prompt_template_version": 1,
"degraded_mode": null,
"error_type": null,
"created_at": "..."
}
]

404 when no rows exist for that session_id.
Step 4 (rag_retrieve) uses Agno's built-in Knowledge + PgVector rather than raw SQL, keeping retrieval logic within the Agno framework.
rag_retrieve_executor
└── retrieval_service.retrieve_documents(query_text, db_url, api_key)
└── knowledge_base.build_knowledge(db_url, api_key)
└── Knowledge(vector_db=PgVector(table_name="travel_docs", embedder=GeminiEmbedder))
└── knowledge.search(query, max_results=5) → List[Document]
- Table: `travel_docs` in the `ai` schema — created and managed automatically by Agno's `PgVector`. Separate from the Alembic-managed `source_documents` table.
- Embedder: `GeminiEmbedder` — a custom Agno-compatible wrapper around `google-genai` using `gemini-embedding-001` (768 dims, `SEMANTIC_SIMILARITY`), matching the model used for CAG and seed embeddings.
- Search type: vector (cosine distance).
- Top-k: 5 chunks by default.
- Graceful degradation: any exception in embedding or search is caught and logged; the step always completes and emits an SSE event, returning an empty `retrieved_context` list on failure (see the sketch below).
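A minimal sketch of the retrieval service, consistent with the call chain and degradation rule above. build_knowledge is the project's own factory from knowledge_base.py; the Document attribute access and return type are indicative rather than exact.

```python
# Sketch of retrieve_documents: top-5 similarity search over travel_docs that
# degrades to an empty list on any error. The .content attribute on returned
# documents is an assumption about Agno's Document type.
from app.workflow.knowledge_base import build_knowledge

def retrieve_documents(query_text: str, db_url: str, api_key: str) -> list[str]:
    try:
        # Knowledge + PgVector(travel_docs) + GeminiEmbedder, built by the project factory
        knowledge = build_knowledge(db_url, api_key)
        documents = knowledge.search(query_text, max_results=5)
        return [doc.content for doc in documents]
    except Exception as exc:
        # graceful degradation: log and return nothing so the step still completes
        print(f"rag_retrieve degraded: {exc}")
        return []
```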
scripts/seed.py inserts itinerary categories, 3 prompt templates (including the correction template for validate_output), and 13 Australian travel documents into the knowledge base using knowledge.insert():
python scripts/dev.py seed

Documents store destination and category tags in meta_data so they are accessible after retrieval.
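The per-document insert described above looks roughly like the sketch below. The sample document and the keyword arguments passed to knowledge.insert() are assumptions for illustration; the real calls live in scripts/seed.py.

```python
# Sketch: loading travel documents with destination/category tags in meta_data.
# The document content and the insert() keyword arguments are illustrative assumptions.
from app.workflow.knowledge_base import build_knowledge

SAMPLE_DOCS = [
    {
        "content": "Sydney's Royal Botanic Garden is free to enter and suits family itineraries.",
        "meta_data": {"destination": "sydney", "category": "family"},
    },
]

def seed_travel_docs(db_url: str, api_key: str) -> None:
    knowledge = build_knowledge(db_url, api_key)
    for doc in SAMPLE_DOCS:
        # tags in meta_data stay attached to each chunk and are returned with search results
        knowledge.insert(content=doc["content"], meta_data=doc["meta_data"])
```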
Step 5 (generate) resolves the active prompt template from the database and calls the Gemini LLM.
generate_executor
├── reads upstream state:
│ intake → destination, trip_length, budget, travel_style
│ cag_match → matched_category, confidence_score, tone, constraints
│ rag_retrieve → retrieved_context (empty string if step was skipped)
├── selects template key:
│ confidence_score >= 0.85 → high_confidence_minimal_context
│ confidence_score < 0.85 → low_confidence_with_retrieval
├── template_resolver.resolve_template(template_key, db_url, version=None)
│ ├── version=None → WHERE template_key=? AND is_active=true
│ └── version=N → WHERE template_key=? AND version=N
├── _format_template(template_text, variables)
│ └── regex substitution — safely handles literal { } in the JSON schema example
└── _call_llm(prompt, api_key, model) → raw_llm_output
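A compact sketch of the resolver's two lookup modes described above. The template_key, version, and is_active columns come from this README; the template_text column name and the exact query text are assumptions for illustration.

```python
# Sketch of resolve_template: active row when no version is given, exact version otherwise.
# The template_text column name is an assumption; the rest follows the rules above.
from sqlalchemy import create_engine, text

def resolve_template(template_key: str, db_url: str, version: int | None = None) -> tuple[str, int]:
    engine = create_engine(db_url)
    if version is None:
        query = text(
            "SELECT template_text, version FROM prompt_templates "
            "WHERE template_key = :key AND is_active = true"
        )
        params = {"key": template_key}
    else:
        query = text(
            "SELECT template_text, version FROM prompt_templates "
            "WHERE template_key = :key AND version = :version"
        )
        params = {"key": template_key, "version": version}
    with engine.connect() as conn:
        row = conn.execute(query, params).one()  # raises if no matching template exists
    return row.template_text, row.version
```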
Three prompt templates are seeded in prompt_templates:
| template_key | Used by | Purpose |
|---|---|---|
| `high_confidence_minimal_context` | generate (confidence ≥ 0.85) | Minimal prompt, no RAG context |
| `low_confidence_with_retrieval` | generate (confidence < 0.85) | RAG-augmented prompt |
| `validate_output_correction` | validate_output (on first failure) | Corrective retry prompt |
All templates are versioned (version, is_active) and fetched from the database at runtime — never hardcoded. The resolved prompt_template_version is written to the SSE event data field for traceability.
{
"session_id": "...",
"step": "generate",
"status": "completed",
"data": {
"raw_llm_output": "{ ... }",
"prompt_template_key": "high_confidence_minimal_context",
"prompt_template_version": 1
}
}

Step 6 (validate_output) parses raw_llm_output from the generate step against a Pydantic schema and retries once on failure.
validate_output_executor
├── _extract_raw_llm_output(step_input)
│ └── reads raw_llm_output from generate SSE event
├── parse_llm_output(raw_text) [attempt 1]
│ ├── strip markdown fences (```json … ```)
│ ├── json.loads(cleaned_text)
│ ├── ItineraryOutput.model_validate(data)
│ └── content guardrail: len(daily_itinerary) == trip_length_days
│
├── [success] → emit SSE with itinerary + degraded_mode=False
│
└── [failure] → retry once:
├── resolve_template("validate_output_correction", db_url)
├── _build_correction_prompt(template, raw_output, errors)
├── _call_llm(correction_prompt, ...)
├── parse_llm_output(retry_output) [attempt 2]
├── [success] → emit SSE with itinerary + degraded_mode=True
└── [failure] → emit SSE status="error" + degraded_mode=True
(no unhandled exception)
class DailyItineraryItem(BaseModel):
    day: int
    morning: str
    afternoon: str
    evening: str
    estimated_cost_aud: float

class ItineraryOutput(BaseModel):
    destination: str
    trip_length_days: int
    budget_level: str
    travel_style: str
    daily_itinerary: List[DailyItineraryItem]
    total_estimated_cost_aud: float
    key_tips: List[str]

{
"session_id": "...",
"step": "validate_output",
"status": "completed",
"data": {
"itinerary": {
"destination": "Sydney",
"trip_length_days": 5,
"...": "..."
},
"degraded_mode": false
}
}

On unrecoverable failure: "status": "error", "degraded_mode": true, "error": "Validation failed after retry: ...".
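A minimal sketch of the parse-and-guardrail logic described above, using the ItineraryOutput schema shown earlier. Passing the requested trip length as a parameter is an assumption about the real signature in output_validator.py.

```python
# Sketch of parse_llm_output: strip fences, parse JSON, validate the schema,
# then apply the day-count guardrail. The signature is an assumption.
import json
import re

from app.workflow.output_validator import ItineraryOutput

def parse_llm_output(raw_text: str, trip_length_days: int) -> ItineraryOutput:
    # 1. strip optional markdown code fences around the JSON
    cleaned = re.sub(r"^```(?:json)?\s*|\s*```$", "", raw_text.strip())
    # 2. JSON parse (raises on malformed output)
    data = json.loads(cleaned)
    # 3. schema validation (raises pydantic.ValidationError on shape mismatch)
    itinerary = ItineraryOutput.model_validate(data)
    # 4. content guardrail: one daily entry per requested trip day
    if len(itinerary.daily_itinerary) != trip_length_days:
        raise ValueError(
            f"expected {trip_length_days} days, got {len(itinerary.daily_itinerary)}"
        )
    return itinerary
```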
Every workflow step writes one row to the trace_events table after it completes. This gives a full per-request audit trail without coupling the steps to any streaming or logging infrastructure.
write_trace(db_url, session_id, step_name, status, latency_ms, **step_fields)
└── creates SQLAlchemy engine → inserts TraceEvent row → commits
any DB error is caught and logged — never propagates to the caller
trace_logger.write_trace is imported by each step executor and called on every exit path — success, error, and degraded-mode retry.
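A sketch of write_trace consistent with the contract above: one TraceEvent row per call, with any database error swallowed so tracing can never break a workflow step. Logging setup and session handling details are illustrative.

```python
# Sketch of write_trace: best-effort insert of one TraceEvent row; DB errors are
# logged and never propagated to the calling step.
import logging

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from app.models.trace_event import TraceEvent

logger = logging.getLogger(__name__)

def write_trace(db_url: str, session_id: str, step_name: str, status: str,
                latency_ms: float, **step_fields) -> None:
    try:
        engine = create_engine(db_url)
        with Session(engine) as session:
            session.add(TraceEvent(
                session_id=session_id,
                step_name=step_name,
                status=status,
                latency_ms=latency_ms,
                **step_fields,  # prompt_template_key, degraded_mode, error_type, ...
            ))
            session.commit()
    except Exception:
        # tracing is best-effort: never let a DB problem fail the workflow step
        logger.exception("failed to write trace event for %s/%s", session_id, step_name)
```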
| Column | Type | Steps |
|---|---|---|
| `session_id` | String(100) | all |
| `step_name` | String(100) | all |
| `status` | String(50) | all — "completed" or "error" |
| `latency_ms` | Float | all — wall-clock duration of the step |
| `prompt_template_key` | String(100) | generate only |
| `prompt_template_version` | Integer | generate only |
| `degraded_mode` | Boolean | validate_output only — True when a retry was attempted |
| `error_type` | String(200) | validate_output only — short description on failure |
| `created_at` | DateTime(tz) | all — server default |
session_id step_name status latency_ms prompt_template_key degraded_mode
───────────────────── ─────────────────── ───────── ────────── ─────────────────────────────── ─────────────
abc123… intake completed 4.2 null null
abc123… cag_match completed 312.0 null null
abc123… retrieval_decision completed 0.1 null null
abc123… rag_retrieve completed 890.5 null null
abc123… generate completed 2341.8 low_confidence_with_retrieval null
abc123… validate_output completed 12.3 null false
-- All steps for a session, in order
SELECT step_name, status, latency_ms, degraded_mode
FROM trace_events
WHERE session_id = 'abc123...'
ORDER BY created_at;
-- Degraded validate_output runs in the last hour
SELECT session_id, error_type, latency_ms, created_at
FROM trace_events
WHERE step_name = 'validate_output'
AND degraded_mode = true
AND created_at > now() - interval '1 hour';

Tests are split into two tiers:
Unit tests — no database or API key required, run anywhere:
python -m pytest tests/ -m "not integration" -v

Integration tests — require Docker (PostgreSQL) and a real GEMINI_API_KEY in .env. Includes the 3 golden tests and the end-to-end POST /generate test:
# 1. start db + apply migrations + seed data
python scripts/dev.py up
python scripts/dev.py migrate
python scripts/dev.py seed
# 2. run integration tests only (golden + observability + cag-match)
python -m pytest tests/ -m integration -v
# 3. or run everything together
python -m pytest tests/ -v

Golden and end-to-end tests are marked @pytest.mark.integration and are skipped in the unit-test run.
tests/conftest.py loads .env automatically so environment variables do not need to be exported manually before running pytest.
.
├── src/
│ └── app/
│ ├── main.py # FastAPI entrypoint — GET /health, POST /generate, GET /traces/{id}
│ ├── static/
│ │ └── index.html # Single-page browser UI (plain HTML + vanilla JS)
│ ├── models/
│ │ ├── base.py # SQLAlchemy DeclarativeBase
│ │ ├── itinerary_category.py # itinerary_categories table + Vector(768)
│ │ ├── prompt_template.py # prompt_templates table
│ │ ├── source_document.py # source_documents table + Vector(768)
│ │ └── trace_event.py # trace_events table
│ └── workflow/
│ ├── sse.py # SSE event builder (build_sse_event, sse_event_to_json)
│ ├── intake_model.py # TravelRequest Pydantic model (Step 1 data contract)
│ ├── travel_workflow.py # Agno Workflow — 6 steps registered in order
│ ├── trace_logger.py # write_trace() — inserts one TraceEvent row per step
│ ├── cag_matcher.py # CAG logic: embed intent → pgvector search → MatchResult
│ ├── gemini_embedder.py # Agno-compatible Gemini embedder (wraps google-genai)
│ ├── knowledge_base.py # Factory: Knowledge + PgVector(travel_docs) + GeminiEmbedder
│ ├── retrieval_service.py # RAG logic: knowledge.search() → list[RetrievedChunk]
│ ├── template_resolver.py # DB lookup: resolve_template(key) → (text, version)
│ ├── output_validator.py # ItineraryOutput Pydantic schema + parse_llm_output()
│ └── steps/
│ ├── intake.py # Step 1 — validates + normalises input (TravelRequest)
│ ├── cag_match.py # Step 2 — Gemini embed + cosine match → matched_category
│ ├── retrieval_decision.py # Step 3 — confidence >= 0.85 → skip RAG, else retrieve
│ ├── rag_retrieve.py # Step 4 — Agno Knowledge top-5 retrieval → retrieved_context
│ ├── generate.py # Step 5 — DB prompt template + LLM call → raw_llm_output
│ └── validate_output.py # Step 6 — Pydantic validation, retry once, structured error
├── tests/
│ ├── conftest.py # loads .env; redirects Agno logger to stderr
│ ├── test_db_schema.py # db-schema merge checklist (migrations, models, live DB)
│ ├── test_workflow_skeleton.py # workflow-skeleton checklist (SSE structure, 6-step run)
│ ├── test_intake.py # intake checklist + BudgetLevel enum tests
│ ├── test_cag_match.py # cag-match checklist (unit + integration)
│ ├── test_retrieval_decision.py # retrieval-decision checklist (threshold logic, SSE)
│ ├── test_rag_retrieve.py # rag-retrieve checklist (top-k, graceful degradation)
│ ├── test_generate.py # generate checklist + template version override tests
│ ├── test_validate_output.py # validate-output checklist + content guardrail tests
│ ├── test_observability.py # observability checklist (unit + integration)
│ ├── test_fastapi_service.py # fastapi-service checklist (HTTP layer, 422s, SSE shape)
│ ├── test_golden.py # 3 golden tests + 1 end-to-end POST /generate test
│ └── test_ui_api.py # unit tests for GET /traces/{session_id} (DB mocked)
├── alembic/
│ ├── env.py # loads .env, imports models, runs migrations
│ ├── script.py.mako # migration file template
│ └── versions/
│ ├── 0001_enable_pgvector_create_itinerary_categories.py
│ ├── 0002_create_prompt_templates.py
│ ├── 0003_create_source_documents.py
│ └── 0004_create_trace_events.py
├── scripts/
│ ├── dev.py # cross-platform task runner
│ └── seed.py # seed itinerary_categories, prompt_templates, source_documents
├── docs/
│ ├── DESIGN.md # single source of truth for system design
│ ├── CLAUDE.md # AI usage guidelines for this repo
│ └── git-workflow.md # feature branch order and merge checklist
├── alembic.ini # Alembic config (DB URL injected from env)
├── docker-compose.yml # PostgreSQL + pgvector
├── pyproject.toml # dependencies and project metadata
├── Makefile # thin wrappers around scripts/dev.py
└── .env.example # required environment variables with comments
Branch: feat/ui — browser UI with SSE step panels, trace viewer, and golden preset buttons. Also adds GET /traces/{session_id} endpoint.
| Branch | Scope | Status |
|---|---|---|
| `feat/project-scaffold` | Dependencies, Docker, directory layout, health endpoint | Done |
| `feat/db-schema` | Alembic migrations, SQLAlchemy models | Done |
| `feat/db-seed` | Category embeddings, prompt templates, source documents | Done |
| `feat/workflow-skeleton` | Agno workflow with 6 stub steps, SSE emitter | Done |
| `feat/intake` | Input validation and normalisation (Step 1) | Done |
| `feat/cag-match` | Embedding-based intent classification (Step 2) | Done |
| `feat/retrieval-decision` | Confidence threshold routing (Step 3) | Done |
| `feat/rag-retrieve` | Agno Knowledge + PgVector top-5 retrieval (Step 4) | Done |
| `feat/generate` | DB-template prompt construction + LLM call (Step 5) | Done |
| `feat/validate-output` | Pydantic validation + retry logic (Step 6) | Done |
| `feat/fastapi-service` | POST /generate SSE endpoint | Done |
| `feat/observability` | trace_events writes across all steps | Done |
| `feat/testing` | Unit, integration, end-to-end, and golden tests | Done |
| `feat/ui` | Browser UI, GET /traces/{session_id}, static file serving | Done |
This branch completes the full test suite and adds three functional improvements to the workflow.
| File | Type | What it covers |
|---|---|---|
| `test_golden.py` | integration | 3 golden tests (family, budget, low-confidence) + 1 end-to-end POST /generate |
| `test_fastapi_service.py` | unit | template_version query param forwarded correctly |
| `test_generate.py` | unit | resolve_template version override, None default |
| `test_validate_output.py` | unit | content guardrail pass and fail cases |
Golden tests assert structure and branch correctness — not exact LLM output strings:
- Family trip — high-confidence path, no `rag_retrieve` event, structured itinerary returned
- Budget trip — same branch, different destination and style
- Low-confidence — `rag_retrieve` event present, `retrieved_context` non-empty, itinerary returned
End-to-end test makes a real HTTP call to POST /generate via TestClient (no mocking of _run_workflow) and asserts all 6 step names appear in the SSE stream with correct structure.
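The assertion pattern described above looks roughly like this; fixtures, payload values, and marker handling in the real test_golden.py may differ.

```python
# Sketch of the end-to-end check: POST /generate through TestClient and confirm the
# expected step names appear in the SSE stream. rag_retrieve additionally appears
# on the low-confidence path.
import json

import pytest
from fastapi.testclient import TestClient

from app.main import app

UNCONDITIONAL_STEPS = {"intake", "cag_match", "retrieval_decision", "generate", "validate_output"}

@pytest.mark.integration
def test_generate_streams_all_steps():
    client = TestClient(app)
    payload = {"destination": "Sydney", "trip_length": 5,
               "budget": "moderate", "travel_style": "family"}
    response = client.post("/generate", json=payload)
    assert response.status_code == 200

    events = [
        json.loads(line[len("data:"):].strip())
        for line in response.text.splitlines()
        if line.startswith("data:")
    ]
    assert UNCONDITIONAL_STEPS <= {event["step"] for event in events}
```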
budget field changed to enum (BudgetLevel):
Values "budget", "moderate", "luxury" are now enforced at the Pydantic layer. Any other string returns 422. travel_style remains free text so the CAG step can perform semantic matching on it.
Prompt template version override (?template_version=N):
POST /generate accepts an optional template_version query parameter. When set, resolve_template fetches that specific version instead of the active one — enabling reproducibility and A/B testing without changing the database. The version is propagated through additional_data to generate_executor.
Content-level output guardrail:
parse_llm_output now checks that len(daily_itinerary) == trip_length_days after Pydantic schema validation. A mismatch triggers the existing retry mechanism in validate_output_executor — no extra code path needed.
Seed script robustness fix:
scripts/seed.py detects when travel_docs exists but is missing the embedding column (caused by a partial or legacy seed run) and drops + recreates the table before inserting documents.
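The detection half of that fix can be sketched with SQLAlchemy's inspector. The ai schema and the embedding column name come from this README; the drop-and-recreate itself is left to Agno's PgVector in the real script.

```python
# Sketch: detect a travel_docs table left behind without an embedding column and
# drop it so PgVector can recreate it cleanly. Details are illustrative.
from sqlalchemy import create_engine, inspect, text

def travel_docs_missing_embedding(db_url: str) -> bool:
    engine = create_engine(db_url)
    inspector = inspect(engine)
    if not inspector.has_table("travel_docs", schema="ai"):
        return False  # nothing to repair, the table will be created fresh
    columns = {col["name"] for col in inspector.get_columns("travel_docs", schema="ai")}
    return "embedding" not in columns

def drop_legacy_travel_docs(db_url: str) -> None:
    # partial/legacy table without embeddings: drop it before re-seeding
    engine = create_engine(db_url)
    with engine.begin() as conn:
        conn.execute(text("DROP TABLE ai.travel_docs"))
```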
This branch adds a browser UI and a traces read endpoint on top of the completed API.
| File | Purpose |
|---|---|
| `src/app/static/index.html` | Single-page UI — form, SSE step panels, trace table, preset buttons |
| `tests/test_ui_api.py` | 13 unit tests for GET /traces/{session_id} (DB mocked) |
src/app/main.py
- Added `GET /traces/{session_id}` — queries `trace_events` for all rows matching `session_id`, returns a JSON array ordered by `created_at`, 404 when none found
- Added `StaticFiles` mount at `/` serving `src/app/static/` (mounted after all API routes so API routes take precedence)
- Form — destination, trip_length, budget (enum select), travel_style, optional template_version
- 3 golden preset buttons — pre-fill the form with the same inputs as `test_golden.py`
- SSE step panels — render in real time as each workflow step completes; the `generate` panel highlights template key and version
- Trace table — auto-fetched from `GET /traces/{session_id}` after the stream completes; shows step, status, latency, template version, degraded mode
- DESIGN.md — data model, workflow steps, SSE contract, prompt strategy
- api.md — full API reference: request/response formats, SSE event shapes per step
- git-workflow.md — branch order, scope, and merge checklist
- CLAUDE.md — AI tool usage rules and architecture constraints