-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdry_run.py
More file actions
86 lines (67 loc) · 2.5 KB
/
dry_run.py
File metadata and controls
86 lines (67 loc) · 2.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
"""
Dry-run example of the agentic pipeline.
This demonstrates the full orchestration without needing Streamlit.
Run with: python dry_run.py
"""
import os
import json
import sys
from sqlalchemy import create_engine
# Ensure OPENAI_API_KEY is set for this demo.
# This check runs at import time, ahead of the agentic_pipeline import that
# follows it, and stops the script with exit status 1 when the key is absent
# or empty.
if not os.environ.get("OPENAI_API_KEY"):
    # Diagnostics go to stderr so they are not mixed into piped stdout output.
    print("ERROR: OPENAI_API_KEY environment variable not set", file=sys.stderr)
    print("Please set it: export OPENAI_API_KEY='sk-...'", file=sys.stderr)
    sys.exit(1)
from agentic_pipeline import run_agentic_pipeline
# SQLAlchemy URL of the sample SQLite database.
# The default is an absolute path, so it is only valid on machines that have
# that exact location; set the DRY_RUN_DB_URL environment variable to point
# the demo at a SQLite database stored elsewhere. An empty value is treated
# as unset.
DB_URL = (
    os.environ.get("DRY_RUN_DB_URL")
    or "sqlite:////workspaces/nattu22/data.db"
)

# Example natural-language questions to send through the pipeline.
EXAMPLE_QUERIES = [
    "How many orders were completed in total?",
    "What is the average order value by country?",
    "List all products that are low in stock (less than 100 units)",
    "What are the top 3 customers by spending?",
]
def pretty_print_result(result: dict):
    """Print a human-readable report of one pipeline result to stdout.

    The report consists of a header with the query, then either the error
    message (after which the function returns immediately) or the final
    result section followed by a per-step execution trace and a closing rule.

    Args:
        result: Mapping describing one pipeline run. ``user_query`` is
            required. ``error``, ``final_result`` and ``traces`` are
            optional. When ``final_result`` is present it must contain
            ``sql``, ``rows_returned``, ``summary`` and ``narrative``.
            Each trace entry is a mapping that may contain ``step_name``,
            ``duration_ms``, ``error`` and ``reasoning``.

    Raises:
        KeyError: If ``user_query`` is missing, or ``final_result`` is
            present but lacks one of its required keys.
    """
    rule = "=" * 80
    print("\n" + rule)
    print(f"QUERY: {result['user_query']}")
    print(rule)

    if result.get("error"):
        print(f"❌ ERROR: {result['error']}")
        return

    # Print final result
    final = result.get("final_result")
    if final:
        # default=str keeps this display-only dump from raising TypeError on
        # values json cannot encode natively (e.g. Decimal or datetime);
        # ensure_ascii=False shows non-ASCII text as-is instead of \uXXXX.
        summary_text = json.dumps(
            final["summary"], indent=2, ensure_ascii=False, default=str
        )
        print(f"\n📊 SQL Generated:\n{final['sql']}\n")
        print(f"📈 Rows Returned: {final['rows_returned']}")
        print(f"\n📝 Summary:\n{summary_text}")
        print(f"\n💡 Narrative Insights:\n{final['narrative']}")

    # Print trace summary; a missing or None "traces" entry means zero steps.
    traces = result.get("traces") or []
    print(f"\n🔍 Execution Trace ({len(traces)} steps):")
    for i, trace in enumerate(traces, 1):
        step_error = trace.get("error")
        status = "✓" if not step_error else "✗"
        # A step without a recorded duration must not break the :.1f format.
        duration = trace.get("duration_ms")
        timing = f"{duration:.1f}ms" if duration is not None else "n/a"
        print(f" {i}. {status} {trace.get('step_name', '?')} ({timing})")
        if step_error:
            print(f" Error: {step_error}")
        reasoning = trace.get("reasoning")
        if reasoning:
            print(f" Reasoning: {reasoning}")

    print("\n" + rule)
def main():
    """Run the dry-run demo against the sample database.

    Creates a SQLAlchemy engine for ``DB_URL``, prints a short banner, sends
    the first entry of ``EXAMPLE_QUERIES`` through ``run_agentic_pipeline``
    and renders the returned result with ``pretty_print_result``. The engine
    is disposed before returning, including when a query raises.
    """
    # Create engine from sample DB
    engine = create_engine(DB_URL)
    # Read the dialect name from the engine instead of repeating a literal,
    # so the banner and the pipeline argument always agree with DB_URL
    # (for a sqlite:// URL this is "sqlite").
    dialect = engine.dialect.name

    print("🚀 Agentic Pipeline Dry-Run")
    print(f"Database: {DB_URL}")
    print(f"Dialect: {dialect}")
    print()

    try:
        # Only the first example is run to keep the demo quick; widen the
        # slice to exercise more of EXAMPLE_QUERIES.
        for query in EXAMPLE_QUERIES[:1]:
            print(f"\n▶️ Running: {query}")
            result = run_agentic_pipeline(query, engine, dialect=dialect)
            pretty_print_result(result)
    finally:
        # Release the engine's pooled connections even if a query fails.
        engine.dispose()

    print("\n✅ Dry-run completed!")


if __name__ == "__main__":
    main()