Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/reference/ray/aggregations/tensor_mean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# ratiopath.ray.aggregate.TensorMean

::: ratiopath.ray.aggregate.TensorMean
3 changes: 3 additions & 0 deletions docs/reference/ray/aggregations/tensor_std.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# ratiopath.ray.aggregate.TensorStd

::: ratiopath.ray.aggregate.TensorStd
3 changes: 3 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ nav:
- estimate_stain_vectors: reference/augmentations/estimate_stain_vectors.md
- StainAugmentor: reference/augmentations/stain_augmentor.md
- Ray:
- Aggregations:
- tensor_mean: reference/ray/aggregations/tensor_mean.md
- tensor_std: reference/ray/aggregations/tensor_std.md
- read_slides: reference/ray/read_slides.md
- VipsTiffDatasink: reference/ray/vips_tiff_datasink.md
- OpenSlide: reference/openslide.md
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "ratiopath"
version = "1.2.0"
version = "1.3.0"
description = "A library for efficient processing and analysis of whole-slide pathology images."
authors = [
{ name = "Matěj Pekár", email = "matejpekar@mail.muni.cz" },
Expand All @@ -10,7 +10,7 @@ authors = [
readme = "README.md"
license = "MIT"
license-files = ["LICENSE"]
requires-python = ">=3.12"
requires-python = ">=3.12,<3.14"
dependencies = [
"albumentations>=2.0.8",
"imagecodecs>=2025.8.2",
Expand Down
5 changes: 5 additions & 0 deletions ratiopath/ray/aggregate/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Ray Data aggregation functions for tensor-valued columns."""

from ratiopath.ray.aggregate.tensor_mean import TensorMean
from ratiopath.ray.aggregate.tensor_std import TensorStd


__all__ = ["TensorMean", "TensorStd"]
146 changes: 146 additions & 0 deletions ratiopath/ray/aggregate/tensor_mean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
from typing import cast

import numpy as np

from ray.data.aggregate import AggregateFnV2
from ray.data.block import Block, BlockAccessor


class TensorMean(AggregateFnV2[dict, np.ndarray | float]):
    """Calculates the mean (average) of a column containing Tensors.

    This aggregator treats the data column as a high-dimensional array where
    **axis 0 represents the batch dimension**. To satisfy the requirements
    of a reduction and prevent memory growth proportional to the number of rows,
    axis 0 must be included in the aggregation.


    Args:
        on: The name of the column containing tensors or numbers.
        axis: The axis or axes along which the reduction is computed.
            - `None`: Global reduction. Collapses all dimensions (including batch)
              to a single scalar.
            - `int`: Aggregates over both the batch (axis 0) AND the specified
              tensor dimension. For example, `axis=1` collapses the batch and
              the first dimension of the tensors.
            - `tuple`: A sequence of axes that **must** explicitly include `0`.
        ignore_nulls: Whether to ignore null values. Defaults to True.
        alias_name: Optional name for the resulting column. Defaults to "mean(<on>)".

    Raises:
        ValueError: If `axis` is provided as a tuple but does not include `0`.

    Note:
        This aggregator is designed for "reduction" operations. If you wish to
        calculate statistics per-row without collapsing the batch dimension,
        use `.map()` instead.

    Example:
        >>> import ray
        >>> import numpy as np
        >>> from ratiopath.ray.aggregate import TensorMean
        >>> # Dataset with 2x2 matrices: total shape (Batch=2, Dim1=2, Dim2=2)
        >>> ds = ray.data.from_items(
        ...     [
        ...         {"m": np.array([[1, 1], [1, 1]])},
        ...         {"m": np.array([[3, 3], [3, 3]])},
        ...     ]
        ... )
        >>> # 1. Global Mean (axis=None) -> Result: 2.0
        >>> ds.aggregate(TensorMean(on="m", axis=None))
        >>>
        >>> # 2. Batch Mean (axis=0) -> Result: np.array([[2, 2], [2, 2]])
        >>> ds.aggregate(TensorMean(on="m", axis=0))
        >>>
        >>> # 3. Mean across Batch and Rows (axis=(0, 1)) -> Result: np.array([2, 2])
        >>> ds.aggregate(TensorMean(on="m", axis=(0, 1)))
    """

    # Reduction axes (always containing 0); None means "reduce everything".
    _aggregate_axis: tuple[int, ...] | None = None

    def __init__(
        self,
        on: str,
        axis: int | tuple[int, ...] | None = None,
        ignore_nulls: bool = True,
        alias_name: str | None = None,
    ):
        super().__init__(
            name=alias_name if alias_name else f"mean({on})",
            on=on,
            ignore_nulls=ignore_nulls,
            # Initialize with identity values for summation
            zero_factory=self.zero_factory,
        )

        if axis is not None:
            # An int is shorthand for "batch axis plus that axis".
            axes = {0, axis} if isinstance(axis, int) else set(axis)

            if 0 not in axes:
                raise ValueError(
                    f"Invalid axis configuration: {axis}. Axis 0 (the batch dimension) "
                    "must be included to perform a reduction. To process rows "
                    "independently without collapsing the batch, use .map() instead."
                )

            self._aggregate_axis = tuple(axes)

    @staticmethod
    def zero_factory() -> dict:
        # Identity accumulator: no elements seen yet, aggregated shape unknown.
        return {"sum": 0, "shape": None, "count": 0}

    def aggregate_block(self, block: Block) -> dict:
        """Compute the partial sum and element count for a single block."""
        block_acc = BlockAccessor.for_block(block)

        # Get exact counts before any NumPy conversion obscures the nulls
        valid_count = cast(
            "int",
            block_acc.count(self._target_col_name, ignore_nulls=True),  # type: ignore [arg-type]
        )
        total_count = cast(
            "int",
            block_acc.count(self._target_col_name, ignore_nulls=False),  # type: ignore [arg-type]
        )

        # Catch nulls immediately if strict mode is on
        if valid_count < total_count and not self._ignore_nulls:
            raise ValueError(
                f"Column '{self._target_col_name}' contains null values, but "
                "ignore_nulls is False."
            )

        if valid_count == 0:
            return self.zero_factory()

        col_np = cast("np.ndarray", block_acc.to_numpy(self._target_col_name))

        # Filter out nulls if necessary
        if valid_count < total_count:
            valid_tensors = [x for x in col_np if x is not None]
            col_np = np.stack(valid_tensors)

        # Perform the partial sum and calculate how many elements contributed
        block_sum = np.sum(col_np, axis=self._aggregate_axis)
        block_count = col_np.size // block_sum.size

        return {
            # ravel() (a view when possible) instead of flatten() (always a copy)
            "sum": block_sum.ravel(),
            "shape": block_sum.shape,
            "count": block_count,
        }

    def combine(self, current_accumulator: dict, new: dict) -> dict:
        """Merge two partial accumulators."""
        # Empty accumulators (from zero_factory) act as the identity element:
        # return the other side unchanged. This also preserves the aggregated
        # shape correctly: a global reduction (axis=None) yields shape == (),
        # which is falsy, so a truthiness test such as
        # `current_accumulator["shape"] or new["shape"]` would wrongly discard
        # a valid () shape and later crash finalize's reshape.
        if new["count"] == 0:
            return current_accumulator
        if current_accumulator["count"] == 0:
            return new

        return {
            "sum": np.asarray(current_accumulator["sum"]) + np.asarray(new["sum"]),
            "shape": current_accumulator["shape"],
            "count": current_accumulator["count"] + new["count"],
        }

    def finalize(self, accumulator: dict) -> np.ndarray | float:  # type: ignore [override]
        """Turn the merged accumulator into the final mean."""
        count = accumulator["count"]

        # No valid elements anywhere in the dataset.
        if count == 0:
            return np.nan

        # Reshape the flattened sum back to original aggregated dimensions
        return np.asarray(accumulator["sum"]).reshape(accumulator["shape"]) / count
190 changes: 190 additions & 0 deletions ratiopath/ray/aggregate/tensor_std.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
from typing import cast

import numpy as np

from ray.data.aggregate import AggregateFnV2
from ray.data.block import Block, BlockAccessor


class TensorStd(AggregateFnV2[dict, np.ndarray | float]):
    """Distributed standard deviation over a tensor (or numeric) column.

    The column is viewed as one large array whose leading axis (axis 0) is the
    batch dimension. Because a reduction must shrink the data rather than grow
    with the row count, axis 0 is always part of the aggregation.

    Per-block partial statistics are merged with Chan's parallel variance
    algorithm, keeping the computation numerically stable across Ray blocks.

    Args:
        on: The name of the column containing tensors or numbers.
        axis: The axis or axes along which the reduction is computed.
            - `None`: Global reduction. Collapses all dimensions (including
              batch) to a single scalar.
            - `int`: Aggregates over both the batch (axis 0) AND the specified
              tensor dimension, e.g. `axis=1` collapses axes 0 and 1.
            - `tuple`: A sequence of axes that **must** explicitly include `0`.
        ddof: Delta Degrees of Freedom. The divisor used in calculations
            is $N - ddof$, where $N$ represents the number of elements.
            Defaults to 1.0 (sample standard deviation).
        ignore_nulls: Whether to ignore null values. Defaults to True.
        alias_name: Optional name for the resulting column. Defaults to
            "std(<on>)".

    Raises:
        ValueError: If `axis` is provided as a tuple but does not include `0`.

    Note:
        This aggregator is designed for "reduction" operations. To compute
        per-row statistics without collapsing the batch dimension, use
        `.map()` instead.

    Example:
        >>> import ray
        >>> import numpy as np
        >>> from ratiopath.ray.aggregate import TensorStd
        >>> ds = ray.data.from_items(
        ...     [
        ...         {"m": np.array([[1, 2], [1, 2]])},
        ...         {"m": np.array([[5, 6], [5, 6]])},
        ...     ]
        ... )
        >>> # Global std (axis=None): every element reduced to one scalar.
        >>> ds.aggregate(TensorStd(on="m", axis=None))
        >>>
        >>> # Batch std (axis=0): a 2x2 matrix of per-position std values.
        >>> ds.aggregate(TensorStd(on="m", axis=0))
        >>>
        >>> # Int shorthand (axis=1): internally uses axis=(0, 1).
        >>> ds.aggregate(TensorStd(on="m", axis=1))
    """

    # Reduction axes (always containing 0); None means "reduce everything".
    _aggregate_axis: tuple[int, ...] | None = None

    def __init__(
        self,
        on: str,
        axis: int | tuple[int, ...] | None = None,
        ddof: float = 1.0,
        ignore_nulls: bool = True,
        alias_name: str | None = None,
    ):
        super().__init__(
            name=alias_name if alias_name else f"std({on})",
            on=on,
            ignore_nulls=ignore_nulls,
            zero_factory=self.zero_factory,
        )

        self._ddof = ddof

        if axis is None:
            return

        # An int is shorthand for "batch axis plus that axis".
        axes = {0, axis} if isinstance(axis, int) else set(axis)
        if 0 not in axes:
            raise ValueError(
                f"Invalid axis configuration: {axis}. Axis 0 (the batch dimension) "
                "must be included to perform a reduction. To process rows "
                "independently without collapsing the batch, use .map() instead."
            )

        self._aggregate_axis = tuple(axes)

    @staticmethod
    def zero_factory() -> dict:
        # Identity accumulator: nothing seen yet, aggregated shape unknown.
        return {"mean": 0, "ssd": 0, "shape": None, "count": 0}

    def aggregate_block(self, block: Block) -> dict:
        """Compute partial (mean, sum-of-squared-deviations) for one block."""
        accessor = BlockAccessor.for_block(block)

        # Count rows with and without nulls up front; converting to NumPy
        # first can obscure which entries were null.
        n_valid = cast(
            "int",
            accessor.count(self._target_col_name, ignore_nulls=True),  # type: ignore [arg-type]
        )
        n_total = cast(
            "int",
            accessor.count(self._target_col_name, ignore_nulls=False),  # type: ignore [arg-type]
        )

        has_nulls = n_valid < n_total
        if has_nulls and not self._ignore_nulls:
            raise ValueError(
                f"Column '{self._target_col_name}' contains null values, but "
                "ignore_nulls is False."
            )

        if n_valid == 0:
            return self.zero_factory()

        values = cast("np.ndarray", accessor.to_numpy(self._target_col_name))

        # Drop null rows before doing any math.
        if has_nulls:
            values = np.stack([v for v in values if v is not None])

        # Partial sum (keepdims so the block mean broadcasts back over values)
        # and the number of elements that contributed to each output position.
        partial_sum = np.sum(values, axis=self._aggregate_axis, keepdims=True)
        n_elements = values.size // partial_sum.size

        block_mean = partial_sum / n_elements
        block_ssd = np.sum((values - block_mean) ** 2, axis=self._aggregate_axis)

        return {
            "mean": block_mean.ravel(),
            "ssd": block_ssd.ravel(),
            "shape": block_ssd.shape,
            "count": n_elements,
        }

    def combine(self, current_accumulator: dict, new: dict) -> dict:
        """Merge two partial accumulators via Chan's parallel algorithm."""
        # Empty accumulators are the identity element: return the other side.
        if new["count"] == 0:
            return current_accumulator
        if current_accumulator["count"] == 0:
            return new

        n_a = current_accumulator["count"]
        n_b = new["count"]
        n_ab = n_a + n_b

        mean_a = np.asarray(current_accumulator["mean"])
        mean_b = np.asarray(new["mean"])

        delta = mean_b - mean_a

        # Chan's formula for the combined true mean
        mean_ab = (mean_a * n_a + mean_b * n_b) / n_ab

        # Combined sum of squared deviations: the two partial SSDs plus a
        # correction term for the distance between the two partial means.
        ssd_ab = (
            np.asarray(current_accumulator["ssd"])
            + np.asarray(new["ssd"])
            + (delta**2 * n_a * n_b / n_ab)
        )

        return {
            "mean": mean_ab,
            "ssd": ssd_ab,
            "shape": new["shape"],
            "count": n_ab,
        }

    def finalize(self, accumulator: dict) -> np.ndarray | float:  # type: ignore [override]
        """Turn the merged accumulator into the final standard deviation."""
        n = accumulator["count"]
        divisor = n - self._ddof

        # Not enough elements for the requested degrees of freedom.
        if divisor <= 0:
            return np.nan

        ssd = np.asarray(accumulator["ssd"]).reshape(accumulator["shape"])

        # Clamp at zero as a safety net: floating-point jitter can leave
        # trivially negative values (e.g. -1e-16) that would break np.sqrt.
        variance = np.maximum(0.0, ssd / divisor)

        return np.sqrt(variance)
Loading
Loading