-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathtools.py
More file actions
196 lines (159 loc) · 7 KB
/
tools.py
File metadata and controls
196 lines (159 loc) · 7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# Tencent is pleased to support the open source community by making tRPC-Agent-Python available.
#
# Copyright (C) 2026 Tencent. All rights reserved.
#
# tRPC-Agent-Python is licensed under Apache-2.0.
""" Custom components for LangchainKnowledge. """
import asyncio
from typing import Any
from typing import AsyncIterator
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Sequence
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import BaseDocumentTransformer
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
class CustomDocumentLoader(BaseLoader):
    """An example document loader that reads a file line by line."""

    def __init__(self, file_path: str) -> None:
        """Initialize the loader with a file path.

        Args:
            file_path: The path to the file to load.
        """
        self.file_path = file_path

    def lazy_load(self) -> Iterator[Document]:  # <-- Does not take any arguments
        """A lazy loader that reads a file line by line.

        When you're implementing lazy load methods, you should use a generator
        to yield documents one by one.

        Yields:
            One ``Document`` per line, with the 0-based ``line_number`` and
            the ``source`` path recorded in the metadata.
        """
        with open(self.file_path, encoding="utf-8") as f:
            # enumerate() replaces the manual counter, which cannot drift
            # out of sync with the loop body.
            for line_number, line in enumerate(f):
                yield Document(
                    page_content=line,
                    metadata={
                        "line_number": line_number,
                        "source": self.file_path,
                    },
                )

    # alazy_load is OPTIONAL.
    # If you leave out the implementation, a default implementation which
    # delegates to lazy_load will be used!
    async def alazy_load(self) -> AsyncIterator[Document]:  # <-- Does not take any arguments
        """An async lazy loader that reads a file line by line.

        Uses ``aiofiles`` for non-blocking file I/O when available; otherwise
        falls back to the base-class implementation, which delegates to
        :meth:`lazy_load` on a worker thread.

        Yields:
            One ``Document`` per line, with the same metadata as
            :meth:`lazy_load`.
        """
        try:
            # Requires aiofiles
            # https://github.com/Tinche/aiofiles
            # Keep the try body minimal: only the import can raise
            # ImportError.  The original wrapped the whole iteration, so an
            # unrelated ImportError raised mid-iteration would silently
            # trigger the fallback.
            import aiofiles
        except ImportError:
            # Fallback to the super class implementation if aiofiles is
            # not available.
            async for item in super().alazy_load():
                yield item
            return
        async with aiofiles.open(self.file_path, encoding="utf-8") as f:
            line_number = 0
            async for line in f:
                yield Document(
                    page_content=line,
                    metadata={
                        "line_number": line_number,
                        "source": self.file_path,
                    },
                )
                line_number += 1
class CustomTextSplitter(BaseDocumentTransformer):
    """Interface for splitting text into chunks."""

    def __init__(self, separator: str) -> None:
        """Create a new TextSplitter.

        Args:
            separator: The string on which each document's content is split.
        """
        self.separator = separator

    def transform_documents(self, documents: Sequence[Document], **kwargs: Any) -> Sequence[Document]:
        """Transform a list of documents.

        Each document's content is split on ``self.separator``; every chunk
        that is non-empty after stripping becomes a new ``Document`` that
        preserves the original metadata and records its chunk index.

        Args:
            documents: A sequence of Documents to be transformed.

        Returns:
            A sequence of transformed Documents.
        """
        transformed_docs = []
        for doc in documents:
            # Split the document content by separator.  enumerate() runs over
            # all raw chunks (including empty ones) so chunk_index values
            # match the original splitting positions.
            for i, chunk in enumerate(doc.page_content.split(self.separator)):
                chunk = chunk.strip()
                # Skip empty chunks
                if not chunk:
                    continue
                transformed_docs.append(
                    Document(
                        page_content=chunk,
                        metadata={
                            **doc.metadata,  # Preserve original metadata
                            "chunk_index": i,  # Add chunk index
                            "original_doc_id": id(doc),  # Reference to original document
                        },
                    )
                )
        return transformed_docs

    async def atransform_documents(self, documents: Sequence[Document], **kwargs: Any) -> Sequence[Document]:
        """Asynchronously transform a list of documents.

        Runs the synchronous :meth:`transform_documents` on a worker thread
        so the event loop is not blocked.

        Args:
            documents: A sequence of Documents to be transformed.

        Returns:
            A sequence of transformed Documents.
        """
        # BUG FIX: loop.run_in_executor() accepts positional *args only, so
        # forwarding **kwargs into it raised TypeError whenever kwargs were
        # supplied.  asyncio.to_thread() forwards both args and kwargs
        # correctly and avoids the deprecated get_event_loop() call.
        return await asyncio.to_thread(self.transform_documents, documents, **kwargs)
class ToyRetriever(BaseRetriever):
    """A toy retriever that contains the top k documents that contain the user query.

    This retriever only implements the sync method _get_relevant_documents.

    If the retriever were to involve file access or network access, it could benefit
    from a native async implementation of `_aget_relevant_documents`.

    As usual, with Runnables, there's a default async implementation that's provided
    that delegates to the sync implementation running on another thread.
    """

    documents: List[Document]
    """List of documents to retrieve from."""

    k: int
    """Number of top results to return"""

    def _get_relevant_documents(self, query: str, *, run_manager: CallbackManagerForRetrieverRun) -> List[Document]:
        """Sync implementations for retriever."""
        # Hoist the loop-invariant lowercase conversion of the query.
        needle = query.lower()
        hits: List[Document] = []
        for doc in self.documents:
            # Stop scanning once k matches have been collected.
            if len(hits) >= self.k:
                break
            if needle in doc.page_content.lower():
                hits.append(doc)
        return hits

    # Optional: Provide a more efficient native implementation by overriding
    # _aget_relevant_documents
    # async def _aget_relevant_documents(
    #     self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun
    # ) -> List[Document]:
    #     """Asynchronously get documents relevant to a query.
    #     Args:
    #         query: String to find relevant documents for
    #         run_manager: The callbacks handler to use
    #     Returns:
    #         List of relevant documents
    #     """

    # Optional: If you want to use retriever with vectorstore together in
    # LangChainKnowledge, you should implement this method.
    @classmethod
    def from_documents(
        cls,
        documents: Iterable[Document],
        **kwargs: Any,
    ) -> "ToyRetriever":
        """
        Create a ToyRetriever from a list of Documents.

        Args:
            documents: A list of Documents to vectorize.
            **kwargs: Any other arguments to pass to the retriever.

        Returns:
            A ToyRetriever instance.
        """
        # Pop k (default 3) so it isn't forwarded twice, materialize the
        # iterable, and build the instance.
        top_k = kwargs.pop('k', 3)
        return cls(documents=list(documents), k=top_k, **kwargs)