Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .coverage
Binary file not shown.
44 changes: 41 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
name: Test Pull Request
name: Tests

on:
push:
branches: [main]
pull_request:

jobs:
Expand All @@ -23,5 +25,41 @@ jobs:
- name: Install dependencies
run: uv sync --group dev

- name: Run tests
run: uv run pytest -vv -W error
- name: Run tests with coverage
run: uv run pytest -vv -W error --cov=transformplan --cov-report=xml --cov-report=term

coverage-badge:
needs: test
runs-on: ubuntu-22.04
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
permissions:
contents: write

steps:
- uses: actions/checkout@v5

- name: Install uv
uses: astral-sh/setup-uv@v5
with:
enable-cache: true
cache-dependency-glob: pyproject.toml
python-version: "3.12"

- name: Install dependencies
run: uv sync --group dev

- name: Generate coverage report
run: uv run pytest --cov=transformplan --cov-report=xml --cov-report=term

- name: Create coverage badge
uses: tj-actions/coverage-badge-py@v2
with:
output: coverage.svg

- name: Commit badge
run: |
git config --local user.email "github-actions[bot]@users.noreply.github.com"
git config --local user.name "github-actions[bot]"
git add coverage.svg
git diff --staged --quiet || git commit -m "chore: update coverage badge [skip ci]"
git push
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
![Coverage](./coverage.svg)

# TransformPlan: Auditable Data Transformation Pipelines

<table>
Expand Down Expand Up @@ -36,7 +38,7 @@ plan = (
.dt_age_years(column="date_of_birth", new_column="age")
.math_clamp(column="age", min_value=0, max_value=120)

# Categorize patients
# Categorize patients by age
.map_discretize(column="age", bins=[18, 40, 65], labels=["young", "adult", "senior"], new_column="age_group")

# Filter and clean
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dev = [
"mkdocs>=1.6.0",
"mkdocs-material>=9.5.0",
"mkdocstrings[python]>=0.24.0",
"pytest-cov>=7.0.0",
]

[build-system]
Expand Down
85 changes: 59 additions & 26 deletions transformplan/chunking.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def set_operations(self, operations: list[dict[str, Any]]) -> None:
"""Record the operations that were applied."""
self._operations = operations

def set_metadata(self, **kwargs: Any) -> None:
def set_metadata(self, **kwargs: Any) -> None: # noqa: ANN401
"""Set arbitrary metadata on the protocol."""
self._metadata.update(kwargs)

Expand All @@ -275,32 +275,56 @@ def add_chunk(self, chunk_info: ChunkInfo) -> None:

@property
def chunks(self) -> list[ChunkInfo]:
"""List of chunk information."""
"""List of chunk information.

Returns:
List of ChunkInfo instances.
"""
return self._chunks

@property
def total_input_rows(self) -> int:
"""Total rows across all input chunks."""
"""Total rows across all input chunks.

Returns:
Sum of input rows.
"""
return sum(c.input_rows for c in self._chunks)

@property
def total_output_rows(self) -> int:
"""Total rows across all output chunks."""
"""Total rows across all output chunks.

Returns:
Sum of output rows.
"""
return sum(c.output_rows for c in self._chunks)

@property
def total_elapsed_seconds(self) -> float:
"""Total processing time across all chunks."""
"""Total processing time across all chunks.

Returns:
Sum of elapsed seconds.
"""
return sum(c.elapsed_seconds for c in self._chunks)

@property
def num_chunks(self) -> int:
"""Number of chunks processed."""
"""Number of chunks processed.

Returns:
Count of chunks.
"""
return len(self._chunks)

@property
def metadata(self) -> dict[str, Any]:
"""Protocol metadata."""
"""Protocol metadata.

Returns:
Dictionary of metadata.
"""
return self._metadata

def output_hash(self) -> str:
Expand Down Expand Up @@ -409,23 +433,29 @@ def from_json(cls, source: str | Path) -> ChunkedProtocol:
Returns:
ChunkedProtocol instance.
"""
if isinstance(source, Path) or (
isinstance(source, str) and not source.strip().startswith("{")
):
if isinstance(source, Path) or not source.strip().startswith("{"):
content = Path(source).read_text()
else:
content = source

return cls.from_dict(json.loads(content))

def __repr__(self) -> str:
"""Return string representation of the protocol."""
"""Return string representation of the protocol.

Returns:
Human-readable representation.
"""
return (
f"ChunkedProtocol({self.num_chunks} chunks, {self.total_input_rows} rows)"
)

def __len__(self) -> int:
"""Return number of chunks processed."""
"""Return number of chunks processed.

Returns:
Count of chunks.
"""
return self.num_chunks

def summary(self) -> str:
Expand All @@ -452,29 +482,32 @@ def summary(self) -> str:
lines.append(f"Partition key: {self._partition_key}")
if self._chunk_size:
lines.append(f"Target chunk size: {self._chunk_size:,}")
lines.append("-" * 70)

# Summary stats
lines.append(f"Chunks processed: {self.num_chunks}")
lines.append(f"Total input rows: {self.total_input_rows:,}")
lines.append(f"Total output rows: {self.total_output_rows:,}")
lines.extend(
[
"-" * 70,
f"Chunks processed: {self.num_chunks}",
f"Total input rows: {self.total_input_rows:,}",
f"Total output rows: {self.total_output_rows:,}",
]
)
rows_diff = self.total_output_rows - self.total_input_rows
if rows_diff != 0:
lines.append(f"Row change: {rows_diff:+,}")
lines.append(f"Total time: {self.total_elapsed_seconds:.4f}s")
if self.num_chunks > 0:
avg_time = self.total_elapsed_seconds / self.num_chunks
lines.append(f"Avg time per chunk: {avg_time:.4f}s")
lines.append(f"Output hash: {self.output_hash()}")
lines.append("-" * 70)
lines.extend((f"Output hash: {self.output_hash()}", "-" * 70))

# Per-chunk details
if self._chunks:
lines.append("")
lines.append(
f"{'#':<6} {'Input':<12} {'Output':<12} {'Change':<10} {'Time':<10} {'Hash':<16}"
lines.extend(
(
"",
f"{'#':<6} {'Input':<12} {'Output':<12} {'Change':<10} {'Time':<10} {'Hash':<16}",
"-" * 70,
)
)
lines.append("-" * 70)

for chunk in self._chunks:
idx = str(chunk.chunk_index)
Expand All @@ -494,10 +527,10 @@ def summary(self) -> str:

def print(self) -> None:
"""Print the protocol summary to stdout."""
print(self.summary())
print(self.summary()) # noqa: T201


def validate_chunked_pipeline(
def validate_chunked_pipeline( # noqa: C901
operations: list[tuple[Any, dict[str, Any]]],
partition_key: str | list[str] | None = None,
) -> ChunkValidationResult:
Expand Down
Loading