-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbuild_graph.py
More file actions
executable file
·437 lines (370 loc) · 15.6 KB
/
build_graph.py
File metadata and controls
executable file
·437 lines (370 loc) · 15.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
#!/usr/bin/env python3
"""
Build Temporal GraphRAG knowledge graph from documents.
This script:
1. Loads documents from various sources (ECT-QA corpus, text files, or directories)
2. Creates TemporalGraphRAG from config.yaml (uses tgrag.create_temporal_graphrag_from_config)
3. Builds the temporal knowledge graph
4. Saves everything to the output directory
Usage:
# Set API keys (provider-specific)
export OPENAI_API_KEY="your-key-here" # For OpenAI provider
export GEMINI_API_KEY="your-key-here" # For Gemini provider
# etc.
# Run with default config (from tgrag/configs/config.yaml)
python build_graph.py --output_dir ./graph_output --num_docs 3
# Build from a single text file
python build_graph.py --output_dir ./graph_output --corpus_path ./my_document.txt
# Build from a directory of text files
python build_graph.py --output_dir ./graph_output --corpus_path ./my_documents/
# Override config values
python build_graph.py --output_dir ./graph_output --num_docs 3 --chunk_size 1000
"""
import os
import sys
import json
import gzip
import argparse
import logging
from pathlib import Path
from typing import List, Dict
from dotenv import load_dotenv
# Load environment variables from a .env file (if present) FIRST, so that
# settings defined there -- including TG_RAG_DEBUG, read just below -- are
# visible to os.getenv(). load_dotenv() is called with its default arguments.
load_dotenv()
# Configure logging - default to ERROR to reduce noise, but allow DEBUG when the
# TG_RAG_DEBUG environment variable is set to "true" (case-insensitive).
debug_mode = os.getenv("TG_RAG_DEBUG", "false").lower() == "true"
log_level = logging.DEBUG if debug_mode else logging.ERROR
logging.basicConfig(
    level=log_level,
    format='%(levelname)s - %(message)s',
    # NOTE: datefmt only has an effect when the format string contains
    # %(asctime)s; it is kept so that adding a timestamp later needs no other change.
    datefmt='%Y-%m-%d %H:%M:%S'
)
if debug_mode:
    print("🔍 Debug mode enabled - verbose logging active")
# Add the directory containing this script to the front of the import path so
# the imports that follow can be resolved relative to it.
sys.path.insert(0, str(Path(__file__).parent))
# Import from tgrag package (simplified API)
from tgrag import create_temporal_graphrag_from_config
def load_documents_from_corpus(corpus_path: Path, num_docs: int = 3) -> List[Dict]:
    """
    Load documents from the ECT-QA corpus.

    The file is read as gzip-compressed, UTF-8 encoded JSON Lines (one JSON
    value per line). Blank or whitespace-only lines are skipped and do not
    count towards ``num_docs``.

    Args:
        corpus_path: Path to the corpus file (base.jsonl.gz)
        num_docs: Maximum number of documents to load; a value <= 0 loads nothing

    Returns:
        List of parsed documents, at most ``num_docs`` entries, in file order

    Raises:
        FileNotFoundError: If ``corpus_path`` does not exist.
        RuntimeError: If the file cannot be opened, decompressed or parsed. The
            original exception is attached as ``__cause__``.
    """
    if not corpus_path.exists():
        raise FileNotFoundError(f"Corpus file not found: {corpus_path}")

    documents = []
    try:
        with gzip.open(corpus_path, 'rt', encoding='utf-8') as f:
            for line in f:
                # Stop as soon as enough documents were collected; this also
                # makes num_docs <= 0 return an empty list.
                if len(documents) >= num_docs:
                    break
                # Tolerate blank lines (e.g. a trailing newline at end of file)
                # instead of failing the whole load with a JSON decode error.
                if not line.strip():
                    continue
                documents.append(json.loads(line))
    except Exception as e:
        # Keep the original exception as the cause so the traceback shows
        # whether the failure was I/O, gzip or JSON related.
        raise RuntimeError(f"Error loading corpus: {e}") from e

    print(f"✅ Loaded {len(documents)} documents from corpus")
    return documents
