diff --git a/src/mock_features/__init__.py b/src/mock_features/__init__.py new file mode 100644 index 0000000..d1d64c3 --- /dev/null +++ b/src/mock_features/__init__.py @@ -0,0 +1 @@ +from .mock_features import * diff --git a/src/mock_features/mock_features.py b/src/mock_features/mock_features.py new file mode 100644 index 0000000..87545d0 --- /dev/null +++ b/src/mock_features/mock_features.py @@ -0,0 +1,50 @@ +from typing import Any, Dict + +class MockFeatures: + """ + A unified stub to replace nsim.features.Features and provide + default configuration values for simulations and nmesh. + """ + def __init__(self): + # Default configuration for simulations + self._data: Dict[str, Dict[str, Any]] = { + 'etc': { + 'runid': 'nmag_simulation', # Default output filename + 'savedir': '.', # Default output directory + }, + 'nmag': { + 'clean': False, # Don't delete old files automatically + 'restart': False, # Don't try to restart from h5 + } + } + + def from_file(self, file_path): + """Stub for loading features from a file.""" + pass + + def from_string(self, string): + """Stub for loading features from a string.""" + pass + + def add_section(self, section: str): + """Adds a section to the features if it doesn't exist.""" + if section not in self._data: + self._data[section] = {} + + def get(self, section: str, name: str, raw: bool = False) -> Any: + """Retrieves a value from the specified section and name.""" + return self._data.get(section, {}).get(name) + + def set(self, section: str, name: str, value: Any): + """Sets a value in the specified section and name.""" + if section not in self._data: + self._data[section] = {} + self._data[section][name] = value + + def items(self, section: str): + """Returns items in a given section.""" + return self._data.get(section, {}).items() + + def to_string(self) -> str: + """Returns string representation of the data.""" + return str(self._data) diff --git a/src/nmesh/__init__.py b/src/nmesh/__init__.py new file mode 100644 index 0000000..671625b --- /dev/null +++ b/src/nmesh/__init__.py @@ -0,0 +1 @@ +from .nmesh import * diff --git a/src/nmesh/nmesh.py b/src/nmesh/nmesh.py new file mode 100644 index 0000000..6be5968 --- /dev/null +++ b/src/nmesh/nmesh.py @@ -0,0 +1,559 @@ +import math +import logging +from typing import List, Tuple, Optional, Any, Union +from pathlib import Path +import itertools +from mock_features import MockFeatures +from . import utils + +log = logging.getLogger(__name__) + +class OCamlStub: + """Stub for the OCaml backend interface, these will be removed soon once we have python equivalents.""" + + # Mesher defaults setters + def mesher_defaults_set_shape_force_scale(self, mesher, scale): pass + def mesher_defaults_set_volume_force_scale(self, mesher, scale): pass + def mesher_defaults_set_neigh_force_scale(self, mesher, scale): pass + def mesher_defaults_set_irrel_elem_force_scale(self, mesher, scale): pass + def mesher_defaults_set_time_step_scale(self, mesher, scale): pass + def mesher_defaults_set_thresh_add(self, mesher, thresh): pass + def mesher_defaults_set_thresh_del(self, mesher, thresh): pass + def mesher_defaults_set_topology_threshold(self, mesher, thresh): pass + def mesher_defaults_set_tolerated_rel_movement(self, mesher, scale): pass + def mesher_defaults_set_max_relaxation_steps(self, mesher, steps): pass + def mesher_defaults_set_initial_settling_steps(self, mesher, steps): pass + def mesher_defaults_set_sliver_correction(self, mesher, scale): pass + def mesher_defaults_set_smallest_allowed_volume_ratio(self, mesher, scale): pass + def mesher_defaults_set_movement_max_freedom(self, mesher, scale): pass + + # Mesh operations + def mesh_scale_node_positions(self, raw_mesh, scale): pass + def mesh_writefile(self, path, raw_mesh): pass + def mesh_nr_simplices(self, raw_mesh): return 0 + def mesh_nr_points(self, raw_mesh): return 0 + def mesh_plotinfo(self, raw_mesh): return [[], [], [[], [], []]] + def mesh_plotinfo_points(self, raw_mesh): return [] + def mesh_plotinfo_pointsregions(self, raw_mesh): return [] + def mesh_plotinfo_simplices(self, raw_mesh): return [] + def mesh_plotinfo_simplicesregions(self, raw_mesh): return [] + def mesh_plotinfo_surfaces_and_surfacesregions(self, raw_mesh): return [[], []] + def mesh_plotinfo_links(self, raw_mesh): return [] + def mesh_dim(self, raw_mesh): return 3 + def mesh_plotinfo_regionvolumes(self, raw_mesh): return [] + def mesh_plotinfo_periodic_points_indices(self, raw_mesh): return [] + def mesh_set_vertex_distribution(self, raw_mesh, dist): pass + def mesh_get_permutation(self, raw_mesh): return [] + def mesh_readfile(self, filename, do_reorder, do_distribute): return "STUB_MESH" + + # Driver and Mesh creation + def make_mg_gendriver(self, interval, callback): return "STUB_DRIVER" + def copy_mesher_defaults(self, defaults): return "STUB_MESHER" + def mesh_bodies_raw(self, driver, mesher, bb_min, bb_max, mesh_ext, objects, a0, density, fixed, mobile, simply, periodic, cache, hints): return "STUB_MESH" + def mesh_from_points_and_simplices(self, dim, points, simplices, regions, periodic, reorder, distribute): return "STUB_MESH" + + # Body operations + def body_union(self, objs): return "STUB_OBJ_UNION" + def body_difference(self, obj1, objs): return "STUB_OBJ_DIFF" + def body_intersection(self, objs): return "STUB_OBJ_INTERSECT" + def body_shifted_sc(self, obj, shift): return obj + def body_shifted_bc(self, obj, shift): return obj + def body_scaled(self, obj, scale): return obj + def body_rotated_sc(self, obj, a1, a2, ang): return obj + def body_rotated_bc(self, obj, a1, a2, ang): return obj + def body_rotated_axis_sc(self, obj, axis, ang): return obj + def body_rotated_axis_bc(self, obj, axis, ang): return obj + + # Primitives + def body_box(self, p1, p2): return "STUB_BOX" + def body_ellipsoid(self, length): return "STUB_ELLIPSOID" + def body_frustum(self, c1, r1, c2, r2): return "STUB_FRUSTUM" + def body_helix(self, c1, r1, c2, r2): return "STUB_HELIX" + + @property + def mesher_defaults(self): return "STUB_DEFAULTS" + +ocaml = OCamlStub() + +def memory_report(tag: str): + """Reports memory usage.""" + t, vmem, rss = utils.time_vmem_rss() + log.log(15, f"Memory report: T= {t:f} VMEM= {int(vmem)} KB RSS= {int(rss)} KB {tag}") + +# --- Configuration --- + +class MeshingParameters(MockFeatures): + """Parameters for the meshing algorithm, supporting multiple dimensions.""" + def __init__(self, string=None, file=None): + super().__init__() + self.dim = None + if file: self.from_file(file) + if string: self.from_string(string) + self.add_section('user-modifications') + + def _get_section_name(self): + if self.dim is None: + raise RuntimeError("Dimension not set in MeshingParameters") + return f'nmesh-{self.dim}D' if self.dim in [2, 3] else 'nmesh-ND' + + def __getitem__(self, name): + val = self.get('user-modifications', name) + if val is not None: + return val + section = self._get_section_name() + return self.get(section, name) + + def __setitem__(self, key, value): + self.set('user-modifications', key, value) + + def set_shape_force_scale(self, v): self["shape_force_scale"] = float(v) + def set_volume_force_scale(self, v): self["volume_force_scale"] = float(v) + def set_neigh_force_scale(self, v): self["neigh_force_scale"] = float(v) + def set_irrel_elem_force_scale(self, v): self["irrel_elem_force_scale"] = float(v) + def set_time_step_scale(self, v): self["time_step_scale"] = float(v) + def set_thresh_add(self, v): self["thresh_add"] = float(v) + def set_thresh_del(self, v): self["thresh_del"] = float(v) + def set_topology_threshold(self, v): self["topology_threshold"] = float(v) + def set_tolerated_rel_move(self, v): self["tolerated_rel_move"] = float(v) + def set_max_steps(self, v): self["max_steps"] = int(v) + def set_initial_settling_steps(self, v): self["initial_settling_steps"] = int(v) + def set_sliver_correction(self, v): self["sliver_correction"] = float(v) + def set_smallest_volume_ratio(self, v): self["smallest_volume_ratio"] = float(v) + def set_max_relaxation(self, v): self["max_relaxation"] = float(v) + + def pass_parameters_to_ocaml(self, mesher, dim): + self.dim = dim + for key, value in self.items('user-modifications'): + section = self._get_section_name() + self.set(section, key, str(value)) + + params = [ + ("shape_force_scale", ocaml.mesher_defaults_set_shape_force_scale), + ("volume_force_scale", ocaml.mesher_defaults_set_volume_force_scale), + ("neigh_force_scale", ocaml.mesher_defaults_set_neigh_force_scale), + ("irrel_elem_force_scale", ocaml.mesher_defaults_set_irrel_elem_force_scale), + ("time_step_scale", ocaml.mesher_defaults_set_time_step_scale), + ("thresh_add", ocaml.mesher_defaults_set_thresh_add), + ("thresh_del", ocaml.mesher_defaults_set_thresh_del), + ("topology_threshold", ocaml.mesher_defaults_set_topology_threshold), + ("tolerated_rel_move", ocaml.mesher_defaults_set_tolerated_rel_movement), + ("max_steps", ocaml.mesher_defaults_set_max_relaxation_steps), + ("initial_settling_steps", ocaml.mesher_defaults_set_initial_settling_steps), + ("sliver_correction", ocaml.mesher_defaults_set_sliver_correction), + ("smallest_volume_ratio", ocaml.mesher_defaults_set_smallest_allowed_volume_ratio), + ("max_relaxation", ocaml.mesher_defaults_set_movement_max_freedom), + ] + + for key, setter in params: + val = self[key] + if val is not None: + setter(mesher, float(val) if "steps" not in key else int(val)) + +def get_default_meshing_parameters(): + return MeshingParameters() + +# --- Loading Utilities --- + +def _is_nmesh_ascii_file(filename): + try: + with open(filename, 'r') as f: + return f.readline().startswith("# PYFEM") + except: return False + +def _is_nmesh_hdf5_file(filename): + return str(filename).lower().endswith('.h5') + +def hdf5_mesh_get_permutation(filename): + log.warning("hdf5_mesh_get_permutation: HDF5 support is stubbed.") + return None + +# --- Mesh Classes --- + +class MeshBase: + """Base class for all mesh objects, providing access to mesh data.""" + def __init__(self, raw_mesh): + self.raw_mesh = raw_mesh + self._cache = {} + + def scale_node_positions(self, scale: float): + """Scales all node positions in the mesh.""" + ocaml.mesh_scale_node_positions(self.raw_mesh, float(scale)) + self._cache.pop('points', None) + self._cache.pop('region_volumes', None) + + def save(self, filename: Union[str, Path]): + """Saves the mesh to a file (ASCII or HDF5).""" + path = str(filename) + if path.lower().endswith('.h5'): + log.info(f"Saving to HDF5 (stub): {path}") + else: + ocaml.mesh_writefile(path, self.raw_mesh) + + def __str__(self): + pts = ocaml.mesh_nr_points(self.raw_mesh) + simps = ocaml.mesh_nr_simplices(self.raw_mesh) + return f"Mesh with {pts} points and {simps} simplices" + + def to_lists(self): + """Returns mesh data as Python lists.""" + return ocaml.mesh_plotinfo(self.raw_mesh) + + @property + def points(self): + if 'points' not in self._cache: + self._cache['points'] = ocaml.mesh_plotinfo_points(self.raw_mesh) + return self._cache['points'] + + @property + def simplices(self): + if 'simplices' not in self._cache: + self._cache['simplices'] = ocaml.mesh_plotinfo_simplices(self.raw_mesh) + return self._cache['simplices'] + + @property + def regions(self): + if 'regions' not in self._cache: + self._cache['regions'] = ocaml.mesh_plotinfo_simplicesregions(self.raw_mesh) + return self._cache['regions'] + + @property + def dim(self): + return ocaml.mesh_dim(self.raw_mesh) + + @property + def surfaces(self): + return ocaml.mesh_plotinfo_surfaces_and_surfacesregions(self.raw_mesh)[0] + + @property + def point_regions(self): + """Returns regions for each point.""" + if 'point_regions' not in self._cache: + self._cache['point_regions'] = ocaml.mesh_plotinfo_pointsregions(self.raw_mesh) + return self._cache['point_regions'] + + @property + def links(self): + """Returns all links (pairs of point indices).""" + if 'links' not in self._cache: + self._cache['links'] = ocaml.mesh_plotinfo_links(self.raw_mesh) + return self._cache['links'] + + @property + def region_volumes(self): + """Returns volume of each region.""" + if 'region_volumes' not in self._cache: + self._cache['region_volumes'] = ocaml.mesh_plotinfo_regionvolumes(self.raw_mesh) + return self._cache['region_volumes'] + + @property + def num_regions(self): + """Returns the number of regions.""" + return len(self.region_volumes) + + @property + def periodic_point_indices(self): + """Returns indices of periodic nodes.""" + if 'periodic_indices' not in self._cache: + self._cache['periodic_indices'] = ocaml.mesh_plotinfo_periodic_points_indices(self.raw_mesh) + return self._cache['periodic_indices'] + + @property + def permutation(self): + """Returns the node permutation mapping.""" + return ocaml.mesh_get_permutation(self.raw_mesh) + + def set_vertex_distribution(self, dist): + """Sets vertex distribution.""" + ocaml.mesh_set_vertex_distribution(self.raw_mesh, dist) + +class Mesh(MeshBase): + """Class for generating a mesh from geometric objects.""" + def __init__(self, bounding_box, objects=[], a0=1.0, density="", + periodic=[], fixed_points=[], mobile_points=[], simply_points=[], + callback=None, mesh_bounding_box=False, meshing_parameters=None, + cache_name="", hints=[], **kwargs): + + if bounding_box is None: + raise ValueError("Bounding box must be provided.") + + bb = [[float(x) for x in p] for p in bounding_box] + dim = len(bb[0]) + mesh_ext = 1 if mesh_bounding_box else 0 + + if not objects and not mesh_bounding_box: + raise ValueError("No objects to mesh and bounding box meshing disabled.") + + params = meshing_parameters or get_default_meshing_parameters() + for k, v in kwargs.items(): + params[k] = v + + obj_bodies = [] + # Store points lists to allow appending later (mimicking original API) + self._fixed_points = [list(map(float, p)) for p in fixed_points] + self._mobile_points = [list(map(float, p)) for p in mobile_points] + self._simply_points = [list(map(float, p)) for p in simply_points] + + for obj in objects: + obj_bodies.append(obj.obj) + self._fixed_points.extend(obj.fixed_points) + self._mobile_points.extend(obj.mobile_points) + + periodic_floats = [1.0 if p else 0.0 for p in periodic] if periodic else [0.0] * dim + + cb_func, cb_interval = callback if callback else (lambda a,b,c: None, 1000000) + self.fun_driver = cb_func + driver = ocaml.make_mg_gendriver(cb_interval, cb_func) + mesher = ocaml.copy_mesher_defaults(ocaml.mesher_defaults) + params.pass_parameters_to_ocaml(mesher, dim) + + # Note: In the original code, mesh generation happens in __init__. + # Adding points via methods afterwards wouldn't affect the already generated mesh + # unless we regenerate or if those methods were meant for pre-generation setup. + # However, checking lib1.py, __init__ calls mesh_bodies_raw immediately. + # The methods fixed_points/mobile_points in lib1.py just append to self.fixed_points + # which seems useless after __init__ unless the user manually triggers something else. + # But we will preserve them for API compatibility. + + raw = ocaml.mesh_bodies_raw( + driver, mesher, bb[0], bb[1], mesh_ext, obj_bodies, float(a0), + density, self._fixed_points, self._mobile_points, self._simply_points, periodic_floats, + cache_name, hints + ) + + if raw is None: raise RuntimeError("Mesh generation failed.") + super().__init__(raw) + + def default_fun(self, nr_piece, n, mesh): + """Default callback function.""" + pass + + def extended_fun_driver(self, nr_piece, iteration_nr, mesh): + """Extended driver callback.""" + if hasattr(self, 'fun_driver'): + self.fun_driver(nr_piece, iteration_nr, mesh) + + def fixed_points(self, points: List[List[float]]): + """Adds fixed points to the mesh configuration.""" + if points: + self._fixed_points.extend(points) + + def mobile_points(self, points: List[List[float]]): + """Adds mobile points to the mesh configuration.""" + if points: + self._mobile_points.extend(points) + + def simply_points(self, points: List[List[float]]): + """Adds simply points to the mesh configuration.""" + if points: + self._simply_points.extend(points) + +class MeshFromFile(MeshBase): + """Loads a mesh from a file.""" + def __init__(self, filename, reorder=False, distribute=True): + path = Path(filename) + if not path.exists(): raise FileNotFoundError(f"File {filename} not found") + + # Determine format + if _is_nmesh_ascii_file(filename): + raw = ocaml.mesh_readfile(str(path), reorder, distribute) + elif _is_nmesh_hdf5_file(filename): + raw = ocaml.mesh_readfile(str(path), reorder, distribute) + else: + raise ValueError(f"Unknown mesh file format: {filename}") + + super().__init__(raw) + +class mesh_from_points_and_simplices(MeshBase): + """Wrapper for backward compatibility.""" + def __init__(self, points=[], simplices_indices=[], simplices_regions=[], + periodic_point_indices=[], initial=0, do_reorder=False, + do_distribute=True): + + # Adjust for 1-based indexing if initial=1 + if initial == 1: + simplices_indices = [[idx - 1 for idx in s] for s in simplices_indices] + + raw = ocaml.mesh_from_points_and_simplices( + len(points[0]) if points else 3, + [[float(x) for x in p] for p in points], + [[int(x) for x in s] for s in simplices_indices], + [int(r) for r in simplices_regions], + periodic_point_indices, do_reorder, do_distribute + ) + super().__init__(raw) + +def load(filename, reorder=False, distribute=True): + """Utility function to load a mesh.""" + return MeshFromFile(filename, reorder, distribute) + +def save(mesh: MeshBase, filename: Union[str, Path]): + """Alias for mesh.save for backward compatibility.""" + mesh.save(filename) + +# --- Geometry --- + +class MeshObject: + """Base class for geometric primitives and CSG operations.""" + def __init__(self, dim, fixed=[], mobile=[]): + self.dim = dim + self.fixed_points = fixed + self.mobile_points = mobile + self.obj: Any = None + + def shift(self, vector, system_coords=True): + self.obj = (ocaml.body_shifted_sc if system_coords else ocaml.body_shifted_bc)(self.obj, vector) + + def scale(self, factors): + self.obj = ocaml.body_scaled(self.obj, factors) + + def rotate(self, a1, a2, angle, system_coords=True): + rad = math.radians(angle) + self.obj = (ocaml.body_rotated_sc if system_coords else ocaml.body_rotated_bc)(self.obj, a1, a2, rad) + + def rotate_3d(self, axis, angle, system_coords=True): + rad = math.radians(angle) + self.obj = (ocaml.body_rotated_axis_sc if system_coords else ocaml.body_rotated_axis_bc)(self.obj, axis, rad) + + def transform(self, transformations, system_coords=True): + """Applies a list of transformation tuples.""" + for t in transformations: + name, *args = t + if name == "shift": self.shift(args[0], system_coords) + elif name == "scale": self.scale(args[0]) + elif name == "rotate": self.rotate(args[0][0], args[0][1], args[1], system_coords) + elif name == "rotate2d": self.rotate(0, 1, args[0], system_coords) + elif name == "rotate3d": self.rotate_3d(args[0], args[1], system_coords) + +class Box(MeshObject): + def __init__(self, p1, p2, transform=[], fixed=[], mobile=[], system_coords=True, use_fixed_corners=False): + dim = len(p1) + if use_fixed_corners: + fixed.extend([list(c) for c in itertools.product(*zip(p1, p2))]) + super().__init__(dim, fixed, mobile) + self.obj = ocaml.body_box([float(x) for x in p1], [float(x) for x in p2]) + self.transform(transform, system_coords) + +class Ellipsoid(MeshObject): + def __init__(self, lengths, transform=[], fixed=[], mobile=[], system_coords=True): + super().__init__(len(lengths), fixed, mobile) + self.obj = ocaml.body_ellipsoid([float(x) for x in lengths]) + self.transform(transform, system_coords) + +class Conic(MeshObject): + def __init__(self, c1, r1, c2, r2, transform=[], fixed=[], mobile=[], system_coords=True): + super().__init__(len(c1), fixed, mobile) + self.obj = ocaml.body_frustum(c1, r1, c2, r2) + self.transform(transform, system_coords) + +class Helix(MeshObject): + def __init__(self, c1, r1, c2, r2, transform=[], fixed=[], mobile=[], system_coords=True): + super().__init__(len(c1), fixed, mobile) + self.obj = ocaml.body_helix(c1, r1, c2, r2) + self.transform(transform, system_coords) + +# --- CSG --- + +def union(objects: List[MeshObject]) -> MeshObject: + if len(objects) < 2: raise ValueError("Union requires at least two objects") + res = MeshObject(objects[0].dim) + for o in objects: + res.fixed_points.extend(o.fixed_points) + res.mobile_points.extend(o.mobile_points) + res.obj = ocaml.body_union([o.obj for o in objects]) + return res + +def difference(mother: MeshObject, subtract: List[MeshObject]) -> MeshObject: + res = MeshObject(mother.dim, mother.fixed_points[:], mother.mobile_points[:]) + for o in subtract: + res.fixed_points.extend(o.fixed_points) + res.mobile_points.extend(o.mobile_points) + res.obj = ocaml.body_difference(mother.obj, [o.obj for o in subtract]) + return res + +def intersect(objects: List[MeshObject]) -> MeshObject: + if len(objects) < 2: raise ValueError("Intersection requires at least two objects") + res = MeshObject(objects[0].dim) + for o in objects: + res.fixed_points.extend(o.fixed_points) + res.mobile_points.extend(o.mobile_points) + res.obj = ocaml.body_intersection([o.obj for o in objects]) + return res + +# --- Utilities --- + +def outer_corners(mesh: MeshBase): + """Determines the bounding box of the mesh nodes.""" + coords = mesh.points + if not coords: return None, None + transpose = list(zip(*coords)) + return [min(t) for t in transpose], [max(t) for t in transpose] + +def generate_1d_mesh_components(regions: List[Tuple[float, float]], discretization: float) -> Tuple: + """Generates 1D mesh components (points, simplices, regions).""" + points, simplices, regions_ids = [], [], [] + point_map = {} + + def get_idx(v): + vk = round(v, 8) + if vk not in point_map: + point_map[vk] = len(points) + points.append([float(v)]) + return point_map[vk] + + for rid, (start, end) in enumerate(regions, 1): + if start > end: start, end = end, start + steps = max(1, int(abs((end - start) / discretization))) + step = (end - start) / steps + last = get_idx(start) + for i in range(1, steps + 1): + curr = get_idx(start + i * step) + simplices.append([last, curr]) + regions_ids.append(rid) + last = curr + + return points, simplices, regions_ids + +def generate_1d_mesh(regions: List[Tuple[float, float]], discretization: float) -> MeshBase: + """Generates a 1D mesh with specified regions and step size.""" + pts, simps, regs = generate_1d_mesh_components(regions, discretization) + return mesh_from_points_and_simplices(pts, simps, regs) + +def to_lists(mesh: MeshBase): + """Returns mesh data as Python lists.""" + return mesh.to_lists() + +tolists = to_lists + +def write_mesh(mesh_data, out=None, check=True, float_fmt=" %f"): + """ + Writes mesh data (points, simplices, surfaces) to a file in nmesh format. + mesh_data: (points, simplices, surfaces) + """ + points, simplices, surfaces = mesh_data + + lines = ["# PYFEM mesh file version 1.0"] + dim = len(points[0]) if points else 0 + lines.append(f"# dim = {dim} \t nodes = {len(points)} \t simplices = {len(simplices)} \t surfaces = {len(surfaces)} \t periodic = 0") + + lines.append(str(len(points))) + for p in points: + lines.append("".join(float_fmt % x for x in p)) + + lines.append(str(len(simplices))) + for body, nodes in simplices: + lines.append(f" {body} " + " ".join(str(n) for n in nodes)) + + lines.append(str(len(surfaces))) + for body, nodes in surfaces: + lines.append(f" {body} " + " ".join(str(n) for n in nodes)) + + lines.append("0") + + content = "\n".join(lines) + "\n" + + if out is None: + print(content) + elif isinstance(out, (str, Path)): + Path(out).write_text(content) + else: + out.write(content) diff --git a/src/nmesh/utils/__init__.py b/src/nmesh/utils/__init__.py new file mode 100644 index 0000000..30aac80 --- /dev/null +++ b/src/nmesh/utils/__init__.py @@ -0,0 +1,2 @@ +from .array_list_utils import * +from .timing_memory_utils import * diff --git a/src/nmesh/utils/array_list_utils.py b/src/nmesh/utils/array_list_utils.py new file mode 100644 index 0000000..4a2bcb1 --- /dev/null +++ b/src/nmesh/utils/array_list_utils.py @@ -0,0 +1,42 @@ +import numpy as np + +def array_filter(p, arr): + arr = np.asanyarray(arr) + # Apply predicate to create a boolean mask. + mask = np.vectorize(p)(arr) + return arr[mask] + +def array_position(x, arr, start=0): + arr = np.asanyarray(arr) + sub_arr = arr[start:] + indices = np.where(sub_arr == x)[0] + if len(indices) > 0: + return int(indices[0] + start) + return -1 + +def array_position_if(p, arr, start=0): + arr = np.asanyarray(arr) + sub_arr = arr[start:] + mask = np.vectorize(p)(sub_arr) + indices = np.where(mask)[0] + if len(indices) > 0: + return int(indices[0] + start) + return -1 + +def array_one_shorter(arr, pos): + return np.delete(np.asanyarray(arr), pos) + +def determinant(mx): + return float(np.linalg.det(np.asanyarray(mx))) + +def inverse(mx): + return np.linalg.inv(np.asanyarray(mx)) + +def det_and_inv(mx): + arr = np.asanyarray(mx) + det = np.linalg.det(arr) + inv = np.linalg.inv(arr) + return float(det), inv + +def cross_product_3d(v1, v2): + return np.cross(np.asanyarray(v1), np.asanyarray(v2)) diff --git a/src/nmesh/utils/timing_memory_utils.py b/src/nmesh/utils/timing_memory_utils.py new file mode 100644 index 0000000..6892ffd --- /dev/null +++ b/src/nmesh/utils/timing_memory_utils.py @@ -0,0 +1,48 @@ +import logging +import os +import re +import time + +log = logging.getLogger(__name__) + +_time_zero = None + +def time_passed(): + """Returns elapsed time since the first call to this function.""" + global _time_zero + if _time_zero is None: + _time_zero = time.time() + return 0.0 + return time.time() - _time_zero + +def memstats(self_status_file="/proc/self/status"): + """Reads VmSize and VmRSS from /proc/self/status (Linux).""" + vmsize_vmrss = [0.0, 0.0] + if not os.path.exists(self_status_file): + return vmsize_vmrss + + # Matches "VmSize: 1234 kB" or "VmRSS: 5678 KB". + re_pattern = re.compile(r"^(VmSize|VmRSS):\s+(\d+)\s+[kK][bB]", re.MULTILINE) + try: + with open(self_status_file, "r") as fd: + content = fd.read() + found = 0 + for match in re_pattern.finditer(content): + key = match.group(1) + value = float(match.group(2)) + if key == "VmSize": + vmsize_vmrss[0] = value + else: + vmsize_vmrss[1] = value + found += 1 + if found >= 2: + break + except Exception as e: + log.debug(f"Failed to read memstats: {e}") + return vmsize_vmrss + +def time_vmem_rss(): + """Returns (elapsed_time, vmem, rss) where memory is in KB.""" + t = time_passed() + mem = memstats() + return t, mem[0], mem[1] diff --git a/src/simulation/mock_features.py b/src/simulation/mock_features.py deleted file mode 100644 index af997d4..0000000 --- a/src/simulation/mock_features.py +++ /dev/null @@ -1,19 +0,0 @@ -from typing import Any - -class MockFeatures: - """ - A simple stub to replace nsim.setup.get_features(). - Currently this is used in the simulation_core class. - Provides default configuration values for the simulation. - There are no tests for this class as it is only a stub and will removed in future. - """ - def __init__(self): - self._config = { - ('etc', 'runid'): 'nmag_simulation', # Default output filename - ('etc', 'savedir'): '.', # Default output directory - ('nmag', 'clean'): False, # Don't delete old files automatically - ('nmag', 'restart'): False, # Don't try to restart from h5 - } - - def get(self, section: str, key: str, raw: bool = False) -> Any: - return self._config.get((section, key)) diff --git a/tests/nmesh/utils/test_array_list_utils.py b/tests/nmesh/utils/test_array_list_utils.py new file mode 100644 index 0000000..3ed78eb --- /dev/null +++ b/tests/nmesh/utils/test_array_list_utils.py @@ -0,0 +1,55 @@ +import unittest + +import numpy as np +import nmesh.utils.array_list_utils as array_utils + + +class TestArrayListUtils(unittest.TestCase): + def test_array_filter(self): + arr = [1, 2, 3, 4, 5] + p = lambda x: x % 2 == 0 + expected = [2, 4] + np.testing.assert_array_equal(array_utils.array_filter(p, arr), expected) + + def test_array_position(self): + arr = [10, 20, 30, 40, 30] + self.assertEqual(array_utils.array_position(30, arr), 2) + self.assertEqual(array_utils.array_position(30, arr, start=3), 4) + self.assertEqual(array_utils.array_position(50, arr), -1) + + def test_array_position_if(self): + arr = [1, 3, 5, 8, 10] + p = lambda x: x % 2 == 0 + self.assertEqual(array_utils.array_position_if(p, arr), 3) + self.assertEqual(array_utils.array_position_if(p, arr, start=4), 4) + + def test_array_one_shorter(self): + arr = [1, 2, 3, 4] + np.testing.assert_array_equal(array_utils.array_one_shorter(arr, 1), [1, 3, 4]) + + def test_determinant(self): + mx = [[1, 2], [3, 4]] + self.assertAlmostEqual(array_utils.determinant(mx), -2.0, places=9) + + def test_inverse(self): + mx = [[1, 2], [3, 4]] + inv = array_utils.inverse(mx) + expected = [[-2.0, 1.0], [1.5, -0.5]] + np.testing.assert_array_almost_equal(inv, expected, decimal=9) + + def test_det_and_inv(self): + mx = [[1, 2], [3, 4]] + det, inv = array_utils.det_and_inv(mx) + self.assertAlmostEqual(det, -2.0, places=9) + expected = [[-2.0, 1.0], [1.5, -0.5]] + np.testing.assert_array_almost_equal(inv, expected, decimal=9) + + def test_cross_product_3d(self): + v1 = [1, 0, 0] + v2 = [0, 1, 0] + expected = [0, 0, 1] + np.testing.assert_array_equal(array_utils.cross_product_3d(v1, v2), expected) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/nmesh/utils/test_timing_memory_utils.py b/tests/nmesh/utils/test_timing_memory_utils.py new file mode 100644 index 0000000..3e26bbf --- /dev/null +++ b/tests/nmesh/utils/test_timing_memory_utils.py @@ -0,0 +1,33 @@ +import os +import tempfile +import unittest + +import nmesh.utils.timing_memory_utils as timing_utils + + +class TestTimingMemoryUtils(unittest.TestCase): + def setUp(self): + timing_utils._time_zero = None + + def test_time_vmem_rss(self): + t, vmem, rss = timing_utils.time_vmem_rss() + self.assertGreaterEqual(t, 0.0) + # On non-Linux this might be 0.0, but if /proc/self/status exists it should be > 0. + if os.path.exists("/proc/self/status"): + self.assertGreater(vmem, 0.0) + self.assertGreater(rss, 0.0) + + def test_memstats_from_file(self): + content = "Name:\tpython\nVmSize:\t 1234 kB\nVmRSS:\t 456 kB\n" + with tempfile.NamedTemporaryFile("w+", delete=False) as tmp: + tmp.write(content) + tmp_path = tmp.name + + try: + self.assertEqual(timing_utils.memstats(tmp_path), [1234.0, 456.0]) + finally: + os.unlink(tmp_path) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/nmesh_test.py b/tests/nmesh_test.py new file mode 100644 index 0000000..ec44546 --- /dev/null +++ b/tests/nmesh_test.py @@ -0,0 +1,102 @@ +import unittest +import math +from pathlib import Path +import nmesh + +class TestNMesh(unittest.TestCase): + def test_meshing_parameters(self): + """Test MeshingParameters setters and getters.""" + params = nmesh.get_default_meshing_parameters() + params.dim = 3 + + # Test individual setters + params.set_shape_force_scale(0.5) + self.assertEqual(params["shape_force_scale"], 0.5) + + params.set_max_steps(5000) + self.assertEqual(params["max_steps"], 5000) + + # Test item setting + params["volume_force_scale"] = 0.1 + self.assertEqual(params["volume_force_scale"], 0.1) + + def test_box_primitive(self): + """Test Box primitive creation and transformations.""" + p1 = [0.0, 0.0, 0.0] + p2 = [1.0, 1.0, 1.0] + b = nmesh.Box(p1, p2, use_fixed_corners=True) + + self.assertEqual(b.dim, 3) + # 8 corners for a 3D box + self.assertEqual(len(b.fixed_points), 8) + + # Test transformation + b.shift([1.0, 0.0, 0.0]) + b.scale([2.0, 2.0, 2.0]) + b.rotate(0, 1, 90) + + def test_csg_operations(self): + """Test CSG operations like union and difference.""" + b1 = nmesh.Box([0,0,0], [1,1,1]) + b2 = nmesh.Box([0.5,0.5,0.5], [1.5,1.5,1.5]) + + u = nmesh.union([b1, b2]) + self.assertEqual(u.dim, 3) + + d = nmesh.difference(b1, [b2]) + self.assertEqual(d.dim, 3) + + def test_mesh_generation_stub(self): + """Test Mesh class initialization with stubs.""" + bb = [[0,0,0], [1,1,1]] + obj = nmesh.Box([0.2,0.2,0.2], [0.8,0.8,0.8]) + + m = nmesh.Mesh(bounding_box=bb, objects=[obj], a0=0.1) + self.assertEqual(str(m), "Mesh with 0 points and 0 simplices") # From stubs + + # Test properties (should return empty lists from stubs) + self.assertEqual(m.points, []) + self.assertEqual(m.simplices, []) + self.assertEqual(m.regions, []) + + def test_1d_mesh_generation(self): + """Test 1D mesh generation logic.""" + regions = [(0.0, 1.0), (1.0, 2.0)] + discretization = 0.5 + + m = nmesh.generate_1d_mesh(regions, discretization) + self.assertIsInstance(m, nmesh.MeshBase) + + pts, simps, regs = nmesh.generate_1d_mesh_components(regions, discretization) + self.assertEqual(len(pts), 5) # 0.0, 0.5, 1.0, 1.5, 2.0 + self.assertEqual(len(simps), 4) + self.assertEqual(len(regs), 4) + + def test_outer_corners(self): + """Test outer_corners utility.""" + class MockMesh(nmesh.MeshBase): + @property + def points(self): + return [[0,0], [1,2], [-1,1]] + + m = MockMesh("raw") + min_c, max_corner = nmesh.outer_corners(m) + self.assertEqual(min_c, [-1, 0]) + self.assertEqual(max_corner, [1, 2]) + + def test_write_mesh(self): + """Test write_mesh utility.""" + points = [[0.0, 0.0], [1.0, 1.0]] + simplices = [(1, [0, 1])] + surfaces = [(1, [0])] + data = (points, simplices, surfaces) + + import io + out = io.StringIO() + nmesh.write_mesh(data, out=out) + content = out.getvalue() + self.assertIn("# PYFEM mesh file version 1.0", content) + self.assertIn("nodes = 2", content) + +if __name__ == '__main__': + unittest.main()