Skip to content

Commit 8eec3ed

Browse files
authored
Merge pull request #44 from longieirl/feat/28-service-registry
feat(#28): add ServiceRegistry — centralise enrichment/classify/dedup/sort pipeline
2 parents 1c0dd48 + 3ca3969 commit 8eec3ed

9 files changed

Lines changed: 445 additions & 423 deletions

File tree

.gitignore

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ output/
121121
**/output/
122122

123123
# Logs
124-
logs/*.log
125-
logs/processing_activity.jsonl
124+
logs/
125+
**/logs/
126126
*.log
127127

128128
# Development Artifacts

packages/parser-core/src/bankstatements_core/builders/processor_builder.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,11 +342,16 @@ def build(self) -> "BankStatementProcessor":
342342
config.extraction.enable_dynamic_boundary,
343343
)
344344

345+
from bankstatements_core.services.service_registry import ServiceRegistry
346+
347+
registry = ServiceRegistry.from_config(config, entitlements=self._entitlements)
348+
345349
return BankStatementProcessor(
346350
config=config,
347351
output_strategies=self._output_strategies,
348352
duplicate_strategy=self._duplicate_strategy,
349353
repository=self._repository,
350354
activity_log=self._activity_log,
351355
entitlements=self._entitlements,
356+
registry=registry,
352357
)

packages/parser-core/src/bankstatements_core/patterns/factories.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,16 @@ def create_custom(
210210
),
211211
)
212212

213+
from bankstatements_core.services.service_registry import ServiceRegistry
214+
215+
registry = ServiceRegistry.from_config(config, entitlements=entitlements)
216+
213217
processor = BankStatementProcessor(
214218
config=config,
215219
output_strategies=output_strategies,
216220
duplicate_strategy=duplicate_strategy,
217221
entitlements=entitlements,
222+
registry=registry,
218223
)
219224

220225
return processor

