Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ class ExtractionResult:
card_number: Card number extracted from credit card statement header,
or None for bank statements. Set to "unknown" when CC PDF is
detected on paid tier but no card number pattern matches.
statement_year: Year inferred from a document-level date field (e.g.
"Payment Due Date: 3 Mar 2026" → 2026). Used to resolve yearless
transaction dates (e.g. "3 Feb") at sort time. None when the year
could not be determined from the PDF.
"""

transactions: list[Transaction]
Expand All @@ -32,3 +36,4 @@ class ExtractionResult:
source_file: Path
warnings: list[ExtractionWarning] = field(default_factory=list)
card_number: str | None = None
statement_year: int | None = None
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@

_IBAN_HEADER_Y = 350

# Regular expressions for extracting the statement year from a payment due
# date field. Each pattern captures the four-digit year as group 1.
# Matches lines like:
#   "Payment Due 3 Mar 2026"
#   "Payment Due Date: 20 Feb 2026"
# The month token is matched loosely with \w+ and is not validated here.
_PAYMENT_DUE_PATTERNS = [
r"Payment\s+Due\s+Date\s*[:\s]\s*\d{1,2}\s+\w+\s+(\d{4})",
r"Payment\s+Due\s+\d{1,2}\s+\w+\s+(\d{4})",
]


class PageHeaderAnalyser:
"""Inspects the page header area for credit card indicators and IBAN."""
Expand Down Expand Up @@ -57,6 +66,34 @@ def is_credit_card_statement(self, page: Any, table_top_y: int) -> bool:
logger.warning("Error checking for credit card statement: %s", e)
return False

def extract_statement_year(self, page: Any) -> int | None:
    """Return the statement year found in a 'Payment Due' style field.

    The text returned by ``page.extract_text()`` is searched, case
    insensitively, with each entry of ``_PAYMENT_DUE_PATTERNS`` in turn,
    e.g. against lines such as:
        - "Payment Due 3 Mar 2026"
        - "Payment Due Date: 20 Feb 2026"

    Args:
        page: pdfplumber page object (page 1 only)

    Returns:
        The four-digit year of the first matching pattern as an int, or
        None when the page has no text, nothing matches, or an
        AttributeError/ValueError/TypeError is raised while scanning
        (the error is logged as a warning).
    """
    try:
        text = page.extract_text()
        if not text:
            return None

        for due_pattern in _PAYMENT_DUE_PATTERNS:
            found = re.search(due_pattern, text, re.IGNORECASE)
            if found is None:
                continue

            # Group 1 of every pattern is the four-digit year.
            statement_year = int(found.group(1))
            logger.debug(
                "Statement year %d extracted from 'Payment Due' field",
                statement_year,
            )
            return statement_year
    except (AttributeError, ValueError, TypeError) as e:
        logger.warning("Error extracting statement year from page: %s", e)
    return None

def extract_iban(self, page: Any) -> str | None:
"""Extract account IBAN from the page header area (y < 350).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,15 @@ def extract(self, pdf_path: Path) -> ExtractionResult:
rows: list[dict] = []
iban = None
card_number: str | None = None

filename_date = extract_filename_date(pdf_path.name)
page_processor = StatefulPageRowProcessor(
RowPostProcessor(
columns=self.columns,
row_classifier=self._row_classifier,
template=self.template,
filename_date=filename_date,
filename=pdf_path.name,
)
)
statement_year: int | None = None

with self._pdf_reader.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages, 1):
if page_num == 1 and self._header_analyser.is_credit_card_statement(
page, self.table_top_y
# --- Page 1 pre-scan: gather document-level metadata before processing rows ---
if pdf.pages:
page1 = pdf.pages[0]

if self._header_analyser.is_credit_card_statement(
page1, self.table_top_y
):
if self._entitlements is None or self._entitlements.require_iban:
logger.warning(
Expand All @@ -129,27 +122,40 @@ def extract(self, pdf_path: Path) -> ExtractionResult:
],
)

