The Orchestration module coordinates the end-to-end research query pipeline using LangChain and Google Gemini. It handles document retrieval, context formatting, response generation, citation extraction, and conversation memory management.
multi_modal_rag/orchestration/
├── research_orchestrator.py # Main query pipeline
└── citation_tracker.py # Citation management
File: multi_modal_rag/orchestration/research_orchestrator.py
Orchestrates complex research queries by combining OpenSearch retrieval with LangChain-powered generation. Provides citation tracking, conversation memory, and related query suggestions.
from multi_modal_rag.orchestration import ResearchOrchestrator
from multi_modal_rag.indexing import OpenSearchManager
opensearch = OpenSearchManager()
orchestrator = ResearchOrchestrator(
gemini_api_key="YOUR_API_KEY",
opensearch_manager=opensearch
)Parameters:
gemini_api_key(str): Google Gemini API keyopensearch_manager(OpenSearchManager): Configured OpenSearch manager
Components Initialized:
- LangChain ChatGoogleGenerativeAI with
gemini-2.0-flashmodel - ConversationBufferMemory for chat history
- Temperature set to 0.3 (balanced creativity/accuracy)
Creates a LangChain prompt template for research queries.
Returns: PromptTemplate configured for research assistance
Prompt Structure:
template = """
You are a research assistant analyzing multi-modal academic content.
Context from various sources:
{context}
Previous conversation:
{chat_history}
Question: {question}
Instructions:
1. Provide a comprehensive answer based on the context
2. Cite sources using [Author, Year] format
3. Mention if information comes from videos or podcasts
4. Highlight any diagrams or visual content that supports the answer
5. Suggest related topics for further exploration
Answer:
"""Input Variables:
context: Formatted search results with citationschat_history: Previous conversation messagesquestion: User's research query
Example:
orchestrator = ResearchOrchestrator(api_key, opensearch_manager)
prompt = orchestrator.create_research_chain()
# Manually format prompt
formatted = prompt.format(
context="Context here...",
chat_history="Previous Q&A...",
question="What is attention mechanism?"
)
print(formatted)Main pipeline for processing research queries.
Parameters:
query(str): User's research questionindex_name(str): OpenSearch index to search (typically"research_assistant")
Returns: Dictionary with results:
{
'answer': str, # Generated response
'citations': List[Dict], # Extracted citations
'source_documents': List[Dict], # Retrieved documents
'related_queries': List[str] # Suggested follow-up queries
}Pipeline Steps:
- Retrieval: Search OpenSearch for relevant documents
- Context Formatting: Format results with citation markers
- Generation: Generate response using Gemini
- Citation Extraction: Extract citations from response
- Memory Update: Save to conversation history
- Related Queries: Generate follow-up suggestions
Example:
from multi_modal_rag.orchestration import ResearchOrchestrator
from multi_modal_rag.indexing import OpenSearchManager
# Initialize
opensearch = OpenSearchManager()
orchestrator = ResearchOrchestrator("your_api_key", opensearch)
# Process query
result = orchestrator.process_query(
query="How do transformers use attention mechanisms?",
index_name="research_assistant"
)
# Access results
print("Answer:")
print(result['answer'])
print("\nCitations:")
for citation in result['citations']:
print(f" - {citation['citation_text']}: {citation['title']}")
print("\nSource Documents:")
for doc in result['source_documents']:
print(f" - {doc['source']['title']} (score: {doc['score']})")
print("\nRelated Queries:")
for query in result['related_queries']:
print(f" - {query}")Output Example:
{
'answer': '''
Transformers use attention mechanisms to process sequences without recurrence.
The self-attention mechanism [Vaswani, 2017] allows each position to attend to
all positions in the previous layer. This is visualized in the architecture
diagram showing multi-head attention layers. The model computes attention
scores between query and key vectors, then uses these to weight value vectors.
Related video content [Video: Illustrated Transformer, 3Blue1Brown] provides
excellent visualizations of how attention weights are calculated.
''',
'citations': [
{
'citation_text': ('Vaswani', '2017'),
'source': {...},
'content_type': 'paper',
'url': 'https://arxiv.org/abs/1706.03762',
'title': 'Attention Is All You Need'
},
{
'citation_text': ('Illustrated Transformer, 3Blue1Brown',),
'source': {...},
'content_type': 'video',
'url': 'https://youtube.com/watch?v=...',
'title': 'Illustrated Transformer'
}
],
'source_documents': [
{
'score': 15.42,
'source': {
'title': 'Attention Is All You Need',
'content_type': 'paper',
'authors': ['Vaswani', 'Shazeer', ...],
...
}
},
...
],
'related_queries': [
'What are the different types of attention mechanisms?',
'How does multi-head attention improve model performance?',
'What are the computational costs of attention?',
'How do transformers compare to RNNs?',
'What are recent improvements to attention mechanisms?'
]
}Formats search results into context string with citation markers.
Parameters:
search_results(List[Dict]): Results from OpenSearch hybrid_search()
Returns: Formatted context string
Citation Formats:
- Papers:
[FirstAuthor, Year](e.g.,[Vaswani, 2017]) - Videos:
[Video: Channel, Title...](e.g.,[Video: 3Blue1Brown, Neural Networks...]) - Podcasts:
[Podcast: Title...](e.g.,[Podcast: Lex Fridman discusses...])
Example:
search_results = [
{
'score': 15.42,
'source': {
'content_type': 'paper',
'title': 'Attention Is All You Need',
'authors': ['Ashish Vaswani', 'Noam Shazeer'],
'publication_date': '2017-06-12',
'abstract': 'The dominant sequence transduction models...',
'diagram_descriptions': 'Architecture diagram showing...'
}
}
]
context = orchestrator.format_context_with_citations(search_results)
print(context)Output:
Source 1 [Vaswani, 2017]:
Title: Attention Is All You Need
Type: paper
Content: The dominant sequence transduction models are based on complex...
Visual Content: Architecture diagram showing encoder-decoder structure with...
Full Context Structure:
"""
Source 1 [Citation]:
Title: {title}
Type: {content_type}
Content: {first_500_chars}...
Visual Content: {diagram_descriptions}...
Source 2 [Citation]:
Title: {title}
...
"""Extracts citations from generated response and matches them to source documents.
Parameters:
response(str): Generated answer textsearch_results(List[Dict]): Original search results
Returns: List of citation dictionaries
Citation Patterns (Regex):
\[([^,\]]+),\s*(\d{4})\]- Paper citations: [Author, Year]\[Video:\s*([^\]]+)\]- Video citations: [Video: Title]\[Podcast:\s*([^\]]+)\]- Podcast citations: [Podcast: Title]
Example:
response = """
Transformers were introduced by [Vaswani, 2017] and explained visually
in [Video: Illustrated Transformer]. The podcast [Podcast: AI Explained]
also covers this topic.
"""
citations = orchestrator.extract_citations(response, search_results)
for citation in citations:
print(f"Found citation: {citation['citation_text']}")
print(f" Matched to: {citation['title']}")
print(f" Type: {citation['content_type']}")
print(f" URL: {citation['url']}")Output:
[
{
'citation_text': ('Vaswani', '2017'),
'source': {...}, # Full source document
'content_type': 'paper',
'url': 'https://arxiv.org/abs/1706.03762',
'title': 'Attention Is All You Need'
},
{
'citation_text': ('Illustrated Transformer',),
'source': {...},
'content_type': 'video',
'url': 'https://youtube.com/watch?v=...',
'title': 'Illustrated Transformer'
},
{
'citation_text': ('AI Explained',),
'source': {...},
'content_type': 'podcast',
'url': 'https://...',
'title': 'AI Explained Episode'
}
]Checks if a citation regex match corresponds to a source document.
Parameters:
citation_match(tuple): Regex match groupssource(Dict): Source document
Returns: Boolean indicating match
Matching Logic:
- Papers: Match author name in first author
- Videos: Match text in title
- Podcasts: Match text in title
Example:
# Paper citation match
citation_match = ('Vaswani', '2017')
source = {
'content_type': 'paper',
'authors': ['Ashish Vaswani', 'Noam Shazeer']
}
matches = orchestrator.citation_matches_source(citation_match, source)
# Returns: True (Vaswani in first author)
# Video citation match
citation_match = ('Illustrated Transformer',)
source = {
'content_type': 'video',
'title': 'The Illustrated Transformer'
}
matches = orchestrator.citation_matches_source(citation_match, source)
# Returns: True (text in title)Generates related research queries based on original query and response.
Parameters:
original_query(str): User's original questionresponse(str): Generated answer (first 500 chars used)
Returns: List of 5 related query strings
Prompt Template:
prompt = f"""
Based on this research query: "{original_query}"
And this response: "{response[:500]}..."
Generate 5 related research questions that would deepen understanding of this topic.
Format as a JSON list.
"""Example:
original = "How do transformers work?"
response = "Transformers use self-attention mechanisms to process sequences..."
related = orchestrator.generate_related_queries(original, response)
for query in related:
print(f" • {query}")Output:
[
"What are the key differences between transformers and RNNs?",
"How does multi-head attention improve model performance?",
"What are the computational requirements of transformer models?",
"How are transformers applied to computer vision tasks?",
"What are recent innovations in transformer architectures?"
]Fallback (if JSON parsing fails):
[
f"What are the key concepts in {original_query}?",
f"How does {original_query} relate to current research?",
f"What are recent developments in {original_query}?"
]File: multi_modal_rag/orchestration/citation_tracker.py
Tracks and manages citations across research sessions. Provides citation analytics, usage history, and bibliography export in multiple formats.
from multi_modal_rag.orchestration import CitationTracker
tracker = CitationTracker(storage_path="data/citations.json")Parameters:
storage_path(str, optional): Path to JSON storage file. Default:"data/citations.json"
Storage Structure:
{
"papers": {
"citation_id_1": {
"title": "Attention Is All You Need",
"authors": ["Ashish Vaswani", "Noam Shazeer"],
"url": "https://arxiv.org/abs/1706.03762",
"first_used": "2024-10-02T14:30:00",
"use_count": 5,
"queries": [
{"query": "How do transformers work?", "timestamp": "2024-10-02T14:30:00"},
...
]
}
},
"videos": {...},
"podcasts": {...},
"usage_history": [
{
"citation_id": "citation_id_1",
"content_type": "paper",
"query": "How do transformers work?",
"timestamp": "2024-10-02T14:30:00"
},
...
]
}Loads existing citations from storage.
Returns: Dictionary with citations or empty structure if file doesn't exist
Example:
tracker = CitationTracker("data/citations.json")
# Automatically loads on initialization
# Manual reload
citations = tracker.load_citations()
print(f"Papers: {len(citations['papers'])}")
print(f"Videos: {len(citations['videos'])}")Saves citations to storage file.
Example:
tracker = CitationTracker()
# Modify citations
tracker.citations['papers']['new_id'] = {...}
# Save changes
tracker.save_citations()Auto-save: Called automatically by add_citation()
Adds a new citation with usage tracking.
Parameters:
citation(Dict): Citation informationquery(str): Query that generated this citation
Citation Structure:
{
'content_type': 'paper', # or 'video', 'podcast'
'title': str,
'authors': List[str],
'url': str
}Example:
tracker = CitationTracker()
citation = {
'content_type': 'paper',
'title': 'Attention Is All You Need',
'authors': ['Ashish Vaswani', 'Noam Shazeer'],
'url': 'https://arxiv.org/abs/1706.03762'
}
tracker.add_citation(citation, query="How do transformers work?")Tracking Features:
- Generates unique ID for citation (MD5 hash of title + URL)
- Increments use count if citation exists
- Stores query and timestamp for each use
- Adds to usage history
- Auto-saves to storage
Generates unique ID for citation.
Parameters:
citation(Dict): Citation dictionary
Returns: 12-character MD5 hash
Example:
citation = {
'title': 'Attention Is All You Need',
'url': 'https://arxiv.org/abs/1706.03762'
}
citation_id = tracker.generate_citation_id(citation)
print(citation_id) # e.g., "a1b2c3d4e5f6"Generates citation usage report.
Returns: Dictionary with statistics:
{
'total_papers': int,
'total_videos': int,
'total_podcasts': int,
'most_cited': List[Dict], # Top 5 most used
'recent_citations': List[Dict] # Last 10 citations
}Example:
tracker = CitationTracker()
report = tracker.get_citation_report()
print(f"Total Papers: {report['total_papers']}")
print(f"Total Videos: {report['total_videos']}")
print("\nMost Cited Sources:")
for item in report['most_cited']:
print(f" {item['title']} - used {item['use_count']} times")
print("\nRecent Citations:")
for item in report['recent_citations']:
print(f" {item['query']} at {item['timestamp']}")Gets most frequently cited sources.
Parameters:
n(int, optional): Number of results. Default: 5
Returns: List of citation dictionaries sorted by use count
Example:
top_citations = tracker.get_most_cited(n=10)
for i, citation in enumerate(top_citations, 1):
print(f"{i}. {citation['title']}")
print(f" Type: {citation['type']}, Used: {citation['use_count']} times")Gets most recent citations.
Parameters:
n(int, optional): Number of results. Default: 10
Returns: List of recent citation events (reversed chronological order)
Example:
recent = tracker.get_recent_citations(n=5)
for citation in recent:
print(f"Query: {citation['query']}")
print(f"ID: {citation['citation_id']}")
print(f"Time: {citation['timestamp']}")
print()Exports citations in standard bibliography format.
Parameters:
format(str, optional): Export format ('bibtex', 'apa', 'json'). Default: 'bibtex'
Returns: Formatted bibliography string
Supported Formats:
'bibtex': BibTeX format'apa': APA citation format'json': JSON export
Example:
tracker = CitationTracker()
# BibTeX export
bibtex = tracker.export_bibliography('bibtex')
print(bibtex)
# APA export
apa = tracker.export_bibliography('apa')
print(apa)
# JSON export
json_export = tracker.export_bibliography('json')
print(json_export)Exports citations in BibTeX format.
Returns: BibTeX-formatted string
Example Output:
@article{a1b2c3d4e5f6,
title={Attention Is All You Need},
author={Ashish Vaswani and Noam Shazeer and Niki Parmar},
year={2017},
url={https://arxiv.org/abs/1706.03762}
}
@article{b2c3d4e5f6g7,
title={BERT: Pre-training of Deep Bidirectional Transformers},
author={Jacob Devlin and Ming-Wei Chang},
year={2018},
url={https://arxiv.org/abs/1810.04805}
}Usage:
bibtex = tracker.export_bibtex()
# Save to file
with open('references.bib', 'w') as f:
f.write(bibtex)
# Use in LaTeX
# \bibliography{references}Exports citations in APA format.
Returns: APA-formatted string
Example Output:
Devlin, J. et al. (2018). BERT: Pre-training of Deep Bidirectional Transformers. Retrieved from https://arxiv.org/abs/1810.04805
Vaswani, A. et al. (2017). Attention Is All You Need. Retrieved from https://arxiv.org/abs/1706.03762
Format Details:
- Single author:
LastName, F. (Year). - Multiple authors:
FirstAuthor et al. (Year). - No date: Uses
n.d. - Sorted alphabetically
from multi_modal_rag.orchestration import ResearchOrchestrator, CitationTracker
from multi_modal_rag.indexing import OpenSearchManager
# Initialize components
opensearch = OpenSearchManager()
orchestrator = ResearchOrchestrator("gemini_api_key", opensearch)
tracker = CitationTracker()
# Process query
query = "How do attention mechanisms work in transformers?"
result = orchestrator.process_query(query, "research_assistant")
# Display answer
print("Answer:")
print(result['answer'])
print()
# Track citations
for citation in result['citations']:
tracker.add_citation(citation, query)
# View citation report
report = tracker.get_citation_report()
print(f"Total citations tracked: {sum([report['total_papers'], report['total_videos'], report['total_podcasts']])}")
# Export bibliography
bibtex = tracker.export_bibliography('bibtex')
with open('references.bib', 'w') as f:
f.write(bibtex)orchestrator = ResearchOrchestrator("api_key", opensearch_manager)
queries = [
"What is a transformer model?",
"How does self-attention work?",
"What are the advantages over RNNs?"
]
for query in queries:
result = orchestrator.process_query(query, "research_assistant")
print(f"Q: {query}")
print(f"A: {result['answer']}\n")
# Memory is preserved across queries
# Later queries can reference earlier answerstracker = CitationTracker()
# Get analytics
report = tracker.get_citation_report()
print("=== Citation Analytics ===\n")
print("Most Cited Sources:")
for i, item in enumerate(report['most_cited'][:5], 1):
print(f"{i}. {item['title']} ({item['type']})")
print(f" Used {item['use_count']} times")
print()
print("Citation Distribution:")
print(f" Papers: {report['total_papers']}")
print(f" Videos: {report['total_videos']}")
print(f" Podcasts: {report['total_podcasts']}")
# Export for different uses
bibtex = tracker.export_bibliography('bibtex')
apa = tracker.export_bibliography('apa')
with open('references.bib', 'w') as f:
f.write(bibtex)
with open('references.txt', 'w') as f:
f.write(apa)LLM:
from langchain_google_genai import ChatGoogleGenerativeAI
llm = ChatGoogleGenerativeAI(
model="gemini-2.0-flash",
google_api_key=api_key,
temperature=0.3,
convert_system_message_to_human=True
)Memory:
from langchain.memory import ConversationBufferMemory
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)Prompts:
from langchain.prompts import PromptTemplate
prompt = PromptTemplate(
template=template_string,
input_variables=["context", "chat_history", "question"]
)# Format prompt
full_prompt = prompt.format(
context=formatted_context,
chat_history=str(memory.chat_memory),
question=query
)
# Generate response
response = llm.invoke(full_prompt).content
# Update memory
memory.save_context(
{"input": query},
{"output": response}
)try:
result = orchestrator.process_query(query, index_name)
except Exception as e:
logger.error(f"Error processing query: {e}")
result = {
'answer': f"Error processing query: {str(e)}",
'citations': [],
'source_documents': [],
'related_queries': []
}try:
tracker.add_citation(citation, query)
except Exception as e:
logger.error(f"Error tracking citation: {e}")
# Continue without tracking (non-critical)from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.memory import ConversationBufferMemory
from langchain.schema import DocumentInstallation:
pip install langchain langchain-google-genaiError: json.decoder.JSONDecodeError
Cause: LLM doesn't always return valid JSON
Solution: Use fallback queries (already implemented)
Cause: Response doesn't match regex patterns
Solution: Modify patterns or prompt engineering:
# Add to prompt
"IMPORTANT: Cite sources using exactly this format: [Author, Year]"Cause: Long conversation history
Solution: Use ConversationSummaryMemory or limit messages:
from langchain.memory import ConversationSummaryMemory
memory = ConversationSummaryMemory(
llm=llm,
memory_key="chat_history"
)Query Processing Time:
- OpenSearch retrieval: 10-50ms
- Context formatting: 1-5ms
- Gemini generation: 2-5 seconds
- Citation extraction: 10-50ms
- Related query generation: 2-3 seconds
- Total: ~4-8 seconds per query
Optimization Tips:
- Cache frequent queries
- Reduce retrieval count (k=5 instead of k=10)
- Limit context size (truncate long documents)
- Skip related query generation for faster responses