diff --git a/a4d-python/.env.example b/a4d-python/.env.example index 0ee33a0..0937a10 100644 --- a/a4d-python/.env.example +++ b/a4d-python/.env.example @@ -7,6 +7,11 @@ A4D_DATASET=tracker A4D_DOWNLOAD_BUCKET=a4dphase2_upload A4D_UPLOAD_BUCKET=a4dphase2_output +# GCP Authentication (optional - uses Application Default Credentials if not set) +# For local development: run `gcloud auth application-default login` +# For CI/CD or VM: set path to service account key file +# GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json + # Paths A4D_DATA_ROOT=/path/to/tracker/files A4D_OUTPUT_DIR=output diff --git a/a4d-python/scripts/check_sheets.py b/a4d-python/scripts/check_sheets.py index c85b4c3..0037efb 100644 --- a/a4d-python/scripts/check_sheets.py +++ b/a4d-python/scripts/check_sheets.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 """Check which sheets are being processed by R vs Python.""" -import polars as pl from pathlib import Path +import polars as pl + def check_sheets(): """Compare which sheets were processed.""" diff --git a/a4d-python/scripts/compare_r_vs_python.py b/a4d-python/scripts/compare_r_vs_python.py index 2afb517..43e6a8b 100644 --- a/a4d-python/scripts/compare_r_vs_python.py +++ b/a4d-python/scripts/compare_r_vs_python.py @@ -5,17 +5,20 @@ R and Python pipelines to verify the migration produces equivalent results. Usage: - uv run python scripts/compare_r_vs_python.py --file "2018_CDA A4D Tracker_patient_cleaned.parquet" - uv run python scripts/compare_r_vs_python.py -f "2018_CDA A4D Tracker_patient_cleaned.parquet" + uv run python scripts/compare_r_vs_python.py \ + --file "2018_CDA A4D Tracker_patient_cleaned.parquet" + uv run python scripts/compare_r_vs_python.py \ + -f "2018_CDA A4D Tracker_patient_cleaned.parquet" """ +from pathlib import Path + import polars as pl import typer -from pathlib import Path +from rich import box from rich.console import Console -from rich.table import Table from rich.panel import Panel -from rich import box +from rich.table import Table console = Console() app = typer.Typer() @@ -169,7 +172,7 @@ def compare_metadata_fields(r_df: pl.DataFrame, py_df: pl.DataFrame): sample = r_unique.head(3).to_list() console.print(f" Sample: {sample}") else: - console.print(f" [red]✗ Mismatch![/red]") + console.print(" [red]✗ Mismatch![/red]") console.print(f" R has {len(r_unique):,} unique values") console.print(f" Python has {len(py_unique):,} unique values") @@ -268,7 +271,8 @@ def find_value_mismatches(r_df: pl.DataFrame, py_df: pl.DataFrame): try: joined = r_df.join(py_df, on=join_keys, how="inner", suffix="_py") console.print( - f"[cyan]Analyzing {len(joined):,} common records (matched on {'+'.join(join_keys)})[/cyan]\n" + f"[cyan]Analyzing {len(joined):,} common records " + f"(matched on {'+'.join(join_keys)})[/cyan]\n" ) except Exception as e: console.print(f"[red]Error joining datasets: {e}[/red]\n") @@ -281,8 +285,8 @@ def find_value_mismatches(r_df: pl.DataFrame, py_df: pl.DataFrame): # Tolerance for floating point comparisons # Use relative tolerance of 1e-9 (about 9 decimal places) - FLOAT_REL_TOL = 1e-9 - FLOAT_ABS_TOL = 1e-12 + float_rel_tol = 1e-9 + float_abs_tol = 1e-12 for col in sorted(common_cols): col_py = f"{col}_py" @@ -305,7 +309,8 @@ def find_value_mismatches(r_df: pl.DataFrame, py_df: pl.DataFrame): if is_numeric: # For numeric columns, use approximate comparison - # Two values are considered equal if |a - b| <= max(rel_tol * max(|a|, |b|), abs_tol) + # Two values are equal if: + # |a - b| <= max(rel_tol * max(|a|, |b|), abs_tol) # Add columns for comparison logic comparison_df = joined.with_columns( @@ -315,9 +320,9 @@ def find_value_mismatches(r_df: pl.DataFrame, py_df: pl.DataFrame): # Calculate tolerance threshold pl.max_horizontal( [ - FLOAT_REL_TOL + float_rel_tol * pl.max_horizontal([pl.col(col).abs(), pl.col(col_py).abs()]), - pl.lit(FLOAT_ABS_TOL), + pl.lit(float_abs_tol), ] ).alias("_tolerance"), # Check null status @@ -327,7 +332,8 @@ def find_value_mismatches(r_df: pl.DataFrame, py_df: pl.DataFrame): ) # Find mismatches - # Mismatch if: (1) null status differs OR (2) both not null and differ by more than tolerance + # Mismatch if: (1) null status differs OR + # (2) both not null and differ by more than tolerance mismatched_rows = comparison_df.filter( (pl.col("_col_null") != pl.col("_col_py_null")) # Null mismatch | ( @@ -394,7 +400,8 @@ def find_value_mismatches(r_df: pl.DataFrame, py_df: pl.DataFrame): mismatches.items(), key=lambda x: x[1]["percentage"], reverse=True ): console.print( - f"\n[bold cyan]{col}:[/bold cyan] {stats['count']} mismatches ({stats['percentage']:.1f}%)" + f"\n[bold cyan]{col}:[/bold cyan] " + f"{stats['count']} mismatches ({stats['percentage']:.1f}%)" ) # Include patient_id and sheet_name in examples examples_with_ids = stats["examples_with_ids"] @@ -496,14 +503,14 @@ def compare( console.print(f" ✓ R output: {len(r_df):,} records, {len(r_df.columns)} columns") except Exception as e: console.print(f"[red] ✗ Failed to read R parquet: {e}[/red]") - raise typer.Exit(1) + raise typer.Exit(1) from e try: py_df = pl.read_parquet(python_parquet) console.print(f" ✓ Python output: {len(py_df):,} records, {len(py_df.columns)} columns") except Exception as e: console.print(f"[red] ✗ Failed to read Python parquet: {e}[/red]") - raise typer.Exit(1) + raise typer.Exit(1) from e console.print() diff --git a/a4d-python/scripts/export_single_tracker.py b/a4d-python/scripts/export_single_tracker.py index 3d88c5c..7fda054 100644 --- a/a4d-python/scripts/export_single_tracker.py +++ b/a4d-python/scripts/export_single_tracker.py @@ -5,8 +5,10 @@ uv run python scripts/export_single_tracker.py Example: - uv run python scripts/export_single_tracker.py \ - "/Volumes/USB SanDisk 3.2Gen1 Media/A4D/data/a4dphase2_upload/Malaysia/SBU/2024_Sibu Hospital A4D Tracker.xlsx" \ + uv run python scripts/export_single_tracker.py \\ + "/Volumes/USB SanDisk 3.2Gen1 Media/A4D/data/\\ + a4dphase2_upload/Malaysia/SBU/\\ + 2024_Sibu Hospital A4D Tracker.xlsx" \\ output/patient_data_raw """ diff --git a/a4d-python/scripts/reprocess_tracker.py b/a4d-python/scripts/reprocess_tracker.py index 68be9ed..dfd3f3b 100644 --- a/a4d-python/scripts/reprocess_tracker.py +++ b/a4d-python/scripts/reprocess_tracker.py @@ -2,10 +2,11 @@ """Quick script to re-process a single tracker.""" from pathlib import Path + from a4d.pipeline.tracker import process_tracker_patient tracker_file = Path( - "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Cambodia/CDA/2025_06_CDA A4D Tracker.xlsx" + "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Cambodia/CDA/2025_06_CDA A4D Tracker.xlsx" # noqa: E501 ) output_root = Path("/Volumes/USB SanDisk 3.2Gen1 Media/a4d/output_python") diff --git a/a4d-python/scripts/test_cleaning.py b/a4d-python/scripts/test_cleaning.py index 778dd8e..118c83c 100644 --- a/a4d-python/scripts/test_cleaning.py +++ b/a4d-python/scripts/test_cleaning.py @@ -2,6 +2,7 @@ """Test cleaning pipeline on Sibu Hospital 2024 tracker.""" from pathlib import Path + import polars as pl from a4d.clean.patient import clean_patient_data @@ -27,7 +28,7 @@ def test_cleaning(): # Read raw data df_raw = pl.read_parquet(raw_path) - print(f"\n📥 Raw data loaded:") + print("\n📥 Raw data loaded:") print(f" Rows: {len(df_raw)}") print(f" Columns: {len(df_raw.columns)}") print(f" Columns: {df_raw.columns[:10]}...") @@ -36,15 +37,15 @@ def test_cleaning(): collector = ErrorCollector() # Clean data - print(f"\n🧹 Cleaning data...") + print("\n🧹 Cleaning data...") df_clean = clean_patient_data(df_raw, collector) - print(f"\n📤 Cleaned data:") + print("\n📤 Cleaned data:") print(f" Rows: {len(df_clean)}") print(f" Columns: {len(df_clean.columns)}") # Show schema - print(f"\n📋 Schema (first 20 columns):") + print("\n📋 Schema (first 20 columns):") for i, (col, dtype) in enumerate(df_clean.schema.items()): if i < 20: null_count = df_clean[col].null_count() @@ -55,12 +56,12 @@ def test_cleaning(): print(f"\n⚠️ Errors collected: {len(collector)}") if len(collector) > 0: errors_df = collector.to_dataframe() - print(f"\n Error breakdown by column:") + print("\n Error breakdown by column:") error_counts = errors_df.group_by("column").count().sort("count", descending=True) for row in error_counts.iter_rows(named=True): print(f" {row['column']:40s}: {row['count']:3d} errors") - print(f"\n First 5 errors:") + print("\n First 5 errors:") print(errors_df.head(5)) # Write output @@ -72,7 +73,7 @@ def test_cleaning(): print(f"\n✅ Cleaned data written to: {output_path}") # Sample data check - print(f"\n🔍 Sample row (first non-null patient):") + print("\n🔍 Sample row (first non-null patient):") sample = df_clean.filter(pl.col("patient_id").is_not_null()).head(1) for col in sample.columns[:15]: print(f" {col:40s}: {sample[col][0]}") diff --git a/a4d-python/scripts/test_extended_trackers.py b/a4d-python/scripts/test_extended_trackers.py index bfe4358..b4b5741 100644 --- a/a4d-python/scripts/test_extended_trackers.py +++ b/a4d-python/scripts/test_extended_trackers.py @@ -1,14 +1,14 @@ #!/usr/bin/env python3 """Extended end-to-end tests on older tracker files (2018-2021).""" +# Disable logging for clean output +import logging +import sys from pathlib import Path -from a4d.extract.patient import read_all_patient_sheets + from a4d.clean.patient import clean_patient_data from a4d.errors import ErrorCollector -import sys - -# Disable logging for clean output -import logging +from a4d.extract.patient import read_all_patient_sheets logging.disable(logging.CRITICAL) @@ -16,37 +16,37 @@ ( "2021_Siriraj_Thailand", Path( - "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Thailand/SRJ/2021_Siriraj Hospital A4D Tracker.xlsx" + "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Thailand/SRJ/2021_Siriraj Hospital A4D Tracker.xlsx" # noqa: E501 ), ), ( "2021_UdonThani_Thailand", Path( - "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Thailand/UTH/2021_Udon Thani Hospital A4D Tracker.xlsx" + "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Thailand/UTH/2021_Udon Thani Hospital A4D Tracker.xlsx" # noqa: E501 ), ), ( "2020_VNC_Vietnam", Path( - "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Vietnam/VNC/2020_Vietnam National Children's Hospital A4D Tracker.xlsx" + "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Vietnam/VNC/2020_Vietnam National Children's Hospital A4D Tracker.xlsx" # noqa: E501 ), ), ( "2019_Penang_Malaysia", Path( - "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Malaysia/PNG/2019_Penang General Hospital A4D Tracker_DC.xlsx" + "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Malaysia/PNG/2019_Penang General Hospital A4D Tracker_DC.xlsx" # noqa: E501 ), ), ( "2019_Mandalay_Myanmar", Path( - "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Myanmar/MCH/2019_Mandalay Children's Hospital A4D Tracker.xlsx" + "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Myanmar/MCH/2019_Mandalay Children's Hospital A4D Tracker.xlsx" # noqa: E501 ), ), ( "2018_Yangon_Myanmar", Path( - "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Myanmar/YCH/2018_Yangon Children's Hospital A4D Tracker.xlsx" + "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Myanmar/YCH/2018_Yangon Children's Hospital A4D Tracker.xlsx" # noqa: E501 ), ), ] @@ -83,7 +83,8 @@ ) print( - f" ✅ EXTRACTION: {len(df_raw)} rows, {len(df_raw.columns)} cols, year={year}, months={months}" + f" ✅ EXTRACTION: {len(df_raw)} rows, " + f"{len(df_raw.columns)} cols, year={year}, months={months}" ) # Clean @@ -105,7 +106,8 @@ } print( - f" ✅ CLEANING: {len(df_clean)} rows, {len(df_clean.columns)} cols, {len(collector)} errors" + f" ✅ CLEANING: {len(df_clean)} rows, " + f"{len(df_clean.columns)} cols, {len(collector)} errors" ) print( f" Key columns: insulin_type={stats['insulin_type']}/{len(df_clean)}, " diff --git a/a4d-python/scripts/test_multiple_trackers.py b/a4d-python/scripts/test_multiple_trackers.py index 3a27c41..3e992ea 100644 --- a/a4d-python/scripts/test_multiple_trackers.py +++ b/a4d-python/scripts/test_multiple_trackers.py @@ -1,14 +1,14 @@ #!/usr/bin/env python3 """Test extraction + cleaning on multiple trackers for end-to-end validation.""" +# Disable logging for clean output +import logging +import sys from pathlib import Path -from a4d.extract.patient import read_all_patient_sheets + from a4d.clean.patient import clean_patient_data from a4d.errors import ErrorCollector -import sys - -# Disable logging for clean output -import logging +from a4d.extract.patient import read_all_patient_sheets logging.disable(logging.CRITICAL) @@ -16,25 +16,25 @@ ( "2024_ISDFI", Path( - "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Philippines/ISD/2024_ISDFI A4D Tracker.xlsx" + "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Philippines/ISD/2024_ISDFI A4D Tracker.xlsx" # noqa: E501 ), ), ( "2024_Penang", Path( - "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Malaysia/PNG/2024_Penang General Hospital A4D Tracker.xlsx" + "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Malaysia/PNG/2024_Penang General Hospital A4D Tracker.xlsx" # noqa: E501 ), ), ( "2023_Sibu", Path( - "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Malaysia/SBU/2023_Sibu Hospital A4D Tracker.xlsx" + "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Malaysia/SBU/2023_Sibu Hospital A4D Tracker.xlsx" # noqa: E501 ), ), ( "2022_Penang", Path( - "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Malaysia/PNG/2022_Penang General Hospital A4D Tracker.xlsx" + "/Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/Malaysia/PNG/2022_Penang General Hospital A4D Tracker.xlsx" # noqa: E501 ), ), ] @@ -72,7 +72,8 @@ ) print( - f" ✅ EXTRACTION: {len(df_raw)} rows, {len(df_raw.columns)} cols, year={year}, months={months}" + f" ✅ EXTRACTION: {len(df_raw)} rows, " + f"{len(df_raw.columns)} cols, year={year}, months={months}" ) # Clean diff --git a/a4d-python/scripts/verify_fixes.py b/a4d-python/scripts/verify_fixes.py index 9421a23..f0636c1 100644 --- a/a4d-python/scripts/verify_fixes.py +++ b/a4d-python/scripts/verify_fixes.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 """Verify that the Python fixes are working correctly by analyzing the output.""" -import polars as pl from pathlib import Path +import polars as pl + def verify_python_output(): """Verify Python output has correct types and column ordering.""" diff --git a/a4d-python/src/a4d/clean/converters.py b/a4d-python/src/a4d/clean/converters.py index 0f2c3e2..8f9a4fc 100644 --- a/a4d-python/src/a4d/clean/converters.py +++ b/a4d-python/src/a4d/clean/converters.py @@ -165,9 +165,11 @@ def parse_date_column( df = df.with_columns(pl.col(column).alias(f"_orig_{column}")) # Apply parse_date_flexible to each value - # NOTE: Using list-based approach instead of map_elements() because map_elements() - # with return_dtype=pl.Date fails when ALL values are None (all-NA columns like hospitalisation_date). - # Explicit Series creation with dtype=pl.Date works because it doesn't require non-null values. + # NOTE: Using list-based approach instead of map_elements() because + # map_elements() with return_dtype=pl.Date fails when ALL values are None + # (all-NA columns like hospitalisation_date). + # Explicit Series creation with dtype=pl.Date works because it doesn't + # require non-null values. column_values = df[column].cast(pl.Utf8).to_list() parsed_dates = [ parse_date_flexible(val, error_val=settings.error_val_date) for val in column_values diff --git a/a4d-python/src/a4d/clean/date_parser.py b/a4d-python/src/a4d/clean/date_parser.py index 7aaa1a5..896216f 100644 --- a/a4d-python/src/a4d/clean/date_parser.py +++ b/a4d-python/src/a4d/clean/date_parser.py @@ -11,7 +11,6 @@ import re from datetime import date, datetime, timedelta -from typing import Optional from dateutil import parser as date_parser from loguru import logger @@ -20,7 +19,7 @@ EXCEL_EPOCH = date(1899, 12, 30) -def parse_date_flexible(date_str: Optional[str], error_val: str = "9999-09-09") -> Optional[date]: +def parse_date_flexible(date_str: str | None, error_val: str = "9999-09-09") -> date | None: """Parse date strings flexibly using Python's dateutil.parser. Handles common edge cases from A4D tracker data: diff --git a/a4d-python/src/a4d/clean/patient.py b/a4d-python/src/a4d/clean/patient.py index 385dd0b..321ae37 100644 --- a/a4d-python/src/a4d/clean/patient.py +++ b/a4d-python/src/a4d/clean/patient.py @@ -25,10 +25,9 @@ from a4d.clean.schema import ( apply_schema, get_date_columns, - get_numeric_columns, get_patient_data_schema, ) -from a4d.clean.transformers import extract_regimen, str_to_lower +from a4d.clean.transformers import extract_regimen from a4d.clean.validators import validate_all_columns from a4d.config import settings from a4d.errors import ErrorCollector @@ -319,7 +318,8 @@ def _derive_insulin_fields(df: pl.DataFrame) -> pl.DataFrame: For 2024+ trackers: - insulin_type: "human insulin" if any human column is Y, else "analog insulin" - insulin_subtype: Comma-separated list like "pre-mixed,rapid-acting,long-acting" - (will be replaced with "Undefined" by validation since comma-separated values aren't in allowed_values) + (will be replaced with "Undefined" by validation since + comma-separated values aren't in allowed_values) NOTE: Python is CORRECT here. Comparison with R will show differences because R has a typo. @@ -704,7 +704,8 @@ def _fix_age_from_dob(df: pl.DataFrame, error_collector: ErrorCollector) -> pl.D ages_negative += 1 else: logger.warning( - f"Patient {patient_id}: age {excel_age} is different from calculated age {calc_age}. " + f"Patient {patient_id}: age {excel_age} is different " + f"from calculated age {calc_age}. " f"Using calculated age instead of original age." ) error_collector.add_error( @@ -712,7 +713,10 @@ def _fix_age_from_dob(df: pl.DataFrame, error_collector: ErrorCollector) -> pl.D patient_id=patient_id, column="age", original_value=str(excel_age), - error_message=f"Age mismatch: Excel={excel_age}, Calculated={calc_age}. Using calculated age.", + error_message=( + f"Age mismatch: Excel={excel_age}, " + f"Calculated={calc_age}. Using calculated age." + ), error_code="invalid_value", function_name="_fix_age_from_dob", ) @@ -737,7 +741,9 @@ def _fix_age_from_dob(df: pl.DataFrame, error_collector: ErrorCollector) -> pl.D if ages_fixed > 0 or ages_missing > 0 or ages_negative > 0: logger.info( - f"Age fixes applied: {ages_fixed} corrected, {ages_missing} filled from DOB, {ages_negative} negative (set to error)" + f"Age fixes applied: {ages_fixed} corrected, " + f"{ages_missing} filled from DOB, " + f"{ages_negative} negative (set to error)" ) return df @@ -829,7 +835,8 @@ def _validate_dates(df: pl.DataFrame, error_collector: ErrorCollector) -> pl.Dat tracker_year = row.get("tracker_year") logger.warning( - f"Patient {patient_id}: {col} = {original_date} is beyond tracker year {tracker_year}. " + f"Patient {patient_id}: {col} = {original_date} " + f"is beyond tracker year {tracker_year}. " f"Replacing with error date." ) error_collector.add_error( diff --git a/a4d-python/src/a4d/clean/schema.py b/a4d-python/src/a4d/clean/schema.py index cd46447..f767550 100644 --- a/a4d-python/src/a4d/clean/schema.py +++ b/a4d-python/src/a4d/clean/schema.py @@ -1,10 +1,10 @@ """Meta schema definition for patient data - matches R pipeline exactly.""" + import polars as pl -from typing import Dict -def get_patient_data_schema() -> Dict[str, pl.DataType]: +def get_patient_data_schema() -> dict[str, pl.DataType]: """Get the complete meta schema for patient data. This schema EXACTLY matches the R pipeline's schema in script2_process_patient_data.R. diff --git a/a4d-python/src/a4d/clean/schema_old.py b/a4d-python/src/a4d/clean/schema_old.py index e2b562c..6d91d28 100644 --- a/a4d-python/src/a4d/clean/schema_old.py +++ b/a4d-python/src/a4d/clean/schema_old.py @@ -9,11 +9,11 @@ raw data are processed - the rest are left empty. """ + import polars as pl -from typing import Dict -def get_patient_data_schema() -> Dict[str, pl.DataType]: +def get_patient_data_schema() -> dict[str, pl.DataType]: """Get the complete meta schema for patient data. This schema defines ALL columns that should exist in the final diff --git a/a4d-python/src/a4d/clean/transformers.py b/a4d-python/src/a4d/clean/transformers.py index aecf55c..b952023 100644 --- a/a4d-python/src/a4d/clean/transformers.py +++ b/a4d-python/src/a4d/clean/transformers.py @@ -7,8 +7,8 @@ type: basic_function. """ + import polars as pl -import re from a4d.config import settings @@ -371,7 +371,8 @@ def split_bp_in_sys_and_dias(df: pl.DataFrame) -> pl.DataFrame: if has_errors: logger.warning( - f"Found invalid values for column blood_pressure_mmhg that do not follow the format X/Y. " + "Found invalid values for column blood_pressure_mmhg " + f"that do not follow the format X/Y. " f"Values were replaced with {error_val_int}." ) diff --git a/a4d-python/src/a4d/clean/validators.py b/a4d-python/src/a4d/clean/validators.py index 9180693..f279d52 100644 --- a/a4d-python/src/a4d/clean/validators.py +++ b/a4d-python/src/a4d/clean/validators.py @@ -13,13 +13,14 @@ transformers.py for better type safety and maintainability. """ -import polars as pl -from typing import Any import re +from typing import Any + +import polars as pl from a4d.config import settings from a4d.errors import ErrorCollector -from a4d.reference.loaders import load_yaml, get_reference_data_path +from a4d.reference.loaders import get_reference_data_path, load_yaml def sanitize_str(text: str) -> str: @@ -402,7 +403,7 @@ def fix_single_id(patient_id: str | None) -> str | None: patient_id=original, column=patient_id_col, original_value=original, - error_message=f"Patient ID truncated (length > 8)", + error_message="Patient ID truncated (length > 8)", error_code="invalid_value", ) else: @@ -412,7 +413,7 @@ def fix_single_id(patient_id: str | None) -> str | None: patient_id=original, column=patient_id_col, original_value=original, - error_message=f"Invalid patient ID format (expected XX_YY###)", + error_message="Invalid patient ID format (expected XX_YY###)", error_code="invalid_value", ) diff --git a/a4d-python/src/a4d/cli.py b/a4d-python/src/a4d/cli.py index 9307351..51adbf1 100644 --- a/a4d-python/src/a4d/cli.py +++ b/a4d-python/src/a4d/cli.py @@ -209,7 +209,8 @@ def process_patient_cmd( raise typer.Exit(0) else: console.print( - f"\n[bold red]✗ Pipeline completed with {result.failed_trackers} failures[/bold red]\n" + f"\n[bold red]✗ Pipeline completed with " + f"{result.failed_trackers} failures[/bold red]\n" ) raise typer.Exit(1) @@ -291,6 +292,160 @@ def create_tables_cmd( raise typer.Exit(1) from e +@app.command("upload-tables") +def upload_tables_cmd( + tables_dir: Annotated[ + Path, + typer.Option("--tables-dir", "-t", help="Directory containing parquet table files"), + ], + dataset: Annotated[ + str | None, + typer.Option("--dataset", "-d", help="BigQuery dataset name (default: from config)"), + ] = None, + project_id: Annotated[ + str | None, + typer.Option("--project", "-p", help="GCP project ID (default: from config)"), + ] = None, + append: Annotated[ + bool, + typer.Option("--append", help="Append to existing tables instead of replacing"), + ] = False, +): + """Upload pipeline output tables to BigQuery. + + Loads parquet files from the tables directory into the configured + BigQuery dataset. By default, existing tables are replaced (matching + the R pipeline behavior). + + \b + Examples: + # Upload tables from default output directory + uv run a4d upload-tables --tables-dir output/tables + + # Upload to a specific dataset + uv run a4d upload-tables --tables-dir output/tables --dataset tracker_dev + + # Append instead of replace + uv run a4d upload-tables --tables-dir output/tables --append + """ + from a4d.gcp.bigquery import load_pipeline_tables + + console.print("\n[bold blue]A4D BigQuery Upload[/bold blue]\n") + console.print(f"Tables directory: {tables_dir}") + + if not tables_dir.exists(): + console.print(f"[bold red]Error: Directory not found: {tables_dir}[/bold red]\n") + raise typer.Exit(1) + + try: + results = load_pipeline_tables( + tables_dir=tables_dir, + dataset=dataset, + project_id=project_id, + replace=not append, + ) + + if results: + result_table = Table(title="Uploaded Tables") + result_table.add_column("Table", style="cyan") + result_table.add_column("Rows", justify="right", style="green") + result_table.add_column("Status", style="green") + + for table_name, job in results.items(): + result_table.add_row( + table_name, + f"{job.output_rows:,}" if job.output_rows else "?", + "✓", + ) + + console.print(result_table) + console.print( + f"\n[bold green]✓ Uploaded {len(results)} tables to BigQuery[/bold green]\n" + ) + else: + console.print("[bold yellow]No tables found to upload[/bold yellow]\n") + + except Exception as e: + console.print(f"\n[bold red]Error: {e}[/bold red]\n") + raise typer.Exit(1) from e + + +@app.command("download-trackers") +def download_trackers_cmd( + destination: Annotated[ + Path, + typer.Option("--destination", "-d", help="Local directory to download files to"), + ], + bucket: Annotated[ + str | None, + typer.Option("--bucket", "-b", help="GCS bucket name (default: from config)"), + ] = None, +): + """Download tracker files from Google Cloud Storage. + + \b + Examples: + # Download to local directory + uv run a4d download-trackers --destination /data/trackers + + # Download from specific bucket + uv run a4d download-trackers --destination /data/trackers --bucket my-bucket + """ + from a4d.gcp.storage import download_tracker_files + + console.print("\n[bold blue]A4D Tracker Download[/bold blue]\n") + console.print(f"Destination: {destination}") + + try: + downloaded = download_tracker_files(destination=destination, bucket_name=bucket) + console.print(f"\n[bold green]✓ Downloaded {len(downloaded)} files[/bold green]\n") + except Exception as e: + console.print(f"\n[bold red]Error: {e}[/bold red]\n") + raise typer.Exit(1) from e + + +@app.command("upload-output") +def upload_output_cmd( + source_dir: Annotated[ + Path, + typer.Option("--source", "-s", help="Output directory to upload"), + ], + bucket: Annotated[ + str | None, + typer.Option("--bucket", "-b", help="GCS bucket name (default: from config)"), + ] = None, + prefix: Annotated[ + str, + typer.Option("--prefix", help="Prefix for uploaded blob names"), + ] = "", +): + """Upload pipeline output to Google Cloud Storage. + + \b + Examples: + # Upload output directory + uv run a4d upload-output --source output/ + + # Upload with prefix + uv run a4d upload-output --source output/ --prefix 2024-01 + """ + from a4d.gcp.storage import upload_output + + console.print("\n[bold blue]A4D Output Upload[/bold blue]\n") + console.print(f"Source: {source_dir}") + + if not source_dir.exists(): + console.print(f"[bold red]Error: Directory not found: {source_dir}[/bold red]\n") + raise typer.Exit(1) + + try: + uploaded = upload_output(source_dir=source_dir, bucket_name=bucket, prefix=prefix) + console.print(f"\n[bold green]✓ Uploaded {len(uploaded)} files to GCS[/bold green]\n") + except Exception as e: + console.print(f"\n[bold red]Error: {e}[/bold red]\n") + raise typer.Exit(1) from e + + @app.command("version") def version_cmd(): """Show version information.""" diff --git a/a4d-python/src/a4d/extract/patient.py b/a4d-python/src/a4d/extract/patient.py index ed199b5..8e65285 100644 --- a/a4d-python/src/a4d/extract/patient.py +++ b/a4d-python/src/a4d/extract/patient.py @@ -407,7 +407,7 @@ def clean_excel_errors(df: pl.DataFrame) -> pl.DataFrame: >>> clean_df["bmi"].to_list() ['17.5', None, '18.2'] """ - EXCEL_ERRORS = [ + excel_errors = [ "#DIV/0!", "#VALUE!", "#REF!", @@ -432,12 +432,12 @@ def clean_excel_errors(df: pl.DataFrame) -> pl.DataFrame: df = df.with_columns( [ - pl.when(pl.col(col).is_in(EXCEL_ERRORS)).then(None).otherwise(pl.col(col)).alias(col) + pl.when(pl.col(col).is_in(excel_errors)).then(None).otherwise(pl.col(col)).alias(col) for col in data_cols ] ) - for error in EXCEL_ERRORS: + for error in excel_errors: for col in data_cols: count = (df[col] == error).sum() if count > 0: @@ -752,7 +752,10 @@ def read_all_patient_sheets( patient_id="MISSING", column="patient_id", original_value=None, - error_message=f"Row in sheet '{sheet_name}' has missing patient_id (name: {name_value})", + error_message=( + f"Row in sheet '{sheet_name}' has missing " + f"patient_id (name: {name_value})" + ), error_code="missing_required_field", script="extract", function_name="read_all_patient_sheets", @@ -761,7 +764,8 @@ def read_all_patient_sheets( # Filter out ALL rows with missing patient_id df_combined = df_combined.filter(pl.col("patient_id").is_not_null()) - # Filter out empty rows (both patient_id and name are null/empty) - this is redundant now but kept for clarity + # Filter out empty rows (both patient_id and name are null/empty) + # This is redundant now but kept for clarity if "name" in df_combined.columns: df_combined = df_combined.filter( ~( @@ -897,7 +901,8 @@ def read_all_patient_sheets( f"from {len(all_sheets_data)} month sheets" ) - # Reorder: metadata first (tracker_year, tracker_month, clinic_id, patient_id), then patient data + # Reorder: metadata first, then patient data + # (tracker_year, tracker_month, clinic_id, patient_id) priority_cols = ["tracker_year", "tracker_month", "clinic_id", "patient_id"] existing_priority = [c for c in priority_cols if c in df_combined.columns] other_cols = [c for c in df_combined.columns if c not in priority_cols] diff --git a/a4d-python/src/a4d/gcp/__init__.py b/a4d-python/src/a4d/gcp/__init__.py index e69de29..89b75e0 100644 --- a/a4d-python/src/a4d/gcp/__init__.py +++ b/a4d-python/src/a4d/gcp/__init__.py @@ -0,0 +1,21 @@ +from a4d.gcp.bigquery import ( + TABLE_CONFIGS, + get_bigquery_client, + load_pipeline_tables, + load_table, +) +from a4d.gcp.storage import ( + download_tracker_files, + get_storage_client, + upload_output, +) + +__all__ = [ + "TABLE_CONFIGS", + "download_tracker_files", + "get_bigquery_client", + "get_storage_client", + "load_pipeline_tables", + "load_table", + "upload_output", +] diff --git a/a4d-python/src/a4d/gcp/bigquery.py b/a4d-python/src/a4d/gcp/bigquery.py new file mode 100644 index 0000000..ad3d24d --- /dev/null +++ b/a4d-python/src/a4d/gcp/bigquery.py @@ -0,0 +1,187 @@ +"""BigQuery table loading from parquet files. + +Replaces the R pipeline's `ingest_data()` function which used the `bq` CLI tool. +Uses the google-cloud-bigquery Python client for loading parquet files with +clustering configuration matching the R pipeline. +""" + +from pathlib import Path + +from google.cloud import bigquery +from loguru import logger + +from a4d.config import settings + +# Table configurations matching the R pipeline's clustering fields. +# Each table maps to the clustering fields used for optimal query performance. +TABLE_CONFIGS: dict[str, list[str]] = { + "patient_data_monthly": ["clinic_id", "patient_id", "tracker_date"], + "patient_data_annual": ["patient_id", "tracker_date"], + "patient_data_static": ["clinic_id", "patient_id", "tracker_date"], + "patient_data_hba1c": ["clinic_id", "patient_id", "tracker_date"], + "product_data": [ + "clinic_id", + "product_released_to", + "product_table_year", + "product_table_month", + ], + "clinic_data_static": ["clinic_id"], + "logs": ["level", "log_file", "file_name"], + "tracker_metadata": ["file_name", "clinic_code"], +} + +# Maps the pipeline output file names to BigQuery table names. +# Note: table_logs.parquet uses this name from create_table_logs() in tables/logs.py. +PARQUET_TO_TABLE: dict[str, str] = { + "patient_data_static.parquet": "patient_data_static", + "patient_data_monthly.parquet": "patient_data_monthly", + "patient_data_annual.parquet": "patient_data_annual", + "table_logs.parquet": "logs", +} + + +def get_bigquery_client(project_id: str | None = None) -> bigquery.Client: + """Create a BigQuery client. + + Authentication uses Application Default Credentials (ADC): + - In Cloud Run / GCE: automatic via metadata server + - Locally: via `gcloud auth application-default login` + - In CI: via GOOGLE_APPLICATION_CREDENTIALS environment variable + + Args: + project_id: GCP project ID (defaults to settings.project_id) + + Returns: + Configured BigQuery client + """ + return bigquery.Client(project=project_id or settings.project_id) + + +def load_table( + parquet_path: Path, + table_name: str, + client: bigquery.Client | None = None, + dataset: str | None = None, + project_id: str | None = None, + replace: bool = True, +) -> bigquery.LoadJob: + """Load a parquet file into a BigQuery table. + + Replicates the R pipeline's `ingest_data()` function: + 1. Optionally deletes the existing table (replace=True, matching R's delete=T default) + 2. Loads the parquet file with clustering fields + + Args: + parquet_path: Path to the parquet file to load + table_name: BigQuery table name (e.g., "patient_data_monthly") + client: BigQuery client (created if not provided) + dataset: Dataset name (defaults to settings.dataset) + project_id: GCP project ID (defaults to settings.project_id) + replace: If True, replaces the existing table (default matches R pipeline) + + Returns: + Completed LoadJob + + Raises: + FileNotFoundError: If parquet file doesn't exist + ValueError: If table_name is not in TABLE_CONFIGS + google.api_core.exceptions.GoogleAPIError: On BigQuery API errors + """ + if not parquet_path.exists(): + raise FileNotFoundError(f"Parquet file not found: {parquet_path}") + + dataset = dataset or settings.dataset + project_id = project_id or settings.project_id + + if client is None: + client = get_bigquery_client(project_id) + + table_ref = f"{project_id}.{dataset}.{table_name}" + logger.info(f"Loading {parquet_path.name} → {table_ref}") + + # Configure the load job + job_config = bigquery.LoadJobConfig( + source_format=bigquery.SourceFormat.PARQUET, + write_disposition=( + bigquery.WriteDisposition.WRITE_TRUNCATE + if replace + else bigquery.WriteDisposition.WRITE_APPEND + ), + ) + + # Add clustering if configured for this table + clustering_fields = TABLE_CONFIGS.get(table_name) + if clustering_fields: + job_config.clustering_fields = clustering_fields + logger.info(f"Clustering fields: {clustering_fields}") + + # Load the parquet file + with open(parquet_path, "rb") as f: + load_job = client.load_table_from_file(f, table_ref, job_config=job_config) + + # Wait for completion + load_job.result() + + logger.info( + f"Loaded {load_job.output_rows} rows into {table_ref} " + f"({parquet_path.stat().st_size / 1024 / 1024:.2f} MB)" + ) + return load_job + + +def load_pipeline_tables( + tables_dir: Path, + client: bigquery.Client | None = None, + dataset: str | None = None, + project_id: str | None = None, + replace: bool = True, +) -> dict[str, bigquery.LoadJob]: + """Load all pipeline output tables into BigQuery. + + Scans the tables directory for known parquet files and loads each one + into the corresponding BigQuery table. + + Args: + tables_dir: Directory containing parquet table files (e.g., output/tables/) + client: BigQuery client (created if not provided) + dataset: Dataset name (defaults to settings.dataset) + project_id: GCP project ID (defaults to settings.project_id) + replace: If True, replaces existing tables + + Returns: + Dictionary mapping table name to completed LoadJob + + Raises: + FileNotFoundError: If tables_dir doesn't exist + """ + if not tables_dir.exists(): + raise FileNotFoundError(f"Tables directory not found: {tables_dir}") + + if client is None: + project_id = project_id or settings.project_id + client = get_bigquery_client(project_id) + + logger.info(f"Loading pipeline tables from: {tables_dir}") + + results: dict[str, bigquery.LoadJob] = {} + + for parquet_name, table_name in PARQUET_TO_TABLE.items(): + parquet_path = tables_dir / parquet_name + if parquet_path.exists(): + try: + job = load_table( + parquet_path=parquet_path, + table_name=table_name, + client=client, + dataset=dataset, + project_id=project_id, + replace=replace, + ) + results[table_name] = job + except Exception: + logger.exception(f"Failed to load table: {table_name}") + else: + logger.warning(f"Table file not found, skipping: {parquet_name}") + + logger.info(f"Successfully loaded {len(results)}/{len(PARQUET_TO_TABLE)} tables") + return results diff --git a/a4d-python/src/a4d/gcp/storage.py b/a4d-python/src/a4d/gcp/storage.py new file mode 100644 index 0000000..93adda1 --- /dev/null +++ b/a4d-python/src/a4d/gcp/storage.py @@ -0,0 +1,129 @@ +"""Google Cloud Storage operations for tracker file download and output upload. + +Replaces the R pipeline's `gsutil` CLI calls with the google-cloud-storage +Python client library. +""" + +from pathlib import Path + +from google.cloud import storage +from loguru import logger + +from a4d.config import settings + + +def get_storage_client(project_id: str | None = None) -> storage.Client: + """Create a GCS client. + + Authentication uses Application Default Credentials (ADC): + - In Cloud Run / GCE: automatic via metadata server + - Locally: via `gcloud auth application-default login` + - In CI: via GOOGLE_APPLICATION_CREDENTIALS environment variable + + Args: + project_id: GCP project ID (defaults to settings.project_id) + + Returns: + Configured storage client + """ + return storage.Client(project=project_id or settings.project_id) + + +def download_tracker_files( + destination: Path, + bucket_name: str | None = None, + client: storage.Client | None = None, +) -> list[Path]: + """Download tracker files from GCS bucket. + + Replaces R pipeline's `download_data()` function which used `gsutil -m cp -r`. + Downloads all .xlsx files from the bucket, preserving directory structure. + + Args: + destination: Local directory to download files to + bucket_name: GCS bucket name (defaults to settings.download_bucket) + client: Storage client (created if not provided) + + Returns: + List of downloaded file paths + """ + bucket_name = bucket_name or settings.download_bucket + + if client is None: + client = get_storage_client() + + bucket = client.bucket(bucket_name) + destination.mkdir(parents=True, exist_ok=True) + + logger.info(f"Downloading tracker files from gs://{bucket_name} to {destination}") + + downloaded: list[Path] = [] + blobs = list(bucket.list_blobs()) + logger.info(f"Found {len(blobs)} objects in bucket") + + for blob in blobs: + # Skip directory markers + if blob.name.endswith("/"): + continue + + local_path = destination / blob.name + local_path.parent.mkdir(parents=True, exist_ok=True) + + logger.debug(f"Downloading: {blob.name}") + blob.download_to_filename(str(local_path)) + downloaded.append(local_path) + + logger.info(f"Downloaded {len(downloaded)} files") + return downloaded + + +def upload_output( + source_dir: Path, + bucket_name: str | None = None, + prefix: str = "", + client: storage.Client | None = None, +) -> list[str]: + """Upload output directory to GCS bucket. + + Replaces R pipeline's `upload_data()` function which used `gsutil -m cp -r`. + Uploads all files from the source directory, preserving directory structure. + + Args: + source_dir: Local directory to upload + bucket_name: GCS bucket name (defaults to settings.upload_bucket) + prefix: Optional prefix for uploaded blob names + client: Storage client (created if not provided) + + Returns: + List of uploaded blob names + + Raises: + FileNotFoundError: If source directory doesn't exist + """ + if not source_dir.exists(): + raise FileNotFoundError(f"Source directory not found: {source_dir}") + + bucket_name = bucket_name or settings.upload_bucket + + if client is None: + client = get_storage_client() + + bucket = client.bucket(bucket_name) + + logger.info(f"Uploading {source_dir} to gs://{bucket_name}/{prefix}") + + uploaded: list[str] = [] + files = [f for f in source_dir.rglob("*") if f.is_file()] + + for file_path in files: + relative_path = file_path.relative_to(source_dir) + blob_name = f"{prefix}/{relative_path}" if prefix else str(relative_path) + blob_name = blob_name.replace("\\", "/") # Windows compatibility + + logger.debug(f"Uploading: {blob_name}") + blob = bucket.blob(blob_name) + blob.upload_from_filename(str(file_path)) + uploaded.append(blob_name) + + logger.info(f"Uploaded {len(uploaded)} files to gs://{bucket_name}") + return uploaded diff --git a/a4d-python/src/a4d/logging.py b/a4d-python/src/a4d/logging.py index 19d27a9..d9ca150 100644 --- a/a4d-python/src/a4d/logging.py +++ b/a4d-python/src/a4d/logging.py @@ -70,7 +70,11 @@ def setup_logging( sys.stdout, level=console_log_level, colorize=True, - format="{time:HH:mm:ss} | {level: <8} | {message}", + format=( + "{time:HH:mm:ss} | " + "{level: <8} | " + "{message}" + ), ) # File handler: JSON output for BigQuery upload diff --git a/a4d-python/src/a4d/pipeline/patient.py b/a4d-python/src/a4d/pipeline/patient.py index 271bb41..b320c59 100644 --- a/a4d-python/src/a4d/pipeline/patient.py +++ b/a4d-python/src/a4d/pipeline/patient.py @@ -1,10 +1,10 @@ """Main patient pipeline orchestration.""" import os +from collections.abc import Callable from concurrent.futures import ProcessPoolExecutor, as_completed from datetime import datetime from pathlib import Path -from typing import Callable from loguru import logger from tqdm import tqdm @@ -312,7 +312,7 @@ def run_patient_pipeline( logger.info(f"Logs table created: {logs_table_path}") logger.info(f"Created {len(tables)} tables total") - except Exception as e: + except Exception: logger.exception("Failed to create tables") # Don't fail entire pipeline if table creation fails else: diff --git a/a4d-python/src/a4d/reference/provinces.py b/a4d-python/src/a4d/reference/provinces.py index 59df048..2fa1694 100644 --- a/a4d-python/src/a4d/reference/provinces.py +++ b/a4d-python/src/a4d/reference/provinces.py @@ -99,7 +99,8 @@ def load_canonical_provinces() -> list[str]: all_provinces.extend(provinces) logger.info( - f"Loaded {len(all_provinces)} canonical province names from {len(provinces_by_country)} countries" + f"Loaded {len(all_provinces)} canonical province names " + f"from {len(provinces_by_country)} countries" ) return all_provinces diff --git a/a4d-python/src/a4d/reference/synonyms.py b/a4d-python/src/a4d/reference/synonyms.py index b230f6c..6d1c778 100644 --- a/a4d-python/src/a4d/reference/synonyms.py +++ b/a4d-python/src/a4d/reference/synonyms.py @@ -205,7 +205,8 @@ def rename_columns( if unmapped_columns: if strict: raise ValueError( - f"Unmapped columns found: {unmapped_columns}. These columns do not appear in the synonym file." + f"Unmapped columns found: {unmapped_columns}. " + "These columns do not appear in the synonym file." ) else: logger.warning( @@ -222,7 +223,8 @@ def rename_columns( duplicates = {t: c for t, c in target_counts.items() if c > 1} logger.warning( f"Multiple source columns map to same target name: {duplicates}. " - f"Keeping first occurrence only. This is an edge case from discontinued 2023 format." + "Keeping first occurrence only. " + "This is an edge case from discontinued 2023 format." ) # Keep only first occurrence of each target diff --git a/a4d-python/tests/test_extract/test_patient.py b/a4d-python/tests/test_extract/test_patient.py index f930241..0d2d31d 100644 --- a/a4d-python/tests/test_extract/test_patient.py +++ b/a4d-python/tests/test_extract/test_patient.py @@ -160,7 +160,7 @@ def test_find_month_sheets_2024(): reason="Tracker files not available", ) @pytest.mark.parametrize( - "tracker_file,sheet_name,year,expected_patients,expected_cols,notes", + ("tracker_file", "sheet_name", "year", "expected_patients", "expected_cols", "notes"), TRACKER_TEST_CASES, ids=lambda params: f"{params[1] if isinstance(params, tuple) and len(params) > 1 else params}", ) diff --git a/a4d-python/tests/test_gcp/__init__.py b/a4d-python/tests/test_gcp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/a4d-python/tests/test_gcp/test_bigquery.py b/a4d-python/tests/test_gcp/test_bigquery.py new file mode 100644 index 0000000..8512092 --- /dev/null +++ b/a4d-python/tests/test_gcp/test_bigquery.py @@ -0,0 +1,173 @@ +"""Tests for BigQuery loading module.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from a4d.gcp.bigquery import ( + PARQUET_TO_TABLE, + TABLE_CONFIGS, + load_pipeline_tables, + load_table, +) + + +def _get_job_config(mock_client): + """Extract job_config from mock client's load_table_from_file call.""" + return mock_client.load_table_from_file.call_args.kwargs["job_config"] + + +class TestTableConfigs: + """Test that table configurations match the R pipeline.""" + + def test_patient_data_monthly_clustering(self): + assert TABLE_CONFIGS["patient_data_monthly"] == [ + "clinic_id", + "patient_id", + "tracker_date", + ] + + def test_patient_data_annual_clustering(self): + assert TABLE_CONFIGS["patient_data_annual"] == ["patient_id", "tracker_date"] + + def test_patient_data_static_clustering(self): + assert TABLE_CONFIGS["patient_data_static"] == [ + "clinic_id", + "patient_id", + "tracker_date", + ] + + def test_all_pipeline_tables_have_configs(self): + for table_name in PARQUET_TO_TABLE.values(): + assert table_name in TABLE_CONFIGS, f"Missing config for {table_name}" + + +class TestLoadTable: + """Test loading a single parquet file to BigQuery.""" + + def test_raises_file_not_found(self, tmp_path): + missing_file = tmp_path / "missing.parquet" + with pytest.raises(FileNotFoundError, match="Parquet file not found"): + load_table(missing_file, "patient_data_monthly") + + @patch("a4d.gcp.bigquery.get_bigquery_client") + def test_load_table_with_replace(self, mock_get_client, tmp_path): + parquet_file = tmp_path / "test.parquet" + parquet_file.write_bytes(b"fake parquet data") + + mock_client = MagicMock() + mock_job = MagicMock() + mock_job.output_rows = 100 + mock_client.load_table_from_file.return_value = mock_job + mock_get_client.return_value = mock_client + + load_table(parquet_file, "patient_data_monthly", client=mock_client) + + mock_client.load_table_from_file.assert_called_once() + job_config = _get_job_config(mock_client) + assert job_config.clustering_fields == ["clinic_id", "patient_id", "tracker_date"] + mock_job.result.assert_called_once() + + @patch("a4d.gcp.bigquery.get_bigquery_client") + def test_load_table_with_append(self, mock_get_client, tmp_path): + parquet_file = tmp_path / "test.parquet" + parquet_file.write_bytes(b"fake parquet data") + + mock_client = MagicMock() + mock_job = MagicMock() + mock_job.output_rows = 50 + mock_client.load_table_from_file.return_value = mock_job + + load_table(parquet_file, "patient_data_monthly", client=mock_client, replace=False) + + job_config = _get_job_config(mock_client) + assert job_config.write_disposition == "WRITE_APPEND" + + @patch("a4d.gcp.bigquery.get_bigquery_client") + def test_load_table_correct_table_ref(self, mock_get_client, tmp_path): + parquet_file = tmp_path / "test.parquet" + parquet_file.write_bytes(b"fake parquet data") + + mock_client = MagicMock() + mock_job = MagicMock() + mock_job.output_rows = 10 + mock_client.load_table_from_file.return_value = mock_job + + load_table( + parquet_file, + "patient_data_static", + client=mock_client, + dataset="test_dataset", + project_id="test_project", + ) + + table_ref = mock_client.load_table_from_file.call_args.args[1] + assert table_ref == "test_project.test_dataset.patient_data_static" + + +class TestLoadPipelineTables: + """Test loading all pipeline tables.""" + + def test_raises_if_dir_missing(self, tmp_path): + missing_dir = tmp_path / "nonexistent" + with pytest.raises(FileNotFoundError, match="Tables directory not found"): + load_pipeline_tables(missing_dir) + + @patch("a4d.gcp.bigquery.load_table") + @patch("a4d.gcp.bigquery.get_bigquery_client") + def test_loads_existing_tables(self, mock_get_client, mock_load, tmp_path): + tables_dir = tmp_path / "tables" + tables_dir.mkdir() + + # Create some table files + (tables_dir / "patient_data_static.parquet").write_bytes(b"data") + (tables_dir / "patient_data_monthly.parquet").write_bytes(b"data") + + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_load.return_value = MagicMock() + + results = load_pipeline_tables(tables_dir, client=mock_client) + + assert mock_load.call_count == 2 + assert "patient_data_static" in results + assert "patient_data_monthly" in results + + @patch("a4d.gcp.bigquery.load_table") + @patch("a4d.gcp.bigquery.get_bigquery_client") + def test_skips_missing_tables(self, mock_get_client, mock_load, tmp_path): + tables_dir = tmp_path / "tables" + tables_dir.mkdir() + + # Only create one table file + (tables_dir / "patient_data_static.parquet").write_bytes(b"data") + + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_load.return_value = MagicMock() + + results = load_pipeline_tables(tables_dir, client=mock_client) + + assert mock_load.call_count == 1 + assert "patient_data_static" in results + assert "patient_data_monthly" not in results + + @patch("a4d.gcp.bigquery.load_table") + @patch("a4d.gcp.bigquery.get_bigquery_client") + def test_continues_on_single_table_failure(self, mock_get_client, mock_load, tmp_path): + tables_dir = tmp_path / "tables" + tables_dir.mkdir() + + (tables_dir / "patient_data_static.parquet").write_bytes(b"data") + (tables_dir / "patient_data_monthly.parquet").write_bytes(b"data") + + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + # First call succeeds, second fails + mock_load.side_effect = [MagicMock(), Exception("API error")] + + results = load_pipeline_tables(tables_dir, client=mock_client) + + # Should have one success despite the failure + assert len(results) == 1 diff --git a/a4d-python/tests/test_gcp/test_storage.py b/a4d-python/tests/test_gcp/test_storage.py new file mode 100644 index 0000000..77ff437 --- /dev/null +++ b/a4d-python/tests/test_gcp/test_storage.py @@ -0,0 +1,114 @@ +"""Tests for Google Cloud Storage module.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from a4d.gcp.storage import download_tracker_files, upload_output + + +class TestDownloadTrackerFiles: + """Test downloading tracker files from GCS.""" + + @patch("a4d.gcp.storage.get_storage_client") + def test_downloads_files(self, mock_get_client, tmp_path): + destination = tmp_path / "trackers" + + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_bucket = MagicMock() + mock_client.bucket.return_value = mock_bucket + + # Simulate blobs in bucket + blob1 = MagicMock() + blob1.name = "2024/tracker1.xlsx" + blob2 = MagicMock() + blob2.name = "2024/tracker2.xlsx" + mock_bucket.list_blobs.return_value = [blob1, blob2] + + result = download_tracker_files(destination, client=mock_client) + + assert len(result) == 2 + assert blob1.download_to_filename.called + assert blob2.download_to_filename.called + + @patch("a4d.gcp.storage.get_storage_client") + def test_skips_directory_markers(self, mock_get_client, tmp_path): + destination = tmp_path / "trackers" + + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_bucket = MagicMock() + mock_client.bucket.return_value = mock_bucket + + blob_dir = MagicMock() + blob_dir.name = "2024/" + blob_file = MagicMock() + blob_file.name = "2024/tracker.xlsx" + mock_bucket.list_blobs.return_value = [blob_dir, blob_file] + + result = download_tracker_files(destination, client=mock_client) + + assert len(result) == 1 + assert not blob_dir.download_to_filename.called + + @patch("a4d.gcp.storage.get_storage_client") + def test_creates_destination_directory(self, mock_get_client, tmp_path): + destination = tmp_path / "new" / "dir" + + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_bucket = MagicMock() + mock_client.bucket.return_value = mock_bucket + mock_bucket.list_blobs.return_value = [] + + download_tracker_files(destination, client=mock_client) + + assert destination.exists() + + +class TestUploadOutput: + """Test uploading output to GCS.""" + + def test_raises_if_source_missing(self, tmp_path): + missing_dir = tmp_path / "nonexistent" + with pytest.raises(FileNotFoundError, match="Source directory not found"): + upload_output(missing_dir) + + @patch("a4d.gcp.storage.get_storage_client") + def test_uploads_files(self, mock_get_client, tmp_path): + source = tmp_path / "output" + source.mkdir() + (source / "tables").mkdir() + (source / "tables" / "data.parquet").write_bytes(b"data") + (source / "logs.txt").write_text("log") + + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_bucket = MagicMock() + mock_client.bucket.return_value = mock_bucket + mock_blob = MagicMock() + mock_bucket.blob.return_value = mock_blob + + result = upload_output(source, client=mock_client) + + assert len(result) == 2 + assert mock_blob.upload_from_filename.call_count == 2 + + @patch("a4d.gcp.storage.get_storage_client") + def test_upload_with_prefix(self, mock_get_client, tmp_path): + source = tmp_path / "output" + source.mkdir() + (source / "file.parquet").write_bytes(b"data") + + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_bucket = MagicMock() + mock_client.bucket.return_value = mock_bucket + mock_blob = MagicMock() + mock_bucket.blob.return_value = mock_blob + + result = upload_output(source, prefix="2024-01", client=mock_client) + + assert len(result) == 1 + assert result[0] == "2024-01/file.parquet" diff --git a/a4d-python/tests/test_integration/test_clean_integration.py b/a4d-python/tests/test_integration/test_clean_integration.py index 21e5fdf..a8423f4 100644 --- a/a4d-python/tests/test_integration/test_clean_integration.py +++ b/a4d-python/tests/test_integration/test_clean_integration.py @@ -61,7 +61,7 @@ def test_clean_tracks_errors(self, tracker_2024_penang): df_raw = read_all_patient_sheets(tracker_2024_penang) collector = ErrorCollector() - df_clean = clean_patient_data(df_raw, collector) + clean_patient_data(df_raw, collector) # Should have some errors (type conversions, invalid values, etc.) # Exact count varies, but should be non-zero for this tracker diff --git a/a4d-python/tests/test_integration/test_e2e.py b/a4d-python/tests/test_integration/test_e2e.py index 2bf5c08..c4ed7bf 100644 --- a/a4d-python/tests/test_integration/test_e2e.py +++ b/a4d-python/tests/test_integration/test_e2e.py @@ -18,7 +18,7 @@ @pytest.mark.parametrize( - "tracker_fixture,expected_rows,expected_year,description", + ("tracker_fixture", "expected_rows", "expected_year", "description"), [ ("tracker_2024_penang", 174, 2024, "2024 Penang - Annual + Patient List"), ("tracker_2024_isdfi", 70, 2024, "2024 ISDFI Philippines"), diff --git a/a4d-python/tests/test_integration/test_r_validation.py b/a4d-python/tests/test_integration/test_r_validation.py index 4eab9d2..08d9fe6 100644 --- a/a4d-python/tests/test_integration/test_r_validation.py +++ b/a4d-python/tests/test_integration/test_r_validation.py @@ -4,8 +4,10 @@ the final cleaned parquet files for all 174 trackers. These tests require: -- R pipeline outputs in: /Volumes/USB SanDisk 3.2Gen1 Media/a4d/output_r/patient_data_cleaned/ -- Python pipeline outputs in: /Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/output/patient_data_cleaned/ +- R pipeline outputs in: + /Volumes/USB SanDisk 3.2Gen1 Media/a4d/output_r/patient_data_cleaned/ +- Python pipeline outputs in: + /Volumes/USB SanDisk 3.2Gen1 Media/a4d/a4dphase2_upload/output/patient_data_cleaned/ Run with: uv run pytest tests/test_integration/test_r_validation.py -v -m slow """ @@ -31,11 +33,18 @@ }, "2024_Mahosot Hospital A4D Tracker_patient_cleaned.parquet": { "record_diff": 1, - "reason": "Python correctly extracts LA-MH088 which is missing row number in Excel column A; R incorrectly drops it", + "reason": ( + "Python correctly extracts LA-MH088 which is missing row number " + "in Excel column A; R incorrectly drops it" + ), }, "2022_Children's Hospital 2 A4D Tracker_patient_cleaned.parquet": { "record_diff": -15, - "reason": "Excel data quality issue: Oct22 sheet has space instead of 1 in column A for first patient row, causing Python to misdetect headers and skip October (15 rows). R handles this differently.", + "reason": ( + "Excel data quality issue: Oct22 sheet has space instead of 1 " + "in column A for first patient row, causing Python to misdetect " + "headers and skip October (15 rows). R handles this differently." + ), }, } @@ -44,22 +53,37 @@ # If the issue is fixed, the test will FAIL with a message to remove it from this dict KNOWN_ISSUES = { "2018_Penang General Hospital A4D Tracker_DC_patient_cleaned.parquet": { - "duplicate_records": "Excel has duplicate patient_id MY_PN004 in Oct18 sheet that needs to be fixed", + "duplicate_records": ( + "Excel has duplicate patient_id MY_PN004 in Oct18 sheet " + "that needs to be fixed" + ), }, "2023_Vietnam National Children's Hospital A4D Tracker_patient_cleaned.parquet": { - "duplicate_records": "Excel has duplicate patient_id VN_VC026 in Aug23 sheet that needs to be fixed", + "duplicate_records": ( + "Excel has duplicate patient_id VN_VC026 in Aug23 sheet " + "that needs to be fixed" + ), }, "2023_NPH A4D Tracker_patient_cleaned.parquet": { - "duplicate_records": "4 patients KH_NPH026, KH_NPH027, KH_NPH028, KH_NPH029 have incorrect patient_id in Sep23 and Oct23 and are truncated to KH_NPH02 causing duplicates", + "duplicate_records": ( + "4 patients KH_NPH026, KH_NPH027, KH_NPH028, KH_NPH029 have " + "incorrect patient_id in Sep23 and Oct23 and are truncated to " + "KH_NPH02 causing duplicates" + ), }, "2025_06_North Okkalapa General Hospital A4D Tracker_patient_cleaned.parquet": { - "patient_id_format": "R replaces MM_NO097/098/099 with 'Undefined' due to format validation. Python correctly preserves original IDs.", + "patient_id_format": ( + "R replaces MM_NO097/098/099 with 'Undefined' due to format " + "validation. Python correctly preserves original IDs." + ), }, } # Trackers to skip due to data quality issues in source Excel files SKIP_VALIDATION = { - "2024_Vietnam National Children Hospital A4D Tracker_patient_cleaned.parquet": "Excel has duplicate patient rows with conflicting data in Jul24", + "2024_Vietnam National Children Hospital A4D Tracker_patient_cleaned.parquet": ( + "Excel has duplicate patient rows with conflicting data in Jul24" + ), } # Columns to skip in data value comparison due to known extraction/processing differences @@ -73,7 +97,13 @@ # Use this when R has errors affecting many/all patients in specific columns for a file FILE_COLUMN_EXCEPTIONS = { "2025_06_Jayavarman VII Hospital A4D Tracker_patient_cleaned.parquet": { - "reason": "Excel cells contain Unicode '≥15' (U+2265). R's readxl reads raw Unicode. Python's openpyxl (data_only=True) normalizes to ASCII '>15'. R's regex grepl('>|<') only matches ASCII, fails to parse '≥15', results in error value 999999. R needs update to handle Unicode comparison operators (≥, ≤).", + "reason": ( + "Excel cells contain Unicode '≥15' (U+2265). R's readxl reads " + "raw Unicode. Python's openpyxl (data_only=True) normalizes to " + "ASCII '>15'. R's regex grepl('>|<') only matches ASCII, fails " + "to parse '≥15', results in error value 999999. R needs update " + "to handle Unicode comparison operators (≥, ≤)." + ), "skip_columns": [ "hba1c_baseline", "hba1c_baseline_exceeds", @@ -82,15 +112,32 @@ ], }, "2025_06_Kantha Bopha II Hospital A4D Tracker_patient_cleaned.parquet": { - "reason": "R BUG: Sets province to 'Undefined' for Takéo, Tboung Khmum, and Preah Sihanouk despite these being in allowed_provinces.yaml. Python now correctly validates and preserves these province names using sanitize_str(). All three provinces are properly listed in the YAML with correct UTF-8 encoding (Takéo has é as U+00E9). R's sanitize_str() should handle this by removing accents, but validation fails. Needs investigation in R's check_allowed_values() or YAML loading.", + "reason": ( + "R BUG: Sets province to 'Undefined' for Takéo, Tboung Khmum, " + "and Preah Sihanouk despite these being in " + "allowed_provinces.yaml. Python now correctly validates and " + "preserves these province names using sanitize_str(). All three " + "provinces are properly listed in the YAML with correct UTF-8 " + "encoding (Takéo has é as U+00E9). R's sanitize_str() should " + "handle this by removing accents, but validation fails. Needs " + "investigation in R's check_allowed_values() or YAML loading." + ), "skip_columns": ["province"], }, "2025_06_Mahosot Hospital A4D Tracker_patient_cleaned.parquet": { - "reason": "Patient LA_MH054 has invalid insulin_regimen value 'nph' (lowercase). R uppercases to 'NPH', Python preserves original. Both should reject as invalid.", + "reason": ( + "Patient LA_MH054 has invalid insulin_regimen value 'nph' " + "(lowercase). R uppercases to 'NPH', Python preserves original. " + "Both should reject as invalid." + ), "skip_columns": ["insulin_regimen"], }, "2025_06_Mandalay Children's Hospital A4D Tracker_patient_cleaned.parquet": { - "reason": "R has systematic extraction errors - sets error values (999999 or 9999-09-09) for most columns. Python correctly extracts data.", + "reason": ( + "R has systematic extraction errors - sets error values " + "(999999 or 9999-09-09) for most columns. " + "Python correctly extracts data." + ), "skip_columns": [ "age", "blood_pressure_updated", @@ -113,7 +160,10 @@ ], }, "2025_06_Mandalay General Hospital A4D Tracker_patient_cleaned.parquet": { - "reason": "R sets error value 999999 for t1d_diagnosis_age. Python correctly extracts values.", + "reason": ( + "R sets error value 999999 for t1d_diagnosis_age. " + "Python correctly extracts values." + ), "skip_columns": ["t1d_diagnosis_age"], }, "2025_06_NPH A4D Tracker_patient_cleaned.parquet": { @@ -150,7 +200,8 @@ "status", } -# Exceptions for required column validation - files where specific required columns have known null values +# Exceptions for required column validation +# Files where specific required columns have known null values # Format: {filename: {column: reason}} REQUIRED_COLUMN_EXCEPTIONS = { "2017_Mandalay Children's Hospital A4D Tracker_patient_cleaned.parquet": { @@ -163,7 +214,10 @@ "status": "Patient KH_CD008 has missing status in April 2019 in source Excel file", }, "2019_Mahosot Hospital A4D Tracker_patient_cleaned.parquet": { - "status": "Patient LA_MH005 has missing status in January and February 2019 in source Excel file", + "status": ( + "Patient LA_MH005 has missing status in January and " + "February 2019 in source Excel file" + ), }, "2019_Preah Kossamak Hospital A4D Tracker_patient_cleaned.parquet": { "status": "Patient KH_PK022 has missing status in August 2019 in source Excel file", @@ -178,7 +232,10 @@ "status": "Patient KH_KB017_PK has missing status in source Excel file", }, "2022_Chiang Mai Maharaj Nakorn A4D Tracker_patient_cleaned.parquet": { - "status": "Patients TH_CP027, TH_CP028, TH_CP029, TH_CP030 have missing status in source Excel file", + "status": ( + "Patients TH_CP027, TH_CP028, TH_CP029, TH_CP030 " + "have missing status in source Excel file" + ), }, "2022_Chulalongkorn Hospital A4D Tracker_patient_cleaned.parquet": { "status": "Patients TH_CH006, TH_CH007, TH_CH008 have missing status in source Excel file", @@ -190,7 +247,11 @@ "status": "Patient MY_LW013 has missing status in source Excel file", }, "2022_Mandalay Children's Hospital A4D Tracker_patient_cleaned.parquet": { - "status": "Patients MM_MD078, MM_MD079, MM_MD080, MM_MD081, MM_MD082, MM_MD083 have missing status in source Excel file", + "status": ( + "Patients MM_MD078, MM_MD079, MM_MD080, MM_MD081, " + "MM_MD082, MM_MD083 have missing status in " + "source Excel file" + ), }, "2022_Penang General Hospital A4D Tracker_patient_cleaned.parquet": { "status": "Patient MY_PN013 has missing status in source Excel file", @@ -240,27 +301,42 @@ PATIENT_LEVEL_EXCEPTIONS = { "2025_06_CDA A4D Tracker_patient_cleaned.parquet": { "KH_CD018": { - "reason": "R extraction error: missing 'Analog Insulin' value that Python correctly extracts", + "reason": ( + "R extraction error: missing 'Analog Insulin' value " + "that Python correctly extracts" + ), "skip_columns": ["insulin_type"], }, }, "2025_06_Jayavarman VII Hospital A4D Tracker_patient_cleaned.parquet": { "KH_JV078": { - "reason": "R sets error date '9999-09-09' for lost_date when Excel cell is empty. Python correctly extracts null.", + "reason": ( + "R sets error date '9999-09-09' for lost_date when " + "Excel cell is empty. Python correctly extracts null." + ), "skip_columns": ["lost_date"], }, }, "2025_06_Kantha Bopha II Hospital A4D Tracker_patient_cleaned.parquet": { "KH_KB023": { - "reason": "R extraction error: sex should be 'F' but R sets 'Undefined'. Python correctly extracts 'F'.", + "reason": ( + "R extraction error: sex should be 'F' but R sets " + "'Undefined'. Python correctly extracts 'F'." + ), "skip_columns": ["sex"], }, "KH_KB073": { - "reason": "R extraction error: missing 'Analog Insulin' value that Python correctly extracts", + "reason": ( + "R extraction error: missing 'Analog Insulin' value " + "that Python correctly extracts" + ), "skip_columns": ["insulin_type"], }, "KH_KB139": { - "reason": "R extraction error: missing 'Analog Insulin' value that Python correctly extracts", + "reason": ( + "R extraction error: missing 'Analog Insulin' value " + "that Python correctly extracts" + ), "skip_columns": ["insulin_type"], }, }, @@ -300,7 +376,7 @@ def test_output_directories_exist(): assert PY_OUTPUT_DIR.exists(), f"Python output directory not found: {PY_OUTPUT_DIR}" -@pytest.mark.parametrize("filename, r_path, py_path", get_all_tracker_files()) +@pytest.mark.parametrize(("filename", "r_path", "py_path"), get_all_tracker_files()) def test_record_count_matches(filename, r_path, py_path): """Test that record counts match between R and Python for each tracker. @@ -352,7 +428,7 @@ def test_record_count_matches(filename, r_path, py_path): ) -@pytest.mark.parametrize("filename, r_path, py_path", get_all_tracker_files()) +@pytest.mark.parametrize(("filename", "r_path", "py_path"), get_all_tracker_files()) def test_schema_matches(filename, r_path, py_path): """Test that column schemas match between R and Python for each tracker. @@ -380,7 +456,7 @@ def test_schema_matches(filename, r_path, py_path): assert not extra_in_py, f"{filename}: Extra columns in Python: {extra_in_py}" -@pytest.mark.parametrize("filename, r_path, py_path", get_all_tracker_files()) +@pytest.mark.parametrize(("filename", "r_path", "py_path"), get_all_tracker_files()) def test_patient_ids_match(filename, r_path, py_path): """Test that unique patient IDs match between R and Python for each tracker. @@ -440,7 +516,7 @@ def test_patient_ids_match(filename, r_path, py_path): assert not extra_in_py, f"{filename}: Extra patient_ids in Python: {extra_in_py}" -@pytest.mark.parametrize("filename, r_path, py_path", get_all_tracker_files()) +@pytest.mark.parametrize(("filename", "r_path", "py_path"), get_all_tracker_files()) def test_no_duplicate_records(filename, r_path, py_path): """Test that there are no duplicate (patient_id, tracker_month) combinations. @@ -478,11 +554,12 @@ def test_no_duplicate_records(filename, r_path, py_path): ) assert len(duplicates) == 0, ( - f"{filename}: Found {len(duplicates)} duplicate (patient_id, clinic_id, tracker_month) combinations" + f"{filename}: Found {len(duplicates)} duplicate " + f"(patient_id, clinic_id, tracker_month) combinations" ) -@pytest.mark.parametrize("filename, r_path, py_path", get_all_tracker_files()) +@pytest.mark.parametrize(("filename", "r_path", "py_path"), get_all_tracker_files()) def test_required_columns_not_null(filename, r_path, py_path): """Test that required columns are never null/empty in Python output. @@ -502,7 +579,7 @@ def test_required_columns_not_null(filename, r_path, py_path): # First, check if exceptions are still valid (alert if fixed) if filename in REQUIRED_COLUMN_EXCEPTIONS: - for col, reason in REQUIRED_COLUMN_EXCEPTIONS[filename].items(): + for col, _reason in REQUIRED_COLUMN_EXCEPTIONS[filename].items(): if col in df_py.columns: null_count = df_py[col].null_count() if null_count == 0: @@ -545,7 +622,7 @@ def test_file_coverage(self, tracker_files): missing_py = 0 available = 0 - for filename, r_path, py_path in tracker_files: + for filename, _r_path, py_path in tracker_files: if filename in SKIP_VALIDATION: skipped += 1 elif not py_path.exists(): @@ -566,7 +643,7 @@ def test_file_coverage(self, tracker_files): # Just report, don't assert - this is informational only -@pytest.mark.parametrize("filename, r_path, py_path", get_all_tracker_files()) +@pytest.mark.parametrize(("filename", "r_path", "py_path"), get_all_tracker_files()) def test_data_values_match(filename, r_path, py_path): """Test that data values match between R and Python for matching patients. @@ -597,7 +674,8 @@ def test_data_values_match(filename, r_path, py_path): common_cols = sorted(r_cols & py_cols) # Must have at least patient_id and tracker_month - assert "patient_id" in common_cols and "tracker_month" in common_cols + assert "patient_id" in common_cols + assert "tracker_month" in common_cols # Join on patient_id and tracker_month to compare matching records # Use inner join to only compare patients that exist in both diff --git a/a4d-python/tests/test_reference/test_provinces.py b/a4d-python/tests/test_reference/test_provinces.py index 30e4dca..61eb58d 100644 --- a/a4d-python/tests/test_reference/test_provinces.py +++ b/a4d-python/tests/test_reference/test_provinces.py @@ -68,7 +68,7 @@ def test_provinces_are_lowercased(self): """Test that all provinces are lowercased.""" provinces_by_country = load_provinces_by_country() - for country, provinces in provinces_by_country.items(): + for _country, provinces in provinces_by_country.items(): assert all(p == p.lower() for p in provinces) def test_includes_expected_countries(self): @@ -232,7 +232,7 @@ def test_case_insensitive_validation_comprehensive(self): provinces_by_country = load_provinces_by_country() # Get a few provinces from the data - thailand = provinces_by_country["THAILAND"] + provinces_by_country["THAILAND"] vietnam = provinces_by_country["VIETNAM"] # Test that both original case and variations work