Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions data-tool/flows/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ class _Config(): # pylint: disable=too-few-public-methods
TOMBSTONE_BATCH_SIZE = int(TOMBSTONE_BATCH_SIZE) if TOMBSTONE_BATCH_SIZE.isnumeric() else 0

# verify flow
# Number of verification batches to run and rows per batch; each falls back
# to 0 when the env var is unset or non-numeric.  Default to '' (not None)
# so .isnumeric() cannot raise AttributeError when the var is missing.
VERIFY_BATCHES = os.getenv('VERIFY_BATCHES', '')
VERIFY_BATCHES = int(VERIFY_BATCHES) if VERIFY_BATCHES.isnumeric() else 0
VERIFY_BATCH_SIZE = os.getenv('VERIFY_BATCH_SIZE', '')
VERIFY_BATCH_SIZE = int(VERIFY_BATCH_SIZE) if VERIFY_BATCH_SIZE.isnumeric() else 0
# Optional CSV output path for the verification summary (None disables it).
VERIFY_SUMMARY_PATH = os.getenv('VERIFY_SUMMARY_PATH')
Expand Down
146 changes: 114 additions & 32 deletions data-tool/flows/corps_verify_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import pandas as pd
from common.init_utils import colin_extract_init, get_config, lear_init
from typing import List
from prefect import flow, task
from prefect.cache_policies import NO_CACHE
from sqlalchemy import Engine, text
from sqlalchemy import Engine, text, bindparam


