-
Notifications
You must be signed in to change notification settings - Fork 660
extract images of attachments uploaded during conversations #3217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
7edb3c9
d654342
10acafe
27ce29e
9c4c602
6e8795c
6379103
e052c2f
e8381f1
49335a0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
| import base64 | ||
| import concurrent.futures | ||
| import io | ||
| import json | ||
| import logging | ||
| import os | ||
| import shutil | ||
|
|
@@ -19,10 +20,10 @@ | |
| from transformers import CLIPProcessor, CLIPModel | ||
| from nexent.data_process.core import DataProcessCore | ||
|
|
||
| from consts.const import CLIP_MODEL_PATH, IMAGE_FILTER, MAX_CONCURRENT_CONVERSIONS, REDIS_BACKEND_URL, REDIS_URL | ||
| from consts.const import CLIP_MODEL_PATH, IMAGE_FILTER, MAX_CONCURRENT_CONVERSIONS, REDIS_BACKEND_URL, REDIS_URL, TABLE_TRANSFORMER_MODEL_PATH, UNSTRUCTURED_DEFAULT_MODEL_INITIALIZE_PARAMS_JSON_PATH | ||
| from consts.exceptions import OfficeConversionException | ||
| from consts.model import BatchTaskRequest | ||
| from database.attachment_db import delete_file, file_exists, get_file_size_from_minio, get_file_stream, upload_file | ||
| from database.attachment_db import build_s3_url, delete_file, file_exists, get_file_size_from_minio, get_file_stream, upload_file, upload_fileobj | ||
| from utils.file_management_utils import convert_office_to_pdf | ||
| from data_process.app import app as celery_app | ||
| from data_process.tasks import submit_process_forward_chain | ||
|
|
@@ -600,20 +601,78 @@ async def process_uploaded_text_file(self, file_content: bytes, filename: str, c | |
| f"Processing uploaded file: {filename} using SDK DataProcessCore") | ||
|
|
||
| data_processor = DataProcessCore() | ||
| chunks, _ = data_processor.file_process( | ||
| text_chunks, images_chunks = data_processor.file_process( | ||
| file_data=file_content, | ||
| filename=filename, | ||
| chunking_strategy=chunking_strategy | ||
| chunking_strategy=chunking_strategy, | ||
| model_type = "vlm", | ||
| table_transformer_model_path=TABLE_TRANSFORMER_MODEL_PATH, | ||
| unstructured_default_model_initialize_params_json_path=UNSTRUCTURED_DEFAULT_MODEL_INITIALIZE_PARAMS_JSON_PATH | ||
| ) | ||
|
|
||
| full_text = "" | ||
| chunk_texts: List[str] = [] | ||
| for chunk in chunks: | ||
| for chunk in text_chunks: | ||
| if 'content' in chunk: | ||
| chunk_content = chunk['content'] | ||
| full_text += chunk_content + "\n" | ||
| chunk_texts.append(chunk_content) | ||
|
|
||
| # process images if any | ||
| image_descriptions: List[str] = [] | ||
| images_list_urls = [] | ||
| image_info = [] | ||
| if images_chunks: | ||
| folder = "images_in_attachments" | ||
| for idx, img_data in enumerate(images_chunks): | ||
| if not isinstance(img_data, dict): | ||
| logger.warning(f"Skipping image entry at index {idx}: unexpected type {type(img_data)}") | ||
| continue | ||
|
|
||
| if "image_bytes" not in img_data: | ||
| logger.warning(f"Skipping image entry at index {idx}: missing image_bytes") | ||
| continue | ||
|
|
||
| # upload image to MinIO | ||
| img_obj = io.BytesIO(img_data["image_bytes"]) | ||
| result = upload_fileobj( | ||
| file_obj=img_obj, | ||
| file_name=f"{idx}.{img_data['image_format']}", | ||
| prefix=folder | ||
| ) | ||
|
|
||
| image_url = build_s3_url(result.get("object_name", "")) | ||
|
|
||
| # create description string | ||
| position = img_data["position"] | ||
| coords = position["coordinates"] | ||
| desc = ( | ||
| f"--- Image {idx+1} ---\n" | ||
| f"Page {position.get('page_number', 'unknown')} | " | ||
| f"Box: ({coords.get('x1', '')}, {coords.get('y1', '')}) -> ({coords.get('x2', '')}, {coords.get('y2', '')})\n" | ||
| f"URL: {image_url}" | ||
| ) | ||
| image_descriptions.append(desc) | ||
|
|
||
| images_list_urls.append(image_url) | ||
|
|
||
| image_info.append({ | ||
| "content": json.dumps({ | ||
| "source_file": filename, | ||
| "position": position, | ||
| "image_url": image_url}), | ||
| "source_type": "minio", | ||
| "image_url": image_url, | ||
| "filename": filename, | ||
| "page": position["page_number"] | ||
| }) | ||
|
|
||
| # Append image descriptions to the chunk list and full text | ||
| if image_descriptions: | ||
| separator = f"\n\n=== Image information for {filename} ===\n\n" | ||
| full_text += separator + "\n\n".join(image_descriptions) | ||
| chunk_texts.extend(image_descriptions) | ||
|
|
||
| processing_time = time.time() - start_time | ||
| logger.info( | ||
| f"Successfully processed uploaded file: {filename}, extracted {len(full_text)} characters in {processing_time:.2f}s" | ||
|
|
@@ -624,8 +683,9 @@ async def process_uploaded_text_file(self, file_content: bytes, filename: str, c | |
| "task_id": None, | ||
| "filename": filename, | ||
| "text": full_text.strip(), | ||
| "images_info": [images_list_urls, image_info], | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| "chunks": chunk_texts, | ||
| "chunks_count": len(chunks), | ||
| "chunks_count": len(text_chunks) + len(images_chunks), | ||
| "text_length": len(full_text.strip()), | ||
| "processing_time": processing_time, | ||
| "chunking_strategy": chunking_strategy | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,16 +4,20 @@ | |
| Extracts content from text files (excluding images) and analyzes it using a large language model. | ||
| Supports files from S3, HTTP, and HTTPS URLs. | ||
| """ | ||
| import json | ||
| import logging | ||
| from typing import List, Optional | ||
|
|
||
| from jinja2 import Template, StrictUndefined | ||
| from pydantic import Field | ||
| import zipfile | ||
| import io | ||
| import olefile | ||
| from smolagents.tools import Tool | ||
|
|
||
| from ...core.utils.observer import MessageObserver, ProcessType | ||
| from ...core.utils.prompt_template_utils import get_prompt_template | ||
| from ...core.utils.tools_common_message import ToolCategory, ToolSign | ||
| from ...core.utils.tools_common_message import ToolCategory, ToolSign, SearchResultTextMessage | ||
| from ...storage import MinIOStorageClient | ||
| from ...multi_modal.load_save_object import LoadSaveObjectManager | ||
| from ...utils.http_client_manager import http_client_manager | ||
|
|
@@ -32,7 +36,7 @@ class AnalyzeTextFileTool(Tool): | |
| "The tool will extract text content from each file and return an analysis based on your question." | ||
| ) | ||
|
|
||
| description_zh = "从文本文件中提取内容,并根据你的问题使用大语言模型进行分析。支持来自 S3、HTTP 和 HTTPS URL 的多个文件。支持 s3://bucket/key、/bucket/key、http:// 和 https:// URL。该工具将从每个文件中提取文本内容,并根据你的问题返回分析结果。" | ||
| description_zh = "从文件中提取内容,并根据你的问题使用大语言模型进行分析。支持来自 S3、HTTP 和 HTTPS URL 的多个文件。支持 s3://bucket/key、/bucket/key、http:// 和 https:// URL。该工具将从每个文件中提取文本内容以及图片元数据,并根据你的问题返回分析结果。" | ||
|
|
||
| inputs = { | ||
| "file_url_list": { | ||
|
|
@@ -148,8 +152,12 @@ def _forward_impl( | |
|
|
||
| for index, single_file in enumerate(file_url_list, start=1): | ||
| logger.info( | ||
| f"Extracting text content from file #{index}, query: {query}") | ||
| filename = f"file_{index}.txt" | ||
| f"Extracting text content and image info from file #{index}, query: {query}") | ||
|
|
||
| # detect file type | ||
| file_extension = self.detect_file_type(single_file) | ||
|
|
||
| filename = f"file_{index}.{file_extension}" | ||
|
|
||
| # Step 1: Get file content | ||
| raw_text = self.process_text_file(filename, single_file) | ||
|
|
@@ -206,6 +214,21 @@ def process_text_file(self, filename: str, file_content: bytes,) -> str: | |
|
|
||
| if response.status_code == 200: | ||
| result = response.json() | ||
|
|
||
| # process image information | ||
| images_list_url, image_info = result.get("images_info", ([], [])) | ||
| if images_list_url: | ||
| search_images_list_json = json.dumps( | ||
| {"images_url": images_list_url}, ensure_ascii=False | ||
| ) | ||
| self.observer.add_message( | ||
| "", ProcessType.PICTURE_WEB, search_images_list_json | ||
| ) | ||
| if image_info: | ||
| search_results_json = self._build_search_results(image_info) | ||
| search_results_data = json.dumps(search_results_json, ensure_ascii=False) | ||
| self.observer.add_message("", ProcessType.SEARCH_CONTENT, search_results_data) | ||
|
|
||
| raw_text = result.get("text", "") | ||
| logger.info( | ||
| f"File processed successfully: {raw_text[:200]}...{raw_text[-200:]}..., length: {len(raw_text)}") | ||
|
|
@@ -245,3 +268,58 @@ def analyze_file(self, query: str, raw_text: str,): | |
| user_prompt=user_prompt | ||
| ) | ||
| return result.content, truncation_percentage | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| def detect_file_type(self, file_bytes: bytes) -> str: | ||
| if file_bytes.startswith(b"%PDF"): | ||
| return "pdf" | ||
|
|
||
| try: | ||
| # doc/xls/ppt | ||
| if file_bytes.startswith(b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1"): | ||
| ole = olefile.OleFileIO(io.BytesIO(file_bytes)) | ||
|
|
||
| for stream, file_type in { | ||
| "WordDocument": "doc", | ||
| "Workbook": "xls", | ||
| "Book": "xls", | ||
| "PowerPoint Document": "ppt", | ||
| }.items(): | ||
| if ole.exists(stream): | ||
| return file_type | ||
|
|
||
| # docx/xlsx/pptx | ||
| elif file_bytes.startswith(b"PK"): | ||
| names = set(zipfile.ZipFile(io.BytesIO(file_bytes)).namelist()) | ||
|
|
||
| for marker, file_type in { | ||
| "word/document.xml": "docx", | ||
| "xl/workbook.xml": "xlsx", | ||
| "ppt/presentation.xml": "pptx", | ||
| }.items(): | ||
| if marker in names: | ||
| return file_type | ||
|
|
||
| except olefile.OleFileError: | ||
| logger.error("Failed to determine file extension, defaulting to txt type.") | ||
|
|
||
| return "txt" | ||
|
|
||
|
|
||
| def _build_search_results(self, image_info): | ||
| search_results_json = [] | ||
| for index, single_image in enumerate(image_info): | ||
| search_result_message = SearchResultTextMessage( | ||
| title=single_image.get("filename", ""), | ||
| url=single_image.get("image_url", ""), | ||
| text=single_image.get("content", ""), | ||
| source_type=single_image.get("source_type", ""), | ||
| filename=single_image.get("filename", ""), | ||
| score_details={}, | ||
| cite_index=single_image.get("page", 0) + index, | ||
| search_type=self.name, | ||
| tool_sign=self.tool_sign, | ||
| ) | ||
|
|
||
| search_results_json.append(search_result_message.to_dict()) | ||
|
|
||
| return search_results_json | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
图片上传到 MinIO 时没有错误处理。如果上传失败,整个文件处理流程会中断,用户连文本内容都拿不到。建议 try/except 包裹,失败的图片跳过并记录 warning,保证文本内容仍可用。