diff --git a/transaction_parser/exceptions.py b/transaction_parser/exceptions.py new file mode 100644 index 0000000..a6458d7 --- /dev/null +++ b/transaction_parser/exceptions.py @@ -0,0 +1,5 @@ +import frappe + + +class FileProcessingError(frappe.ValidationError): + """Custom exception for file processing errors.""" diff --git a/transaction_parser/transaction_parser/__init__.py b/transaction_parser/transaction_parser/__init__.py index 83555ce..b6b16e3 100644 --- a/transaction_parser/transaction_parser/__init__.py +++ b/transaction_parser/transaction_parser/__init__.py @@ -34,54 +34,104 @@ def parse(transaction, country, file_url, ai_model=None, page_limit=None): def _parse( country, transaction, - file_url, + file_urls, ai_model=None, page_limit=None, user=None, party=None, company=None, + communication_name=None, ): try: - file = None - filename = file_url.split("/")[-1] + if ( + isinstance(file_urls, str) + and file_urls.startswith("[") + and file_urls.endswith("]") + ): + file_urls = frappe.parse_json(file_urls) + + elif isinstance(file_urls, str): + file_urls = [file_urls] + + file_names = frappe.get_list( + "File", + filters={"file_url": ("in", file_urls)}, + fields=["name", "file_type"], + order_by="creation desc", + group_by="file_url", + ) + + # xlsx/xls first, then pdf, then csv. If no xlsx/xls, csv takes its place. + file_types = {(f.file_type or "").lower() for f in file_names} + has_spreadsheet = file_types & {"xlsx", "xls"} + + if has_spreadsheet: + FILE_TYPE_PRIORITY = {"xlsx": 0, "xls": 0, "pdf": 1, "csv": 2} + else: + FILE_TYPE_PRIORITY = {"csv": 0, "pdf": 1} - file = frappe.get_last_doc("File", filters={"file_url": file_url}) - filename = file.file_name + file_names.sort( + key=lambda f: FILE_TYPE_PRIORITY.get((f.file_type or "").lower(), 99) + ) + + files = [] + for file_name in file_names: + file = frappe.get_doc("File", file_name) + files.append(file) controller = get_controller(country, transaction)(party=party, company=company) - doc = controller.generate(file, ai_model, page_limit) + doc = controller.generate(files, ai_model, page_limit) + filenames = ( + ", ".join([f.file_name for f in files]) + if len(files) > 1 + else files[0].file_name + ) notification = { "document_type": TRANSACTION_MAP[transaction], "document_name": doc.name, "subject": _("{0} {1} generated from {2}").format( _(TRANSACTION_MAP[transaction]), doc.name, - filename, + filenames, ), } except Exception as e: notification = None + reference_doctype = "Communication" if communication_name else "File" + reference_docname = ( + communication_name + if communication_name + else (files[0].name if files else None) + ) if ( isinstance(e, frappe.DuplicateEntryError) and frappe.flags.skip_duplicate_error ): + subject = _("Duplicate {0} found for {1}").format( + _(TRANSACTION_MAP[transaction]), + f"{reference_doctype} {reference_docname}", + ) + notification = { - "document_type": "File", - "document_name": file.name if file else filename, - "subject": _("Duplicate entry found for {0}").format(filename), + "document_type": reference_doctype, + "document_name": reference_docname, + "subject": subject, "message": str(e), } - return - error_log = frappe.log_error( - "Transaction Parser API Error", - reference_doctype="File", - reference_name=file.name if file else filename, + if not (error_log := getattr(e, "error_log", None)): + error_log = frappe.log_error( + "Transaction Parser Error", + reference_doctype=reference_doctype, + reference_name=reference_docname, + ) + + message = _("Failed to generate {0} from {1}").format( + TRANSACTION_MAP[transaction], f"{reference_doctype} {reference_docname}" ) - message = _("Failed to generate {0} from {1}").format(_(transaction), filename) notification = { "document_type": error_log.doctype, @@ -90,7 +140,7 @@ def _parse( "message": str(e), } - email_failure(user, message, str(e), file_url) + email_failure(user, message, str(e), file_urls) finally: if notification: diff --git a/transaction_parser/transaction_parser/ai_integration/parser.py b/transaction_parser/transaction_parser/ai_integration/parser.py index f74eec2..cf55e43 100644 --- a/transaction_parser/transaction_parser/ai_integration/parser.py +++ b/transaction_parser/transaction_parser/ai_integration/parser.py @@ -37,11 +37,19 @@ def parse( file_doc_name: str | None = None, ) -> dict: messages = self._build_messages(document_type, document_schema, document_data) - response = self.send_message(messages=messages, file_doc_name=file_doc_name) + + response = self.send_message( + messages=messages, + file_doc_name=file_doc_name, + ) + return self.get_content(response) def _build_messages( - self, document_type: str, document_schema: dict, document_data: str + self, + document_type: str, + document_schema: dict, + document_data: str, ) -> tuple: """Build the message structure for AI API call.""" system_prompt = get_system_prompt(document_schema) @@ -58,7 +66,11 @@ def _build_messages( }, ) - def send_message(self, messages: tuple, file_doc_name: str | None = None) -> dict: + def send_message( + self, + messages: tuple, + file_doc_name: str | None = None, + ) -> dict: """Send messages to AI API and handle the response.""" log = self._create_log_entry(file_doc_name) @@ -84,13 +96,14 @@ def send_message(self, messages: tuple, file_doc_name: str | None = None) -> dic def _create_log_entry(self, file_doc_name: str | None) -> frappe._dict: """Create a log entry for the API call.""" log = frappe._dict(url=self.model.base_url) - if file_doc_name: - log.update( - { - "reference_doctype": "File", - "reference_name": file_doc_name, - } - ) + + log.update( + { + "reference_doctype": "File", + "reference_name": file_doc_name, + } + ) + return log def _make_api_call(self, messages: tuple) -> Any: diff --git a/transaction_parser/transaction_parser/controllers/transaction.py b/transaction_parser/transaction_parser/controllers/transaction.py index 05a59c0..e6aae36 100644 --- a/transaction_parser/transaction_parser/controllers/transaction.py +++ b/transaction_parser/transaction_parser/controllers/transaction.py @@ -2,11 +2,16 @@ import frappe from erpnext.setup.utils import get_exchange_rate from erpnext.stock.get_item_details import get_item_details +from httpx import HTTPError from rapidfuzz import fuzz, process +from transaction_parser.exceptions import FileProcessingError from transaction_parser.transaction_parser.ai_integration.parser import AIParser from transaction_parser.transaction_parser.utils import to_dict from transaction_parser.transaction_parser.utils.file_processor import FileProcessor +from transaction_parser.transaction_parser.utils.response_merger import ( + ResponseMerger, +) class Transaction: @@ -29,11 +34,17 @@ def __init__( self.company = company def generate( - self, file, ai_model: str | None = None, page_limit: int | None = None + self, + files, + ai_model: str | None = None, + page_limit: int | None = None, ): self.initialize() - self.file = file + if isinstance(files, str): + files = [files] + + self.files = files self.ai_model = ai_model self.data = self._parse_file_content(ai_model, page_limit) self.doc = frappe.get_doc({"doctype": self.DOCTYPE}) @@ -49,7 +60,7 @@ def generate( def initialize(self) -> None: # file processing - self.file = None + self.files = None # output schema self.schema = None @@ -72,20 +83,75 @@ def initialize(self) -> None: def _parse_file_content( self, ai_model: str | None = None, page_limit: int | None = None ) -> dict: - content = FileProcessor().get_content(self.file, page_limit) - schema = self.get_schema() - - return AIParser(ai_model, self.settings).parse( - document_type=self.DOCTYPE, - document_schema=schema, - document_data=content, - file_doc_name=self.file.name, + if len(self.files) > 1: + return self._parse_multiple_files(ai_model, page_limit) + + return self._parse_single_file(self.files[0], ai_model, page_limit) + + def _parse_single_file( + self, + file, + ai_model: str | None = None, + page_limit: int | None = None, + ) -> dict: + try: + content = FileProcessor().get_content(file, page_limit) + schema = self.get_schema() + + return AIParser(ai_model, self.settings).parse( + document_type=self.DOCTYPE, + document_schema=schema, + document_data=content, + file_doc_name=file.name, + ) + + except FileProcessingError as e: + error_log = frappe.log_error( + title="File processing error in Transaction Parser", + reference_doctype="File", + reference_name=file.name, + ) + e.error_log = error_log + raise e + + except HTTPError as e: + error_log = frappe.log_error( + title="Transaction Parser API error", + reference_doctype="File", + reference_name=file.name, + ) + e.error_log = error_log + raise e + + def _parse_multiple_files( + self, ai_model: str | None = None, page_limit: int | None = None + ) -> dict: + response = self._parse_single_file(self.files[0], ai_model, page_limit) + merger = ResponseMerger( + response, + schema=self.get_schema(), + match_keys=self.get_match_keys(), ) + for file in self.files[1:]: + if merger.is_complete(): + break + + new_response = self._parse_single_file(file, ai_model, page_limit) + merger.merge(new_response) + + return merger.response + ################################### ########## Output Schema ########## ################################### + def get_match_keys(self) -> dict[str, list[str]]: + """Return list field name -> key fields used to match items during merge.""" + return { + "item_list": ["party_item_code", "quantity", "rate", "description"], + } + def get_schema(self) -> dict: if not self.schema: self.schema = self._get_schema() @@ -271,9 +337,12 @@ def _set_flags(self) -> None: self.doc.flags.ignore_links = True def _attach_file(self) -> None: - self.file.attached_to_doctype = self.DOCTYPE - self.file.attached_to_name = self.doc.name - self.file.save() + files_to_attach = self.files if isinstance(self.files, list) else [self.files] + + for file_doc in files_to_attach: + file_doc.attached_to_doctype = self.DOCTYPE + file_doc.attached_to_name = self.doc.name + file_doc.save() def set_exchange_rate(self, from_currency, date, args): company_currency = erpnext.get_company_currency(self.doc.company) diff --git a/transaction_parser/transaction_parser/doctype/transaction_parser_settings/transaction_parser_settings.json b/transaction_parser/transaction_parser/doctype/transaction_parser_settings/transaction_parser_settings.json index 73a5a16..96706b2 100644 --- a/transaction_parser/transaction_parser/doctype/transaction_parser_settings/transaction_parser_settings.json +++ b/transaction_parser/transaction_parser/doctype/transaction_parser_settings/transaction_parser_settings.json @@ -1,180 +1,189 @@ { - "actions": [], - "allow_rename": 1, - "creation": "2025-03-11 16:54:51.622854", - "doctype": "DocType", - "engine": "InnoDB", - "field_order": [ - "api_tab", - "enabled", - "ai_model_section", - "default_ai_model", - "api_keys", - "transaction_configurations_section", - "invoice_lookback_count", - "email_configuration_section", - "parse_incoming_emails", - "parse_party_emails", - "incoming_email_accounts", - "party_emails", - "custom_fields_tab", - "custom_fields_section", - "column_break_kdtw", - "base_schema", - "tax_schema", - "address_schema", - "party_schema", - "item_schema" - ], - "fields": [ - { - "fieldname": "api_tab", - "fieldtype": "Tab Break", - "label": "API" - }, - { - "default": "0", - "fieldname": "enabled", - "fieldtype": "Check", - "label": "Enabled" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "custom_fields_tab", - "fieldtype": "Tab Break", - "label": "Customizations" - }, - { - "fieldname": "custom_fields_section", - "fieldtype": "Section Break", - "label": "Custom Fields" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "column_break_kdtw", - "fieldtype": "Column Break" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "base_schema", - "fieldtype": "JSON", - "label": "Base Schema" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "tax_schema", - "fieldtype": "JSON", - "label": "Tax Schema" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "address_schema", - "fieldtype": "JSON", - "label": "Address Schema" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "party_schema", - "fieldtype": "JSON", - "label": "Party Schema" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "item_schema", - "fieldtype": "JSON", - "label": "Item Schema" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "default_ai_model", - "fieldtype": "Select", - "label": "Default AI Model", - "mandatory_depends_on": "eval: doc.enabled", - "options": "DeepSeek Chat\nDeepSeek Reasoner\nOpenAI gpt-4o\nOpenAI gpt-4o-mini\nOpenAI gpt-5\nOpenAI gpt-5-mini\nGoogle Gemini Pro\nGoogle Gemini Flash" - }, - { - "depends_on": "eval: doc.enabled", - "fieldname": "api_keys", - "fieldtype": "Table", - "label": "API Keys", - "options": "Transaction Parser API Key Item" - }, - { - "default": "0", - "fieldname": "parse_incoming_emails", - "fieldtype": "Check", - "label": "Parse Incoming Emails" - }, - { - "default": "0", - "depends_on": "eval: doc.parse_incoming_emails", - "description": "Directly Parse Emails from Party", - "fieldname": "parse_party_emails", - "fieldtype": "Check", - "label": "Parse Party Emails" - }, - { - "fieldname": "email_configuration_section", - "fieldtype": "Section Break", - "label": "Incoming Email Configurations" - }, - { - "fieldname": "ai_model_section", - "fieldtype": "Section Break", - "label": "AI Model Configurations" - }, - { - "depends_on": "eval: doc.parse_incoming_emails", - "description": "If an email is received on any of these Email Accounts, an attempt will be made to generate the specified Transaction from its attachments.", - "fieldname": "incoming_email_accounts", - "fieldtype": "Table", - "label": "Incoming Email Accounts", - "mandatory_depends_on": "eval: doc.parse_incoming_emails && !doc.parse_party_emails", - "options": "Transaction Parser Email Account" - }, - { - "depends_on": "eval: doc.parse_incoming_emails", - "description": "Mapping Party based on Email Address", - "fieldname": "party_emails", - "fieldtype": "Table", - "label": "Party Emails", - "options": "Transaction Parser Party Email" - }, - { - "fieldname": "invoice_lookback_count", - "fieldtype": "Int", - "in_list_view": 1, - "label": "Number of Past Invoices to Consider for Item Code Selection", - "reqd": 1 - }, - { - "fieldname": "transaction_configurations_section", - "fieldtype": "Section Break", - "label": "Transaction Configurations" - } - ], - "index_web_pages_for_search": 1, - "issingle": 1, - "links": [], - "modified": "2025-09-08 08:48:58.870032", - "modified_by": "Administrator", - "module": "Transaction Parser", - "name": "Transaction Parser Settings", - "owner": "Administrator", - "permissions": [ - { - "create": 1, - "delete": 1, - "email": 1, - "print": 1, - "read": 1, - "role": "System Manager", - "share": 1, - "write": 1 - } - ], - "row_format": "Dynamic", - "sort_field": "modified", - "sort_order": "DESC", - "states": [] + "actions": [], + "allow_rename": 1, + "creation": "2025-03-11 16:54:51.622854", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "api_tab", + "enabled", + "ai_model_section", + "default_ai_model", + "api_keys", + "transaction_configurations_section", + "invoice_lookback_count", + "email_configuration_section", + "parse_incoming_emails", + "parse_party_emails", + "process_one_document_per_communication", + "incoming_email_accounts", + "party_emails", + "custom_fields_tab", + "custom_fields_section", + "column_break_kdtw", + "base_schema", + "tax_schema", + "address_schema", + "party_schema", + "item_schema" + ], + "fields": [ + { + "fieldname": "api_tab", + "fieldtype": "Tab Break", + "label": "API" + }, + { + "default": "0", + "fieldname": "enabled", + "fieldtype": "Check", + "label": "Enabled" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "custom_fields_tab", + "fieldtype": "Tab Break", + "label": "Customizations" + }, + { + "fieldname": "custom_fields_section", + "fieldtype": "Section Break", + "label": "Custom Fields" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "column_break_kdtw", + "fieldtype": "Column Break" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "base_schema", + "fieldtype": "JSON", + "label": "Base Schema" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "tax_schema", + "fieldtype": "JSON", + "label": "Tax Schema" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "address_schema", + "fieldtype": "JSON", + "label": "Address Schema" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "party_schema", + "fieldtype": "JSON", + "label": "Party Schema" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "item_schema", + "fieldtype": "JSON", + "label": "Item Schema" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "default_ai_model", + "fieldtype": "Select", + "label": "Default AI Model", + "mandatory_depends_on": "eval: doc.enabled", + "options": "DeepSeek Chat\nDeepSeek Reasoner\nOpenAI gpt-4o\nOpenAI gpt-4o-mini\nOpenAI gpt-5\nOpenAI gpt-5-mini\nGoogle Gemini Pro\nGoogle Gemini Flash" + }, + { + "depends_on": "eval: doc.enabled", + "fieldname": "api_keys", + "fieldtype": "Table", + "label": "API Keys", + "options": "Transaction Parser API Key Item" + }, + { + "default": "0", + "fieldname": "parse_incoming_emails", + "fieldtype": "Check", + "label": "Parse Incoming Emails" + }, + { + "default": "0", + "depends_on": "eval: doc.parse_incoming_emails", + "description": "Directly Parse Emails from Party", + "fieldname": "parse_party_emails", + "fieldtype": "Check", + "label": "Parse Party Emails" + }, + { + "default": "1", + "depends_on": "eval: doc.parse_incoming_emails", + "description": "When enabled, all attachments from a communication are combined to create a single document. When disabled, each attachment creates a separate document.", + "fieldname": "process_one_document_per_communication", + "fieldtype": "Check", + "label": "Process One Document Per Communication" + }, + { + "fieldname": "email_configuration_section", + "fieldtype": "Section Break", + "label": "Incoming Email Configurations" + }, + { + "fieldname": "ai_model_section", + "fieldtype": "Section Break", + "label": "AI Model Configurations" + }, + { + "depends_on": "eval: doc.parse_incoming_emails", + "description": "If an email is received on any of these Email Accounts, an attempt will be made to generate the specified Transaction from its attachments.", + "fieldname": "incoming_email_accounts", + "fieldtype": "Table", + "label": "Incoming Email Accounts", + "mandatory_depends_on": "eval: doc.parse_incoming_emails && !doc.parse_party_emails", + "options": "Transaction Parser Email Account" + }, + { + "depends_on": "eval: doc.parse_incoming_emails", + "description": "Mapping Party based on Email Address", + "fieldname": "party_emails", + "fieldtype": "Table", + "label": "Party Emails", + "options": "Transaction Parser Party Email" + }, + { + "fieldname": "invoice_lookback_count", + "fieldtype": "Int", + "in_list_view": 1, + "label": "Number of Past Invoices to Consider for Item Code Selection", + "reqd": 1 + }, + { + "fieldname": "transaction_configurations_section", + "fieldtype": "Section Break", + "label": "Transaction Configurations" + } + ], + "index_web_pages_for_search": 1, + "issingle": 1, + "links": [], + "modified": "2025-09-08 08:48:58.870032", + "modified_by": "Administrator", + "module": "Transaction Parser", + "name": "Transaction Parser Settings", + "owner": "Administrator", + "permissions": [ + { + "create": 1, + "delete": 1, + "email": 1, + "print": 1, + "read": 1, + "role": "System Manager", + "share": 1, + "write": 1 + } + ], + "row_format": "Dynamic", + "sort_field": "modified", + "sort_order": "DESC", + "states": [] } \ No newline at end of file diff --git a/transaction_parser/transaction_parser/overrides/communication.py b/transaction_parser/transaction_parser/overrides/communication.py index 679ae95..7579fe2 100644 --- a/transaction_parser/transaction_parser/overrides/communication.py +++ b/transaction_parser/transaction_parser/overrides/communication.py @@ -98,40 +98,60 @@ def _process_attachments( else: country = frappe.db.get_value("Company", company, "country") - sorted_attachments = sorted( - attachments, - key=lambda attachment: {"xlsx": 0, "csv": 1, "pdf": 2}.get( - attachment.file_url.split(".")[-1].lower(), 3 - ), - ) + supported_extensions = {"pdf", "xlsx", "xls", "csv"} + filtered_attachments = [ + attachment + for attachment in attachments + if attachment.file_url.split(".")[-1].lower() in supported_extensions + ] + + if not filtered_attachments: + return frappe.enqueue( "transaction_parser.transaction_parser.overrides.communication._parse_attachments", doc=doc, country=country, transaction_type=transaction_type, - attachments=sorted_attachments, + attachments=filtered_attachments, ai_model=settings.default_ai_model, user=user, party=party, company=default_company if not company else company, queue="long", + now=True, ) def _parse_attachments( doc, country, transaction_type, attachments, ai_model, user, party, company ): - for attachment in attachments: + settings = frappe.get_cached_doc("Transaction Parser Settings") + + if settings.process_one_document_per_communication: + file_urls = [attachment.file_url for attachment in attachments] _parse( country=country, transaction=transaction_type, - file_url=attachment.file_url, + file_urls=file_urls, ai_model=ai_model, user=user, party=party, company=company, + communication_name=doc.name, ) frappe.db.commit() + else: + for attachment in attachments: + _parse( + country=country, + transaction=transaction_type, + file_urls=attachment.file_url, + ai_model=ai_model, + user=user, + party=party, + company=company, + ) + frappe.db.commit() doc.db_set("is_processed_by_transaction_parser", 1) diff --git a/transaction_parser/transaction_parser/utils/file_processor.py b/transaction_parser/transaction_parser/utils/file_processor.py index 8a3ccd9..72351bc 100644 --- a/transaction_parser/transaction_parser/utils/file_processor.py +++ b/transaction_parser/transaction_parser/utils/file_processor.py @@ -10,17 +10,23 @@ read_xlsx_file_from_attached_file, ) +from transaction_parser.exceptions import FileProcessingError + class FileProcessor: """Process files: PDF (trim pages, apply OCR), CSV/Excel (parse data), extract content.""" def get_content(self, doc, page_limit=None): - if doc.file_type == "PDF": - return self._process_pdf(doc, page_limit) - elif doc.file_type in ["CSV", "XLSX", "XLS"]: - return self._process_spreadsheet(doc) - else: - frappe.throw(_("Only PDF, CSV, and Excel files are supported")) + try: + if doc.file_type == "PDF": + return self._process_pdf(doc, page_limit) + elif doc.file_type in ["CSV", "XLSX", "XLS"]: + return self._process_spreadsheet(doc) + else: + frappe.throw(_("Only PDF, CSV, and Excel files are supported")) + + except Exception as e: + raise FileProcessingError from e def _process_pdf(self, doc, page_limit=None): """Process PDF files with OCR and page limiting.""" diff --git a/transaction_parser/transaction_parser/utils/response_merger.py b/transaction_parser/transaction_parser/utils/response_merger.py new file mode 100644 index 0000000..826750c --- /dev/null +++ b/transaction_parser/transaction_parser/utils/response_merger.py @@ -0,0 +1,234 @@ +from dataclasses import dataclass +from typing import Any + +from frappe import _dict + + +@dataclass +class FieldType: + """Base class for field types in schema.""" + + required: bool + + def is_empty(self, value: Any) -> bool: + return ( + value is None + or value == "" + or (isinstance(value, list | dict) and len(value) == 0) + ) + + +@dataclass +class PrimitiveField(FieldType): + """Represents a primitive field (string, number, date, etc.).""" + + pass + + +@dataclass +class ObjectField(FieldType): + """Represents a nested object field with child fields.""" + + children: dict[str, FieldType] + + +@dataclass +class ListField(FieldType): + """Represents a list/array field.""" + + item_type: FieldType # Type of items in the list + + +class SchemaParser: + """Parses a schema dict into a structured FieldType hierarchy.""" + + def parse(self, schema: dict) -> dict[str, FieldType]: + """Parse schema dict into field name -> FieldType mapping.""" + fields = {} + + for key, value in schema.items(): + fields[key] = self._parse_field(value) + + return fields + + def _parse_field(self, schema_value: Any) -> FieldType: + """Determine and return the appropriate FieldType for a schema value.""" + if isinstance(schema_value, list): + if not schema_value: + return ListField(required=True, item_type=PrimitiveField(required=True)) + + item_schema = schema_value[0] + + if isinstance(item_schema, dict): + item_fields = self.parse(item_schema) + return ListField( + required=True, + item_type=ObjectField(required=True, children=item_fields), + ) + else: + return ListField( + required=True, + item_type=PrimitiveField(required=True), + ) + + elif isinstance(schema_value, dict): + children = self.parse(schema_value) + return ObjectField(required=True, children=children) + + else: + return PrimitiveField(required=True) + + +class ResponseMerger: + """Schema-driven merger for AI responses from multiple attachments.""" + + def __init__( + self, + response: dict, + schema: dict, + match_keys: dict[str, list[str]] | None = None, + ): + self.response = _dict(response) if isinstance(response, dict) else response + self.schema = schema + self.match_keys = match_keys or {} + + parser = SchemaParser() + self.fields = parser.parse(schema) + + def is_complete(self) -> bool: + """Return True if all required fields are filled.""" + return len(self.get_missing_fields()) == 0 + + def get_missing_fields(self) -> list[str]: + """Return dot-separated paths of missing required fields.""" + missing = [] + self._check_missing_fields(self.fields, self.response, "", missing) + return missing + + def _check_missing_fields( + self, + fields: dict[str, FieldType], + data: dict, + path_prefix: str, + missing: list[str], + ) -> None: + for key, field_type in fields.items(): + field_path = f"{path_prefix}.{key}" if path_prefix else key + value = data.get(key) if isinstance(data, dict) else None + + if isinstance(field_type, PrimitiveField): + if field_type.required and field_type.is_empty(value): + missing.append(field_path) + + elif isinstance(field_type, ObjectField): + if field_type.required and field_type.is_empty(value): + missing.append(field_path) + elif value: + self._check_missing_fields( + field_type.children, value, field_path, missing + ) + + # List fields are checked during merging, not at top level + + def merge(self, new_response: dict) -> None: + """Merge new_response into the existing response.""" + self._merge_fields(self.fields, self.response, new_response) + + def _merge_fields( + self, + fields: dict[str, FieldType], + target: dict, + source: dict, + ) -> None: + for key, field_type in fields.items(): + source_value = source.get(key) + + if source_value is None: + continue + + if isinstance(field_type, PrimitiveField): + self._merge_primitive(target, key, source_value) + + elif isinstance(field_type, ObjectField): + self._merge_object(field_type, target, key, source_value) + + elif isinstance(field_type, ListField): + self._merge_list(field_type, target, key, source_value) + + def _merge_primitive(self, target: dict, key: str, source_value: Any) -> None: + if not target.get(key) and source_value: + target[key] = source_value + + def _merge_object( + self, + field_type: ObjectField, + target: dict, + key: str, + source_value: dict, + ) -> None: + if key not in target or target[key] is None: + target[key] = {} + + self._merge_fields(field_type.children, target[key], source_value) + + def _merge_list( + self, + field_type: ListField, + target: dict, + key: str, + source_value: list, + ) -> None: + target_list = target.get(key, []) + + if not target_list: + target[key] = source_value + return + + if not source_value: + return + + if not isinstance(field_type.item_type, ObjectField): + return + + key_fields = self.match_keys.get(key, []) + if not key_fields: + return + + for target_item in target_list: + matched_item = self._find_matching_item( + target_item, source_value, key_fields + ) + if matched_item: + self._merge_fields( + field_type.item_type.children, target_item, matched_item + ) + + def _find_matching_item( + self, + target_item: dict, + source_items: list[dict], + key_fields: list[str], + ) -> dict | None: + # Require at least 2 matching key fields to avoid false positives + for source_item in source_items: + if self._items_match(target_item, source_item, key_fields): + return source_item + + return None + + def _items_match( + self, + item1: dict, + item2: dict, + key_fields: list[str], + ) -> bool: + matches = 0 + + for field in key_fields: + value1 = item1.get(field) + value2 = item2.get(field) + + if value1 and value2 and value1 == value2: + matches += 1 + + return matches >= 2