A modular workflow framework for AI-powered information synthesis tasks.
Many AI agent use cases follow the same fundamental pattern: analyze data from various sources and produce summaries, reports, or high-level overviews. Whether you're building a deep research tool, a RAG-based Q&A system, or any information synthesis application, the core workflow remains consistent:
- Extract - Gather information from your data source
- Summarize - Create consolidated summaries from the gathered information
- Reflect - Evaluate completeness and decide if more information is needed
DeepThink provides this standard workflow as a reusable, modular graph. You bring your data source, and DeepThink handles the rest.
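As a rough illustration of the loop described above, the sketch below wires three stub functions into an Extract, Summarize, Reflect cycle in plain Python. None of the names (`LoopState`, `gather`, `summarize`, `reflect`, `run_workflow`) come from DeepThink; they are hypothetical stand-ins, and the real library runs this as a graph with LLM-backed steps.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class LoopState:
    """Hypothetical stand-in for the workflow state (not DeepThinkState)."""
    summary: str = ""
    reflections: int = 0
    queries: list = field(default_factory=list)


def gather(query: str) -> list:
    # Stub data source: a real one would call a search API or vector DB.
    return [f"fact about {query}"]


def summarize(running_summary: str, chunks: list) -> str:
    # Stub summarizer: fold new chunks into the running summary.
    return " | ".join(filter(None, [running_summary, *chunks]))


def reflect(state: LoopState) -> Optional[str]:
    # Stub reflection: request one follow-up, then report completeness.
    return "follow-up details" if state.reflections == 0 else None


def run_workflow(query: str, reflection_limit: int = 2) -> LoopState:
    state = LoopState()
    while True:
        state.queries.append(query)
        state.summary = summarize(state.summary, gather(query))
        follow_up = reflect(state)
        # Stop when reflection is satisfied or the cycle budget is spent.
        if follow_up is None or state.reflections >= reflection_limit:
            return state
        state.reflections += 1
        query = follow_up
```

Calling `run_workflow("AI")` with these stubs performs one reflection cycle and issues two queries in total, the original one and the follow-up.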
pip install git+https://github.com/your-username/deep-think.git

import asyncio

from pydantic_graph import GraphRunContext

from deep_think import DeepThink, DeepThinkState, InformationSourceType


async def my_data_source(
    context: GraphRunContext[DeepThinkState], query: str
) -> list[str]:
    """Your custom data gathering function."""
    # On subsequent calls, use the refined query from reflection
    if context.state.reflection_query:
        query = context.state.reflection_query
    # Fetch data from your source (web search, vector DB, API, etc.)
    # fetch_from_your_source is a placeholder for your own async client call
    results = await fetch_from_your_source(query)
    return [r.content for r in results]


async def main():
    deep_think = DeepThink(
        gather_fn=my_data_source,
        information_source_type=InformationSourceType.WEB_SEARCH,
        reflection_limit=2,
    )
    result = await deep_think.run(query="What are the latest AI developments?")
    print(result.output)


asyncio.run(main())

         ┌──────────────────────────────────────┐
         │                                      │
         ▼                                      │
┌─────────────────┐                             │
│   GatherInfo    │  ← You provide the          │
│  (your source)  │    underlying search        │
└────────┬────────┘    functionality            │
         │                                      │
         ▼                                      │
┌─────────────────┐                             │
│    Summarize    │  ← Running summary          │
│                 │                             │
└────────┬────────┘                             │
         │                                      │
         ▼                                      │
┌─────────────────┐                             │
│     Reflect     │──── Need more? ─────────────┘
│                 │
└────────┬────────┘
         │
         │ (complete)
         ▼
┌─────────────────┐
│  Final Output   │
└─────────────────┘

The graph automatically:
- Summarizes multiple information chunks asynchronously
- Maintains a running summary across reflection cycles
- Uses the reflection agent to identify gaps and generate optimized follow-up queries
- Adapts query format based on your information source type (web search vs vector DB)
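To make the first two points concrete, here is a minimal sketch of summarizing chunks concurrently and folding them into a running summary. It assumes a stand-in `summarize_chunk` coroutine in place of a real LLM call; both function names are hypothetical and this is not DeepThink's internal implementation.

```python
import asyncio


async def summarize_chunk(chunk: str) -> str:
    # Stand-in for an LLM call: keep only the first sentence.
    await asyncio.sleep(0)  # yield control, as a real network call would
    return chunk.split(".")[0].strip()


async def summarize_all(chunks: list, running_summary: str = "") -> str:
    # asyncio.gather runs the per-chunk coroutines concurrently and
    # returns their results in the same order as the inputs.
    partials = await asyncio.gather(*(summarize_chunk(c) for c in chunks))
    parts = [running_summary, *partials] if running_summary else list(partials)
    return "; ".join(parts)
```

Passing the previous result back in as `running_summary` on the next cycle is what keeps the summary cumulative across reflection rounds in this sketch.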
| Parameter | Type | Default | Description |
|---|---|---|---|
| `gather_fn` | `Callable` | Required | Async function that fetches information from your data source |
| `information_source_type` | `InformationSourceType` | `WEB_SEARCH` | Type of source (`WEB_SEARCH` or `VECTOR_DB`) |
| `reflection_limit` | `int` | `2` | Maximum reflection cycles before completing |
The reflection agent optimizes its follow-up queries based on your source type:
- WEB_SEARCH: Generates keyword-focused search queries with operators
- VECTOR_DB: Generates semantic, concept-focused queries for embedding similarity
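One way such source-aware behavior could be arranged is to attach a style hint to the reflection prompt depending on the source type. The sketch below is an assumption about the general approach, not DeepThink's actual prompts: `SourceKind`, `QUERY_STYLE_HINTS`, and `build_reflection_instructions` are hypothetical names.

```python
from enum import Enum


class SourceKind(Enum):
    # Hypothetical mirror of InformationSourceType, for illustration only.
    WEB_SEARCH = "web_search"
    VECTOR_DB = "vector_db"


QUERY_STYLE_HINTS = {
    SourceKind.WEB_SEARCH: "Write a short keyword query; search operators such as quotes are allowed.",
    SourceKind.VECTOR_DB: "Write a natural-language sentence describing the concept to retrieve.",
}


def build_reflection_instructions(kind: SourceKind, gap: str) -> str:
    """Combine the identified knowledge gap with a source-specific style hint."""
    return f"Missing information: {gap}\n{QUERY_STYLE_HINTS[kind]}"
```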
Model names follow the pydantic-ai model format: provider:model-name. You must also set the corresponding provider API key.
# Model configuration (pydantic-ai format: provider:model-name)
SUMMARIZER_MODEL_NAME="openai:gpt-4.1"
REFLECTION_MODEL_NAME="openai:gpt-4.1"
# Provider API key (required for the provider you're using)
OPENAI_API_KEY="sk-..."
# Or for other providers:
# ANTHROPIC_API_KEY="sk-..."
# GOOGLE_API_KEY="..."

Supported providers include openai, anthropic, google, groq, mistral, and more. See the pydantic-ai documentation for the full list.
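A small pre-flight check can catch a malformed model name or a missing key before the first run. The sketch below is not part of DeepThink; `check_model_config` and `PROVIDER_KEY_VARS` are hypothetical, and the provider-to-variable mapping only covers the three key names listed above.

```python
import os

# Assumed mapping, limited to the key variables named in this README.
PROVIDER_KEY_VARS = {
    "openai": "OPENAI_API_KEY",
    "anthropic": "ANTHROPIC_API_KEY",
    "google": "GOOGLE_API_KEY",
}


def check_model_config(env: dict) -> list:
    """Return human-readable problems; an empty list means the config looks usable."""
    problems = []
    for var in ("SUMMARIZER_MODEL_NAME", "REFLECTION_MODEL_NAME"):
        value = env.get(var, "")
        provider, sep, model = value.partition(":")
        if not (provider and sep and model):
            problems.append(f"{var} must look like provider:model-name, got {value!r}")
            continue
        # Providers outside the mapping are skipped rather than rejected.
        key_var = PROVIDER_KEY_VARS.get(provider)
        if key_var and not env.get(key_var):
            problems.append(f"{var} uses {provider} but {key_var} is not set")
    return problems


if __name__ == "__main__":
    for problem in check_model_config(dict(os.environ)):
        print(problem)
```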
Your gather function is the only customization point. It must:
- Accept `context: GraphRunContext[DeepThinkState]` as the first argument
- Accept any additional keyword arguments you need
- Return `list[str]`, a list of text chunks from your source
- Be async

The context gives you access to the state, including:
- `context.state.reflection_query` - The refined query from the reflection agent (use this on subsequent calls)
- `context.state.reflection_reasoning` - Why the reflection agent requested more information
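Because a gather function only reads a few attributes from the context, it can be exercised in isolation before plugging it into the graph. The sketch below assumes a hand-built stand-in for the context; `fake_context` and `example_gather` are hypothetical helpers, and the stand-in is not a real `GraphRunContext`.

```python
import asyncio
from types import SimpleNamespace


async def example_gather(context, query: str) -> list:
    # Same pattern as the contract above: prefer the refined query if set.
    if context.state.reflection_query:
        query = context.state.reflection_query
    return [f"result for {query}"]


def fake_context(reflection_query=None):
    # Exposes only the attribute the gather function reads.
    return SimpleNamespace(state=SimpleNamespace(reflection_query=reflection_query))


# First call: no reflection has happened yet, so the original query is used.
first = asyncio.run(example_gather(fake_context(), query="original"))
# Later call: the refined query from reflection takes precedence.
later = asyncio.run(example_gather(fake_context("refined"), query="original"))
```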
# Example: web search with the ddgs package
from ddgs import DDGS
from pydantic_graph import GraphRunContext

from deep_think import DeepThinkState


async def web_search(
    context: GraphRunContext[DeepThinkState], query: str, max_results: int = 5
) -> list[str]:
    # Prefer the refined query once reflection has produced one
    if context.state.reflection_query:
        query = context.state.reflection_query
    with DDGS() as ddgs:
        results = list(ddgs.text(query, max_results=max_results))
    return [f"{r['title']}\n{r['body']}" for r in results]


# Example: vector database search (vector_db is a placeholder for your own async client)
async def vector_search(
    context: GraphRunContext[DeepThinkState], query: str, collection: str
) -> list[str]:
    if context.state.reflection_query:
        query = context.state.reflection_query
    results = await vector_db.search(collection, query, limit=10)
    return [doc.content for doc in results]

The run() method returns a DeepThinkResult:
result = await deep_think.run(query="...")
result.output # The final summary/report
result.reflections_performed # Number of reflection cycles that occurred
result.state # Full state object with intermediate data