-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhaystack_rag.py
More file actions
180 lines (142 loc) · 5.76 KB
/
haystack_rag.py
File metadata and controls
180 lines (142 loc) · 5.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
"""
RAG System using Haystack Pipeline
A proper implementation using Haystack's document store and retriever.
"""
import os
from haystack import Pipeline
from haystack.document_stores import InMemoryDocumentStore
from haystack.nodes import BM25Retriever, TextConverter, PreProcessor
from haystack.utils import print_answers
class HaystackRAG:
    """Retrieval helper built on a single-node Haystack pipeline.

    ``.txt`` files found in ``document_dir`` are written to an
    ``InMemoryDocumentStore`` and queried through a ``BM25Retriever`` that is
    the only node of a ``Pipeline``.  No generator model is involved: the
    "answer" returned by :meth:`ask_question` is a concatenation of excerpts
    from the retrieved documents.

    Attributes:
        document_dir: Directory scanned by :meth:`load_documents`.
        document_store: The document store, or ``None`` when setup failed.
        retriever: The BM25 retriever, or ``None`` when setup failed.
        pipeline: The query pipeline, or ``None`` when setup failed.
    """

    # NOTE(review): the leading glyphs in the console messages below look
    # like mis-decoded emoji; they are kept as found -- confirm against the
    # original file before changing them.

    def __init__(self, document_dir: str = "documents"):
        """Store the document directory and try to build the pipeline.

        Args:
            document_dir: Directory that holds the ``.txt`` files to index.
        """
        self.document_dir = document_dir
        self.document_store = None
        self.retriever = None
        self.pipeline = None
        self.setup_pipeline()

    def setup_pipeline(self) -> None:
        """Create the document store, the retriever and the pipeline.

        Setup is best-effort: any exception is reported on stdout instead of
        being raised, and callers detect failure by checking that
        ``self.pipeline`` is still ``None``.  The three attributes are only
        assigned once every component was built, so the instance is never
        left half-initialised.
        """
        try:
            print("π§ Setting up Haystack pipeline...")
            document_store = InMemoryDocumentStore(use_bm25=True)
            retriever = BM25Retriever(document_store=document_store)
            pipeline = Pipeline()
            pipeline.add_node(
                component=retriever, name="Retriever", inputs=["Query"]
            )
        except Exception as e:
            # Deliberately broad: this is the boundary where a broken
            # Haystack installation is turned into a console message.
            self.document_store = None
            self.retriever = None
            self.pipeline = None
            print(f"β Error setting up pipeline: {e}")
            print("π‘ Make sure Haystack is properly installed")
        else:
            self.document_store = document_store
            self.retriever = retriever
            self.pipeline = pipeline
            print("β Haystack pipeline setup completed!")

    def load_documents(self) -> None:
        """Read every ``.txt`` file in ``document_dir`` and index it.

        Each file becomes one document dict with ``content`` (the file text)
        and ``meta`` (``filename`` and ``source`` path).  Files that cannot
        be read or decoded as UTF-8 are reported and skipped.  Nothing is
        raised when the directory or the document store is missing; a
        message is printed and the method returns.
        """
        if self.document_store is None:
            # Setup failed earlier; bail out before doing any file I/O
            # instead of failing with AttributeError at write time.
            print("Error: document store is not initialized; cannot index documents.")
            return

        if not os.path.isdir(self.document_dir):
            print(f"β Document directory '{self.document_dir}' not found!")
            return

        print("π Loading documents...")
        documents = []
        # os.listdir returns entries in arbitrary order; sort so documents
        # are indexed in a reproducible order.
        for filename in sorted(os.listdir(self.document_dir)):
            if not filename.endswith('.txt'):
                continue
            file_path = os.path.join(self.document_dir, filename)
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    content = file.read()
            except (OSError, UnicodeError) as e:
                print(f"β Error loading {filename}: {e}")
                continue
            documents.append({
                'content': content,
                'meta': {'filename': filename, 'source': file_path},
            })
            print(f"β Loaded: {filename}")

        if documents:
            self.document_store.write_documents(documents)
            print(f"β Successfully indexed {len(documents)} documents!")
        else:
            print("β No documents found to index!")

    def search(self, query: str, top_k: int = 3):
        """Run the pipeline for ``query`` and return its raw result.

        Args:
            query: Search query.
            top_k: Number of documents the retriever is asked for.

        Returns:
            Whatever ``self.pipeline.run`` returns, or ``None`` when the
            call fails (including the case where no pipeline was set up).
        """
        try:
            return self.pipeline.run(
                query=query, params={"Retriever": {"top_k": top_k}}
            )
        except Exception as e:
            # Best-effort boundary: report and signal failure with None.
            print(f"β Error during search: {e}")
            return None

    def ask_question(self, question: str, top_k: int = 3) -> dict:
        """Retrieve documents for ``question`` and build an excerpt answer.

        Args:
            question: The question to ask.
            top_k: Number of documents to retrieve.

        Returns:
            A dict with ``answer`` (text), ``documents`` (the retrieved
            document objects) and ``sources`` (their ``filename`` metadata,
            ``'Unknown'`` when absent).  When nothing is retrieved the lists
            are empty and ``answer`` is a fixed apology string.
        """
        print(f"\nβ Question: {question}")
        print("-" * 60)

        results = self.search(question, top_k)
        documents = results.get('documents') if results else None
        if not documents:
            print("β No relevant documents found.")
            return {
                'answer': "I couldn't find any relevant information to answer your question.",
                'documents': [],
                'sources': [],
            }

        sources = [doc.meta.get('filename', 'Unknown') for doc in documents]

        # The "answer" is the first 300 characters of each hit, labelled
        # with the file it came from.
        context = "\n\n".join(
            f"From {source}:\n{doc.content[:300]}..."
            for source, doc in zip(sources, documents)
        )
        answer = f"Based on the available information:\n\n{context}"

        print(f"π€ Answer: {answer}")
        print(f"π Sources: {', '.join(sources)}")
        return {
            'answer': answer,
            'documents': documents,
            'sources': sources,
        }
def main():
    """Run the console demo: build the system, index files, ask questions.

    Creates a ``HaystackRAG`` over the ``documents`` directory, stops early
    with a message if the pipeline could not be built or no documents were
    indexed, and otherwise runs a fixed list of sample questions through
    ``ask_question`` (which prints each answer itself).
    """
    print("π€ Haystack RAG System Demo")
    print("=" * 50)

    rag = HaystackRAG(document_dir="documents")
    if not rag.pipeline:
        print("β Pipeline setup failed. Please check your Haystack installation.")
        return

    rag.load_documents()
    if not rag.document_store.get_document_count():
        print("β No documents loaded. Please check your documents directory.")
        return

    questions = [
        "What is artificial intelligence?",
        "What are neural networks?",
        "What is machine learning?",
        "How does deep learning work?",
        "What are the applications of AI?",
    ]

    print("\n" + "=" * 60)
    print("HAYSTACK RAG DEMO")
    print("=" * 60)

    for question in questions:
        # ask_question prints the answer; its return value is not needed here.
        rag.ask_question(question)
        print("\n" + "=" * 60)
# Run the demo only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()