# TODO: adjust clause in different phases
Expand All @@ -26,6 +27,63 @@
WHERE b.identifier IS NULL
"""

# --- MIG corp selection (from COLIN) used when USE_MIGRATION_FILTER = True ---
# {batch_filter}/{group_filter} are filled via str.format with either an empty
# string or an "AND ... IN :param" clause whose list value is supplied through
# an expanding bindparam (see get_mig_corp_candidates).
MIG_CORP_FILTER_BASE = """
SELECT DISTINCT mcb.corp_num
FROM mig_corp_batch mcb
JOIN mig_batch b ON b.id = mcb.mig_batch_id
JOIN mig_group g ON g.id = b.mig_group_id
WHERE 1 = 1
{batch_filter}
{group_filter}
"""


def _parse_csv(csv_val: str) -> List[int]:
if not csv_val:
return []

return [
int(token)
for token in (t.strip() for t in csv_val.split(','))
if token.isdigit()
]


@task(cache_policy=NO_CACHE)
def get_mig_corp_candidates(config, colin_engine: Engine) -> List[str]:
    """
    Build the candidate corp list purely from MIG metadata (COLIN).

    This is used to *replace* suffix-based selection when USE_MIGRATION_FILTER
    is True; when the filter mode is off, an empty list is returned and the
    caller falls back to the regular selection query.
    """
    if not config.USE_MIGRATION_FILTER:
        return []

    # Parse CSV env strings into integer lists for expanding binds
    batch_ids = _parse_csv(config.MIG_BATCH_IDS) if config.MIG_BATCH_IDS else []
    group_ids = _parse_csv(config.MIG_GROUP_IDS) if config.MIG_GROUP_IDS else []

    sql = MIG_CORP_FILTER_BASE.format(
        batch_filter="AND b.id IN :batch_ids" if batch_ids else "",
        group_filter="AND g.id IN :group_ids" if group_ids else "",
    )

    # Conditionally bind lists with expanding (environment not used by this query)
    stmt = text(sql)
    params = {}
    for bind_name, id_list in (('batch_ids', batch_ids), ('group_ids', group_ids)):
        if id_list:
            stmt = stmt.bindparams(bindparam(bind_name, expanding=True))
            params[bind_name] = id_list

    with colin_engine.connect() as conn:
        fetched = conn.execute(stmt, params).fetchall()

    candidates = [record[0] for record in fetched]
    print(f'👷 MIG corp candidates found: {len(candidates)}')
    return candidates


@task(name='1-Count', cache_policy=NO_CACHE)
def get_verify_count(colin_engine: Engine) -> int:
Expand All @@ -36,18 +94,25 @@ def get_verify_count(colin_engine: Engine) -> int:


@task(name='2-Verify', cache_policy=NO_CACHE)
def verify(colin_engine: Engine, lear_engine: Engine, limit: int, offset: int) -> list:
def verify(colin_engine: Engine, lear_engine: Engine,
mig_corp_candidates: List[str], limit: int, offset: int) -> list:

identifiers = None

with colin_engine.connect() as colin_conn:
rs = colin_conn.execute(text(colin_query), {'limit': limit, 'offset': offset})
colin_results = rs.fetchall()
identifiers = [row[0] for row in colin_results]
if mig_corp_candidates is not None and len(mig_corp_candidates) > 0:
identifiers = mig_corp_candidates[offset:offset+limit]
else:
with colin_engine.connect() as colin_conn:
rs = colin_conn.execute(text(colin_query),
{'limit': limit, 'offset': offset})
colin_results = rs.fetchall()
identifiers = [row[0] for row in colin_results]

# Now check LEAR for missing corps
if identifiers:
with lear_engine.connect() as lear_conn:
rs = lear_conn.execute(text(lear_query), {'identifiers': identifiers})
rs = lear_conn.execute(text(lear_query),
{'identifiers': identifiers})
lear_results = rs.fetchall()
missing = [row[0] for row in lear_results]
return missing
Expand All @@ -66,37 +131,54 @@ def verify_flow():
colin_engine = colin_extract_init(config)
lear_engine = lear_init(config)

total = get_verify_count(colin_engine)
# Validate configuration
if config.VERIFY_BATCHES <= 0 or config.VERIFY_BATCH_SIZE <= 0:
raise ValueError('VERIFY_BATCHES and VERIFY_BATCH_SIZE must be positive integers')

if config.VERIFY_BATCH_SIZE <= 0:
raise ValueError('VERIFY_BATCH_SIZE must be explicitly set to a positive integer')
batch_size = config.VERIFY_BATCH_SIZE
batches = math.ceil(total/batch_size)

print(f'🚀 Verifying {total} busiesses...')

cnt = 0
offset = 0
mig_mode = bool(config.USE_MIGRATION_FILTER)

# Get migration candidates if in MIG mode
mig_corp_candidates = (
get_mig_corp_candidates(config, colin_engine)
if mig_mode else []
)
if mig_mode:
print(f'👷 Using MIG filter mode with {len(mig_corp_candidates)} candidates')

# Determine total count
total = (
len(mig_corp_candidates)
if mig_mode else get_verify_count(colin_engine)
)

# Calculate number of batches
batches = (
min(math.ceil(total / batch_size), config.VERIFY_BATCHES)
if mig_mode else math.ceil(total / batch_size)
)

print(f'🚀 Verifying {total} businesses...')

# Process batches
results = []
futures = []
while cnt < batches:
print(f'🚀 Running {cnt} round...')
futures.append(verify.submit(colin_engine, lear_engine, batch_size, offset))
offset += batch_size
cnt += 1

for f in futures:
r = f.result()
results.extend(r)

print(f'🌟 Complete round {cnt}')

if summary_path:=config.VERIFY_SUMMARY_PATH:
for cnt in range(batches):
offset = cnt * batch_size
print(f'🚀 Running batch {cnt + 1}...')
batch_results = verify(
colin_engine, lear_engine, mig_corp_candidates, batch_size, offset
)
results.extend(batch_results)

print(f'🌟 Verification complete. Processed {batches} batches.')
Comment on lines +165 to +173
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we keep using futures like we did in the previous code. that way we leverage prefect's parallelization functionality.

looking into this more...will have more for you in a bit

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try something like this:

# Process batches concurrently (Prefect futures via .submit())
futures = []

# 1) Submit all batch tasks (non-blocking)
for cnt in range(batches):
    offset = cnt * batch_size
    print(f'🚀 Submitting batch {cnt + 1}/{batches}...')
    futures.append(
        verify.submit(
            colin_engine,
            lear_engine,
            mig_corp_candidates,
            batch_size,
            offset,
        )
    )

# 2) Resolve futures (blocking) and collect results
for f in futures:
    batch_results = f.result()
    results.extend(batch_results)

print(f'🌟 Verification complete. Processed {batches} batches.')


# Save or display results
if summary_path := config.VERIFY_SUMMARY_PATH:
df = pd.DataFrame(results, columns=['identifier'])
df.to_csv(summary_path, index=False)
print(f"🌰 Save {len(results)} corps which meet the selection criteria but don't exsit in LEAR to {summary_path}")
print(f"🌰 Saved {len(results)} missing corps to {summary_path}")
else:
print(f"🌰 {len(results)} corps which meet the selection criteria don't exsit in LEAR: {results}")
print(f"🌰 {len(results)} missing corps: {results}")

except Exception as e:
raise e
Expand Down