Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
85ac5ef
feat(#32): make ScoringConfig injectable in TemplateDetector
web-flow Mar 24, 2026
55b6643
feat(23-01): wire PDFTableExtractor.extract() and facade to return Ex…
web-flow Mar 25, 2026
d925f02
test(23-02): add failing tests for ExtractionResult return from extra…
web-flow Mar 25, 2026
c9a7e92
feat(23-02): wire extract_from_pdf() to return ExtractionResult directly
web-flow Mar 25, 2026
3bfa787
feat(23-02): wire process_all_pdfs() to return list[ExtractionResult]
web-flow Mar 25, 2026
a089d02
feat(23-03): wire processor.py to return list[ExtractionResult]
web-flow Mar 25, 2026
0334de6
test(24-01): add failing tests for word_utils pure functions
web-flow Mar 25, 2026
aecb9d3
feat(24-01): implement word_utils.py with three pure standalone funct…
web-flow Mar 25, 2026
7f0427f
merge(24-01): bring word_utils TDD work into feat/32 branch
web-flow Mar 25, 2026
db0b6a7
merge(24-01): bring word_utils TDD work into worktree-agent-a9eba7e4 …
web-flow Mar 25, 2026
69046c0
feat(24-02): migrate all callers to word_utils, delete private originals
web-flow Mar 25, 2026
feed87a
refactor(24-02): remove private-method test calls, covered by test_wo…
web-flow Mar 25, 2026
69d25c1
merge(24-02): bring caller migration into feat/32 branch
web-flow Mar 25, 2026
3307b34
style: apply black formatting to parser-core
web-flow Mar 25, 2026
d9ade9b
style: fix isort import ordering in parser-core
web-flow Mar 25, 2026
4d60970
fix: remove unused imports and variables flagged by flake8
web-flow Mar 25, 2026
60df9ad
fix: rename loop var to avoid ExtractionResult/dict type collision in…
web-flow Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
RowClassifier,
create_row_classifier_chain,
)
from bankstatements_core.extraction.word_utils import (
assign_words_to_columns,
calculate_column_coverage,
group_words_by_y,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -97,7 +102,8 @@ def detect_boundary(self, words: list[dict]) -> int:
return self.fallback_bottom_y

# Phase 0: Group words and find transaction positions
lines = self._group_words_by_y(words)
filtered_words = [w for w in words if w["top"] >= self.table_top_y]
lines = group_words_by_y(filtered_words)
if not lines:
return self.fallback_bottom_y

Expand Down Expand Up @@ -156,23 +162,6 @@ def detect_boundary(self, words: list[dict]) -> int:
logger.debug("No clear table end detected - using fallback boundary")
return self.fallback_bottom_y

def _group_words_by_y(self, words: list[dict]) -> dict[float, list[dict]]:
    """
    Bucket words into lines keyed by their rounded 'top' value.

    Words whose 'top' is not greater than or equal to
    ``self.table_top_y`` are left out of the result.

    Args:
        words: List of word dictionaries; each must provide a 'top' key

    Returns:
        Dictionary mapping each rounded 'top' value to the words that
        share it, in the order they appeared in ``words``
    """
    grouped: dict[float, list[dict]] = {}
    for word in words:
        top = word["top"]
        # Guard clause: ignore anything that fails the table-top threshold.
        if not top >= self.table_top_y:
            continue
        line_key = round(top, 0)
        if line_key not in grouped:
            grouped[line_key] = []
        grouped[line_key].append(word)
    return grouped

def _find_transaction_positions(
self, lines: dict[float, list[dict]], sorted_y_coords: list[float]
) -> tuple[list[float], float | None]:
Expand All @@ -190,7 +179,7 @@ def _find_transaction_positions(
last_transaction_y = None

for y_coord in sorted_y_coords:
row = self._build_row_from_words(lines[y_coord])
row = assign_words_to_columns(lines[y_coord], self.columns)

if any(row.values()):
row_type = self._row_classifier.classify(row, self.columns)
Expand Down Expand Up @@ -278,7 +267,7 @@ def _detect_by_spatial_gaps(
post_gap_transactions = 0

for y_coord in post_gap_y_coords:
row = self._build_row_from_words(lines[y_coord])
row = assign_words_to_columns(lines[y_coord], self.columns)

if (
any(row.values())
Expand Down Expand Up @@ -320,11 +309,11 @@ def _detect_by_structure_breakdown(
if last_transaction_y is not None and y_coord <= last_transaction_y:
continue

row = self._build_row_from_words(lines[y_coord])
row = assign_words_to_columns(lines[y_coord], self.columns)

if any(row.values()):
# Check if this row has any structure (data in expected columns)
column_coverage = self._calculate_column_coverage([row])
column_coverage = calculate_column_coverage([row], self.columns)
if column_coverage < 0.3: # Less than 30% of columns have data
structure_breakdown_count += 1
else:
Expand Down Expand Up @@ -369,7 +358,7 @@ def _detect_by_consecutive_non_transactions(
if last_transaction_y is not None and y_coord <= last_transaction_y:
continue

row = self._build_row_from_words(lines[y_coord])
row = assign_words_to_columns(lines[y_coord], self.columns)

if any(row.values()):
row_type = self._row_classifier.classify(row, self.columns)
Expand All @@ -391,48 +380,3 @@ def _detect_by_consecutive_non_transactions(
)

return None

def _build_row_from_words(self, words: list[dict]) -> dict[str, str]:
    """
    Build a row dictionary from words by assigning each word to a column.

    A word belongs to the first column in ``self.columns`` whose
    half-open range ``[xmin, xmax)`` contains the word's 'x0'. Words
    that fall inside no column range are dropped.

    Args:
        words: List of words at the same Y-coordinate; each must provide
            'x0' and 'text' keys

    Returns:
        Dictionary with one entry per configured column, mapping the
        column name to its words joined by single spaces (surrounding
        whitespace stripped); columns that received no words map to ""
    """
    # Gather fragments per column and join once at the end rather than
    # growing every cell with repeated string concatenation.
    fragments: dict[str, list[str]] = {col: [] for col in self.columns}

    for w in words:
        x0 = w["x0"]
        text = w["text"]
        for col, (xmin, xmax) in self.columns.items():
            if xmin <= x0 < xmax:
                fragments[col].append(text)
                break  # first matching column wins

    return {col: " ".join(parts).strip() for col, parts in fragments.items()}

def _calculate_column_coverage(self, rows: list[dict[str, str]]) -> float:
    """
    Calculate what fraction of the configured columns have data in the rows.

    A column counts as populated when at least one row holds a
    non-blank value for it. Keys that are not present in
    ``self.columns`` are ignored, so the result never exceeds 1.0.

    Args:
        rows: List of row dictionaries mapping column names to text

    Returns:
        Float between 0.0 and 1.0 representing column coverage; 0.0 when
        ``rows`` is empty or no columns are configured
    """
    if not rows:
        return 0.0

    total_columns = len(self.columns)
    if total_columns == 0:
        return 0.0

    # Only configured columns may contribute; extra keys on a row would
    # otherwise push the ratio above 1.0.
    columns_with_data = {
        col_name
        for row in rows
        for col_name, value in row.items()
        if col_name in self.columns and value and value.strip()
    }

    return len(columns_with_data) / total_columns
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import TYPE_CHECKING

from bankstatements_core.config.column_config import DEFAULT_COLUMNS
from bankstatements_core.domain import ExtractionResult
from bankstatements_core.extraction.extraction_params import TABLE_BOTTOM_Y, TABLE_TOP_Y

if TYPE_CHECKING:
Expand Down Expand Up @@ -70,7 +71,7 @@ def extract_tables_from_pdf(
enable_page_validation: bool | None = None,
enable_header_check: bool | None = None,
template: "BankTemplate" | None = None,
) -> tuple[list[dict], int, str | None]:
) -> ExtractionResult:
"""
Extract table data from PDF within specified bounds (facade function).

Expand All @@ -87,7 +88,8 @@ def extract_tables_from_pdf(
template: Optional BankTemplate to use for extraction configuration

Returns:
Tuple of (extracted rows, number of pages, IBAN if found)
ExtractionResult containing extracted transactions, page count, IBAN,
source file path, and any document-level warnings
"""
from bankstatements_core.extraction.pdf_extractor import PDFTableExtractor

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
if TYPE_CHECKING:
from bankstatements_core.domain.protocols.pdf_reader import IPDFReader

from bankstatements_core.domain import ExtractionResult
from bankstatements_core.domain.converters import dicts_to_transactions
from bankstatements_core.extraction.iban_extractor import IBANExtractor
from bankstatements_core.extraction.page_header_analyser import PageHeaderAnalyser
from bankstatements_core.extraction.row_builder import RowBuilder
Expand Down Expand Up @@ -72,14 +74,15 @@ def __init__(
else:
self._pdf_reader = pdf_reader

def extract(self, pdf_path: Path) -> tuple[list[dict], int, str | None]:
def extract(self, pdf_path: Path) -> ExtractionResult:
"""Extract table data from PDF file.

Args:
pdf_path: Path to the PDF file

Returns:
Tuple of (extracted rows, total page count, IBAN if found)
ExtractionResult containing extracted transactions, page count,
IBAN if found, source file path, and any document-level warnings
"""
rows: list[dict] = []
iban = None
Expand All @@ -104,7 +107,13 @@ def extract(self, pdf_path: Path) -> tuple[list[dict], int, str | None]:
f"Credit card statement detected in {pdf_path.name}. "
f"Credit card statements are not currently supported. Skipping file."
)
return [], len(pdf.pages), None
return ExtractionResult(
transactions=[],
page_count=len(pdf.pages),
iban=None,
source_file=pdf_path,
warnings=["credit card statement detected, skipped"],
)

if iban is None and page_num == 1:
iban = self._header_analyser.extract_iban(page)
Expand All @@ -124,7 +133,12 @@ def extract(self, pdf_path: Path) -> tuple[list[dict], int, str | None]:

rows.extend(page_processor.process_page(page_rows))

return rows, len(pdf.pages), iban
return ExtractionResult(
transactions=dicts_to_transactions(rows),
page_count=len(pdf.pages),
iban=iban,
source_file=pdf_path,
)

def _extract_page(self, page: Any, page_num: int) -> list[dict] | None:
"""Extract rows from a single page.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
import logging
from typing import TYPE_CHECKING

from bankstatements_core.extraction.word_utils import (
assign_words_to_columns,
group_words_by_y,
)

if TYPE_CHECKING:
from bankstatements_core.extraction.row_classifiers import RowClassifier

Expand All @@ -31,8 +36,6 @@ def __init__(
) -> None:
self._columns = columns
self._row_classifier = row_classifier
self._column_names = list(columns.keys())
self._rightmost_column = self._column_names[-1] if self._column_names else None

def build_rows(self, words: list[dict]) -> list[dict]:
"""Group words by Y position, assign to columns, return transaction/continuation rows.
Expand All @@ -43,35 +46,14 @@ def build_rows(self, words: list[dict]) -> list[dict]:
Returns:
List of row dictionaries classified as 'transaction' or 'continuation'
"""
lines: dict[float, list[dict]] = {}
for w in words:
y_key = round(w["top"], 0)
lines.setdefault(y_key, []).append(w)

lines = group_words_by_y(words)
page_rows = []
for _, line_words in sorted(lines.items()):
row = dict.fromkeys(self._columns, "")

for w in line_words:
x0 = w["x0"]
x1 = w.get("x1", x0 + max(len(w["text"]) * 3, 10))
text = w["text"]

for col, (xmin, xmax) in self._columns.items():
if col == self._rightmost_column:
if xmin <= x0 and x1 <= xmax:
row[col] += text + " "
break
else:
if xmin <= x0 < xmax:
row[col] += text + " "
break

row = {k: v.strip() for k, v in row.items()}

row = assign_words_to_columns(
line_words, self._columns, strict_rightmost=True
)
if any(row.values()):
row_type = self._row_classifier.classify(row, self._columns)
if row_type in ["transaction", "continuation"]:
page_rows.append(row)

return page_rows
Loading
Loading