Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions transaction_parser/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import frappe


class FileProcessingError(frappe.ValidationError):
    """Signal that a file could not be processed.

    Derives from ``frappe.ValidationError``, so handlers written for that
    base class also receive this exception.
    """
84 changes: 67 additions & 17 deletions transaction_parser/transaction_parser/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,54 +34,104 @@ def parse(transaction, country, file_url, ai_model=None, page_limit=None):
def _parse(
country,
transaction,
file_url,
file_urls,
ai_model=None,
page_limit=None,
user=None,
party=None,
company=None,
communication_name=None,
):
try:
file = None
filename = file_url.split("/")[-1]
if (
isinstance(file_urls, str)
and file_urls.startswith("[")
and file_urls.endswith("]")
):
file_urls = frappe.parse_json(file_urls)

elif isinstance(file_urls, str):
file_urls = [file_urls]

file_names = frappe.get_list(
"File",
filters={"file_url": ("in", file_urls)},
fields=["name", "file_type"],
order_by="creation desc",
group_by="file_url",
)

# xlsx/xls first, then pdf, then csv. If no xlsx/xls, csv takes its place.
file_types = {(f.file_type or "").lower() for f in file_names}
has_spreadsheet = file_types & {"xlsx", "xls"}

if has_spreadsheet:
FILE_TYPE_PRIORITY = {"xlsx": 0, "xls": 0, "pdf": 1, "csv": 2}
else:
FILE_TYPE_PRIORITY = {"csv": 0, "pdf": 1}

file = frappe.get_last_doc("File", filters={"file_url": file_url})
filename = file.file_name
file_names.sort(
key=lambda f: FILE_TYPE_PRIORITY.get((f.file_type or "").lower(), 99)
)

files = []
for file_name in file_names:
file = frappe.get_doc("File", file_name)
files.append(file)

controller = get_controller(country, transaction)(party=party, company=company)
doc = controller.generate(file, ai_model, page_limit)
doc = controller.generate(files, ai_model, page_limit)

filenames = (
", ".join([f.file_name for f in files])
if len(files) > 1
else files[0].file_name
)
notification = {
"document_type": TRANSACTION_MAP[transaction],
"document_name": doc.name,
"subject": _("{0} {1} generated from {2}").format(
_(TRANSACTION_MAP[transaction]),
doc.name,
filename,
filenames,
),
}

except Exception as e:
notification = None
reference_doctype = "Communication" if communication_name else "File"
reference_docname = (
communication_name
if communication_name
else (files[0].name if files else None)
)

if (
isinstance(e, frappe.DuplicateEntryError)
and frappe.flags.skip_duplicate_error
):
subject = _("Duplicate {0} found for {1}").format(
_(TRANSACTION_MAP[transaction]),
f"{reference_doctype} {reference_docname}",
)

notification = {
"document_type": "File",
"document_name": file.name if file else filename,
"subject": _("Duplicate entry found for {0}").format(filename),
"document_type": reference_doctype,
"document_name": reference_docname,
"subject": subject,
"message": str(e),
}
return