def load_documents_from_txt_file(txt_path: Path) -> List[Dict]:
    """
    Load a single text file as a document.

    The file is decoded as UTF-8 (a leading byte-order mark, if any, is
    removed) and surrounding whitespace is stripped. The file extension is not
    checked here; the file name without its extension becomes the title.

    Args:
        txt_path: Path to the text file

    Returns:
        List containing a single document dictionary ``{'title', 'doc'}``, or
        an empty list if the file has no content after stripping

    Raises:
        FileNotFoundError: If ``txt_path`` does not exist.
        RuntimeError: If the file is not valid UTF-8 or cannot be read. The
            original exception is attached as ``__cause__``.
    """
    if not txt_path.exists():
        raise FileNotFoundError(f"Text file not found: {txt_path}")

    try:
        # 'utf-8-sig' decodes plain UTF-8 identically to 'utf-8' but also drops
        # a leading BOM, which would otherwise end up inside the document text
        # (str.strip() does not remove U+FEFF).
        with open(txt_path, 'r', encoding='utf-8-sig') as f:
            content = f.read().strip()
    except UnicodeDecodeError as e:
        raise RuntimeError(f"File {txt_path} is not a valid text file (binary?)") from e
    except Exception as e:
        raise RuntimeError(f"Error loading text file {txt_path}: {e}") from e

    if not content:
        print(f"⚠️ Warning: File {txt_path} is empty, skipping")
        return []

    # Use filename (without extension) as title; for a file without an
    # extension, stem is the full file name.
    return [{
        'title': txt_path.stem,
        'doc': content
    }]
def load_documents_from_txt_directory(txt_dir: Path) -> List[Dict]:
    """
    Load all text-based files from a directory (recursively) as documents.

    Files whose extension (case-insensitive) is one of .txt, .md, .rst, .text,
    .log, and files without an extension, are read as UTF-8 text. Other file
    types are ignored. Empty or undecodable files are skipped with a warning.

    Args:
        txt_dir: Path to the directory containing text files

    Returns:
        List of document dictionaries ``{'title', 'doc'}`` in sorted path
        order. The title is the path relative to ``txt_dir`` without the file
        extension, so subdirectory structure is preserved.

    Raises:
        FileNotFoundError: If ``txt_dir`` does not exist.
        ValueError: If ``txt_dir`` is not a directory, or it contains no file
            with a supported extension.
    """
    if not txt_dir.exists():
        raise FileNotFoundError(f"Directory not found: {txt_dir}")
    if not txt_dir.is_dir():
        raise ValueError(f"Path is not a directory: {txt_dir}")

    # Supported text file extensions ('' stands for "no extension")
    TEXT_EXTENSIONS = {'.txt', '.md', '.rst', '.text', '.log', ''}

    # Walk the tree once and classify every regular file, so each entry is
    # stat-ed a single time instead of once per filtering pass.
    text_files = []
    non_text_files = []
    for entry in txt_dir.rglob("*"):
        if not entry.is_file():
            continue
        if entry.suffix.lower() in TEXT_EXTENSIONS:
            text_files.append(entry)
        else:
            non_text_files.append(entry)

    if not text_files:
        if non_text_files:
            # There are files, just none we support: tell the user what was found.
            found_extensions = ', '.join(sorted({f.suffix for f in non_text_files if f.suffix})) or 'none'
            supported_extensions = ', '.join(sorted(TEXT_EXTENSIONS - {''})) or 'none'
            raise ValueError(
                f"No supported text files found in directory: {txt_dir}\n"
                f"Found {len(non_text_files)} file(s) with extension(s): {found_extensions}\n"
                f"Supported extensions: {supported_extensions} (and files without extensions)"
            )
        raise ValueError(f"No text files found in directory: {txt_dir}")

    if non_text_files:
        print(f"ℹ️ Found {len(non_text_files)} non-text file(s) in directory (ignored)")

    documents = []
    for text_file in sorted(text_files):
        try:
            # 'utf-8-sig' behaves like 'utf-8' but also removes a leading BOM,
            # which would otherwise be kept as part of the document text.
            with open(text_file, 'r', encoding='utf-8-sig') as f:
                content = f.read().strip()
        except UnicodeDecodeError:
            print(f"⚠️ Warning: File {text_file} is not a valid text file (binary?), skipping")
            continue
        except Exception as e:
            # One unreadable file must not abort the whole directory load.
            print(f"⚠️ Warning: Error loading {text_file}: {e}, skipping")
            continue

        if not content:
            print(f"⚠️ Warning: File {text_file} is empty, skipping")
            continue

        # Use relative path from txt_dir as title (preserves subdirectory structure),
        # dropping the extension when there is one.
        rel_path = text_file.relative_to(txt_dir)
        title = str(rel_path.with_suffix('')) if rel_path.suffix else str(rel_path)
        documents.append({
            'title': title,
            'doc': content
        })

    print(f"✅ Loaded {len(documents)} documents from {len(text_files)} text files")
    return documents
