diff --git a/docs/source/conf.py b/docs/source/conf.py index 5b6da12..7b3dd39 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -21,7 +21,7 @@ # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration extensions = [ - "sphinx.ext.napoleon", + "sphinx.ext.napoleon", "sphinx.ext.intersphinx", "sphinx.ext.viewcode", "sphinx.ext.doctest", @@ -30,7 +30,7 @@ "sphinx_design", "sphinxcontrib.autodoc_pydantic", "sphinx_tabs.tabs", - "sphinx_copybutton", + "sphinx_copybutton", "enum_tools.autoenum", ] diff --git a/docs/source/local_get_structure.md b/docs/source/local_get_structure.md new file mode 100644 index 0000000..d34c9cd --- /dev/null +++ b/docs/source/local_get_structure.md @@ -0,0 +1,115 @@ +# Inspecting Local File Structure + +:::{admonition} You Will Learn: +:class: note +- How to inspect the branch structure of local ROOT files with `local_get_structure()` +- How to filter branches, print output, and save results to a text file +- How to get an Awkward Array type representation of the file structure +::: + +This page describes `local_get_structure()`, a utility that reads the TTree and branch structure of local ROOT files directly — without going through ServiceX or a container backend. + +## Overview + +`local_get_structure()` is useful when you want to quickly explore a ROOT file before writing a query. It reads the file structure using `uproot` and formats the output as a readable tree summary showing each TTree, its branches, and their dtypes. + +## Basic Usage + +Pass one or more file paths and a LocalX `Config` object: + +```python +from servicex_local import local_get_structure, xAODConfig + +config = xAODConfig(release=25) + +result = local_get_structure("path/to/file.root", config) +print(result) +``` + +The return value is a formatted string. Example output: + +``` +--------------------------- +📁 Sample: path/to/file.root +--------------------------- + +File Metadata ℹ️ : + +No FileMetaData found in dataset. 
+ +--------------------------- + +File structure with branch filter 🌿 '': + + +🌳 Tree: background + ├── Branches: + │ ├── branch1 ; dtype: AsDtype('>f8') + │ ├── branch2 ; dtype: AsDtype('>f8') + +🌳 Tree: signal + ├── Branches: + │ ├── branch1 ; dtype: AsDtype('>f8') +``` + +## Dataset Input Formats + +`local_get_structure()` accepts the following dataset input formats: + +| Input type | Behaviour | +|---|---| +| `str` | Single file path. The path is used as the sample name. | +| `list[str]` | Multiple file paths, each used as its own sample name. | +| `dict` | Maps custom sample names to file paths: `{"my_sample": "path/to/file.root"}`. | + +```python +# Multiple files +result = local_get_structure(["file1.root", "file2.root"], config) + +# Custom sample names +result = local_get_structure({"signal": "sig.root", "background": "bkg.root"}, config) +``` + +## Filtering Branches + +Use the `filter_branch` keyword argument to show only branches whose names contain a given string: + +```python +result = local_get_structure("file.root", config, filter_branch="Electron") +``` + +Only branches with `"Electron"` in their name will appear in the output. + +## Printing Directly + +Pass `do_print=True` to print the structure to the terminal instead of returning a string: + +```python +local_get_structure("file.root", config, do_print=True) +``` + +When `do_print=True`, the function returns `None`. + +## Saving to a File + +Pass `save_to_txt=True` to write the output to `samples_structure.txt` in the current directory: + +```python +local_get_structure("file.root", config, save_to_txt=True) +``` + +The function returns the message `"File structure saved to 'samples_structure.txt'."` when this option is used. + +## Getting an Array Type Representation + +Pass `array_out=True` to get an Awkward Array type object instead of the formatted string. 
This returns a dictionary mapping each sample name to an `ak.Array` type that mirrors the TTree structure with correct field names and dtypes: + +```python +types = local_get_structure("file.root", config, array_out=True) +``` + +This is useful for verifying that branch names and types match what a query expects before running through ServiceX. + +:::{note} +`array_out=True` takes precedence over the formatting options. When it is set, the formatting keyword arguments (`filter_branch`, `save_to_txt`, `do_print`) are silently ignored: nothing is printed and no file is written. +::: diff --git a/samples_structure.txt b/samples_structure.txt new file mode 100644 index 0000000..6337833 --- /dev/null +++ b/samples_structure.txt @@ -0,0 +1,22 @@ + +--------------------------- +📁 Sample: test_file +--------------------------- + +File Metadata ℹ️ : + +No FileMetaData found in dataset. + +--------------------------- + +File structure with branch filter 🌿 '': + + +🌳 Tree: background + ├── Branches: + │ ├── branch1 ; dtype: AsDtype('>f8') + │ ├── branch2 ; dtype: AsDtype('>f8') + +🌳 Tree: signal + ├── Branches: + │ ├── branch1 ; dtype: AsDtype('>f8') \ No newline at end of file diff --git a/servicex_local/__init__.py b/servicex_local/__init__.py index de1877b..9320442 100644 --- a/servicex_local/__init__.py +++ b/servicex_local/__init__.py @@ -1,7 +1,3 @@ -from .science_images import WSL2ScienceImage, DockerScienceImage # noqa: F401 -from .science_images import SingularityScienceImage # noqa: F401 -from .codegen import LocalXAODCodegen, DockerCodegen # noqa: F401 -from .adaptor import SXLocalAdaptor # noqa: F401 from .deliver import local_deliver # noqa: F401 -from .configurations import xAODConfig # noqa: F401 -from .utils import install_sx_local, Platform # noqa: F401 +from .configurations import xAODConfig, Platform, Config # noqa: F401 +from .utils import local_get_structure # noqa: F401 diff --git a/servicex_local/configurations.py b/servicex_local/configurations.py index 654b588..ab20913 100644 --- 
a/servicex_local/configurations.py +++ b/servicex_local/configurations.py @@ -4,7 +4,15 @@ import urllib.request from dataclasses import dataclass from typing import TYPE_CHECKING, Union -from .utils import Platform +from enum import Enum + + +class Platform(Enum): + """Options for which platform to use for the runtime environment.""" + + docker = "docker" + singularity = "singularity" + wsl2 = "wsl2" if TYPE_CHECKING: diff --git a/servicex_local/deliver.py b/servicex_local/deliver.py index ea98b14..fc82089 100644 --- a/servicex_local/deliver.py +++ b/servicex_local/deliver.py @@ -3,7 +3,7 @@ import logging from datetime import datetime from pathlib import Path -from typing import Any, Generator, List +from typing import Any, Generator, List, Union, Mapping from deprecated import deprecated from make_it_sync import make_sync @@ -12,15 +12,111 @@ from servicex.models import ResultFormat, TransformRequest, TransformStatus from servicex.query_core import QueryStringGenerator from servicex.servicex_client import GuardList +from servicex.yaml_parser import YAML -from servicex_local import SXLocalAdaptor -from servicex_local.adaptor import MinioLocalAdaptor +from .adaptor import SXLocalAdaptor, MinioLocalAdaptor +from .codegen import LocalXAODCodegen +from .configurations import Config, Platform +from servicex_analysis_utils import to_awk -from .configurations import Config +logger = logging.getLogger(__name__) -from servicex_local.utils import install_sx_local -from servicex_local.utils import Platform as _SxPlatform -from servicex_analysis_utils import to_awk + +def _load_ServiceXSpec( + config: Union[ServiceXSpec, Mapping[str, Any], str, Path], +) -> ServiceXSpec: + if isinstance(config, Mapping): + logger.debug("Config from dictionary") + config = ServiceXSpec(**config) + elif isinstance(config, ServiceXSpec): + logger.debug("Config from ServiceXSpec") + elif isinstance(config, str) or isinstance(config, Path): + logger.debug("Config from file") + + if 
isinstance(config, str): + file_path = Path(config) + else: + file_path = config + + import sys + + yaml = YAML() + + if sys.version_info < (3, 10): + from importlib_metadata import entry_points + else: + from importlib.metadata import entry_points + + plugins = entry_points(group="servicex.query") + for _ in plugins: + yaml.register_class(_.load()) + plugins = entry_points(group="servicex.dataset") + for _ in plugins: + yaml.register_class(_.load()) + + conf = yaml.load(file_path) + config = ServiceXSpec(**conf) + else: + raise TypeError(f"Unknown config type: {type(config)}") + + return config + + +def install_sx_local( + image: str, platform: Platform = Platform.docker, host_port: int = 5001 +): + """Set up a local ServiceX endpoint for data transformation. + + Args: + image (str): Image name for the container. + platform (Platform): Which platform to use. + host_port (int): Local host port to expose. + + Returns: + Tuple[str, SXLocalAdaptor]: Codegen name, adaptor. + """ + from servicex.configuration import Configuration + + try: + sx_cfg = Configuration.read() + cache_dir = Path(sx_cfg.cache_path).resolve() + except NameError: + import tempfile + + cache_dir = Path(tempfile.mkdtemp()).resolve() + logging.warning( + "Could not read a ServiceX.yaml. 
Using temporary directory %s for cache.", + cache_dir, + ) + + codegen = LocalXAODCodegen() + + if platform == Platform.docker: + from .science_images import DockerScienceImage + + science_runner = DockerScienceImage(image) + + elif platform == Platform.singularity: + from .science_images import SingularityScienceImage + + science_runner = SingularityScienceImage(image) + + elif platform == Platform.wsl2: + from .science_images import WSL2ScienceImage + + container, release = image.split(":") + science_runner = WSL2ScienceImage(container, release) + + else: + raise ValueError(f"Unknown platform {platform}") + + adaptor = SXLocalAdaptor( + codegen, science_runner, cache_dir, f"http://localhost:{host_port}" + ) + + logging.info(f"Using local ServiceX endpoint: {codegen}") + logging.info(f"Cache being save to; {adaptor.cache_dir}") + return adaptor def _sample_run_info( @@ -117,7 +213,7 @@ def _save_cache(cache: dict[str, Any], cache_dir: Path) -> None: async def deliver_async( - spec: ServiceXSpec, + spec: Union[ServiceXSpec, Mapping[str, Any], str, Path], adaptor: SXLocalAdaptor, ignore_local_cache: bool = False, display_progress: bool = True, @@ -144,7 +240,9 @@ async def deliver_async( results: dict[str, GuardList] = {} cache = _load_cache(adaptor.cache_dir) # Load cache from file system - all_tqs = list(_sample_run_info(spec.General, spec.Sample)) + config = _load_ServiceXSpec(spec) + + all_tqs = list(_sample_run_info(config.General, config.Sample)) total_files = sum(len(tq.file_list or []) for tq in all_tqs) with ExpandableProgress(display_progress=display_progress) as progress: @@ -218,7 +316,7 @@ async def deliver_async( def local_deliver( - spec: ServiceXSpec, + spec: Union[ServiceXSpec, Mapping[str, Any], str, Path], config: Config, display_progress: bool = True, ): @@ -230,7 +328,7 @@ def local_deliver( image = f"docker://{_DOCKER_IMAGE}:{config.version}" else: image = f"{_DOCKER_IMAGE}:{config.version}" - sx_platform = _SxPlatform(config.platform.value) + 
sx_platform = Platform(config.platform.value) adaptor = install_sx_local(image, sx_platform) sx_result = _deliver_sync( diff --git a/servicex_local/utils.py b/servicex_local/utils.py index b09617c..11a6ed4 100644 --- a/servicex_local/utils.py +++ b/servicex_local/utils.py @@ -1,69 +1,334 @@ import logging -from enum import Enum -from pathlib import Path +from .configurations import Config +from servicex import query, dataset +import uproot +import numpy as np +import awkward as ak +import json +from servicex.dataset_identifier import DataSetIdentifier -class Platform(Enum): - """Options for which platform to use for the runtime environment.""" +def run_query( + input_filenames, +): + import uproot + import awkward as ak + import json - docker = "docker" - singularity = "singularity" - wsl2 = "wsl2" + def is_tree(obj): + """Helper to check if a root file item is TTree.""" + if hasattr(obj, "classname"): + cls_attr = obj.classname + cls_value = cls_attr() if callable(cls_attr) else cls_attr + return "TTree" in cls_value + elif hasattr(obj, "classnames"): + cls_attr = obj.classnames + cls_values = cls_attr() if callable(cls_attr) else cls_attr + return any("TTree" in cls for cls in cls_values) + return False + """ + Opens a ROOT file and returns a JSON-formatted string describing the structure, + encoded inside an ak.Array for ServiceX. + """ + tree_dict = {} -def install_sx_local( - image: str, platform: Platform = Platform.docker, host_port: int = 5001 -): - """Set up a local ServiceX endpoint for data transformation. 
+ with uproot.open(input_filenames) as file: + + for tree_name in file.keys(): + tree_name_clean = tree_name.rstrip(";1") + tree = file[tree_name] + + if not is_tree(tree): + continue + + if tree_name_clean == "MetaData": + fm_branches = [ + b for b in tree.keys() if b.startswith("FileMetaDataAuxDyn.") + ] + # remove the prefix in keys + meta_dict = { + p[19:]: str(tree[p].array(library="ak")[0]) for p in fm_branches + } + tree_dict["FileMetaData"] = meta_dict + + branch_dict = {} + for branch_name, branch in tree.items(): + branch_type = str(branch.interpretation) + branch_dict[branch_name] = branch_type + + tree_dict[tree_name_clean] = branch_dict - Args: - image (str): Image name for the container. - platform (Platform): Which platform to use. - host_port (int): Local host port to expose. + # Serialize tree_dict to JSON string + json_str = json.dumps(tree_dict) + + # Return JSON string wrapped in an awkward array + return ak.Array([json_str]) + + +def build_deliver_spec(datasets): + """ + Helper to build the servicex.deliver configuration. + Supports multiple inputs for multiple sample queries. + + Parameters: + datasets (str, [str], dict, DataSetIdentifier): Rucio DIDs (str) or DataSetIdentifier. + If dict, custom names can be given per dataset Returns: - Tuple[str, SXLocalAdaptor]: Codegen name, adaptor. 
+ spec_python (dict): The specification for the python function query containing + Name, Query, Dataset, NFiles """ - from servicex_local import LocalXAODCodegen, SXLocalAdaptor - from servicex.configuration import Configuration + # Servicex query using the PythonFunction backend + query_PythonFunction = query.PythonFunction().with_uproot_function(run_query) + + # Create a dict with sample name for ServiceX query & datasetID + dataset_dict = {} + user_in = type(datasets) + + # TODO: Add processing of the URL to make sure the it exists + + user_in = type(datasets) + + # Rucio DID as str + if user_in == str: + dataset_dict.update({datasets: datasets}) # Use dataset ID as sample name + elif user_in == list and type(datasets[0]) is str: + for ds in datasets: + dataset_dict.update({ds: ds}) + elif user_in == dict: # Custom sample names + dataset_dict = datasets + # Single DataSetIdentifier object + elif isinstance(datasets, DataSetIdentifier): + dataset_dict.update({"Dataset": datasets}) + else: + raise ValueError( + f"Unsupported dataset input type: {user_in}.\n" + "Input must be str or list of str of Rucio DIDs, " + "a DataSetIdentifier object or a dict ('sample_name':'dataset_id')" + ) + + sample_list = [ + { + "NFiles": 1, + "Name": name, + "Dataset": did if isinstance(did, DataSetIdentifier) else dataset.Rucio(did), + "Query": query_PythonFunction, + } + for name, did in dataset_dict.items() + ] + spec_python = {"Sample": sample_list} + + return spec_python - try: - sx_cfg = Configuration.read() - cache_dir = Path(sx_cfg.cache_path).resolve() - except NameError: - import tempfile - cache_dir = Path(tempfile.mkdtemp()).resolve() +def open_delivered_file(sample, path): + """ + Opens the first file delivered by ServiceX for a given sample and returns the + structure encoded in the "servicex/branch" branch. + If no files are found, logs a warning and returns None. + Parameters: + sample (str): The sample name for which to open the file. 
+ path (list): List of file paths delivered by ServiceX for the sample. + """ + + if not path: logging.warning( - "Could not read a ServiceX.yaml. Using temporary directory %s for cache.", - cache_dir, + f"Warning: No files found for sample '{sample}' in delivered results. Skipping." ) + return None + + try: + with uproot.open(path[0]) as f: + return f["servicex"]["branch"].array()[0] + except Exception as e: + logging.error(f"Error opening file for sample '{sample}': {e}") + return None - codegen = LocalXAODCodegen() - if platform == Platform.docker: - from servicex_local import DockerScienceImage +def print_structure_from_str( + json_by_sample, filter_branch="", save_to_txt=False, do_print=False +): + """ + Re-formats the JSON structure string from ServiceX into a readable summary. - science_runner = DockerScienceImage(image) + Parameters: + json_by_sample (dict): mapping of sample names to JSON structure strings. + filter_branch (str): If provided, only branches containing this string are included. + save_to_txt (bool): If True, saves output to a text file instead of returning it. + do_print (bool): If True, prints the output to the terminal and returns None. - elif platform == Platform.singularity: - from servicex_local import SingularityScienceImage + Returns: + result_str (str): The formatted file structure. 
+ """ + output_lines = [] - science_runner = SingularityScienceImage(image) + for sample_name, structure_str in json_by_sample.items(): + if structure_str is None: + continue + if isinstance(structure_str, list): + structure_str = open_delivered_file(sample_name, structure_str) + if structure_str is None: + continue + structure_dict = json.loads(structure_str) - elif platform == Platform.wsl2: - from servicex_local import WSL2ScienceImage + output_lines.append( + "\n---------------------------\n" + f"\U0001f4c1 Sample: {sample_name}\n" + "---------------------------" + ) + + # Get the metadata first + output_lines.append("\nFile Metadata \u2139\ufe0f :\n") + if "FileMetaData" not in structure_dict: + output_lines.append("No FileMetaData found in dataset.") + else: + for key, value in structure_dict.get("FileMetaData", {}).items(): + output_lines.append(f"── {key}: {value}") + output_lines.append("\n---------------------------") - container, release = image.split(":") - science_runner = WSL2ScienceImage(container, release) + # drop the File metadata from the trees + structure_dict.pop("FileMetaData", {}) + + output_lines.append( + f"\nFile structure with branch filter \U0001f33f '{filter_branch}':\n" + ) + for tree_name, branches in structure_dict.items(): + output_lines.append(f"\n\U0001f333 Tree: {tree_name}") + output_lines.append(" ├── Branches:") + for branch_name, dtype in branches.items(): + if filter_branch and filter_branch not in branch_name: + continue + output_lines.append(f" │ ├── {branch_name} ; dtype: {dtype}") + + result_str = "\n".join(output_lines).encode("utf-8").decode("utf-8") + + if save_to_txt: + with open("samples_structure.txt", "w", encoding="utf-8") as f: + f.write(result_str) + return "File structure saved to 'samples_structure.txt'." + elif do_print: + print(result_str) + return + else: + return result_str + + +def parse_jagged_depth_and_dtype(dtype_str): + """ + Helper to decode the dtype str for each branch. 
+ + Parses uproot-style interpretation strings such as: + - "AsJagged(AsJagged(AsDtype('>f4')))" + + Returns the number of nested layers and the inner dtype. + Used in str_to_array to reconstruct the ak.array. + + Parameters: + dtype_str (str): The dtype part of a branch info str; from the delivered file structure. + + Returns: + int, str: jagged_depth, base_numpy_dtype_str or None if not recognized. + """ + depth = 0 + current = dtype_str.strip() + + # Count how many nested AsJagged(...) wrappers exist + while current.startswith("AsJagged("): + depth += 1 + current = current[ + len("AsJagged("):-1 + ].strip() # Strip outermost wrapper, up to -1 to remove ) + + # Extract the base dtype string from AsDtype('') + if current.startswith("AsDtype('") and current.endswith("')"): + base_dtype = current[len("AsDtype('"):-2] + return depth, base_dtype else: - raise ValueError(f"Unknown platform {platform}") + return depth, None + + +def str_to_array(encoded_json_str): + """ + Helper to reconstruct ak.Arrays from a JSON-formatted file-structure string. + Returns an array mimicking TTrees and TBranches with correct field names and dtypes. + + Parameters: + encoded_json_str (str): JSON string from run_query. + + Returns: + ak.Array: An array containing a dictionary of trees with branch structures + and dummy typed values. 
+ """ + reconstructed_data = {} + structure_dict = json.loads(encoded_json_str) + # drop the File metadata from the trees + structure_dict.pop("FileMetaData", {}) + + for treename, branch_dict in structure_dict.items(): + branches = {} + + for branch_name, dtype_str in branch_dict.items(): + # Get jagged depth and numpy base dtype + depth, base_dtype_str = parse_jagged_depth_and_dtype(dtype_str) + if base_dtype_str is None: + branches[branch_name] = None + continue + + try: + np_dtype = np.dtype(base_dtype_str) + except TypeError: + branches[branch_name] = None + continue + + dummy = np_dtype.type(0) + for _ in range(depth): + dummy = [dummy] + + branches[branch_name] = ak.Array([dummy]) + + if branches: + reconstructed_data[treename] = ak.Array([branches]) + + return ak.Array(reconstructed_data).type + + +def local_get_structure(datasets, config: Config, array_out=False, **kwargs): + """ + Utility function. + Reads the structure of local ROOT files directly using uproot. + Calls print_structure_from_str() to dump the structure in a user-friendly format. + + Parameters: + datasets (dict,str,[str]): The datasets from which to print the file structures. 
+ A custom sample name per dataset can be given in a dict form: + {'sample_name':'file_path'} + kwargs : Arguments to be propagated to print_structure_from_str + """ + user_in = type(datasets) + dataset_dict = {} + if user_in == str: + dataset_dict[datasets] = datasets + elif user_in == list and type(datasets[0]) is str: + for ds in datasets: + dataset_dict[ds] = ds + elif user_in == dict: + dataset_dict = datasets + elif isinstance(datasets, DataSetIdentifier): + dataset_dict["Dataset"] = datasets + else: + raise ValueError( + f"Unsupported dataset input type: {user_in}.\n" + "Input must be str or list of str of file paths, " + "a DataSetIdentifier object or a dict ('sample_name':'file_path')" + ) - adaptor = SXLocalAdaptor( - codegen, science_runner, cache_dir, f"http://localhost:{host_port}" - ) + json_by_sample = {} + for sample_name, file_path in dataset_dict.items(): + result = run_query(file_path) + json_by_sample[sample_name] = str(result[0]) - logging.info(f"Using local ServiceX endpoint: {codegen}") - logging.info(f"Cache being save to; {adaptor.cache_dir}") - return adaptor + if array_out: + return {name: str_to_array(s) for name, s in json_by_sample.items()} + return print_structure_from_str(json_by_sample, **kwargs) diff --git a/tests/data/expected_metadata.txt b/tests/data/expected_metadata.txt new file mode 100644 index 0000000..4efd138 --- /dev/null +++ b/tests/data/expected_metadata.txt @@ -0,0 +1,19 @@ + +--------------------------- +📁 Sample: test_file +--------------------------- + +File Metadata ℹ️ : + +── test_100: 100 +── test_abc: abc + +--------------------------- + +File structure with branch filter 🌿 '': + + +🌳 Tree: MetaData + ├── Branches: + │ ├── FileMetaDataAuxDyn.test_100 ; dtype: AsDtype('>i8') + │ ├── FileMetaDataAuxDyn.test_abc ; dtype: AsStrings() \ No newline at end of file diff --git a/tests/data/expected_structure.txt b/tests/data/expected_structure.txt new file mode 100644 index 0000000..6337833 --- /dev/null +++ 
b/tests/data/expected_structure.txt @@ -0,0 +1,22 @@ + +--------------------------- +📁 Sample: test_file +--------------------------- + +File Metadata ℹ️ : + +No FileMetaData found in dataset. + +--------------------------- + +File structure with branch filter 🌿 '': + + +🌳 Tree: background + ├── Branches: + │ ├── branch1 ; dtype: AsDtype('>f8') + │ ├── branch2 ; dtype: AsDtype('>f8') + +🌳 Tree: signal + ├── Branches: + │ ├── branch1 ; dtype: AsDtype('>f8') \ No newline at end of file diff --git a/tests/test_adaptor.py b/tests/test_adaptor.py index ffdcefc..ad74a62 100644 --- a/tests/test_adaptor.py +++ b/tests/test_adaptor.py @@ -9,8 +9,7 @@ from servicex import ResultDestination from servicex.models import ResultFormat, Status, TransformRequest, TransformStatus -from servicex_local import SXLocalAdaptor -from servicex_local.adaptor import MinioLocalAdaptor +from servicex_local.adaptor import MinioLocalAdaptor, SXLocalAdaptor def test_adaptor_url(): diff --git a/tests/test_codegen.py b/tests/test_codegen.py index 984d95c..b5da057 100644 --- a/tests/test_codegen.py +++ b/tests/test_codegen.py @@ -1,9 +1,7 @@ import pytest -from servicex_local import DockerCodegen +from servicex_local.codegen import DockerCodegen, LocalXAODCodegen from pathlib import Path -from servicex_local import LocalXAODCodegen - def test_docker_codegen_xaod(tmp_path, request): "Do a basic func_adl uproot code generation from an official docker image" diff --git a/tests/test_deliver.py b/tests/test_deliver.py index 0787066..ebc1f48 100644 --- a/tests/test_deliver.py +++ b/tests/test_deliver.py @@ -5,6 +5,7 @@ from datetime import datetime from pathlib import Path from unittest.mock import patch +from unittest.mock import MagicMock import pytest from servicex import General, Sample, ServiceXSpec, dataset @@ -18,7 +19,13 @@ from servicex_local import local_deliver from servicex_local.configurations import Config -from servicex_local.deliver import deliver +from servicex_local.deliver 
import deliver, install_sx_local, Platform + +from servicex_local.science_images import ( + DockerScienceImage, + SingularityScienceImage, + WSL2ScienceImage, +) @pytest.fixture(autouse=True) @@ -329,3 +336,77 @@ def test_local_deliver_applies_logging_level_over_existing_handlers( def test_config_logging_level_default_is_warning(): "Default logging_level is WARNING (a recognised level name)." assert Config(version="25.2.41").logging_level == "WARNING" + + +@pytest.mark.parametrize( + "image, platform, expected_class", + [ + ( + "sslhep/servicex_func_adl_xaod_transformer:25.2.41", + Platform.docker, + DockerScienceImage, + ), + ( + "docker://sslhep/servicex_func_adl_xaod_transformer:25.2.41", + Platform.singularity, + SingularityScienceImage, + ), + ("servicex_func_adl_xaod_transformer:25.2.41", Platform.wsl2, WSL2ScienceImage), + ], +) +def test_install_sx_local(monkeypatch, image, platform, expected_class): + adaptor = install_sx_local(image, platform) + assert isinstance(adaptor.science_runner, expected_class) + + +def test_install_sx_local_errors(): + with pytest.raises(ValueError, match="Unknown platform"): + install_sx_local("some_image", platform="invalid_platform") + + +def test_install_sx_local_uses_yaml_cache_path(monkeypatch, tmp_path): + "install_sx_local takes cache_path from servicex.yaml when one is found." + yaml_cache = tmp_path / "from_yaml" + yaml_cache.mkdir() + + fake_cfg = MagicMock() + fake_cfg.cache_path = str(yaml_cache) + monkeypatch.setattr( + "servicex.configuration.Configuration.read", + classmethod(lambda cls: fake_cfg), + ) + + adaptor = install_sx_local( + "sslhep/servicex_func_adl_xaod_transformer:25.2.41", Platform.docker + ) + + assert adaptor.cache_dir == yaml_cache.resolve() / f"servicex_{getpass.getuser()}" + + +def test_install_sx_local_no_yaml_fallback(monkeypatch): + """install_sx_local falls back to a usable cache dir when no + servicex.yaml is found. 
+ + Regression: the fallback used to assign cache_dir from inside a + ``with tempfile.TemporaryDirectory()`` block, leaving the path dangling + after the block exited. + """ + + def raise_nameerror(cls): + raise NameError("Can't find .servicex or servicex.yaml config file") + + monkeypatch.setattr( + "servicex.configuration.Configuration.read", + classmethod(raise_nameerror), + ) + + adaptor = install_sx_local( + "sslhep/servicex_func_adl_xaod_transformer:25.2.41", Platform.docker + ) + + # The fallback must produce a cache_dir whose parent exists on disk so + # the adaptor can actually write into it. + assert adaptor.cache_dir.parent.exists(), ( + f"Fallback cache_dir parent does not exist: {adaptor.cache_dir.parent}. " + "The TemporaryDirectory was likely cleaned up before being used." + ) diff --git a/tests/test_utils.py b/tests/test_utils.py index 13713fb..4625247 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,80 +1,290 @@ -import getpass -from unittest.mock import MagicMock - import pytest -from servicex_local import install_sx_local, Platform -from servicex_local import DockerScienceImage, SingularityScienceImage, WSL2ScienceImage - - -@pytest.mark.parametrize( - "image, platform, expected_class", - [ - ( - "sslhep/servicex_func_adl_xaod_transformer:25.2.41", - Platform.docker, - DockerScienceImage, - ), - ( - "docker://sslhep/servicex_func_adl_xaod_transformer:25.2.41", - Platform.singularity, - SingularityScienceImage, - ), - ("servicex_func_adl_xaod_transformer:25.2.41", Platform.wsl2, WSL2ScienceImage), - ], +import uproot +import json +import awkward as ak +import numpy as np +import os +import re +import filecmp +from servicex_local import utils +from servicex import dataset +from servicex.python_dataset import PythonFunction +from servicex.dataset_identifier import ( + RucioDatasetIdentifier, + FileListDataset, + CERNOpenDataDatasetIdentifier, + XRootDDatasetIdentifier, ) -def test_install_sx_local(monkeypatch, image, platform, 
expected_class): - adaptor = install_sx_local(image, platform) - assert isinstance(adaptor.science_runner, expected_class) +from pathlib import Path -def test_install_sx_local_errors(): - with pytest.raises(ValueError, match="Unknown platform"): - install_sx_local("some_image", platform="invalid_platform") +@pytest.fixture +def build_encoding_samples(tmp_path): + test_path = str(tmp_path / "test_file1.root") + # example data for two branches + tree_data1 = {"branch1": np.ones(100), "branch2": np.zeros(100)} + tree_data2 = { + "branch1": np.ones(10), + } -def test_install_sx_local_uses_yaml_cache_path(monkeypatch, tmp_path): - "install_sx_local takes cache_path from servicex.yaml when one is found." - yaml_cache = tmp_path / "from_yaml" - yaml_cache.mkdir() + # Create tmp .root files + with uproot.create(test_path) as file: + file.mktree("background", {"branch1": "float64", "branch2": "float64"}) + file["background"].extend(tree_data1) + file.mktree("signal", {"branch1": "float64"}) + file["signal"].extend(tree_data2) - fake_cfg = MagicMock() - fake_cfg.cache_path = str(yaml_cache) - monkeypatch.setattr( - "servicex.configuration.Configuration.read", - classmethod(lambda cls: fake_cfg), - ) + return test_path - adaptor = install_sx_local( - "sslhep/servicex_func_adl_xaod_transformer:25.2.41", Platform.docker - ) - assert adaptor.cache_dir == yaml_cache.resolve() / f"servicex_{getpass.getuser()}" +# Test run_query and print_structure_from_str +def test_encoding(build_encoding_samples, tmp_path, capsys): + + path = build_encoding_samples + query_output = utils.run_query(path) + + # Check return types + assert isinstance( + query_output, ak.Array + ), "run_query() does not produce an awkward.Array" + encoded_str = query_output[0] + assert isinstance(encoded_str, str), "run_query array content is not str" + + encoded_result = json.loads(encoded_str) + # Check result + expected_result = { + "background": {"branch1": "AsDtype('>f8')", "branch2": "AsDtype('>f8')"}, + 
"signal": {"branch1": "AsDtype('>f8')"}, + } -def test_install_sx_local_no_yaml_fallback(monkeypatch): - """install_sx_local falls back to a usable cache dir when no - servicex.yaml is found. + assert encoded_result == expected_result - Regression: the fallback used to assign cache_dir from inside a - ``with tempfile.TemporaryDirectory()`` block, leaving the path dangling - after the block exited. - """ + # Produce servicex.deliver() like dict + # i.e {"Sample Name":"Path"} + tree_data = {"branch": query_output} + with uproot.create(tmp_path / "encoded.root") as file: + file["servicex"] = tree_data + assert os.path.exists( + tmp_path / "encoded.root" + ), "servicex-like test file not found." + deliver_dict = {"test_file": [str(tmp_path / "encoded.root")]} - def raise_nameerror(cls): - raise NameError("Can't find .servicex or servicex.yaml config file") + # Test str formating on the deliver-like dict + # save_to_txt + utils.print_structure_from_str(deliver_dict, save_to_txt=True) + out_txt = "samples_structure.txt" + assert os.path.exists(out_txt), f"save_to_txt arg not producing {out_txt}" - monkeypatch.setattr( - "servicex.configuration.Configuration.read", - classmethod(raise_nameerror), + with open(out_txt, "r", encoding="utf-8") as f: + written_str = f.read() + + # direct return + output_str = utils.print_structure_from_str(deliver_dict) + + # do_print + utils.print_structure_from_str(deliver_dict, do_print=True) + captured = capsys.readouterr() + + # Check if all returns match + assert ( + captured.out[0:-1] == written_str == output_str + ), "saved, printed and direct return formats should not be different" + + # Compare with expected return + test_txt = "tests/data/expected_structure.txt" + assert filecmp.cmp( + out_txt, test_txt + ), "Formatted str does not match expected return" + + # Test filter_branch + filtered_str = utils.print_structure_from_str( + deliver_dict, filter_branch="branch2" ) + assert ( + "branch1" not in filtered_str + ), "filter_branch 
argument is not removing branch1" + - adaptor = install_sx_local( - "sslhep/servicex_func_adl_xaod_transformer:25.2.41", Platform.docker +# Test spec builder for deliver +def test_spec_builder(): + # Get spec + test_did_str = "random_space:did" + spec = utils.build_deliver_spec(test_did_str) + + # Check return type + assert isinstance(spec, dict), "build_deliver_spec does not return a dict" + assert "Sample" in spec, "Key 'Sample' is missing in the returned dict" + assert isinstance(spec["Sample"], list), "'Sample' should be a list" + + # Get return size + size = len(spec["Sample"]) + assert size == 1, f"Only one did given but sample item of spec is not len 1: {size}" + + # Check first sample + first_entry = spec["Sample"][0] + assert isinstance(first_entry, dict), "Each entry in 'Sample' should be a dict" + + # Check each key type + assert isinstance(first_entry["NFiles"], int), "'NFiles' should be an integer" + assert isinstance(first_entry["Name"], str), "'Name' should be a string" + + assert isinstance( + first_entry["Dataset"], RucioDatasetIdentifier + ), "'Dataset' should be a RucioDatasetIdentifier" + + assert isinstance( + first_entry["Query"], PythonFunction + ), "'Query' should be a PythonFunction" + + # Different input types + # list with two DIDs + test_did_list = [test_did_str, test_did_str + "2"] + spec_from_list = utils.build_deliver_spec(test_did_list) + assert ( + len(spec_from_list["Sample"]) == 2 + ), "Wrong number of samples in deliver configuration" + + # dict with sample name + test_did_dict = {"Custom-Name": test_did_str} + spec_from_dict = utils.build_deliver_spec(test_did_dict) + assert spec_from_dict["Sample"][0]["Name"] == "Custom-Name" + + # wrong input type + wrong_did = 1234 + expected_msg = ( + f"Unsupported dataset input type: {type(wrong_did)}.\n" + "Input must be str or list of str of Rucio DIDs, " + "a DataSetIdentifier object or a dict " + "('sample_name':'dataset_id')" ) - # The fallback must produce a cache_dir whose parent 
exists on disk so - # the adaptor can actually write into it. - assert adaptor.cache_dir.parent.exists(), ( - f"Fallback cache_dir parent does not exist: {adaptor.cache_dir.parent}. " - "The TemporaryDirectory was likely cleaned up before being used." + with pytest.raises( + ValueError, + match=re.escape(expected_msg), + ): + utils.build_deliver_spec(wrong_did) + + +def test_spec_builder_with_dataset_identifier(): + # Build multiple types of dataset identifiers + ds1 = dataset.Rucio("random_space:did") + ds2 = dataset.XRootD("root://server/file.root") + ds3 = dataset.CERNOpenData("cernopendata:12345") + ds4 = dataset.FileList(["file1.root", "file2.root"]) + + ds_list = [ds1, ds2, ds3, ds4] + ds_types = [ + RucioDatasetIdentifier, + XRootDDatasetIdentifier, + CERNOpenDataDatasetIdentifier, + FileListDataset, + ] + for did, did_type in zip(ds_list, ds_types): + spec = utils.build_deliver_spec(did) + + # Check return type + assert isinstance(spec, dict), "build_deliver_spec does not return a dict" + assert "Sample" in spec, "Key 'Sample' is missing in the returned dict" + assert isinstance(spec["Sample"], list), "'Sample' should be a list" + + # Get return size + size = len(spec["Sample"]) + assert ( + size == 1 + ), f"Only one did given but sample item of spec is not len 1: {size}" + + # Check first sample + first_entry = spec["Sample"][0] + assert isinstance(first_entry, dict), "Each entry in 'Sample' should be a dict" + + # Check each key type + assert isinstance(first_entry["NFiles"], int), "'NFiles' should be an integer" + assert isinstance(first_entry["Name"], str), "'Name' should be a string" + + assert isinstance( + first_entry["Query"], PythonFunction + ), "'Query' should be a PythonFunction" + + assert isinstance( + first_entry["Dataset"], did_type + ), ( + f"Input Dataset identifier {did} should be a {did_type} " + f"but is {type(first_entry['Dataset'])}" + ) + + +def test_decoding_to_array(build_encoding_samples, array_out=True): + path = 
build_encoding_samples + query_output = utils.run_query(path) + encoded_result = query_output[0] + + result = utils.str_to_array(encoded_result) + + # Test type + assert isinstance( + result, ak.types.arraytype.ArrayType + ), "str_to_array does not return an awkward array type" + expected_type_str = ( + "1 * {background: {branch1: var * float64, branch2: var * float64}," + " signal: {branch1: var * float64}}" ) + assert str(result) == expected_type_str + + +@pytest.fixture +def build_test_samples(tmp_path): + + test_path = str(tmp_path / "test_metadata.root") + + # Create tmp .root files + with uproot.create(test_path) as file: + file.mktree("MetaData", { + "FileMetaDataAuxDyn.test_100": "int64", + "FileMetaDataAuxDyn.test_abc": str, + }) + file["MetaData"].extend({ + "FileMetaDataAuxDyn.test_100": np.array([100], dtype="int64"), + "FileMetaDataAuxDyn.test_abc": ["abc"], + }) + + return test_path + + +# Test run_query and print_structure_from_str +def test_metadata_retrieval(build_test_samples, tmp_path, capsys): + + path = build_test_samples + query_output = utils.run_query(path) + # Check result + expected_result = { + "FileMetaData": {"test_100": "100", "test_abc": "abc"}, + "MetaData": { + "FileMetaDataAuxDyn.test_100": "AsDtype('>i8')", + "FileMetaDataAuxDyn.test_abc": "AsStrings()", + }, + } + encoded_result = json.loads(query_output[0]) + + assert encoded_result == expected_result + + # Produce servicex.deliver() like dict + # i.e {"Sample Name":"Path"} + tree_data = {"branch": query_output} + with uproot.create(tmp_path / "encoded.root") as file: + file["servicex"] = tree_data + assert os.path.exists( + tmp_path / "encoded.root" + ), "servicex-like test file not found." 
+ deliver_dict = {"test_file": [str(tmp_path / "encoded.root")]} + + # Test str formating + output_str = utils.print_structure_from_str(deliver_dict) + + expected_path = Path("tests/data/expected_metadata.txt") + expected = expected_path.read_text(encoding="utf-8") + + assert ( + expected == output_str + ), f"Output does not match expected.\n Output: {output_str}"