From 6b29f16f450516b2cc5ad280a0ae483ffba9f9d0 Mon Sep 17 00:00:00 2001 From: zubovy Date: Thu, 28 Aug 2025 10:58:19 -0400 Subject: [PATCH 1/8] add single-scale ome-ngff (0.4) metadata generation when creating a zarr array. --- cellmap_flow/blockwise/blockwise_processor.py | 27 ++++++++++++++--- cellmap_flow/utils/ds.py | 30 +++++++++++++++++++ 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/cellmap_flow/blockwise/blockwise_processor.py b/cellmap_flow/blockwise/blockwise_processor.py index ba177ea..eba4bc4 100644 --- a/cellmap_flow/blockwise/blockwise_processor.py +++ b/cellmap_flow/blockwise/blockwise_processor.py @@ -6,13 +6,16 @@ import numpy as np from funlib.geometry.coordinate import Coordinate from funlib.persistence import Array, open_ds, prepare_ds -from zarr.storage import DirectoryStore +from zarr.storage import NestedDirectoryStore +from zarr.hierarchy import open_group + from cellmap_flow.globals import g from cellmap_flow.image_data_interface import ImageDataInterface from cellmap_flow.inferencer import Inferencer from cellmap_flow.utils.config_utils import build_models, load_config from cellmap_flow.utils.serilization_utils import get_process_dataset +from cellmap_flow.utils.ds import generate_singlescale_metadata logger = logging.getLogger(__name__) @@ -121,23 +124,39 @@ def __init__(self, yaml_config: str, create=False): if create: try: array = prepare_ds( - DirectoryStore(self.output_path / channel / "s0"), + NestedDirectoryStore(self.output_path / channel / "s0"), output_shape, dtype=self.dtype, chunk_shape=self.block_shape, voxel_size=self.output_voxel_size, axis_names=["z", "y", "x"], - units=["nm", "nm", "nm"], + units=["nanometer",]*3, offset=(0, 0, 0), ) except Exception as e: raise Exception( f"Failed to prepare {self.output_path/channel/'s0'} \n try deleting it manually and run again ! {e}" ) + try: + z_store = NestedDirectoryStore(self.output_path / channel) + zg = open_group(store=z_store, mode='a') + if 'multiscales' in list(zg.attrs): + raise ValueError(f'multiscales attribute already exists in {z_store.path}') + else: + zattrs = generate_singlescale_metadata(ds_name='s0', + voxel_size=self.output_voxel_size, + translation=[0.0,]*3, + units=['nanometer',]*3, + axes=['z', 'y', 'x']) + zg.attrs['multiscales'] = zattrs['multiscales'] + except Exception as e: + raise Exception( + f"Failed to prepare ome-ngff metadata for {self.output_path/channel/'s0'}, {e}" + ) else: try: array = open_ds( - DirectoryStore(self.output_path / channel / "s0"), + NestedDirectoryStore(self.output_path / channel / "s0"), "a", ) except Exception as e: diff --git a/cellmap_flow/utils/ds.py b/cellmap_flow/utils/ds.py index cb01723..a58f87f 100644 --- a/cellmap_flow/utils/ds.py +++ b/cellmap_flow/utils/ds.py @@ -16,6 +16,36 @@ from cellmap_flow.globals import g +def generate_singlescale_metadata( + ds_name: str, + voxel_size: list, + translation: list, + units: str, + axes: list, +): + z_attrs: dict = {"multiscales": [{}]} + z_attrs["multiscales"][0]["axes"] = [ + {"name": axis, "type": "space", "unit": units} for axis in axes + ] + z_attrs["multiscales"][0]["coordinateTransformations"] = [ + {"scale": [1.0, 1.0, 1.0], "type": "scale"} + ] + z_attrs["multiscales"][0]["datasets"] = [ + { + "coordinateTransformations": [ + {"scale": voxel_size, "type": "scale"}, + {"translation": translation, "type": "translation"}, + ], + "path": ds_name, + } + ] + + z_attrs["multiscales"][0]["name"] = "" + z_attrs["multiscales"][0]["version"] = "0.4" + + return z_attrs + + def get_scale_info(zarr_grp): attrs = zarr_grp.attrs From 4e32157d13857486b52cca01f905c00bcee27d65 Mon Sep 17 00:00:00 2001 From: Marwan Zouinkhi Date: Tue, 2 Sep 2025 12:18:11 -0400 Subject: [PATCH 2/8] track progress --- cellmap_flow/blockwise/blockwise_processor.py | 44 ++++++++++++------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/cellmap_flow/blockwise/blockwise_processor.py b/cellmap_flow/blockwise/blockwise_processor.py index ba177ea..b4e95f6 100644 --- a/cellmap_flow/blockwise/blockwise_processor.py +++ b/cellmap_flow/blockwise/blockwise_processor.py @@ -7,7 +7,7 @@ from funlib.geometry.coordinate import Coordinate from funlib.persistence import Array, open_ds, prepare_ds from zarr.storage import DirectoryStore - +from functools import partial from cellmap_flow.globals import g from cellmap_flow.image_data_interface import ImageDataInterface from cellmap_flow.inferencer import Inferencer @@ -36,6 +36,7 @@ def __init__(self, yaml_config: str, create=False): self.output_path = self.config["output_path"] self.output_path = Path(self.output_path) + output_channels = None if "output_channels" in self.config: output_channels = self.config["output_channels"].split(",") @@ -50,12 +51,15 @@ def __init__(self, yaml_config: str, create=False): if "workers" not in self.config: logger.error("Missing required field in YAML: workers") return + + task_name = self.config["task_name"] self.workers = self.config["workers"] if self.workers <= 1: logger.error("Workers should be greater than 1.") return self.cpu_workers = self.config.get("cpu_workers", 12) - if "create" in self.config: + # Added and create == True to fix client error when create: True in the yaml, so when it is a client it will not be changed + if "create" in self.config and create == True: create = self.config["create"] if isinstance(create, str): logger.warning( @@ -63,7 +67,13 @@ def __init__(self, yaml_config: str, create=False): ) create = create.lower() == "true" - task_name = self.config["task_name"] + if "tmp_dir" not in self.config: + logger.error("Missing required field in YAML: tmp_dir, it is mandatory to track progress") + return + + self.tmp_dir = Path(self.config["tmp_dir"]) / f"tmp_flow_daisy_progress_{task_name}" + if not self.tmp_dir.exists(): + self.tmp_dir.mkdir(parents=True, exist_ok=True) # Build model configuration objects models = build_models(self.config["models"]) @@ -101,7 +111,7 @@ def __init__(self, yaml_config: str, create=False): if json_data: g.input_norms, g.postprocess = get_process_dataset(json_data) - self.inferencer = Inferencer(self.model_config) + self.inferencer = Inferencer(self.model_config, use_half_prediction=False) self.idi_raw = ImageDataInterface( self.input_path, voxel_size=self.input_voxel_size @@ -156,9 +166,6 @@ def process_fn(self, block): chunk_data = chunk_data.astype(self.dtype) - if self.output_arrays[0][block.write_roi].any(): - return - for i, array in enumerate(self.output_arrays): if chunk_data.shape == 3: @@ -177,7 +184,8 @@ def process_fn(self, block): self.output_voxel_size, ) array[write_roi] = predictions.to_ndarray(write_roi) - logger.info(f"Processed block {block.id} with write ROI {write_roi}") + + def client(self): client = daisy.Client() @@ -185,9 +193,14 @@ def client(self): with client.acquire_block() as block: if block is None: break - self.process_fn(block) + try: + self.process_fn(block) - block.status = daisy.BlockStatus.SUCCESS + block.status = daisy.BlockStatus.SUCCESS + (self.tmp_dir / f"{block.block_id[1]}").touch() + except Exception as e: + logger.error(f"Error processing block {block}: {e}") + block.status = daisy.BlockStatus.FAILED def run(self): @@ -217,17 +230,18 @@ def run(self): self.queue, ncpu=self.cpu_workers, ), - read_write_conflict=True, + check_function=partial(check_block, self.tmp_dir), + read_write_conflict=False, fit="overhang", max_retries=0, timeout=None, num_workers=self.workers, ) - daisy.run_blockwise([task]) - # , multiprocessing= False - - + task_state = daisy.run_blockwise([task]) + logger.info(f"Task state: {task_state}") +def check_block(tmp_dir, block: daisy.Block) -> bool: + return (tmp_dir / f"{block.block_id[1]}").exists() def spawn_worker(name, yaml_config, charge_group, queue, ncpu=12): def run_worker(): if not Path("prediction_logs").exists(): From 8f7b963104121ff841787de81eae3193fa985147 Mon Sep 17 00:00:00 2001 From: Marwan Zouinkhi Date: Tue, 2 Sep 2025 12:18:37 -0400 Subject: [PATCH 3/8] support diff size fly --- cellmap_flow/cli/fly_model.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cellmap_flow/cli/fly_model.py b/cellmap_flow/cli/fly_model.py index a9dd12f..1cabb44 100644 --- a/cellmap_flow/cli/fly_model.py +++ b/cellmap_flow/cli/fly_model.py @@ -31,7 +31,8 @@ def main(): if "charge_group" not in data: raise ValueError("charge_group is required in the YAML file") charge_group = data["charge_group"] - + input_size = tuple(data.get("input_size", (178, 178, 178))) + output_size = tuple(data.get("output_size", (56, 56, 56))) g.charge_group = charge_group threads = [] for run_name, run_items in data["runs"].items(): @@ -51,6 +52,8 @@ def main(): input_voxel_size=res, output_voxel_size=res, name=run_name, + input_size=input_size, + output_size=output_size, ) model_command = f"fly -c {model_config.checkpoint_path} -ch {','.join(model_config.channels)} -ivs {','.join(map(str,model_config.input_voxel_size))} -ovs {','.join(map(str,model_config.output_voxel_size))}" command = f"{SERVER_COMMAND} {model_command} -d {data_path}" From d40bb0630170facd819a26eefbdb80255e49eaa3 Mon Sep 17 00:00:00 2001 From: zubovy Date: Tue, 16 Sep 2025 10:54:11 -0400 Subject: [PATCH 4/8] :bug: write unit into a .zattrs metadata as a string, not as a list --- cellmap_flow/blockwise/blockwise_processor.py | 2 +- cellmap_flow/utils/ds.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cellmap_flow/blockwise/blockwise_processor.py b/cellmap_flow/blockwise/blockwise_processor.py index 8835550..d39d5b1 100644 --- a/cellmap_flow/blockwise/blockwise_processor.py +++ b/cellmap_flow/blockwise/blockwise_processor.py @@ -153,7 +153,7 @@ def __init__(self, yaml_config: str, create=False): if 'multiscales' in list(zg.attrs): raise ValueError(f'multiscales attribute already exists in {z_store.path}') else: - zattrs = generate_singlescale_metadata(ds_name='s0', + zattrs = generate_singlescale_metadata(arr_name='s0', voxel_size=self.output_voxel_size, translation=[0.0,]*3, units=['nanometer',]*3, diff --git a/cellmap_flow/utils/ds.py b/cellmap_flow/utils/ds.py index a58f87f..d6b3537 100644 --- a/cellmap_flow/utils/ds.py +++ b/cellmap_flow/utils/ds.py @@ -17,7 +17,7 @@ from cellmap_flow.globals import g def generate_singlescale_metadata( - ds_name: str, + arr_name: str, voxel_size: list, translation: list, units: str, @@ -25,7 +25,7 @@ def generate_singlescale_metadata( ): z_attrs: dict = {"multiscales": [{}]} z_attrs["multiscales"][0]["axes"] = [ - {"name": axis, "type": "space", "unit": units} for axis in axes + {"name": axis, "type": "space", "unit": unit} for axis, unit in zip(axes, units) ] z_attrs["multiscales"][0]["coordinateTransformations"] = [ {"scale": [1.0, 1.0, 1.0], "type": "scale"} @@ -36,7 +36,7 @@ def generate_singlescale_metadata( {"scale": voxel_size, "type": "scale"}, {"translation": translation, "type": "translation"}, ], - "path": ds_name, + "path": arr_name, } ] From f031287e2d7b6d3b3b55294538d48b861a8194c0 Mon Sep 17 00:00:00 2001 From: Yurii Zubov <13329590+yuriyzubov@users.noreply.github.com> Date: Tue, 16 Sep 2025 11:04:09 -0400 Subject: [PATCH 5/8] Update cellmap_flow/blockwise/blockwise_processor.py don't convert zg.attrs to list Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cellmap_flow/blockwise/blockwise_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cellmap_flow/blockwise/blockwise_processor.py b/cellmap_flow/blockwise/blockwise_processor.py index d39d5b1..c837bf3 100644 --- a/cellmap_flow/blockwise/blockwise_processor.py +++ b/cellmap_flow/blockwise/blockwise_processor.py @@ -150,7 +150,7 @@ def __init__(self, yaml_config: str, create=False): try: z_store = NestedDirectoryStore(self.output_path / channel) zg = open_group(store=z_store, mode='a') - if 'multiscales' in list(zg.attrs): + if 'multiscales' in zg.attrs: raise ValueError(f'multiscales attribute already exists in {z_store.path}') else: zattrs = generate_singlescale_metadata(arr_name='s0', From ef39800c616fc5ce5d7307d9d37e9fbe8dd7590f Mon Sep 17 00:00:00 2001 From: zubovy Date: Tue, 16 Sep 2025 11:12:43 -0400 Subject: [PATCH 6/8] cast metadata dimensions that would match dimensionality of the zarr array --- cellmap_flow/blockwise/blockwise_processor.py | 12 ++++++------ cellmap_flow/utils/ds.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cellmap_flow/blockwise/blockwise_processor.py b/cellmap_flow/blockwise/blockwise_processor.py index d39d5b1..da46af3 100644 --- a/cellmap_flow/blockwise/blockwise_processor.py +++ b/cellmap_flow/blockwise/blockwise_processor.py @@ -139,9 +139,9 @@ def __init__(self, yaml_config: str, create=False): dtype=self.dtype, chunk_shape=self.block_shape, voxel_size=self.output_voxel_size, - axis_names=["z", "y", "x"], - units=["nanometer",]*3, - offset=(0, 0, 0), + axis_names=["z", "y", "x"][-len(self.block_shape):], + units=["nanometer",]*len(self.block_shape), + offset=(0,)*len(self.block_shape), ) except Exception as e: raise Exception( @@ -155,9 +155,9 @@ def __init__(self, yaml_config: str, create=False): else: zattrs = generate_singlescale_metadata(arr_name='s0', voxel_size=self.output_voxel_size, - translation=[0.0,]*3, - units=['nanometer',]*3, - axes=['z', 'y', 'x']) + translation=[0.0,]*len(self.block_shape), + units=['nanometer',]*len(self.block_shape), + axes=['z', 'y', 'x'][-len(self.block_shape):]) zg.attrs['multiscales'] = zattrs['multiscales'] except Exception as e: raise Exception( diff --git a/cellmap_flow/utils/ds.py b/cellmap_flow/utils/ds.py index d6b3537..ea2c4de 100644 --- a/cellmap_flow/utils/ds.py +++ b/cellmap_flow/utils/ds.py @@ -28,7 +28,7 @@ def generate_singlescale_metadata( {"name": axis, "type": "space", "unit": unit} for axis, unit in zip(axes, units) ] z_attrs["multiscales"][0]["coordinateTransformations"] = [ - {"scale": [1.0, 1.0, 1.0], "type": "scale"} + {"scale": [1.0,]*len(voxel_size), "type": "scale"} ] z_attrs["multiscales"][0]["datasets"] = [ { From ada6e2a16834aadb9823955d9d69e91243079ffa Mon Sep 17 00:00:00 2001 From: zubovy Date: Tue, 16 Sep 2025 11:15:59 -0400 Subject: [PATCH 7/8] add default name parameter in .zattrs --- cellmap_flow/utils/ds.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cellmap_flow/utils/ds.py b/cellmap_flow/utils/ds.py index ea2c4de..853a384 100644 --- a/cellmap_flow/utils/ds.py +++ b/cellmap_flow/utils/ds.py @@ -40,7 +40,7 @@ def generate_singlescale_metadata( } ] - z_attrs["multiscales"][0]["name"] = "" + z_attrs["multiscales"][0]["name"] = "/" z_attrs["multiscales"][0]["version"] = "0.4" return z_attrs From a0007c45f823f1547ebea714c23e4d1989cc1d4d Mon Sep 17 00:00:00 2001 From: zubovy Date: Tue, 16 Sep 2025 12:00:23 -0400 Subject: [PATCH 8/8] :recycle: add ndim to improve readability --- cellmap_flow/blockwise/blockwise_processor.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/cellmap_flow/blockwise/blockwise_processor.py b/cellmap_flow/blockwise/blockwise_processor.py index ca003a5..e171f2f 100644 --- a/cellmap_flow/blockwise/blockwise_processor.py +++ b/cellmap_flow/blockwise/blockwise_processor.py @@ -131,6 +131,7 @@ def __init__(self, yaml_config: str, create=False): print(f"type: {self.dtype}") print(f"output_path: {self.output_path}") for channel in self.output_channels: + ndim = len(self.block_shape) if create: try: array = prepare_ds( @@ -139,9 +140,9 @@ def __init__(self, yaml_config: str, create=False): dtype=self.dtype, chunk_shape=self.block_shape, voxel_size=self.output_voxel_size, - axis_names=["z", "y", "x"][-len(self.block_shape):], - units=["nanometer",]*len(self.block_shape), - offset=(0,)*len(self.block_shape), + axis_names=["z", "y", "x"][-ndim:], + units=["nanometer",]*ndim, + offset=(0,)*ndim, ) except Exception as e: raise Exception( @@ -155,9 +156,9 @@ def __init__(self, yaml_config: str, create=False): else: zattrs = generate_singlescale_metadata(arr_name='s0', voxel_size=self.output_voxel_size, - translation=[0.0,]*len(self.block_shape), - units=['nanometer',]*len(self.block_shape), - axes=['z', 'y', 'x'][-len(self.block_shape):]) + translation=[0.0,]*ndim, + units=['nanometer',]*ndim, + axes=['z', 'y', 'x'][-ndim:]) zg.attrs['multiscales'] = zattrs['multiscales'] except Exception as e: raise Exception(