Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion transforms/tabular-merger-tool/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.1.4-dev0
current_version = 0.1.5-dev0
commit = True
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-(?P<release>[a-z]+)(?P<dev>\d+))?
Expand Down
1 change: 1 addition & 0 deletions transforms/tabular-merger-tool/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,4 @@ src/polus/plugins/_plugins/manifests/*

#husky
node_modules
uv.lock
13 changes: 8 additions & 5 deletions transforms/tabular-merger-tool/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM polusai/bfio:2.3.6
FROM python:3.13-slim

# environment variables (previously inherited from the polusai/bfio base image)
ENV EXEC_DIR="/opt/executables"
Expand All @@ -11,13 +11,16 @@ WORKDIR ${EXEC_DIR}

# TODO: Change the tool_dir to the tool directory
ENV TOOL_DIR="transforms/tabular-merger-tool"
# bfio/Bioformats needs a JVM (Debian Trixie has openjdk-21, not 17)
RUN apt-get update && apt-get install -y --no-install-recommends \
openjdk-21-jre-headless \
&& rm -rf /var/lib/apt/lists/*

# Copy the repository into the container
RUN mkdir tabular-tools
COPY . ${EXEC_DIR}/tabular-tools
# Build context = repo root. Copy only this tool.
COPY transforms/tabular-merger-tool /opt/executables/app

# Install the tool
RUN pip3 install "${EXEC_DIR}/tabular-tools/${TOOL_DIR}" --no-cache-dir
RUN pip install --no-cache-dir /opt/executables/app

# Set the entrypoint
# TODO: Change the entrypoint to the tool entrypoint
Expand Down
2 changes: 1 addition & 1 deletion transforms/tabular-merger-tool/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Tabular Merger (v0.1.4-dev0)
# Tabular Merger (v0.1.5-dev0)

This WIPP plugin merges all tabular files in vaex-supported file formats into a single combined file, using either row-wise or column-wise merging.

Expand Down
2 changes: 1 addition & 1 deletion transforms/tabular-merger-tool/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.4-dev0
0.1.5-dev0
4 changes: 2 additions & 2 deletions transforms/tabular-merger-tool/plugin.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
{
"name": "Tabular Merger",
"version": "0.1.4-dev0",
"version": "0.1.5-dev0",
"title": "Tabular Merger",
"description": "Merge vaex supported tabular file format into a single merged file.",
"author": "Nicholas Schaub (nick.schaub@nih.gov), Hamdah Shafqat Abbasi (hamdahshafqat.abbasi@nih.gov)",
"institution": "National Center for Advancing Translational Sciences, National Institutes of Health",
"repository": "https://github.com/PolusAI/tabular-tools",
"website": "https://ncats.nih.gov/preclinical/core/informatics",
"citation": "",
"containerId": "polusai/tabular-merger-tool:0.1.4-dev0",
"containerId": "polusai/tabular-merger-tool:0.1.5-dev0",
"baseCommand": [
"python3",
"-m",
Expand Down
67 changes: 51 additions & 16 deletions transforms/tabular-merger-tool/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "polus-tabular-transforms-tabular-merger"
version = "0.1.4-dev0"
version = "0.1.5-dev0"
description = "Merge vaex supported tabular file format into a single merged file."
authors = [
"Nick Schaub <nick.schaub@nih.gov>",
Expand All @@ -9,26 +9,61 @@ authors = [
readme = "README.md"
packages = [{include = "polus", from = "src"}]

[tool.poetry.dependencies]
python = ">=3.9"
typer = "^0.7.0"
blake3 = ">=0.3,<0.4"
llvmlite = ">=0.43,<0.44"
vaex = "^4.17.0"
tqdm = "^4.65.0"
filepattern = "^2.0.5"
pyarrow = ">=16.0,<17.0"
numba = ">=0.60,<0.61"
[project]
name = "polus-tabular-transforms-tabular-merger"
version = "0.1.5-dev0"
description = "Merge vaex supported tabular file format into a single merged file."
readme = "README.md"
requires-python = ">=3.12,<3.14"
dependencies = [
"typer>=0.24.0",
"blake3>=1.0.0",
"llvmlite>=0.46,<0.47",
"tqdm>=4.67.0",
"filepattern>=2.1.0",
"pyarrow>=16.0,<24.0",
"numba>=0.61,<0.65",
"pandas>=2.0.0",
"vaex<=4.19.0; python_version < \"3.13\"",
]

[project.optional-dependencies]
dev = [
"bump2version>=1.0.1",
"pre-commit>=4.5.0",
"black>=26.3.0",
"flake8>=7.3.0",
"mypy>=1.19.0",
"pytest>=9.0.0",
"ruff>=0.8.0",
]

[tool.poetry.dependencies]
python = ">=3.12,<3.14"
typer = "^0.23.0"
blake3 = "^1.0.0"
llvmlite = ">=0.46,<0.47"
vaex = {version = "<=4.19.0", python = "<3.13"}
tqdm = "^4.67.0"
filepattern = "^2.1.0"
pyarrow = ">=16.0,<24.0"
numba = ">=0.61,<0.65"
pandas = ">=2.0.0"

[tool.poetry.group.dev.dependencies]
bump2version = "^1.0.1"
pre-commit = "^3.1.0"
black = "^23.1.0"
flake8 = "^6.0.0"
mypy = "^1.0.1"
pytest = "^7.2.1"
pre-commit = "^4.5.0"
black = "^26.3.0"
flake8 = "^7.3.0"
mypy = "^1.19.0"
pytest = "^9.0.0"
ruff = "^0.8.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.pytest.ini_options]
filterwarnings = [
"ignore:invalid escape sequence:SyntaxWarning",
]
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Tabular Merger."""
__version__ = "0.1.4-dev0"
__version__ = "0.1.5-dev0"

from . import tabular_merger
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,23 @@
import logging
import os
import pathlib
import sys
from collections import Counter
from typing import Any
from typing import Optional

import numpy as np
import vaex
from tqdm import tqdm

try:
import vaex
except ImportError:
vaex = None # type: ignore[assignment]

import pandas as pd

_use_pandas = vaex is None or sys.version_info >= (3, 13)

logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("POLUS_LOG", logging.INFO))
POLUS_TAB_EXT = os.environ.get("POLUS_TAB_EXT", ".arrow")
Expand All @@ -24,9 +34,7 @@ class Dimensions(str, enum.Enum):
Default = "rows"


def sorted_dataframe_list(
x: list[vaex.dataframe.DataFrameLocal],
) -> list[vaex.dataframe.DataFrameLocal]:
def sorted_dataframe_list(x: list[Any]) -> list[Any]:
"""Reordering of list of dataframes based on the size.

Args:
Expand Down Expand Up @@ -71,6 +79,139 @@ def remove_files(curr_dir: pathlib.Path) -> None:
f.unlink()


def _load_file_pandas(in_file: pathlib.Path) -> pd.DataFrame:
"""Load a single file into a pandas DataFrame."""
p = pathlib.Path(in_file)
if p.suffix == ".csv":
return pd.read_csv(p)
if p.suffix == ".parquet":
return pd.read_parquet(p)
if p.suffix in (".feather", ".arrow"):
return pd.read_feather(p)
if p.suffix == ".hdf5":
return pd.read_hdf(p)
msg = f"Unsupported file format: {p.suffix}"
raise ValueError(msg)


def _sorted_dataframe_list_pandas(dfs: list[pd.DataFrame]) -> list[pd.DataFrame]:
"""Reorder list of DataFrames by size (largest first).

For column merge; ties get lower keys.
"""
sizes = [len(d) for d in dfs]
size_by_idx = list(sizes)
for s in set(sizes):
indices_with_s = [i for i in range(len(dfs)) if size_by_idx[i] == s]
if len(indices_with_s) > 1:
for j, i in enumerate(indices_with_s):
size_by_idx[i] = s - j # tie-break so largest size still sorts first
ordered_indices = sorted(
range(len(dfs)),
key=lambda i: size_by_idx[i],
reverse=True,
)
return [dfs[i] for i in ordered_indices]


def _merge_files_pandas(  # noqa: C901, PLR0912, PLR0913, PLR0915
    inp_dir_files: list,
    strip_extension: bool,
    dim: Dimensions,
    same_rows: Optional[bool],
    same_columns: Optional[bool],
    map_var: Optional[str],
    out_path: pathlib.Path,
    curr_dir: pathlib.Path,
) -> None:
    """Pandas-based merge used when vaex is not available (e.g. Python 3.13).

    Merges the input tabular files either column-wise or row-wise and writes
    the combined result to ``out_path`` in feather/arrow format.

    Args:
        inp_dir_files: Input tabular files to merge (paths with ``.stem`` /
            ``.name`` attributes).
        strip_extension: For row-wise merges, record the file stem (True) or
            the full file name (False) in the added ``file`` column.
        dim: Merge direction, ``"columns"`` or ``"rows"``.
        same_rows: For column merges, whether all inputs have identical row
            counts (enables a positional index join).
        same_columns: For row merges, whether to keep only the columns common
            to every input.
        map_var: Column used to align rows when column-merging inputs with
            differing row counts.
        out_path: Destination file for the merged output.
        curr_dir: Working directory passed to ``remove_files`` for cleanup.

    Raises:
        ValueError: If prefixed column names collide across inputs, or if
            ``map_var`` is missing when required.
    """
    # Case 1: column-wise merge, all inputs have identical row counts —
    # prefix every column with its source file stem, then join on the index.
    if dim == "columns" and same_rows:
        logger.info("Merging data with identical number of rows...")
        dfs: list[pd.DataFrame] = []
        headers: list[list[str]] = []
        for in_file in tqdm(inp_dir_files, total=len(inp_dir_files), desc="Loading"):
            df = _load_file_pandas(in_file).copy()
            # Prefixing with the file stem keeps column names unique per input.
            df = df.rename(columns={c: in_file.stem + "_" + c for c in df.columns})
            headers.append(list(df.columns))
            dfs.append(df)
        # After prefixing, no column name should be shared by all inputs.
        common = set(headers[0]).intersection(*[set(h) for h in headers[1:]])
        if len(common) != 0:
            msg = "Duplicated column names in dataframes"
            raise ValueError(msg)
        df_final = dfs[0]
        for right in dfs[1:]:
            # Index-based join: rows are assumed to line up 1:1 across inputs.
            df_final = df_final.join(right, how="left")
        df_final.to_feather(out_path)

    # Case 2: column-wise merge with differing row counts — rows are aligned
    # on a synthetic "indexcolumn" derived from map_var.
    elif dim == "columns" and not same_rows:
        if not map_var:
            msg = f"mapVar name should be defined {map_var}"
            raise ValueError(msg)
        dfs = []
        headers = []
        for in_file in tqdm(inp_dir_files, total=len(inp_dir_files), desc="Loading"):
            df = _load_file_pandas(in_file).copy()
            # Join key combines the positional index with the map_var value.
            idx_col = [str(i) + "_" + str(p) for i, p in enumerate(df[map_var])]
            df["indexcolumn"] = idx_col
            # Prefix every column except the shared join key; map_var itself
            # is also prefixed so it stays distinct per input.
            rename = {
                c: in_file.stem + "_" + c
                for c in df.columns
                if c not in (map_var, "indexcolumn")
            }
            rename[map_var] = in_file.stem + "_" + map_var
            df = df.rename(columns=rename)
            headers.append(list(df.columns))
            dfs.append(df)
        # Largest frame first so it becomes the left side of the merge chain.
        dfs = _sorted_dataframe_list_pandas(dfs)
        # Only the synthetic join key may be shared by all inputs.
        common = set(headers[0]).intersection(*[set(h) for h in headers[1:]])
        if len(common) != 1 or "indexcolumn" not in common:
            msg = "Duplicated column names in dataframes"
            raise ValueError(msg)
        df_final = dfs[0]
        for right in dfs[1:]:
            df_final = df_final.merge(right, on="indexcolumn", how="left")
        df_final.to_feather(out_path)

    # Case 3: row-wise merge restricted to the columns present in every input.
    elif dim == "rows" and same_columns:
        logger.info("Getting all common headers in input files...")
        all_headers: list[list[str]] = []
        for in_file in inp_dir_files:
            df = _load_file_pandas(in_file)
            all_headers.append(list(df.columns))
        common_headers = set(all_headers[0]).intersection(
            *[set(h) for h in all_headers[1:]],
        )
        # NOTE(review): set iteration order is unspecified, so the output
        # column order may vary between runs — confirm whether callers care.
        common_header_list: list[str] = list(common_headers)
        logger.info("Merging the data along rows...")
        dfs = []
        for in_file in tqdm(inp_dir_files, total=len(inp_dir_files), desc="Loading"):
            df = _load_file_pandas(in_file)[common_header_list].copy()
            outname = in_file.stem if strip_extension else in_file.name
            # Record row provenance in a leading "file" column.
            df["file"] = outname
            cols = ["file"] + [c for c in df.columns if c != "file"]
            df = df[cols]
            dfs.append(df)
        df_final = pd.concat(dfs, ignore_index=True)
        df_final.to_feather(out_path)

    # Case 4: plain row-wise concatenation keeping all columns.
    else:
        logger.info("Merging the data along rows...")
        dfs = []
        for in_file in tqdm(inp_dir_files, total=len(inp_dir_files), desc="Loading"):
            df = _load_file_pandas(in_file).copy()
            # Drop any pre-existing "file" column so the provenance column
            # added below is authoritative.
            if "file" in df.columns:
                df = df.drop(columns=["file"])
            outname = in_file.stem if strip_extension else in_file.name
            df["file"] = outname
            cols = ["file"] + [c for c in df.columns if c != "file"]
            df = df[cols]
            dfs.append(df)
        df_final = pd.concat(dfs, ignore_index=True)
        df_final.to_feather(out_path)

    # Clean up intermediate files in the working directory (see remove_files).
    remove_files(curr_dir)


def merge_files( # noqa: PLR0915 PLR0912 PLR0913 C901
inp_dir_files: list,
strip_extension: bool,
Expand Down Expand Up @@ -98,10 +239,22 @@ def merge_files( # noqa: PLR0915 PLR0912 PLR0913 C901
map_var: Variable Name used to join file column wise.
out_dir:Path to output directory
"""
# Generate the path to the output file
out_path = pathlib.Path(out_dir).joinpath(f"merged{POLUS_TAB_EXT}")
curr_dir = pathlib.Path(".").cwd()

if _use_pandas:
_merge_files_pandas(
inp_dir_files,
strip_extension,
dim,
same_rows,
same_columns,
map_var,
out_path,
curr_dir,
)
return

# Case One: If merging by columns and have same number of rows:
if dim == "columns" and same_rows:
logger.info("Merging data with identical number of rows...")
Expand Down
Loading