if iban is None and page_num == 1:
iban = self._header_analyser.extract_iban(page)
if iban:
logger.info(
"IBAN found on page %s: %s****%s",
page_num,
iban[:4],
iban[-4:],
# Paid tier CC: extract card number and statement year up front
extracted = self._extract_card_number(page1)
card_number = extracted if extracted is not None else "unknown"

statement_year = self._header_analyser.extract_statement_year(page1)
if statement_year is None:
logger.warning(
"Could not determine statement year from '%s'. "
"Yearless dates will not sort correctly.",
pdf_path.name,
)

# Extract card number on page 1 for CC statements (paid tier only)
if page_num == 1 and card_number is None:
extracted = self._extract_card_number(page)
if extracted is not None:
card_number = extracted
elif self._header_analyser.is_credit_card_statement(
page, self.table_top_y
):
# CC PDF detected on paid tier but no card pattern matched
card_number = "unknown"
iban = self._header_analyser.extract_iban(page1)
if iban:
logger.info(
"IBAN found on page 1: %s****%s",
iban[:4],
iban[-4:],
)

# Build page processor now that document-level metadata is known
filename_date = extract_filename_date(pdf_path.name)
page_processor = StatefulPageRowProcessor(
RowPostProcessor(
columns=self.columns,
row_classifier=self._row_classifier,
template=self.template,
filename_date=filename_date,
filename=pdf_path.name,
statement_year=statement_year,
)
)

for page_num, page in enumerate(pdf.pages, 1):
page_rows = self._extract_page(page, page_num)
if page_rows is None:
continue
Expand All @@ -168,6 +174,7 @@ def extract(self, pdf_path: Path) -> ExtractionResult:
iban=iban,
source_file=pdf_path,
card_number=card_number,
statement_year=statement_year,
)

def _extract_card_number(self, page: Any) -> str | None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,34 @@ def _do_classify(
return None


class RefContinuationClassifier(RowClassifier):
    """Marks reference-number lines (e.g. AIB CC 'Ref: 123456') as continuations.

    AIB Credit Card PDFs spread one transaction over two physical lines:
        - Line 1: Transaction Date | Posting Date | Details | Amount
        - Line 2: only a reference number (e.g. "Ref: 123456"), with the
          same date repeated in the Transaction Date column

    Because Line 2 carries a date, TransactionClassifier would otherwise
    treat it as a 'transaction' and emit an empty phantom row. This
    classifier is placed ahead of TransactionClassifier in the chain and
    returns 'continuation' for such lines so that RowMergerService can
    merge them into the parent transaction.
    """

    # Description must start with "Ref", optional spaces, ":", then digits.
    _REF_PATTERN = re.compile(r"^Ref\s*:\s*\d+", re.IGNORECASE)

    def _do_classify(
        self, row: dict, columns: dict[str, tuple[int | float, int | float]]
    ) -> str | None:
        """Return 'continuation' for reference-number lines, otherwise None."""
        text = self._get_description_text(row, columns)
        is_reference_line = self._REF_PATTERN.match(text) is not None
        return "continuation" if is_reference_line else None


class FXContinuationClassifier(RowClassifier):
"""Classifies foreign exchange continuation lines."""

Expand Down Expand Up @@ -401,10 +429,11 @@ def create_row_classifier_chain() -> RowClassifier:
0 HeaderMetadataClassifier — column headers and field labels
1 AdministrativeClassifier — BALANCE FORWARD, Lending @
2 ReferenceCodeClassifier — IE123456 patterns
3 FXContinuationClassifier — FX rates, fees, exchange lines
4 TimestampMetadataClassifier — 01JAN2023 TIME 14:30
5 TransactionClassifier — debit/credit/date combinations
6 DefaultMetadataClassifier — catch-all fallback
3 RefContinuationClassifier — Ref: 123456 continuation lines (AIB CC)
4 FXContinuationClassifier — FX rates, fees, exchange lines
5 TimestampMetadataClassifier — 01JAN2023 TIME 14:30
6 TransactionClassifier — debit/credit/date combinations
7 DefaultMetadataClassifier — catch-all fallback

Returns:
The head of the classifier chain
Expand All @@ -414,9 +443,10 @@ def create_row_classifier_chain() -> RowClassifier:
(0, HeaderMetadataClassifier),
(1, AdministrativeClassifier),
(2, ReferenceCodeClassifier),
(3, FXContinuationClassifier),
(4, TimestampMetadataClassifier),
(5, TransactionClassifier),
(6, DefaultMetadataClassifier),
(3, RefContinuationClassifier),
(4, FXContinuationClassifier),
(5, TimestampMetadataClassifier),
(6, TransactionClassifier),
(7, DefaultMetadataClassifier),
]
).build_chain()
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__( # noqa: PLR0913
filename_date: str,
filename: str,
scoring_config: ExtractionScoringConfig | None = None,
statement_year: int | None = None,
) -> None:
self._columns = columns
self._row_classifier = row_classifier
Expand All @@ -69,6 +70,7 @@ def __init__( # noqa: PLR0913
if scoring_config is not None
else ExtractionScoringConfig.default()
)
self._statement_year = statement_year
self._date_col = ColumnTypeIdentifier.find_first_column_of_type(columns, "date")
self._balance_col = ColumnTypeIdentifier.find_first_column_of_type(
columns, "balance"
Expand Down Expand Up @@ -147,6 +149,8 @@ def process(self, row: dict, current_date: str) -> str:

# Metadata tagging
row["Filename"] = self._filename
if self._statement_year is not None:
row["statement_year"] = str(self._statement_year)
if self._template:
row["document_type"] = self._template.document_type
row["template_id"] = self._template.id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,15 @@ class DateParserService:
"%d%B%Y", # 01December2023
]