def prepare_documents_for_insertion(documents: List[Dict]) -> List[Dict]:
    """
    Convert documents to the format expected by TemporalGraphRAG.insert().

    The format is detected from the FIRST document only: if it has a
    'cleaned_content' or 'raw_content' key, all documents are treated as corpus
    records; otherwise all documents must already have 'title' and 'doc' keys.

    Args:
        documents: List of documents (either from corpus or txt files)

    Returns:
        List of documents in format {"title": str, "doc": str}. For documents
        that are already in that format the input list itself is returned.
        Corpus records without any content are skipped with a warning.

    Raises:
        ValueError: If a non-corpus document lacks the 'title' or 'doc' key.
    """
    if not documents:
        return []

    first_doc = documents[0]
    is_corpus_format = 'cleaned_content' in first_doc or 'raw_content' in first_doc

    if not is_corpus_format:
        # Already in the correct format (from txt files): just validate and return
        for doc in documents:
            if 'title' not in doc or 'doc' not in doc:
                raise ValueError(f"Document missing required keys 'title' or 'doc': {list(doc.keys())}")
        return documents

    # Process corpus format documents
    prepared_docs = []
    for doc in documents:
        # Prefer the cleaned text, but fall back to the raw text when the
        # cleaned field is missing OR present-but-empty (''/None). A plain
        # dict.get() default would only cover the "missing key" case.
        content = doc.get('cleaned_content') or doc.get('raw_content') or ''
        if not content:
            print(f"⚠️ Warning: Document {doc.get('company_name', 'Unknown')} has no content, skipping")
            continue

        # Create a descriptive title from whatever metadata is available
        company = doc.get('company_name', 'Unknown')
        year = doc.get('year', '')
        quarter = doc.get('quarter', '')
        if year and quarter:
            # str() keeps this working when the quarter is stored as a number.
            # NOTE(review): a quarter value that already starts with "Q"
            # yields "QQ1" here, as before -- confirm the corpus schema.
            title = f"{company} {year} Q{str(quarter).upper()}"
        elif year:
            title = f"{company} {year}"
        else:
            title = company

        prepared_docs.append({
            'title': title,
            'doc': content
        })
    return prepared_docs
