Refactor: bug fixes, resource safety, shared utilities, logging, validation, and performance #4
Open
vjsingh1984 wants to merge 8 commits into
- sync_views.py: Fix incorrect header (said sync_tables.py)
- sync_views.py: Fix f-string on dict key ("status" field)
- sync_views.py: Fix operator precedence in filter (missing parens)
- sync_grs_ext.py: Fix missing f-string prefix in error status
- sync_grs_ext.py: Remove unpopulated loaded_table_types column
that caused DataFrame column length mismatch
- examples/clone_to_secondary.py: Fix catalog list (was single
string instead of two list elements)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
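Two of the bug classes fixed in this commit are easy to reproduce in isolation. The sketch below is illustrative only: the rows, the filter, and the status string are hypothetical stand-ins, since the actual expressions in sync_views.py and sync_grs_ext.py are not shown in this conversation.

```python
# Hypothetical rows; not the real data or filter from sync_views.py.
rows = [
    {"schema": "information_schema", "kind": "VIEW"},
    {"schema": "sales", "kind": "VIEW"},
    {"schema": "sales", "kind": "TABLE"},
    {"schema": "information_schema", "kind": "MATERIALIZED_VIEW"},
]


def keep_buggy(row):
    # `and` binds tighter than `or`, so Python reads this as
    # (non-system AND kind == "VIEW") OR kind == "MATERIALIZED_VIEW".
    return (
        row["schema"] != "information_schema"
        and row["kind"] == "VIEW"
        or row["kind"] == "MATERIALIZED_VIEW"
    )


def keep_fixed(row):
    # Explicit parentheses make the system-schema exclusion apply to both kinds.
    return row["schema"] != "information_schema" and (
        row["kind"] == "VIEW" or row["kind"] == "MATERIALIZED_VIEW"
    )


buggy = [r for r in rows if keep_buggy(r)]
fixed = [r for r in rows if keep_fixed(r)]

# Missing f-string prefix: the placeholder is stored literally.
error = "timeout"
status_buggy = "FAILED: {error}"
status_fixed = f"FAILED: {error}"
```

With the unparenthesized filter, the system-schema materialized view slips through; with parentheses, only the user view in sales is kept.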
- sync_tables.py: Wrap both source and target warehouse usage in try/finally blocks to guarantee cleanup on success or failure
- sync_shared_tables.py: Add try/finally for target warehouse cleanup
- sync_grs_ext.py: Add try/finally for target warehouse cleanup
- sync_views.py: Add try/finally for target warehouse cleanup
- sync_shared_tables.py: Replace sys.exit() with raise RuntimeError() to avoid killing notebook kernels; remove unused sys import
Each leaked warehouse persists in the workspace (even auto-stopped) and can be accidentally restarted, incurring cost.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
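The cleanup guarantee described in this commit can be sketched as follows. This is a minimal illustration, not the code from the sync scripts: FakeWarehouses is a hypothetical in-memory stand-in for a warehouse API, and the function names are made up for the example.

```python
class FakeWarehouses:
    """Hypothetical in-memory stand-in for a warehouse API (not the Databricks SDK)."""

    def __init__(self):
        self.live = set()
        self._counter = 0

    def create(self, name):
        self._counter += 1
        warehouse_id = f"{name}-{self._counter}"
        self.live.add(warehouse_id)
        return warehouse_id

    def delete(self, warehouse_id):
        self.live.discard(warehouse_id)


def run_with_cleanup(api, work):
    """Create a temporary warehouse, run `work`, and always delete the warehouse."""
    warehouse_id = api.create("dr-sync-temp")
    try:
        return work(warehouse_id)
    finally:
        # Runs on success and on failure, so no warehouse is left behind.
        api.delete(warehouse_id)


def failing_work(warehouse_id):
    # Raising lets a notebook caller handle the error; sys.exit() would
    # terminate the whole kernel instead.
    raise RuntimeError("mapping missing")


api = FakeWarehouses()
caught = None
try:
    run_with_cleanup(api, failing_work)
except RuntimeError as exc:
    caught = str(exc)
```

After the failed run, api.live is empty again, which is the property the try/finally blocks are meant to provide.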
Create dr_sync/ package with reusable utilities:
- sql_utils.py: execute_statement_sync() with timeout/cancellation, managed_warehouse() context manager, drop_table_if_exists()
- workspace.py: create_client() factory with env var support
- csv_mapping.py: load_mapping() with validation, lookup_value()
- thread_utils.py: parallel_map() with error isolation, ProgressCounter
- exceptions.py: DRSyncError hierarchy (ConfigurationError, MappingError, StatementError, WarehouseError, SyncError)
Refactor sync scripts to use shared utilities:
- Replace 8 identical polling loops with execute_statement_sync()
- Replace 5 warehouse creation blocks with managed_warehouse()
- Replace 2 duplicated drop_table functions with drop_table_if_exists()
- Replace raw pd.read_csv calls with load_mapping/lookup_value
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
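A polling loop with a deadline and cancellation, the behavior this commit attributes to execute_statement_sync(), might look roughly like the sketch below. The signature of the real helper is not shown here, so wait_for_statement, its parameters, the StatementTimeout exception, and the state names are all assumptions made for illustration; the statement backend is replaced by plain callables.

```python
import time

# Illustrative terminal states; the real helper may use different names.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELED"}


class StatementTimeout(Exception):
    """Raised when a statement does not reach a terminal state in time."""


def wait_for_statement(get_state, cancel, timeout_s=30.0, poll_interval_s=1.0):
    """Poll `get_state()` until it is terminal; cancel and raise on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            # Ask the backend to stop the statement before giving up.
            cancel()
            raise StatementTimeout(
                f"statement still {state} after {timeout_s}s; cancelled"
            )
        time.sleep(poll_interval_s)


# A statement that finishes after two polls.
states = iter(["PENDING", "RUNNING", "SUCCEEDED"])
result = wait_for_statement(
    lambda: next(states), cancel=lambda: None, timeout_s=1.0, poll_interval_s=0.001
)

# A statement that never finishes: it is cancelled once the deadline passes.
cancelled = []
timed_out = False
try:
    wait_for_statement(
        lambda: "RUNNING",
        cancel=lambda: cancelled.append(True),
        timeout_s=0.01,
        poll_interval_s=0.001,
    )
except StatementTimeout:
    timed_out = True
```

Keeping one such loop in a shared module is what lets the timeout and cancellation apply to every caller at once.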
- Add dr_sync/config.py with DRSyncConfig dataclass supporting:
  - from_common_module(): backward-compatible import from common.py
  - from_env(): secure configuration via DR_SYNC_* environment variables
  - validate(): returns list of configuration errors
- Add .env.example template with all DR_SYNC_* variable names
- Update all 9 sync scripts to auto-detect config source: env vars used when DR_SYNC_SOURCE_HOST is set, else common.py
Existing users editing common.py are not disrupted.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
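The env-var path and the auto-detection rule can be sketched as below. This is not the DRSyncConfig from the PR: SketchConfig, its three fields, and the accepted truthy strings for DR_SYNC_DRY_RUN are assumptions. Only the variable names DR_SYNC_SOURCE_HOST, DR_SYNC_SOURCE_TOKEN, and DR_SYNC_DRY_RUN come from the PR text.

```python
import os
from dataclasses import dataclass
from typing import List, Mapping, Optional


@dataclass
class SketchConfig:
    """Hypothetical, reduced stand-in for DRSyncConfig."""

    source_host: str = ""
    source_token: str = ""
    dry_run: bool = False

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "SketchConfig":
        env = os.environ if env is None else env
        return cls(
            source_host=env.get("DR_SYNC_SOURCE_HOST", ""),
            source_token=env.get("DR_SYNC_SOURCE_TOKEN", ""),
            # Assumed parsing rule: a few common truthy spellings.
            dry_run=env.get("DR_SYNC_DRY_RUN", "").strip().lower()
            in {"1", "true", "yes"},
        )

    def validate(self) -> List[str]:
        """Return a list of problems instead of raising on the first one."""
        errors = []
        if not self.source_host:
            errors.append("DR_SYNC_SOURCE_HOST is not set")
        if not self.source_token:
            errors.append("DR_SYNC_SOURCE_TOKEN is not set")
        return errors


def use_env_config(env: Optional[Mapping[str, str]] = None) -> bool:
    """Auto-detect rule from the commit: env vars win when the source host is set."""
    env = os.environ if env is None else env
    return bool(env.get("DR_SYNC_SOURCE_HOST"))


# A plain dict stands in for os.environ so the example is reproducible.
demo_env = {"DR_SYNC_SOURCE_HOST": "https://example.test", "DR_SYNC_DRY_RUN": "true"}
cfg = SketchConfig.from_env(demo_env)
problems = cfg.validate()
```

Returning a list from validate() lets a script report every missing setting in one pass rather than one per run.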
- Add dr_sync/log.py with setup_logging() for console + optional file output, compatible with Databricks notebooks (stdout)
- Replace all 76 print() calls across 9 sync scripts with logger calls:
  - logger.info() for normal progress messages
  - logger.warning() for skip conditions (GCP, already exists)
  - logger.error() for failures (missing mappings, creation errors)
- Update dr_sync/sql_utils.py to use logging in managed_warehouse() and drop_table_if_exists()
- All log messages use %s lazy formatting instead of f-strings
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
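A console-plus-optional-file setup of the kind this commit describes could be sketched with the standard logging module as follows. The function name setup_logging_sketch, its parameters, the logger name, and the format string are assumptions; the real setup_logging() signature is not shown in this PR text.

```python
import logging
import sys
from typing import Optional


def setup_logging_sketch(
    level: str = "INFO",
    log_file: Optional[str] = None,
    name: str = "dr_sync_sketch",
) -> logging.Logger:
    """Configure a named logger that writes to stdout and, optionally, a file."""
    logger = logging.getLogger(name)
    logger.setLevel(level.upper())
    logger.handlers.clear()  # avoid duplicate output when a notebook cell is re-run
    logger.propagate = False

    formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")

    console = logging.StreamHandler(sys.stdout)  # stdout shows up in notebook output
    console.setFormatter(formatter)
    logger.addHandler(console)

    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    return logger


logger = setup_logging_sketch("INFO")

# %s arguments are only formatted if the record is actually emitted.
logger.info("Cloning table %s", "main.sales.orders")
logger.warning("Skipping %s: already exists", "main.sales.orders")
logger.debug("Not shown at INFO level: %s", "details")
```

Passing arguments separately, rather than building an f-string, means suppressed messages such as the debug call above cost no formatting work.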
- Add CSV validation functions to dr_sync/csv_mapping.py:
  - validate_catalog_mapping(): checks for duplicates
  - validate_cred_mapping(): checks cloud-specific required columns
  - validate_ext_location_mapping(): checks for empty URLs
- Add --dry-run flag to all 9 sync scripts:
  - SQL scripts: skip warehouse creation entirely, log planned SQL ops
  - SDK scripts: skip create/update API calls, log what would change
  - Supported via config.dry_run and DR_SYNC_DRY_RUN env var
- Add --log-level flag (DEBUG/INFO/WARNING/ERROR) to all scripts
- CLI args only activate in __name__ == "__main__" blocks, so notebook execution is not affected
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
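The two flags and the dry-run gating could be wired up along these lines. This is a sketch under assumptions: parse_args, clone_tables, the table names, and the SQL text are illustrative and not taken from the sync scripts. In a real script the parse_args() call would sit under the __name__ == "__main__" guard mentioned above; here an explicit argument list is passed so the example does not read sys.argv.

```python
import argparse


def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="DR sync (illustrative sketch)")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="log planned operations without executing them",
    )
    parser.add_argument(
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
    )
    return parser.parse_args(argv)


def clone_tables(table_pairs, execute, dry_run):
    """Build one statement per table; only call `execute` when not in dry-run."""
    planned = []
    for source, target in table_pairs:
        # Illustrative SQL; the scripts' actual statements are not shown in the PR text.
        sql = f"CREATE OR REPLACE TABLE {target} DEEP CLONE {source}"
        planned.append(sql)
        if not dry_run:
            execute(sql)
    return planned


args = parse_args(["--dry-run"])
executed = []
planned = clone_tables(
    [("src.sales.orders", "dst.sales.orders"), ("src.sales.items", "dst.sales.items")],
    execute=executed.append,
    dry_run=args.dry_run,
)
```

In dry-run mode the planned list is fully populated while executed stays empty, which is what lets an operator review the plan before committing to it.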
Replace O(n) DataFrame scans with O(1) dict lookups in sync_creds_and_locs.py and sync_catalogs_and_schemas.py. Push Spark SQL filters down from Python to the database engine in sync_grs_ext.py and sync_tables.py. Parallelize schema creation across catalogs.
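The scan-versus-index change can be illustrated without pandas. In the sketch below the CSV content and the column names source_catalog and target_catalog are hypothetical; the point is only that the mapping is indexed once and then looked up by key, instead of being scanned for every lookup.

```python
import csv
import io

# Hypothetical mapping file contents; column names are assumptions.
MAPPING_CSV = """source_catalog,target_catalog
sales,sales_dr
finance,finance_dr
hr,hr_dr
"""

rows = list(csv.DictReader(io.StringIO(MAPPING_CSV)))


def lookup_scan(rows, source):
    """O(n): walks the rows on every call."""
    for row in rows:
        if row["source_catalog"] == source:
            return row["target_catalog"]
    return None


# O(n) once to build the index, then O(1) on average per lookup.
index = {row["source_catalog"]: row["target_catalog"] for row in rows}


def lookup_indexed(index, source):
    return index.get(source)


scan_result = lookup_scan(rows, "finance")
indexed_result = lookup_indexed(index, "finance")
missing_result = lookup_indexed(index, "marketing")
```

Both functions return the same answers; the indexed version simply avoids repeating the scan for each credential, location, or catalog being synced.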
Run black formatter across all Python files. Fix E402 import order in dr_sync/sql_utils.py. Add ruff.toml with Databricks notebook builtins (spark, sql, display). Add .gitignore for Python, IDE, and env files.
Fixes #3
Summary
This PR is a comprehensive refactoring of the DR sync scripts. It addresses 6 confirmed bugs, warehouse resource leaks, heavy code duplication, the absence of structured logging, the lack of dry-run and validation safeguards, and performance bottlenecks, all while preserving full backward compatibility.
Existing users editing common.py and running python sync_*.py will not be disrupted.
The changes are organized into 8 atomic commits (reviewable independently):
- Commit 1, bug fixes: crash in sync_grs_ext.py, data corruption in sync_views.py, operator precedence bug, missing f-strings, example list bug
- Commit 2, warehouse leaks: managed_warehouse() context manager guarantees cleanup; sys.exit() replaced with RuntimeError
- Commit 3, dr_sync/ shared utilities (sql_utils, csv_mapping, thread_utils, workspace, exceptions)
- Commit 4, DRSyncConfig: from_common_module() (backward compat) and from_env() (secure tokens via env vars)
- Commit 5, logging: replace 76 print() calls with logger.info/warning/error across 9 scripts
- Commit 6, --dry-run: --dry-run flag, --log-level CLI support for all scripts
- Commit 7, performance: O(1) dict lookups, SQL filter pushdown, parallel schema creation
- Commit 8, formatting and linting: black, ruff.toml, .gitignore
Why Each Change Matters
Bug Fixes (Commit 1)
The most critical bug is in sync_grs_ext.py: loaded_table_types is initialized as an empty list but never populated in the loop body, causing a DataFrame column length mismatch that crashes the script when writing the status table. The operator precedence bug in sync_views.py can cause system tables to be synced or user tables to be skipped.
Warehouse Leaks (Commit 2)
Every run of sync_tables.py leaks 2 serverless warehouses. Three other scripts leak 1 each. Over repeated DR runs, this accumulates orphaned warehouses that incur cost and clutter the workspace. The new managed_warehouse() context manager wraps creation in try/finally, so the warehouse is deleted even if the script crashes.
Shared Utilities (Commit 3)
The SQL polling loop was copied 8 times across 4 files. When the sync_grs_ext.py copy drifted (missing f-string), only that copy broke, a classic duplication hazard. Consolidating into dr_sync/sql_utils.py means one implementation to maintain, test, and improve (e.g., the shared version adds timeout protection and statement cancellation that none of the originals had).
Configuration (Commit 4)
DRSyncConfig.from_common_module() reads the existing common.py, so current setups are not disrupted. from_env() provides a secure alternative: DR_SYNC_SOURCE_TOKEN is supplied as an env var instead of living in a checked-in file. Scripts auto-detect which to use.
Logging (Commit 5)
DR operations are high-stakes. When something fails at 3 AM, operators need timestamps, log levels, and the ability to grep for ERROR, not undifferentiated print() output. All 76 calls are now properly leveled (info for progress, warning for skips, error for failures).
Dry-Run (Commit 6)
python sync_tables.py --dry-run shows every table that would be cloned, every warehouse that would be created, every external table that would be dropped, all without executing anything. This lets operators validate their configuration before committing to a DR sync. CSV validation catches mapping file errors (duplicates, missing columns, empty values) before execution begins.
Performance (Commit 7)
For workspaces with many credentials/locations, the O(n)→O(1) lookup improvement is meaningful. Pushing SQL filters down to Spark eliminates transferring the entire system.information_schema.tables to Python. Parallel schema creation speeds up the catalog/schema sync phase.
New Files
- dr_sync/__init__.py
- dr_sync/sql_utils.py: execute_statement_sync(), managed_warehouse(), drop_table_if_exists()
- dr_sync/config.py: DRSyncConfig dataclass
- dr_sync/csv_mapping.py
- dr_sync/thread_utils.py: parallel_map() with as_completed(), ProgressCounter
- dr_sync/workspace.py: create_client() factory
- dr_sync/exceptions.py: DRSyncError hierarchy
- dr_sync/log.py: setup_logging()
- .env.example
- ruff.toml
How to Verify
Test Plan
- pre-commit run --all-files passes (black, ruff, mypy)
- [...] --dry-run and logs planned operations
- sync_creds_and_locs.py creates credentials/locations in a test workspace
- sync_catalogs_and_schemas.py creates catalogs/schemas in a test workspace
- sync_tables.py deep clones tables end-to-end
- common.py workflow: edit common.py, run script, verify it reads config
- [...] DR_SYNC_* vars, run script, verify it uses them