def parse_transaction_date(self, date_str: str) -> datetime:
# Yearless date formats used by CC statements (e.g. AIB CC: "3 Feb")
YEARLESS_DATE_FORMATS = [ # noqa: RUF012
"%d %b", # 3 Feb
"%d %B", # 3 February
]

def parse_transaction_date(
self, date_str: str, hint_year: int | None = None
) -> datetime:
"""
Parse bank statement date string into datetime object.

Expand All @@ -59,9 +67,13 @@ def parse_transaction_date(self, date_str: str) -> datetime:
- DDMMMYY (e.g., "01DEC23")
- DDMMMYYYY (e.g., "01DEC2023")
- Partial dates: "DD/MM" (missing year)
- Yearless word dates: "3 Feb", "3 February" (requires hint_year)

Args:
date_str: Date string from bank statement
hint_year: Optional year inferred from a document-level field (e.g.
"Payment Due Date"). Used to resolve yearless dates like "3 Feb".
When None and the date is yearless, epoch is returned.

Returns:
datetime object, or epoch (1970-01-01) if unparseable
Expand All @@ -78,6 +90,8 @@ def parse_transaction_date(self, date_str: str) -> datetime:
datetime.datetime(2025, 4, 25, 0, 0)
>>> service.parse_transaction_date("")
datetime.datetime(1970, 1, 1, 0, 0)
>>> service.parse_transaction_date("3 Feb", hint_year=2026)
datetime.datetime(2026, 2, 3, 0, 0)
"""
# Handle empty or whitespace-only strings
if not date_str or not date_str.strip():
Expand All @@ -89,11 +103,16 @@ def parse_transaction_date(self, date_str: str) -> datetime:
# "Sept" -> "Sep" (Python's datetime uses 3-letter abbreviations)
date_str = date_str.replace("Sept", "Sep")

# Try common date formats
# Try common date formats (all include a year component)
parsed_date = self._parse_common_date_formats(date_str)
if parsed_date is not None:
return parsed_date

# Try yearless formats (e.g. "3 Feb" from CC statements)
parsed_date = self._parse_yearless_date(date_str, hint_year)
if parsed_date is not None:
return parsed_date

# Try partial date parsing (DD/MM without year)
parsed_date = self._parse_partial_date(date_str)
if parsed_date is not None:
Expand Down Expand Up @@ -200,6 +219,46 @@ def _parse_common_date_formats(self, date_str: str) -> datetime | None:

return None

def _parse_yearless_date(
    self, date_str: str, hint_year: int | None
) -> datetime | None:
    """
    Parse yearless date strings like "3 Feb" or "3 February".

    Uses hint_year (from a document-level field such as "Payment Due Date")
    to supply the missing year. Returns None when no hint_year is available
    so the caller can fall through to its remaining parsing strategies.

    The hint year is appended to the string and parsed together with the
    day and month, so calendar validation happens against the real year:
    "29 Feb" is accepted for a leap hint_year (e.g. 2024) and rejected for
    a non-leap one (e.g. 2025).

    Args:
        date_str: Date string without a year component
        hint_year: Year to substitute, or None if unknown

    Returns:
        Parsed datetime if a yearless format matches and hint_year is
        provided, None otherwise

    Examples:
        >>> service = DateParserService()
        >>> service._parse_yearless_date("3 Feb", 2026)
        datetime.datetime(2026, 2, 3, 0, 0)
        >>> service._parse_yearless_date("3 February", 2026)
        datetime.datetime(2026, 2, 3, 0, 0)
        >>> service._parse_yearless_date("29 Feb", 2024)
        datetime.datetime(2024, 2, 29, 0, 0)
        >>> service._parse_yearless_date("3 Feb", None) is None
        True
    """
    if hint_year is None:
        return None

    # Append the real year rather than a fixed placeholder. A placeholder
    # such as 1900 is not a leap year, so "29 Feb" would be rejected even
    # when hint_year is a leap year. Supplying a year also means strptime
    # never parses a year-free date (avoids the Python 3.15 deprecation of
    # yearless strptime).
    augmented = f"{date_str} {hint_year}"

    for fmt in self.YEARLESS_DATE_FORMATS:
        # NOTE(review): relies on _try_parse_date_format returning None
        # (not raising) when the string does not match the format.
        parsed = self._try_parse_date_format(augmented, f"{fmt} %Y")
        if parsed is not None:
            return parsed

    return None

def _parse_partial_date(self, date_str: str) -> datetime | None:
"""
Parse partial date strings like "DD/MM" or "DD-MM" (missing year).
Expand Down
Loading