-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRAG.py
More file actions
156 lines (134 loc) · 5.35 KB
/
Copy pathRAG.py
File metadata and controls
156 lines (134 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
轻量级 RAG – 支持 TXT / PDF / DOCX / PPTX / XLSX / HTML / MD / PY
"""
import sys, pathlib, re, textwrap
from typing import List
import numpy as np
from openai import OpenAI
# ==== 本地 Ollama OpenAI 兼容接口 ====
client = OpenAI(api_key="ollama", base_url="http://127.0.0.1:11434/v1")
# ==== 配置 ====
EMBED_MODEL = "nomic-embed-text" # 嵌入模型
LLM_MODEL = "qwen3:0.6b" # 语言模型
CHUNK_TOKENS = 350 # 每个文档块的token数
CHUNK_OVERLAP = 30 # 文档块重叠token数
EMBED_BATCH = 64 # 嵌入批处理大小
TOP_K = 4 # 检索返回的文档片段数量
# ---------- 读文件并转纯文本 ----------
def file_to_text(path: str) -> str:
p = pathlib.Path(path)
ext = p.suffix.lower()
# 纯文本、Markdown、Python
if ext in {".txt", ".md", ".markdown", ".py"}:
return p.read_text(encoding="utf-8", errors="ignore")
if ext == ".pdf":
from pypdf import PdfReader
return "\n".join(page.extract_text() or "" for page in PdfReader(p).pages)
if ext == ".docx":
import docx
return "\n".join(par.text for par in docx.Document(p).paragraphs)
if ext == ".pptx":
from pptx import Presentation
return "\n".join(sh.text for s in Presentation(p).slides for sh in s.shapes if getattr(sh, "text", ""))
if ext == ".xlsx":
import openpyxl
wb = openpyxl.load_workbook(p, data_only=True)
return "\n".join(str(c) for ws in wb.worksheets for row in ws.iter_rows(values_only=True) for c in row if c)
if ext in {".html", ".htm"}:
from bs4 import BeautifulSoup
return BeautifulSoup(p.read_text(encoding="utf-8", errors="ignore"), "lxml").get_text("\n")
raise ValueError(f"暂不支持格式: {path}")
# ---------- Token 计数 ----------
def _count_tokens(text: str) -> int:
try:
import tiktoken
return len(tiktoken.get_encoding("cl100k_base").encode(text))
except Exception:
return len(re.findall(r"\S+", text))
# ---------- 文本切块 ----------
def _split_text(text: str, size=CHUNK_TOKENS, overlap=CHUNK_OVERLAP):
text = re.sub(r"\s+", " ", text).strip()
if not text:
return
sentences = re.split(r"(?<=[。.!?])\s+", text) # 句子级切分
buf, buf_tokens = [], 0
for sent in sentences:
t = _count_tokens(sent)
if buf_tokens + t > size:
yield " ".join(buf)
while buf and buf_tokens > overlap:
buf_tokens -= _count_tokens(buf.pop(0))
buf.append(sent)
buf_tokens += t
if buf:
yield " ".join(buf)
# ---------- 嵌入 ----------
def _embed(texts: List[str]) -> np.ndarray:
idx_map, non_empty = [], []
for i, t in enumerate(texts):
if t.strip():
idx_map.append(i)
non_empty.append(t.strip())
out = np.zeros((len(texts), 768), dtype=np.float32)
for i in range(0, len(non_empty), EMBED_BATCH):
batch = non_empty[i:i+EMBED_BATCH]
vecs = np.array([d.embedding for d in client.embeddings.create(model=EMBED_MODEL, input=batch).data], dtype=np.float32)
vecs /= np.linalg.norm(vecs, axis=1, keepdims=True)
out[idx_map[i:i+EMBED_BATCH]] = vecs
return out
# ---------- 构建向量库 ----------
def build_corpus(files: List[str]):
chunks, meta = [], []
for f in files:
for idx, chunk in enumerate(_split_text(file_to_text(f))):
chunks.append(chunk)
meta.append((pathlib.Path(f).name, idx))
return chunks, _embed(chunks), meta
# ---------- 检索 ----------
def retrieve(query: str, chunks, vecs, meta, k=TOP_K):
if vecs.size == 0:
return []
sims = vecs @ _embed([query])[0]
top = (-sims).argsort()[: min(k, len(sims))]
return [(chunks[i], sims[i], *meta[i]) for i in top]
# ---------- 流式回答 ----------
SYS_PROMPT = "你是一位严谨的中文问答助手,只能依据参考资料作答。"
def rag_answer_stream(question, chunks, vecs, meta):
ctx = "\n".join(
f"[{i+1}]《{f}》段落{idx}:{textwrap.shorten(t, width=350, placeholder='…')}"
for i, (t, _, f, idx) in enumerate(retrieve(question, chunks, vecs, meta))
)
prompt = f"{SYS_PROMPT}\n\n参考资料:\n{ctx}\n\n用户问题:{question}"
stream = client.chat.completions.create(
model=LLM_MODEL,
messages=[{"role": "user", "content": prompt}],
temperature=0.2,
stream=True
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.content:
yield delta.content
# ---------- CLI ----------
def main():
if len(sys.argv) < 2:
print("用法: python rag.py <文件1> <文件2> ...")
sys.exit(0)
print("正在加载文件并构建向量库…")
chunks, vecs, meta = build_corpus(sys.argv[1:])
print(f"已生成 {len(chunks)} 段文本向量。Ctrl-C / 回车 退出。")
try:
while True:
q = input("\n问题> ").strip()
if not q:
break
print("回答:", end="", flush=True)
for s in rag_answer_stream(q, chunks, vecs, meta):
print(s, end="", flush=True)
print()
except (KeyboardInterrupt, EOFError):
print("\n再见!")
if __name__ == "__main__":
main()