def main():
    """
    Command-line entry point: build the temporal knowledge graph.

    Steps: parse arguments, create the TemporalGraphRAG instance from the
    config file (with command-line overrides), load documents from the
    configured corpus path, convert them to {"title", "doc"} records, insert
    them into the graph and print a summary. Any failure is reported on stdout
    and terminates the process with exit status 1 (argument errors: status 2).
    """
    parser = _build_arg_parser()
    args = parser.parse_args()
    override_config = _collect_overrides(parser, args)

    # Create TemporalGraphRAG from config
    print("="*60)
    print("Loading Configuration and Initializing TemporalGraphRAG")
    print("="*60)
    print(f"Config file: {args.config}")
    if override_config:
        print(f"Overrides: {override_config}")
    print()

    try:
        graph_rag = create_temporal_graphrag_from_config(
            config_path=args.config,
            config_type="building",
            override_config=override_config or None
        )
        print("✅ TemporalGraphRAG initialized from config")
        print(f" Working directory: {graph_rag.working_dir}")
        print(f" Chunk size: {graph_rag.chunk_token_size} tokens")
        print(f" Chunk overlap: {graph_rag.chunk_overlap_token_size} tokens")
        print(f" Entity summarization: {'Disabled' if graph_rag.disable_entity_summarization else 'Enabled'}")
        print(f" Community summary: {'Enabled' if graph_rag.enable_community_summary else 'Disabled'}")
    except ValueError as e:
        # ValueError is reported without a traceback (message only).
        print(f"❌ Error: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"❌ Error initializing TemporalGraphRAG: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)

    # Resolve the corpus path from the same config + overrides that were used
    # to build graph_rag (the config is loaded a second time here).
    from tgrag.src.config.config_loader import ConfigLoader
    config_loader = ConfigLoader(config_path=args.config)
    config = config_loader.get_config("building", override_args=override_config or None)
    corpus_path = Path(config.get('corpus_path', args.corpus_path))

    # Detect input type and load accordingly
    try:
        documents = _load_documents(corpus_path, args.num_docs)
    except Exception as e:
        print(f"❌ Error loading documents: {e}")
        sys.exit(1)
    if not documents:
        print("❌ No documents loaded")
        sys.exit(1)

    # Prepare documents (auto-detects format)
    try:
        prepared_docs = prepare_documents_for_insertion(documents)
    except ValueError as e:
        # Malformed records are a user-input problem: report, don't traceback.
        print(f"❌ Error preparing documents: {e}")
        sys.exit(1)
    if not prepared_docs:
        # Every loaded document was skipped for lack of content; there is
        # nothing to build a graph from.
        print("❌ No documents with content to insert")
        sys.exit(1)
    print(f"✅ Prepared {len(prepared_docs)} documents for insertion")

    # Insert documents
    print("\n" + "="*60)
    print("Inserting documents and building graph...")
    print("="*60)
    print(f"Processing {len(prepared_docs)} documents...")
    print("This may take minutes to hours depending on document size and LLM response time.")
    print()
    try:
        graph_rag.insert(prepared_docs)
        print("\n✅ Graph building completed successfully!")
    except Exception as e:
        print(f"\n❌ Error during graph building: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        # Runs on success and on failure (sys.exit raises SystemExit, which
        # still passes through this finally clause).
        _close_llm_clients()

    # Summary
    print("\n" + "="*60)
    print("BUILD SUMMARY")
    print("="*60)
    print(f"✅ Documents processed: {len(prepared_docs)}")
    print(f"✅ Graph stored in: {Path(graph_rag.working_dir).absolute()}")
    print(f"✅ Working directory: {graph_rag.working_dir}")
    print(f"✅ Configuration: {args.config}")
    print("="*60)


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line argument parser for this script."""
    parser = argparse.ArgumentParser(
        description="Build Temporal GraphRAG knowledge graph from documents (ECT-QA corpus, text files, or directories) using config.yaml",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument(
        '--config',
        type=str,
        default='tgrag/configs/config.yaml',
        # ArgumentDefaultsHelpFormatter appends "(default: ...)" itself, so the
        # default is not repeated in the help text.
        help='Path to configuration file'
    )
    parser.add_argument(
        '--output_dir',
        type=str,
        default=None,
        help='Output directory for graph storage (overrides config.working_dir if set)'
    )
    parser.add_argument(
        '--num_docs',
        type=int,
        default=3,
        help='Number of documents to process from the corpus'
    )
    # NOTE(review): because this default is a non-empty string, the value is
    # always put into the overrides, i.e. config.corpus_path is never used even
    # when --corpus_path is not given. Confirm whether that is intended.
    parser.add_argument(
        '--corpus_path',
        type=str,
        default='ect-qa/corpus/base.jsonl.gz',
        help='Path to the corpus file (.jsonl.gz), text file (.txt/.md/.rst/.text/.log), or directory of text files (overrides config.corpus_path if set)'
    )
    parser.add_argument(
        '--chunk_size',
        type=int,
        default=None,
        help='Override chunk size from config'
    )
    parser.add_argument(
        '--chunk_overlap',
        type=int,
        default=None,
        help='Override chunk overlap from config'
    )
    return parser


def _collect_overrides(parser: argparse.ArgumentParser, args: argparse.Namespace) -> Dict:
    """Translate parsed command-line arguments into config override entries."""
    override_config = {}
    if args.corpus_path:
        override_config['corpus_path'] = args.corpus_path
    # Compare against None rather than truthiness: an explicit 0 (meaningful
    # for --chunk_overlap) must not be silently dropped.
    if args.chunk_size is not None:
        if args.chunk_size <= 0:
            parser.error("--chunk_size must be a positive integer")
        override_config['chunk_size'] = args.chunk_size
    if args.chunk_overlap is not None:
        if args.chunk_overlap < 0:
            parser.error("--chunk_overlap must be zero or a positive integer")
        override_config['chunk_overlap'] = args.chunk_overlap
    if args.output_dir:
        override_config['working_dir'] = args.output_dir
    return override_config


def _load_documents(corpus_path: Path, num_docs: int) -> List[Dict]:
    """Load raw documents from a .gz corpus file, a single text file, or a directory."""
    text_suffixes = {'.txt', '.md', '.rst', '.text', '.log', ''}

    if corpus_path.is_file():
        suffix = corpus_path.suffix.lower()
        if suffix == '.gz':
            # JSONL.gz corpus file (e.g., ECT-QA); only here is num_docs applied
            print(f"📚 Loading from corpus file: {corpus_path}")
            return load_documents_from_corpus(corpus_path, num_docs)
        if suffix in text_suffixes:
            # Single text file
            print(f"📄 Loading from text file: {corpus_path}")
            return load_documents_from_txt_file(corpus_path)
        raise ValueError(
            f"Unsupported file type: {corpus_path.suffix}\n"
            f"Supported: .jsonl.gz (corpus), .txt/.md/.rst/.text/.log (text files), or files without extensions"
        )

    if corpus_path.is_dir():
        # Directory of text files
        print(f"📁 Loading from directory: {corpus_path}")
        return load_documents_from_txt_directory(corpus_path)

    raise FileNotFoundError(f"Path not found: {corpus_path}")


def _close_llm_clients() -> None:
    """Best-effort shutdown of the LLM HTTP clients to avoid unclosed session warnings."""
    try:
        from tgrag.src.llm.client import get_client_manager
        import asyncio
        client_manager = get_client_manager()
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # If loop is running, schedule cleanup on it
                asyncio.create_task(client_manager.close_clients())
            else:
                # If loop is not running, run cleanup to completion
                loop.run_until_complete(client_manager.close_clients())
        except RuntimeError:
            # No usable event loop (none available, or the call above raised
            # RuntimeError): create one temporarily and always close it again.
            loop = asyncio.new_event_loop()
            try:
                asyncio.set_event_loop(loop)
                loop.run_until_complete(client_manager.close_clients())
            finally:
                loop.close()
    except Exception:
        # Cleanup must never mask the real outcome of the build, so errors are
        # not propagated -- but leave a trace for debug runs.
        logging.getLogger(__name__).debug("Ignoring error while closing LLM clients", exc_info=True)


if __name__ == "__main__":
    main()