yutingwang999/Agno_Travel_Agent
Travel Itinerary Planner

An AI-powered travel itinerary planning service for Australian destinations. The system uses a structured Agno workflow that combines CAG (Cache-Augmented Generation for intent classification) and RAG (Retrieval-Augmented Generation for factual enrichment), with database-driven prompt templates and full SSE streaming.


Architecture

POST /generate
      │
      ▼
FastAPI Service
      │  creates session_id, validates request
      ▼
Agno Workflow
  1. intake              — validate + normalise input fields
  2. cag_match           — embed intent, cosine match against itinerary_categories
  3. retrieval_decision  — confidence >= 0.85 → skip RAG, else retrieve
  4. rag_retrieve        — pgvector top-5 similarity search on source_documents
  5. generate            — resolve DB prompt template, call LLM
  6. validate_output     — Pydantic schema check, retry once on failure
      │
      ▼
SSE stream (one structured event per step)
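Steps 2 and 3 above reduce to a cosine-similarity score compared against a 0.85 threshold. A minimal sketch of that decision in pure Python (helper names are illustrative, not the project's actual functions):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

def needs_retrieval(confidence_score: float, threshold: float = 0.85) -> bool:
    """Step 3: skip RAG when the CAG match is confident enough."""
    return confidence_score < threshold
```

A score of exactly 0.85 takes the high-confidence branch, matching the `>= 0.85 → skip RAG` rule in the diagram.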

Each SSE event has the form:

{
  "session_id": "...",
  "step": "cag_match",
  "status": "completed",
  "timestamp": "...",
  "data": {},
  "error": null
}

Tech Stack

Layer             Technology
────────────────  ─────────────────────────────────────────────
API               FastAPI + sse-starlette
Workflow          Agno (Step, Condition)
Knowledge base    Agno Knowledge + PgVector (travel_docs table)
Database          PostgreSQL 16 + pgvector
ORM / migrations  SQLAlchemy 2 + Alembic
LLM               Gemini 2.0 Flash (google-genai)
Embeddings        gemini-embedding-001 (Google, 768 dims)
Validation        Pydantic v2

Prerequisites

  • Python 3.11+
  • Docker Desktop

Setup

1. Clone and copy env file

git clone <repo-url>
cd Agno_Travel_Agent
cp .env.example .env
# fill in GEMINI_API_KEY in .env

2. Install dependencies

python scripts/dev.py install

3. Start PostgreSQL

python scripts/dev.py up

4. Apply migrations

python scripts/dev.py migrate

5. Seed the database

python scripts/dev.py seed

6. Start the app

PYTHONPATH=src uvicorn app.main:app --reload   # macOS / Linux
$env:PYTHONPATH="src"; uvicorn app.main:app --reload  # Windows PowerShell

Environment Variables

All variables are documented in .env.example.

Variable           Purpose
─────────────────  ──────────────────────────────────────────────────────────────
POSTGRES_USER      PostgreSQL username
POSTGRES_PASSWORD  PostgreSQL password
POSTGRES_DB        Database name
POSTGRES_PORT      Host port to expose (default 5432)
DATABASE_URL       Full SQLAlchemy / Alembic DSN
GEMINI_API_KEY     Google Gemini API key
LLM_MODEL          Gemini model for generation (e.g. gemini-2.0-flash)
EMBEDDING_MODEL    Embedding model — must match seed and cag_match
                   (default: gemini-embedding-001)
APP_HOST           Uvicorn bind host
APP_PORT           Uvicorn bind port

Dev Commands

All commands work on Windows and Unix via the Python task runner:

python scripts/dev.py up             # start PostgreSQL
python scripts/dev.py down           # stop PostgreSQL
python scripts/dev.py install        # pip install -e ".[dev]"
python scripts/dev.py migrate        # alembic upgrade head
python scripts/dev.py seed           # seed categories, prompt templates, source docs
python scripts/dev.py count-rows     # print row counts + embedding coverage for seeded tables
python scripts/dev.py check-health   # GET /health → 200
python scripts/dev.py test           # pytest
python scripts/dev.py verify         # up → migrate → uvicorn → health + pytest
python scripts/dev.py verify-schema  # up → migrate → db-schema tests (16 checks)

On macOS/Linux you can also use make <target> as a shorthand.


Browser UI

The app serves a single-page frontend at GET / from src/app/static/index.html — plain HTML + vanilla JS, no build step required.

Starting the app

# Windows PowerShell
$env:PYTHONPATH="src"; python.exe -m uvicorn app.main:app --reload

# macOS / Linux
PYTHONPATH=src uvicorn app.main:app --reload

Then open http://localhost:8000 in your browser.

Manual UI test walkthrough

1. Use a golden preset button

Three preset buttons pre-fill the form with the same inputs used in test_golden.py:

Button          Destination  Days  Budget    Style   Expected path
──────────────  ───────────  ────  ────────  ──────  ──────────────────────────────
Family trip     Sydney       5     moderate  family  High-confidence → no RAG
Budget trip     Melbourne    3     budget    budget  High-confidence → no RAG
Low-confidence  Sydney       4     moderate  family  Low-confidence → RAG triggered

Click a preset, then click Generate Itinerary.

2. Watch the SSE step panels

Each workflow step appears as a panel as it arrives in real time:

  • Green left border = completed
  • Red left border = error
  • The generate step panel highlights prompt_template_key and prompt_template_version in amber

3. Check the Trace Log table

After the stream finishes the page automatically calls GET /traces/{session_id} and renders all trace rows in a table showing step, status, latency, template version, and degraded mode.

4. Try the optional template version override

Enter a version number in the Template version field before submitting to test a specific prompt template version (equivalent to ?template_version=N on the API).

Clearing trace data between manual runs

To start with a clean trace table:

TRUNCATE TABLE trace_events RESTART IDENTITY;

Via Docker:

docker exec -it <postgres-container> psql -U <user> -d <dbname> -c "TRUNCATE TABLE trace_events RESTART IDENTITY;"

This is safe for manual UI exploration. Automated tests mock the DB and are unaffected.


POST /generate Endpoint

The POST /generate endpoint is the public entry point for the workflow. It validates the request, runs the Agno workflow, and streams all step events as Server-Sent Events.

Request

POST /generate?template_version=2
Content-Type: application/json

{
  "destination": "Sydney",
  "trip_length": 5,
  "budget": "moderate",
  "travel_style": "family"
}

All four body fields are required. Missing or invalid fields return 422 Unprocessable Entity — never a 500.

Field         Type     Values                          Notes
────────────  ───────  ──────────────────────────────  ──────────────────────────────
destination   string   free text                       normalised to lowercase
trip_length   integer  ≥ 1                             number of days
budget        enum     "budget", "moderate", "luxury"  any other value → 422
travel_style  string   free text                       used for CAG semantic matching

Optional query parameter:

Parameter         Type     Default  Effect
────────────────  ───────  ───────  ──────────────────────────────────────────
template_version  integer  none     fetch a specific prompt template version
                                    instead of the active one

When template_version is omitted the active template (is_active=true) is used. Pass an explicit version to reproduce a past run or test a specific template version.

Response

HTTP/1.1 200 OK
Content-Type: text/event-stream

data: {"session_id": "...", "step": "intake", "status": "completed", ...}

data: {"session_id": "...", "step": "cag_match", "status": "completed", ...}

data: {"session_id": "...", "step": "retrieval_decision", "status": "completed", ...}

data: {"session_id": "...", "step": "generate", "status": "completed", ...}

data: {"session_id": "...", "step": "validate_output", "status": "completed", ...}
  • Each data line is a JSON-serialised SSE event matching DESIGN.md section 9.
  • session_id is generated per request (uuid4) and is identical across all events.
  • The rag_retrieve event is present only when confidence_score < 0.85.
  • Events arrive in workflow order: intake → cag_match → retrieval_decision → (rag_retrieve) → generate → validate_output.
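A client only needs to read `data:` lines and JSON-decode each one. A minimal parsing sketch (transport elided; the helper name is hypothetical):

```python
import json

def parse_sse_lines(lines):
    """Yield decoded event dicts from raw SSE lines ('data: {...}')."""
    for line in lines:
        line = line.strip()
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())

# Example: recover the step order from a captured stream
raw = [
    'data: {"step": "intake", "status": "completed"}',
    "",
    'data: {"step": "cag_match", "status": "completed"}',
]
steps = [event["step"] for event in parse_sse_lines(raw)]
```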

How it works

POST /generate
  ├── FastAPI validates TravelRequest body  →  422 on bad input
  ├── generate session_id = uuid4()
  ├── _run_workflow(session_id, request)
  │     ├── travel_workflow.run(additional_data={session_id, destination, ...})
  │     │     stdout captured via contextlib.redirect_stdout to collect
  │     │     the retrieval_decision evaluator's side-effect print()
  │     ├── _collect_step_result_events(result.step_results)
  │     │     └── parses intake, cag_match, (rag_retrieve), generate, validate_output
  │     ├── parse retrieval_decision event from captured stdout
  │     └── sort all events by _STEP_ORDER → [0..5]
  └── EventSourceResponse(event_generator())
        └── yields {"data": json.dumps(event)} for each event

Try it

# Active template (default)
curl -N -X POST http://localhost:8000/generate \
  -H "Content-Type: application/json" \
  -d '{"destination": "Sydney", "trip_length": 5, "budget": "moderate", "travel_style": "family"}'

# Specific template version
curl -N -X POST "http://localhost:8000/generate?template_version=2" \
  -H "Content-Type: application/json" \
  -d '{"destination": "Sydney", "trip_length": 5, "budget": "moderate", "travel_style": "family"}'

GET /traces/{session_id}

Returns all trace_events rows for a session as a JSON array ordered by created_at.

GET /traces/abc-123-def

200 response:

[
  {
    "session_id": "abc-123-def",
    "step_name": "intake",
    "status": "completed",
    "latency_ms": 4.2,
    "prompt_template_key": null,
    "prompt_template_version": null,
    "degraded_mode": null,
    "error_type": null,
    "created_at": "..."
  },
  {
    "session_id": "abc-123-def",
    "step_name": "generate",
    "status": "completed",
    "latency_ms": 2341.8,
    "prompt_template_key": "high_confidence_minimal_context",
    "prompt_template_version": 1,
    "degraded_mode": null,
    "error_type": null,
    "created_at": "..."
  }
]

404 when no rows exist for that session_id.


RAG Knowledge Base

Step 4 (rag_retrieve) uses Agno's built-in Knowledge + PgVector rather than raw SQL, keeping retrieval logic within the Agno framework.

How it works

rag_retrieve_executor
  └── retrieval_service.retrieve_documents(query_text, db_url, api_key)
        └── knowledge_base.build_knowledge(db_url, api_key)
              └── Knowledge(vector_db=PgVector(table_name="travel_docs", embedder=GeminiEmbedder))
                    └── knowledge.search(query, max_results=5)  →  List[Document]
  • Table: travel_docs in the ai schema — created and managed automatically by Agno's PgVector. Separate from the source_documents Alembic-managed table.
  • Embedder: GeminiEmbedder — a custom Agno-compatible wrapper around google-genai using gemini-embedding-001 (768 dims, SEMANTIC_SIMILARITY), matching the model used for CAG and seed embeddings.
  • Search type: vector (cosine distance).
  • Top-k: 5 chunks by default.
  • Graceful degradation: any exception in embedding or search is caught and logged; the step always completes and emits an SSE event, returning an empty retrieved_context list on failure.
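The graceful-degradation behaviour in the last bullet amounts to a wrapper that never raises. A sketch (the `search_fn` callable is a stand-in for the real `knowledge.search`):

```python
import logging

logger = logging.getLogger("rag_retrieve")

def retrieve_documents_safe(search_fn, query: str, max_results: int = 5) -> list:
    """Run a search callable; on any failure, log it and return an empty list
    so the step always completes and still emits its SSE event."""
    try:
        return list(search_fn(query, max_results=max_results))
    except Exception:
        logger.exception("retrieval failed; degrading to empty context")
        return []
```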

Seeding the knowledge base

scripts/seed.py inserts itinerary categories, 3 prompt templates (including the correction template for validate_output), and 13 Australian travel documents into the knowledge base using knowledge.insert():

python scripts/dev.py seed

Documents store destination and category tags in meta_data so they are accessible after retrieval.


Generate Step

Step 5 (generate) resolves the active prompt template from the database and calls the Gemini LLM.

How it works

generate_executor
  ├── reads upstream state:
  │     intake       → destination, trip_length, budget, travel_style
  │     cag_match    → matched_category, confidence_score, tone, constraints
  │     rag_retrieve → retrieved_context (empty string if step was skipped)
  ├── selects template key:
  │     confidence_score >= 0.85  →  high_confidence_minimal_context
  │     confidence_score <  0.85  →  low_confidence_with_retrieval
  ├── template_resolver.resolve_template(template_key, db_url, version=None)
  │     ├── version=None  → WHERE template_key=? AND is_active=true
  │     └── version=N     → WHERE template_key=? AND version=N
  ├── _format_template(template_text, variables)
  │     └── regex substitution — safely handles literal { } in the JSON schema example
  └── _call_llm(prompt, api_key, model)  →  raw_llm_output
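The brace-safe substitution in `_format_template` can be sketched with a regex that matches only simple `{name}` placeholders, so the literal braces of an embedded JSON schema example survive (a sketch under that assumption; `str.format` would choke on those braces, which is why a regex is used):

```python
import re

_PLACEHOLDER = re.compile(r"\{([a-z_]+)\}")

def format_template(template_text: str, variables: dict) -> str:
    """Substitute {name} placeholders; unknown names and literal { } survive."""
    return _PLACEHOLDER.sub(
        lambda m: str(variables.get(m.group(1), m.group(0))),
        template_text,
    )
```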

Template selection

Three prompt templates are seeded in prompt_templates:

template_key                     Used by                             Purpose
───────────────────────────────  ──────────────────────────────────  ──────────────────────────────
high_confidence_minimal_context  generate (confidence ≥ 0.85)        Minimal prompt, no RAG context
low_confidence_with_retrieval    generate (confidence < 0.85)        RAG-augmented prompt
validate_output_correction       validate_output (on first failure)  Corrective retry prompt

All templates are versioned (version, is_active) and fetched from the database at runtime — never hardcoded. The resolved prompt_template_version is written to the SSE event data field for traceability.

SSE output

{
  "session_id": "...",
  "step": "generate",
  "status": "completed",
  "data": {
    "raw_llm_output": "{ ... }",
    "prompt_template_key": "high_confidence_minimal_context",
    "prompt_template_version": 1
  }
}

Validate Output Step

Step 6 (validate_output) parses raw_llm_output from the generate step against a Pydantic schema and retries once on failure.

How it works

validate_output_executor
  ├── _extract_raw_llm_output(step_input)
  │     └── reads raw_llm_output from generate SSE event
  ├── parse_llm_output(raw_text)                        [attempt 1]
  │     ├── strip markdown fences (```json … ```)
  │     ├── json.loads(cleaned_text)
  │     ├── ItineraryOutput.model_validate(data)
  │     └── content guardrail: len(daily_itinerary) == trip_length_days
  │
  ├── [success] → emit SSE with itinerary + degraded_mode=False
  │
  └── [failure] → retry once:
        ├── resolve_template("validate_output_correction", db_url)
        ├── _build_correction_prompt(template, raw_output, errors)
        ├── _call_llm(correction_prompt, ...)
        ├── parse_llm_output(retry_output)              [attempt 2]
        ├── [success] → emit SSE with itinerary + degraded_mode=True
        └── [failure] → emit SSE status="error" + degraded_mode=True
                        (no unhandled exception)
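The fence stripping and content guardrail from attempt 1 can be sketched with the standard library alone (the real `parse_llm_output` validates against the Pydantic schema, which is elided here):

```python
import json
import re

def parse_llm_output(raw_text: str, trip_length_days: int) -> dict:
    """Strip ```json fences, decode, and apply the day-count guardrail."""
    cleaned = re.sub(r"^```(?:json)?\s*|\s*```$", "", raw_text.strip())
    data = json.loads(cleaned)  # raises ValueError on malformed JSON
    if len(data.get("daily_itinerary", [])) != trip_length_days:
        raise ValueError("daily_itinerary length != trip_length_days")
    return data
```

Both failure modes (malformed JSON and a day-count mismatch) raise, which is what feeds the single correction retry above.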

Output schema (ItineraryOutput)

class DailyItineraryItem(BaseModel):
    day: int
    morning: str
    afternoon: str
    evening: str
    estimated_cost_aud: float

class ItineraryOutput(BaseModel):
    destination: str
    trip_length_days: int
    budget_level: str
    travel_style: str
    daily_itinerary: List[DailyItineraryItem]
    total_estimated_cost_aud: float
    key_tips: List[str]

SSE output

{
  "session_id": "...",
  "step": "validate_output",
  "status": "completed",
  "data": {
    "itinerary": {
      "destination": "Sydney",
      "trip_length_days": 5,
      "...": "..."
    },
    "degraded_mode": false
  }
}

On unrecoverable failure: "status": "error", "degraded_mode": true, "error": "Validation failed after retry: ...".


Observability

Every workflow step writes one row to the trace_events table after it completes. This gives a full per-request audit trail without coupling the steps to any streaming or logging infrastructure.

How it works

write_trace(db_url, session_id, step_name, status, latency_ms, **step_fields)
  └── creates SQLAlchemy engine → inserts TraceEvent row → commits
      any DB error is caught and logged — never propagates to the caller

trace_logger.write_trace is imported by each step executor and called on every exit path — success, error, and degraded-mode retry.
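The never-raise contract can be sketched against SQLite (a stand-in; the real implementation uses a SQLAlchemy engine against PostgreSQL and records the additional columns listed below):

```python
import logging
import sqlite3

logger = logging.getLogger("trace_logger")

def write_trace(db_path: str, session_id: str, step_name: str,
                status: str, latency_ms: float) -> None:
    """Insert one trace row; any DB error is logged, never raised,
    so tracing can never break a workflow step."""
    try:
        with sqlite3.connect(db_path) as conn:  # commits on success
            conn.execute(
                "INSERT INTO trace_events "
                "(session_id, step_name, status, latency_ms) "
                "VALUES (?, ?, ?, ?)",
                (session_id, step_name, status, latency_ms),
            )
    except Exception:
        logger.exception("trace write failed for %s/%s", session_id, step_name)
```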

trace_events schema

Column                   Type          Steps
───────────────────────  ────────────  ──────────────────────────────────────────────────
session_id               String(100)   all
step_name                String(100)   all
status                   String(50)    all — "completed" or "error"
latency_ms               Float         all — wall-clock duration of the step
prompt_template_key      String(100)   generate only
prompt_template_version  Integer       generate only
degraded_mode            Boolean       validate_output only — True when a retry was attempted
error_type               String(200)   validate_output only — short description on failure
created_at               DateTime(tz)  all — server default

Example rows after a full low-confidence run

session_id             step_name            status     latency_ms  prompt_template_key              degraded_mode
─────────────────────  ───────────────────  ─────────  ──────────  ───────────────────────────────  ─────────────
abc123…                intake               completed  4.2         null                             null
abc123…                cag_match            completed  312.0       null                             null
abc123…                retrieval_decision   completed  0.1         null                             null
abc123…                rag_retrieve         completed  890.5       null                             null
abc123…                generate             completed  2341.8      low_confidence_with_retrieval    null
abc123…                validate_output      completed  12.3        null                             false

Querying traces

-- All steps for a session, in order
SELECT step_name, status, latency_ms, degraded_mode
FROM trace_events
WHERE session_id = 'abc123...'
ORDER BY created_at;

-- Degraded validate_output runs in the last hour
SELECT session_id, error_type, latency_ms, created_at
FROM trace_events
WHERE step_name = 'validate_output'
  AND degraded_mode = true
  AND created_at > now() - interval '1 hour';

Running Tests

Tests are split into two tiers:

Unit tests — no database or API key required, run anywhere:

python -m pytest tests/ -m "not integration" -v

Integration tests — require Docker (PostgreSQL) and a real GEMINI_API_KEY in .env. Includes the 3 golden tests and the end-to-end POST /generate test:

# 1. start db + apply migrations + seed data
python scripts/dev.py up
python scripts/dev.py migrate
python scripts/dev.py seed

# 2. run integration tests only (golden + observability + cag-match)
python -m pytest tests/ -m integration -v

# 3. or run everything together
python -m pytest tests/ -v

Golden and end-to-end tests are marked @pytest.mark.integration and are skipped in the unit-test run.

tests/conftest.py loads .env automatically so environment variables do not need to be exported manually before running pytest.


Project Layout

.
├── src/
│   └── app/
│       ├── main.py          # FastAPI entrypoint — GET /health, POST /generate, GET /traces/{id}
│       ├── static/
│       │   └── index.html           # Single-page browser UI (plain HTML + vanilla JS)
│       ├── models/
│       │   ├── base.py               # SQLAlchemy DeclarativeBase
│       │   ├── itinerary_category.py # itinerary_categories table + Vector(768)
│       │   ├── prompt_template.py    # prompt_templates table
│       │   ├── source_document.py    # source_documents table + Vector(768)
│       │   └── trace_event.py        # trace_events table
│       └── workflow/
│           ├── sse.py                # SSE event builder (build_sse_event, sse_event_to_json)
│           ├── intake_model.py       # TravelRequest Pydantic model (Step 1 data contract)
│           ├── travel_workflow.py    # Agno Workflow — 6 steps registered in order
│           ├── trace_logger.py       # write_trace() — inserts one TraceEvent row per step
│           ├── cag_matcher.py        # CAG logic: embed intent → pgvector search → MatchResult
│           ├── gemini_embedder.py    # Agno-compatible Gemini embedder (wraps google-genai)
│           ├── knowledge_base.py     # Factory: Knowledge + PgVector(travel_docs) + GeminiEmbedder
│           ├── retrieval_service.py  # RAG logic: knowledge.search() → list[RetrievedChunk]
│           ├── template_resolver.py  # DB lookup: resolve_template(key) → (text, version)
│           ├── output_validator.py   # ItineraryOutput Pydantic schema + parse_llm_output()
│           └── steps/
│               ├── intake.py              # Step 1 — validates + normalises input (TravelRequest)
│               ├── cag_match.py           # Step 2 — Gemini embed + cosine match → matched_category
│               ├── retrieval_decision.py  # Step 3 — confidence >= 0.85 → skip RAG, else retrieve
│               ├── rag_retrieve.py        # Step 4 — Agno Knowledge top-5 retrieval → retrieved_context
│               ├── generate.py            # Step 5 — DB prompt template + LLM call → raw_llm_output
│               └── validate_output.py     # Step 6 — Pydantic validation, retry once, structured error
├── tests/
│   ├── conftest.py              # loads .env; redirects Agno logger to stderr
│   ├── test_db_schema.py        # db-schema merge checklist (migrations, models, live DB)
│   ├── test_workflow_skeleton.py # workflow-skeleton checklist (SSE structure, 6-step run)
│   ├── test_intake.py           # intake checklist + BudgetLevel enum tests
│   ├── test_cag_match.py        # cag-match checklist (unit + integration)
│   ├── test_retrieval_decision.py # retrieval-decision checklist (threshold logic, SSE)
│   ├── test_rag_retrieve.py     # rag-retrieve checklist (top-k, graceful degradation)
│   ├── test_generate.py         # generate checklist + template version override tests
│   ├── test_validate_output.py  # validate-output checklist + content guardrail tests
│   ├── test_observability.py    # observability checklist (unit + integration)
│   ├── test_fastapi_service.py  # fastapi-service checklist (HTTP layer, 422s, SSE shape)
│   ├── test_golden.py           # 3 golden tests + 1 end-to-end POST /generate test
│   └── test_ui_api.py           # unit tests for GET /traces/{session_id} (DB mocked)
├── alembic/
│   ├── env.py               # loads .env, imports models, runs migrations
│   ├── script.py.mako       # migration file template
│   └── versions/
│       ├── 0001_enable_pgvector_create_itinerary_categories.py
│       ├── 0002_create_prompt_templates.py
│       ├── 0003_create_source_documents.py
│       └── 0004_create_trace_events.py
├── scripts/
│   ├── dev.py               # cross-platform task runner
│   └── seed.py              # seed itinerary_categories, prompt_templates, source_documents
├── docs/
│   ├── DESIGN.md            # single source of truth for system design
│   ├── CLAUDE.md            # AI usage guidelines for this repo
│   └── git-workflow.md      # feature branch order and merge checklist
├── alembic.ini              # Alembic config (DB URL injected from env)
├── docker-compose.yml       # PostgreSQL + pgvector
├── pyproject.toml           # dependencies and project metadata
├── Makefile                 # thin wrappers around scripts/dev.py
└── .env.example             # required environment variables with comments

Current Status

Branch: feat/ui — browser UI with SSE step panels, trace viewer, and golden preset buttons. Also adds GET /traces/{session_id} endpoint.

Branch                   Scope                                                      Status
───────────────────────  ─────────────────────────────────────────────────────────  ──────
feat/project-scaffold    Dependencies, Docker, directory layout, health endpoint    Done
feat/db-schema           Alembic migrations, SQLAlchemy models                      Done
feat/db-seed             Category embeddings, prompt templates, source documents    Done
feat/workflow-skeleton   Agno workflow with 6 stub steps, SSE emitter               Done
feat/intake              Input validation and normalisation (Step 1)                Done
feat/cag-match           Embedding-based intent classification (Step 2)             Done
feat/retrieval-decision  Confidence threshold routing (Step 3)                      Done
feat/rag-retrieve        Agno Knowledge + PgVector top-5 retrieval (Step 4)         Done
feat/generate            DB-template prompt construction + LLM call (Step 5)        Done
feat/validate-output     Pydantic validation + retry logic (Step 6)                 Done
feat/fastapi-service     POST /generate SSE endpoint                                Done
feat/observability       trace_events writes across all steps                       Done
feat/testing             Unit, integration, end-to-end, and golden tests            Done
feat/ui                  Browser UI, GET /traces/{session_id}, static file serving  Done

feat/testing — Branch Changes

This branch completes the full test suite and adds four functional changes to the workflow.

Test coverage added

File                     Type         What it covers
───────────────────────  ───────────  ──────────────────────────────────────────────────
test_golden.py           integration  3 golden tests (family, budget, low-confidence)
                                      + 1 end-to-end POST /generate
test_fastapi_service.py  unit         template_version query param forwarded correctly
test_generate.py         unit         resolve_template version override, None default
test_validate_output.py  unit         content guardrail pass and fail cases

Golden tests assert structure and branch correctness — not exact LLM output strings:

  1. Family trip — high-confidence path, no rag_retrieve event, structured itinerary returned
  2. Budget trip — same branch, different destination and style
  3. Low-confidence — rag_retrieve event present, retrieved_context non-empty, itinerary returned

End-to-end test makes a real HTTP call to POST /generate via TestClient (no mocking of _run_workflow) and asserts all 6 step names appear in the SSE stream with correct structure.

Functional changes

budget field changed to enum (BudgetLevel):
Values "budget", "moderate", "luxury" are now enforced at the Pydantic layer. Any other string returns 422. travel_style remains free text so the CAG step can perform semantic matching on it.

Prompt template version override (?template_version=N):
POST /generate accepts an optional template_version query parameter. When set, resolve_template fetches that specific version instead of the active one — enabling reproducibility and A/B testing without changing the database. The version is propagated through additional_data to generate_executor.

Content-level output guardrail:
parse_llm_output now checks that len(daily_itinerary) == trip_length_days after Pydantic schema validation. A mismatch triggers the existing retry mechanism in validate_output_executor — no extra code path needed.

Seed script robustness fix:
scripts/seed.py detects when travel_docs exists but is missing the embedding column (caused by a partial or legacy seed run) and drops + recreates the table before inserting documents.


feat/ui — Branch Changes

This branch adds a browser UI and a traces read endpoint on top of the completed API.

New files

File                       Purpose
─────────────────────────  ──────────────────────────────────────────────────────────────
src/app/static/index.html  Single-page UI — form, SSE step panels, trace table,
                           preset buttons
tests/test_ui_api.py       13 unit tests for GET /traces/{session_id} (DB mocked)

Changes to existing files

src/app/main.py

  • Added GET /traces/{session_id} — queries trace_events for all rows matching session_id, returns JSON array ordered by created_at, 404 when none found
  • Added StaticFiles mount at / serving src/app/static/ (mounted after all API routes so API routes take precedence)

UI features

  • Form — destination, trip_length, budget (enum select), travel_style, optional template_version
  • 3 golden preset buttons — pre-fill the form with the same inputs as test_golden.py
  • SSE step panels — render in real time as each workflow step completes; generate panel highlights template key and version
  • Trace table — auto-fetched from GET /traces/{session_id} after stream completes; shows step, status, latency, template version, degraded mode

Design Docs

  • DESIGN.md — data model, workflow steps, SSE contract, prompt strategy
  • api.md — full API reference: request/response formats, SSE event shapes per step
  • git-workflow.md — branch order, scope, and merge checklist
  • CLAUDE.md — AI tool usage rules and architecture constraints

About

A small demonstration of an Agno AI agent.
