Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
.git
.github
.Rproj.user
.Rhistory
.RData
*.Rproj
a4d-python/.pytest_cache
a4d-python/.ruff_cache
a4d-python/htmlcov
a4d-python/.coverage
a4d-python/profiling/*.prof
data/
secrets/
2 changes: 1 addition & 1 deletion .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
enable-cache: true

- name: Set up Python
run: uv python install 3.11
run: uv python install 3.14

- name: Install dependencies
run: uv sync --all-extras
Expand Down
25 changes: 12 additions & 13 deletions a4d-python/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM python:3.11-slim
FROM python:3.14-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
g++ \
curl \
Expand All @@ -13,23 +13,22 @@ ENV PATH="/root/.cargo/bin:${PATH}"

WORKDIR /app

# Copy dependency files
COPY pyproject.toml ./
# Copy dependency files first to leverage Docker layer caching
COPY a4d-python/pyproject.toml a4d-python/uv.lock ./

# Install dependencies
RUN uv sync --frozen
# Install production dependencies only
RUN uv sync --frozen --no-dev

# Copy application code
COPY src/ src/
COPY scripts/ scripts/
COPY a4d-python/src/ src/

# Copy reference data from parent directory
# (This will be mounted or copied during build)
COPY ../reference_data/ reference_data/
# Copy reference data from the repo root
COPY reference_data/ reference_data/

# Set environment
ENV PYTHONPATH=/app/src
ENV PYTHONUNBUFFERED=1
ENV A4D_DATA_ROOT=/workspace/data

# Default command
CMD ["uv", "run", "python", "scripts/run_pipeline.py"]
# Run the full pipeline: download → process → upload to GCS → ingest into BigQuery
CMD ["uv", "run", "a4d", "run-pipeline"]
5 changes: 2 additions & 3 deletions a4d-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ name = "a4d"
version = "2.0.0"
description = "A4D Medical Tracker Data Processing Pipeline (Python)"
readme = "README.md"
requires-python = ">=3.11"
requires-python = ">=3.14"
authors = [
{name = "Michael Aydinbas", email = "michael.aydinbas@gmail.com"}
]
license = {text = "MIT"}

dependencies = [
"polars>=0.20.0",
"duckdb>=0.10.0",
"pydantic>=2.6.0",
"pydantic-settings>=2.2.0",
"pandera[polars]>=0.18.0",
Expand Down Expand Up @@ -47,7 +46,7 @@ build-backend = "hatchling.build"

[tool.ruff]
line-length = 100
target-version = "py311"
target-version = "py314"
lint.select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
Expand Down
120 changes: 118 additions & 2 deletions a4d-python/src/a4d/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,124 @@ def upload_output_cmd(
raise typer.Exit(1) from e


@app.command("version")
def version_cmd():
@app.command("run-pipeline")
def run_pipeline_cmd(
    workers: Annotated[
        int, typer.Option("--workers", "-w", help="Number of parallel workers (1 = sequential)")
    ] = 4,
    force: Annotated[
        bool, typer.Option("--force", help="Force reprocessing (ignore existing outputs)")
    ] = False,
    skip_upload: Annotated[
        bool,
        typer.Option("--skip-upload", help="Skip GCS and BigQuery uploads (local testing)"),
    ] = False,
):
    """Run the full end-to-end A4D pipeline.

    Executes all pipeline stages in sequence:
    1. Download tracker files from Google Cloud Storage
    2. Extract and clean all tracker files
    3. Create final tables (static, monthly, annual)
    4. Upload output files to Google Cloud Storage
    5. Ingest tables into BigQuery

    All configuration is read from environment variables (A4D_*) or a .env file.

    Note: ``--skip-upload`` also skips the initial GCS *download* step, so a
    fully offline local run only executes the processing stages (2–3).

    \b
    Examples:
        # Full pipeline with 4 workers
        uv run a4d run-pipeline

        # Force reprocess all files
        uv run a4d run-pipeline --force

        # Local testing without GCS/BigQuery uploads
        uv run a4d run-pipeline --skip-upload
    """
    # Imported lazily so that `a4d --help` etc. do not require GCP libraries.
    from a4d.config import settings
    from a4d.gcp.bigquery import load_pipeline_tables
    from a4d.gcp.storage import download_tracker_files, upload_output

    console.print("\n[bold blue]A4D Full Pipeline[/bold blue]\n")
    console.print(f"Data root: {settings.data_root}")
    console.print(f"Output root: {settings.output_root}")
    console.print(f"Workers: {workers}")
    console.print(f"Project: {settings.project_id}")
    console.print(f"Dataset: {settings.dataset}\n")

    # Step 1 – Download tracker files from GCS (skipped in offline mode).
    if not skip_upload:
        console.print("[bold]Step 1/5:[/bold] Downloading tracker files from GCS...")
        try:
            downloaded = download_tracker_files(destination=settings.data_root)
            console.print(f"  ✓ Downloaded {len(downloaded)} files\n")
        except Exception as e:
            console.print(f"\n[bold red]Error during download: {e}[/bold red]\n")
            raise typer.Exit(1) from e
    else:
        console.print("[bold]Step 1/5:[/bold] Skipping GCS download (--skip-upload)\n")

    # Step 2+3 – Extract, clean and build tables
    console.print("[bold]Steps 2–3/5:[/bold] Processing tracker files...\n")
    try:
        result = run_patient_pipeline(
            max_workers=workers,
            force=force,
            show_progress=True,
            console_log_level="WARNING",
        )

        console.print(
            f"  ✓ Processed {result.total_trackers} trackers "
            f"({result.successful_trackers} ok, {result.failed_trackers} failed)\n"
        )

        if result.failed_trackers > 0:
            console.print("[bold yellow]Failed trackers:[/bold yellow]")
            for tr in result.tracker_results:
                if not tr.success:
                    console.print(f"  • {tr.tracker_file.name}: {tr.error}")
            console.print()

        if not result.success:
            console.print("[bold red]✗ Pipeline failed – aborting upload steps[/bold red]\n")
            raise typer.Exit(1)

    except typer.Exit:
        # Deliberate exit raised just above. typer.Exit subclasses RuntimeError
        # (via click.exceptions.Exit), so without this clause the broad handler
        # below would swallow it and print a misleading
        # "Error during processing: 1" message before exiting.
        raise
    except Exception as e:
        console.print(f"\n[bold red]Error during processing: {e}[/bold red]\n")
        raise typer.Exit(1) from e

    tables_dir = settings.output_root / "tables"

    # Step 4 – Upload output to GCS
    if not skip_upload:
        console.print("[bold]Step 4/5:[/bold] Uploading output files to GCS...")
        try:
            uploaded = upload_output(source_dir=settings.output_root)
            console.print(f"  ✓ Uploaded {len(uploaded)} files\n")
        except Exception as e:
            console.print(f"\n[bold red]Error during GCS upload: {e}[/bold red]\n")
            raise typer.Exit(1) from e
    else:
        console.print("[bold]Step 4/5:[/bold] Skipping GCS upload (--skip-upload)\n")

    # Step 5 – Ingest tables into BigQuery
    if not skip_upload:
        console.print("[bold]Step 5/5:[/bold] Ingesting tables into BigQuery...")
        try:
            bq_results = load_pipeline_tables(tables_dir=tables_dir)
            console.print(f"  ✓ Loaded {len(bq_results)} tables into BigQuery\n")
        except Exception as e:
            console.print(f"\n[bold red]Error during BigQuery upload: {e}[/bold red]\n")
            raise typer.Exit(1) from e
    else:
        console.print("[bold]Step 5/5:[/bold] Skipping BigQuery upload (--skip-upload)\n")

    console.print("[bold green]✓ Full pipeline completed successfully![/bold green]\n")



"""Show version information."""
console.print("[bold cyan]A4D Pipeline v0.1.0[/bold cyan]")
console.print("Python implementation of the A4D medical tracker processing pipeline")
Expand Down
Loading
Loading