
RFC: Unify duplicated word-to-row, word-grouping, and column-coverage logic into word_utils #21

@longieirl

Description

Problem

Three pieces of logic are duplicated across the extraction layer with no shared implementation. Each copy is an independent surface for bug fixes to miss, and the duplication makes the codebase harder to reason about.


Duplication 1 — Relaxed word-to-row assignment

Both ContentDensityService.analyze_content_density() and TableBoundaryDetector._build_row_from_words() independently implement the same operation: given a list of words at one Y-coordinate, assign each word to a column using a relaxed boundary check (xmin <= x0 < xmax).

ContentDensityService (content_density.py:70–82):

row = dict.fromkeys(columns, "")
for w in word_groups[y_coord]:
    x0 = w["x0"]
    text = w["text"]
    for col, (xmin, xmax) in columns.items():
        if xmin <= x0 < xmax:
            row[col] += text + " "
            break
row = {k: v.strip() for k, v in row.items()}

TableBoundaryDetector._build_row_from_words() (boundary_detector.py:396–406):

row = dict.fromkeys(self.columns, "")
for w in words:
    x0 = w["x0"]
    text = w["text"]
    for col, (xmin, xmax) in self.columns.items():
        if xmin <= x0 < xmax:
            row[col] += text + " "
            break
return {k: v.strip() for k, v in row.items()}

These are character-for-character identical except for variable naming. Note: PDFTableExtractor._extract_rows_from_words() (pdf_extractor.py:310–341) uses a different strategy — strict boundary for the rightmost column, relaxed for others — so it is deliberately distinct and should not be merged here (see RFC #18 for RowBuilder).
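To make the distinction concrete, the two strategies can be sketched roughly as follows. This is a hypothetical illustration, not the actual pdf_extractor.py code: the helper names and the exact strict predicate are assumptions.

```python
def relaxed_match(word: dict, xmin: float, xmax: float) -> bool:
    # Relaxed: only the word's left edge (x0) must fall in the column range.
    return xmin <= word["x0"] < xmax

def strict_match(word: dict, xmin: float, xmax: float) -> bool:
    # Strict: the whole word span (x0..x1) must sit inside the column range.
    return xmin <= word["x0"] and word["x1"] <= xmax

# A word that starts inside a column but overflows its right boundary:
word = {"x0": 195, "x1": 230, "text": "100.00"}
print(relaxed_match(word, 50, 200))  # True: left edge is inside the range
print(strict_match(word, 50, 200))   # False: word overflows the column
```

Under the relaxed rule the word above lands in the (50, 200) column; under a strict rule it would not, which is why the two paths must stay separate.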


Duplication 2 — Word-grouping by Y coordinate

Both TableBoundaryDetector._group_words_by_y() (boundary_detector.py:160–165) and the inline loop in PDFTableExtractor._extract_rows_from_words() (pdf_extractor.py:303–306) group words into dict[float, list[dict]] by rounding w["top"].

TableBoundaryDetector._group_words_by_y():

lines: dict[float, list[dict]] = {}
for w in words:
    if w["top"] >= self.table_top_y:          # applies table_top_y filter
        y_key = round(w["top"], 0)
        lines.setdefault(y_key, []).append(w)
return lines

PDFTableExtractor._extract_rows_from_words() inline:

lines: dict[float, list[dict]] = {}
for w in words:
    y_key = round(w["top"], 0)                 # no filter (words already cropped)
    lines.setdefault(y_key, []).append(w)

The only difference is the table_top_y pre-filter: TableBoundaryDetector filters because it receives uncropped words; PDFTableExtractor doesn't filter because it receives already-cropped words. The grouping expression itself is identical.


Duplication 3 — Column coverage calculation

TableBoundaryDetector._calculate_column_coverage() (boundary_detector.py:408–429):

if not rows:
    return 0.0
total_columns = len(self.columns)
columns_with_data = set()
for row in rows:
    for col_name, value in row.items():
        if value and value.strip():
            columns_with_data.add(col_name)
return len(columns_with_data) / total_columns if total_columns > 0 else 0.0

PageValidationService.calculate_column_coverage() (page_validation.py:116–146):

if not rows or not columns:
    return 0.0
column_names = list(columns.keys())
columns_with_data = set()
for row in rows:
    for col_name in column_names:
        if row.get(col_name, "").strip():
            columns_with_data.add(col_name)
return len(columns_with_data) / len(column_names)

The logic is equivalent; the differences are stylistic. The detector iterates row.items() while the service iterates column_names, which is safer when rows carry extra keys; the detector reads self.columns implicitly while the service takes columns as a parameter. The PageValidationService version is the better template: its columns parameter is explicit, and it cannot overcount coverage when a row contains keys that are not real columns.
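The extra-keys hazard is easy to demonstrate. In this self-contained snippet, "_debug_y" stands in for any non-column metadata a row might carry (the key name is hypothetical):

```python
columns = {"Date": (0, 50), "Balance": (200, 300)}
row = {"Date": "01/01", "Balance": "", "_debug_y": "310"}  # "_debug_y" is not a column

# Iterating row.items() counts keys that are not real columns:
with_items = {k for k, v in row.items() if v.strip()}
# Iterating the column names only counts real columns:
with_names = {k for k in columns if row.get(k, "").strip()}

print(len(with_items) / len(columns))  # 1.0 (overcounts: "_debug_y" slipped in)
print(len(with_names) / len(columns))  # 0.5 (correct: only "Date" has data)
```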


Proposed Changes

1. Extract assign_words_to_columns() into extraction/word_utils.py

A new module-level function handles the relaxed word-to-row assignment shared by ContentDensityService and TableBoundaryDetector:

# extraction/word_utils.py

def assign_words_to_columns(
    words: list[dict],
    columns: dict[str, tuple[int | float, int | float]],
) -> dict[str, str]:
    """
    Build a row dict from a list of words using relaxed boundary check.

    Each word is assigned to the first column whose x-range contains the
    word's left edge (x0). The rightmost-column strict-containment strategy
    used by the extraction path (RowBuilder / PDFTableExtractor) is
    intentionally NOT applied here.

    Args:
        words: Words at a single Y-coordinate.
        columns: Column name → (x_min, x_max) boundaries.

    Returns:
        Dict mapping column names to stripped, concatenated text.
    """
    row = dict.fromkeys(columns, "")
    for w in words:
        x0 = w["x0"]
        text = w["text"]
        for col, (xmin, xmax) in columns.items():
            if xmin <= x0 < xmax:
                row[col] += text + " "
                break
    return {k: v.strip() for k, v in row.items()}


def group_words_by_y(
    words: list[dict],
    min_y: float | None = None,
) -> dict[float, list[dict]]:
    """
    Group words by rounded Y-coordinate.

    Args:
        words: List of word dicts with a 'top' key.
        min_y: If provided, words with top < min_y are excluded.

    Returns:
        Dict mapping rounded Y-coordinate to list of words at that Y.
    """
    lines: dict[float, list[dict]] = {}
    for w in words:
        if min_y is not None and w["top"] < min_y:
            continue
        y_key = round(w["top"], 0)
        lines.setdefault(y_key, []).append(w)
    return lines

TableBoundaryDetector._build_row_from_words() and ContentDensityService's inline loop become one-liners:

# boundary_detector.py
from bankstatements_core.extraction.word_utils import assign_words_to_columns, group_words_by_y

def _build_row_from_words(self, words):
    return assign_words_to_columns(words, self.columns)

def _group_words_by_y(self, words):
    return group_words_by_y(words, min_y=self.table_top_y)

# content_density.py
from bankstatements_core.extraction.word_utils import assign_words_to_columns

# Inside analyze_content_density loop:
row = assign_words_to_columns(word_groups[y_coord], columns)

PDFTableExtractor._extract_rows_from_words() (after RFC #18, this becomes RowBuilder.build_rows()) continues to use group_words_by_y(words) without a min_y filter (words are already cropped upstream), but the grouping expression is shared.
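A quick self-contained demo of both call styles (the function body is reproduced inline from the word_utils sketch above so the example runs standalone):

```python
def group_words_by_y(words, min_y=None):
    # Mirror of the proposed word_utils.group_words_by_y.
    lines: dict[float, list[dict]] = {}
    for w in words:
        if min_y is not None and w["top"] < min_y:
            continue
        lines.setdefault(round(w["top"], 0), []).append(w)
    return lines

words = [
    {"top": 95.0, "text": "Header"},   # above the table region
    {"top": 120.2, "text": "01/01"},   # same visual line...
    {"top": 120.4, "text": "Tesco"},   # ...rounds to the same key
]
# Detector path: min_y filter drops words above the table.
print(sorted(group_words_by_y(words, min_y=100)))  # [120.0]
# Extractor path: words already cropped, no filter needed.
print(sorted(group_words_by_y(words)))             # [95.0, 120.0]
```

Note that rounding to the nearest integer Y is what merges slightly staggered words ("01/01" at 120.2, "Tesco" at 120.4) into one line.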

2. Replace TableBoundaryDetector._calculate_column_coverage() with PageValidationService.calculate_column_coverage()

TableBoundaryDetector._calculate_column_coverage() is a private, standalone method used only in _detect_by_structure_breakdown(). It can delegate to PageValidationService:

# boundary_detector.py
def __init__(self, columns, ..., page_validation_service=None):
    ...
    from bankstatements_core.services.page_validation import PageValidationService
    self._page_validation = page_validation_service or PageValidationService()

def _calculate_column_coverage(self, rows):
    return self._page_validation.calculate_column_coverage(rows, self.columns)

The private method's signature stays rows-only (its columns come from self.columns); the delegation passes them to the service explicitly. Accepting page_validation_service through the constructor also enables injection for testing.

Alternatively, since calculate_column_coverage is purely functional (no service state used), it can be extracted as a module-level function in word_utils.py or a column_utils.py module and both callers import it directly — no service dependency needed.

Recommended: module-level function, keeps both callers independent of the service layer:

# extraction/word_utils.py  (addition)
def calculate_column_coverage(
    rows: list[dict],
    columns: dict[str, tuple[int | float, int | float]],
) -> float:
    """Return fraction of columns that have at least one non-empty value across rows."""
    if not rows or not columns:
        return 0.0
    column_names = list(columns.keys())
    columns_with_data = {
        col_name
        for row in rows
        for col_name in column_names
        if row.get(col_name, "").strip()
    }
    return len(columns_with_data) / len(column_names)

PageValidationService.calculate_column_coverage() delegates to this function. TableBoundaryDetector._calculate_column_coverage() also delegates. Single implementation, two callers.
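The resulting service method is then a thin wrapper. A minimal self-contained sketch (the shared function is reproduced inline here rather than imported from word_utils, so the example runs standalone):

```python
def calculate_column_coverage(rows: list[dict], columns: dict) -> float:
    # Mirror of the proposed word_utils.calculate_column_coverage.
    if not rows or not columns:
        return 0.0
    column_names = list(columns.keys())
    columns_with_data = {
        col for row in rows for col in column_names if row.get(col, "").strip()
    }
    return len(columns_with_data) / len(column_names)

class PageValidationService:
    def calculate_column_coverage(self, rows: list[dict], columns: dict) -> float:
        # Public API unchanged; implementation now lives in one place.
        return calculate_column_coverage(rows, columns)
```

Callers of the service see no behavioural change; the only difference is where the arithmetic lives.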


Non-Goals

  • Do not unify the strict/rightmost-column boundary logic in PDFTableExtractor / RowBuilder with the relaxed logic here — they are intentionally different.
  • Do not change TableBoundaryDetector._group_words_by_y() behaviour (the min_y filter is preserved as an optional parameter).
  • Do not remove PageValidationService.calculate_column_coverage() from the public API — it remains, delegating internally.

Files Touched

Action File
Create packages/parser-core/src/bankstatements_core/extraction/word_utils.py
Modify packages/parser-core/src/bankstatements_core/extraction/boundary_detector.py — replace _build_row_from_words, _group_words_by_y, _calculate_column_coverage with delegations
Modify packages/parser-core/src/bankstatements_core/services/content_density.py — replace inline word assignment loop with assign_words_to_columns
Modify packages/parser-core/src/bankstatements_core/services/page_validation.py — delegate calculate_column_coverage to the module-level function
Create packages/parser-core/tests/extraction/test_word_utils.py

Testing

# test_word_utils.py
import pytest

from bankstatements_core.extraction.word_utils import (
    assign_words_to_columns,
    calculate_column_coverage,
    group_words_by_y,
)

COLS = {"Date": (0, 50), "Details": (50, 200), "Balance": (200, 300)}

def test_assign_words_to_columns_basic():
    words = [{"x0": 10, "text": "01/01"}, {"x0": 60, "text": "Tesco"}]
    row = assign_words_to_columns(words, COLS)
    assert row == {"Date": "01/01", "Details": "Tesco", "Balance": ""}

def test_assign_words_boundary_relaxed():
    # Word starting exactly at column boundary goes to that column
    words = [{"x0": 50, "text": "ABC"}]
    row = assign_words_to_columns(words, COLS)
    assert row["Details"] == "ABC"

def test_group_words_by_y_filters_min_y():
    words = [{"top": 200, "x0": 0, "text": "header"},
             {"top": 310, "x0": 0, "text": "row"}]
    groups = group_words_by_y(words, min_y=300)
    assert 200 not in groups
    assert 310 in groups

def test_calculate_column_coverage():
    rows = [{"Date": "01/01", "Details": "", "Balance": ""},
            {"Date": "",      "Details": "Tesco", "Balance": "100"}]
    assert calculate_column_coverage(rows, COLS) == pytest.approx(1.0)

Existing TableBoundaryDetector and ContentDensityService tests continue to pass, since behaviour is unchanged.


Acceptance Criteria

  • word_utils.py created with assign_words_to_columns, group_words_by_y, calculate_column_coverage
  • TableBoundaryDetector._build_row_from_words delegates to assign_words_to_columns
  • TableBoundaryDetector._group_words_by_y delegates to group_words_by_y
  • TableBoundaryDetector._calculate_column_coverage delegates to calculate_column_coverage
  • ContentDensityService inline word-assignment loop replaced with assign_words_to_columns
  • PageValidationService.calculate_column_coverage delegates to the shared function
  • Unit tests for all three functions in test_word_utils.py
  • All existing tests pass
