本文件提供 Chinese GraphRAG 系統的詳細使用範例和教學,幫助您快速上手並掌握系統的各種功能。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Chinese GraphRAG 快速開始範例
"""
from chinese_graphrag import ChineseGraphRAG
def main():
# 初始化系統
graphrag = ChineseGraphRAG(config_path="config/settings.yaml")
# 索引文件
print("開始索引文件...")
graphrag.index(
input_path="./documents",
output_path="./data"
)
print("索引完成!")
# 執行查詢
question = "什麼是人工智慧?"
result = graphrag.query(question)
print(f"問題:{question}")
print(f"回答:{result.answer}")
print(f"來源:{[source.document for source in result.sources]}")
if __name__ == "__main__":
main()# 1. 初始化專案
uv run chinese-graphrag init
# 2. 準備文件
mkdir documents
echo "人工智慧是模擬人類智慧的技術。" > documents/ai.txt
echo "機器學習是人工智慧的一個分支。" > documents/ml.txt
# 3. 索引文件
uv run chinese-graphrag index --input documents --output data
# 4. 執行查詢
uv run chinese-graphrag query "什麼是人工智慧?"import os
from pathlib import Path
# 建立測試文件
documents_dir = Path("tutorial_documents")
documents_dir.mkdir(exist_ok=True)
# 建立中文文件
documents = {
"ai_basics.txt": """
人工智慧(Artificial Intelligence, AI)是電腦科學的一個分支,
致力於建立能夠執行通常需要人類智慧的任務的系統。
AI 的主要目標是讓機器能夠學習、推理、感知和決策。
""",
"machine_learning.txt": """
機器學習(Machine Learning, ML)是人工智慧的一個重要分支。
它使用統計技術讓電腦系統能夠從資料中學習,
而無需明確程式設計每個任務的執行方式。
""",
"deep_learning.txt": """
深度學習(Deep Learning, DL)是機器學習的一個子領域。
它基於人工神經網路,特別是深層神經網路。
深度學習在影像識別、自然語言處理等領域取得了重大突破。
"""
}
for filename, content in documents.items():
with open(documents_dir / filename, 'w', encoding='utf-8') as f:
f.write(content.strip())
print(f"建立了 {len(documents)} 個測試文件")from chinese_graphrag.config import GraphRAGConfig
# 建立配置
config = GraphRAGConfig(
# 模型配置
models={
"chat_model": {
"type": "openai_chat",
"model": "gpt-4.1-mini",
"api_key": "your-api-key"
},
"embedding_model": {
"type": "bge_m3",
"model": "BAAI/bge-m3",
"device": "auto"
}
},
# 文本處理配置
text_processing={
"chunk_size": 1000,
"chunk_overlap": 200,
"language": "zh"
},
# 向量資料庫配置
vector_store={
"type": "lancedb",
"uri": "./tutorial_data/lancedb"
}
)
# 儲存配置
config.save("tutorial_config.yaml")from chinese_graphrag import ChineseGraphRAG
from chinese_graphrag.monitoring import get_logger
# 設定日誌
logger = get_logger(__name__)
def run_indexing():
"""執行文件索引"""
# 初始化系統
graphrag = ChineseGraphRAG(config_path="tutorial_config.yaml")
try:
# 開始索引
logger.info("開始索引文件...")
result = graphrag.index(
input_path="tutorial_documents",
output_path="tutorial_data",
show_progress=True
)
logger.info(f"索引完成!處理了 {result.documents_count} 個文件")
logger.info(f"提取了 {result.entities_count} 個實體")
logger.info(f"發現了 {result.relationships_count} 個關係")
return result
except Exception as e:
logger.error(f"索引失敗:{e}")
raise
if __name__ == "__main__":
result = run_indexing()
print("索引結果:", result)from chinese_graphrag import ChineseGraphRAG
def basic_query_example():
"""基本查詢範例"""
# 初始化系統
graphrag = ChineseGraphRAG(config_path="tutorial_config.yaml")
# 準備查詢問題
questions = [
"什麼是人工智慧?",
"機器學習和深度學習有什麼區別?",
"AI 的主要應用領域有哪些?"
]
for question in questions:
print(f"\n問題:{question}")
print("-" * 50)
# 執行查詢
result = graphrag.query(question)
print(f"回答:{result.answer}")
print(f"搜尋類型:{result.search_type}")
print(f"回應時間:{result.response_time:.2f}秒")
# 顯示來源
if result.sources:
print("來源文件:")
for i, source in enumerate(result.sources[:3], 1):
print(f" {i}. {source.document} (相關度: {source.relevance_score:.2f})")
if __name__ == "__main__":
basic_query_example()from chinese_graphrag import ChineseGraphRAG
from chinese_graphrag.query import QueryConfig
def advanced_query_example():
"""進階查詢範例"""
graphrag = ChineseGraphRAG(config_path="tutorial_config.yaml")
# 自訂查詢配置
query_config = QueryConfig(
search_type="global", # 或 "local", "auto"
max_tokens=1500,
temperature=0.1,
top_k=10,
similarity_threshold=0.7
)
question = "深度學習在哪些領域有重要應用?"
# 執行查詢
result = graphrag.query(
question=question,
config=query_config
)
print(f"問題:{question}")
print(f"回答:{result.answer}")
# 顯示詳細資訊
print(f"\n查詢詳情:")
print(f"- 搜尋類型:{result.search_type}")
print(f"- 使用的模型:{result.model_used}")
print(f"- Token 使用量:{result.tokens_used}")
print(f"- 處理時間:{result.response_time:.2f}秒")
# 顯示中間結果
if hasattr(result, 'intermediate_results'):
print(f"\n中間結果:")
for step, data in result.intermediate_results.items():
print(f"- {step}: {len(data)} 項")
if __name__ == "__main__":
advanced_query_example()import asyncio
from pathlib import Path
from chinese_graphrag import ChineseGraphRAG
from chinese_graphrag.indexing import BatchIndexer
async def batch_indexing_example():
"""批次索引範例"""
# 準備多個文件目錄
directories = [
"documents/tech",
"documents/business",
"documents/science"
]
# 建立批次索引器
batch_indexer = BatchIndexer(
config_path="tutorial_config.yaml",
max_workers=4,
batch_size=10
)
# 執行批次索引
results = []
for directory in directories:
if Path(directory).exists():
print(f"索引目錄:{directory}")
result = await batch_indexer.index_directory(
input_path=directory,
output_path=f"data/{Path(directory).name}"
)
results.append(result)
print(f"完成:{result.documents_count} 個文件")
# 合併結果
total_docs = sum(r.documents_count for r in results)
total_entities = sum(r.entities_count for r in results)
print(f"\n批次索引完成:")
print(f"- 總文件數:{total_docs}")
print(f"- 總實體數:{total_entities}")
if __name__ == "__main__":
asyncio.run(batch_indexing_example())from chinese_graphrag import ChineseGraphRAG
from concurrent.futures import ThreadPoolExecutor
import time
def batch_query_example():
"""批次查詢範例"""
graphrag = ChineseGraphRAG(config_path="tutorial_config.yaml")
# 準備查詢列表
questions = [
"什麼是人工智慧?",
"機器學習的主要類型有哪些?",
"深度學習的優勢是什麼?",
"AI 在醫療領域的應用?",
"自然語言處理的挑戰?"
]
# 單執行緒查詢
print("單執行緒查詢:")
start_time = time.time()
single_results = []
for question in questions:
result = graphrag.query(question)
single_results.append(result)
single_time = time.time() - start_time
print(f"完成時間:{single_time:.2f}秒")
# 多執行緒查詢
print("\n多執行緒查詢:")
start_time = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
multi_results = list(executor.map(graphrag.query, questions))
multi_time = time.time() - start_time
print(f"完成時間:{multi_time:.2f}秒")
print(f"加速比:{single_time/multi_time:.2f}x")
# 顯示結果
for i, (question, result) in enumerate(zip(questions, multi_results)):
print(f"\n{i+1}. {question}")
print(f" {result.answer[:100]}...")
if __name__ == "__main__":
batch_query_example()from chinese_graphrag.embeddings import EmbeddingService
import numpy as np
from sentence_transformers import SentenceTransformer
class CustomChineseEmbedding(EmbeddingService):
"""自訂中文 Embedding 服務"""
def __init__(self, model_name: str = "shibing624/text2vec-base-chinese"):
super().__init__()
self.model = SentenceTransformer(model_name)
self.model_name = model_name
def encode_texts(self, texts: list[str]) -> np.ndarray:
"""編碼文本列表"""
# 預處理中文文本
processed_texts = [self._preprocess_chinese(text) for text in texts]
# 生成 embeddings
embeddings = self.model.encode(
processed_texts,
normalize_embeddings=True,
show_progress_bar=True
)
return embeddings
def _preprocess_chinese(self, text: str) -> str:
"""中文文本預處理"""
import re
# 移除多餘空白
text = re.sub(r'\s+', ' ', text.strip())
# 保留中文字符、數字和基本標點
text = re.sub(r'[^\u4e00-\u9fff\w\s,。!?;:]', '', text)
return text
def get_dimension(self) -> int:
"""獲取向量維度"""
return self.model.get_sentence_embedding_dimension()
# 使用自訂 embedding
def use_custom_embedding():
"""使用自訂 embedding 範例"""
from chinese_graphrag.config import GraphRAGConfig
from chinese_graphrag import ChineseGraphRAG
# 建立自訂 embedding 服務
custom_embedding = CustomChineseEmbedding()
# 配置系統使用自訂 embedding
config = GraphRAGConfig(
models={
"embedding_model": {
"type": "custom",
"service": custom_embedding
}
}
)
# 初始化系統
graphrag = ChineseGraphRAG(config=config)
# 測試 embedding
texts = ["這是一個測試", "人工智慧很有趣"]
embeddings = custom_embedding.encode_texts(texts)
print(f"Embedding 維度:{embeddings.shape}")
print(f"相似度:{np.dot(embeddings[0], embeddings[1]):.4f}")
if __name__ == "__main__":
use_custom_embedding()from chinese_graphrag.query import QueryProcessor
from chinese_graphrag.models import QueryResult
import jieba
import re
class EnhancedChineseQueryProcessor(QueryProcessor):
"""增強的中文查詢處理器"""
def __init__(self):
super().__init__()
# 載入自訂詞典
jieba.load_userdict("custom_dict.txt")
# 定義查詢類型關鍵詞
self.query_type_keywords = {
"definition": ["什麼是", "定義", "含義", "意思"],
"comparison": ["區別", "差異", "比較", "不同"],
"application": ["應用", "用途", "使用", "實例"],
"process": ["如何", "怎樣", "步驟", "流程"]
}
def process_query(self, question: str) -> dict:
"""處理查詢問題"""
# 基本預處理
processed_question = self._preprocess_question(question)
# 提取關鍵詞
keywords = self._extract_keywords(processed_question)
# 判斷查詢類型
query_type = self._determine_query_type(processed_question)
# 生成搜尋策略
search_strategy = self._generate_search_strategy(query_type, keywords)
return {
"original_question": question,
"processed_question": processed_question,
"keywords": keywords,
"query_type": query_type,
"search_strategy": search_strategy
}
def _preprocess_question(self, question: str) -> str:
"""預處理問題"""
# 移除多餘標點
question = re.sub(r'[??!!。.]+$', '', question.strip())
# 統一問號
question = re.sub(r'[??]', '?', question)
return question
def _extract_keywords(self, question: str) -> list[str]:
"""提取關鍵詞"""
# 使用 jieba 分詞
words = jieba.cut(question)
# 過濾停用詞和標點
stopwords = {"的", "是", "在", "有", "和", "與", "或", "但", "而"}
keywords = [word for word in words
if len(word) > 1 and word not in stopwords]
return keywords
def _determine_query_type(self, question: str) -> str:
"""判斷查詢類型"""
for query_type, keywords in self.query_type_keywords.items():
if any(keyword in question for keyword in keywords):
return query_type
return "general"
def _generate_search_strategy(self, query_type: str, keywords: list[str]) -> dict:
"""生成搜尋策略"""
strategies = {
"definition": {
"search_type": "local",
"focus": "entities",
"boost_keywords": keywords[:3]
},
"comparison": {
"search_type": "global",
"focus": "relationships",
"boost_keywords": keywords
},
"application": {
"search_type": "global",
"focus": "communities",
"boost_keywords": keywords
},
"general": {
"search_type": "auto",
"focus": "mixed",
"boost_keywords": keywords[:5]
}
}
return strategies.get(query_type, strategies["general"])
# 使用自訂查詢處理器
def use_custom_query_processor():
"""使用自訂查詢處理器範例"""
processor = EnhancedChineseQueryProcessor()
test_questions = [
"什麼是機器學習?",
"深度學習和機器學習有什麼區別?",
"人工智慧在醫療領域的應用有哪些?",
"如何訓練一個神經網路?"
]
for question in test_questions:
result = processor.process_query(question)
print(f"\n問題:{question}")
print(f"類型:{result['query_type']}")
print(f"關鍵詞:{result['keywords']}")
print(f"搜尋策略:{result['search_strategy']}")
if __name__ == "__main__":
use_custom_query_processor()import requests
import json
from typing import Dict, List, Optional
class ChineseGraphRAGClient:
"""Chinese GraphRAG API 客戶端"""
def __init__(self, base_url: str = "http://localhost:8000", api_key: Optional[str] = None):
self.base_url = base_url.rstrip('/')
self.session = requests.Session()
if api_key:
self.session.headers.update({"X-API-Key": api_key})
self.session.headers.update({"Content-Type": "application/json"})
def health_check(self) -> Dict:
"""檢查 API 健康狀態"""
response = self.session.get(f"{self.base_url}/api/v1/health")
response.raise_for_status()
return response.json()
def create_index_task(self, input_path: str, output_path: str, config: Optional[Dict] = None) -> Dict:
"""建立索引任務"""
data = {
"input_path": input_path,
"output_path": output_path
}
if config:
data["config"] = config
response = self.session.post(f"{self.base_url}/api/v1/index", json=data)
response.raise_for_status()
return response.json()
def get_index_status(self, task_id: str) -> Dict:
"""獲取索引任務狀態"""
response = self.session.get(f"{self.base_url}/api/v1/index/{task_id}/status")
response.raise_for_status()
return response.json()
def query(self, question: str, search_type: str = "auto", config: Optional[Dict] = None) -> Dict:
"""執行查詢"""
data = {
"question": question,
"search_type": search_type
}
if config:
data["config"] = config
response = self.session.post(f"{self.base_url}/api/v1/query", json=data)
response.raise_for_status()
return response.json()
def batch_query(self, questions: List[str], search_type: str = "auto", config: Optional[Dict] = None) -> Dict:
"""批次查詢"""
data = {
"questions": questions,
"search_type": search_type
}
if config:
data["config"] = config
response = self.session.post(f"{self.base_url}/api/v1/query/batch", json=data)
response.raise_for_status()
return response.json()
# 使用 API 客戶端
def api_client_example():
"""API 客戶端使用範例"""
# 建立客戶端
client = ChineseGraphRAGClient(
base_url="http://localhost:8000",
api_key="your-api-key"
)
try:
# 檢查健康狀態
health = client.health_check()
print(f"API 狀態:{health['status']}")
# 建立索引任務
index_task = client.create_index_task(
input_path="./documents",
output_path="./data",
config={"chunk_size": 1000}
)
task_id = index_task["task_id"]
print(f"索引任務 ID:{task_id}")
# 等待索引完成
import time
while True:
status = client.get_index_status(task_id)
print(f"索引狀態:{status['status']} ({status.get('progress', {}).get('percentage', 0):.1f}%)")
if status["status"] in ["completed", "failed"]:
break
time.sleep(5)
if status["status"] == "completed":
# 執行查詢
result = client.query("什麼是人工智慧?")
print(f"查詢結果:{result['answer']}")
# 批次查詢
questions = ["什麼是機器學習?", "深度學習的應用?"]
batch_results = client.batch_query(questions)
for i, result in enumerate(batch_results["results"]):
print(f"問題 {i+1}:{questions[i]}")
print(f"回答:{result['answer'][:100]}...")
except requests.RequestException as e:
print(f"API 請求錯誤:{e}")
if __name__ == "__main__":
api_client_example()import asyncio
import aiohttp
from typing import Dict, List, Optional
class AsyncChineseGraphRAGClient:
"""異步 Chinese GraphRAG API 客戶端"""
def __init__(self, base_url: str = "http://localhost:8000", api_key: Optional[str] = None):
self.base_url = base_url.rstrip('/')
self.headers = {"Content-Type": "application/json"}
if api_key:
self.headers["X-API-Key"] = api_key
async def query(self, session: aiohttp.ClientSession, question: str, **kwargs) -> Dict:
"""執行查詢"""
data = {"question": question, **kwargs}
async with session.post(
f"{self.base_url}/api/v1/query",
json=data,
headers=self.headers
) as response:
response.raise_for_status()
return await response.json()
async def batch_query_concurrent(self, questions: List[str], max_concurrent: int = 5) -> List[Dict]:
"""並發批次查詢"""
async with aiohttp.ClientSession() as session:
# 建立信號量限制並發數
semaphore = asyncio.Semaphore(max_concurrent)
async def query_with_semaphore(question: str) -> Dict:
async with semaphore:
return await self.query(session, question)
# 並發執行查詢
tasks = [query_with_semaphore(q) for q in questions]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 處理結果
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append({
"question": questions[i],
"error": str(result)
})
else:
processed_results.append(result)
return processed_results
# 使用異步客戶端
async def async_client_example():
"""異步客戶端使用範例"""
client = AsyncChineseGraphRAGClient()
# 準備大量查詢
questions = [
"什麼是人工智慧?",
"機器學習的類型有哪些?",
"深度學習的優勢?",
"自然語言處理的挑戰?",
"計算機視覺的應用?",
"強化學習的原理?",
"神經網路的結構?",
"AI 的倫理問題?"
]
import time
start_time = time.time()
# 並發查詢
results = await client.batch_query_concurrent(questions, max_concurrent=3)
end_time = time.time()
print(f"並發查詢完成,耗時:{end_time - start_time:.2f}秒")
# 顯示結果
for i, result in enumerate(results):
if "error" in result:
print(f"{i+1}. 錯誤:{result['error']}")
else:
print(f"{i+1}. {questions[i]}")
print(f" {result['answer'][:100]}...")
if __name__ == "__main__":
asyncio.run(async_client_example())from chinese_graphrag.processors import DocumentProcessor
from chinese_graphrag.models import Document
import json
from pathlib import Path
class JSONDocumentProcessor(DocumentProcessor):
"""JSON 文件處理器"""
def __init__(self):
super().__init__()
self.supported_extensions = ['.json']
def can_process(self, file_path: str) -> bool:
"""檢查是否可以處理該檔案"""
return Path(file_path).suffix.lower() in self.supported_extensions
def process_file(self, file_path: str) -> Document:
"""處理 JSON 檔案"""
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 提取文本內容
content = self._extract_text_from_json(data)
# 建立文件物件
document = Document(
id=self._generate_id(file_path),
title=data.get('title', Path(file_path).stem),
content=content,
metadata={
'file_path': file_path,
'file_type': 'json',
'original_data': data
}
)
return document
def _extract_text_from_json(self, data: dict) -> str:
"""從 JSON 資料中提取文本"""
text_parts = []
def extract_text(obj, prefix=""):
if isinstance(obj, dict):
for key, value in obj.items():
if isinstance(value, str) and len(value.strip()) > 0:
text_parts.append(f"{prefix}{key}: {value