packages/parser-core/src/bankstatements_core/processor.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from bankstatements_core.services.pdf_processing_orchestrator import (
2323
PDFProcessingOrchestrator,
2424
)
25+
from bankstatements_core.services.service_registry import ServiceRegistry
2526
from bankstatements_core.services.sorting_service import (
2627
ChronologicalSortingStrategy,
2728
NoSortingStrategy,
@@ -122,6 +123,7 @@ def __init__(
122123
activity_log: Any | None = None,
123124
entitlements: Any | None = None,
124125
template_registry: Any | None = None,
126+
registry: ServiceRegistry | None = None,
125127
):
126128
"""
127129
Initialize the bank statement processor.
@@ -250,6 +252,17 @@ def __init__(
250252
sorting_service=self._sorting_service,
251253
)
252254

255+
# ServiceRegistry: single wiring point for transaction processing
256+
if registry is not None:
257+
self._registry = registry
258+
else:
259+
self._registry = ServiceRegistry.from_config(
260+
config,
261+
entitlements=entitlements,
262+
duplicate_detector=self._duplicate_service,
263+
sorting_service=self._sorting_service,
264+
)
265+
253266
self._output_orchestrator = OutputOrchestrator(
254267
output_dir=self.output_dir,
255268
output_strategies=self.output_strategies,
@@ -322,8 +335,8 @@ def run(self) -> dict:
322335
pdf_ibans[extraction.source_file.name] = extraction.iban
323336
all_rows.extend(transactions_to_dicts(extraction.transactions))
324337

325-
# Step 2: Group transactions by IBAN (delegated to orchestrator)
326-
rows_by_iban = self._transaction_orchestrator.group_by_iban(all_rows, pdf_ibans)
338+
# Step 2: Group transactions by IBAN (delegated to registry)
339+
rows_by_iban = self._registry.group_by_iban(all_rows, pdf_ibans)
327340
logger.debug(
328341
f"Grouped {len(all_rows)} transactions into {len(rows_by_iban)} IBAN groups"
329342
)
@@ -402,11 +415,9 @@ def _process_transaction_group(
402415
f"Using template '{template_id}' for transaction type classification"
403416
)
404417

405-
# Detect duplicates and sort (delegated to orchestrator)
406-
unique_rows, duplicate_rows = (
407-
self._transaction_orchestrator.process_transaction_group(
408-
iban_rows, template=template
409-
)
418+
# Detect duplicates and sort (delegated to registry)
419+
unique_rows, duplicate_rows = self._registry.process_transaction_group(
420+
iban_rows, template=template
410421
)
411422

412423
# Filter duplicates to remove any empty rows and header rows
Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
"""ServiceRegistry — single wiring point for transaction processing services.
2+
3+
Centralises construction of duplicate detection, sorting, IBAN grouping, and
4+
the enrichment/classification pipeline that was previously spread across
5+
TransactionProcessingOrchestrator and BankStatementProcessor.
6+
7+
Usage (primary path)::
8+
9+
registry = ServiceRegistry.from_config(processor_config, entitlements)
10+
unique, dupes = registry.process_transaction_group(rows, template)
11+
grouped = registry.group_by_iban(rows, pdf_ibans)
12+
13+
Escape hatches are available for callers that need individual services::
14+
15+
detector = registry.get_duplicate_detector()
16+
"""
17+
18+
from __future__ import annotations
19+
20+
import logging
21+
from dataclasses import dataclass
22+
from typing import TYPE_CHECKING, Any
23+
24+
if TYPE_CHECKING:
25+
from bankstatements_core.config.processor_config import ProcessorConfig
26+
from bankstatements_core.domain.protocols.services import (
27+
IDuplicateDetector,
28+
IIBANGrouping,
29+
ITransactionSorting,
30+
)
31+
from bankstatements_core.entitlements import Entitlements
32+
from bankstatements_core.templates.template_model import BankTemplate
33+
34+
logger = logging.getLogger(__name__)
35+
36+
37+
@dataclass(frozen=True)
38+
class _ServiceContext:
39+
"""Shared dependencies passed once to ServiceRegistry at construction time.
40+
41+
This is an internal dataclass — never exposed to callers.
42+
"""
43+
44+
column_names: list[str]
45+
debit_columns: list[str]
46+
credit_columns: list[str]
47+
entitlements: Any # Entitlements | None
48+
49+
50+
class ServiceRegistry:
51+
"""Single wiring point for all transaction processing services.
52+
53+
Callers use the primary methods for the common case.
54+
Individual services are accessible via get_*() escape hatches for tests
55+
or specialised callers.
56+
"""
57+
58+
def __init__(
59+
self,
60+
context: _ServiceContext,
61+
duplicate_detector: "IDuplicateDetector",
62+
sorting_service: "ITransactionSorting",
63+
grouping_service: "IIBANGrouping",
64+
) -> None:
65+
self._context = context
66+
self._duplicate_detector = duplicate_detector
67+
self._sorting_service = sorting_service
68+
self._grouping_service = grouping_service
69+
70+
# ------------------------------------------------------------------
71+
# Factory
72+
# ------------------------------------------------------------------
73+
74+
@classmethod
75+
def from_config(
76+
cls,
77+
config: "ProcessorConfig",
78+
entitlements: "Entitlements | None" = None,
79+
duplicate_detector: "IDuplicateDetector | None" = None,
80+
sorting_service: "ITransactionSorting | None" = None,
81+
grouping_service: "IIBANGrouping | None" = None,
82+
) -> "ServiceRegistry":
83+
"""Build a ServiceRegistry from a ProcessorConfig.
84+
85+
Args:
86+
config: Processor configuration carrying column, sorting, and
87+
processing settings.
88+
entitlements: Optional tier-based entitlements.
89+
duplicate_detector: Override duplicate detector (default: AllFields).
90+
sorting_service: Override sorting service (default: chronological
91+
if config.processing.sort_by_date, else no-sort).
92+
grouping_service: Override IBAN grouping service (default: suffix-4).
93+
94+
Returns:
95+
Fully wired ServiceRegistry instance.
96+
"""
97+
from bankstatements_core.config.column_config import get_column_names
98+
from bankstatements_core.patterns.strategies import AllFieldsDuplicateStrategy
99+
from bankstatements_core.processor import find_matching_columns
100+
from bankstatements_core.services.duplicate_detector import (
101+
DuplicateDetectionService,
102+
)
103+
from bankstatements_core.services.iban_grouping import IBANGroupingService
104+
from bankstatements_core.services.sorting_service import (
105+
ChronologicalSortingStrategy,
106+
NoSortingStrategy,
107+
TransactionSortingService,
108+
)
109+
110+
column_names = (
111+
get_column_names(config.extraction.columns)
112+
if config.extraction.columns
113+
else []
114+
)
115+
debit_columns = find_matching_columns(column_names, ["debit"])
116+
credit_columns = find_matching_columns(column_names, ["credit"])
117+
118+
context = _ServiceContext(
119+
column_names=column_names,
120+
debit_columns=debit_columns,
121+
credit_columns=credit_columns,
122+
entitlements=entitlements,
123+
)
124+
125+
if duplicate_detector is None:
126+
duplicate_detector = DuplicateDetectionService(AllFieldsDuplicateStrategy())
127+
128+
if sorting_service is None:
129+
sort_strategy = (
130+
ChronologicalSortingStrategy()
131+
if config.processing.sort_by_date
132+
else NoSortingStrategy()
133+
)
134+
sorting_service = TransactionSortingService(sort_strategy)
135+
136+
if grouping_service is None:
137+
grouping_service = IBANGroupingService()
138+
139+
return cls(context, duplicate_detector, sorting_service, grouping_service)
140+
141+
# ------------------------------------------------------------------
142+
# Primary methods (80 % case)
143+
# ------------------------------------------------------------------
144+
145+
def process_transaction_group(
146+
self,
147+
transactions: list[dict],
148+
template: "BankTemplate | None" = None,
149+
) -> tuple[list[dict], list[dict]]:
150+
"""Enrich → classify → deduplicate → sort a group of transactions.
151+
152+
This replaces the explicit five-call chain that was previously spread
153+
across BankStatementProcessor and TransactionProcessingOrchestrator.
154+
155+
Args:
156+
transactions: List of transaction dicts for a single IBAN group.
157+
template: Optional bank template used for transaction type keywords.
158+
159+
Returns:
160+
Tuple of (unique_transactions, duplicate_transactions).
161+
"""
162+
enriched = self._enrich_with_filename(transactions)
163+
enriched = self._enrich_with_document_type(enriched)
164+
enriched = self._classify_transaction_types(enriched, template)
165+
166+
unique_rows, duplicate_rows = self._duplicate_detector.detect_and_separate(
167+
enriched
168+
)
169+
logger.info(
170+
"Duplicate detection: %d unique, %d duplicates",
171+
len(unique_rows),
172+
len(duplicate_rows),
173+
)
174+
175+
sorted_rows = self._sorting_service.sort(unique_rows)
176+
return sorted_rows, duplicate_rows
177+
178+
def group_by_iban(
179+
self,
180+
transactions: list[dict],
181+
pdf_ibans: dict[str, str],
182+
) -> dict[str, list[dict]]:
183+
"""Group transactions by IBAN suffix.
184+
185+
Args:
186+
transactions: Flat list of all transaction dicts.
187+
pdf_ibans: Mapping of PDF filename → IBAN string.
188+
189+
Returns:
190+
Dict of IBAN suffix → list of transaction dicts.
191+
"""
192+
return self._grouping_service.group_by_iban(transactions, pdf_ibans)
193+
194+
# ------------------------------------------------------------------
195+
# Escape hatches (20 % case)
196+
# ------------------------------------------------------------------
197+
198+
def get_duplicate_detector(self) -> "IDuplicateDetector":
199+
return self._duplicate_detector
200+
201+
def get_sorting_service(self) -> "ITransactionSorting":
202+
return self._sorting_service
203+
204+
def get_grouping_service(self) -> "IIBANGrouping":
205+
return self._grouping_service
206+
207+
# ------------------------------------------------------------------
208+
# Internal enrichment helpers (inlined from TransactionProcessingOrchestrator)
209+
# ------------------------------------------------------------------
210+
211+
@staticmethod
212+
def _enrich_with_filename(transactions: list[dict]) -> list[dict]:
213+
"""Set Filename key from source_pdf if not already present."""
214+
for row in transactions:
215+
if "Filename" not in row:
216+
row["Filename"] = row.get("source_pdf", "")
217+
return transactions
218+
219+
@staticmethod
220+
def _enrich_with_document_type(
221+
transactions: list[dict], default_type: str = "bank_statement"
222+
) -> list[dict]:
223+
"""Set document_type if not already present."""
224+
for row in transactions:
225+
if "document_type" not in row:
226+
row["document_type"] = default_type
227+
return transactions
228+
229+
@staticmethod
230+
def _classify_transaction_types(
231+
transactions: list[dict],
232+
template: "BankTemplate | None" = None,
233+
) -> list[dict]:
234+
"""Classify each transaction using Chain of Responsibility."""
235+
from bankstatements_core.services.transaction_type_classifier import (
236+
create_transaction_type_classifier_chain,
237+
)
238+
239+
if not transactions:
240+
return transactions
241+
242+
document_type = transactions[0].get("document_type")
243+
classifier = create_transaction_type_classifier_chain(document_type)
244+
245+
for transaction in transactions:
246+
transaction["transaction_type"] = classifier.classify(transaction, template)
247+
248+
logger.info(
249+
"Transaction type classification: %d transactions classified",
250+
len(transactions),
251+
)
252+
return transactions

0 commit comments

Comments
 (0)