Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ output/
**/output/

# Logs
logs/*.log
logs/processing_activity.jsonl
logs/
**/logs/
*.log

# Development Artifacts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,16 @@ def build(self) -> "BankStatementProcessor":
config.extraction.enable_dynamic_boundary,
)

from bankstatements_core.services.service_registry import ServiceRegistry

registry = ServiceRegistry.from_config(config, entitlements=self._entitlements)

return BankStatementProcessor(
config=config,
output_strategies=self._output_strategies,
duplicate_strategy=self._duplicate_strategy,
repository=self._repository,
activity_log=self._activity_log,
entitlements=self._entitlements,
registry=registry,
)
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,16 @@ def create_custom(
),
)

from bankstatements_core.services.service_registry import ServiceRegistry

registry = ServiceRegistry.from_config(config, entitlements=entitlements)

processor = BankStatementProcessor(
config=config,
output_strategies=output_strategies,
duplicate_strategy=duplicate_strategy,
entitlements=entitlements,
registry=registry,
)

return processor
25 changes: 18 additions & 7 deletions packages/parser-core/src/bankstatements_core/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from bankstatements_core.services.pdf_processing_orchestrator import (
PDFProcessingOrchestrator,
)
from bankstatements_core.services.service_registry import ServiceRegistry
from bankstatements_core.services.sorting_service import (
ChronologicalSortingStrategy,
NoSortingStrategy,
Expand Down Expand Up @@ -122,6 +123,7 @@ def __init__(
activity_log: Any | None = None,
entitlements: Any | None = None,
template_registry: Any | None = None,
registry: ServiceRegistry | None = None,
):
"""
Initialize the bank statement processor.
Expand Down Expand Up @@ -250,6 +252,17 @@ def __init__(
sorting_service=self._sorting_service,
)

# ServiceRegistry: single wiring point for transaction processing
if registry is not None:
self._registry = registry
else:
self._registry = ServiceRegistry.from_config(
config,
entitlements=entitlements,
duplicate_detector=self._duplicate_service,
sorting_service=self._sorting_service,
)

self._output_orchestrator = OutputOrchestrator(
output_dir=self.output_dir,
output_strategies=self.output_strategies,
Expand Down Expand Up @@ -322,8 +335,8 @@ def run(self) -> dict:
pdf_ibans[extraction.source_file.name] = extraction.iban
all_rows.extend(transactions_to_dicts(extraction.transactions))

# Step 2: Group transactions by IBAN (delegated to orchestrator)
rows_by_iban = self._transaction_orchestrator.group_by_iban(all_rows, pdf_ibans)
# Step 2: Group transactions by IBAN (delegated to registry)
rows_by_iban = self._registry.group_by_iban(all_rows, pdf_ibans)
logger.debug(
f"Grouped {len(all_rows)} transactions into {len(rows_by_iban)} IBAN groups"
)
Expand Down Expand Up @@ -402,11 +415,9 @@ def _process_transaction_group(
f"Using template '{template_id}' for transaction type classification"
)

# Detect duplicates and sort (delegated to orchestrator)
unique_rows, duplicate_rows = (
self._transaction_orchestrator.process_transaction_group(
iban_rows, template=template
)
# Detect duplicates and sort (delegated to registry)
unique_rows, duplicate_rows = self._registry.process_transaction_group(
iban_rows, template=template
)

# Filter duplicates to remove any empty rows and header rows
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
"""ServiceRegistry — single wiring point for transaction processing services.

Centralises construction of duplicate detection, sorting, IBAN grouping, and
the enrichment/classification pipeline that was previously spread across
TransactionProcessingOrchestrator and BankStatementProcessor.

Usage (primary path)::

registry = ServiceRegistry.from_config(processor_config, entitlements)
unique, dupes = registry.process_transaction_group(rows, template)
grouped = registry.group_by_iban(rows, pdf_ibans)

Escape hatches are available for callers that need individual services::

detector = registry.get_duplicate_detector()
"""

from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from bankstatements_core.config.processor_config import ProcessorConfig
from bankstatements_core.domain.protocols.services import (
IDuplicateDetector,
IIBANGrouping,
ITransactionSorting,
)
from bankstatements_core.entitlements import Entitlements
from bankstatements_core.templates.template_model import BankTemplate

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class _ServiceContext:
"""Shared dependencies passed once to ServiceRegistry at construction time.

This is an internal dataclass — never exposed to callers.
"""

column_names: list[str]
debit_columns: list[str]
credit_columns: list[str]
entitlements: Any # Entitlements | None


class ServiceRegistry:
"""Single wiring point for all transaction processing services.

Callers use the primary methods for the common case.
Individual services are accessible via get_*() escape hatches for tests
or specialised callers.
"""

def __init__(
self,
context: _ServiceContext,
duplicate_detector: "IDuplicateDetector",
sorting_service: "ITransactionSorting",
grouping_service: "IIBANGrouping",
) -> None:
self._context = context
self._duplicate_detector = duplicate_detector
self._sorting_service = sorting_service
self._grouping_service = grouping_service

# ------------------------------------------------------------------
# Factory
# ------------------------------------------------------------------

@classmethod
def from_config(
cls,
config: "ProcessorConfig",
entitlements: "Entitlements | None" = None,
duplicate_detector: "IDuplicateDetector | None" = None,
sorting_service: "ITransactionSorting | None" = None,
grouping_service: "IIBANGrouping | None" = None,
) -> "ServiceRegistry":
"""Build a ServiceRegistry from a ProcessorConfig.

Args:
config: Processor configuration carrying column, sorting, and
processing settings.
entitlements: Optional tier-based entitlements.
duplicate_detector: Override duplicate detector (default: AllFields).
sorting_service: Override sorting service (default: chronological
if config.processing.sort_by_date, else no-sort).
grouping_service: Override IBAN grouping service (default: suffix-4).

Returns:
Fully wired ServiceRegistry instance.
"""
from bankstatements_core.config.column_config import get_column_names
from bankstatements_core.patterns.strategies import AllFieldsDuplicateStrategy
from bankstatements_core.processor import find_matching_columns
from bankstatements_core.services.duplicate_detector import (
DuplicateDetectionService,
)
from bankstatements_core.services.iban_grouping import IBANGroupingService
from bankstatements_core.services.sorting_service import (
ChronologicalSortingStrategy,
NoSortingStrategy,
TransactionSortingService,
)

column_names = (
get_column_names(config.extraction.columns)
if config.extraction.columns
else []
)
debit_columns = find_matching_columns(column_names, ["debit"])
credit_columns = find_matching_columns(column_names, ["credit"])

context = _ServiceContext(
column_names=column_names,
debit_columns=debit_columns,
credit_columns=credit_columns,
entitlements=entitlements,
)

if duplicate_detector is None:
duplicate_detector = DuplicateDetectionService(AllFieldsDuplicateStrategy())

if sorting_service is None:
sort_strategy = (
ChronologicalSortingStrategy()
if config.processing.sort_by_date
else NoSortingStrategy()
)
sorting_service = TransactionSortingService(sort_strategy)

if grouping_service is None:
grouping_service = IBANGroupingService()

return cls(context, duplicate_detector, sorting_service, grouping_service)

# ------------------------------------------------------------------
# Primary methods (80 % case)
# ------------------------------------------------------------------

def process_transaction_group(
self,
transactions: list[dict],
template: "BankTemplate | None" = None,
) -> tuple[list[dict], list[dict]]:
"""Enrich → classify → deduplicate → sort a group of transactions.

This replaces the explicit five-call chain that was previously spread
across BankStatementProcessor and TransactionProcessingOrchestrator.

Args:
transactions: List of transaction dicts for a single IBAN group.
template: Optional bank template used for transaction type keywords.

Returns:
Tuple of (unique_transactions, duplicate_transactions).
"""
enriched = self._enrich_with_filename(transactions)
enriched = self._enrich_with_document_type(enriched)
enriched = self._classify_transaction_types(enriched, template)

unique_rows, duplicate_rows = self._duplicate_detector.detect_and_separate(
enriched
)
logger.info(
"Duplicate detection: %d unique, %d duplicates",
len(unique_rows),
len(duplicate_rows),
)

sorted_rows = self._sorting_service.sort(unique_rows)
return sorted_rows, duplicate_rows

def group_by_iban(
self,
transactions: list[dict],
pdf_ibans: dict[str, str],
) -> dict[str, list[dict]]:
"""Group transactions by IBAN suffix.

Args:
transactions: Flat list of all transaction dicts.
pdf_ibans: Mapping of PDF filename → IBAN string.

Returns:
Dict of IBAN suffix → list of transaction dicts.
"""
return self._grouping_service.group_by_iban(transactions, pdf_ibans)

# ------------------------------------------------------------------
# Escape hatches (20 % case)
# ------------------------------------------------------------------

def get_duplicate_detector(self) -> "IDuplicateDetector":
return self._duplicate_detector

def get_sorting_service(self) -> "ITransactionSorting":
return self._sorting_service

def get_grouping_service(self) -> "IIBANGrouping":
return self._grouping_service

# ------------------------------------------------------------------
# Internal enrichment helpers (inlined from TransactionProcessingOrchestrator)
# ------------------------------------------------------------------

@staticmethod
def _enrich_with_filename(transactions: list[dict]) -> list[dict]:
"""Set Filename key from source_pdf if not already present."""
for row in transactions:
if "Filename" not in row:
row["Filename"] = row.get("source_pdf", "")
return transactions

@staticmethod
def _enrich_with_document_type(
transactions: list[dict], default_type: str = "bank_statement"
) -> list[dict]:
"""Set document_type if not already present."""
for row in transactions:
if "document_type" not in row:
row["document_type"] = default_type
return transactions

@staticmethod
def _classify_transaction_types(
transactions: list[dict],
template: "BankTemplate | None" = None,
) -> list[dict]:
"""Classify each transaction using Chain of Responsibility."""
from bankstatements_core.services.transaction_type_classifier import (
create_transaction_type_classifier_chain,
)

if not transactions:
return transactions

document_type = transactions[0].get("document_type")
classifier = create_transaction_type_classifier_chain(document_type)

for transaction in transactions:
transaction["transaction_type"] = classifier.classify(transaction, template)

logger.info(
"Transaction type classification: %d transactions classified",
len(transactions),
)
return transactions
Loading
Loading