error_log = frappe.log_error(
"Transaction Parser API Error",
reference_doctype="File",
reference_name=file.name if file else filename,
if not (error_log := getattr(e, "error_log", None)):
error_log = frappe.log_error(
"Transaction Parser Error",
reference_doctype=reference_doctype,
reference_name=reference_docname,
)

message = _("Failed to generate {0} from {1}").format(
TRANSACTION_MAP[transaction], f"{reference_doctype} {reference_docname}"
)
message = _("Failed to generate {0} from {1}").format(_(transaction), filename)

notification = {
"document_type": error_log.doctype,
Expand All @@ -90,7 +140,7 @@ def _parse(
"message": str(e),
}

email_failure(user, message, str(e), file_url)
email_failure(user, message, str(e), file_urls)

finally:
if notification:
Expand Down
33 changes: 23 additions & 10 deletions transaction_parser/transaction_parser/ai_integration/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,19 @@ def parse(
file_doc_name: str | None = None,
) -> dict:
messages = self._build_messages(document_type, document_schema, document_data)
response = self.send_message(messages=messages, file_doc_name=file_doc_name)

response = self.send_message(
messages=messages,
file_doc_name=file_doc_name,
)

return self.get_content(response)

def _build_messages(
self, document_type: str, document_schema: dict, document_data: str
self,
document_type: str,
document_schema: dict,
document_data: str,
) -> tuple:
"""Build the message structure for AI API call."""
system_prompt = get_system_prompt(document_schema)
Expand All @@ -58,7 +66,11 @@ def _build_messages(
},
)

def send_message(self, messages: tuple, file_doc_name: str | None = None) -> dict:
def send_message(
self,
messages: tuple,
file_doc_name: str | None = None,
) -> dict:
"""Send messages to AI API and handle the response."""
log = self._create_log_entry(file_doc_name)

Expand All @@ -84,13 +96,14 @@ def send_message(self, messages: tuple, file_doc_name: str | None = None) -> dic
def _create_log_entry(self, file_doc_name: str | None) -> frappe._dict:
    """Create a log entry for the API call.

    The entry always carries the model's base URL. A reference to the
    source ``File`` document is added only when ``file_doc_name`` is given.

    Args:
        file_doc_name: Name of the ``File`` document being parsed, or
            ``None`` when the call is not tied to a file.

    Returns:
        A ``frappe._dict`` with ``url`` and, when a file name was given,
        ``reference_doctype`` / ``reference_name``.
    """
    log = frappe._dict(url=self.model.base_url)

    # The signature allows ``None``; without this guard the entry would
    # point at doctype "File" with an empty name, i.e. a dangling reference.
    if file_doc_name:
        log.update(
            {
                "reference_doctype": "File",
                "reference_name": file_doc_name,
            }
        )

    return log

def _make_api_call(self, messages: tuple) -> Any:
Expand Down
97 changes: 83 additions & 14 deletions transaction_parser/transaction_parser/controllers/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@
import frappe
from erpnext.setup.utils import get_exchange_rate
from erpnext.stock.get_item_details import get_item_details
from httpx import HTTPError
from rapidfuzz import fuzz, process

from transaction_parser.exceptions import FileProcessingError
from transaction_parser.transaction_parser.ai_integration.parser import AIParser
from transaction_parser.transaction_parser.utils import to_dict
from transaction_parser.transaction_parser.utils.file_processor import FileProcessor
from transaction_parser.transaction_parser.utils.response_merger import (
ResponseMerger,
)


class Transaction:
Expand All @@ -29,11 +34,17 @@ def __init__(
self.company = company

def generate(
self, file, ai_model: str | None = None, page_limit: int | None = None
self,
files,
ai_model: str | None = None,
page_limit: int | None = None,
):
self.initialize()

self.file = file
if isinstance(files, str):
files = [files]

self.files = files
self.ai_model = ai_model
self.data = self._parse_file_content(ai_model, page_limit)
self.doc = frappe.get_doc({"doctype": self.DOCTYPE})
Expand All @@ -49,7 +60,7 @@ def generate(

def initialize(self) -> None:
# file processing
self.file = None
self.files = None

# output schema
self.schema = None
Expand All @@ -72,20 +83,75 @@ def initialize(self) -> None:
def _parse_file_content(
self, ai_model: str | None = None, page_limit: int | None = None
) -> dict:
content = FileProcessor().get_content(self.file, page_limit)
schema = self.get_schema()

return AIParser(ai_model, self.settings).parse(
document_type=self.DOCTYPE,
document_schema=schema,
document_data=content,
file_doc_name=self.file.name,
if len(self.files) > 1:
return self._parse_multiple_files(ai_model, page_limit)

return self._parse_single_file(self.files[0], ai_model, page_limit)

def _parse_single_file(
    self,
    file,
    ai_model: str | None = None,
    page_limit: int | None = None,
) -> dict:
    """Read one file and pass its content to the AI parser.

    The content comes from ``FileProcessor().get_content(file, page_limit)``
    and is sent to ``AIParser`` together with ``self.DOCTYPE`` and the
    schema from ``self.get_schema()``.

    If a ``FileProcessingError`` or ``HTTPError`` is raised, the failure is
    recorded with ``frappe.log_error`` against the File, the result is
    stored on the exception as ``error_log`` and the exception is re-raised.
    """
    try:
        document_data = FileProcessor().get_content(file, page_limit)
        document_schema = self.get_schema()
        parser = AIParser(ai_model, self.settings)

        return parser.parse(
            document_type=self.DOCTYPE,
            document_schema=document_schema,
            document_data=document_data,
            file_doc_name=file.name,
        )

    except (FileProcessingError, HTTPError) as exc:
        # Same handling for both failures; only the log title differs.
        if isinstance(exc, FileProcessingError):
            log_title = "File processing error in Transaction Parser"
        else:
            log_title = "Transaction Parser API error"

        exc.error_log = frappe.log_error(
            title=log_title,
            reference_doctype="File",
            reference_name=file.name,
        )
        raise

def _parse_multiple_files(
    self, ai_model: str | None = None, page_limit: int | None = None
) -> dict:
    """Parse the files in order and merge their responses.

    The first file seeds a ``ResponseMerger``. Each later file is parsed and
    merged in turn, stopping as soon as the merger reports it is complete.
    """
    first_response = self._parse_single_file(self.files[0], ai_model, page_limit)
    merger = ResponseMerger(
        first_response,
        schema=self.get_schema(),
        match_keys=self.get_match_keys(),
    )

    for extra_file in self.files[1:]:
        # Completeness is checked before each parse so no further file is
        # parsed once the merger is already complete.
        if merger.is_complete():
            return merger.response

        merger.merge(self._parse_single_file(extra_file, ai_model, page_limit))

    return merger.response

###################################
########## Output Schema ##########
###################################

def get_match_keys(self) -> dict[str, list[str]]:
    """Map each list field name to the key fields used to match its items during merge."""
    item_match_fields = ["party_item_code", "quantity", "rate", "description"]
    return {"item_list": item_match_fields}

def get_schema(self) -> dict:
if not self.schema:
self.schema = self._get_schema()
Expand Down Expand Up @@ -271,9 +337,12 @@ def _set_flags(self) -> None:
self.doc.flags.ignore_links = True

def _attach_file(self) -> None:
self.file.attached_to_doctype = self.DOCTYPE
self.file.attached_to_name = self.doc.name
self.file.save()
files_to_attach = self.files if isinstance(self.files, list) else [self.files]

for file_doc in files_to_attach:
file_doc.attached_to_doctype = self.DOCTYPE
file_doc.attached_to_name = self.doc.name
file_doc.save()

def set_exchange_rate(self, from_currency, date, args):
company_currency = erpnext.get_company_currency(self.doc.company)
Expand Down
Loading
Loading