diff --git a/formats/file-renaming-tool/.bumpversion.cfg b/formats/file-renaming-tool/.bumpversion.cfg index 9f1772079..33fe26444 100644 --- a/formats/file-renaming-tool/.bumpversion.cfg +++ b/formats/file-renaming-tool/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.2.4 +current_version = 0.2.5-dev0 commit = True tag = False parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-(?P<release>[a-z]+)(?P<dev>\d+))? @@ -22,6 +22,12 @@ replace = version = "{new_version}" [bumpversion:file:plugin.json] +[bumpversion:file:filerenaming.cwl] + +[bumpversion:file:ict.yaml] + [bumpversion:file:VERSION] +[bumpversion:file:README.md] + [bumpversion:file:src/polus/images/formats/file_renaming/__init__.py] diff --git a/formats/file-renaming-tool/CHANGELOG.md b/formats/file-renaming-tool/CHANGELOG.md index 02a40369f..92aa4ba84 100644 --- a/formats/file-renaming-tool/CHANGELOG.md +++ b/formats/file-renaming-tool/CHANGELOG.md @@ -2,3 +2,13 @@ ### Added - Pytests to test this plugin - Added a support for recursively searching for files within a directory and its subdirectories of specified pattern by passing value either raw or map for `mapDirectory` input argument. 
+ +## [0.2.4-dev2] - 2024-12-17 +### Added +- Integrated filepattern in this tool +- Modified the sorting dictionary letters key with respect to length + +## [0.2.5-dev0] - 2026-03-04 +### Added +- Updated dependencies and base container image +- refactored code diff --git a/formats/file-renaming-tool/Dockerfile b/formats/file-renaming-tool/Dockerfile index 52c8c942e..7ff8a2b32 100644 --- a/formats/file-renaming-tool/Dockerfile +++ b/formats/file-renaming-tool/Dockerfile @@ -1,10 +1,11 @@ -FROM polusai/bfio:2.3.6 +FROM polusai/bfio:2.5.0 # environment variables defined in polusai/bfio ENV EXEC_DIR="/opt/executables" ENV POLUS_IMG_EXT=".ome.tif" ENV POLUS_TAB_EXT=".csv" ENV POLUS_LOG="INFO" +ENV NUM_THREADS=8 # Work directory defined in the base container WORKDIR ${EXEC_DIR} diff --git a/formats/file-renaming-tool/README.md b/formats/file-renaming-tool/README.md index 37e01d17d..a123137fa 100644 --- a/formats/file-renaming-tool/README.md +++ b/formats/file-renaming-tool/README.md @@ -1,6 +1,12 @@ -# File Renaming(0.2.4-dev0) -This WIPP plugin uses supplied file naming patterns to dynamically -rename and save files in an image collection to a new image collection. +# File Renaming(v0.2.5-dev0) +This WIPP plugin renames files in an image collection (typically microscopy images) using powerful, user-defined filename patterns for both input matching and output naming. + +It is particularly useful for: +- Standardizing file names across experiments +- Converting channel names (GFP, DAPI, TXRED…) to numeric indices +- Adding zero-padding consistently +- Reorganizing naming schemes +- Working with nested directory structures ## Example Usage * The user can upload an image collection where all files contain similar @@ -16,12 +22,14 @@ naming conventions. 
`newdata_x001_y001_c002.tif` `newdata_x001_y001_c003.tif` - * **User input pattern:** + * **filePattern:** `img_x{row:dd}_y{col:dd}_{channel:c+}.ome.tif` - * **User output pattern:** + * **outFilePattern:** `newdata_x{row:ddd}_y{col:ddd}_c{channel:ddd}.ome.tif` +**Important rules:** + * The user can format the output digit using the number of digits specified in the output format. * `d` represents *digit* @@ -38,7 +46,41 @@ exception: then the script sorts the strings that match the character pattern and assigns numbers 0+ to them. -* New optional feature `mapDirectory` implemented to include directory name in renamed files. This plugin also handles nested directories and one level up directory name is added to renamed files if `raw` value passed, `map` for mapped subdirectories `d0, d1, d2, ... dn` and if not passed then no directory name is added in renamed files. +* Implemented a new optional boolean feature `mapDirectory` to append mapped directory names in renamed files. + + +## Renaming files within a complex nested directory structure: +In specific scenarios where users need to rename files within nested subdirectories, this functionality can be leveraged by providing an appropriate pattern + +For Example + +``` +BBBC001 + └── raw + ├── Ground_Truth + │ └── groundtruth_images + │ ├── AS_09125_050118150001_A03f00d0.tif + │ ├── AS_09125_050118150001_A03f01d0.tif + │ ├── AS_09125_050118150001_A03f02d0.tif + │ ├── AS_09125_050118150001_A03f03d0.tif + │ ├── AS_09125_050118150001_A03f04d0.tif + │ └── AS_09125_050118150001_A03f05d0.tif + └── Images + └── human_ht29_colon_cancer_1_images + ├── AS_09125_050118150001_A03f00d0.tif + ├── AS_09125_050118150001_A03f01d0.tif + ├── AS_09125_050118150001_A03f02d0.tif + ├── AS_09125_050118150001_A03f03d0.tif + ├── AS_09125_050118150001_A03f04d0.tif + └── AS_09125_050118150001_A03f05d0.tif + +``` + +Now, renaming files within the `human_ht29_colon_cancer_1_images` is achievable by providing a `filepattern` such as 
`/.*/Images/(?P<directory>.*)/.*_{row:c}{col:dd}f{f:dd}d{channel:d}.tif`, and specifying `outFilePattern` as `x{row:dd}_y{col:dd}_p{f:dd}_c{channel:d}.tif`. If the mapDirectory option is not utilized, the raw directory name will be appended in the renamed files. To handle directory names containing both letters and digits, employ `(?P<directory>.*)`; use `{directory:c+}` or `{directory:d+}` if it contains solely letters or digits, respectively. + +#### Note: +To extract directory names, the pattern should start with a forward slash (`/`) + Contact [Melanie Parham](mailto:melanie.parham@axleinfo.com), [Hamdah Shafqat abbasi](mailto:hamdahshafqat.abbasi@nih.gov) for more @@ -57,6 +99,38 @@ To build the Docker image for the conversion plugin, run If WIPP is running, navigate to the plugins page and add a new plugin. Paste the contents of `plugin.json` into the pop-up window and submit. +## Docker / CLI Examples + +Basic + +``` +docker run --rm \ + -v "/path/to/input/images:/data/input" \ + -v "/path/to/output:/data/output" \ + polusai/file-renaming-tool:0.2.5-dev0 \ + --inpDir /data/input \ + --outDir /data/output \ + --filePattern 'img_x{row:dd}_y{col:dd}_{channel:c+}.tif' \ + --outFilePattern 'r{row:ddd}_c{col:ddd}_ch{channel:ddd}.ome.tif' + + +``` +Directory mapping + +``` +docker run --rm \ + -v "/path/to/dataset:/data" \ + polusai/file-renaming-tool:0.2.5-dev0 \ + --inpDir /data/input \ + --outDir /data/output \ + --filePattern '/.*/Images/(?P<directory>.*)/.*_{row:c}{col:dd}f{f:dd}d{channel:d}.tif' \ + --outFilePattern 'x{row:dd}_y{col:dd}_p{f:dd}_c{channel:d}_dir{directory}.tif' \ + --mapDirectory + + +``` + + ## Options This plugin takes three input argument and one output argument: @@ -67,5 +141,5 @@ This plugin takes three input argument and one output argument: | `--filePattern` | Input filename pattern | Input | string | | `--outDir` | Output collection | Output | collection | | `--outFilePattern` | Output filename pattern | Input | string | -| `--mapDirectory` | Directory name (`raw`, `map`) | 
Input | enum | +| `--mapDirectory` | Extract mapped directory name | Input | boolean | | `--preview` | Generate a JSON file with outputs | Output | JSON | diff --git a/formats/file-renaming-tool/VERSION b/formats/file-renaming-tool/VERSION index abd410582..0eac58ed7 100644 --- a/formats/file-renaming-tool/VERSION +++ b/formats/file-renaming-tool/VERSION @@ -1 +1 @@ -0.2.4 +0.2.5-dev0 diff --git a/formats/file-renaming-tool/filerenaming.cwl b/formats/file-renaming-tool/filerenaming.cwl index 454a1dae8..89e937a6f 100644 --- a/formats/file-renaming-tool/filerenaming.cwl +++ b/formats/file-renaming-tool/filerenaming.cwl @@ -12,7 +12,7 @@ inputs: mapDirectory: inputBinding: prefix: --mapDirectory - type: string? + type: boolean? outDir: inputBinding: prefix: --outDir @@ -28,7 +28,7 @@ outputs: type: Directory requirements: DockerRequirement: - dockerPull: polusai/file-renaming-tool:0.2.4-dev0 + dockerPull: polusai/file-renaming-tool:0.2.5-dev0 InitialWorkDirRequirement: listing: - entry: $(inputs.outDir) diff --git a/formats/file-renaming-tool/ict.yaml b/formats/file-renaming-tool/ict.yaml index 56e75a25e..f5b78e1e2 100644 --- a/formats/file-renaming-tool/ict.yaml +++ b/formats/file-renaming-tool/ict.yaml @@ -1,65 +1,61 @@ author: -- Melanie Parham -- Hamdah Shafqat -contact: melanie.parham@axleinfo.com -container: polusai/file-renaming-tool:0.2.4-dev0 + - Hamdah Shafqat + - Melanie Parham +contact: hamdahshafqat.abbasi@nih.gov +container: polusai/file-renaming-tool:0.2.5-dev0 description: Rename and store image collection files in a new image collection entrypoint: python3 -m polus.images.formats.file_renaming inputs: -- description: Filename pattern used to separate data - format: - - string - name: filePattern - required: true - type: string -- description: Input image collection to be processed by this plugin - format: - - collection - name: inpDir - required: true - type: path -- description: Desired filename pattern used to rename and separate data - format: - - 
string - name: outFilePattern - required: true - type: string -- description: Get directory name incorporated in renamed files - format: - - enum - name: mapDirectory - required: false - type: string + - description: Input image collection to be processed by this plugin + format: + - collection + name: inpDir + required: true + type: path + - description: Filename pattern used to separate data + format: + - string + name: filePattern + required: true + type: string + - description: Desired filename pattern used to rename and separate data + format: + - string + name: outFilePattern + required: true + type: string + - description: Incorporate mapped directory names into renamed files + format: + - boolean + name: mapDirectory + required: false + type: boolean name: polusai/FileRenaming outputs: -- description: Output collection - format: - - collection - name: outDir - required: true - type: path -repository: https://github.com/PolusAI/polus-plugins + - description: Output collection + format: + - collection + name: outDir + required: true + type: path +repository: https://github.com/PolusAI/image-tools specVersion: 1.0.0 title: File Renaming ui: -- description: Filename pattern used to separate data - key: inputs.filePattern - title: Filename pattern - type: text -- description: Input image collection to be processed by this plugin - key: inputs.inpDir - title: Input collection - type: path -- description: Desired filename pattern used to rename and separate data - key: inputs.outFilePattern - title: Output filename pattern - type: text -- description: Get directory name incorporated in renamed files - fields: - - raw - - map - - default - key: inputs.mapDirectory - title: mapDirectory - type: select -version: 0.2.4-dev0 + - description: Input image collection to be processed by this plugin + key: inputs.inpDir + title: Input collection + type: path + - description: Filename pattern used to separate data + key: inputs.filePattern + title: Filename pattern + type: 
text + - description: Desired filename pattern used to rename and separate data + key: inputs.outFilePattern + title: Output filename pattern + type: text + - description: Incorporate mapped directory names into renamed files + key: inputs.mapDirectory + title: mapDirectory + type: checkbox +version: 0.2.5-dev0 diff --git a/formats/file-renaming-tool/plugin.json b/formats/file-renaming-tool/plugin.json index 082e9600c..5edd377e0 100644 --- a/formats/file-renaming-tool/plugin.json +++ b/formats/file-renaming-tool/plugin.json @@ -1,32 +1,32 @@ { "name": "File Renaming", - "version": "0.2.4", + "version": "0.2.5-dev0", "title": "File Renaming", "description": "Rename and store image collection files in a new image collection", - "author": "Melanie Parham (melanie.parham@axleinfo.com), Hamdah Shafqat Abbasi (hamdahshafqat.abbasi@nih.gov)", + "author": "Hamdah Shafqat Abbasi (hamdahshafqat.abbasi@nih.gov), Melanie Parham (melanie.parham@axleinfo.com)", "institution": "National Center for Advancing Translational Sciences, National Institutes of Health", "repository": "https://github.com/PolusAI/image-tools", "website": "https://ncats.nih.gov/preclinical/core/informatics", "citation": "", - "containerId": "polusai/file-renaming-tool:0.2.4", + "containerId": "polusai/file-renaming-tool:0.2.5-dev0", "baseCommand": [ "python3", "-m", "polus.images.formats.file_renaming" ], "inputs": [ - { - "name": "filePattern", - "type": "string", - "description": "Filename pattern used to separate data", - "required": true - }, { "name": "inpDir", "type": "collection", "description": "Input image collection to be processed by this plugin", "required": true }, + { + "name": "filePattern", + "type": "string", + "description": "Filename pattern used to separate data", + "required": true + }, { "name": "outFilePattern", "type": "string", @@ -35,16 +35,8 @@ }, { "name": "mapDirectory", - "type": "enum", - "description": "Get directory name incorporated in renamed files", - "default": 
"default", - "options": { - "values": [ - "raw", - "map", - "default" - ] - }, + "type": "boolean", + "description": "Incorporate mapped directory names into renamed files", "required": false } ], @@ -56,16 +48,16 @@ } ], "ui": [ - { - "key": "inputs.filePattern", - "title": "Filename pattern", - "description": "Filename pattern used to separate data" - }, { "key": "inputs.inpDir", "title": "Input collection", "description": "Input image collection to be processed by this plugin" }, + { + "key": "inputs.filePattern", + "title": "Filename pattern", + "description": "Filename pattern used to separate data" + }, { "key": "inputs.outFilePattern", "title": "Output filename pattern", @@ -74,8 +66,7 @@ { "key": "inputs.mapDirectory", "title": "mapDirectory", - "description": "Get directory name incorporated in renamed files", - "default": "" + "description": "Incorporate mapped directory names into renamed files" } ] } diff --git a/formats/file-renaming-tool/pyproject.toml b/formats/file-renaming-tool/pyproject.toml index 3f1d2dafb..3ad45366b 100644 --- a/formats/file-renaming-tool/pyproject.toml +++ b/formats/file-renaming-tool/pyproject.toml @@ -1,19 +1,20 @@ [tool.poetry] name = "polus-images-formats-file-renaming" -version = "0.2.4" -description = "Rename and store image collection files in a new image collection" +version = "0.2.5-dev0" +description = "Convert BioFormats datatypes to ome.tif or ome.zarr file format" authors = [ -"Melanie Parham ", -"Hamdah Shafqat abbasi " + "Hamdah Shafqat abbasi ", + "Melanie Parham ", ] readme = "README.md" packages = [{include = "polus", from = "src"}] [tool.poetry.dependencies] -python = ">=3.9,<3.12" -typer = "^0.7.0" +python = ">=3.11,<3.13" +typer = "0.24.1" tqdm = "^4.64.1" -numpy = "^1.26.3" +numpy = ">2.0.0" +filepattern = "2.1.4" [tool.poetry.group.dev.dependencies] bump2version = "^1.0.1" @@ -23,6 +24,26 @@ flake8 = "^6.0.0" mypy = "^1.0.1" pytest = "^7.2.1" +[tool.mypy] +mypy_path = "src" +strict = true 
+warn_unreachable = true +warn_no_return = true + +[[tool.mypy.overrides]] +module = "filepattern" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "filepattern.*" +ignore_missing_imports = true + + [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" + +[tool.pytest.ini_options] +pythonpath = [ + "." +] diff --git a/formats/file-renaming-tool/run-plugin.sh b/formats/file-renaming-tool/run-plugin.sh index c9b7a5ef3..868dfd003 100644 --- a/formats/file-renaming-tool/run-plugin.sh +++ b/formats/file-renaming-tool/run-plugin.sh @@ -1,4 +1,6 @@ -#!/bin/bash +#!/bin/bash + + version=$( 1: - subnames = [pathlib.Path(sb).name for sb in subdirs] - sub_check = all(name == subnames[0] for name in subnames) - - for i, sub in enumerate(subdirs): - assert ( - len([f for f in pathlib.Path(sub).iterdir() if f.is_file()]) != 0 - ), "Files are missing in input directory!!!" - dir_pattern = r"^[A-Za-z0-9_]+$" - # Iterate over the directories and check if they match the pattern - matching_directories: Optional[Match[Any]] = re.match( - dir_pattern, - pathlib.Path(sub).stem, - ) - if matching_directories is not None: - matching_directories = matching_directories.group() - - if not sub_check and f"{map_directory}" == "raw": - outfile_pattern = f"{matching_directories}_{out_file_pattern}" - elif subnames and f"{map_directory}" == "raw": - logger.error( - "Subdirectoy names are same, should be different.", - ) - break - else: - outfile_pattern = f"d{i}_{out_file_pattern}" - fr.rename(sub, out_dir, file_pattern, outfile_pattern) - logger.info( - "Finished renaming files.", - ) - - if preview: + inp_dir = pathlib.Path(inp_dir).resolve() + out_dir = pathlib.Path(out_dir).resolve() + + if not inp_dir.is_dir(): + typer.echo(f"Error: Input directory not found: {inp_dir}", err=True) + raise typer.Exit(1) + + if not out_dir.exists(): + typer.echo(f"Creating output directory: {out_dir}") + out_dir.mkdir(parents=True, exist_ok=True) + + if not 
preview: + rename(inp_dir, out_dir, file_pattern, out_file_pattern, map_directory) + else: with pathlib.Path.open(pathlib.Path(out_dir, "preview.json"), "w") as jfile: + rename(inp_dir, out_dir, file_pattern, out_file_pattern, map_directory) out_json: dict[str, Any] = { "filepattern": out_file_pattern, "outDir": [], } for file in out_dir.iterdir(): - if file.is_file() and file.suffix != ".json": + if ( + file.is_file() + and file.suffix != ".json" + and not file.name.startswith(".") + ): out_name = file.name out_json["outDir"].append(out_name) + pathlib.Path.unlink(file) json.dump(out_json, jfile, indent=2) diff --git a/formats/file-renaming-tool/src/polus/images/formats/file_renaming/file_renaming.py b/formats/file-renaming-tool/src/polus/images/formats/file_renaming/file_renaming.py deleted file mode 100644 index 2b570d7b8..000000000 --- a/formats/file-renaming-tool/src/polus/images/formats/file_renaming/file_renaming.py +++ /dev/null @@ -1,406 +0,0 @@ -"""File Renaming.""" -import enum -import logging -import os -import pathlib -import re -import shutil -from concurrent.futures import ProcessPoolExecutor -from concurrent.futures import as_completed -from multiprocessing import cpu_count -from sys import platform -from typing import Any -from typing import Union - -from tqdm import tqdm - -EXT = (".csv", ".txt", ".cppipe", ".yml", ".yaml", ".xml", ".json") - -logger = logging.getLogger(__name__) -logger.setLevel(os.environ.get("POLUS_LOG", logging.INFO)) - -if platform == "linux" or platform == "linux2": - NUM_THREADS = len(os.sched_getaffinity(0)) # type: ignore -else: - NUM_THREADS = max(cpu_count() // 2, 2) - - -class MappingDirectory(str, enum.Enum): - """Map Directory information.""" - - RAW = "raw" - MAP = "map" - Default = "" - - -def image_directory(dirpath: pathlib.Path) -> Union[bool, None]: - """Fetching image directory only. - - Args: - dirpath: Path to directory. - - Returns: - bool. 
- """ - for file in dirpath.iterdir(): - return bool(file.is_file() and file.suffix not in EXT) - return None - - -def get_data(inp_dir: str) -> tuple[list[pathlib.Path], list[pathlib.Path]]: - """Get group names from pattern. Convert patterns (c+ or dd) to regex. - - Args: - inp_dir: Path to input directory. - - Returns: - A tuple of list of subdirectories and files path. - """ - filepath: list[pathlib.Path] = [] - dirpaths: list[pathlib.Path] = [] - for path in pathlib.Path(inp_dir).rglob("*"): - if path.is_dir(): - if path.parent in dirpaths: - dirpaths.remove(path.parent) - if image_directory(path): - dirpaths.append(path) - elif path.is_file() and not path.name.endswith(tuple(EXT)): - fpath = pathlib.Path(inp_dir).joinpath(path) - filepath.append(fpath) - - return dirpaths, filepath - - -def map_pattern_grps_to_regex(file_pattern: str) -> dict: - """Get group names from pattern. Convert patterns (c+ or dd) to regex. - - Args: - file_pattern: File pattern, with special characters escaped. - - Returns: - rgx_patterns: The key is a named regex group. The value is regex. - """ - logger.debug(f"pattern_to_regex() inputs: {file_pattern}") - #: Extract the group name and associated pattern (ex: {row:dd}) - group_and_pattern_tuples = re.findall(r"\{(\w+):([dc+]+)\}", file_pattern) - pattern_map = {"d": r"[0-9]", "c": r"[a-zA-Z]", "+": "+"} - rgx_patterns = {} - for group_name, groups_pattern in group_and_pattern_tuples: - rgx = "".join([pattern_map[pattern] for pattern in groups_pattern]) - #: ?P is included to specify that foo is a named group. - rgx_patterns[group_name] = rf"(?P<{group_name}>{rgx})" - logger.debug(f"pattern_to_regex() returns {rgx_patterns}") - - return rgx_patterns - - -def convert_to_regex(file_pattern: str, extracted_rgx_patterns: dict) -> str: - """Integrate regex into original file pattern. - - The extracted_rgx_patterns helps replace simple patterns (ie. dd, c+) - with regex in the correct location, based on named groups. 
- - Args: - file_pattern: file pattern provided by the user. - extracted_rgx_patterns: named group and regex value dictionary. - - Returns: - new_pattern: file pattern converted to regex. - """ - logger.debug(f"convert_to_regex() inputs: {file_pattern}, {extracted_rgx_patterns}") - rgx_pattern = file_pattern - for named_grp, regex_str in extracted_rgx_patterns.items(): - #: The prefix "fr" creates raw f-strings, which act like format() - rgx_pattern = re.sub(rf"\{{{named_grp}:.*?\}}", regex_str, rgx_pattern) - logger.debug(f"convert_to_regex() returns {rgx_pattern}") - return rgx_pattern - - -def specify_len(out_pattern: str) -> str: - """Update output file pattern to output correct number of digits. - - After extracting group names and associated patterns from the - outFilePattern, integrate format strings into the file pattern to - accomplish. - - Example: - "newdata_x{row:ddd}" becomes "new_data{row:03d}". - - Args: - out_pattern: output file pattern provided by the user. - - Returns: - new_out_pattern: file pattern converted to format string. - """ - logger.debug(f"specify_len() inputs: {out_pattern}") - #: Extract the group name and associated pattern (ex: {row:dd}) - group_and_pattern_tuples = re.findall(r"\{(\w+):([dc+]+)\}", out_pattern) - grp_rgx_dict = {} - #: Convert simple file patterns to format strings (ex: ddd becomes :03d). 
- for group_name, groups_pattern in group_and_pattern_tuples: - # Get the length of the string if not variable width - s_len = "" if "+" in groups_pattern else str(len(groups_pattern)) - # Set the formatting value - temp_pattern = "s" if groups_pattern[0] == "c" else "d" - # Prepend a 0 for padding digit format - if temp_pattern == "d": - s_len = "0" + s_len - grp_rgx_dict[group_name] = "{" + group_name + ":" + s_len + temp_pattern + "}" - new_out_pattern = out_pattern - for named_group, format_str in grp_rgx_dict.items(): - new_out_pattern = re.sub( - rf"\{{{named_group}:.*?\}}", - format_str, - new_out_pattern, - ) - logger.debug(f"specify_len() returns {new_out_pattern}") - - return new_out_pattern - - -def get_char_to_digit_grps(inp_pattern: str, out_pattern: str) -> list[str]: - """Return group names where input and output datatypes differ. - - If the input pattern is a character and the output pattern is a - digit, return the named group associated with those patterns. - - Args: - inp_pattern: Original input pattern. - out_pattern: Original output pattern. - - Returns: - special_categories: Named groups with c to d conversion or [None]. 
- """ - logger.debug(f"get_char_to_digit_grps() inputs: {inp_pattern}, {out_pattern}") - #: Extract the group name and associated pattern (ex: {row:dd}) - ingrp_and_pattern_tuples = re.findall(r"\{(\w+):([dc+]+)\}", inp_pattern) - outgrp_and_pattern_tuples = re.findall(r"\{(\w+):([dc+]+)\}", out_pattern) - - #: Get group names where input pattern is c and output pattern is d - special_categories = [] - for out_grp_name in dict(outgrp_and_pattern_tuples): - if dict(ingrp_and_pattern_tuples)[out_grp_name].startswith("c") and dict( - outgrp_and_pattern_tuples, - )[out_grp_name].startswith("d"): - special_categories.append(out_grp_name) - logger.debug(f"get_char_to_digit_grps() returns {special_categories}") - return special_categories - - -def extract_named_grp_matches( - rgx_pattern: str, - inp_files: list, -) -> list[dict[str, Union[str, Any]]]: - """Store matches from the substrings from each filename that vary. - - Loop through each file. Apply the regex pattern to each - filename. When a match occurs for a named group, add that match to - a dictionary, where the key is the named (regex capture) group and - the value is the corresponding match from the filename. - - Args: - rgx_pattern: input pattern in regex format. - inp_files: list of files in input directory. - - Returns: - grp_match_dict_list: list of dictionaries containing str matches. - """ - logger.debug(f"extract_named_grp_matches() inputs: {rgx_pattern}, {inp_files}") - grp_match_dict_list = [] - #: Build list of dicts, where key is capture group and value is match - for filename in inp_files: - try: - d = re.match(rgx_pattern, filename) - if d is None: - break - grp_match_dict = d.groupdict() - #: Add filename information to dictionary - grp_match_dict["fname"] = filename - grp_match_dict_list.append(grp_match_dict) - except AttributeError as e: - logger.error(e) - logger.error( - "File pattern does not match one or more files. 
" - "See README for pattern rules.", - ) - msg = "File pattern does not match with files." - raise AttributeError(msg) from e - except AssertionError as e: - if str(e).startswith("redefinition of group name"): - logger.error( - "Ensure that named groups in file patterns are unique. " - "({})".format(e), - ) - msg = f"Ensure that named groups in file patterns are unique. ({e})" - raise ValueError( - msg, - ) from e - - logger.debug(f"extract_named_grp_matches() returns {grp_match_dict_list}") - - return grp_match_dict_list - - -def str_to_int(dictionary: dict) -> dict: - """If a number in the dictionary is in str format, convert to int. - - Args: - dictionary: contains group, match, and filename info. - - Returns: - fixed_dictionary: input dict, with numeric str values to int. - """ - fixed_dictionary = {} - for key, value in dictionary.items(): - try: - fixed_dictionary[key] = int(value) - except Exception: # noqa: BLE001 - fixed_dictionary[key] = value - logger.debug(f"str_to_int() returns {fixed_dictionary}") - return fixed_dictionary - - -def letters_to_int(named_grp: str, all_matches: list) -> dict: - """Alphabetically number matches for the given named group for all files. - - Make a dictionary where each key is a match for each filename and - the corresponding value is a number indicating its alphabetical rank. - - Args: - named_grp: Group with c in input pattern and d in out pattern. - all_matches: list of dicts, k=grps, v=match, last item=file name. - - Returns: - cat_index_dict: dict key=category name, value=index after sorting. - """ - logger.debug(f"letters_to_int() inputs: {named_grp}, {all_matches}") - #: Generate list of strings belonging to the given category (element). 
- alphabetized_matches = sorted( - {namedgrp_match_dict[named_grp] for namedgrp_match_dict in all_matches}, - ) - str_alphabetindex_dict = {} - for i in range(0, len(alphabetized_matches)): - str_alphabetindex_dict[alphabetized_matches[i]] = i - logger.debug(f"letters_to_int() returns {str_alphabetindex_dict}") - return str_alphabetindex_dict - - -def rename( # noqa: C901, PLR0915, PLR0912 - inp_dir: str, - out_dir: pathlib.Path, - file_pattern: str, - out_file_pattern: str, -) -> None: - """Scalable Extraction of Nyxus Features. - - Args: - inp_dir : Path to image collection. - out_dir : Path to image collection storing copies of renamed files. - file_pattern : Input file pattern. - out_file_pattern : Output file pattern. - """ - logger.info("Start renaming files") - file_ext = re.split("\\.", file_pattern)[-1] - empty_ext = "" - ext_length = 5 - if file_ext == "*": - msg = "Please define filePattern including file extension!" - raise ValueError(msg) - if file_ext == empty_ext: - msg = "Please define filePattern including file extension!" - raise ValueError(msg) - if len(file_ext) > ext_length: - msg = "Please define filePattern including file extension!" - raise ValueError(msg) - - _, inpfiles = get_data(inp_dir) - - inp_files: list[str] = [ - f"{f.name}" for f in inpfiles if pathlib.Path(f).suffix == f".{file_ext}" - ] - - if len(inp_files) == 0: - msg = "Please check input directory again!! 
As it does not contain files" - raise ValueError(msg) - - chars_to_escape = ["(", ")", "[", "]", "$", "."] - for char in chars_to_escape: - file_pattern = file_pattern.replace(char, ("\\" + char)) - - if "\\.*" in file_pattern: - file_pattern = file_pattern.replace("\\.*", (".*")) - if "\\.+" in file_pattern: - file_pattern = file_pattern.replace("\\.+", (".+")) - groupname_regex_dict = map_pattern_grps_to_regex(file_pattern) - - # #: Integrate regex from dictionary into original file pattern - inp_pattern_rgx = convert_to_regex(file_pattern, groupname_regex_dict) - - # #: Integrate format strings into outFilePattern to specify digit/char len - out_pattern_fstring = specify_len(out_file_pattern) - - #: List named groups where input pattern=char & output pattern=digit - char_to_digit_categories = get_char_to_digit_grps(file_pattern, out_file_pattern) - - #: List a dictionary (k=named grp, v=match) for each filename - - all_grp_matches = extract_named_grp_matches(inp_pattern_rgx, inp_files) - - #: Convert numbers from strings to integers, if applicable - for i in range(0, len(all_grp_matches)): - tmp_match = all_grp_matches[i] - all_grp_matches[i] = str_to_int(tmp_match) - - if len(all_grp_matches) == 0: - msg = f"Please define filePattern: {file_pattern} again!!" 
- raise ValueError( - msg, - ) - - #: Populate dict if any matches need to be converted from char to digit - #: Key=named group, Value=Int representing matched chars - numbered_categories = {} - for named_grp in char_to_digit_categories: - numbered_categories[named_grp] = letters_to_int(named_grp, all_grp_matches) - # Check named groups that need c->d conversion - for named_grp in char_to_digit_categories: - for i in range(0, len(all_grp_matches)): - if all_grp_matches[i].get(named_grp): - #: Replace original matched letter with new digit - all_grp_matches[i][named_grp] = numbered_categories[named_grp][ - all_grp_matches[i][named_grp] - ] - - with ProcessPoolExecutor(max_workers=NUM_THREADS) as executor: - threads = [] - for match in all_grp_matches: - # : If running on WIPP - if out_dir != inp_dir: - #: Apply str formatting to change digit or char length - out_name = out_dir.resolve() / out_pattern_fstring.format( - **match, - ) - old_file_name = pathlib.Path(inp_dir, match["fname"]) - threads.append(executor.submit(shutil.copy2, old_file_name, out_name)) - else: - out_name = out_pattern_fstring.format(**match) # type: ignore - old_file_name = match["fname"] # type: ignore - logger.info(f"Old name {old_file_name} & new name {out_name}") - threads.append( - executor.submit( - os.rename, - pathlib.Path(inp_dir, old_file_name), - pathlib.Path(out_dir, out_name), - ), - ) - - for f in tqdm( - as_completed(threads), - total=len(threads), - mininterval=5, - desc="converting images", - initial=0, - unit_scale=True, - colour="cyan", - ): - f.result() diff --git a/formats/file-renaming-tool/src/polus/images/formats/file_renaming/filerenaming.py b/formats/file-renaming-tool/src/polus/images/formats/file_renaming/filerenaming.py new file mode 100644 index 000000000..d25fce1d4 --- /dev/null +++ b/formats/file-renaming-tool/src/polus/images/formats/file_renaming/filerenaming.py @@ -0,0 +1,308 @@ +"""File Renaming.""" +import logging +import os +import pathlib +import re 
+import shutil +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import as_completed +from typing import Any + +import filepattern as fp +from tqdm import tqdm + +logger = logging.getLogger(__name__) +logger.setLevel(os.environ.get("POLUS_LOG", logging.INFO)) + + +def get_num_threads() -> int: + """Return thread count from NUM_THREADS env or a safe I/O-bound default.""" + try: + if env := os.getenv("NUM_THREADS"): + return max(1, int(env)) + except ValueError: + pass + + return min(32, (os.cpu_count() or 1) * 4) + + +NUM_THREADS = get_num_threads() + + +def specify_len(out_pattern: str) -> str: + """Update output file pattern to output correct number of digits. + + After extracting group names and associated patterns from the + outFilePattern, integrate format strings into the file pattern to + accomplish this. + + Example: + "newdata_x{row:ddd}" becomes "newdata_x{row:03d}". + + Args: + out_pattern: output file pattern provided by the user. + + Returns: + new_out_pattern: file pattern converted to format string. + """ + logger.debug(f"specify_len() inputs: {out_pattern}") + #: Extract the group name and associated pattern (ex: {row:dd}) + group_and_pattern_tuples = re.findall(r"\{(\w+):([dc+]+)\}", out_pattern) + grp_rgx_dict = {} + #: Convert simple file patterns to format strings (ex: ddd becomes :03d). 
+ for group_name, groups_pattern in group_and_pattern_tuples: + # Get the length of the string if not variable width + s_len = "" if "+" in groups_pattern else str(len(groups_pattern)) + # Set the formatting value + temp_pattern = "s" if groups_pattern[0] == "c" else "d" + # Prepend a 0 for padding digit format + if temp_pattern == "d": + s_len = "0" + s_len + grp_rgx_dict[group_name] = "{" + group_name + ":" + s_len + temp_pattern + "}" + new_out_pattern = out_pattern + for named_group, format_str in grp_rgx_dict.items(): + new_out_pattern = re.sub( + rf"\{{{named_group}:.*?\}}", + format_str, + new_out_pattern, + ) + logger.debug(f"specify_len() returns {new_out_pattern}") + + return new_out_pattern + + +def get_char_to_digit_grps(inp_pattern: str, out_pattern: str) -> list[str]: + """Return group names where input and output datatypes differ. + + If the input pattern is a character and the output pattern is a + digit, return the named group associated with those patterns. + + Args: + inp_pattern: Original input pattern. + out_pattern: Original output pattern. + + Returns: + special_categories: Named groups with c to d conversion or [None]. 
+ """ + logger.debug(f"get_char_to_digit_grps() inputs: {inp_pattern}, {out_pattern}") + #: Extract the group name and associated pattern (ex: {row:dd}) + ingrp_and_pattern_tuples = re.findall(r"\{(\w+):([dc+]+)\}", inp_pattern) + outgrp_and_pattern_tuples = re.findall(r"\{(\w+):([dc+]+)\}", out_pattern) + + #: Get group names where input pattern is c and output pattern is d + special_categories = [] + for out_grp_name in dict(outgrp_and_pattern_tuples): + if dict(ingrp_and_pattern_tuples)[out_grp_name].startswith("c") and dict( + outgrp_and_pattern_tuples, + )[out_grp_name].startswith("d"): + special_categories.append(out_grp_name) + logger.debug(f"get_char_to_digit_grps() returns {special_categories}") + return special_categories + + +def str_to_int(dictionary: dict[str, Any]) -> dict[str, Any]: + """If a number in the dictionary is in str format, convert to int. + + Args: + dictionary: contains group, match, and filename info. + + Returns: + fixed_dictionary: input dict, with numeric str values to int. + """ + fixed_dictionary = {} + for key, value in dictionary.items(): + try: + fixed_dictionary[key] = int(value) + except (ValueError, TypeError): + fixed_dictionary[key] = value + logger.debug(f"str_to_int() returns {fixed_dictionary}") + return fixed_dictionary + + +def letters_to_int(named_grp: str, all_matches: list[dict[str, Any]]) -> dict[str, int]: + """Alphabetically number matches for the given named group for all files. + + Make a dictionary where each key is a match for each filename and + the corresponding value is a number indicating its alphabetical rank, + with single-letter keys sorted first, followed by double-letter keys. + + Args: + named_grp: Group with c in input pattern and d in out pattern. + all_matches: list of dicts, k=grps, v=match, last item=file name. + + Returns: + cat_index_dict: dict key=category name, value=index after sorting. 
+ """ + logger.debug(f"letters_to_int() inputs: {named_grp}, {all_matches}") + + # Generate a set of unique matches for the given group + matches = {namedgrp_match_dict[named_grp] for namedgrp_match_dict in all_matches} + + # Sort with single-letter keys first, then double-letter keys + alphabetized_matches = sorted(matches, key=lambda x: (len(x) > 1, x)) + + # Create a dictionary mapping each match to its alphabetical rank + str_alphabetindex_dict = {match: i for i, match in enumerate(alphabetized_matches)} + + logger.debug(f"letters_to_int() returns {str_alphabetindex_dict}") + return str_alphabetindex_dict + + +def _prepare_file_matches( + inp_dir: pathlib.Path, + file_pattern: str, + out_file_pattern: str, + map_directory: bool | None = False, +) -> tuple[list[Any], list[str], str, bool, dict[str, Any] | None]: + """Validate inputs and prepare transformed file matches. + + Returns: + Tuple of (inp_files, fpaths, out_pattern_fstring, check_dir_var, map_dict) + """ + # Check if the directory is empty without creating a full list + file_count = sum(1 for _ in inp_dir.iterdir()) + if file_count == 0: + msg = f"Input directory is empty: {file_count} files found." + raise ValueError(msg) + + logger.info(f"Number of files found: {file_count}") + + recursive = bool(map_directory) + files = fp.FilePattern(inp_dir, file_pattern, recursive=recursive) + + if len(files) == 0: + msg = f"Please define filePattern: {file_pattern} again!" 
+ raise ValueError(msg) + + inp_files: list[Any] = [file[0] for file in files()] + fpaths: list[str] = [file[1] for file in files()] + + # Integrate format strings into outFilePattern to specify digit/char len + out_pattern_fstring = specify_len(out_file_pattern) + + # List named groups where input pattern=char & output pattern=digit + char_to_digit_categories = get_char_to_digit_grps(file_pattern, out_file_pattern) + + # Convert numbers from strings to integers, if applicable + for i in range(len(inp_files)): + inp_files[i] = str_to_int(inp_files[i]) + + # Populate dict if any matches need to be converted from char to digit + # Key=named group, Value=Int representing matched chars + numbered = {grp: letters_to_int(grp, inp_files) for grp in char_to_digit_categories} + + # Check named groups that need c->d conversion + for named_grp in char_to_digit_categories: + for i in range(len(inp_files)): + if inp_files[i].get(named_grp): + #: Replace original matched letter with new digit + inp_files[i][named_grp] = numbered[named_grp][inp_files[i][named_grp]] + + # To create a dictionary mapping for folder names, + # The keys represent folder names and the values represent corresponding mappings. 
+ check_dir_var = bool([d for d in inp_files if "directory" in list(d.keys())]) + map_dict = None + if map_directory: + if not check_dir_var: + logger.error("directory variable is not included in filepattern correctly") + else: + subdirs = sorted({d["directory"] for d in inp_files if d["directory"]}) + map_dict = dict(zip(subdirs, [f"d{i}" for i in range(1, len(subdirs) + 1)])) + + return inp_files, fpaths, out_pattern_fstring, check_dir_var, map_dict + + +class _ResolvePathArgs: + """Arguments for _resolve_output_path.""" + + def __init__( # noqa: PLR0913 + self, + match: dict[str, Any], + out_dir: pathlib.Path, + out_pattern_fstring: str, + check_dir_var: bool, + map_directory: bool | None, + map_dict: dict[str, Any] | None, + ) -> None: + self.match = match + self.out_dir = out_dir + self.out_pattern_fstring = out_pattern_fstring + self.check_dir_var = check_dir_var + self.map_directory = map_directory + self.map_dict = map_dict + + +def _resolve_output_path(args: _ResolvePathArgs) -> pathlib.Path | None: + """Resolve the output file path for a single file match.""" + # Apply str formatting to change digit or char length + out_name = args.out_pattern_fstring.format(**args.match) + + if not args.check_dir_var: + return pathlib.Path(args.out_dir, out_name) + + directory = args.match.get("directory") + if args.map_directory: + if not args.map_dict or directory not in args.map_dict: + logger.error(f"{directory} is not provided in filePattern") + return None + return pathlib.Path(args.out_dir, f"{args.map_dict[directory]}_{out_name}") + + return pathlib.Path(args.out_dir, f"{directory}_{out_name}") + + +def rename( + inp_dir: pathlib.Path, + out_dir: pathlib.Path, + file_pattern: str, + out_file_pattern: str, + map_directory: bool | None = False, +) -> None: + """Scalable Extraction of Nyxus Features. + + Args: + inp_dir : Path to image collection. + out_dir : Path to image collection storing copies of renamed files. + file_pattern : Input file pattern. 
+ out_file_pattern : Output file pattern. + map_directory : Mapping of folder name. + """ + logger.info("Start renaming files") + + ( + inp_files, + fpaths, + out_pattern_fstring, + check_dir_var, + map_dict, + ) = _prepare_file_matches(inp_dir, file_pattern, out_file_pattern, map_directory) + + with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: + threads = [] + for match, p in zip(inp_files, fpaths): + try: + args = _ResolvePathArgs( + match, + out_dir, + out_pattern_fstring, + check_dir_var, + map_directory, + map_dict, + ) + out_path = _resolve_output_path(args) + if out_path is None: + continue + old_file_name = pathlib.Path(inp_dir, p[0]) + threads.append(executor.submit(shutil.copy2, old_file_name, out_path)) + except ValueError: + logger.error(f"filePattern:{file_pattern} is incorrectly defined!!!") + + for f in tqdm( + as_completed(threads), + total=len(threads), + mininterval=5, + desc="Renaming images", + initial=0, + unit_scale=True, + colour="cyan", + ): + f.result() diff --git a/formats/file-renaming-tool/src/polus/images/formats/file_renaming/py.typed b/formats/file-renaming-tool/src/polus/images/formats/file_renaming/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/formats/file-renaming-tool/tests/test_main.py b/formats/file-renaming-tool/tests/test_main.py index e9d981bda..6603186d0 100644 --- a/formats/file-renaming-tool/tests/test_main.py +++ b/formats/file-renaming-tool/tests/test_main.py @@ -1,41 +1,40 @@ """Testing of File Renaming.""" - import json import pathlib import shutil import tempfile -from typing import Any -from typing import DefaultDict -from typing import Tuple +from typing import cast + import click import pytest -import numpy as np +from polus.images.formats.file_renaming import filerenaming as fr +from polus.images.formats.file_renaming.__main__ import app from typer.testing import CliRunner -from polus.images.formats.file_renaming import file_renaming as fr -from 
polus.images.formats.file_renaming.__main__ import app as app - runner = CliRunner() class CreateData: """Generate tabular data with several different file format.""" - def __init__(self): + def __init__(self) -> None: """Define instance attributes.""" self.dirpath = pathlib.Path(__file__).parent self.jsonpath = self.dirpath.joinpath("file_rename_test.json") def input_directory(self) -> pathlib.Path: """Create temporary input directory.""" - return tempfile.mkdtemp(dir=self.dirpath) + return pathlib.Path(tempfile.mkdtemp(dir=self.dirpath)) def output_directory(self) -> pathlib.Path: """Create temporary output directory.""" - return tempfile.mkdtemp(dir=self.dirpath) + return pathlib.Path(tempfile.mkdtemp(dir=self.dirpath)) def runcommands( - self, inputs: pathlib.Path, inp_pattern: str, out_pattern: str + self, + inputs: list[str], + inp_pattern: str, + out_pattern: str, ) -> click.testing.Result: """Run command line arguments.""" inp_dir = self.input_directory() @@ -43,7 +42,7 @@ def runcommands( for inp in inputs: pathlib.Path.open(pathlib.Path(inp_dir, inp), "w").close() - outputs = runner.invoke( + return runner.invoke( app, [ "--inpDir", @@ -56,13 +55,12 @@ def runcommands( out_pattern, ], ) - return outputs - def load_json(self, x: str) -> DefaultDict[Any, Any]: + def load_json(self, x: str) -> list[str]: """Json file containing image filenames.""" with pathlib.Path.open(self.jsonpath) as file: data = json.load(file) - return data[x] + return list(data[x]) def clean_directories(self) -> None: """Remove files.""" @@ -130,72 +128,56 @@ def clean_directories(self) -> None: "0({mo:dd}-{day:dd})0({mo2:dd}-{day2:dd})-({a:d}-{b:d})-{col:ddd}.ome.tif", "0({mo:ddd}-{day:ddd})0{mo2:dd}-{day2:dd})-({a:dd}-{b:dd})-{col:ddd}.ome.tif", ), - ] + ], ] @pytest.fixture(params=fixture_params) -def poly(request: Tuple[str, str]) -> pytest.FixtureRequest: +def poly(request: pytest.FixtureRequest) -> list[tuple[str, str]]: """To get the parameter of the fixture.""" - return 
request.param - - -def test_duplicate_channels_to_digit(poly: pytest.FixtureRequest) -> None: - """Testing of duplicate channels to digits.""" - d = CreateData() - inputs = d.load_json("duplicate_channels_to_digit") - (inp_pattern, out_pattern) = poly[0] - outputs = d.runcommands(inputs, inp_pattern, out_pattern) - assert outputs.exit_code == 0 - - -def test_duplicate_channels_to_digit_non_spec_digit_len( - poly: pytest.FixtureRequest, -) -> None: - """Testing of duplicate channels to digits with non specified length of digits.""" - d = CreateData() - inputs = d.load_json("duplicate_channels_to_digit") - (inp_pattern, out_pattern) = poly[1] - outputs = d.runcommands(inputs, inp_pattern, out_pattern) - assert outputs.exit_code == 0 + return cast(list[tuple[str, str]], request.param) -def test_invalid_input_raises_error(poly: pytest.FixtureRequest) -> None: +def test_invalid_input_raises_error(poly: list[tuple[str, str]]) -> None: """Testing of invalid input filepattern.""" d = CreateData() inputs = d.load_json("duplicate_channels_to_digit") (inp_pattern, out_pattern) = poly[0] d.runcommands(inputs, inp_pattern, out_pattern) + d.clean_directories() -def test_non_alphanum_inputs_percentage_sign(poly: pytest.FixtureRequest) -> None: +def test_non_alphanum_inputs_percentage_sign(poly: list[tuple[str, str]]) -> None: """Testing of filename with non alphanumeric inputs such as percentage sign.""" d = CreateData() inputs = d.load_json("percentage_file") (inp_pattern, out_pattern) = poly[3] outputs = d.runcommands(inputs, inp_pattern, out_pattern) assert outputs.exit_code == 0 + d.clean_directories() -def test_numeric_fixed_width(poly: pytest.FixtureRequest) -> None: +def test_numeric_fixed_width(poly: list[tuple[str, str]]) -> None: """Testing of filename with numeric fixed length.""" d = CreateData() inputs = d.load_json("robot") (inp_pattern, out_pattern) = poly[4] outputs = d.runcommands(inputs, inp_pattern, out_pattern) assert outputs.exit_code == 0 + 
d.clean_directories() -def test_alphanumeric_fixed_width(poly: pytest.FixtureRequest) -> None: +def test_alphanumeric_fixed_width(poly: list[tuple[str, str]]) -> None: """Testing of filename with alphanumeric fixed length.""" d = CreateData() inputs = d.load_json("brain") (inp_pattern, out_pattern) = poly[5] outputs = d.runcommands(inputs, inp_pattern, out_pattern) assert outputs.exit_code == 0 + d.clean_directories() -def test_alphanumeric_variable_width(poly: pytest.FixtureRequest) -> None: +def test_alphanumeric_variable_width(poly: list[tuple[str, str]]) -> None: """Testing of filename with alphanumeric variable width.""" d = CreateData() inputs = d.load_json("variable") @@ -205,43 +187,37 @@ def test_alphanumeric_variable_width(poly: pytest.FixtureRequest) -> None: d.clean_directories() -def test_parenthesis(poly: pytest.FixtureRequest) -> None: - """Testing of filename with parenthesis.""" - d = CreateData() - inputs = d.load_json("parenthesis") - (inp_pattern, out_pattern) = poly[7] - outputs = d.runcommands(inputs, inp_pattern, out_pattern) - assert outputs.exit_code == 0 - - -def test_two_chan_to_digit(poly: pytest.FixtureRequest) -> None: +def test_two_chan_to_digit(poly: list[tuple[str, str]]) -> None: """Testing conversion of two channels to digits.""" d = CreateData() inputs = d.load_json("two_chan") (inp_pattern, out_pattern) = poly[8] outputs = d.runcommands(inputs, inp_pattern, out_pattern) assert outputs.exit_code == 0 + d.clean_directories() -def test_three_chan_to_digit(poly: pytest.FixtureRequest) -> None: +def test_three_chan_to_digit(poly: list[tuple[str, str]]) -> None: """Test conversion of three channels to digits.""" d = CreateData() inputs = d.load_json("three_chan") (inp_pattern, out_pattern) = poly[9] outputs = d.runcommands(inputs, inp_pattern, out_pattern) assert outputs.exit_code == 0 + d.clean_directories() -def test_three_char_chan(poly: pytest.FixtureRequest) -> None: +def test_three_char_chan(poly: list[tuple[str, str]]) -> None: 
"""Test conversion of three character channels to digits.""" d = CreateData() inputs = d.load_json("three_char_chan") (inp_pattern, out_pattern) = poly[10] outputs = d.runcommands(inputs, inp_pattern, out_pattern) assert outputs.exit_code == 0 + d.clean_directories() -def test_varied_digits(poly: pytest.FixtureRequest) -> None: +def test_varied_digits(poly: list[tuple[str, str]]) -> None: """Test varied digits.""" d = CreateData() inputs = d.load_json("tissuenet-val-labels-45-C") @@ -251,16 +227,17 @@ def test_varied_digits(poly: pytest.FixtureRequest) -> None: d.clean_directories() -def test_spaces(poly: pytest.FixtureRequest) -> None: +def test_spaces(poly: list[tuple[str, str]]) -> None: """Test non-alphanumeric chars such as spaces.""" d = CreateData() inputs = d.load_json("non_alphanum_int") (inp_pattern, out_pattern) = poly[12] outputs = d.runcommands(inputs, inp_pattern, out_pattern) assert outputs.exit_code == 0 + d.clean_directories() -def test_non_alphanum_float(poly: pytest.FixtureRequest) -> None: +def test_non_alphanum_float(poly: list[tuple[str, str]]) -> None: """Test non-alphanumeric chars such as spaces, periods, commas, brackets.""" d = CreateData() inputs = d.load_json("non_alphanum_float") @@ -270,67 +247,6 @@ def test_non_alphanum_float(poly: pytest.FixtureRequest) -> None: d.clean_directories() -def test_dashes_parentheses(poly: pytest.FixtureRequest) -> None: - """Test non-alphanumeric chars are handled properly such as dashes, parenthesis.""" - d = CreateData() - inputs = d.load_json("kph-kirill") - (inp_pattern, out_pattern) = poly[14] - outputs = d.runcommands(inputs, inp_pattern, out_pattern) - assert outputs.exit_code == 0 - d.clean_directories() - - -def test_map_pattern_grps_to_regex_valid_input() -> None: - """Test of mapping input pattern.""" - test_cases = [ - ( - ("img_x{row:dd}_y{col:dd}_{channel:c+}.tif"), - ( - { - "row": "(?P[0-9][0-9])", - "col": "(?P[0-9][0-9])", - "channel": "(?P[a-zA-Z]+)", - } - ), - ), - 
(("img_x{row:c+}.tif"), ({"row": "(?P[a-zA-Z]+)"})), - ((""), ({})), - ] - for test_case in test_cases: - (from_val, to_val) = test_case - result = fr.map_pattern_grps_to_regex(from_val) - assert result == to_val - - -def test_convert_to_regex_valid_input() -> None: - """Test of converting to regular expression pattern.""" - test_cases = [ - ( - ("img_x{row:dd}_y{col:dd}_{channel:c+}.tif"), - ( - { - "row": "(?P[0-9][0-9])", - "col": "(?P[0-9][0-9])", - "channel": "(?P[a-zA-Z]+)", - } - ), - ( - "img_x(?P[0-9][0-9])_y(?P[0-9][0-9])_(?P[a-zA-Z]+).tif" - ), - ), - ( - ("img_x{row:c+}.tif"), - ({"row": "(?P[a-zA-Z]+)"}), - ("img_x(?P[a-zA-Z]+).tif"), - ), - (("img_x01.tif"), ({}), ("img_x01.tif")), - ] - for test_case in test_cases: - (from_val1, from_val2, to_val) = test_case - result = fr.convert_to_regex(from_val1, from_val2) - assert result == to_val - - def test_specify_len_valid_input() -> None: """Test of sepcifying length.""" test_cases = [ @@ -364,60 +280,6 @@ def test_get_char_to_digit_grps_returns_unique_keys_valid_input() -> None: assert result == to_val -def test_extract_named_grp_matches_valid_input() -> None: - """Test of extracting group names.""" - test_cases = [ - ( - ( - "img_x(?P[0-9][0-9])_y(?P[0-9][0-9])_(?P[a-zA-Z]+).tif" - ), - (["img_x01_y01_DAPI.tif", "img_x01_y01_GFP.tif", "img_x01_y01_TXRED.tif"]), - ( - [ - { - "row": "01", - "col": "01", - "channel": "DAPI", - "fname": "img_x01_y01_DAPI.tif", - }, - { - "row": "01", - "col": "01", - "channel": "GFP", - "fname": "img_x01_y01_GFP.tif", - }, - { - "row": "01", - "col": "01", - "channel": "TXRED", - "fname": "img_x01_y01_TXRED.tif", - }, - ] - ), - ), - (("img_x01.tif"), (["img_x01.tif"]), ([{"fname": "img_x01.tif"}])), - ] - for test_case in test_cases: - (from_val1, from_val2, to_val) = test_case - result = fr.extract_named_grp_matches(from_val1, from_val2) - assert result == to_val - - -def test_extract_named_grp_matches_bad_pattern_invalid_input_fails() -> None: - """Test of invalid input 
pattern.""" - test_cases = [ - ( - ("img_x(?P[a-zA-Z]+).tif"), - (["img_x01_y01_DAPI.tif", "img_x01_y01_GFP.tif", "img_x01_y01_TXRED.tif"]), - ) - ] - for test_case in test_cases: - (from_val1, from_val2) = test_case - - result = fr.extract_named_grp_matches(from_val1, from_val2) - assert len(result) == 0 - - def test_str_to_int_valid_input() -> None: """Test of string to integer.""" test_cases = [ @@ -489,7 +351,7 @@ def test_letters_to_int_returns_cat_index_dict_valid_input() -> None: }, ], ({"DAPI": 0, "GFP": 1, "TXRED": 2}), - ) + ), ] for test_case in test_cases: (from_val1, from_val2, to_val) = test_case @@ -497,28 +359,12 @@ def test_letters_to_int_returns_cat_index_dict_valid_input() -> None: assert result == to_val -@pytest.mark.xfail -def test_extract_named_grp_matches_duplicate_namedgrp_invalid_input() -> None: - """Test of invalid input pattern.""" - test_cases = [ - ( - ( - "x(?P[0-9][0-9])_y(?P[0-9][0-9])_c(?P[a-zA-Z]+).ome.tif" - ), - (["img_x01_y01_DAPI.tif", "img_x01_y01_GFP.tif", "img_x01_y01_TXRED.tif"]), - ) - ] - for test_case in test_cases: - (from_val1, from_val2) = test_case - fr.extract_named_grp_matches(from_val1, from_val2) - - @pytest.mark.xfail def test_letters_to_int_returns_error_invalid_input() -> None: """Test of invalid inputs.""" test_cases = [ ( - (2), + ("2"), [ { "row": 1, @@ -542,113 +388,62 @@ def test_letters_to_int_returns_error_invalid_input() -> None: @pytest.fixture -def create_subfolders() -> Tuple[pathlib.Path, str, str, str]: - """Creating directory and subdirectories.""" +def create_subfolders() -> tuple[pathlib.Path, str, str, str, str]: + """Create temporary input subfolders with test files.""" data = { "complex": [ - ["A9 p5d.tif", "A9 p5f.tif", "A9 p7f.tif"], - "96 ( -)* test_", - "{row:c}{col:d}.*p{f:d+}{character:c}.tif", - "x{row:dd}_y{col:dd}_p{f:dd}{character:c}_c01.tif", - ], - "simple": [ [ - "taoe005-u2os-72h-cp-a-au00044859_a01_s3_w23db644df-02ee-429d-9559-09cf4625c62b.tif", - 
"taoe005-u2os-72h-cp-a-au00044859_b01_s3_w3add254c8-0c7b-4cf0-a5dc-bf0cf8de8cec.tif", - "taoe005-u2os-72h-cp-a-au00044859_b07_s5_w2da098211-f7c1-453d-954f-b7d4751f6daa.tif", - "taoe005-u2os-72h-cp-a-au00044859_c15_s2_w3aea523fa-3b89-46a7-95e3-604017151895.tif", + "AS_09125_050118150001_A03f00d0.tif", + "AS_09125_050118150001_A03f01d0.tif", + "AS_09125_050118150001_A03f02d0.tif", + "AS_09125_050118150001_A03f03d0.tif", + "AS_09125_050118150001_A03f04d0.tif", ], - "folder_", - ".*_{row:c}{col:dd}_s{s:d}_w{channel:d}.*.tif", - "x{row:dd}_y{col:dd}_p{s:dd}_c{channel:d}.tif", + "BBBC/BBBC001/raw/Images/human_ht29_colon_cancer_1_images", + "(?P.*)/AS_09125_050118150001_{row:c}{col:dd}f{f:dd}d{channel:d}.tif", + "x{row:dd}_y{col:dd}_p{f:dd}{channel:d}_c01.tif", + "True", ], } - for name in ["complex", "simple"]: - d = CreateData() - dir_path = d.input_directory() - for i in range(5): - dirname = pathlib.Path(dir_path, f"{data[name][1]}{i}") - pathlib.Path(dirname).mkdir(exist_ok=False, parents=False) - for fl in data[name][0]: - temp_file = pathlib.Path.open(pathlib.Path(dirname, fl), "w") - temp_file.close() - - return pathlib.Path(dir_path), data[name][1], data[name][2], data[name][3] - - -def test_recursive_searching_files() -> None: - """Test recursive searching of files nested directories.""" - - dir_path = tempfile.mkdtemp(dir=pathlib.Path.cwd()) - out_dir = tempfile.mkdtemp(dir=pathlib.Path.cwd()) - for i in range(2): - dirname1 = "image_folder_" - dirname2 = "groundtruth_folder_" - dirname1 = pathlib.Path(dir_path, f"BBBC/BBBC001/Images/{dirname1}{i}") - dirname2 = pathlib.Path(dir_path, f"BBBC/BBBC001/Groundtruth/{dirname2}{i}") - pathlib.Path(dirname1).mkdir(exist_ok=False, parents=True) - pathlib.Path(dirname2).mkdir(exist_ok=False, parents=True) - - flist = [ - "AS_09125_050118150001_A03f00d0.tif", - "AS_09125_050118150001_A03f01d0.tif", - "AS_09125_050118150001_A03f02d0.tif", - "AS_09125_050118150001_A03f03d0.tif", - "AS_09125_050118150001_A03f04d0.tif", - 
"AS_09125_050118150001_A03f05d0.tif", - ] - - for fl in flist: - temp_file = pathlib.Path.open(pathlib.Path(dirname1, fl), "w") - temp_file = pathlib.Path.open(pathlib.Path(dirname2, fl), "w") + name = "complex" + d = CreateData() + dir_path = d.input_directory() + for i in range(1): + dirname = pathlib.Path(dir_path, f"{data[name][1]}_{i}") + if not pathlib.Path(dirname).exists(): + pathlib.Path(dirname).mkdir(parents=True, exist_ok=True) + for fl in data[name][0]: + temp_file = pathlib.Path.open(pathlib.Path(dirname, fl), "w") temp_file.close() - file_pattern = ".*_{row:c}{col:dd}f{f:dd}d{channel:d}.tif" - out_file_pattern = "x{row:dd}_y{col:dd}_p{f:dd}_c{channel:d}.tif" - map_directory = "raw" - - runner.invoke( - app, - [ - "--inpDir", - dir_path, - "--filePattern", - file_pattern, - "--outDir", - out_dir, - "--outFilePattern", - out_file_pattern, - "--mapDirectory", - map_directory, - ], + + return ( + pathlib.Path(dir_path), + str(data[name][1]), + str(data[name][2]), + str(data[name][3]), + str(data[name][4]), ) - assert list( - np.unique([p.name.split("_")[0] for p in pathlib.Path(out_dir).iterdir()]) - ) == ["groundtruth", "image"] - shutil.rmtree(dir_path) - shutil.rmtree(out_dir) -def test_cli(create_subfolders: pytest.FixtureRequest) -> None: +def test_cli(create_subfolders: tuple[pathlib.Path, str, str, str, str]) -> None: """Test Cli.""" - dir_path, _, file_pattern, out_file_pattern = create_subfolders - for i in ["raw", "map"]: - d = CreateData() - out_dir = d.output_directory() - result = runner.invoke( - app, - [ - "--inpDir", - dir_path, - "--filePattern", - file_pattern, - "--outDir", - out_dir, - "--outFilePattern", - out_file_pattern, - "--mapDirectory", - i, - ], - ) - assert result.exit_code == 0 + dir_path, _, file_pattern, out_file_pattern, _ = create_subfolders + + d = CreateData() + out_dir = d.output_directory() + params = [ + "--inpDir", + str(dir_path), + "--filePattern", + file_pattern, + "--outDir", + str(out_dir), + 
"--outFilePattern", + out_file_pattern, + "--mapDirectory", + ] + + result = runner.invoke(app, params) + assert result.exit_code == 0 d.clean_directories()