diff --git a/dev-scripts/profile_model_memory.py b/dev-scripts/profile_model_memory.py new file mode 100644 index 00000000..a6a5a981 --- /dev/null +++ b/dev-scripts/profile_model_memory.py @@ -0,0 +1,200 @@ +""" +Reusable memory profiling script for linopy model building. + +Run with scalene for line-level memory attribution: + scalene run dev-scripts/profile_model_memory.py --preset medium + +Run standalone for quick peak RSS + timing: + python dev-scripts/profile_model_memory.py --shape 100 100 20 --sparsity 0.05 --n-expr 5 +""" + +import argparse +import json +import resource +import subprocess +import time +from datetime import datetime, timezone + +import numpy as np +import pandas as pd +import xarray as xr + +import linopy + +PRESETS = { + "small": {"shape": (100, 100, 20), "sparsity": 0.2, "n_expr": 5}, + "medium": {"shape": (200, 200, 50), "sparsity": 0.2, "n_expr": 5}, + "large": {"shape": (300, 300, 100), "sparsity": 0.2, "n_expr": 5}, +} + + +def get_git_info(): + """Return current git branch and short SHA.""" + info = {} + try: + info["git_branch"] = ( + subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], stderr=subprocess.DEVNULL + ) + .decode() + .strip() + ) + info["git_sha"] = ( + subprocess.check_output( + ["git", "rev-parse", "--short", "HEAD"], stderr=subprocess.DEVNULL + ) + .decode() + .strip() + ) + except Exception: + info["git_branch"] = "unknown" + info["git_sha"] = "unknown" + return info + + +def make_coords(shape): + return [pd.RangeIndex(s, name=f"d{i}") for i, s in enumerate(shape)] + + +def make_sparse_coeffs(shape, sparsity, rng, dtype=np.float64): + """Return dense array with (1 - sparsity) fraction set to zero, plus bool mask.""" + mask = rng.random(shape) < sparsity + vals = rng.standard_normal(shape).astype(dtype) + vals[~mask] = 0.0 + return vals, mask + + +def build_model(shape, sparsity, n_expr, seed=42): + """Build a linopy model with sparse expressions and constraints.""" + rng = np.random.default_rng(seed) + coords = make_coords(shape) + dims = [c.name for c in coords] + + m = linopy.Model() + + variables = [] + coeff_arrays = [] + masks = [] + + for i in range(n_expr): + v = m.add_variables(lower=0, coords=coords, name=f"x{i}") + variables.append(v) + c, mask = make_sparse_coeffs(shape, sparsity, rng) + coeff_arrays.append(c) + masks.append(mask) + + # Build expressions and sum + exprs = [coeff_arrays[i] * variables[i] for i in range(n_expr)] + total = sum(exprs) + + # Combined mask + combined_mask = np.zeros(shape, dtype=bool) + for mask in masks: + combined_mask |= mask + combined_mask_da = xr.DataArray( + combined_mask, dims=dims, coords={c.name: c for c in coords} + ) + + # Add constraints + m.add_constraints(total <= 1, name="con", mask=combined_mask_da) + + return m + + +def get_peak_rss_mb(): + """Get peak RSS in MB via resource module.""" + ru = resource.getrusage(resource.RUSAGE_SELF) + # On macOS ru_maxrss is in bytes, on Linux it's in KB + import sys + + if sys.platform == "darwin": + return ru.ru_maxrss / 1024**2 + return ru.ru_maxrss / 1024 + + +def parse_args(): + parser = argparse.ArgumentParser( + description="Profile linopy model building memory usage." + ) + parser.add_argument( + "--shape", + type=int, + nargs="+", + help="Dimensions of the problem, e.g. --shape 100 100 20", + ) + parser.add_argument( + "--sparsity", + type=float, + default=0.05, + help="Fraction of non-zero coefficients (default: 0.05)", + ) + parser.add_argument( + "--n-expr", + type=int, + default=5, + help="Number of expressions to build (default: 5)", + ) + parser.add_argument( + "--preset", + choices=PRESETS.keys(), + help="Use a preset configuration (overrides --shape, --sparsity, --n-expr)", + ) + parser.add_argument( + "--output", + type=str, + default=None, + help="Path to write JSON results (default: stdout only)", + ) + return parser.parse_args() + + +def main(): + args = parse_args() + + if args.preset: + config = PRESETS[args.preset] + shape = config["shape"] + sparsity = config["sparsity"] + n_expr = config["n_expr"] + elif args.shape: + shape = tuple(args.shape) + sparsity = args.sparsity + n_expr = args.n_expr + else: + config = PRESETS["medium"] + shape = config["shape"] + sparsity = config["sparsity"] + n_expr = config["n_expr"] + + total_elements = int(np.prod(shape)) + print(f"Config: shape={shape} sparsity={sparsity} n_expr={n_expr}") + print(f"Total elements: {total_elements:,}") + print("-" * 60) + + t0 = time.perf_counter() + build_model(shape, sparsity, n_expr) + elapsed = time.perf_counter() - t0 + peak_rss = get_peak_rss_mb() + + print(f"Peak RSS: {peak_rss:.1f} MB") + print(f"Elapsed: {elapsed:.2f} s") + + git_info = get_git_info() + result = { + **git_info, + "timestamp": datetime.now(timezone.utc).isoformat(), + "config": {"shape": list(shape), "sparsity": sparsity, "n_expr": n_expr}, + "peak_rss_mb": round(peak_rss, 1), + "elapsed_s": round(elapsed, 2), + } + + if args.output: + with open(args.output, "w") as f: + json.dump(result, f, indent=2) + print(f"\nResults written to {args.output}") + else: + print(f"\nJSON: {json.dumps(result)}") + + +if __name__ == "__main__": + main() diff --git a/linopy/constraints.py b/linopy/constraints.py index 291beb1d..d171d49d 100644 --- a/linopy/constraints.py +++ b/linopy/constraints.py @@ -109,38 +109,55 @@ class Constraint: functions can be applied to it. """ - __slots__ = ("_data", "_model", "_assigned") + __slots__ = ("_data", "_model", "_assigned", "_lazy_parts", "_name", "_label_range") _fill_value = FILL_VALUE def __init__( self, - data: Dataset, + data: Dataset | None, model: Model, name: str = "", skip_broadcast: bool = False, + lazy_parts: list[Dataset] | None = None, ) -> None: """ Initialize the Constraint. Parameters ---------- - labels : xarray.DataArray - labels of the constraint. + data : xarray.Dataset or None + Constraint data. Can be None when lazy_parts is provided. model : linopy.Model Underlying model. name : str Name of the constraint. + skip_broadcast : bool + Skip broadcasting data. + lazy_parts : list of Dataset, optional + Per-part constraint datasets for lazy evaluation. When set, + data can be None and will be materialized on first access. """ from linopy.model import Model - if not isinstance(data, Dataset): - raise ValueError(f"data must be a Dataset, got {type(data)}") - if not isinstance(model, Model): raise ValueError(f"model must be a Model, got {type(model)}") + self._model = model + self._lazy_parts = lazy_parts + self._name = name + self._label_range: tuple[int, int] | None = None + + if lazy_parts is not None: + # Defer materialization — _data computed on first .data access + self._data = None + self._assigned = False + return + + if not isinstance(data, Dataset): + raise ValueError(f"data must be a Dataset, got {type(data)}") + # check that `labels`, `lower` and `upper`, `sign` and `mask` are in data for attr in ("coeffs", "vars", "sign", "rhs"): if attr not in data: @@ -153,7 +170,6 @@ def __init__( self._assigned = "labels" in data self._data = data - self._model = model def __getitem__( self, selector: str | int | slice | list | tuple | dict @@ -171,6 +187,11 @@ def attrs(self) -> dict[str, Any]: """ Get the attributes of the constraint. """ + if self._data is None and self._lazy_parts: + attrs: dict[str, Any] = {"name": self._name} + if self._label_range is not None: + attrs["label_range"] = self._label_range + return attrs return self.data.attrs @property @@ -238,8 +259,41 @@ def data(self) -> Dataset: """ Get the underlying DataArray. """ + if self._data is None and self._lazy_parts: + self._materialize_from_parts() + assert self._data is not None return self._data + def _materialize_from_parts(self) -> None: + """Materialize constraint data from lazy parts (fallback).""" + from linopy.common import check_common_keys_values + from linopy.constants import HELPER_DIMS + + assert self._lazy_parts is not None + parts = self._lazy_parts + coord_dims: list[dict[str, int]] = [ + {str(k): v for k, v in p.sizes.items() if k not in HELPER_DIMS} + for p in parts + ] + override = check_common_keys_values(coord_dims) + kwargs = { + "coords": "minimal", + "compat": "override", + "fill_value": FILL_VALUE, + "join": "override" if override else "outer", + } + ds = xr.concat(parts, TERM_DIM, **kwargs) # type: ignore[call-overload] + for d in set(HELPER_DIMS) & set(ds.coords): + ds = ds.reset_index(d, drop=True) + (ds,) = xr.broadcast(ds, exclude=[TERM_DIM]) + mat_attrs: dict[str, Any] = {"name": self._name} + if self._label_range is not None: + mat_attrs["label_range"] = self._label_range + ds = ds.assign_attrs(**mat_attrs) + self._data = ds + self._assigned = "labels" in ds + self._lazy_parts = None # clear parts after materialization + @property def labels(self) -> DataArray: """ @@ -586,6 +640,9 @@ def flat(self) -> pd.DataFrame: ------- df : pandas.DataFrame """ + if self._lazy_parts: + return self._flat_from_parts() + ds = self.data def mask_func(data: pd.DataFrame) -> pd.Series: @@ -604,41 +661,106 @@ def mask_func(data: pd.DataFrame) -> pd.Series: check_has_nulls(df, name=f"{self.type} {self.name}") return df - def to_polars(self) -> pl.DataFrame: - """ - Convert the constraint to a polars DataFrame. + def _flat_from_parts(self) -> pd.DataFrame: + """Convert lazy constraint parts to flat DataFrame per-part.""" - The resulting DataFrame represents a long table format of the all - non-masked constraints with non-zero coefficients. It typically - contains the columns `labels`, `coeffs`, `vars`, `rhs`, `sign`. + def mask_func(data: dict) -> pd.Series: + mask = (data["vars"] != -1) & (data["coeffs"] != 0) + if "labels" in data: + mask &= data["labels"] != -1 + return mask - Returns - ------- - df : polars.DataFrame - """ - ds = self.data + assert self._lazy_parts is not None + frames = [] + for part in self._lazy_parts: + df = to_dataframe(part, mask_func=mask_func) + if len(df): + frames.append(df) + + if not frames: + return pd.DataFrame(columns=["coeffs", "vars", "labels", "rhs", "sign"]) + + df = pd.concat(frames, ignore_index=True) + + # Group repeated variables in the same constraint + group_keys = ["labels", "vars"] if "labels" in df.columns else ["vars"] + agg_custom = {k: "first" for k in list(df.columns)} + agg_standards = dict(coeffs="sum", rhs="first", sign="first") + agg = {**agg_custom, **agg_standards} + df = df.groupby(group_keys, as_index=False).aggregate(agg) + name = self.name if self._data is not None else "" + check_has_nulls(df, name=f"{self.type} {name}") + return df + @staticmethod + def _dataset_to_polars(ds: Dataset, name: str = "") -> pl.DataFrame: + """Convert a single constraint Dataset to a polars DataFrame.""" keys = [k for k in ds if ("_term" in ds[k].dims) or (k == "labels")] long = to_polars(ds[keys]) - long = filter_nulls_polars(long) long = group_terms_polars(long) - check_has_nulls_polars(long, name=f"{self.type} {self.name}") + check_has_nulls_polars(long, name=name) short_ds = ds[[k for k in ds if "_term" not in ds[k].dims]] schema = infer_schema_polars(short_ds) schema["sign"] = pl.Enum(["=", "<=", ">="]) short = to_polars(short_ds, schema=schema) short = filter_nulls_polars(short) - check_has_nulls_polars(short, name=f"{self.type} {self.name}") + check_has_nulls_polars(short, name=name) + + return pl.concat([short, long], how="diagonal_relaxed") - df = pl.concat([short, long], how="diagonal_relaxed").sort(["labels", "rhs"]) - # delete subsequent non-null rhs (happens is all vars per label are -1) + @staticmethod + def _finalize_polars(df: pl.DataFrame) -> pl.DataFrame: + """Sort, deduplicate rhs rows, and select columns.""" + df = df.sort(["labels", "rhs"]) + # delete subsequent non-null rhs (happens if all vars per label are -1) is_non_null = df["rhs"].is_not_null() prev_non_is_null = is_non_null.shift(1).fill_null(False) df = df.filter(is_non_null & ~prev_non_is_null | ~is_non_null) return df[["labels", "coeffs", "vars", "sign", "rhs"]] + def to_polars(self) -> pl.DataFrame: + """ + Convert the constraint to a polars DataFrame. + + The resulting DataFrame represents a long table format of the all + non-masked constraints with non-zero coefficients. It typically + contains the columns `labels`, `coeffs`, `vars`, `rhs`, `sign`. + + Returns + ------- + df : polars.DataFrame + """ + if self._lazy_parts: + return self._to_polars_from_parts() + + df = self._dataset_to_polars(self.data, name=f"{self.type} {self.name}") + return self._finalize_polars(df) + + def _to_polars_from_parts(self) -> pl.DataFrame: + """Convert lazy constraint parts to polars DataFrame per-part.""" + assert self._lazy_parts is not None + frames = [ + df for part in self._lazy_parts if len(df := self._dataset_to_polars(part)) + ] + + if not frames: + return pl.DataFrame( + schema={ + "labels": pl.Int64, + "coeffs": pl.Float64, + "vars": pl.Int64, + "sign": pl.Utf8, + "rhs": pl.Float64, + } + ) + + name = self.name if self._data is not None else "" + df = self._finalize_polars(pl.concat(frames)) + check_has_nulls_polars(df, name=f"{self.type} {name}") + return df + # Wrapped function which would convert variable to dataarray assign = conwrap(Dataset.assign) @@ -923,38 +1045,57 @@ def equalities(self) -> Constraints: """ return self[[n for n, s in self.items() if (s.sign == EQUAL).all()]] + def _apply_per_constraint(self, fn: Callable[[Dataset], Dataset]) -> None: + """Apply *fn* to each constraint's parts (lazy) or data (materialized).""" + for name in self: + con = self[name] + if con._lazy_parts: + con._lazy_parts[:] = [fn(part) for part in con._lazy_parts] + else: + con._data = fn(con.data) + def sanitize_zeros(self) -> None: """ Filter out terms with zero and close-to-zero coefficient. """ - for name in self: - not_zero = abs(self[name].coeffs) > 1e-10 - con = self[name] - con.vars = self[name].vars.where(not_zero, -1) - con.coeffs = self[name].coeffs.where(not_zero) + + def _fn(ds: Dataset) -> Dataset: + not_zero = abs(ds.coeffs) > 1e-10 + return assign_multiindex_safe( + ds, + vars=ds.vars.where(not_zero, -1), + coeffs=ds.coeffs.where(not_zero), + ) + + self._apply_per_constraint(_fn) def sanitize_missings(self) -> None: """ Set constraints labels to -1 where all variables in the lhs are missing. """ - for name in self: - con = self[name] - contains_non_missing = (con.vars != -1).any(con.term_dim) - labels = self[name].labels.where(contains_non_missing, -1) - con._data = assign_multiindex_safe(con.data, labels=labels) + + def _fn(ds: Dataset) -> Dataset: + term_dim = [d for d in ds.vars.dims if d.startswith("_term")][0] + contains_non_missing = (ds.vars != -1).any(term_dim) + labels = ds.labels.where(contains_non_missing, -1) + return assign_multiindex_safe(ds, labels=labels) + + self._apply_per_constraint(_fn) def sanitize_infinities(self) -> None: """ Replace infinite values in the constraints with a large value. """ - for name in self: - con = self[name] - valid_infinity_values = ((con.sign == LESS_EQUAL) & (con.rhs == np.inf)) | ( - (con.sign == GREATER_EQUAL) & (con.rhs == -np.inf) + + def _fn(ds: Dataset) -> Dataset: + valid = ((ds.sign == LESS_EQUAL) & (ds.rhs == np.inf)) | ( + (ds.sign == GREATER_EQUAL) & (ds.rhs == -np.inf) ) - labels = con.labels.where(~valid_infinity_values, -1) - con._data = assign_multiindex_safe(con.data, labels=labels) + labels = ds.labels.where(~valid, -1) + return assign_multiindex_safe(ds, labels=labels) + + self._apply_per_constraint(_fn) def get_name_by_label(self, label: int | float) -> str: """ diff --git a/linopy/expressions.py b/linopy/expressions.py index 10e243de..1af7b142 100644 --- a/linopy/expressions.py +++ b/linopy/expressions.py @@ -9,6 +9,7 @@ import functools import logging +import math from abc import ABC, abstractmethod from collections.abc import Callable, Hashable, Iterator, Mapping, Sequence from dataclasses import dataclass, field @@ -869,6 +870,11 @@ def to_constraint(self, sign: SignLike, rhs: SideLike) -> Constraint: f"Both sides of the constraint are constant. At least one side must contain variables. {self} {rhs}" ) + # If rhs is a LazyLinearExpression, delegate to its to_constraint + # so the lazy paths (coord-disjoint, redistribute) can be tried. + if isinstance(rhs, LazyLinearExpression) and rhs.is_lazy: + return (self - rhs).to_constraint(sign, 0) + all_to_lhs = (self - rhs).data data = assign_multiindex_safe( all_to_lhs[["coeffs", "vars"]], sign=sign, rhs=-all_to_lhs.const @@ -1265,7 +1271,7 @@ class LinearExpression(BaseExpression): >>> other = 4 * y >>> type(expr + other) - + Multiplying: @@ -1772,6 +1778,588 @@ def from_constant(cls, model: Model, constant: ConstantLike) -> LinearExpression return LinearExpression(const_da, model) +def _parts_are_disjoint(parts: list[Dataset]) -> bool: + """Check if all parts have non-empty, mutually disjoint coord dims.""" + coord_dims = [set(k for k in p.sizes if k not in HELPER_DIMS) for p in parts] + if not all(coord_dims): + return False + for i, di in enumerate(coord_dims): + for dj in coord_dims[i + 1 :]: + if di & dj: + return False + return True + + +def _parts_are_coord_disjoint(parts: list[Dataset]) -> bool: + """ + Check if parts have non-overlapping coordinate spaces. + + Two parts are coordinate-disjoint if, for every pair that shares + dimension names, at least one shared dimension has completely + non-overlapping coordinate values. All parts must have at least + one non-helper dimension (scalars need broadcasting, not slicing). + """ + all_coord_dims = [{k for k in p.sizes if k not in HELPER_DIMS} for p in parts] + if not all(all_coord_dims): + return False + for i, pi in enumerate(parts): + dims_i = {k for k in pi.sizes if k not in HELPER_DIMS} + for pj in parts[i + 1 :]: + dims_j = {k for k in pj.sizes if k not in HELPER_DIMS} + shared = dims_i & dims_j + if not shared: + continue + disjoint_on_some_dim = False + for d in shared: + if d in pi.coords and d in pj.coords: + vals_i = set(pi[d].values.flat) + vals_j = set(pj[d].values.flat) + if not (vals_i & vals_j): + disjoint_on_some_dim = True + break + if not disjoint_on_some_dim: + return False + return True + + +def _try_redistribute(parts: list[Dataset]) -> list[Dataset] | None: + """ + Redistribute parts so a broad part is sliced into coord-disjoint pieces. + + When one part spans the full coordinate space (e.g. a variable with all + contributors × effects × time) and the remaining parts cover disjoint + subsets, this slices the broad part to each narrow part's coordinates + and concatenates along ``_term``. The result is a list of parts that + are coordinate-disjoint and can be used in the lazy constraint path. + """ + if len(parts) <= 1: + return None + + coord_dims_per = [{k for k in p.sizes if k not in HELPER_DIMS} for p in parts] + # All parts must share the same coord dims + if len(set(frozenset(d) for d in coord_dims_per)) != 1: + return None + coord_dims = coord_dims_per[0] + if not coord_dims: + return None + + # Find broadest part by total element count; must be strictly largest + sizes = [math.prod(p.sizes[d] for d in coord_dims) for p in parts] + max_size = max(sizes) + if sizes.count(max_size) != 1: + return None # multiple parts with same max size → no clear broad part + broad_idx = sizes.index(max_size) + broad = parts[broad_idx] + narrow_parts = [p for i, p in enumerate(parts) if i != broad_idx] + + if not narrow_parts: + return None + + # Narrow parts must be pairwise coord-disjoint + if not _parts_are_coord_disjoint(narrow_parts): + return None + + # Tiling check: narrow parts must perfectly tile the broad part + broad_total = sizes[broad_idx] + narrow_total = sum(math.prod(p.sizes[d] for d in coord_dims) for p in narrow_parts) + for dim in coord_dims: + broad_vals = set(broad[dim].values.flat) + narrow_vals: set = set() + for n in narrow_parts: + if dim in n.coords: + narrow_vals |= set(n[dim].values.flat) + if not narrow_vals >= broad_vals: + return None + if narrow_total != broad_total: + # Cross-product gaps → fall back + return None + + # Redistribute: slice broad into each narrow's coordinate space + enriched: list[Dataset] = [] + for n in narrow_parts: + sel_dict = {} + for dim in coord_dims: + if dim in n.coords and dim in broad.coords: + sel_dict[dim] = n[dim] + sliced = broad.sel(sel_dict) if sel_dict else broad + combined = xr.concat( + [n, sliced], + TERM_DIM, + coords="minimal", + compat="override", + join="override", + fill_value=FILL_VALUE, + ) + for d in set(HELPER_DIMS) & set(combined.coords): + combined = combined.reset_index(d, drop=True) + enriched.append(combined) + + return enriched + + +class LazyLinearExpression(LinearExpression): + """ + A LinearExpression that defers merge along _term. + + Instead of concatenating datasets immediately (which triggers dense + outer-join padding in xarray), this class stores a list of compact + per-expression datasets. Materialization into a single dense Dataset + happens lazily on first access to ``.data`` so that all existing code + paths work unchanged. The speedup comes from overriding the ``flat`` + property (the solver hot-path) to iterate parts directly, avoiding the + dense padding entirely. + """ + + __slots__ = ("_parts", "_const_override") + + def __init__( + self, + data_or_parts: Dataset | list[Dataset], + model: Model, + *, + parts: list[Dataset] | None = None, + const: DataArray | None = None, + ) -> None: + # Normal LinearExpression construction (e.g. from exprwrap fallback) + if parts is None and not isinstance(data_or_parts, list): + super().__init__(data_or_parts, model) + self._parts: list[Dataset] = [] + self._const_override = None + return + + # Lazy construction: store parts, defer materialization + if parts is not None: + part_list = parts + else: + part_list = data_or_parts # type: ignore[assignment] + + object.__setattr__(self, "_parts", part_list) + object.__setattr__(self, "_const_override", const) + object.__setattr__(self, "_model", model) + object.__setattr__(self, "_data", None) # lazy + + @property + def data(self) -> Dataset: + if self._data is None: + object.__setattr__(self, "_data", self._materialize()) + object.__setattr__(self, "_const_override", None) + return self._data + + @property + def const(self) -> DataArray: + if self._const_override is not None: + if self.is_lazy and self._const_override.ndim == 0 and self._parts: + # Broadcast scalar const to match parts' coord dims. + # Use the first part's dims as reference — this is correct + # when parts share the same coord space (common case). + # For disjoint dims, materialization is needed for full const. + ref = self._parts[0] + coord_sizes = { + k: v for k, v in ref.sizes.items() if k not in HELPER_DIMS + } + if coord_sizes: + return self._const_override.broadcast_like( + xr.DataArray( + np.zeros(list(coord_sizes.values())), + dims=list(coord_sizes.keys()), + ) + ) + return self._const_override + return self.data.const + + @const.setter + def const(self, value: DataArray) -> None: + object.__setattr__(self, "_const_override", None) + self._data = assign_multiindex_safe(self.data, const=value) + + @property + def is_lazy(self) -> bool: + """True if the expression has not been materialized yet.""" + return self._data is None and len(self._parts) > 0 + + def _materialize(self) -> Dataset: + """Fall back to the standard dense merge.""" + if not self._parts: + raise ValueError("LazyLinearExpression has no parts to materialize") + + # Detect if all parts share the same coordinate space + coord_dims = [ + {k: v for k, v in p.sizes.items() if k not in HELPER_DIMS} + for p in self._parts + ] + override = check_common_keys_values(coord_dims) # type: ignore[arg-type] + + kwargs: dict[str, Any] = { + "coords": "minimal", + "compat": "override", + "join": "override" if override else "outer", + "fill_value": FILL_VALUE, + } + ds = xr.concat(self._parts, TERM_DIM, **kwargs) + + for d in set(HELPER_DIMS) & set(ds.coords): + ds = ds.reset_index(d, drop=True) + + if self._const_override is not None: + ds = assign_multiindex_safe(ds, const=self._const_override) + else: + ds = ds.assign(const=0.0) + + # Normalize: transpose so _term is last, broadcast coord dims + ds = Dataset(ds.transpose(..., TERM_DIM)) + (ds,) = xr.broadcast(ds, exclude=HELPER_DIMS) + + return ds + + @property + def flat(self) -> pd.DataFrame: + """ + Convert the expression to a pandas DataFrame without materializing. + + Each part is converted independently and the results are concatenated, + avoiding the dense outer-join padding entirely. + """ + if not self.is_lazy: + return super().flat + + frames = [] + for part in self._parts: + + def mask_func( + datadict: dict[Hashable, np.ndarray], + ) -> pd.Series: + return (datadict["vars"] != -1) & (datadict["coeffs"] != 0) + + df = to_dataframe(part, mask_func=mask_func) + if len(df): + frames.append(df) + + if not frames: + return pd.DataFrame( + {"coeffs": pd.Series(dtype=float), "vars": pd.Series(dtype=int)} + ) + + df = pd.concat(frames, ignore_index=True) + df = df.groupby("vars", as_index=False).sum() + check_has_nulls(df, name=self.type) + return df + + def __add__( # type: ignore[override] + self, + other: ConstantLike + | VariableLike + | ScalarLinearExpression + | LinearExpression + | QuadraticExpression, + ) -> LinearExpression | QuadraticExpression: + if not self.is_lazy: + return super().__add__(other) + + if isinstance(other, QuadraticExpression): + return other.__add__(self) + + try: + if np.isscalar(other): + const = ( + self._const_override + if self._const_override is not None + else xr.DataArray(0.0) + ) + return LazyLinearExpression( + None, # type: ignore[arg-type] + self._model, + parts=list(self._parts), + const=const + other, + ) + + if isinstance(other, LazyLinearExpression) and other.is_lazy: + # Merge parts lists + const_self = ( + self._const_override + if self._const_override is not None + else xr.DataArray(0.0) + ) + const_other = ( + other._const_override + if other._const_override is not None + else xr.DataArray(0.0) + ) + aligned_self, aligned_other = xr.align( + const_self, const_other, join="outer", fill_value=0 + ) + return LazyLinearExpression( + None, # type: ignore[arg-type] + self._model, + parts=self._parts + other._parts, + const=aligned_self + aligned_other, + ) + + # For LinearExpression/Variable, use merge directly to avoid + # materializing self via coord_dims access in super().__add__ + other = as_expression(other, model=self._model) + return merge([self, other]) + except TypeError: + return NotImplemented + + def __neg__(self) -> LinearExpression: # type: ignore[override] + if not self.is_lazy: + return super().__neg__() + neg_parts = [assign_multiindex_safe(p, coeffs=-p.coeffs) for p in self._parts] + const = ( + self._const_override + if self._const_override is not None + else xr.DataArray(0.0) + ) + return LazyLinearExpression( + None, # type: ignore[arg-type] + self._model, + parts=neg_parts, + const=-const, + ) + + def __sub__( # type: ignore[override] + self, + other: ConstantLike + | VariableLike + | ScalarLinearExpression + | LinearExpression + | QuadraticExpression, + ) -> LinearExpression | QuadraticExpression: + if not self.is_lazy: + return super().__sub__(other) + try: + return self.__add__(-other) + except TypeError: + return NotImplemented + + def _compact(self) -> LazyLinearExpression: + """ + Merge parts that share the same coordinate space. + + Groups parts by their non-helper dimension signature and merges + each group with ``join="override"`` (cheap, no padding). This + keeps the part count low after many chained additions. + """ + if not self.is_lazy or len(self._parts) <= 1: + return self + + from itertools import groupby + + def _coord_key(p: Dataset) -> tuple: + return tuple( + sorted((k, v) for k, v in p.sizes.items() if k not in HELPER_DIMS) + ) + + sorted_parts = sorted(self._parts, key=_coord_key) + merged: list[Dataset] = [] + for _key, group in groupby(sorted_parts, key=_coord_key): + group_list = list(group) + if len(group_list) == 1: + merged.append(group_list[0]) + else: + ds = xr.concat( + group_list, + TERM_DIM, + coords="minimal", + compat="override", + join="override", + fill_value=FILL_VALUE, + ) + for d in set(HELPER_DIMS) & set(ds.coords): + ds = ds.reset_index(d, drop=True) + merged.append(ds) + + return LazyLinearExpression( + None, # type: ignore[arg-type] + self._model, + parts=merged, + const=self._const_override, + ) + + def to_constraint(self, sign: SignLike, rhs: SideLike) -> Constraint: + """Build constraint per-part to avoid Cartesian-product materialization.""" + if not self.is_lazy: + return super().to_constraint(sign, rhs) + + # Compute lhs = self - rhs (stays lazy for scalar/constant rhs) + lhs = self - rhs + if not (isinstance(lhs, LazyLinearExpression) and lhs.is_lazy): + # Fell back to non-lazy, use standard path + return LinearExpression.to_constraint(lhs, sign, 0) + + # Only use lazy constraint path when ALL parts have non-empty, + # mutually disjoint coord dims. When parts share coords or have + # empty dims (scalars), they must be merged so each constraint + # label spans all terms at the same coordinate. + if _parts_are_disjoint(lhs._parts): + # Build per-part constraint datasets + # Use raw _const_override (scalar) to avoid broadcasting across + # disjoint dimensions when assigning rhs to each part + raw_const = ( + lhs._const_override + if lhs._const_override is not None + else xr.DataArray(0.0) + ) + rhs_val = -raw_const + constraint_parts: list[Dataset] = [] + for part in lhs._parts: + ds = assign_multiindex_safe( + part[["coeffs", "vars"]], sign=sign, rhs=rhs_val + ) + constraint_parts.append(ds) + + return constraints.Constraint( + None, model=self.model, lazy_parts=constraint_parts + ) + + # Parts share dimension names but may be disjoint by coordinate values + if _parts_are_coord_disjoint(lhs._parts): + raw_const = ( + lhs._const_override + if lhs._const_override is not None + else xr.DataArray(0.0) + ) + rhs_val = -raw_const + constraint_parts_cd: list[Dataset] = [] + for part in lhs._parts: + part_rhs = rhs_val + if isinstance(rhs_val, xr.DataArray) and rhs_val.dims: + sel = { + d: part[d] + for d in rhs_val.dims + if d in part.coords and d not in HELPER_DIMS + } + if sel: + part_rhs = rhs_val.sel(sel) + ds = assign_multiindex_safe( + part[["coeffs", "vars"]], sign=sign, rhs=part_rhs + ) + constraint_parts_cd.append(ds) + + return constraints.Constraint( + None, model=self.model, lazy_parts=constraint_parts_cd + ) + + # Try redistribute: slice a broad part across coord-disjoint narrows + # Use raw parts (not compacted) because _compact merges same-shape + # parts with different coordinates, destroying the tiling structure. + redistributed = _try_redistribute(lhs._parts) + if redistributed is not None: + raw_const = ( + lhs._const_override + if lhs._const_override is not None + else xr.DataArray(0.0) + ) + rhs_val = -raw_const + constraint_parts_rd: list[Dataset] = [] + for part in redistributed: + part_rhs = rhs_val + if isinstance(rhs_val, xr.DataArray) and rhs_val.dims: + sel = { + d: part[d] + for d in rhs_val.dims + if d in part.coords and d not in HELPER_DIMS + } + if sel: + part_rhs = rhs_val.sel(sel) + ds = assign_multiindex_safe( + part[["coeffs", "vars"]], sign=sign, rhs=part_rhs + ) + constraint_parts_rd.append(ds) + + return constraints.Constraint( + None, model=self.model, lazy_parts=constraint_parts_rd + ) + + # Fall back to materialization + return LinearExpression.to_constraint(lhs, sign, 0) + + def to_polars(self) -> pl.DataFrame: + """Convert the expression to a polars DataFrame without materializing.""" + if not self.is_lazy: + return super().to_polars() + + frames = [] + for part in self._parts: + df = to_polars(part) + df = filter_nulls_polars(df) + if len(df): + frames.append(df) + + if not frames: + return pl.DataFrame( + { + "vars": pl.Series([], dtype=pl.Int64), + "coeffs": pl.Series([], dtype=pl.Float64), + } + ) + + df = pl.concat(frames) + df = group_terms_polars(df) + check_has_nulls_polars(df, name=self.type) + return df + + def sel(self, *args: Any, **kwargs: Any) -> LazyLinearExpression | LinearExpression: + """Apply sel to each part independently.""" + if not self.is_lazy: + return super().sel(*args, **kwargs) + # Materialize — sel semantics with indexers may not apply cleanly per-part + return LinearExpression.sel(self, *args, **kwargs) + + def rename( + self, *args: Any, **kwargs: Any + ) -> LazyLinearExpression | LinearExpression: + """Apply rename to each part independently, skipping parts that lack the dim.""" + if not self.is_lazy: + return super().rename(*args, **kwargs) + # Build the name mapping + from xarray.core.utils import either_dict_or_kwargs + + name_dict = either_dict_or_kwargs(args[0] if args else {}, kwargs, "rename") + new_parts = [] + for p in self._parts: + # Only rename dims/vars that exist in this part + applicable = {k: v for k, v in name_dict.items() if k in p or k in p.dims} + new_parts.append(p.rename(applicable) if applicable else p) + const = self._const_override + if const is not None: + applicable = {k: v for k, v in name_dict.items() if k in const.dims} + if applicable: + const = const.rename(applicable) + return LazyLinearExpression( + None, # type: ignore[arg-type] + self._model, + parts=new_parts, + const=const, + ) + + def shift( + self, *args: Any, **kwargs: Any + ) -> LazyLinearExpression | LinearExpression: + """Apply shift to each part independently.""" + if not self.is_lazy: + return super().shift(*args, **kwargs) + # Materialize — shift fill behavior needs consistent coord space + return LinearExpression.shift(self, *args, **kwargs) + + def diff(self, dim: str, n: int = 1) -> LazyLinearExpression | LinearExpression: # type: ignore[override] + """Apply diff to each part that contains the dimension.""" + if not self.is_lazy: + return super().diff(dim, n) + new_parts = [] + for p in self._parts: + if dim in p.dims: + new_parts.append(p.diff(dim, n)) + else: + new_parts.append(p) + return LazyLinearExpression( + None, # type: ignore[arg-type] + self._model, + parts=new_parts, + const=self._const_override, + ) + + class QuadraticExpression(BaseExpression): """ A quadratic expression consisting of terms of coefficients and variables. @@ -1875,7 +2463,9 @@ def __sub__(self, other: SideLike) -> QuadraticExpression: return self.assign(const=self.const - other) other = as_expression(other, model=self.model, dims=self.coord_dims) - if type(other) is LinearExpression: + if isinstance(other, LinearExpression) and not isinstance( + other, QuadraticExpression + ): other = other.to_quadexpr() return merge([self, -other], cls=self.__class__) except TypeError: @@ -2094,7 +2684,7 @@ def merge( exprs = [exprs] + list(add_exprs) # type: ignore has_quad_expression = any(type(e) is QuadraticExpression for e in exprs) - has_linear_expression = any(type(e) is LinearExpression for e in exprs) + has_linear_expression = any(isinstance(e, LinearExpression) for e in exprs) if cls is None: cls = QuadraticExpression if has_quad_expression else LinearExpression @@ -2111,7 +2701,8 @@ def merge( model = exprs[0].model - if cls in linopy_types and dim in HELPER_DIMS: + has_lazy = any(isinstance(e, LazyLinearExpression) and e.is_lazy for e in exprs) + if cls in linopy_types and dim in HELPER_DIMS and not has_lazy: coord_dims = [ {k: v for k, v in e.sizes.items() if k not in HELPER_DIMS} for e in exprs ] @@ -2119,15 +2710,24 @@ def merge( else: override = False - data = [e.data if isinstance(e, linopy_types) else e for e in exprs] - data = [fill_missing_coords(ds, fill_helper_dims=True) for ds in data] + data_list: list[Dataset] = [ + e.data + if isinstance(e, linopy_types) + and not (isinstance(e, LazyLinearExpression) and e.is_lazy) + else e # type: ignore[misc] + for e in exprs + ] + data_list = [ + fill_missing_coords(d, fill_helper_dims=True) if isinstance(d, Dataset) else d + for d in data_list + ] if not kwargs: kwargs = { "coords": "minimal", "compat": "override", } - if cls == LinearExpression: + if issubclass(cls, LinearExpression): kwargs["fill_value"] = FILL_VALUE elif cls == variables.Variable: kwargs["fill_value"] = variables.FILL_VALUE @@ -2138,20 +2738,70 @@ def merge( kwargs.setdefault("join", "outer") if dim == TERM_DIM: - ds = xr.concat([d[["coeffs", "vars"]] for d in data], dim, **kwargs) + # Use lazy merge for LinearExpression + if issubclass(cls, LinearExpression): + # Flatten any existing LazyLinearExpression parts + parts: list[Dataset] = [] + const_arrays: list[DataArray] = [] + for expr in exprs: + if isinstance(expr, LazyLinearExpression) and expr.is_lazy: + parts.extend(expr._parts) + c = ( + expr._const_override + if expr._const_override is not None + else xr.DataArray(0.0) + ) + const_arrays.append(c) + else: + d = expr.data if isinstance(expr, linopy_types) else expr + d = fill_missing_coords(d, fill_helper_dims=True) + const_arrays.append(d["const"]) + parts.append(d[["coeffs", "vars"]]) + + # Sum constants — skip zero-valued arrays to avoid creating + # a Cartesian product from disjoint dimensions. + def _is_all_zero(c: DataArray) -> bool: + vals = c.values + if isinstance(vals, np.ndarray): + return bool((vals == 0).all()) + return float(vals) == 0.0 + + nonzero_consts = [c for c in const_arrays if not _is_all_zero(c)] + if not nonzero_consts: + const: DataArray = xr.DataArray(0.0) + else: + const = nonzero_consts[0] + for c in nonzero_consts[1:]: + const, c = xr.align(const, c, join="outer", fill_value=0) + const = const + c + + return LazyLinearExpression( # type: ignore[return-value] + None, # type: ignore[arg-type] + model, + parts=parts, + const=const, + ) + + ds = xr.concat([d[["coeffs", "vars"]] for d in data_list], dim, **kwargs) subkwargs = {**kwargs, "fill_value": 0} - const = xr.concat([d["const"] for d in data], dim, **subkwargs).sum(TERM_DIM) + const = xr.concat([d["const"] for d in data_list], dim, **subkwargs).sum( + TERM_DIM + ) ds = assign_multiindex_safe(ds, const=const) elif dim == FACTOR_DIM: - ds = xr.concat([d[["vars"]] for d in data], dim, **kwargs) - coeffs = xr.concat([d["coeffs"] for d in data], dim, **kwargs).prod(FACTOR_DIM) - const = xr.concat([d["const"] for d in data], dim, **kwargs).prod(FACTOR_DIM) + ds = xr.concat([d[["vars"]] for d in data_list], dim, **kwargs) + coeffs = xr.concat([d["coeffs"] for d in data_list], dim, **kwargs).prod( + FACTOR_DIM + ) + const = xr.concat([d["const"] for d in data_list], dim, **kwargs).prod( + FACTOR_DIM + ) ds = assign_multiindex_safe(ds, coeffs=coeffs, const=const) else: - ds = xr.concat(data, dim, **kwargs) + ds = xr.concat(data_list, dim, **kwargs) - for d in set(HELPER_DIMS) & set(ds.coords): - ds = ds.reset_index(d, drop=True) + for dim_name in set(HELPER_DIMS) & set(ds.coords): + ds = ds.reset_index(dim_name, drop=True) return cls(ds, model) diff --git a/linopy/model.py b/linopy/model.py index 657b2d45..866118b5 100644 --- a/linopy/model.py +++ b/linopy/model.py @@ -591,6 +591,45 @@ def add_sos_constraints( variable.attrs.update(sos_type=sos_type, sos_dim=sos_dim) + def _label_constraint_dataset( + self, ds: Dataset, name: str, start: int, mask: MaskLike | None = None + ) -> tuple[Dataset, int]: + """ + Assign sequential labels to a constraint dataset. + + Performs infinity check, drops helper dims, broadcasts, assigns label + range, applies mask, and chunks. Returns ``(ds, next_start)``. + """ + invalid_infinity_values = ((ds.sign == LESS_EQUAL) & (ds.rhs == -np.inf)) | ( + (ds.sign == GREATER_EQUAL) & (ds.rhs == np.inf) + ) + if invalid_infinity_values.any(): + raise ValueError(f"Constraint {name} contains incorrect infinite values.") + + if drop_dims := set(HELPER_DIMS).intersection(ds.coords): + ds = ds.drop_vars(drop_dims) + + ds["labels"] = -1 + (ds,) = xr.broadcast(ds, exclude=[TERM_DIM]) + + self.check_force_dim_names(ds) + + n = ds.labels.size + ds.labels.values = np.arange(start, start + n).reshape(ds.labels.shape) + + if mask is not None: + m = as_dataarray(mask).astype(bool) + assert set(m.dims).issubset(ds.dims), ( + "Dimensions of mask not a subset of resulting labels dimensions." + ) + ds.labels.values = ds.labels.where(m, -1).values + + if self.chunk: + ds = ds.chunk(self.chunk) + + ds = ds.assign_attrs(label_range=(start, start + n), name=name) + return ds, start + n + def add_constraints( self, lhs: VariableLike @@ -656,73 +695,64 @@ def add_constraints( if sign is not None: sign = maybe_replace_signs(as_dataarray(sign)) + con: Constraint | None = None if isinstance(lhs, LinearExpression): if sign is None or rhs is None: raise ValueError(msg_sign_rhs_not_none) - data = lhs.to_constraint(sign, rhs).data + con = lhs.to_constraint(sign, rhs) elif isinstance(lhs, list | tuple): if sign is None or rhs is None: raise ValueError(msg_sign_rhs_none) - data = self.linexpr(*lhs).to_constraint(sign, rhs).data + con = self.linexpr(*lhs).to_constraint(sign, rhs) # directly convert first argument to a constraint elif callable(lhs): assert coords is not None, "`coords` must be given when lhs is a function" rule = lhs if sign is not None or rhs is not None: raise ValueError(msg_sign_rhs_none) - data = Constraint.from_rule(self, rule, coords).data + con = Constraint.from_rule(self, rule, coords) elif isinstance(lhs, AnonymousScalarConstraint): if sign is not None or rhs is not None: raise ValueError(msg_sign_rhs_none) - data = lhs.to_constraint().data + con = lhs.to_constraint() elif isinstance(lhs, Constraint): if sign is not None or rhs is not None: raise ValueError(msg_sign_rhs_none) - data = lhs.data + con = lhs elif isinstance(lhs, Variable | ScalarVariable | ScalarLinearExpression): if sign is None or rhs is None: raise ValueError(msg_sign_rhs_not_none) - data = lhs.to_linexpr().to_constraint(sign, rhs).data + con = lhs.to_linexpr().to_constraint(sign, rhs) else: raise ValueError( f"Invalid type of `lhs` ({type(lhs)}) or invalid combination of `lhs`, `sign` and `rhs`." ) - invalid_infinity_values = ( - (data.sign == LESS_EQUAL) & (data.rhs == -np.inf) - ) | ((data.sign == GREATER_EQUAL) & (data.rhs == np.inf)) # noqa: F821 - if invalid_infinity_values.any(): - raise ValueError(f"Constraint {name} contains incorrect infinite values.") - - # ensure helper dimensions are not set as coordinates - if drop_dims := set(HELPER_DIMS).intersection(data.coords): - # TODO: add a warning here, routines should be safe against this - data = data.drop_vars(drop_dims) - - data["labels"] = -1 - (data,) = xr.broadcast(data, exclude=[TERM_DIM]) - - if mask is not None: - mask = as_dataarray(mask).astype(bool) - # TODO: simplify - assert set(mask.dims).issubset(data.dims), ( - "Dimensions of mask not a subset of resulting labels dimensions." + # Lazy constraint path: per-part label assignment + if con._lazy_parts is not None: + range_start = self._cCounter + start = range_start + labeled_parts: list[Dataset] = [] + for part in con._lazy_parts: + part, start = self._label_constraint_dataset(part, name, start, mask) + labeled_parts.append(part) + + self._cCounter = start + constraint = Constraint( + None, + name=name, + model=self, + lazy_parts=labeled_parts, ) - - self.check_force_dim_names(data) - - start = self._cCounter - end = start + data.labels.size - data.labels.values = np.arange(start, end).reshape(data.labels.shape) - self._cCounter += data.labels.size - - if mask is not None: - data.labels.values = data.labels.where(mask, -1).values - - data = data.assign_attrs(label_range=(start, end), name=name) - - if self.chunk: - data = data.chunk(self.chunk) + # Store overall label_range for get_name_by_label + constraint._label_range = (range_start, start) + self.constraints.add(constraint) + return constraint + + # Standard (non-lazy) path + data = con.data + data, end = self._label_constraint_dataset(data, name, self._cCounter, mask) + self._cCounter = end constraint = Constraint(data, name=name, model=self, skip_broadcast=True) self.constraints.add(constraint) diff --git a/linopy/objective.py b/linopy/objective.py index b1449270..88ab4407 100644 --- a/linopy/objective.py +++ b/linopy/objective.py @@ -232,11 +232,13 @@ def set_value(self, value: float) -> None: @property def is_linear(self) -> bool: - return type(self.expression) is expressions.LinearExpression + return isinstance( + self.expression, expressions.LinearExpression + ) and not isinstance(self.expression, expressions.QuadraticExpression) @property def is_quadratic(self) -> bool: - return type(self.expression) is expressions.QuadraticExpression + return isinstance(self.expression, expressions.QuadraticExpression) def to_matrix(self, *args: Any, **kwargs: Any) -> csc_matrix: """Wrapper for expression.to_matrix"""