From 99935bef9409178491dc57defe2e7bd2ad005a00 Mon Sep 17 00:00:00 2001 From: Jane Van Lam <75lam@cua.edu> Date: Tue, 10 Mar 2026 23:55:53 -0400 Subject: [PATCH 1/3] update all packages, Use python3.12, pass all unit tests and pre-commit --- transforms/tabular-merger-tool/.gitignore | 1 + transforms/tabular-merger-tool/pyproject.toml | 63 ++++++++++++++----- 2 files changed, 49 insertions(+), 15 deletions(-) diff --git a/transforms/tabular-merger-tool/.gitignore b/transforms/tabular-merger-tool/.gitignore index a07072c..bfc1bb6 100644 --- a/transforms/tabular-merger-tool/.gitignore +++ b/transforms/tabular-merger-tool/.gitignore @@ -177,3 +177,4 @@ src/polus/plugins/_plugins/manifests/* #husky node_modules +uv.lock diff --git a/transforms/tabular-merger-tool/pyproject.toml b/transforms/tabular-merger-tool/pyproject.toml index 831096e..8a6cf7f 100644 --- a/transforms/tabular-merger-tool/pyproject.toml +++ b/transforms/tabular-merger-tool/pyproject.toml @@ -9,26 +9,59 @@ authors = [ readme = "README.md" packages = [{include = "polus", from = "src"}] -[tool.poetry.dependencies] -python = ">=3.9" -typer = "^0.7.0" -blake3 = ">=0.3,<0.4" -llvmlite = ">=0.43,<0.44" -vaex = "^4.17.0" -tqdm = "^4.65.0" -filepattern = "^2.0.5" -pyarrow = ">=16.0,<17.0" -numba = ">=0.60,<0.61" +[project] +name = "polus-tabular-transforms-tabular-merger" +version = "0.1.4-dev0" +description = "Merge vaex supported tabular file format into a single merged file." 
+readme = "README.md" +requires-python = ">=3.12,<3.13" +dependencies = [ + "typer>=0.24.0", + "blake3>=1.0.0", + "llvmlite>=0.43,<0.47", + "vaex>=4.19.0", + "tqdm>=4.67.0", + "filepattern>=2.1.0", + "pyarrow>=16.0,<24.0", + "numba>=0.60,<0.65", +] +[project.optional-dependencies] +dev = [ + "bump2version>=1.0.1", + "pre-commit>=4.5.0", + "black>=26.3.0", + "flake8>=7.3.0", + "mypy>=1.19.0", + "pytest>=9.0.0", + "ruff>=0.8.0", +] + +[tool.poetry.dependencies] +python = ">=3.12,<3.13" +typer = "^0.24.0" +blake3 = "^1.0.0" +llvmlite = ">=0.43,<0.47" +vaex = "^4.19.0" +tqdm = "^4.67.0" +filepattern = "^2.1.0" +pyarrow = ">=16.0,<24.0" +numba = ">=0.60,<0.65" [tool.poetry.group.dev.dependencies] bump2version = "^1.0.1" -pre-commit = "^3.1.0" -black = "^23.1.0" -flake8 = "^6.0.0" -mypy = "^1.0.1" -pytest = "^7.2.1" +pre-commit = "^4.5.0" +black = "^26.3.0" +flake8 = "^7.3.0" +mypy = "^1.19.0" +pytest = "^9.0.0" +ruff = "^0.8.0" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" + +[tool.pytest.ini_options] +filterwarnings = [ + "ignore:invalid escape sequence:SyntaxWarning", +] From 4d4ad2c17a4f3d96267f458642da50743ff48786 Mon Sep 17 00:00:00 2001 From: Jane Van Lam <75lam@cua.edu> Date: Thu, 12 Mar 2026 14:01:17 -0400 Subject: [PATCH 2/3] update tabular_merger.py and tests_main.py to work with vaex, pandas to support cp313 --- transforms/tabular-merger-tool/pyproject.toml | 20 +-- .../tabular_merger/tabular_merger.py | 143 +++++++++++++++++- .../tabular-merger-tool/tests/test_main.py | 34 +++-- 3 files changed, 172 insertions(+), 25 deletions(-) diff --git a/transforms/tabular-merger-tool/pyproject.toml b/transforms/tabular-merger-tool/pyproject.toml index 8a6cf7f..891fa39 100644 --- a/transforms/tabular-merger-tool/pyproject.toml +++ b/transforms/tabular-merger-tool/pyproject.toml @@ -14,16 +14,17 @@ name = "polus-tabular-transforms-tabular-merger" version = "0.1.4-dev0" description = "Merge vaex supported tabular file format into a 
single merged file." readme = "README.md" -requires-python = ">=3.12,<3.13" +requires-python = ">=3.12,<3.14" dependencies = [ "typer>=0.24.0", "blake3>=1.0.0", - "llvmlite>=0.43,<0.47", - "vaex>=4.19.0", + "llvmlite>=0.46,<0.47", "tqdm>=4.67.0", "filepattern>=2.1.0", "pyarrow>=16.0,<24.0", - "numba>=0.60,<0.65", + "numba>=0.61,<0.65", + "pandas>=2.0.0", + "vaex<=4.19.0; python_version < \"3.13\"", ] [project.optional-dependencies] @@ -38,15 +39,16 @@ dev = [ ] [tool.poetry.dependencies] -python = ">=3.12,<3.13" -typer = "^0.24.0" +python = ">=3.12,<3.14" +typer = "^0.24.0" blake3 = "^1.0.0" -llvmlite = ">=0.43,<0.47" -vaex = "^4.19.0" +llvmlite = ">=0.46,<0.47" +vaex = {version = "<=4.19.0", python = "<3.13"} tqdm = "^4.67.0" filepattern = "^2.1.0" pyarrow = ">=16.0,<24.0" -numba = ">=0.60,<0.65" +numba = ">=0.61,<0.65" +pandas = ">=2.0.0" [tool.poetry.group.dev.dependencies] bump2version = "^1.0.1" diff --git a/transforms/tabular-merger-tool/src/polus/tabular/transforms/tabular_merger/tabular_merger.py b/transforms/tabular-merger-tool/src/polus/tabular/transforms/tabular_merger/tabular_merger.py index 18c653c..719fbfd 100644 --- a/transforms/tabular-merger-tool/src/polus/tabular/transforms/tabular_merger/tabular_merger.py +++ b/transforms/tabular-merger-tool/src/polus/tabular/transforms/tabular_merger/tabular_merger.py @@ -4,13 +4,22 @@ import logging import os import pathlib +import sys from collections import Counter -from typing import Optional +from typing import Any, Optional import numpy as np -import vaex from tqdm import tqdm +try: + import vaex +except ImportError: + vaex = None # type: ignore[assignment] + +import pandas as pd + +_use_pandas = vaex is None or sys.version_info >= (3, 13) + logger = logging.getLogger(__name__) logger.setLevel(os.environ.get("POLUS_LOG", logging.INFO)) POLUS_TAB_EXT = os.environ.get("POLUS_TAB_EXT", ".arrow") @@ -24,9 +33,7 @@ class Dimensions(str, enum.Enum): Default = "rows" -def sorted_dataframe_list( - x: 
list[vaex.dataframe.DataFrameLocal], -) -> list[vaex.dataframe.DataFrameLocal]: +def sorted_dataframe_list(x: list[Any]) -> list[Any]: """Reordering of list of dataframes based on the size. Args: @@ -71,6 +78,125 @@ def remove_files(curr_dir: pathlib.Path) -> None: f.unlink() +def _load_file_pandas(in_file: pathlib.Path) -> pd.DataFrame: + """Load a single file into a pandas DataFrame.""" + p = pathlib.Path(in_file) + if p.suffix == ".csv": + return pd.read_csv(p) + if p.suffix == ".parquet": + return pd.read_parquet(p) + if p.suffix in (".feather", ".arrow"): + return pd.read_feather(p) + if p.suffix == ".hdf5": + return pd.read_hdf(p) + raise ValueError(f"Unsupported file format: {p.suffix}") + + +def _sorted_dataframe_list_pandas(dfs: list[pd.DataFrame]) -> list[pd.DataFrame]: + """Reorder list of DataFrames by size (largest first) for column merge; ties get lower keys.""" + sizes = [len(d) for d in dfs] + size_by_idx = list(sizes) + for s in set(sizes): + indices_with_s = [i for i in range(len(dfs)) if size_by_idx[i] == s] + if len(indices_with_s) > 1: + for j, i in enumerate(indices_with_s): + size_by_idx[i] = s - j # tie-break so largest size still sorts first + ordered_indices = sorted(range(len(dfs)), key=lambda i: size_by_idx[i], reverse=True) + return [dfs[i] for i in ordered_indices] + + +def _merge_files_pandas( + inp_dir_files: list, + strip_extension: bool, + dim: Dimensions, + same_rows: Optional[bool], + same_columns: Optional[bool], + map_var: Optional[str], + out_path: pathlib.Path, + curr_dir: pathlib.Path, +) -> None: + """Pandas-based merge used when vaex is not available (e.g. 
Python 3.13).""" + if dim == "columns" and same_rows: + logger.info("Merging data with identical number of rows...") + dfs: list[pd.DataFrame] = [] + headers: list[list[str]] = [] + for in_file in tqdm(inp_dir_files, total=len(inp_dir_files), desc="Loading"): + df = _load_file_pandas(in_file).copy() + df = df.rename(columns={c: in_file.stem + "_" + c for c in df.columns}) + headers.append(list(df.columns)) + dfs.append(df) + common = set(headers[0]).intersection(*[set(h) for h in headers[1:]]) + if len(common) != 0: + raise ValueError("Duplicated column names in dataframes") + df_final = dfs[0] + for right in dfs[1:]: + df_final = df_final.join(right, how="left") + df_final.to_feather(out_path) + + elif dim == "columns" and not same_rows: + if not map_var: + raise ValueError(f"mapVar name should be defined {map_var}") + dfs = [] + headers = [] + for in_file in tqdm(inp_dir_files, total=len(inp_dir_files), desc="Loading"): + df = _load_file_pandas(in_file).copy() + idx_col = [str(i) + "_" + str(p) for i, p in enumerate(df[map_var])] + df["indexcolumn"] = idx_col + rename = { + c: in_file.stem + "_" + c + for c in df.columns + if c not in (map_var, "indexcolumn") + } + rename[map_var] = in_file.stem + "_" + map_var + df = df.rename(columns=rename) + headers.append(list(df.columns)) + dfs.append(df) + dfs = _sorted_dataframe_list_pandas(dfs) + common = set(headers[0]).intersection(*[set(h) for h in headers[1:]]) + if len(common) != 1 or "indexcolumn" not in common: + raise ValueError("Duplicated column names in dataframes") + df_final = dfs[0] + for right in dfs[1:]: + df_final = df_final.merge(right, on="indexcolumn", how="left") + df_final.to_feather(out_path) + + elif dim == "rows" and same_columns: + logger.info("Getting all common headers in input files...") + all_headers: list[list[str]] = [] + for in_file in inp_dir_files: + df = _load_file_pandas(in_file) + all_headers.append(list(df.columns)) + headers = list(set(all_headers[0]).intersection(*all_headers)) + 
logger.info("Merging the data along rows...") + dfs = [] + for in_file in tqdm(inp_dir_files, total=len(inp_dir_files), desc="Loading"): + df = _load_file_pandas(in_file)[headers].copy() + outname = in_file.stem if strip_extension else in_file.name + df["file"] = outname + cols = ["file"] + [c for c in df.columns if c != "file"] + df = df[cols] + dfs.append(df) + df_final = pd.concat(dfs, ignore_index=True) + df_final.to_feather(out_path) + + else: + logger.info("Merging the data along rows...") + dfs = [] + for in_file in tqdm(inp_dir_files, total=len(inp_dir_files), desc="Loading"): + df = _load_file_pandas(in_file).copy() + if "file" in df.columns: + df = df.drop(columns=["file"]) + outname = in_file.stem if strip_extension else in_file.name + df["file"] = outname + cols = ["file"] + [c for c in df.columns if c != "file"] + df = df[cols] + dfs.append(df) + df_final = pd.concat(dfs, ignore_index=True) + df_final.to_feather(out_path) + + remove_files(curr_dir) + + def merge_files( # noqa: PLR0915 PLR0912 PLR0913 C901 inp_dir_files: list, strip_extension: bool, @@ -98,10 +224,15 @@ def merge_files( # noqa: PLR0915 PLR0912 PLR0913 C901 map_var: Variable Name used to join file column wise. 
out_dir:Path to output directory """ - # Generate the path to the output file out_path = pathlib.Path(out_dir).joinpath(f"merged{POLUS_TAB_EXT}") curr_dir = pathlib.Path(".").cwd() + if _use_pandas: + _merge_files_pandas( + inp_dir_files, strip_extension, dim, same_rows, same_columns, map_var, out_path, curr_dir + ) + return + # Case One: If merging by columns and have same number of rows: if dim == "columns" and same_rows: logger.info("Merging data with identical number of rows...") diff --git a/transforms/tabular-merger-tool/tests/test_main.py b/transforms/tabular-merger-tool/tests/test_main.py index bf05f04..9f26675 100644 --- a/transforms/tabular-merger-tool/tests/test_main.py +++ b/transforms/tabular-merger-tool/tests/test_main.py @@ -7,7 +7,14 @@ import numpy as np import pandas as pd import pytest -import vaex + +try: + import vaex + HAS_VAEX = True +except ImportError: + HAS_VAEX = False + vaex = None # type: ignore[assignment] + from polus.tabular.transforms.tabular_merger import tabular_merger as tm @@ -85,7 +92,8 @@ def arrow_func(self) -> None: self.df.to_feather(pathlib.Path(self.inp_dir, self.out_name)) def hdf_func(self) -> None: - """Convert pandas dataframe to hdf5 file format.""" + """Convert pandas dataframe to hdf5 file format (requires vaex; skipped on Python 3.13).""" + assert HAS_VAEX, "HDF5 test data requires vaex (not used on Python 3.13)" v_df = vaex.from_pandas(self.df, copy_index=False) v_df.export(pathlib.Path(self.inp_dir, self.out_name)) @@ -109,7 +117,13 @@ def clean_directories(self) -> None: f.unlink() -FILE_EXT = [[".hdf5", ".parquet", ".csv", ".feather", ".arrow"]] +# On Python 3.13 vaex is not installed; skip .hdf5 so we don't need vaex to create test data +FILE_EXT = [[".hdf5", ".parquet", ".csv", ".feather", ".arrow"]] if HAS_VAEX else [[".parquet", ".csv", ".feather", ".arrow"]] + + +def open_merged(path: pathlib.Path) -> pd.DataFrame: + """Read merged .arrow output so tests work with or without vaex (e.g. 
on Python 3.13).""" + return pd.read_feather(path) @pytest.fixture(params=FILE_EXT) @@ -141,7 +155,7 @@ def test_mergingfiles_row_wise_samerows(poly: list[str]) -> None: ) outfile = [f for f in d1.get_out_dir().iterdir() if f.suffix == ".arrow"][0] - merged = vaex.open(outfile) + merged = open_merged(outfile) assert len(merged["file"].unique()) == 3 d1.clean_directories() @@ -178,7 +192,7 @@ def test_mergingfiles_row_wise_unequalrows(poly: list[str]) -> None: out_dir=d1.get_out_dir(), ) outfile = [f for f in d1.get_out_dir().iterdir() if f.suffix == ".arrow"][0] - merged = vaex.open(outfile) + merged = open_merged(outfile) assert len(merged["file"].unique()) == 3 assert merged.shape[0] > 300 d1.clean_directories() @@ -206,8 +220,8 @@ def test_mergingfiles_column_wise_equalrows(poly: list[str]) -> None: out_dir=d1.get_out_dir(), ) outfile = [f for f in d1.get_out_dir().iterdir() if f.suffix == ".arrow"][0] - merged = vaex.open(outfile) - assert len(merged.get_column_names()) == 12 + merged = open_merged(outfile) + assert len(merged.columns) == 12 assert merged.shape[0] == 100 d1.clean_directories() @@ -239,8 +253,8 @@ def test_mergingfiles_column_wise_unequalrows(poly: list[str]) -> None: out_dir=d1.get_out_dir(), ) outfile = [f for f in d1.get_out_dir().iterdir() if f.suffix == ".arrow"][0] - merged = vaex.open(outfile) - assert len(merged.get_column_names()) == 13 - assert "indexcolumn" in merged.get_column_names() + merged = open_merged(outfile) + assert len(merged.columns) == 13 + assert "indexcolumn" in merged.columns assert merged.shape[0] == 200 d1.clean_directories() From 5060889c96632a66cb4a3241da89a778e5753529 Mon Sep 17 00:00:00 2001 From: Jane Van Lam <75lam@cua.edu> Date: Thu, 12 Mar 2026 15:12:40 -0400 Subject: [PATCH 3/3] =?UTF-8?q?Bump=20version:=200.1.4-dev0=20=E2=86=92=20?= =?UTF-8?q?0.1.5-dev0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tabular-merger-tool/.bumpversion.cfg | 2 +- 
transforms/tabular-merger-tool/Dockerfile | 13 +++--- transforms/tabular-merger-tool/README.md | 2 +- transforms/tabular-merger-tool/VERSION | 2 +- transforms/tabular-merger-tool/plugin.json | 4 +- transforms/tabular-merger-tool/pyproject.toml | 4 +- .../transforms/tabular_merger/__init__.py | 2 +- .../tabular_merger/tabular_merger.py | 44 ++++++++++++++----- .../tabular-merger-tool/tests/test_main.py | 7 ++- 9 files changed, 55 insertions(+), 25 deletions(-) diff --git a/transforms/tabular-merger-tool/.bumpversion.cfg b/transforms/tabular-merger-tool/.bumpversion.cfg index 7629ec0..f4f0a99 100644 --- a/transforms/tabular-merger-tool/.bumpversion.cfg +++ b/transforms/tabular-merger-tool/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.1.4-dev0 +current_version = 0.1.5-dev0 commit = True tag = False parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-(?P<release>[a-z]+)(?P<dev>\d+))? diff --git a/transforms/tabular-merger-tool/Dockerfile b/transforms/tabular-merger-tool/Dockerfile index 84a7dfe..8ce933a 100755 --- a/transforms/tabular-merger-tool/Dockerfile +++ b/transforms/tabular-merger-tool/Dockerfile @@ -1,4 +1,4 @@ -FROM polusai/bfio:2.3.6 +FROM python:3.13-slim # environment variables defined in polusai/bfio ENV EXEC_DIR="/opt/executables" @@ -11,13 +11,16 @@ WORKDIR ${EXEC_DIR} # TODO: Change the tool_dir to the tool directory ENV TOOL_DIR="transforms/tabular-merger-tool" +# bfio/Bioformats needs a JVM (Debian Trixie has openjdk-21, not 17) +RUN apt-get update && apt-get install -y --no-install-recommends \ + openjdk-21-jre-headless \ + && rm -rf /var/lib/apt/lists/* -# Copy the repository into the container -RUN mkdir tabular-tools -COPY . ${EXEC_DIR}/tabular-tools +# Build context = repo root. Copy only this tool. 
+COPY transforms/tabular-merger-tool /opt/executables/app # Install the tool -RUN pip3 install "${EXEC_DIR}/tabular-tools/${TOOL_DIR}" --no-cache-dir +RUN pip install --no-cache-dir /opt/executables/app # Set the entrypoint # TODO: Change the entrypoint to the tool entrypoint diff --git a/transforms/tabular-merger-tool/README.md b/transforms/tabular-merger-tool/README.md index 29d3f68..64d8861 100644 --- a/transforms/tabular-merger-tool/README.md +++ b/transforms/tabular-merger-tool/README.md @@ -1,4 +1,4 @@ -# Tabular Merger (v0.1.4-dev0) +# Tabular Merger (v0.1.5-dev0) This WIPP plugin merges all tabular files with vaex supported file formats into a combined file using either row or column merging. diff --git a/transforms/tabular-merger-tool/VERSION b/transforms/tabular-merger-tool/VERSION index 197c2b5..731bd39 100644 --- a/transforms/tabular-merger-tool/VERSION +++ b/transforms/tabular-merger-tool/VERSION @@ -1 +1 @@ -0.1.4-dev0 +0.1.5-dev0 diff --git a/transforms/tabular-merger-tool/plugin.json b/transforms/tabular-merger-tool/plugin.json index eaf6023..a6c1ffc 100644 --- a/transforms/tabular-merger-tool/plugin.json +++ b/transforms/tabular-merger-tool/plugin.json @@ -1,6 +1,6 @@ { "name": "Tabular Merger", - "version": "0.1.4-dev0", + "version": "0.1.5-dev0", "title": "Tabular Merger", "description": "Merge vaex supported tabular file format into a single merged file.", "author": "Nicholas Schaub (nick.schaub@nih.gov), Hamdah Shafqat Abbasi (hamdahshafqat.abbasi@nih.gov)", @@ -8,7 +8,7 @@ "repository": "https://github.com/PolusAI/tabular-tools", "website": "https://ncats.nih.gov/preclinical/core/informatics", "citation": "", - "containerId": "polusai/tabular-merger-tool:0.1.4-dev0", + "containerId": "polusai/tabular-merger-tool:0.1.5-dev0", "baseCommand": [ "python3", "-m", diff --git a/transforms/tabular-merger-tool/pyproject.toml b/transforms/tabular-merger-tool/pyproject.toml index 891fa39..eb8c957 100644 --- a/transforms/tabular-merger-tool/pyproject.toml 
+++ b/transforms/tabular-merger-tool/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "polus-tabular-transforms-tabular-merger" -version = "0.1.4-dev0" +version = "0.1.5-dev0" description = "Merge vaex supported tabular file format into a single merged file." authors = [ "Nick Schaub <nick.schaub@nih.gov>", @@ -11,7 +11,7 @@ packages = [{include = "polus", from = "src"}] [project] name = "polus-tabular-transforms-tabular-merger" -version = "0.1.4-dev0" +version = "0.1.5-dev0" description = "Merge vaex supported tabular file format into a single merged file." readme = "README.md" requires-python = ">=3.12,<3.14" diff --git a/transforms/tabular-merger-tool/src/polus/tabular/transforms/tabular_merger/__init__.py b/transforms/tabular-merger-tool/src/polus/tabular/transforms/tabular_merger/__init__.py index e7e0cdb..bffc062 100644 --- a/transforms/tabular-merger-tool/src/polus/tabular/transforms/tabular_merger/__init__.py +++ b/transforms/tabular-merger-tool/src/polus/tabular/transforms/tabular_merger/__init__.py @@ -1,4 +1,4 @@ """Tabular Merger.""" -__version__ = "0.1.4-dev0" +__version__ = "0.1.5-dev0" from . 
import tabular_merger diff --git a/transforms/tabular-merger-tool/src/polus/tabular/transforms/tabular_merger/tabular_merger.py b/transforms/tabular-merger-tool/src/polus/tabular/transforms/tabular_merger/tabular_merger.py index 719fbfd..720647b 100644 --- a/transforms/tabular-merger-tool/src/polus/tabular/transforms/tabular_merger/tabular_merger.py +++ b/transforms/tabular-merger-tool/src/polus/tabular/transforms/tabular_merger/tabular_merger.py @@ -6,7 +6,8 @@ import pathlib import sys from collections import Counter -from typing import Any, Optional +from typing import Any +from typing import Optional import numpy as np from tqdm import tqdm @@ -89,11 +90,15 @@ def _load_file_pandas(in_file: pathlib.Path) -> pd.DataFrame: return pd.read_feather(p) if p.suffix == ".hdf5": return pd.read_hdf(p) - raise ValueError(f"Unsupported file format: {p.suffix}") + msg = f"Unsupported file format: {p.suffix}" + raise ValueError(msg) def _sorted_dataframe_list_pandas(dfs: list[pd.DataFrame]) -> list[pd.DataFrame]: - """Reorder list of DataFrames by size (largest first) for column merge; ties get lower keys.""" + """Reorder list of DataFrames by size (largest first). + + For column merge; ties get lower keys. 
+ """ sizes = [len(d) for d in dfs] size_by_idx = list(sizes) for s in set(sizes): @@ -101,11 +106,15 @@ def _sorted_dataframe_list_pandas(dfs: list[pd.DataFrame]) -> list[pd.DataFrame] if len(indices_with_s) > 1: for j, i in enumerate(indices_with_s): size_by_idx[i] = s - j # tie-break so largest size still sorts first - ordered_indices = sorted(range(len(dfs)), key=lambda i: size_by_idx[i], reverse=True) + ordered_indices = sorted( + range(len(dfs)), + key=lambda i: size_by_idx[i], + reverse=True, + ) return [dfs[i] for i in ordered_indices] -def _merge_files_pandas( +def _merge_files_pandas( # noqa: C901, PLR0912, PLR0913, PLR0915 inp_dir_files: list, strip_extension: bool, dim: Dimensions, @@ -127,7 +136,8 @@ def _merge_files_pandas( dfs.append(df) common = set(headers[0]).intersection(*[set(h) for h in headers[1:]]) if len(common) != 0: - raise ValueError("Duplicated column names in dataframes") + msg = "Duplicated column names in dataframes" + raise ValueError(msg) df_final = dfs[0] for right in dfs[1:]: df_final = df_final.join(right, how="left") @@ -135,7 +145,8 @@ def _merge_files_pandas( elif dim == "columns" and not same_rows: if not map_var: - raise ValueError(f"mapVar name should be defined {map_var}") + msg = f"mapVar name should be defined {map_var}" + raise ValueError(msg) dfs = [] headers = [] for in_file in tqdm(inp_dir_files, total=len(inp_dir_files), desc="Loading"): @@ -154,7 +165,8 @@ def _merge_files_pandas( dfs = _sorted_dataframe_list_pandas(dfs) common = set(headers[0]).intersection(*[set(h) for h in headers[1:]]) if len(common) != 1 or "indexcolumn" not in common: - raise ValueError("Duplicated column names in dataframes") + msg = "Duplicated column names in dataframes" + raise ValueError(msg) df_final = dfs[0] for right in dfs[1:]: df_final = df_final.merge(right, on="indexcolumn", how="left") @@ -166,11 +178,14 @@ def _merge_files_pandas( for in_file in inp_dir_files: df = _load_file_pandas(in_file) all_headers.append(list(df.columns)) 
- headers = list(set(all_headers[0]).intersection(*all_headers)) + common_headers = set(all_headers[0]).intersection( + *[set(h) for h in all_headers[1:]], + ) + common_header_list: list[str] = list(common_headers) logger.info("Merging the data along rows...") dfs = [] for in_file in tqdm(inp_dir_files, total=len(inp_dir_files), desc="Loading"): - df = _load_file_pandas(in_file)[headers].copy() + df = _load_file_pandas(in_file)[common_header_list].copy() outname = in_file.stem if strip_extension else in_file.name df["file"] = outname cols = ["file"] + [c for c in df.columns if c != "file"] @@ -229,7 +244,14 @@ def merge_files( # noqa: PLR0915 PLR0912 PLR0913 C901 if _use_pandas: _merge_files_pandas( - inp_dir_files, strip_extension, dim, same_rows, same_columns, map_var, out_path, curr_dir + inp_dir_files, + strip_extension, + dim, + same_rows, + same_columns, + map_var, + out_path, + curr_dir, ) return diff --git a/transforms/tabular-merger-tool/tests/test_main.py b/transforms/tabular-merger-tool/tests/test_main.py index 9f26675..fd35573 100644 --- a/transforms/tabular-merger-tool/tests/test_main.py +++ b/transforms/tabular-merger-tool/tests/test_main.py @@ -10,6 +10,7 @@ try: import vaex + HAS_VAEX = True except ImportError: HAS_VAEX = False @@ -118,7 +119,11 @@ def clean_directories(self) -> None: # On Python 3.13 vaex is not installed; skip .hdf5 so we don't need vaex to create test data -FILE_EXT = [[".hdf5", ".parquet", ".csv", ".feather", ".arrow"]] if HAS_VAEX else [[".parquet", ".csv", ".feather", ".arrow"]] +FILE_EXT = ( + [[".hdf5", ".parquet", ".csv", ".feather", ".arrow"]] + if HAS_VAEX + else [[".parquet", ".csv", ".feather", ".arrow"]] +) def open_merged(path: pathlib.Path) -> pd.DataFrame: