From ed3e5f5df1dabc9206e3b3685ac2c321e3cd6f43 Mon Sep 17 00:00:00 2001 From: Karm Soni Date: Tue, 31 Mar 2026 13:27:16 +0530 Subject: [PATCH 1/5] feat: enhance transaction parsing to support multiple file attachments --- .../transaction_parser/__init__.py | 55 ++++++++++++++----- .../ai_integration/parser.py | 52 +++++++++++++----- .../ai_integration/prompts.py | 10 +++- .../controllers/transaction.py | 41 +++++++++++--- .../transaction_parser_settings.json | 9 +++ .../overrides/communication.py | 33 ++++++++++- .../utils/file_processor.py | 19 +++++++ 7 files changed, 177 insertions(+), 42 deletions(-) diff --git a/transaction_parser/transaction_parser/__init__.py b/transaction_parser/transaction_parser/__init__.py index 83555ce..fcc2c56 100644 --- a/transaction_parser/transaction_parser/__init__.py +++ b/transaction_parser/transaction_parser/__init__.py @@ -34,30 +34,49 @@ def parse(transaction, country, file_url, ai_model=None, page_limit=None): def _parse( country, transaction, - file_url, + file_urls, ai_model=None, page_limit=None, user=None, party=None, company=None, + communication_name=None, ): try: - file = None - filename = file_url.split("/")[-1] + if ( + isinstance(file_urls, str) + and file_urls.startswith("[") + and file_urls.endswith("]") + ): + file_urls = frappe.parse_json(file_urls) - file = frappe.get_last_doc("File", filters={"file_url": file_url}) - filename = file.file_name + elif isinstance(file_urls, str): + file_urls = [file_urls] + + file_names = frappe.get_list( + "File", filters={"file_url": ("in", file_urls)}, pluck="name" + ) + + files = [] + for file_name in file_names: + file = frappe.get_doc("File", file_name) + files.append(file) controller = get_controller(country, transaction)(party=party, company=company) - doc = controller.generate(file, ai_model, page_limit) + doc = controller.generate(files, ai_model, page_limit, communication_name) + filenames = ( + ", ".join([f.file_name for f in files]) + if len(files) > 1 + else files[0].file_name + ) notification = { "document_type": TRANSACTION_MAP[transaction], "document_name": doc.name, "subject": _("{0} {1} generated from {2}").format( _(TRANSACTION_MAP[transaction]), doc.name, - filename, + filenames, ), } @@ -69,19 +88,27 @@ def _parse( and frappe.flags.skip_duplicate_error ): notification = { - "document_type": "File", - "document_name": file.name if file else filename, - "subject": _("Duplicate entry found for {0}").format(filename), + "document_type": "Communication" if communication_name else "File", + "document_name": ( + communication_name if communication_name else files[0].name + ), + "subject": _("Duplicate entry found for {0}").format(file_urls), "message": str(e), } return error_log = frappe.log_error( "Transaction Parser API Error", - reference_doctype="File", - reference_name=file.name if file else filename, + reference_doctype="Communication" if communication_name else "File", + reference_name=( + communication_name + if communication_name + else files[0].name + if files + else None + ), ) - message = _("Failed to generate {0} from {1}").format(_(transaction), filename) + message = _("Failed to generate {0} from {1}").format(_(transaction), file_urls) notification = { "document_type": error_log.doctype, @@ -90,7 +117,7 @@ def _parse( "message": str(e), } - email_failure(user, message, str(e), file_url) + email_failure(user, message, str(e), file_urls) finally: if notification: diff --git a/transaction_parser/transaction_parser/ai_integration/parser.py b/transaction_parser/transaction_parser/ai_integration/parser.py index f74eec2..d7cd2f1 100644 --- a/transaction_parser/transaction_parser/ai_integration/parser.py +++ b/transaction_parser/transaction_parser/ai_integration/parser.py @@ -34,18 +34,32 @@ def parse( document_type: str, document_schema: dict, document_data: str, - file_doc_name: str | None = None, + doc_name: str | None = None, + file_count: int = 1, + is_communication: bool = False, ) -> dict: - messages = self._build_messages(document_type, document_schema, document_data) - response = self.send_message(messages=messages, file_doc_name=file_doc_name) + messages = self._build_messages( + document_type, document_schema, document_data, file_count + ) + + response = self.send_message( + messages=messages, + doc_name=doc_name, + is_communication=is_communication, + ) + return self.get_content(response) def _build_messages( - self, document_type: str, document_schema: dict, document_data: str + self, + document_type: str, + document_schema: dict, + document_data: str, + file_count: int = 1, ) -> tuple: """Build the message structure for AI API call.""" system_prompt = get_system_prompt(document_schema) - user_prompt = get_user_prompt(document_type, document_data) + user_prompt = get_user_prompt(document_type, document_data, file_count) return ( { @@ -58,9 +72,14 @@ def _build_messages( }, ) - def send_message(self, messages: tuple, file_doc_name: str | None = None) -> dict: + def send_message( + self, + messages: tuple, + doc_name: str | None = None, + is_communication: bool = False, + ) -> dict: """Send messages to AI API and handle the response.""" - log = self._create_log_entry(file_doc_name) + log = self._create_log_entry(doc_name, is_communication) try: response = self._make_api_call(messages) @@ -81,16 +100,19 @@ def send_message(self, messages: tuple, file_doc_name: str | None = None) -> dic finally: enqueue_integration_request(**log) - def _create_log_entry(self, file_doc_name: str | None) -> frappe._dict: + def _create_log_entry( + self, doc_name: str | None, is_communication: bool = False + ) -> frappe._dict: """Create a log entry for the API call.""" log = frappe._dict(url=self.model.base_url) - if file_doc_name: - log.update( - { - "reference_doctype": "File", - "reference_name": file_doc_name, - } - ) + + log.update( + { + "reference_doctype": "Communication" if is_communication else "File", + "reference_name": doc_name, + } + ) + return log def _make_api_call(self, messages: tuple) -> Any: diff --git a/transaction_parser/transaction_parser/ai_integration/prompts.py b/transaction_parser/transaction_parser/ai_integration/prompts.py index 95cf2e0..9e01e5a 100644 --- a/transaction_parser/transaction_parser/ai_integration/prompts.py +++ b/transaction_parser/transaction_parser/ai_integration/prompts.py @@ -34,10 +34,16 @@ def get_system_prompt(document_schema: dict) -> str: {document_schema}""" -def get_user_prompt(document_type: str, document_data: str) -> str: +def get_user_prompt(document_type: str, document_data: str, file_count: int = 1) -> str: input_doc_type = INPUT_DOCUMENTS.get(document_type, "document") - return f"""Generate {document_type} for given {input_doc_type} according to above JSON schema. + base_instruction = f"Generate {document_type} for given {input_doc_type} according to above JSON schema." + + if file_count > 1: + multi_file_instruction = f"\n\nIMPORTANT: The data below contains information from {file_count} related documents. Consolidate and merge the information from all documents to create a single unified {document_type}. Combine item lists, sum totals appropriately, and merge party/address information." + base_instruction += multi_file_instruction + + return f"""{base_instruction} Document data is given below: {document_data}""" diff --git a/transaction_parser/transaction_parser/controllers/transaction.py b/transaction_parser/transaction_parser/controllers/transaction.py index 05a59c0..8ae5e7c 100644 --- a/transaction_parser/transaction_parser/controllers/transaction.py +++ b/transaction_parser/transaction_parser/controllers/transaction.py @@ -29,12 +29,20 @@ def __init__( self.company = company def generate( - self, file, ai_model: str | None = None, page_limit: int | None = None + self, + files, + ai_model: str | None = None, + page_limit: int | None = None, + communication_name: str | None = None, ): self.initialize() - self.file = file + if isinstance(files, str): + files = [files] + + self.files = files self.ai_model = ai_model + self.communication_name = communication_name self.data = self._parse_file_content(ai_model, page_limit) self.doc = frappe.get_doc({"doctype": self.DOCTYPE}) self.doc.is_created_by_transaction_parser = 1 @@ -49,7 +57,8 @@ def generate( def initialize(self) -> None: # file processing - self.file = None + self.files = None + self.communication_name = None # output schema self.schema = None @@ -72,14 +81,27 @@ def initialize(self) -> None: def _parse_file_content( self, ai_model: str | None = None, page_limit: int | None = None ) -> dict: - content = FileProcessor().get_content(self.file, page_limit) + file_processor = FileProcessor() + doc_name = None + file_count = len(self.files) + + if len(self.files) > 1: + content = file_processor.get_combined_content(self.files, page_limit) + doc_name = self.communication_name + + else: + content = file_processor.get_content(self.files, page_limit) + doc_name = self.files[0].name + schema = self.get_schema() return AIParser(ai_model, self.settings).parse( document_type=self.DOCTYPE, document_schema=schema, document_data=content, - file_doc_name=self.file.name, + doc_name=doc_name, + file_count=file_count, + is_communication=bool(self.communication_name), ) ################################### @@ -271,9 +293,12 @@ def _set_flags(self) -> None: self.doc.flags.ignore_links = True def _attach_file(self) -> None: - self.file.attached_to_doctype = self.DOCTYPE - self.file.attached_to_name = self.doc.name - self.file.save() + files_to_attach = self.files if isinstance(self.files, list) else [self.files] + + for file_doc in files_to_attach: + file_doc.attached_to_doctype = self.DOCTYPE + file_doc.attached_to_name = self.doc.name + file_doc.save() def set_exchange_rate(self, from_currency, date, args): company_currency = erpnext.get_company_currency(self.doc.company) diff --git a/transaction_parser/transaction_parser/doctype/transaction_parser_settings/transaction_parser_settings.json b/transaction_parser/transaction_parser/doctype/transaction_parser_settings/transaction_parser_settings.json index 73a5a16..007ec5a 100644 --- a/transaction_parser/transaction_parser/doctype/transaction_parser_settings/transaction_parser_settings.json +++ b/transaction_parser/transaction_parser/doctype/transaction_parser_settings/transaction_parser_settings.json @@ -15,6 +15,7 @@ "email_configuration_section", "parse_incoming_emails", "parse_party_emails", + "process_one_document_per_communication", "incoming_email_accounts", "party_emails", "custom_fields_tab", @@ -113,6 +114,14 @@ "fieldtype": "Check", "label": "Parse Party Emails" }, + { + "default": "1", + "depends_on": "eval: doc.parse_incoming_emails", + "description": "When enabled, all attachments from a communication are combined to create a single document. When disabled, each attachment creates a separate document.", + "fieldname": "process_one_document_per_communication", + "fieldtype": "Check", + "label": "Process One Document Per Communication" + }, { "fieldname": "email_configuration_section", "fieldtype": "Section Break", diff --git a/transaction_parser/transaction_parser/overrides/communication.py b/transaction_parser/transaction_parser/overrides/communication.py index 679ae95..48e8e12 100644 --- a/transaction_parser/transaction_parser/overrides/communication.py +++ b/transaction_parser/transaction_parser/overrides/communication.py @@ -98,8 +98,18 @@ def _process_attachments( else: country = frappe.db.get_value("Company", company, "country") + supported_extensions = {"pdf", "xlsx", "xls", "csv"} + filtered_attachments = [ + attachment + for attachment in attachments + if attachment.file_url.split(".")[-1].lower() in supported_extensions + ] + + if not filtered_attachments: + return + sorted_attachments = sorted( - attachments, + filtered_attachments, key=lambda attachment: {"xlsx": 0, "csv": 1, "pdf": 2}.get( attachment.file_url.split(".")[-1].lower(), 3 ), @@ -122,16 +132,33 @@ def _process_attachments( def _parse_attachments( doc, country, transaction_type, attachments, ai_model, user, party, company ): - for attachment in attachments: + settings = frappe.get_cached_doc("Transaction Parser Settings") + + if settings.process_one_document_per_communication: + file_urls = [attachment.file_url for attachment in attachments] _parse( country=country, transaction=transaction_type, - file_url=attachment.file_url, + file_urls=file_urls, ai_model=ai_model, user=user, party=party, company=company, + communication_name=doc.name, ) frappe.db.commit() + else: + for attachment in attachments: + _parse( + country=country, + transaction=transaction_type, + file_urls=attachment.file_url, + ai_model=ai_model, + user=user, + party=party, + company=company, + communication_name=doc.name, + ) + frappe.db.commit() doc.db_set("is_processed_by_transaction_parser", 1) diff --git a/transaction_parser/transaction_parser/utils/file_processor.py b/transaction_parser/transaction_parser/utils/file_processor.py index 8a3ccd9..46a0eac 100644 --- a/transaction_parser/transaction_parser/utils/file_processor.py +++ b/transaction_parser/transaction_parser/utils/file_processor.py @@ -22,6 +22,25 @@ def get_content(self, doc, page_limit=None): else: frappe.throw(_("Only PDF, CSV, and Excel files are supported")) + def get_combined_content(self, docs, page_limit=None): + """Combine content from multiple files with clear separators.""" + if not docs: + frappe.throw(_("No files provided for processing")) + + if not isinstance(docs, list): + docs = [docs] + + if len(docs) == 1: + return self.get_content(docs[0], page_limit) + + combined_parts = [] + for index, doc in enumerate(docs, 1): + separator = f"{'=' * 60}\nDocument {index}: {doc.file_name} ({doc.file_type})\n{'=' * 60}" + content = self.get_content(doc, page_limit) + combined_parts.append(f"{separator}\n\n{content}") + + return "\n\n".join(combined_parts) + def _process_pdf(self, doc, page_limit=None): """Process PDF files with OCR and page limiting.""" self.file = io.BytesIO(doc.get_content()) From 995852f4251e597de1ae616ae1028fd2714ff6c9 Mon Sep 17 00:00:00 2001 From: Karm Soni Date: Wed, 1 Apr 2026 00:18:02 +0530 Subject: [PATCH 2/5] feat: refactor file processing to support multiple attachments and improve response merging --- .../ai_integration/parser.py | 8 +- .../ai_integration/prompts.py | 10 +- .../controllers/transaction.py | 42 +- .../transaction_parser_settings.json | 374 ++++++++--------- .../overrides/communication.py | 1 + .../utils/file_processor.py | 19 - .../utils/response_merger.py | 386 ++++++++++++++++++ 7 files changed, 609 insertions(+), 231 deletions(-) create mode 100644 transaction_parser/transaction_parser/utils/response_merger.py diff --git a/transaction_parser/transaction_parser/ai_integration/parser.py b/transaction_parser/transaction_parser/ai_integration/parser.py index d7cd2f1..7cbe23e 100644 --- a/transaction_parser/transaction_parser/ai_integration/parser.py +++ b/transaction_parser/transaction_parser/ai_integration/parser.py @@ -35,12 +35,9 @@ def parse( document_schema: dict, document_data: str, doc_name: str | None = None, - file_count: int = 1, is_communication: bool = False, ) -> dict: - messages = self._build_messages( - document_type, document_schema, document_data, file_count - ) + messages = self._build_messages(document_type, document_schema, document_data) response = self.send_message( messages=messages, @@ -55,11 +52,10 @@ def _build_messages( document_type: str, document_schema: dict, document_data: str, - file_count: int = 1, ) -> tuple: """Build the message structure for AI API call.""" system_prompt = get_system_prompt(document_schema) - user_prompt = get_user_prompt(document_type, document_data, file_count) + user_prompt = get_user_prompt(document_type, document_data) return ( { diff --git a/transaction_parser/transaction_parser/ai_integration/prompts.py b/transaction_parser/transaction_parser/ai_integration/prompts.py index 9e01e5a..95cf2e0 100644 --- a/transaction_parser/transaction_parser/ai_integration/prompts.py +++ b/transaction_parser/transaction_parser/ai_integration/prompts.py @@ -34,16 +34,10 @@ def get_system_prompt(document_schema: dict) -> str: {document_schema}""" -def get_user_prompt(document_type: str, document_data: str, file_count: int = 1) -> str: +def get_user_prompt(document_type: str, document_data: str) -> str: input_doc_type = INPUT_DOCUMENTS.get(document_type, "document") - base_instruction = f"Generate {document_type} for given {input_doc_type} according to above JSON schema." - - if file_count > 1: - multi_file_instruction = f"\n\nIMPORTANT: The data below contains information from {file_count} related documents. Consolidate and merge the information from all documents to create a single unified {document_type}. Combine item lists, sum totals appropriately, and merge party/address information." - base_instruction += multi_file_instruction - - return f"""{base_instruction} + return f"""Generate {document_type} for given {input_doc_type} according to above JSON schema. Document data is given below: {document_data}""" diff --git a/transaction_parser/transaction_parser/controllers/transaction.py b/transaction_parser/transaction_parser/controllers/transaction.py index 8ae5e7c..69c5fd7 100644 --- a/transaction_parser/transaction_parser/controllers/transaction.py +++ b/transaction_parser/transaction_parser/controllers/transaction.py @@ -7,6 +7,9 @@ from transaction_parser.transaction_parser.ai_integration.parser import AIParser from transaction_parser.transaction_parser.utils import to_dict from transaction_parser.transaction_parser.utils.file_processor import FileProcessor +from transaction_parser.transaction_parser.utils.response_merger import ( + ResponseMerger, +) class Transaction: @@ -81,29 +84,46 @@ def initialize(self) -> None: def _parse_file_content( self, ai_model: str | None = None, page_limit: int | None = None ) -> dict: - file_processor = FileProcessor() - doc_name = None - file_count = len(self.files) - if len(self.files) > 1: - content = file_processor.get_combined_content(self.files, page_limit) - doc_name = self.communication_name + return self._parse_multiple_files(ai_model, page_limit) - else: - content = file_processor.get_content(self.files, page_limit) - doc_name = self.files[0].name + return self._parse_single_file(self.files[0], ai_model, page_limit) + def _parse_single_file( + self, + file, + ai_model: str | None = None, + page_limit: int | None = None, + ) -> dict: + content = FileProcessor().get_content(file, page_limit) schema = self.get_schema() return AIParser(ai_model, self.settings).parse( document_type=self.DOCTYPE, document_schema=schema, document_data=content, - doc_name=doc_name, - file_count=file_count, + doc_name=self.communication_name or file.name, is_communication=bool(self.communication_name), ) + def _parse_multiple_files( + self, ai_model: str | None = None, page_limit: int | None = None + ) -> dict: + response = self._parse_single_file(self.files[0], ai_model, page_limit) + merger = ResponseMerger( + response, + schema=self.get_schema(), + ) + + for file in self.files[1:]: + if merger.is_complete(): + break + + new_response = self._parse_single_file(file, ai_model, page_limit) + merger.merge(new_response) + + return merger.response + ################################### ########## Output Schema ########## ################################### diff --git a/transaction_parser/transaction_parser/doctype/transaction_parser_settings/transaction_parser_settings.json b/transaction_parser/transaction_parser/doctype/transaction_parser_settings/transaction_parser_settings.json index 007ec5a..96706b2 100644 --- a/transaction_parser/transaction_parser/doctype/transaction_parser_settings/transaction_parser_settings.json +++ b/transaction_parser/transaction_parser/doctype/transaction_parser_settings/transaction_parser_settings.json @@ -1,189 +1,189 @@ { - "actions": [], - "allow_rename": 1, - "creation": "2025-03-11 16:54:51.622854", - "doctype": "DocType", - "engine": "InnoDB", - "field_order": [ - "api_tab", - "enabled", - "ai_model_section", - "default_ai_model", - "api_keys", - "transaction_configurations_section", - "invoice_lookback_count", - "email_configuration_section", - "parse_incoming_emails", - "parse_party_emails", - "process_one_document_per_communication", - "incoming_email_accounts", - "party_emails", - "custom_fields_tab", - "custom_fields_section", - "column_break_kdtw", - "base_schema", - "tax_schema", - "address_schema", - "party_schema", - "item_schema" - ], - "fields": [ - { - "fieldname": "api_tab", - "fieldtype": "Tab Break", - "label": "API" - }, - { - "default": "0", - "fieldname": "enabled", - "fieldtype": "Check", - "label": "Enabled" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "custom_fields_tab", - "fieldtype": "Tab Break", - "label": "Customizations" - }, - { - "fieldname": "custom_fields_section", - "fieldtype": "Section Break", - "label": "Custom Fields" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "column_break_kdtw", - "fieldtype": "Column Break" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "base_schema", - "fieldtype": "JSON", - "label": "Base Schema" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "tax_schema", - "fieldtype": "JSON", - "label": "Tax Schema" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "address_schema", - "fieldtype": "JSON", - "label": "Address Schema" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "party_schema", - "fieldtype": "JSON", - "label": "Party Schema" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "item_schema", - "fieldtype": "JSON", - "label": "Item Schema" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "default_ai_model", - "fieldtype": "Select", - "label": "Default AI Model", - "mandatory_depends_on": "eval: doc.enabled", - "options": "DeepSeek Chat\nDeepSeek Reasoner\nOpenAI gpt-4o\nOpenAI gpt-4o-mini\nOpenAI gpt-5\nOpenAI gpt-5-mini\nGoogle Gemini Pro\nGoogle Gemini Flash" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "api_keys", - "fieldtype": "Table", - "label": "API Keys", - "options": "Transaction Parser API Key Item" - }, - { - "default": "0", - "fieldname": "parse_incoming_emails", - "fieldtype": "Check", - "label": "Parse Incoming Emails" - }, - { - "default": "0", - "depends_on": "eval: doc.parse_incoming_emails", - "description": "Directly Parse Emails from Party", - "fieldname": "parse_party_emails", - "fieldtype": "Check", - "label": "Parse Party Emails" - }, - { - "default": "1", - "depends_on": "eval: doc.parse_incoming_emails", - "description": "When enabled, all attachments from a communication are combined to create a single document. When disabled, each attachment creates a separate document.", - "fieldname": "process_one_document_per_communication", - "fieldtype": "Check", - "label": "Process One Document Per Communication" - }, - { - "fieldname": "email_configuration_section", - "fieldtype": "Section Break", - "label": "Incoming Email Configurations" - }, - { - "fieldname": "ai_model_section", - "fieldtype": "Section Break", - "label": "AI Model Configurations" - }, - { - "depends_on": "eval: doc.parse_incoming_emails", - "description": "If an email is received on any of these Email Accounts, an attempt will be made to generate the specified Transaction from its attachments.", - "fieldname": "incoming_email_accounts", - "fieldtype": "Table", - "label": "Incoming Email Accounts", - "mandatory_depends_on": "eval: doc.parse_incoming_emails && !doc.parse_party_emails", - "options": "Transaction Parser Email Account" - }, - { - "depends_on": "eval: doc.parse_incoming_emails", - "description": "Mapping Party based on Email Address", - "fieldname": "party_emails", - "fieldtype": "Table", - "label": "Party Emails", - "options": "Transaction Parser Party Email" - }, - { - "fieldname": "invoice_lookback_count", - "fieldtype": "Int", - "in_list_view": 1, - "label": "Number of Past Invoices to Consider for Item Code Selection", - "reqd": 1 - }, - { - "fieldname": "transaction_configurations_section", - "fieldtype": "Section Break", - "label": "Transaction Configurations" - } - ], - "index_web_pages_for_search": 1, - "issingle": 1, - "links": [], - "modified": "2025-09-08 08:48:58.870032", - "modified_by": "Administrator", - "module": "Transaction Parser", - "name": "Transaction Parser Settings", - "owner": "Administrator", - "permissions": [ - { - "create": 1, - "delete": 1, - "email": 1, - "print": 1, - "read": 1, - "role": "System Manager", - "share": 1, - "write": 1 - } - ], - "row_format": "Dynamic", - "sort_field": "modified", - "sort_order": "DESC", - "states": [] + "actions": [], + "allow_rename": 1, + "creation": "2025-03-11 16:54:51.622854", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "api_tab", + "enabled", + "ai_model_section", + "default_ai_model", + "api_keys", + "transaction_configurations_section", + "invoice_lookback_count", + "email_configuration_section", + "parse_incoming_emails", + "parse_party_emails", + "process_one_document_per_communication", + "incoming_email_accounts", + "party_emails", + "custom_fields_tab", + "custom_fields_section", + "column_break_kdtw", + "base_schema", + "tax_schema", + "address_schema", + "party_schema", + "item_schema" + ], + "fields": [ + { + "fieldname": "api_tab", + "fieldtype": "Tab Break", + "label": "API" + }, + { + "default": "0", + "fieldname": "enabled", + "fieldtype": "Check", + "label": "Enabled" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "custom_fields_tab", + "fieldtype": "Tab Break", + "label": "Customizations" + }, + { + "fieldname": "custom_fields_section", + "fieldtype": "Section Break", + "label": "Custom Fields" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "column_break_kdtw", + "fieldtype": "Column Break" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "base_schema", + "fieldtype": "JSON", + "label": "Base Schema" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "tax_schema", + "fieldtype": "JSON", + "label": "Tax Schema" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "address_schema", + "fieldtype": "JSON", + "label": "Address Schema" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "party_schema", + "fieldtype": "JSON", + "label": "Party Schema" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "item_schema", + "fieldtype": "JSON", + "label": "Item Schema" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "default_ai_model", + "fieldtype": "Select", + "label": "Default AI Model", + "mandatory_depends_on": "eval: doc.enabled", + "options": "DeepSeek Chat\nDeepSeek Reasoner\nOpenAI gpt-4o\nOpenAI gpt-4o-mini\nOpenAI gpt-5\nOpenAI gpt-5-mini\nGoogle Gemini Pro\nGoogle Gemini Flash" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "api_keys", + "fieldtype": "Table", + "label": "API Keys", + "options": "Transaction Parser API Key Item" + }, + { + "default": "0", + "fieldname": "parse_incoming_emails", + "fieldtype": "Check", + "label": "Parse Incoming Emails" + }, + { + "default": "0", + "depends_on": "eval: doc.parse_incoming_emails", + "description": "Directly Parse Emails from Party", + "fieldname": "parse_party_emails", + "fieldtype": "Check", + "label": "Parse Party Emails" + }, + { + "default": "1", + "depends_on": "eval: doc.parse_incoming_emails", + "description": "When enabled, all attachments from a communication are combined to create a single document. When disabled, each attachment creates a separate document.", + "fieldname": "process_one_document_per_communication", + "fieldtype": "Check", + "label": "Process One Document Per Communication" + }, + { + "fieldname": "email_configuration_section", + "fieldtype": "Section Break", + "label": "Incoming Email Configurations" + }, + { + "fieldname": "ai_model_section", + "fieldtype": "Section Break", + "label": "AI Model Configurations" + }, + { + "depends_on": "eval: doc.parse_incoming_emails", + "description": "If an email is received on any of these Email Accounts, an attempt will be made to generate the specified Transaction from its attachments.", + "fieldname": "incoming_email_accounts", + "fieldtype": "Table", + "label": "Incoming Email Accounts", + "mandatory_depends_on": "eval: doc.parse_incoming_emails && !doc.parse_party_emails", + "options": "Transaction Parser Email Account" + }, + { + "depends_on": "eval: doc.parse_incoming_emails", + "description": "Mapping Party based on Email Address", + "fieldname": "party_emails", + "fieldtype": "Table", + "label": "Party Emails", + "options": "Transaction Parser Party Email" + }, + { + "fieldname": "invoice_lookback_count", + "fieldtype": "Int", + "in_list_view": 1, + "label": "Number of Past Invoices to Consider for Item Code Selection", + "reqd": 1 + }, + { + "fieldname": "transaction_configurations_section", + "fieldtype": "Section Break", + "label": "Transaction Configurations" + } + ], + "index_web_pages_for_search": 1, + "issingle": 1, + "links": [], + "modified": "2025-09-08 08:48:58.870032", + "modified_by": "Administrator", + "module": "Transaction Parser", + "name": "Transaction Parser Settings", + "owner": "Administrator", + "permissions": [ + { + "create": 1, + "delete": 1, + "email": 1, + "print": 1, + "read": 1, + "role": "System Manager", + "share": 1, + "write": 1 + } + ], + "row_format": "Dynamic", + "sort_field": "modified", + "sort_order": "DESC", + "states": [] } \ No newline at end of file diff --git a/transaction_parser/transaction_parser/overrides/communication.py b/transaction_parser/transaction_parser/overrides/communication.py index 48e8e12..99c1b7e 100644 --- a/transaction_parser/transaction_parser/overrides/communication.py +++ b/transaction_parser/transaction_parser/overrides/communication.py @@ -126,6 +126,7 @@ def _process_attachments( party=party, company=default_company if not company else company, queue="long", + now=True, ) diff --git a/transaction_parser/transaction_parser/utils/file_processor.py b/transaction_parser/transaction_parser/utils/file_processor.py index 46a0eac..8a3ccd9 100644 --- a/transaction_parser/transaction_parser/utils/file_processor.py +++ b/transaction_parser/transaction_parser/utils/file_processor.py @@ -22,25 +22,6 @@ def get_content(self, doc, page_limit=None): else: frappe.throw(_("Only PDF, CSV, and Excel files are supported")) - def get_combined_content(self, docs, page_limit=None): - """Combine content from multiple files with clear separators.""" - if not docs: - frappe.throw(_("No files provided for processing")) - - if not isinstance(docs, list): - docs = [docs] - - if len(docs) == 1: - return self.get_content(docs[0], page_limit) - - combined_parts = [] - for index, doc in enumerate(docs, 1): - separator = f"{'=' * 60}\nDocument {index}: {doc.file_name} ({doc.file_type})\n{'=' * 60}" - content = self.get_content(doc, page_limit) - combined_parts.append(f"{separator}\n\n{content}") - - return "\n\n".join(combined_parts) - def _process_pdf(self, doc, page_limit=None): """Process PDF files with OCR and page limiting.""" self.file = io.BytesIO(doc.get_content()) diff --git a/transaction_parser/transaction_parser/utils/response_merger.py b/transaction_parser/transaction_parser/utils/response_merger.py new file mode 100644 index 0000000..0398a41 --- /dev/null +++ b/transaction_parser/transaction_parser/utils/response_merger.py @@ -0,0 +1,386 @@ +from dataclasses import dataclass +from typing import Any + +from frappe import _dict + + +@dataclass +class FieldType: + """Base class for field types in schema.""" + + required: bool + + def is_empty(self, value: Any) -> bool: + """Check if a value is considered empty.""" + return ( + value is None + or value == "" + or (isinstance(value, list | dict) and len(value) == 0) + ) + + +@dataclass +class PrimitiveField(FieldType): + """Represents a primitive field (string, number, date, etc.).""" + + pass + + +@dataclass +class ObjectField(FieldType): + """Represents a nested object field with child fields.""" + + children: dict[str, FieldType] + + +@dataclass +class ListField(FieldType): + """Represents a list/array field.""" + + item_type: FieldType # Type of items in the list + + +class SchemaParser: + """Parses document schema and builds structured field type hierarchy.""" + + def parse(self, schema: dict) -> dict[str, FieldType]: + """ + Parse schema dictionary into structured field types. + + Args: + schema: Schema dictionary to parse + + Returns: + Dictionary mapping field names to FieldType instances + """ + fields = {} + + for key, value in schema.items(): + fields[key] = self._parse_field(value) + + return fields + + def _parse_field(self, schema_value: Any) -> FieldType: + """ + Parse a single field value into appropriate FieldType. + + Args: + schema_value: Value from schema (string, list, or dict) + + Returns: + Appropriate FieldType instance + """ + # List field: schema value is a list + if isinstance(schema_value, list): + required = self._is_required(schema_value) + + # Empty list or primitive list items + if not schema_value: + return ListField( + required=required, item_type=PrimitiveField(required=True) + ) + + item_schema = schema_value[0] + + # List of objects (e.g., item_list) + if isinstance(item_schema, dict): + item_fields = self.parse(item_schema) + return ListField( + required=required, + item_type=ObjectField(required=True, children=item_fields), + ) + + # List of primitives (e.g., emails, phones) + else: + return ListField( + required=required, + item_type=PrimitiveField(required=self._is_required(item_schema)), + ) + + # Object field: schema value is a dict + elif isinstance(schema_value, dict): + children = self.parse(schema_value) + return ObjectField(required=True, children=children) + + # Primitive field: schema value is a string + else: + return PrimitiveField(required=self._is_required(schema_value)) + + def _is_required(self, schema_value: Any) -> bool: + """ + Check if field is required based on schema notation. + + Args: + schema_value: Schema value (string or other type) + + Returns: + True if required, False if optional + """ + if isinstance(schema_value, str): + return "| null" not in schema_value and "| 0" not in schema_value + + return True + + +class ResponseMerger: + """Schema-driven merger for AI responses from multiple attachments.""" + + def __init__(self, response: dict, schema: dict): + """ + Initialize ResponseMerger with a base response and optional schema. + + Args: + response: Initial response dict from first file parsing + schema: Document schema dict for automatic field detection + """ + self.response = _dict(response) if isinstance(response, dict) else response + self.schema = schema + + # Parse schema into structured field types + parser = SchemaParser() + self.fields = parser.parse(schema) + + def is_complete(self) -> bool: + """ + Check if all required fields are filled. + + Returns: + True if complete, False otherwise + """ + return len(self.get_missing_fields()) == 0 + + def get_missing_fields(self) -> list[str]: + """ + Get list of missing required field paths. + + Returns: + List of dot-separated field paths that are missing + """ + missing = [] + self._check_missing_fields(self.fields, self.response, "", missing) + return missing + + def _check_missing_fields( + self, + fields: dict[str, FieldType], + data: dict, + path_prefix: str, + missing: list[str], + ) -> None: + """ + Recursively check for missing required fields. + + Args: + fields: Field type definitions + data: Current data dict to check + path_prefix: Current path prefix for nested fields + missing: List to append missing field paths to + """ + for key, field_type in fields.items(): + field_path = f"{path_prefix}.{key}" if path_prefix else key + value = data.get(key) if isinstance(data, dict) else None + + # Check primitive fields + if isinstance(field_type, PrimitiveField): + if field_type.required and field_type.is_empty(value): + missing.append(field_path) + + # Check object fields recursively + elif isinstance(field_type, ObjectField): + if field_type.required and field_type.is_empty(value): + missing.append(field_path) + + elif value: + self._check_missing_fields( + field_type.children, value, field_path, missing + ) + + # List fields are checked during merging, not at top level + # (we care about item contents, not just list existence) + + def merge(self, new_response: dict) -> None: + """ + Merge new response into existing response. + + Args: + new_response: New response dict to merge from + """ + self._merge_fields(self.fields, self.response, new_response) + + def _merge_fields( + self, + fields: dict[str, FieldType], + target: dict, + source: dict, + ) -> None: + """ + Merge source data into target based on field definitions. + + Args: + fields: Field type definitions + target: Target dict to merge into + source: Source dict to merge from + """ + for key, field_type in fields.items(): + source_value = source.get(key) + + # Skip if source doesn't have this field + if source_value is None: + continue + + # Handle primitive fields + if isinstance(field_type, PrimitiveField): + self._merge_primitive(target, key, source_value) + + # Handle object fields + elif isinstance(field_type, ObjectField): + self._merge_object(field_type, target, key, source_value) + + # Handle list fields + elif isinstance(field_type, ListField): + self._merge_list(field_type, target, key, source_value) + + def _merge_primitive(self, target: dict, key: str, source_value: Any) -> None: + """ + Merge primitive field value. + + Args: + target: Target dict + key: Field key + source_value: Value from source + """ + # Only fill if target is empty + if not target.get(key) and source_value: + target[key] = source_value + + def _merge_object( + self, + field_type: ObjectField, + target: dict, + key: str, + source_value: dict, + ) -> None: + """ + Merge object field recursively. + + Args: + field_type: ObjectField definition + target: Target dict + key: Field key + source_value: Object value from source + """ + # Ensure target has the object + if key not in target: + target[key] = {} + + # Recursively merge children + self._merge_fields(field_type.children, target[key], source_value) + + def _merge_list( + self, + field_type: ListField, + target: dict, + key: str, + source_value: list, + ) -> None: + """ + Merge list field by matching items. + + Args: + field_type: ListField definition + target: Target dict + key: Field key + source_value: List value from source + """ + target_list = target.get(key, []) + + # If target list is empty, use source list + if not target_list: + target[key] = source_value + return + + # If source list is empty, nothing to merge + if not source_value: + return + + # Only merge if items are objects (not primitive lists) + if not isinstance(field_type.item_type, ObjectField): + return + + # Match and merge items + for target_item in target_list: + matched_item = self._find_matching_item( + target_item, source_value, field_type.item_type + ) + if matched_item: + self._merge_fields( + field_type.item_type.children, target_item, matched_item + ) + + def _find_matching_item( + self, + target_item: dict, + source_items: list[dict], + item_type: ObjectField, + ) -> dict | None: + """ + Find matching item in source list using intelligent matching. + + Matching strategy: + - Extract key fields from item schema (party_item_code, quantity, rate, description) + - Require at least 2 matching fields for a match + + Args: + target_item: Item to find match for + source_items: List of candidate items + item_type: ObjectField describing item structure + + Returns: + Matching item or None + """ + # Define priority key fields for matching + key_field_names = ["party_item_code", "quantity", "rate", "description"] + + # Filter to only fields that exist in schema + available_keys = [ + name for name in key_field_names if name in item_type.children + ] + + # Try to find match + for source_item in source_items: + if self._items_match(target_item, source_item, available_keys): + return source_item + + return None + + def _items_match( + self, + item1: dict, + item2: dict, + key_fields: list[str], + ) -> bool: + """ + Check if two items match based on key fields. + + Requires at least 2 matching fields. + + Args: + item1: First item + item2: Second item + key_fields: List of key field names to check + + Returns: + True if items match, False otherwise + """ + matches = 0 + + for field in key_fields: + value1 = item1.get(field) + value2 = item2.get(field) + + # Both must exist and be equal + if value1 and value2 and value1 == value2: + matches += 1 + + # Require at least 2 matching fields + return matches >= 2 From a7d019432bb810c2a023f699c4b33ea2408ae285 Mon Sep 17 00:00:00 2001 From: Karm Soni Date: Wed, 1 Apr 2026 12:39:03 +0530 Subject: [PATCH 3/5] feat: enhance response merging and attachment processing for improved file handling --- .../transaction_parser/__init__.py | 19 +- .../controllers/transaction.py | 7 + .../overrides/communication.py | 9 +- .../utils/response_merger.py | 206 +++--------------- 4 files changed, 53 insertions(+), 188 deletions(-) diff --git a/transaction_parser/transaction_parser/__init__.py b/transaction_parser/transaction_parser/__init__.py index fcc2c56..a327757 100644 --- a/transaction_parser/transaction_parser/__init__.py +++ b/transaction_parser/transaction_parser/__init__.py @@ -54,7 +54,24 @@ def _parse( file_urls = [file_urls] file_names = frappe.get_list( - "File", filters={"file_url": ("in", file_urls)}, pluck="name" + "File", + filters={"file_url": ("in", file_urls)}, + fields=["name", "file_type"], + order_by="creation desc", + group_by="file_url", + ) + + # xlsx/xls first, then pdf, then csv. If no xlsx/xls, csv takes its place. + file_types = {(f.file_type or "").lower() for f in file_names} + has_spreadsheet = file_types & {"xlsx", "xls"} + + if has_spreadsheet: + FILE_TYPE_PRIORITY = {"xlsx": 0, "xls": 0, "pdf": 1, "csv": 2} + else: + FILE_TYPE_PRIORITY = {"csv": 0, "pdf": 1} + + file_names.sort( + key=lambda f: FILE_TYPE_PRIORITY.get((f.file_type or "").lower(), 99) ) files = [] diff --git a/transaction_parser/transaction_parser/controllers/transaction.py b/transaction_parser/transaction_parser/controllers/transaction.py index 69c5fd7..e3cc57b 100644 --- a/transaction_parser/transaction_parser/controllers/transaction.py +++ b/transaction_parser/transaction_parser/controllers/transaction.py @@ -113,6 +113,7 @@ def _parse_multiple_files( merger = ResponseMerger( response, schema=self.get_schema(), + match_keys=self.get_match_keys(), ) for file in self.files[1:]: @@ -128,6 +129,12 @@ def _parse_multiple_files( ########## Output Schema ########## ################################### + def get_match_keys(self) -> dict[str, list[str]]: + """Return list field name -> key fields used to match items during merge.""" + return { + "item_list": ["party_item_code", "quantity", "rate", "description"], + } + def get_schema(self) -> dict: if not self.schema: self.schema = self._get_schema() diff --git a/transaction_parser/transaction_parser/overrides/communication.py b/transaction_parser/transaction_parser/overrides/communication.py index 99c1b7e..5ddc78d 100644 --- a/transaction_parser/transaction_parser/overrides/communication.py +++ b/transaction_parser/transaction_parser/overrides/communication.py @@ -108,19 +108,12 @@ def _process_attachments( if not filtered_attachments: return - sorted_attachments = sorted( - filtered_attachments, - key=lambda attachment: {"xlsx": 0, "csv": 1, "pdf": 2}.get( - attachment.file_url.split(".")[-1].lower(), 3 - ), - ) - frappe.enqueue( "transaction_parser.transaction_parser.overrides.communication._parse_attachments", doc=doc, country=country, transaction_type=transaction_type, - attachments=sorted_attachments, + attachments=filtered_attachments, ai_model=settings.default_ai_model, user=user, party=party, diff --git a/transaction_parser/transaction_parser/utils/response_merger.py b/transaction_parser/transaction_parser/utils/response_merger.py index 0398a41..826750c 100644 --- a/transaction_parser/transaction_parser/utils/response_merger.py +++ b/transaction_parser/transaction_parser/utils/response_merger.py @@ -11,7 +11,6 @@ class FieldType: required: bool def is_empty(self, value: Any) -> bool: - """Check if a value is considered empty.""" return ( value is None or value == "" @@ -41,18 +40,10 @@ class ListField(FieldType): class SchemaParser: - """Parses document schema and builds structured field type hierarchy.""" + """Parses a schema dict into a structured FieldType hierarchy.""" def parse(self, schema: dict) -> dict[str, FieldType]: - """ - Parse schema dictionary into structured field types. - - Args: - schema: Schema dictionary to parse - - Returns: - Dictionary mapping field names to FieldType instances - """ + """Parse schema dict into field name -> FieldType mapping.""" fields = {} for key, value in schema.items(): @@ -61,101 +52,55 @@ def parse(self, schema: dict) -> dict[str, FieldType]: return fields def _parse_field(self, schema_value: Any) -> FieldType: - """ - Parse a single field value into appropriate FieldType. - - Args: - schema_value: Value from schema (string, list, or dict) - - Returns: - Appropriate FieldType instance - """ - # List field: schema value is a list + """Determine and return the appropriate FieldType for a schema value.""" if isinstance(schema_value, list): - required = self._is_required(schema_value) - - # Empty list or primitive list items if not schema_value: - return ListField( - required=required, item_type=PrimitiveField(required=True) - ) + return ListField(required=True, item_type=PrimitiveField(required=True)) item_schema = schema_value[0] - # List of objects (e.g., item_list) if isinstance(item_schema, dict): item_fields = self.parse(item_schema) return ListField( - required=required, + required=True, item_type=ObjectField(required=True, children=item_fields), ) - - # List of primitives (e.g., emails, phones) else: return ListField( - required=required, - item_type=PrimitiveField(required=self._is_required(item_schema)), + required=True, + item_type=PrimitiveField(required=True), ) - # Object field: schema value is a dict elif isinstance(schema_value, dict): children = self.parse(schema_value) return ObjectField(required=True, children=children) - # Primitive field: schema value is a string else: - return PrimitiveField(required=self._is_required(schema_value)) - - def _is_required(self, schema_value: Any) -> bool: - """ - Check if field is required based on schema notation. - - Args: - schema_value: Schema value (string or other type) - - Returns: - True if required, False if optional - """ - if isinstance(schema_value, str): - return "| null" not in schema_value and "| 0" not in schema_value - - return True + return PrimitiveField(required=True) class ResponseMerger: """Schema-driven merger for AI responses from multiple attachments.""" - def __init__(self, response: dict, schema: dict): - """ - Initialize ResponseMerger with a base response and optional schema. - - Args: - response: Initial response dict from first file parsing - schema: Document schema dict for automatic field detection - """ + def __init__( + self, + response: dict, + schema: dict, + match_keys: dict[str, list[str]] | None = None, + ): self.response = _dict(response) if isinstance(response, dict) else response self.schema = schema + self.match_keys = match_keys or {} - # Parse schema into structured field types parser = SchemaParser() self.fields = parser.parse(schema) def is_complete(self) -> bool: - """ - Check if all required fields are filled. - - Returns: - True if complete, False otherwise - """ + """Return True if all required fields are filled.""" return len(self.get_missing_fields()) == 0 def get_missing_fields(self) -> list[str]: - """ - Get list of missing required field paths. - - Returns: - List of dot-separated field paths that are missing - """ + """Return dot-separated paths of missing required fields.""" missing = [] self._check_missing_fields(self.fields, self.response, "", missing) return missing @@ -167,44 +112,26 @@ def _check_missing_fields( path_prefix: str, missing: list[str], ) -> None: - """ - Recursively check for missing required fields. - - Args: - fields: Field type definitions - data: Current data dict to check - path_prefix: Current path prefix for nested fields - missing: List to append missing field paths to - """ for key, field_type in fields.items(): field_path = f"{path_prefix}.{key}" if path_prefix else key value = data.get(key) if isinstance(data, dict) else None - # Check primitive fields if isinstance(field_type, PrimitiveField): if field_type.required and field_type.is_empty(value): missing.append(field_path) - # Check object fields recursively elif isinstance(field_type, ObjectField): if field_type.required and field_type.is_empty(value): missing.append(field_path) - elif value: self._check_missing_fields( field_type.children, value, field_path, missing ) # List fields are checked during merging, not at top level - # (we care about item contents, not just list existence) def merge(self, new_response: dict) -> None: - """ - Merge new response into existing response. - - Args: - new_response: New response dict to merge from - """ + """Merge new_response into the existing response.""" self._merge_fields(self.fields, self.response, new_response) def _merge_fields( @@ -213,43 +140,22 @@ def _merge_fields( target: dict, source: dict, ) -> None: - """ - Merge source data into target based on field definitions. - - Args: - fields: Field type definitions - target: Target dict to merge into - source: Source dict to merge from - """ for key, field_type in fields.items(): source_value = source.get(key) - # Skip if source doesn't have this field if source_value is None: continue - # Handle primitive fields if isinstance(field_type, PrimitiveField): self._merge_primitive(target, key, source_value) - # Handle object fields elif isinstance(field_type, ObjectField): self._merge_object(field_type, target, key, source_value) - # Handle list fields elif isinstance(field_type, ListField): self._merge_list(field_type, target, key, source_value) def _merge_primitive(self, target: dict, key: str, source_value: Any) -> None: - """ - Merge primitive field value. - - Args: - target: Target dict - key: Field key - source_value: Value from source - """ - # Only fill if target is empty if not target.get(key) and source_value: target[key] = source_value @@ -260,20 +166,9 @@ def _merge_object( key: str, source_value: dict, ) -> None: - """ - Merge object field recursively. - - Args: - field_type: ObjectField definition - target: Target dict - key: Field key - source_value: Object value from source - """ - # Ensure target has the object - if key not in target: + if key not in target or target[key] is None: target[key] = {} - # Recursively merge children self._merge_fields(field_type.children, target[key], source_value) def _merge_list( @@ -283,34 +178,25 @@ def _merge_list( key: str, source_value: list, ) -> None: - """ - Merge list field by matching items. - - Args: - field_type: ListField definition - target: Target dict - key: Field key - source_value: List value from source - """ target_list = target.get(key, []) - # If target list is empty, use source list if not target_list: target[key] = source_value return - # If source list is empty, nothing to merge if not source_value: return - # Only merge if items are objects (not primitive lists) if not isinstance(field_type.item_type, ObjectField): return - # Match and merge items + key_fields = self.match_keys.get(key, []) + if not key_fields: + return + for target_item in target_list: matched_item = self._find_matching_item( - target_item, source_value, field_type.item_type + target_item, source_value, key_fields ) if matched_item: self._merge_fields( @@ -321,34 +207,11 @@ def _find_matching_item( self, target_item: dict, source_items: list[dict], - item_type: ObjectField, + key_fields: list[str], ) -> dict | None: - """ - Find matching item in source list using intelligent matching. - - Matching strategy: - - Extract key fields from item schema (party_item_code, quantity, rate, description) - - Require at least 2 matching fields for a match - - Args: - target_item: Item to find match for - source_items: List of candidate items - item_type: ObjectField describing item structure - - Returns: - Matching item or None - """ - # Define priority key fields for matching - key_field_names = ["party_item_code", "quantity", "rate", "description"] - - # Filter to only fields that exist in schema - available_keys = [ - name for name in key_field_names if name in item_type.children - ] - - # Try to find match + # Require at least 2 matching key fields to avoid false positives for source_item in source_items: - if self._items_match(target_item, source_item, available_keys): + if self._items_match(target_item, source_item, key_fields): return source_item return None @@ -359,28 +222,13 @@ def _items_match( item2: dict, key_fields: list[str], ) -> bool: - """ - Check if two items match based on key fields. - - Requires at least 2 matching fields. - - Args: - item1: First item - item2: Second item - key_fields: List of key field names to check - - Returns: - True if items match, False otherwise - """ matches = 0 for field in key_fields: value1 = item1.get(field) value2 = item2.get(field) - # Both must exist and be equal if value1 and value2 and value1 == value2: matches += 1 - # Require at least 2 matching fields return matches >= 2 From 7110c5e3431d3f14bcbc81edc0323ea6769d749c Mon Sep 17 00:00:00 2001 From: Karm Soni Date: Fri, 3 Apr 2026 13:36:47 +0530 Subject: [PATCH 4/5] feat: implement custom exception handling for file processing errors and enhance error logging --- transaction_parser/exceptions.py | 5 +++ .../transaction_parser/__init__.py | 44 +++++++++++-------- .../ai_integration/parser.py | 11 ++--- .../controllers/transaction.py | 43 ++++++++++++------ .../overrides/communication.py | 1 - .../utils/file_processor.py | 18 +++++--- 6 files changed, 75 insertions(+), 47 deletions(-) create mode 100644 transaction_parser/exceptions.py diff --git a/transaction_parser/exceptions.py b/transaction_parser/exceptions.py new file mode 100644 index 0000000..a6458d7 --- /dev/null +++ b/transaction_parser/exceptions.py @@ -0,0 +1,5 @@ +import frappe + + +class FileProcessingError(frappe.ValidationError): + """Custom exception for file processing errors.""" diff --git a/transaction_parser/transaction_parser/__init__.py b/transaction_parser/transaction_parser/__init__.py index a327757..b6b16e3 100644 --- a/transaction_parser/transaction_parser/__init__.py +++ b/transaction_parser/transaction_parser/__init__.py @@ -80,7 +80,7 @@ def _parse( files.append(file) controller = get_controller(country, transaction)(party=party, company=company) - doc = controller.generate(files, ai_model, page_limit, communication_name) + doc = controller.generate(files, ai_model, page_limit) filenames = ( ", ".join([f.file_name for f in files]) @@ -99,33 +99,39 @@ def _parse( except Exception as e: notification = None + reference_doctype = "Communication" if communication_name else "File" + reference_docname = ( + communication_name + if communication_name + else (files[0].name if files else None) + ) if ( isinstance(e, frappe.DuplicateEntryError) and frappe.flags.skip_duplicate_error ): + subject = _("Duplicate {0} found for {1}").format( + _(TRANSACTION_MAP[transaction]), + f"{reference_doctype} {reference_docname}", + ) + notification = { - "document_type": "Communication" if communication_name else "File", - "document_name": ( - communication_name if communication_name else files[0].name - ), - "subject": _("Duplicate entry found for {0}").format(file_urls), + "document_type": reference_doctype, + "document_name": reference_docname, + "subject": subject, "message": str(e), } - return - - error_log = frappe.log_error( - "Transaction Parser API Error", - reference_doctype="Communication" if communication_name else "File", - reference_name=( - communication_name - if communication_name - else files[0].name - if files - else None - ), + + if not (error_log := getattr(e, "error_log", None)): + error_log = frappe.log_error( + "Transaction Parser Error", + reference_doctype=reference_doctype, + reference_name=reference_docname, + ) + + message = _("Failed to generate {0} from {1}").format( + TRANSACTION_MAP[transaction], f"{reference_doctype} {reference_docname}" ) - message = _("Failed to generate {0} from {1}").format(_(transaction), file_urls) notification = { "document_type": error_log.doctype, diff --git a/transaction_parser/transaction_parser/ai_integration/parser.py b/transaction_parser/transaction_parser/ai_integration/parser.py index 7cbe23e..84d1343 100644 --- a/transaction_parser/transaction_parser/ai_integration/parser.py +++ b/transaction_parser/transaction_parser/ai_integration/parser.py @@ -35,14 +35,12 @@ def parse( document_schema: dict, document_data: str, doc_name: str | None = None, - is_communication: bool = False, ) -> dict: messages = self._build_messages(document_type, document_schema, document_data) response = self.send_message( messages=messages, doc_name=doc_name, - is_communication=is_communication, ) return self.get_content(response) @@ -72,10 +70,9 @@ def send_message( self, messages: tuple, doc_name: str | None = None, - is_communication: bool = False, ) -> dict: """Send messages to AI API and handle the response.""" - log = self._create_log_entry(doc_name, is_communication) + log = self._create_log_entry(doc_name) try: response = self._make_api_call(messages) @@ -96,15 +93,13 @@ def send_message( finally: enqueue_integration_request(**log) - def _create_log_entry( - self, doc_name: str | None, is_communication: bool = False - ) -> frappe._dict: + def _create_log_entry(self, doc_name: str | None) -> frappe._dict: """Create a log entry for the API call.""" log = frappe._dict(url=self.model.base_url) log.update( { - "reference_doctype": "Communication" if is_communication else "File", + "reference_doctype": "File", "reference_name": doc_name, } ) diff --git a/transaction_parser/transaction_parser/controllers/transaction.py b/transaction_parser/transaction_parser/controllers/transaction.py index e3cc57b..86f53ac 100644 --- a/transaction_parser/transaction_parser/controllers/transaction.py +++ b/transaction_parser/transaction_parser/controllers/transaction.py @@ -2,8 +2,10 @@ import frappe from erpnext.setup.utils import get_exchange_rate from erpnext.stock.get_item_details import get_item_details +from httpx import HTTPError from rapidfuzz import fuzz, process +from transaction_parser.exceptions import FileProcessingError from transaction_parser.transaction_parser.ai_integration.parser import AIParser from transaction_parser.transaction_parser.utils import to_dict from transaction_parser.transaction_parser.utils.file_processor import FileProcessor @@ -36,7 +38,6 @@ def generate( files, ai_model: str | None = None, page_limit: int | None = None, - communication_name: str | None = None, ): self.initialize() @@ -45,7 +46,6 @@ def generate( self.files = files self.ai_model = ai_model - self.communication_name = communication_name self.data = self._parse_file_content(ai_model, page_limit) self.doc = frappe.get_doc({"doctype": self.DOCTYPE}) self.doc.is_created_by_transaction_parser = 1 @@ -61,7 +61,6 @@ def generate( def initialize(self) -> None: # file processing self.files = None - self.communication_name = None # output schema self.schema = None @@ -95,16 +94,34 @@ def _parse_single_file( ai_model: str | None = None, page_limit: int | None = None, ) -> dict: - content = FileProcessor().get_content(file, page_limit) - schema = self.get_schema() - - return AIParser(ai_model, self.settings).parse( - document_type=self.DOCTYPE, - document_schema=schema, - document_data=content, - doc_name=self.communication_name or file.name, - is_communication=bool(self.communication_name), - ) + try: + content = FileProcessor().get_content(file, page_limit) + schema = self.get_schema() + + return AIParser(ai_model, self.settings).parse( + document_type=self.DOCTYPE, + document_schema=schema, + document_data=content, + doc_name=file.name, + ) + + except FileProcessingError as e: + error_log = frappe.log_error( + title="File processing error in Transaction Parser", + reference_doctype="File", + reference_name=file.name, + ) + e.error_log = error_log + raise e + + except HTTPError as e: + error_log = frappe.log_error( + title="Transaction Parser API error", + reference_doctype="File", + reference_name=file.name, + ) + e.error_log = error_log + raise e def _parse_multiple_files( self, ai_model: str | None = None, page_limit: int | None = None diff --git a/transaction_parser/transaction_parser/overrides/communication.py b/transaction_parser/transaction_parser/overrides/communication.py index 5ddc78d..7579fe2 100644 --- a/transaction_parser/transaction_parser/overrides/communication.py +++ b/transaction_parser/transaction_parser/overrides/communication.py @@ -151,7 +151,6 @@ def _parse_attachments( user=user, party=party, company=company, - communication_name=doc.name, ) frappe.db.commit() diff --git a/transaction_parser/transaction_parser/utils/file_processor.py b/transaction_parser/transaction_parser/utils/file_processor.py index 8a3ccd9..72351bc 100644 --- a/transaction_parser/transaction_parser/utils/file_processor.py +++ b/transaction_parser/transaction_parser/utils/file_processor.py @@ -10,17 +10,23 @@ read_xlsx_file_from_attached_file, ) +from transaction_parser.exceptions import FileProcessingError + class FileProcessor: """Process files: PDF (trim pages, apply OCR), CSV/Excel (parse data), extract content.""" def get_content(self, doc, page_limit=None): - if doc.file_type == "PDF": - return self._process_pdf(doc, page_limit) - elif doc.file_type in ["CSV", "XLSX", "XLS"]: - return self._process_spreadsheet(doc) - else: - frappe.throw(_("Only PDF, CSV, and Excel files are supported")) + try: + if doc.file_type == "PDF": + return self._process_pdf(doc, page_limit) + elif doc.file_type in ["CSV", "XLSX", "XLS"]: + return self._process_spreadsheet(doc) + else: + frappe.throw(_("Only PDF, CSV, and Excel files are supported")) + + except Exception as e: + raise FileProcessingError from e def _process_pdf(self, doc, page_limit=None): """Process PDF files with OCR and page limiting.""" From f4d9ddb90d83cc8debc4ac10e44e46295d53cedb Mon Sep 17 00:00:00 2001 From: Karm Soni Date: Fri, 3 Apr 2026 13:55:56 +0530 Subject: [PATCH 5/5] refactor: rename parameter 'doc_name' to 'file_doc_name' for consistency in AIParser and Transaction classes --- .../transaction_parser/ai_integration/parser.py | 12 ++++++------ .../transaction_parser/controllers/transaction.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/transaction_parser/transaction_parser/ai_integration/parser.py b/transaction_parser/transaction_parser/ai_integration/parser.py index 84d1343..cf55e43 100644 --- a/transaction_parser/transaction_parser/ai_integration/parser.py +++ b/transaction_parser/transaction_parser/ai_integration/parser.py @@ -34,13 +34,13 @@ def parse( document_type: str, document_schema: dict, document_data: str, - doc_name: str | None = None, + file_doc_name: str | None = None, ) -> dict: messages = self._build_messages(document_type, document_schema, document_data) response = self.send_message( messages=messages, - doc_name=doc_name, + file_doc_name=file_doc_name, ) return self.get_content(response) @@ -69,10 +69,10 @@ def _build_messages( def send_message( self, messages: tuple, - doc_name: str | None = None, + file_doc_name: str | None = None, ) -> dict: """Send messages to AI API and handle the response.""" - log = self._create_log_entry(doc_name) + log = self._create_log_entry(file_doc_name) try: response = self._make_api_call(messages) @@ -93,14 +93,14 @@ def send_message( finally: enqueue_integration_request(**log) - def _create_log_entry(self, doc_name: str | None) -> frappe._dict: + def _create_log_entry(self, file_doc_name: str | None) -> frappe._dict: """Create a log entry for the API call.""" log = frappe._dict(url=self.model.base_url) log.update( { "reference_doctype": "File", - "reference_name": doc_name, + "reference_name": file_doc_name, } ) diff --git a/transaction_parser/transaction_parser/controllers/transaction.py b/transaction_parser/transaction_parser/controllers/transaction.py index 86f53ac..e6aae36 100644 --- a/transaction_parser/transaction_parser/controllers/transaction.py +++ b/transaction_parser/transaction_parser/controllers/transaction.py @@ -102,7 +102,7 @@ def _parse_single_file( document_type=self.DOCTYPE, document_schema=schema, document_data=content, - doc_name=file.name, + file_doc_name=file.name, ) except FileProcessingError as e: