From 8d3491d366459f3a555a7007b1be5dd1ce0289ae Mon Sep 17 00:00:00 2001 From: ketaki-deodhar Date: Thu, 8 Jan 2026 16:04:47 -0800 Subject: [PATCH 1/2] 31759 - update verify flow to use migration filter --- data-tool/flows/config.py | 2 + data-tool/flows/corps_verify_flow.py | 105 ++++++++++++++++++++++++--- 2 files changed, 97 insertions(+), 10 deletions(-) diff --git a/data-tool/flows/config.py b/data-tool/flows/config.py index 7118995bfe..c21ba8d486 100644 --- a/data-tool/flows/config.py +++ b/data-tool/flows/config.py @@ -152,6 +152,8 @@ class _Config(): # pylint: disable=too-few-public-methods TOMBSTONE_BATCH_SIZE = int(TOMBSTONE_BATCH_SIZE) if TOMBSTONE_BATCH_SIZE.isnumeric() else 0 # verify flow + VERIFY_BATCHES = os.getenv('VERIFY_BATCHES') + VERIFY_BATCHES = int(VERIFY_BATCHES) if VERIFY_BATCHES.isnumeric() else 0 VERIFY_BATCH_SIZE = os.getenv('VERIFY_BATCH_SIZE') VERIFY_BATCH_SIZE = int(VERIFY_BATCH_SIZE) if VERIFY_BATCH_SIZE.isnumeric() else 0 VERIFY_SUMMARY_PATH = os.getenv('VERIFY_SUMMARY_PATH') diff --git a/data-tool/flows/corps_verify_flow.py b/data-tool/flows/corps_verify_flow.py index 753245f01f..c0fe965d00 100644 --- a/data-tool/flows/corps_verify_flow.py +++ b/data-tool/flows/corps_verify_flow.py @@ -2,9 +2,10 @@ import pandas as pd from common.init_utils import colin_extract_init, get_config, lear_init +from typing import List from prefect import flow, task from prefect.cache_policies import NO_CACHE -from sqlalchemy import Engine, text +from sqlalchemy import Engine, text, bindparam # TODO: adjust clause in different phases @@ -26,6 +27,63 @@ WHERE b.identifier IS NULL """ +# --- MIG corp selection (from COLIN) used when USE_MIGRATION_FILTER = True --- +MIG_CORP_FILTER_BASE = """ + SELECT DISTINCT mcb.corp_num + FROM mig_corp_batch mcb + JOIN mig_batch b ON b.id = mcb.mig_batch_id + JOIN mig_group g ON g.id = b.mig_group_id + WHERE 1 = 1 + {batch_filter} + {group_filter} +""" + + +def _parse_csv(csv_val: str) -> List[int]: + if not csv_val: + return [] + + return [ + int(token) + for token in (t.strip() for t in csv_val.split(',')) + if token.isdigit() + ] + + +@task(cache_policy=NO_CACHE) +def get_mig_corp_candidates(config, colin_engine: Engine) -> List[str]: + """ + Build the candidate corp list purely from MIG metadata (COLIN). + This is used to *replace* suffix-based selection when USE_MIGRATION_FILTER is True. 
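+    Candidates come from the mig_corp_batch/mig_batch/mig_group tables in COLIN and can
+    be narrowed by the MIG_BATCH_IDS and MIG_GROUP_IDS config values (comma-separated ids).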
+ """ + if not config.USE_MIGRATION_FILTER: + return [] + + # Parse CSV env strings into integer lists for expanding binds + batch_ids = _parse_csv(config.MIG_BATCH_IDS) if config.MIG_BATCH_IDS else [] + group_ids = _parse_csv(config.MIG_GROUP_IDS) if config.MIG_GROUP_IDS else [] + + batch_filter = "AND b.id IN :batch_ids" if batch_ids else "" + group_filter = "AND g.id IN :group_ids" if group_ids else "" + sql = MIG_CORP_FILTER_BASE.format(batch_filter=batch_filter, group_filter=group_filter) + + # Conditionally bind lists with expanding (environment not used by this query) + stmt = text(sql) + params = {} + + if batch_ids: + stmt = stmt.bindparams(bindparam('batch_ids', expanding=True)) + params['batch_ids'] = batch_ids + if group_ids: + stmt = stmt.bindparams(bindparam('group_ids', expanding=True)) + params['group_ids'] = group_ids + + with colin_engine.connect() as conn: + rows = conn.execute(stmt, params).fetchall() + candidates = [r[0] for r in rows] + print(f'👷 MIG corp candidates found: {len(candidates)}') + return candidates + @task(name='1-Count', cache_policy=NO_CACHE) def get_verify_count(colin_engine: Engine) -> int: @@ -36,18 +94,25 @@ def get_verify_count(colin_engine: Engine) -> int: @task(name='2-Verify', cache_policy=NO_CACHE) -def verify(colin_engine: Engine, lear_engine: Engine, limit: int, offset: int) -> list: +def verify(colin_engine: Engine, lear_engine: Engine, + mig_corp_candidates: List[str], limit: int, offset: int) -> list: identifiers = None - with colin_engine.connect() as colin_conn: - rs = colin_conn.execute(text(colin_query), {'limit': limit, 'offset': offset}) - colin_results = rs.fetchall() - identifiers = [row[0] for row in colin_results] + if mig_corp_candidates is not None and len(mig_corp_candidates) > 0: + identifiers = mig_corp_candidates[offset:offset+limit] + else: + with colin_engine.connect() as colin_conn: + rs = colin_conn.execute(text(colin_query), + {'limit': limit, 'offset': offset}) + colin_results = rs.fetchall() + identifiers = [row[0] for row in colin_results] + # Now check LEAR for missing corps if identifiers: with lear_engine.connect() as lear_conn: - rs = lear_conn.execute(text(lear_query), {'identifiers': identifiers}) + rs = lear_conn.execute(text(lear_query), + {'identifiers': identifiers}) lear_results = rs.fetchall() missing = [row[0] for row in lear_results] return missing @@ -66,12 +131,30 @@ def verify_flow(): colin_engine = colin_extract_init(config) lear_engine = lear_init(config) - total = get_verify_count(colin_engine) + # Determine mode + mig_mode = bool(config.USE_MIGRATION_FILTER) + mig_corp_candidates: List[str] = [] + if mig_mode: + mig_corp_candidates = get_mig_corp_candidates(config, colin_engine) + print(f'👷 Using MIG filter mode with {len(mig_corp_candidates)} candidates') + # Get total count based on mode (config.USE_MIGRATION_FILTER) + if config.USE_MIGRATION_FILTER and mig_corp_candidates is not None: + total = len(mig_corp_candidates) + else: + total = get_verify_count(colin_engine) + + if config.VERIFY_BATCHES <= 0: + raise ValueError('VERIFY_BATCHES must be explicitly set to a positive integer') if config.VERIFY_BATCH_SIZE <= 0: raise ValueError('VERIFY_BATCH_SIZE must be explicitly set to a positive integer') batch_size = config.VERIFY_BATCH_SIZE - batches = math.ceil(total/batch_size) + + # Determine number of batches based on mode (config.USE_MIGRATION_FILTER) + if mig_mode: + batches = min(math.ceil(total/batch_size), config.VERIFY_BATCHES) + else: + batches = math.ceil(total/batch_size) print(f'🚀 
Verifying {total} busiesses...') @@ -81,7 +164,9 @@ def verify_flow(): futures = [] while cnt < batches: print(f'🚀 Running {cnt} round...') - futures.append(verify.submit(colin_engine, lear_engine, batch_size, offset)) + futures.append(verify.submit(colin_engine, lear_engine, + mig_corp_candidates, batch_size, + offset)) offset += batch_size cnt += 1 From df5988f08b1bb0833e26bc5b534cb8337eaf8720 Mon Sep 17 00:00:00 2001 From: ketaki-deodhar Date: Thu, 8 Jan 2026 16:55:51 -0800 Subject: [PATCH 2/2] 31759 - refactor code --- data-tool/flows/corps_verify_flow.py | 77 +++++++++++++--------------- 1 file changed, 37 insertions(+), 40 deletions(-) diff --git a/data-tool/flows/corps_verify_flow.py b/data-tool/flows/corps_verify_flow.py index c0fe965d00..c26f8d1a2c 100644 --- a/data-tool/flows/corps_verify_flow.py +++ b/data-tool/flows/corps_verify_flow.py @@ -131,57 +131,54 @@ def verify_flow(): colin_engine = colin_extract_init(config) lear_engine = lear_init(config) - # Determine mode + # Validate configuration + if config.VERIFY_BATCHES <= 0 or config.VERIFY_BATCH_SIZE <= 0: + raise ValueError('VERIFY_BATCHES and VERIFY_BATCH_SIZE must be positive integers') + + batch_size = config.VERIFY_BATCH_SIZE mig_mode = bool(config.USE_MIGRATION_FILTER) - mig_corp_candidates: List[str] = [] + + # Get migration candidates if in MIG mode + mig_corp_candidates = ( + get_mig_corp_candidates(config, colin_engine) + if mig_mode else [] + ) if mig_mode: - mig_corp_candidates = get_mig_corp_candidates(config, colin_engine) print(f'👷 Using MIG filter mode with {len(mig_corp_candidates)} candidates') - # Get total count based on mode (config.USE_MIGRATION_FILTER) - if config.USE_MIGRATION_FILTER and mig_corp_candidates is not None: - total = len(mig_corp_candidates) - else: - total = get_verify_count(colin_engine) - - if config.VERIFY_BATCHES <= 0: - raise ValueError('VERIFY_BATCHES must be explicitly set to a positive integer') - if config.VERIFY_BATCH_SIZE <= 0: - raise ValueError('VERIFY_BATCH_SIZE must be explicitly set to a positive integer') - batch_size = config.VERIFY_BATCH_SIZE + # Determine total count + total = ( + len(mig_corp_candidates) + if mig_mode else get_verify_count(colin_engine) + ) - # Determine number of batches based on mode (config.USE_MIGRATION_FILTER) - if mig_mode: - batches = min(math.ceil(total/batch_size), config.VERIFY_BATCHES) - else: - batches = math.ceil(total/batch_size) + # Calculate number of batches + batches = ( + min(math.ceil(total / batch_size), config.VERIFY_BATCHES) + if mig_mode else math.ceil(total / batch_size) + ) - print(f'🚀 Verifying {total} busiesses...') + print(f'🚀 Verifying {total} businesses...') - cnt = 0 - offset = 0 + # Process batches results = [] - futures = [] - while cnt < batches: - print(f'🚀 Running {cnt} round...') - futures.append(verify.submit(colin_engine, lear_engine, - mig_corp_candidates, batch_size, - offset)) - offset += batch_size - cnt += 1 - - for f in futures: - r = f.result() - results.extend(r) - - print(f'🌟 Complete round {cnt}') - - if summary_path:=config.VERIFY_SUMMARY_PATH: + for cnt in range(batches): + offset = cnt * batch_size + print(f'🚀 Running batch {cnt + 1}...') + batch_results = verify( + colin_engine, lear_engine, mig_corp_candidates, batch_size, offset + ) + results.extend(batch_results) + + print(f'🌟 Verification complete. 
Processed {batches} batches.') + + # Save or display results + if summary_path := config.VERIFY_SUMMARY_PATH: df = pd.DataFrame(results, columns=['identifier']) df.to_csv(summary_path, index=False) - print(f"🌰 Save {len(results)} corps which meet the selection criteria but don't exsit in LEAR to {summary_path}") + print(f"🌰 Saved {len(results)} missing corps to {summary_path}") else: - print(f"🌰 {len(results)} corps which meet the selection criteria don't exsit in LEAR: {results}") + print(f"🌰 {len(results)} missing corps: {results}") except Exception as e: raise e
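
Note (illustration only, not part of either patch): the new get_mig_corp_candidates task builds its
IN (...) filters with SQLAlchemy expanding bind parameters. Below is a minimal, self-contained sketch
of that pattern. It assumes an in-memory SQLite database with a stand-in mig_corp_batch table; the
real query joins mig_corp_batch, mig_batch and mig_group in the COLIN extract.

# Sketch of the expanding-bindparam pattern used in get_mig_corp_candidates.
# Assumption: an in-memory SQLite table stands in for the COLIN mig_* tables.
from sqlalchemy import bindparam, create_engine, text

engine = create_engine("sqlite:///:memory:")

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE mig_corp_batch (corp_num TEXT, mig_batch_id INTEGER)"))
    conn.execute(
        text("INSERT INTO mig_corp_batch VALUES (:corp_num, :mig_batch_id)"),
        [
            {"corp_num": "BC0000001", "mig_batch_id": 1},
            {"corp_num": "BC0000002", "mig_batch_id": 2},
            {"corp_num": "BC0000003", "mig_batch_id": 2},
        ],
    )

batch_ids = [2]  # e.g. the result of _parse_csv("2") when MIG_BATCH_IDS="2"

# expanding=True lets a whole Python list bind to the single ":batch_ids" placeholder;
# SQLAlchemy rewrites it into the right number of IN (...) parameters at execute time.
stmt = text(
    "SELECT DISTINCT corp_num FROM mig_corp_batch WHERE mig_batch_id IN :batch_ids"
).bindparams(bindparam("batch_ids", expanding=True))

with engine.connect() as conn:
    candidates = [row[0] for row in conn.execute(stmt, {"batch_ids": batch_ids})]

print(sorted(candidates))  # ['BC0000002', 'BC0000003']

When no batch or group ids are configured, the flow skips the bindparams call entirely and the
WHERE 1 = 1 base clause selects every corp attached to a migration batch. In MIG mode the flow then
verifies the candidate list in slices of VERIFY_BATCH_SIZE, capped at VERIFY_BATCHES batches.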