""" AGENTIC PIPELINE ARCHITECTURE & USAGE GUIDE
This document explains the complete agentic pipeline orchestration, its components, and how to use it effectively. """
""" The NLP-to-SQL Agent uses THREE ORCHESTRATION MODES:
MODE 1: DIRECT PIPELINE (Simplest)
──────────────────────────────────
User Query (NL) → Generate SQL → Validate SQL → Execute Query
                        ↑ (retry up to 5x on validation failure)

Good for: Quick queries, simple use cases
Output:   SQL, Results, Summary, Narrative
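The generate → validate retry loop in Mode 1 can be sketched as follows. This is a minimal illustration, not the actual nlp_sql.py code: `generate` and `validate` are stand-ins for the real generation and validation functions.

```python
# Hedged sketch of Mode 1's retry loop: regenerate SQL until it validates,
# giving up after max_attempts.
def generate_with_retry(generate, validate, max_attempts=5):
    last_err = None
    for attempt in range(1, max_attempts + 1):
        sql = generate()
        ok, err = validate(sql)
        if ok:
            return sql, attempt      # valid SQL plus how many tries it took
        last_err = err
    raise ValueError(f"no valid SQL after {max_attempts} attempts: {last_err}")

# Stub generator that fails once, then succeeds (mimics the Step 3 example)
attempts = iter(["SELCT COUNT() FROM orders", "SELECT COUNT(*) FROM orders"])
validate = lambda sql: (sql.upper().startswith("SELECT"), "parse error")
print(generate_with_retry(lambda: next(attempts), validate))
# ('SELECT COUNT(*) FROM orders', 2)
```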
MODE 2: LANGCHAIN AGENT (Multi-tool orchestration)
──────────────────────────────────────────────────
User Query (NL) → LangChain ReAct Agent
                        ↓ (picks tools, loops until done)
  [generate_sql]  [validate_sql]  [execute_sql]  [narrative_insights]
                        ↓
                  Agent Result

Good for: Complex reasoning, tool chaining
Output:   Tool call trace, final result
MODE 3: FULL AGENTIC PIPELINE (Recommended - full tracing & reasoning)
──────────────────────────────────────────────────────────────────────
Step 1: Introspect Schema
 ├─ reasoning: "Fetch DB structure"
 ├─ input: engine
 ├─ output: tables, columns, foreign keys
 └─ duration_ms, error

Step 2: Build Semantic Model
 ├─ reasoning: "Enrich schema with sample data"
 ├─ input: engine, schema
 ├─ output: tables with samples (3 rows per table)
 └─ duration_ms, error

Step 3: Generate SQL from NL
 ├─ reasoning: "Call OpenAI, retry up to 5x"
 ├─ input: user_query, semantic, dialect
 ├─ output: SQL, attempts
 └─ duration_ms, error

Step 4: Validate SQL Syntax
 ├─ reasoning: "Parse with sqlglot"
 ├─ input: SQL
 ├─ output: status (valid/error)
 └─ duration_ms, error

Step 5: Verify SQL Policy
 ├─ reasoning: "Ensure read-only, reject CRUD/DDL"
 ├─ input: SQL
 ├─ output: status (approved/rejected)
 └─ duration_ms, error

Step 6: Execute SQL
 ├─ reasoning: "Run with 1000-row limit"
 ├─ input: SQL
 ├─ output: rows_returned, columns
 └─ duration_ms, error

Step 7: Summarize Results
 ├─ reasoning: "Compute describe() and percentiles or aggregates"
 ├─ input: DataFrame
 ├─ output: summary dict (describe, numeric_describe, head)
 └─ duration_ms, error

Step 8: Generate Narrative Insights
 ├─ reasoning: "Use OpenAI to produce findings"
 ├─ input: summary, user_query
 ├─ output: narrative (human-readable insights)
 └─ duration_ms, error

Good for: Production, debugging, transparency, audit trails
Output:   Final result + complete trace of all steps """
""" DATABASE LAYER (db.py)
├─ load_db_urls()       → Load DB URLs from env JSON
├─ create_engines()     → Create SQLAlchemy engines
├─ introspect_schema()  → Extract tables, columns, FKs, types
├─ sample_table()       → Get 3-5 sample rows from a table
└─ execute_read_query() → Execute a query with an optional row limit
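For illustration, the shape of `introspect_schema()`'s output can be sketched with the stdlib sqlite3 module. The real db.py works through SQLAlchemy engines, so this function body is a hypothetical stand-in, not the project's implementation.

```python
# Hedged sketch: build {table: {"columns": [{"name", "type"}]}} from SQLite
# metadata, mirroring the trace output shown in Step 1 below.
import sqlite3

def introspect_schema(conn):
    schema = {}
    cur = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
    for (table,) in cur.fetchall():
        # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk)
        cols = conn.execute(f"PRAGMA table_info({table})").fetchall()
        schema[table] = {
            "columns": [{"name": c[1], "type": c[2]} for c in cols]
        }
    return schema

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT)")
print(introspect_schema(conn))
# {'orders': {'columns': [{'name': 'id', 'type': 'INTEGER'},
#                         {'name': 'status', 'type': 'TEXT'}]}}
```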
SEMANTIC LAYER (semantic.py)
└─ build_semantic_model() → Enrich schema with samples for LLM context
NLP→SQL GENERATION (nlp_sql.py)
├─ _call_openai_system()  → Call the OpenAI chat API
├─ clean_response_sql()   → Strip markdown fences
├─ validate_sql()         → Parse with sqlglot, check syntax
├─ is_select_only()       → Verify the AST is SELECT-like
└─ generate_sql_from_nl() → Main entry point: NL → SQL with retries
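One plausible implementation of the fence-stripping step is shown below. This is a sketch of the idea, not the actual `clean_response_sql()` code: LLMs often wrap SQL answers in markdown code fences, which must be removed before parsing.

```python
import re

def clean_response_sql(text: str) -> str:
    """Strip a leading ```sql (or bare ```) fence and a trailing ``` fence."""
    text = text.strip()
    text = re.sub(r"^```(?:sql)?\s*", "", text)   # opening fence
    text = re.sub(r"\s*```$", "", text)            # closing fence
    return text.strip()

print(clean_response_sql("```sql\nSELECT 1;\n```"))  # SELECT 1;
```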
SAFETY/VERIFICATION (agent.py)
├─ _normalize()       → Strip comments, deduplicate
├─ verify_sql_query() → Check policy (read-only, no CRUD/DDL)
│   ├─ Reject disallowed keywords: INSERT, UPDATE, DELETE, DROP, etc.
│   ├─ Allow: SELECT, WITH, EXPLAIN, DESCRIBE, SHOW, PRAGMA
│   └─ Reject: multiple statements, empty SQL
└─ Main safeguards:
    ├─ Keyword allowlist/blocklist
    ├─ sqlglot AST type checking
    └─ Comment-injection prevention
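The policy check can be sketched as a keyword allow/block list over comment-stripped SQL. This is a simplified stand-in for `verify_sql_query()`; the real version also checks the sqlglot AST type.

```python
import re

DISALLOWED = {"INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "CREATE", "TRUNCATE"}
ALLOWED_FIRST = {"SELECT", "WITH", "EXPLAIN", "DESCRIBE", "SHOW", "PRAGMA"}

def verify_sql_query(sql: str):
    # Strip line and block comments so keywords can't hide inside them
    sql = re.sub(r"--[^\n]*", " ", sql)
    sql = re.sub(r"/\*.*?\*/", " ", sql, flags=re.S).strip()
    if not sql:
        return "rejected", "empty SQL"
    if sql.rstrip(";").count(";") > 0:
        return "rejected", "multiple statements"
    words = re.findall(r"[A-Za-z_]+", sql.upper())
    if not words or words[0] not in ALLOWED_FIRST:
        return "rejected", "first keyword not allowed"
    bad = DISALLOWED.intersection(words)
    if bad:
        return "rejected", f"disallowed keyword: {sorted(bad)}"
    return "approved", None

print(verify_sql_query("SELECT * FROM orders"))  # ('approved', None)
print(verify_sql_query("DROP TABLE orders"))     # ('rejected', 'first keyword not allowed')
```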
INSIGHTS GENERATION (insights.py)
├─ summarize_dataframe() → Compute describe() or quick aggregates
│   ├─ If n_rows ≤ 5000 and columns ≤ 100: full describe()
│   └─ Else: quick aggregates (head, numeric_describe)
└─ narrative_insights_from_summary() → OpenAI summarizes findings
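The size-based branching can be illustrated with the stdlib statistics module. The real `summarize_dataframe()` uses pandas describe(); the function and field names below merely mirror the trace output shown later in this guide.

```python
import statistics

def summarize_values(values):
    """Sketch of the insights.py size check: full percentile summary for
    small results (<= 5000 rows), cheap aggregates otherwise."""
    if len(values) <= 5000:
        q = statistics.quantiles(values, n=4) if len(values) > 1 else [values[0]] * 3
        return {
            "n_rows": len(values),
            "detailed_describe": {
                "count": len(values),
                "mean": statistics.fmean(values),
                "min": min(values),
                "25%": q[0], "50%": q[1], "75%": q[2],
                "max": max(values),
            },
        }
    return {"n_rows": len(values), "head": values[:5]}  # quick aggregates

print(summarize_values([42])["detailed_describe"]["mean"])  # 42.0
```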
ORCHESTRATION (agentic_pipeline.py)
├─ StepTrace dataclass   → Capture step metadata
├─ AgenticPipeline class → Main orchestrator
│   ├─ _trace_step() → Begin tracing a step
│   ├─ _end_trace()  → End tracing & record the result
│   └─ run()         → Execute the 8-step pipeline
└─ run_agentic_pipeline() → Convenience function
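The trace bookkeeping can be approximated with a dataclass and a small wrapper. This is a sketch of what StepTrace and its tracing helpers might look like, not the project's code; the field names follow the trace structure documented later in this guide.

```python
import time
from dataclasses import dataclass, asdict
from typing import Any, Optional

@dataclass
class StepTrace:
    step_name: str
    reasoning: str
    input_data: Any = None
    output_data: Any = None
    error: Optional[str] = None
    duration_ms: float = 0.0

def traced(step_name, reasoning, fn, *args):
    """Run fn, capturing timing and any error into a StepTrace."""
    trace = StepTrace(step_name, reasoning, input_data=args)
    start = time.perf_counter()
    try:
        trace.output_data = fn(*args)
    except Exception as exc:
        trace.error = str(exc)
    trace.duration_ms = (time.perf_counter() - start) * 1000
    return trace

t = traced("validate_sql_syntax", "parse with sqlglot", len, "SELECT 1")
print(asdict(t)["output_data"])  # 8
```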
LANGCHAIN INTEGRATION (agent_chain.py)
├─ _make_tools() → Create Tool objects for LangChain
├─ run_agent()   → Initialize & run the agent
└─ Uses: Tool.from_function, ChatOpenAI, initialize_agent
STREAMLIT UI (app.py)
├─ Database selector & configuration
├─ Schema introspection display
├─ Semantic model builder
├─ NL query input
├─ Orchestration mode radio (Direct / LangChain / Pipeline)
├─ Result display (SQL, rows, summary, narrative)
└─ Trace visualization (collapsible steps) """
""" Example: User Query = "How many orders were completed?"
STEP 1: Introspect Schema (Duration: 45ms)
──────────────────────────────────────────
Input: engine
Output: {
  "tables": {
    "orders": {
      "columns": [
        {"name": "id", "type": "INTEGER"},
        {"name": "status", "type": "TEXT"},
        ...
      ]
    },
    ...
  }
}
Error: None
Reasoning: Fetch database schema and table info
STEP 2: Build Semantic Model (Duration: 120ms)
──────────────────────────────────────────────
Input: schema, engine, sample_rows=3
Output: {
  "tables": {
    "orders": {
      "columns": [...],
      "sample": {
        "id": [1, 2, 3],
        "status": ["completed", "pending", "shipped"],
        ...
      }
    }
  }
}
Error: None
Reasoning: Enrich schema with sample data for LLM context
STEP 3: Generate SQL from NL (Duration: 850ms)
──────────────────────────────────────────────
Input: {
  "query": "How many orders were completed?",
  "dialect": "sqlite"
}
Process:
  - Attempt 1 (failed):  Generated "SELCT COUNT() FROM orders WHERE ..."
    Error: Parse failed (typo: SELCT)
  - Attempt 2 (success): Generated "SELECT COUNT(*) FROM orders WHERE status='completed'"
    Validated: ✓ Syntax OK, ✓ Is SELECT
Output: {
  "sql": "SELECT COUNT(*) FROM orders WHERE status='completed'",
  "attempts": 2,
  "error": None
}
Reasoning: Call OpenAI, retry up to 5x if validation fails
STEP 4: Validate SQL Syntax (Duration: 12ms)
────────────────────────────────────────────
Input: "SELECT COUNT(*) FROM orders WHERE status='completed'"
Process: sqlglot.parse_one() → AST validation
Output: status = "valid"
Error: None
Reasoning: Use sqlglot to parse and validate structure
STEP 5: Verify SQL Policy (Duration: 8ms)
─────────────────────────────────────────
Input: "SELECT COUNT(*) FROM orders WHERE status='completed'"
Process:
  - Strip comments: ✓
  - Check first word: "SELECT" ✓ (allowed)
  - Search disallowed keywords: INSERT, UPDATE, DELETE, DROP → none found ✓
  - Parse AST type: "select" ✓ (allowed)
Output: status = "approved"
Error: None
Reasoning: Ensure read-only, reject CRUD/DDL keywords
STEP 6: Execute SQL (Duration: 34ms)
────────────────────────────────────
Input: {
  "sql": "SELECT COUNT(*) FROM orders WHERE status='completed'",
  "limit": 1000
}
Output: {
  "rows_returned": 1,
  "columns": ["COUNT(*)"]
}
Result DataFrame:
  COUNT(*)
        42
Error: None
Reasoning: Run validated SQL with 1000-row limit
STEP 7: Summarize Results (Duration: 28ms)
──────────────────────────────────────────
Input: DataFrame (1 row, 1 numeric column)
Process:
  - Row count: 1 (small) → use full describe()
  - Numeric columns: ["COUNT(*)"]
Output: {
  "n_rows": 1,
  "detailed_describe": {
    "COUNT(*)": {
      "count": 1.0,
      "mean": 42.0,
      "std": NaN,
      "min": 42.0,
      "25%": 42.0,
      "50%": 42.0,
      "75%": 42.0,
      "max": 42.0
    }
  }
}
Error: None
Reasoning: Compute describe() and percentiles or aggregates
STEP 8: Generate Narrative Insights (Duration: 680ms)
─────────────────────────────────────────────────────
Input: {
  "query": "How many orders were completed?",
  "summary": {
    "n_rows": 1,
    "detailed_describe": {...}
  }
}
Process: Call OpenAI to summarize findings
Output: "Based on the database analysis, 42 orders have been completed.
         This represents a solid completion rate. Consider analyzing
         pending orders to understand any bottlenecks in the order
         fulfillment process."
Error: None
Reasoning: Use OpenAI to produce human-readable findings
FINAL RESULT:
{
  "user_query": "How many orders were completed?",
  "final_result": {
    "sql": "SELECT COUNT(*) FROM orders WHERE status='completed'",
    "rows_returned": 1,
    "summary": { ... },
    "narrative": "Based on the database analysis, ..."
  },
  "traces": [
    {"step_name": "introspect_schema",    "duration_ms": 45,  ...},
    {"step_name": "build_semantic_model", "duration_ms": 120, ...},
    {"step_name": "generate_sql",         "duration_ms": 850, ...},
    {"step_name": "validate_sql_syntax",  "duration_ms": 12,  ...},
    {"step_name": "verify_sql_policy",    "duration_ms": 8,   ...},
    {"step_name": "execute_sql",          "duration_ms": 34,  ...},
    {"step_name": "summarize_results",    "duration_ms": 28,  ...},
    {"step_name": "narrative_insights",   "duration_ms": 680, ...}
  ],
  "error": None
}
Total Pipeline Duration: ~1.8 seconds """
""" EXAMPLE 1: Direct Mode (in Streamlit)
─────────────────────────────────────────
- Open http://localhost:8501
- In the sidebar, enter the DB URL JSON (or set it before launch):
  export DB_URLS='{"sqlite":"sqlite:////tmp/data.db"}'
- Select "Direct" mode
- Ask: "Show top 10 customers by spending"
- See: SQL, rows, summary, narrative
EXAMPLE 2: Dry-Run (Command Line)
─────────────────────────────────
$ python setup_sample_db.py   # Create sample DB
$ python dry_run.py           # Run pipeline on sample queries
Output:
  Query:           How many orders were completed in total?
  Generated SQL:   SELECT COUNT(*) FROM orders WHERE status='completed'
  Rows Returned:   1
  Summary:         {n_rows: 1, detailed_describe: {...}}
  Narrative:       Based on the database analysis, 42 orders...
  Execution Trace: [step_name, duration_ms, error, reasoning]
EXAMPLE 3: Python API
─────────────────────
from sqlalchemy import create_engine
from agentic_pipeline import run_agentic_pipeline
engine = create_engine("sqlite:////tmp/data.db")
result = run_agentic_pipeline("Show orders by country", engine, dialect="sqlite")
print(result["final_result"]["sql"])        # Generated SQL
print(result["final_result"]["narrative"])  # Insights
print(result["traces"])                     # Full trace
EXAMPLE 4: Testing
──────────────────
$ pytest test_agent.py -v
Tests:
  test_select_allowed ✓
  test_insert_rejected ✓
  test_update_rejected ✓
  test_delete_rejected ✓
  test_drop_rejected ✓
  test_alter_rejected ✓
  test_multiple_statements_rejected ✓
  test_with_clause_allowed ✓
  test_explain_allowed ✓
  test_describe_allowed ✓
  test_show_allowed ✓
  test_comment_stripping ✓
  test_block_comment_stripping ✓ """
""" TRACE STRUCTURE:
{
  "step_name": str,      # e.g., "generate_sql"
  "input_data": Any,     # Input to the step
  "output_data": Any,    # Output from the step
  "error": str | None,   # Error message if failed
  "duration_ms": float,  # Execution time
  "reasoning": str       # Why this step was executed
}
INTERPRETING TRACES:
- If any step has error != None, the pipeline stopped there
- Look at duration_ms to identify slow steps (e.g., OpenAI calls)
- Check reasoning to understand agent's decision-making
- Use input_data/output_data to trace data transformations
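The checks above are easy to automate over the traces list. A small helper sketch, assuming trace dicts follow the structure documented above:

```python
def slow_steps(traces, threshold_ms=500):
    """Names of steps slower than threshold_ms (usually the OpenAI calls)."""
    return [t["step_name"] for t in traces if t["duration_ms"] > threshold_ms]

def failed_step(traces):
    """First step with a non-None error, or None if the run succeeded."""
    return next((t for t in traces if t.get("error")), None)

traces = [
    {"step_name": "generate_sql", "duration_ms": 850, "error": None},
    {"step_name": "execute_sql", "duration_ms": 34, "error": None},
]
print(slow_steps(traces))   # ['generate_sql']
print(failed_step(traces))  # None
```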
COMMON ISSUES & DEBUGGING:
──────────────────────────
Issue: Step "generate_sql" has error "Parse error"
→ OpenAI generated invalid SQL. Check step 3 attempts.
→ Try: Simplify the query, clarify the schema in the semantic model
Issue: Step "verify_sql_policy" has error "Disallowed keyword"
→ Generated SQL includes INSERT, UPDATE, DELETE, etc.
→ Check the OpenAI system prompt, add examples
Issue: Step "execute_sql" is very slow (>5s)
→ Query might be doing a full table scan
→ Check: Add LIMIT to the query, add indexes, sample a smaller dataset
Issue: Step "narrative_insights" has error "API rate limit"
→ OpenAI API rate limited
→ Check: API quotas, retry logic, batch requests """
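A common mitigation for the rate-limit case is exponential backoff with jitter around the OpenAI call. A generic sketch: `RuntimeError` stands in for the SDK's actual rate-limit exception, and the delays are illustrative.

```python
import random
import time

def with_backoff(call, max_attempts=5, base_delay=1.0):
    """Retry a rate-limited API call with exponential backoff + jitter."""
    for attempt in range(max_attempts):
        try:
            return call()
        except RuntimeError:  # stand-in for the SDK's rate-limit exception
            if attempt == max_attempts - 1:
                raise  # out of retries, surface the error
            time.sleep(base_delay * 2 ** attempt + random.random() * 0.1)
```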
""" PATTERN 1: Sync Pipeline in Web Service
───────────────────────────────────────────
from fastapi import FastAPI
from agentic_pipeline import run_agentic_pipeline

app = FastAPI()
engines = {...}  # preconfigured name → SQLAlchemy engine map

@app.post("/query")
def query_db(query: str, db: str = "default"):
    engine = engines[db]
    result = run_agentic_pipeline(query, engine)
    return result
PATTERN 2: Async Pipeline with Background Jobs
──────────────────────────────────────────────
from celery import Celery
from sqlalchemy import create_engine
from agentic_pipeline import run_agentic_pipeline

celery_app = Celery("tasks")

@celery_app.task
def run_query_task(query: str, db_url: str):
    engine = create_engine(db_url)
    result = run_agentic_pipeline(query, engine)
    # Store the result in a cache/DB (cache = app-level store, e.g., Redis)
    cache.set(f"query:{query}", result)
PATTERN 3: Streaming Traces
───────────────────────────
from agentic_pipeline import AgenticPipeline

pipeline = AgenticPipeline(engine)
result = pipeline.run(query)
for step in result["traces"]:  # traces are available once run() returns
    print(f"Step {step['step_name']} complete in {step['duration_ms']}ms")
    # Stream to a WebSocket, update the UI in real time
PATTERN 4: Custom Tool Integration
──────────────────────────────────

class CustomPipeline(AgenticPipeline):
    def run(self, user_query):
        # Add custom pre-processing here
        result = super().run(user_query)
        # Add custom post-processing
        result["custom_field"] = process(result)  # process() is app-specific
        return result """
""" For Production Deployment:
Security:
[x] Validate all SQL is read-only (SELECT/DESCRIBE/EXPLAIN)
[x] Reject CRUD/DDL keywords (INSERT, UPDATE, DELETE, DROP, ALTER)
[x] Strip comments to prevent injection bypass
[x] Reject multiple statements
[ ] Use a read-only DB user account
[ ] Add a query timeout (e.g., 30s max)
[ ] Add a result row limit (e.g., 10,000 max)
[ ] Enable query logging & an audit trail
[ ] Add an IP allowlist for DB connections
API Safety:
[ ] Rate limit OpenAI API calls (cache results)
[ ] Implement request/response logging
[ ] Add request signing (HMAC, JWT)
[ ] Use API key rotation
[ ] Monitor for cost overruns
Data:
[ ] Never log full query results (PII)
[ ] Encrypt sensitive data in transit (TLS)
[ ] Store database passwords in a secrets manager (Vault, AWS Secrets Manager), not plain env vars
[ ] Implement row-level security (RLS) filters
Monitoring:
[ ] Alert on failed validations
[ ] Track pipeline execution times
[ ] Monitor OpenAI API usage & cost
[ ] Set up error tracking (Sentry)
[ ] Log all schema introspections """