refactor(reconcile): consolidate data compare (join, mismatch, output) — #745#2509
BesikiML wants to merge 5 commits into
…cile output assembly; route capture mismatch through _inner_join_for_capture_mismatch; add module docstring for hash vs aggregate vs capture flows.
Restore join_aggregate_data compatibility alias for existing imports and align integration JdbcReaderOptions fixture keys with current dataclass fields.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2509      +/-   ##
==========================================
- Coverage   69.10%   69.01%   -0.10%
==========================================
  Files         105      105
  Lines        9482     9521      +39
  Branches     1050     1051       +1
==========================================
+ Hits         6553     6571      +18
- Misses       2735     2756      +21
  Partials      194      194
✅ 169/169 passed, 7 flaky, 2 skipped, 1h14m43s total Flaky tests:
Running from acceptance #4924
m-abulazm left a comment
Verdict: Behavior-safe, but works against its own goal. The refactor is genuinely behavior-preserving across all three flows (I verified this exhaustively — see below). But for a PR whose stated purpose is
to reduce duplication, it nets +143 lines and ~14 new helpers (only ~6 of which are actually shared), and it bundles in dead code and an unmigrated rename. I'd request changes on the cheap cleanups (dead
constant, rename churn) and ask the author to reconsider the abstraction depth before merge. Nothing here is a correctness blocker.
✅ Behavior preservation — clean (the important part for a refactor)
All three flows trace byte-for-byte equivalent to main. Specifically verified:
- Hash reconcile (reconcile_data/_get_mismatch_data): same full-outer join + alias order, same selectExpr prefixed projection, same persist checkpoint, correct missing-side mapping, same mismatch pipeline. Projection lists are built from the original df.columns (not the post-withColumn frame), so the added hash_match/agg_data_match columns never leak into a startswith projection and there is no column-set drift.
- Aggregate reconcile: full/cross branches identical; the cross branch never forwards on=None; the col→c loop-variable rename is a verified no-op (it only shadowed the imported col inside a comprehension that never calls it).
- Capture: f"{source_alias}." with source_alias="base" produces a byte-identical column reference to the base 'base.' literal; select_expr order and the final sorted(compare_columns) reorder are unchanged; backtick/quoted-name normalization path is untouched.
- The one semantic-looking change (if mismatch: → if mismatch_df is not None in _data_reconcile_output) is equivalent: PySpark DataFrame defines neither __bool__ nor __len__, so a non-None frame is always truthy. (Marginally safer, even.)
static tracing, so those integration tests should be run before merge.
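The truthiness point above can be checked without Spark. The following is a minimal sketch, assuming a hypothetical FrameLike class as a stand-in for a DataFrame (it is not PySpark code); it only illustrates that an object defining neither __bool__ nor __len__ is always truthy, so the two checks agree for it.

```python
class FrameLike:
    """Hypothetical stand-in for a DataFrame: no __bool__, no __len__."""

    def __init__(self, rows):
        self.rows = rows


def has_mismatch_truthy(mismatch_df):
    # Old-style check: relies on default object truthiness.
    return bool(mismatch_df)


def has_mismatch_explicit(mismatch_df):
    # New-style check: only None means "no mismatch frame".
    return mismatch_df is not None


# For None, an "empty" frame, and a non-empty frame, both checks agree.
for candidate in (None, FrameLike(rows=[]), FrameLike(rows=[{"id": 1}])):
    assert has_mismatch_truthy(candidate) == has_mismatch_explicit(candidate)

# Contrast: a type that defines __len__ (a list) is where the checks diverge.
assert bool([]) is False
assert ([] is not None) is True
```

The list contrast is why the explicit form reads as marginally safer: it stays correct even if the argument were ever a type whose truthiness depends on its contents.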
🔧 Findings to address
- Dead, unrelated constant in conftest.py (request change, easy)
  tests/integration/reconcile/conftest.py:75 adds DIAMONDS_JDBC_READER_OPTIONS plus a JdbcReaderOptions import to support it. It's referenced nowhere in the repo (git grep finds only the definition). It's also orthogonal scope for a compare.py refactor.
  → Remove the constant and the now-unused import; land any real parallel-read fixture in the PR that actually consumes it.
- Rename + backward-compat alias = pure churn (request change, easy)
  join_aggregate_data → prepare_persisted_aggregate_join, then a module-level alias join_aggregate_data = prepare_persisted_aggregate_join "for existing imports/callers." But the only caller (reconciliation.py:14, :282) was never migrated and still uses the old name, so the new name has zero call sites, and the docstring "Replaces the former entry point" is contradicted by the unchanged caller. These symbols aren't a public API (no __all__, not re-exported).
  → Pick one: migrate the single call site and delete the alias, or drop the rename entirely. Don't ship two names.
- Net +143 lines for a "consolidation"; ~half the new helpers are single-use (the core altitude issue)
  ~6 helpers genuinely dedupe (≥2 callers) and should stay: _aliased_join, _join_prepare_persist, _filter_to_value_mismatches, _mismatch_projection_for_prefixed_columns, _joined_rows_missing_on_side, _data_reconcile_output. The other ~8 are single-use wrappers that add indirection without removing duplication: _persist_reconcile_dataframe (a one-line forward), _inner_join_for_capture_mismatch, _select_prefixed_columns_for_hash_reconcile, _select_aggregate_joined_columns, _value_mismatch_where_both_present, _mismatch_rows_for_prefixed_compare_column, _capture_mismatch_base_compare_projections, _capture_mismatch_per_column_match_exprs. Per CLAUDE.md "no single-use abstractions," inlining these would likely flip the PR to a net line reduction, matching its stated goal.
- _join_prepare_persist + prepare-callback is heavier than needed (low)
  8 params (5 keyword-only) + a Callable callback for 2 call sites, each forced to wrap its select in a closure that re-passes source/target it already holds. Consider: each flow calls the shared _aliased_join directly, then does its own select + persist. That keeps the one genuinely-shared piece (the cross/full branch) without the pipeline indirection.
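To make the last finding concrete, here is a rough sketch of the two shapes side by side. Everything in it is a hypothetical stand-in: the "frames" are nested tuples that record the steps applied, and the function names and signatures are illustrative only, not the real helpers in compare.py.

```python
from typing import Callable, Tuple

# Hypothetical stand-in: a "frame" is a nested tuple recording applied steps,
# so the two shapes can be compared without Spark.
Frame = Tuple


def aliased_join(source: str, target: str, how: str) -> Frame:
    """Stand-in for the shared join step (the piece both flows reuse)."""
    return ("join", how, source, target)


def persist(frame: Frame) -> Frame:
    """Stand-in for the persist checkpoint."""
    return ("persist", frame)


def join_prepare_persist(
    source: str, target: str, *, how: str, prepare: Callable[[Frame], Frame]
) -> Frame:
    """Shape A: a pipeline helper that takes a prepare callback."""
    return persist(prepare(aliased_join(source, target, how)))


def hash_flow_with_callback(source: str, target: str) -> Frame:
    # The caller wraps its select in a closure just to hand it to the helper.
    return join_prepare_persist(
        source, target, how="full",
        prepare=lambda joined: ("select_prefixed", joined),
    )


def hash_flow_direct(source: str, target: str) -> Frame:
    """Shape B: call the shared join, then select and persist inline."""
    joined = aliased_join(source, target, "full")
    selected = ("select_prefixed", joined)
    return persist(selected)


# Both shapes record the same sequence of steps.
assert hash_flow_with_callback("src", "tgt") == hash_flow_direct("src", "tgt")
```

In this toy version Shape B needs no callback parameter and no closure, which is the trade-off the finding points at; whether that holds for the real code depends on how much the two call sites actually share.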
📝 Minor / nits
- Naming: prefixed_compare_column / compare_basename are module-novel coinages generalizing over a constant (_HASH_COLUMN_NAME) that has exactly one value at the call site. Prefer existing module vocabulary.
- PR description: references _mismatch_rows_for_aggregate_mappin; the real symbol is _mismatch_rows_for_aggregate_mappings. It also frames the conftest change as "aligning JDBC option fixture keys," but the diff aligns nothing: it adds one brand-new unused constant.
- Dead defensive branch: _aliased_join raises ValueError("join condition 'on' is required…") for how != "cross" and on is None, which no caller can trigger.
👍 Worth keeping
_data_reconcile_output, _joined_rows_missing_on_side, _aliased_join, _filter_to_value_mismatches, _mismatch_projection_for_prefixed_columns, and the col→c shadowing fix are real, behavior-preserving dedup.
Summary
Refactors compare.py so hash reconcile, aggregate reconcile, and capture (inner-join) flows share core building blocks where appropriate.
Related: #745
What changed
API / migration
prepare_persisted_aggregate_join over join_aggregate_data.
_mismatch_rows_for_aggregate_mappin.
Testing