Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions src/tensorswitch_v2/readers/czi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@
"""

from typing import Dict, Optional, List
import numpy as np
# Import utility functions from v2 utils (independent from v1)
from ..utils import load_czi_stack, extract_czi_metadata
from ..utils.format_loaders import _get_czidoc
from .base import DaskReader

# Maps our axis labels back to the CZI plane-dict keys used by pylibCZIrw.
_AXIS_TO_CZI_KEY = {'t': 'V', 'c': 'C', 'z': 'Z'}


class CZIReader(DaskReader):
"""
Expand Down Expand Up @@ -132,6 +137,81 @@ def axes_order(self) -> List[str]:
self._load()
return ['t' if ax == 'v' else ax for ax in self._axes_order]

def _read_fn(self, domain, array, read_params):
"""Read a CZI plane directly from the cached czidoc, bypassing dask.

The base-class path goes through the 107K-task dask graph on every
call, adding 100-500 ms of graph-traversal overhead per plane even
though only one task needs to run. This override calls czidoc.read()
directly, keeping the same LRU plane cache for cross-shard reuse.

Locking strategy:
_chunk_cache_lock — guards the in-memory plane cache (fast, no IO)
czidoc read_lock — serialises czidoc.read() calls (not thread-safe)
"""
self._load()
axes = self._get_dimension_names() # e.g. ['t','c','z','y','x']
ndim = domain.ndim
origin = [int(domain.origin[i]) for i in range(ndim)]
shape = [int(domain.shape[i]) for i in range(ndim)]

# Build the CZI plane dict and locate y/x dimensions
plane_dict = {}
y_idx = x_idx = None
for i, ax in enumerate(axes):
if ax in _AXIS_TO_CZI_KEY:
plane_dict[_AXIS_TO_CZI_KEY[ax]] = origin[i]
elif ax == 'y':
y_idx = i
elif ax == 'x':
x_idx = i

y_start, x_start = origin[y_idx], origin[x_idx]
y_size, x_size = shape[y_idx], shape[x_idx]

# Cache key: all non-spatial coordinates (identifies the full plane)
plane_key = tuple(origin[i] for i, ax in enumerate(axes)
if ax not in ('y', 'x'))

# Slice index for writing into the output array (all leading dims = 0)
ns_idx = (0,) * (ndim - 2)

# --- cache hit ---
with self._chunk_cache_lock:
if plane_key in self._chunk_cache:
plane = self._chunk_cache[plane_key]
self._chunk_cache_order.remove(plane_key)
self._chunk_cache_order.append(plane_key)
array[ns_idx] = plane[y_start:y_start + y_size,
x_start:x_start + x_size]
return

# --- cache miss: read from CZI (outside cache lock) ---
czidoc, read_lock = _get_czidoc(self.path)
with read_lock:
result = czidoc.read(plane=plane_dict)

if result.ndim == 3 and result.shape[2] == 1:
result = result[:, :, 0]
result = np.ascontiguousarray(result)

# Store full plane in LRU cache
with self._chunk_cache_lock:
plane_bytes = result.nbytes
while (self._chunk_cache_bytes + plane_bytes > self._CHUNK_CACHE_MAX_BYTES
and self._chunk_cache_order):
evict_key = self._chunk_cache_order.pop(0)
evicted = self._chunk_cache.pop(evict_key, None)
if evicted is not None:
self._chunk_cache_bytes -= evicted.nbytes
if plane_bytes <= self._CHUNK_CACHE_MAX_BYTES:
self._chunk_cache[plane_key] = result
self._chunk_cache_order.append(plane_key)
self._chunk_cache_bytes += plane_bytes

array[ns_idx] = result[y_start:y_start + y_size,
x_start:x_start + x_size]

def __repr__(self) -> str:
view_str = f", view_index={self._view_index}" if self._view_index is not None else ""
return f"CZIReader(path='{self.path}'{view_str})"
45 changes: 37 additions & 8 deletions src/tensorswitch_v2/utils/format_loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,38 @@
All voxel sizes are returned in nanometers (standard unit).
"""

import atexit
import os
import threading
import numpy as np
import dask.array as da

# Process-level cache of open CZI readers.
# Opening a CZI file parses the full subblock directory (tens of MB for large
# files with 100K+ planes), which takes seconds on NFS. Re-opening per plane
# read multiplies that cost by the number of planes and causes jobs to hang.
_czi_reader_cache: dict = {}
_czi_reader_locks: dict = {}
_czi_cache_init_lock = threading.Lock()


def _get_czidoc(czi_file: str):
"""Return a cached (czidoc, read_lock) pair for the given CZI path.

The file is opened once per process; subsequent calls return the cached
reader without re-parsing the subblock directory. A per-file Lock
serialises concurrent reads because czidoc.read() is not thread-safe.
"""
with _czi_cache_init_lock:
if czi_file not in _czi_reader_cache:
from pylibCZIrw import czi as _czi_mod
ctx = _czi_mod.open_czi(czi_file)
czidoc = ctx.__enter__()
atexit.register(ctx.__exit__, None, None, None)
_czi_reader_cache[czi_file] = czidoc
_czi_reader_locks[czi_file] = threading.Lock()
return _czi_reader_cache[czi_file], _czi_reader_locks[czi_file]


def convert_to_nanometers(value: float, unit: str) -> float:
"""
Expand Down Expand Up @@ -618,19 +646,20 @@ def extract_ims_metadata(ims_file):


def _read_czi_plane(czi_file, plane_dict, y_size, x_size):
"""
Read a single plane from CZI file.
"""Read a single plane from a CZI file using the process-level cached reader.

This function opens and closes the file for each read, which is necessary
for dask delayed execution where each task runs independently.
The first call for a given path opens the file and caches the reader.
Subsequent calls reuse the cached reader, avoiding repeated subblock-
directory parsing (which dominates wall time for large CZI files on NFS).
Reads are serialised with a per-file Lock because czidoc.read() is not
thread-safe.
"""
from pylibCZIrw import czi

with czi.open_czi(czi_file) as czidoc:
czidoc, lock = _get_czidoc(czi_file)
with lock:
result = czidoc.read(plane=plane_dict)
if result.ndim == 3 and result.shape[2] == 1:
result = result[:, :, 0]
return result
return result.copy()


def load_czi_stack(czi_file, view_index=None):
Expand Down
Loading