diff --git a/CHANGELOG.md b/CHANGELOG.md index dc889e1..0e5a72b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,29 @@ # Changelog +## 0.3.0 + +Third public release adding support for discrete optimization problems via Iterated Local Search (ILS). + +### Highlights + +- Added a discrete optimization framework with `DiscreteProblem` abstract base class — a generic, stateless interface for defining custom discrete problems. +- Added `BitstringProblem` base class providing out-of-the-box `random_solution()`, `local_search()`, `perturb()`, and `solution_id()` for binary-encoded problems, with configurable first-improvement (stochastic) and best-improvement (deterministic) hill climbing. +- Added built-in problem implementations: + - `NumberPartitioning`: Number Partitioning Problem with configurable hardness parameter `k`, random instance generation via `instance_seed`, or explicit weights. + - `OneMax`: Simple maximization benchmark with O(1) delta evaluation. +- Added `ILSSampler` and `ILSSamplerConfig` for constructing LONs from discrete problems via Iterated Local Search, with configurable stopping criteria (`n_iter_no_change`, `max_iter`) and equal-acceptance moves. +- Discrete and continuous sampling produce the same trace format (`[run, fit1, node1, fit2, node2]`), so `LON.from_trace_data()`, `CMLON`, metrics, and visualization all work unchanged. + +### API and Behavior Changes + +- Package now exports `DiscreteProblem`, `BitstringProblem`, `NumberPartitioning`, `OneMax`, `ILSSampler`, `ILSSamplerConfig`, and `ILSResult`. +- Internal module structure reorganized: continuous sampling moved to `lonkit.continuous.sampling`, discrete modules under `lonkit.discrete.problems` and `lonkit.discrete.sampling`. + +### Documentation + +- Updated user guide and API docs to cover the discrete framework, ILS sampling, and built-in problems. +- Added discrete quick-start example to README. 
+ ## 0.2.0 Second public release adding multiprocessing to Basin-Hopping sampling procedure. diff --git a/README.md b/README.md index 70145b9..9ebdb79 100644 --- a/README.md +++ b/README.md @@ -6,13 +6,14 @@ [![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/) [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1Ujl48ffgHg9ck1Hueh59s65OR3Q3BG99?usp=sharing) -**Local Optima Networks for Continuous Optimization** +**Local Optima Networks** -lonkit is a Python library for constructing, analyzing, and visualizing Local Optima Networks (LONs) for continuous optimization problems. LONs provide a powerful way to understand the structure of fitness landscapes, revealing how local optima are connected and how difficult it may be to find global optima. +lonkit is a Python library for constructing, analyzing, and visualizing Local Optima Networks (LONs) for both continuous and discrete optimization problems. LONs provide a powerful way to understand the structure of fitness landscapes, revealing how local optima are connected and how difficult it may be to find global optima. 
## Features -- **Basin-Hopping Sampling**: Efficient exploration of fitness landscapes using configurable Basin-Hopping +- **Basin-Hopping Sampling**: Efficient exploration of continuous fitness landscapes using configurable Basin-Hopping +- **Iterated Local Search**: Discrete LON construction via ILS with built-in problems (Number Partitioning, OneMax) - **LON Construction**: Automatic construction of Local Optima Networks from sampling data - **CMLON Support**: Compressed Monotonic LONs for cleaner landscape analysis - **Rich Metrics**: Compute landscape metrics including funnel analysis and neutrality @@ -103,6 +104,40 @@ result = sampler.sample(rastrigin, domain) lon = sampler.sample_to_lon(result) ``` +## Discrete Optimization Problems + +lonkit also supports Local Optima Networks for discrete optimization problems using Iterated Local Search (ILS). + +### Quick Start (Discrete) + +```python +from lonkit import NumberPartitioning, ILSSampler, ILSSamplerConfig, LONVisualizer + +# Define problem instance +problem = NumberPartitioning(n=20, k=0.5, instance_seed=1) + +# Configure and run ILS sampling +config = ILSSamplerConfig(n_runs=50, n_iter_no_change=100, seed=42) +sampler = ILSSampler(config) +result = sampler.sample(problem) + +# Build LON and CMLON +lon = sampler.sample_to_lon(result) +cmlon = lon.to_cmlon() + +# Compute metrics +print(lon.compute_metrics()) +print(cmlon.compute_metrics()) + +# Visualize +viz = LONVisualizer() +viz.plot_2d(cmlon, output_path="npp_cmlon.png") +``` + +### Custom Discrete Problems + +You can define your own discrete problem by subclassing `DiscreteProblem`. For bitstring problems, inherit from `BitstringProblem` instead; it provides `random_solution()`, `local_search()`, `perturb()`, and `solution_id()` out of the box. You only need to implement `evaluate()`. 
+ ## Documentation For full documentation, visit: [https://helix-agh.github.io/lonkit](https://helix-agh.github.io/lonkit) diff --git a/docs/api/discrete.md b/docs/api/discrete.md new file mode 100644 index 0000000..3a22412 --- /dev/null +++ b/docs/api/discrete.md @@ -0,0 +1,43 @@ +# Discrete Module + +## Problems + +::: lonkit.discrete.problems.problem.DiscreteProblem + options: + show_root_heading: true + show_source: true + +::: lonkit.discrete.problems.bitstring.BitstringProblem + options: + show_root_heading: true + show_source: true + +::: lonkit.discrete.problems.bitstring.NumberPartitioning + options: + show_root_heading: true + show_source: true + +::: lonkit.discrete.problems.bitstring.OneMax + options: + show_root_heading: true + show_source: true + +## Sampling + +::: lonkit.discrete.sampling.ILSSamplerConfig + options: + show_root_heading: true + show_source: true + +::: lonkit.discrete.sampling.ILSSampler + options: + show_root_heading: true + show_source: true + members: + - sample + - sample_to_lon + +::: lonkit.discrete.sampling.ILSResult + options: + show_root_heading: true + show_source: true diff --git a/docs/api/index.md b/docs/api/index.md index 961c7d8..2693b7e 100644 --- a/docs/api/index.md +++ b/docs/api/index.md @@ -8,7 +8,7 @@ Complete API documentation for lonkit. ## Modules -lonkit is organized into three main modules: +lonkit is organized into the following modules: ### [LON Module](lon.md) @@ -18,13 +18,27 @@ Data structures for Local Optima Networks. - [`CMLON`](lon.md#lonkit.lon.CMLON) - Compressed Monotonic LON - [`LONConfig`](lon.md#lonkit.lon.LONConfig) - Configuration for LON construction -### [Sampling Module](sampling.md) +### [Continuous Sampling Module](sampling.md) -Basin-Hopping sampling for LON construction. +Basin-Hopping sampling for continuous LON construction. 
-- [`compute_lon()`](sampling.md#lonkit.sampling.compute_lon) - High-level convenience function -- [`BasinHoppingSampler`](sampling.md#lonkit.sampling.BasinHoppingSampler) - Sampling class -- [`BasinHoppingSamplerConfig`](sampling.md#lonkit.sampling.BasinHoppingSamplerConfig) - Configuration +- [`compute_lon()`](sampling.md#lonkit.continuous.sampling.compute_lon) - High-level convenience function +- [`BasinHoppingSampler`](sampling.md#lonkit.continuous.sampling.BasinHoppingSampler) - Sampling class +- [`BasinHoppingSamplerConfig`](sampling.md#lonkit.continuous.sampling.BasinHoppingSamplerConfig) - Configuration +- [`BasinHoppingResult`](sampling.md#lonkit.continuous.sampling.BasinHoppingResult) - Sampling result container +- [`StepSizeEstimator`](sampling.md#lonkit.continuous.step_size.StepSizeEstimator) - Step size estimation +- [`StepSizeEstimatorConfig`](sampling.md#lonkit.continuous.step_size.StepSizeEstimatorConfig) - Step size estimator configuration + +### [Discrete Module](discrete.md) + +Iterated Local Search sampling and built-in discrete problems. 
+ +- [`DiscreteProblem`](discrete.md#lonkit.discrete.problems.problem.DiscreteProblem) - Abstract base for discrete problems +- [`BitstringProblem`](discrete.md#lonkit.discrete.problems.bitstring.BitstringProblem) - Base class for bitstring problems +- [`NumberPartitioning`](discrete.md#lonkit.discrete.problems.bitstring.NumberPartitioning) - Number Partitioning Problem +- [`OneMax`](discrete.md#lonkit.discrete.problems.bitstring.OneMax) - OneMax benchmark +- [`ILSSampler`](discrete.md#lonkit.discrete.sampling.ILSSampler) - ILS sampling class +- [`ILSSamplerConfig`](discrete.md#lonkit.discrete.sampling.ILSSamplerConfig) - ILS configuration ### [Visualization Module](visualization.md) diff --git a/docs/api/sampling.md b/docs/api/sampling.md index 5367d80..8aa7ccb 100644 --- a/docs/api/sampling.md +++ b/docs/api/sampling.md @@ -1,16 +1,16 @@ -# Sampling Module +# Continuous Sampling Module -::: lonkit.sampling.compute_lon +::: lonkit.continuous.sampling.compute_lon options: show_root_heading: true show_source: true -::: lonkit.sampling.BasinHoppingSamplerConfig +::: lonkit.continuous.sampling.BasinHoppingSamplerConfig options: show_root_heading: true show_source: true -::: lonkit.sampling.BasinHoppingSampler +::: lonkit.continuous.sampling.BasinHoppingSampler options: show_root_heading: true show_source: true @@ -18,7 +18,24 @@ - sample - sample_to_lon -::: lonkit.sampling.BasinHoppingResult +::: lonkit.continuous.sampling.BasinHoppingResult + options: + show_root_heading: true + show_source: true + +## Step Size Estimation + +::: lonkit.continuous.step_size.StepSizeEstimatorConfig + options: + show_root_heading: true + show_source: true + +::: lonkit.continuous.step_size.StepSizeEstimator + options: + show_root_heading: true + show_source: true + +::: lonkit.continuous.step_size.StepSizeResult options: show_root_heading: true show_source: true diff --git a/docs/api/step_size.md b/docs/api/step_size.md index addf300..f9eaf6d 100644 --- a/docs/api/step_size.md +++ 
b/docs/api/step_size.md @@ -1,16 +1,16 @@ # Step Size Module -::: lonkit.step_size.StepSizeEstimator +::: lonkit.continuous.step_size.StepSizeEstimator options: show_root_heading: true show_source: true -::: lonkit.step_size.StepSizeEstimatorConfig +::: lonkit.continuous.step_size.StepSizeEstimatorConfig options: show_root_heading: true show_source: true -::: lonkit.step_size.StepSizeResult +::: lonkit.continuous.step_size.StepSizeResult options: show_root_heading: true show_source: true diff --git a/docs/getting-started/installation.md b/docs/getting-started/installation.md index 359f43d..0139c93 100644 --- a/docs/getting-started/installation.md +++ b/docs/getting-started/installation.md @@ -53,7 +53,7 @@ import lonkit print(lonkit.__version__) ``` -You should see the version number (e.g., `0.1.0`). +You should see the version number (e.g., `0.3.0`). ## Troubleshooting diff --git a/docs/index.md b/docs/index.md index 359cf66..7ffaaf6 100644 --- a/docs/index.md +++ b/docs/index.md @@ -4,11 +4,11 @@ description: lonkit - Python library for constructing, analyzing, and visualizin # lonkit -**Local Optima Networks for Continuous Optimization** +**Local Optima Networks** ![lonkit](assets/icon.png){ width="100%" } -lonkit is a Python library for constructing, analyzing, and visualizing Local Optima Networks (LONs) for continuous optimization problems. +lonkit is a Python library for constructing, analyzing, and visualizing Local Optima Networks (LONs) for both continuous and discrete optimization problems. ## What are Local Optima Networks? 
@@ -26,7 +26,13 @@ Local Optima Networks (LONs) are graph-based models that capture the global stru --- - Efficient exploration of fitness landscapes using configurable Basin-Hopping with customizable perturbation strategies + Efficient exploration of continuous fitness landscapes using configurable Basin-Hopping with customizable perturbation strategies + +- **Iterated Local Search** + + --- + + Discrete LON construction via ILS with built-in problems (Number Partitioning, OneMax) and custom problem support - **LON Construction** diff --git a/docs/user-guide/discrete.md b/docs/user-guide/discrete.md new file mode 100644 index 0000000..cd212d1 --- /dev/null +++ b/docs/user-guide/discrete.md @@ -0,0 +1,266 @@ +# Discrete Problems + +lonkit supports Local Optima Networks for discrete optimization problems using +Iterated Local Search (ILS). + +## Concepts + +### How Discrete LONs Differ from Continuous LONs + +| Aspect | Continuous | Discrete | +|--------|-----------|----------| +| **Sampling method** | Basin-Hopping (perturbation + local minimizer) | Iterated Local Search (perturbation + hill climbing) | +| **Local search** | Scipy minimizer (L-BFGS-B, etc.) | Problem-specific neighborhood search (e.g., bit-flip) | +| **Perturbation** | Random displacement in continuous space | Combinatorial move (e.g., flip k bits) | +| **Node identity** | Rounded coordinates (hash) | Exact solution representation (e.g., bitstring) | +| **Neighborhoods** | Implicit (minimizer handles it) | Explicit (defined by the problem) | + +Despite these differences, the downstream pipeline is identical: both produce +trace data in the format `[run, fit1, node1, fit2, node2]`, which feeds into +`LON.from_trace_data()` → `LON.to_cmlon()` → `LONVisualizer`. + +## Built-in Problems + +### NumberPartitioning + +The **Number Partitioning Problem (NPP)** asks: given a set of N positive integers, +partition them into two subsets such that the absolute difference of their sums is +minimized. 
+ +```python +from lonkit import NumberPartitioning + +# Random instance with hardness parameter k +problem = NumberPartitioning(n=20, k=0.5, instance_seed=1) + +# Or provide explicit weights +problem = NumberPartitioning(n=4, weights=[7, 5, 6, 4]) +``` + +**Parameters:** + +- `n`: Number of items +- `k`: Hardness parameter. Items drawn uniformly from `[1, 2^(n*k)]`. Higher k produces harder instances. +- `instance_seed`: Seed for generating item weights (reproducible instances) +- `weights`: Explicit item weights (alternative to k + instance_seed) +- `n_perturbation_flips`: Number of random bit flips per perturbation (default: 2) +- `first_improvement`: Use first-improvement hill climbing (default: True) + +**Fitness:** `|sum(A) - sum(B)|` (minimization, optimal = 0) + +### OneMax + +**OneMax** is a simple benchmark: maximize the number of 1-bits in a bitstring. + +```python +from lonkit import OneMax + +problem = OneMax(n=20) +``` + +**Parameters:** + +- `n`: Length of the bitstring +- `n_perturbation_flips`: Number of random bit flips per perturbation (default: 2) +- `first_improvement`: Use first-improvement hill climbing (default: True) + +**Fitness:** `sum(bits)` (maximization, optimal = n) + +OneMax has a single global optimum (all ones) and supports O(1) delta evaluation. 
+ +## Custom Problems + +To define your own discrete problem, you can subclass `DiscreteProblem`: + +```python +import numpy as np +from lonkit import DiscreteProblem + +class MaxCut(DiscreteProblem[list[int]]): + """Maximize the number of edges crossing the partition.""" + + def __init__(self, adjacency: list[list[int]]): + self.adjacency = adjacency + self.n = len(adjacency) + + @property + def minimize(self) -> bool: + return False # maximization + + def random_solution(self, rng: np.random.Generator) -> list[int]: + return rng.integers(0, 2, size=self.n).tolist() + + def evaluate(self, solution: list[int]) -> float: + cut = 0 + for i in range(self.n): + for j in self.adjacency[i]: + if solution[i] != solution[j]: + cut += 1 + return float(cut // 2) # each edge counted twice + + def local_search(self, solution: list[int], rng: np.random.Generator) -> tuple[list[int], float]: + sol = list(solution) + fitness = self.evaluate(sol) + improved = True + while improved: + improved = False + indices = list(range(self.n)) + rng.shuffle(indices) + for i in indices: + sol[i] = 1 - sol[i] + new_fitness = self.evaluate(sol) + if self.is_better(new_fitness, fitness): + fitness = new_fitness + improved = True + break + sol[i] = 1 - sol[i] + return sol, fitness + + def perturb(self, solution: list[int], rng: np.random.Generator) -> list[int]: + sol = list(solution) + indices = rng.choice(self.n, size=min(3, self.n), replace=False) + for i in indices: + sol[i] = 1 - sol[i] + return sol + + def solution_id(self, solution: list[int]) -> str: + return "".join(str(b) for b in solution) +``` + +### Abstract Methods + +| Method | Purpose | +|--------|---------| +| `minimize` (property) | Return `True` for minimization, `False` for maximization | +| `random_solution(rng)` | Generate a random initial solution | +| `evaluate(solution)` | Return the fitness value (pure function, no side effects) | +| `local_search(solution, rng)` | Run hill climbing to a local optimum; return `(solution, 
fitness)` | +| `perturb(solution, rng)` | Apply perturbation to escape current basin | +| `solution_id(solution)` | Return a unique string identifier for the solution | + +### Using BitstringProblem + +For bitstring-based problems, inherit from `BitstringProblem` instead. It provides +`random_solution()`, `local_search()`, `perturb()`, and `solution_id()` out of the box. +You only need to implement `evaluate()`: + +```python +from lonkit import BitstringProblem + +class LeadingOnes(BitstringProblem): + """Count consecutive leading 1-bits.""" + + @property + def minimize(self) -> bool: + return False + + def evaluate(self, solution: list[int]) -> float: + count = 0 + for bit in solution: + if bit == 1: + count += 1 + else: + break + return float(count) +``` + +**Tips:** + +- Override `delta_evaluate(solution, index)` for O(1) neighbor evaluation when possible +- Set `first_improvement=False` for deterministic best-improvement hill climbing +- Adjust `n_perturbation_flips` to control perturbation strength + +**First-improvement vs best-improvement:** + +- **First-improvement** (`first_improvement=True`, default): The scan order over neighbor + positions is randomized each pass. As soon as an improving neighbor is found, the move + is accepted immediately. This makes the local search *stochastic* — the same starting + solution may reach different local optima depending on the RNG state. +- **Best-improvement** (`first_improvement=False`): All neighbors are evaluated every pass + and the best improving move is selected. The scan order does not affect the outcome, + making the local search *deterministic* (given the same starting solution, it always + reaches the same local optimum). 
+ +## ILS Configuration + +Configure the sampling process with `ILSSamplerConfig`: + +```python +from lonkit import ILSSamplerConfig + +config = ILSSamplerConfig( + n_runs=100, # Number of independent ILS runs + n_iter_no_change=100, # Stop after 100 non-improving iterations + max_iter=None, # No hard iteration limit (use n_iter_no_change) + accept_equal=True, # Accept lateral moves (equal fitness) + seed=42, # For reproducibility +) +``` + +**Parameters:** + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `n_runs` | 100 | Number of independent ILS restarts. More runs = better coverage of the landscape | +| `n_iter_no_change` | 100 | Patience: stop a run after this many consecutive non-improving iterations. Set to `None` to disable (must set `max_iter`) | +| `max_iter` | None | Hard limit on total iterations per run. Set to `None` for no limit (must set `n_iter_no_change`) | +| `accept_equal` | True | If `True`, accept moves to equal-fitness optima (greedy with equal acceptance). If `False`, only strictly improving moves | +| `seed` | None | Random seed controlling all search randomness | + +At least one of `n_iter_no_change` or `max_iter` must be set. 
+ +### Comparison with BasinHoppingSamplerConfig + +| ILS Parameter | Basin-Hopping Equivalent | Notes | +|---------------|-------------------------|-------| +| `n_runs` | `n_runs` | Same meaning | +| `n_iter_no_change` | `n_iter_no_change` | Same stopping criterion | +| `accept_equal` | — | Discrete-specific; continuous uses tolerance-based comparison | +| `seed` | `seed` | Same meaning | + +## Complete Example + +```python +from lonkit import ( + NumberPartitioning, + ILSSampler, + ILSSamplerConfig, + LONVisualizer, +) + +# Create problem instance +problem = NumberPartitioning(n=20, k=0.5, instance_seed=1) + +# Configure sampler +config = ILSSamplerConfig( + n_runs=50, + n_iter_no_change=100, + seed=42, +) +sampler = ILSSampler(config) + +# Run sampling +result = sampler.sample(problem) + +# Build LON +lon = sampler.sample_to_lon(result) +print(f"LON: {lon.n_vertices} optima, {lon.n_edges} edges") + +# Build CMLON +cmlon = lon.to_cmlon() +print(f"CMLON: {cmlon.n_vertices} optima, {cmlon.n_edges} edges") + +# Compute metrics +metrics = cmlon.compute_metrics() +for key, value in metrics.items(): + print(f" {key}: {value}") + +# Visualize +viz = LONVisualizer() +viz.plot_2d(cmlon, output_path="npp_cmlon_2d.png") +viz.plot_3d(cmlon, output_path="npp_cmlon_3d.png") +``` + +## API Reference + +See the [Discrete API Reference](../api/discrete.md) for complete API documentation. 
diff --git a/pyproject.toml b/pyproject.toml index e8e8c79..f58f031 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "lonkit" -version = "0.2.0" -description = "Local Optima Networks (LONs) for continuous optimization - Python implementation" +version = "0.3.0" +description = "Local Optima Networks (LONs) for continuous and discrete optimization - Python implementation" readme = "README.md" requires-python = ">=3.10" authors = [{name = "HELIX AGH"}] diff --git a/src/lonkit/__init__.py b/src/lonkit/__init__.py index 068d1e4..d71eca9 100644 --- a/src/lonkit/__init__.py +++ b/src/lonkit/__init__.py @@ -1,22 +1,31 @@ -from lonkit.lon import CMLON, LON, LONConfig -from lonkit.sampling import ( +from lonkit.continuous.sampling import ( BasinHoppingResult, BasinHoppingSampler, BasinHoppingSamplerConfig, compute_lon, ) -from lonkit.step_size import StepSizeEstimator, StepSizeEstimatorConfig, StepSizeResult +from lonkit.continuous.step_size import StepSizeEstimator, StepSizeEstimatorConfig, StepSizeResult +from lonkit.discrete.problems import BitstringProblem, DiscreteProblem, NumberPartitioning, OneMax +from lonkit.discrete.sampling import ILSResult, ILSSampler, ILSSamplerConfig +from lonkit.lon import CMLON, LON, LONConfig from lonkit.visualization import LONVisualizer -__version__ = "0.1.0" +__version__ = "0.3.0" __all__ = [ "CMLON", "LON", "BasinHoppingResult", "BasinHoppingSampler", "BasinHoppingSamplerConfig", + "BitstringProblem", + "DiscreteProblem", + "ILSResult", + "ILSSampler", + "ILSSamplerConfig", "LONConfig", "LONVisualizer", + "NumberPartitioning", + "OneMax", "StepSizeEstimator", "StepSizeEstimatorConfig", "StepSizeResult", diff --git a/src/lonkit/continuous/__init__.py b/src/lonkit/continuous/__init__.py new file mode 100644 index 0000000..1b1f02a --- /dev/null +++ b/src/lonkit/continuous/__init__.py @@ -0,0 +1,21 @@ +from lonkit.continuous.sampling import ( + BasinHoppingResult, + BasinHoppingSampler, + 
BasinHoppingSamplerConfig, + compute_lon, +) +from lonkit.continuous.step_size import ( + StepSizeEstimator, + StepSizeEstimatorConfig, + StepSizeResult, +) + +__all__ = [ + "BasinHoppingResult", + "BasinHoppingSampler", + "BasinHoppingSamplerConfig", + "StepSizeEstimator", + "StepSizeEstimatorConfig", + "StepSizeResult", + "compute_lon", +] diff --git a/src/lonkit/sampling.py b/src/lonkit/continuous/sampling.py similarity index 100% rename from src/lonkit/sampling.py rename to src/lonkit/continuous/sampling.py diff --git a/src/lonkit/step_size.py b/src/lonkit/continuous/step_size.py similarity index 99% rename from src/lonkit/step_size.py rename to src/lonkit/continuous/step_size.py index cced435..08794ab 100644 --- a/src/lonkit/step_size.py +++ b/src/lonkit/continuous/step_size.py @@ -5,7 +5,7 @@ import numpy as np from scipy.optimize import minimize -from lonkit.sampling import BasinHoppingSampler, BasinHoppingSamplerConfig +from lonkit.continuous.sampling import BasinHoppingSampler, BasinHoppingSamplerConfig @dataclass diff --git a/src/lonkit/discrete/__init__.py b/src/lonkit/discrete/__init__.py new file mode 100644 index 0000000..1e3ac5c --- /dev/null +++ b/src/lonkit/discrete/__init__.py @@ -0,0 +1,12 @@ +from lonkit.discrete.problems import BitstringProblem, DiscreteProblem, NumberPartitioning, OneMax +from lonkit.discrete.sampling import ILSResult, ILSSampler, ILSSamplerConfig + +__all__ = [ + "BitstringProblem", + "DiscreteProblem", + "ILSResult", + "ILSSampler", + "ILSSamplerConfig", + "NumberPartitioning", + "OneMax", +] diff --git a/src/lonkit/discrete/problems/__init__.py b/src/lonkit/discrete/problems/__init__.py new file mode 100644 index 0000000..881a894 --- /dev/null +++ b/src/lonkit/discrete/problems/__init__.py @@ -0,0 +1,13 @@ +from lonkit.discrete.problems.bitstring import ( + BitstringProblem, + NumberPartitioning, + OneMax, +) +from lonkit.discrete.problems.problem import DiscreteProblem + +__all__ = [ + "BitstringProblem", + 
"DiscreteProblem", + "NumberPartitioning", + "OneMax", +] diff --git a/src/lonkit/discrete/problems/bitstring.py b/src/lonkit/discrete/problems/bitstring.py new file mode 100644 index 0000000..db7b8a3 --- /dev/null +++ b/src/lonkit/discrete/problems/bitstring.py @@ -0,0 +1,247 @@ +import math +import warnings + +import numpy as np + +from lonkit.discrete.problems.problem import DiscreteProblem + + +class BitstringProblem(DiscreteProblem[list[int]]): + """ + Base class for problems with binary string representation. + + Provides common bitstring operations: + - Random solution generation (random bitstring) + - Perturbation (k random bit flips) + - Solution identity (join bits as string) + - Hill climbing with first/best-improvement flip neighborhood + + Subclasses must implement `evaluate()` and may override + `minimize` or `local_search()`. + + The problem is **stateless**: `n`, `n_perturbation_flips`, and + `first_improvement` are immutable configuration. All randomness + comes from the `rng` parameter passed by the caller. + """ + + def __init__( + self, + n: int, + n_perturbation_flips: int = 2, + first_improvement: bool = True, + ): + """ + Args: + n: Length of the bitstring. Must be > 0. + n_perturbation_flips: Number of random bit flips per perturbation. + Must be in [1, n]. + first_improvement: If True, local search uses first-improvement + hill climbing (stochastic -- scan order randomized each pass). + If False, uses best-improvement (deterministic). + + Raises: + ValueError: If n <= 0 or n_perturbation_flips is out of [1, n]. 
+ """ + if n <= 0: + raise ValueError(f"n must be positive, got {n}") + if n_perturbation_flips <= 0 or n_perturbation_flips > n: + raise ValueError( + f"n_perturbation_flips must be in [1, {n}], got {n_perturbation_flips}" + ) + self.n = n + self.n_perturbation_flips = n_perturbation_flips + self.first_improvement = first_improvement + + def random_solution(self, rng: np.random.Generator) -> list[int]: + result: list[int] = rng.integers(0, 2, size=self.n).tolist() + return result + + def solution_id(self, solution: list[int]) -> str: + return "".join(str(b) for b in solution) + + def perturb(self, solution: list[int], rng: np.random.Generator) -> list[int]: + """Flip `n_perturbation_flips` random distinct bits.""" + sol = list(solution) # copy + indices = rng.choice(self.n, size=self.n_perturbation_flips, replace=False) + for i in indices: + sol[i] = 1 - sol[i] + return sol + + def delta_evaluate( + self, + solution: list[int], # noqa: ARG002 + index: int, # noqa: ARG002 + ) -> float | None: + """ + Optional delta evaluation hook for flipping bit at `index`. + + Returns the fitness *change* (delta) if efficient delta evaluation + is supported, or None to fall back to full evaluation. + + The default implementation returns None. Subclasses like OneMax + can override this for O(1) evaluation instead of O(n). + + Args: + solution: Current solution (not modified). + index: Bit index that would be flipped. + + Returns: + Fitness delta (new_fitness - current_fitness), or None. + """ + return None + + def local_search( + self, solution: list[int], rng: np.random.Generator + ) -> tuple[list[int], float]: + """ + First/best improvement hill climbing with 1-bit-flip neighborhood. + + Scans all N bit positions. For first-improvement, the scan order + is randomized each pass (stochastic). For best-improvement, the + scan is deterministic. Stops when no improving neighbor exists. 
+ + Uses `delta_evaluate()` when available for O(1) neighbor evaluation; + falls back to full `evaluate()` otherwise. + """ + sol = list(solution) # copy + current_fitness = self.evaluate(sol) + improved = True + + while improved: + improved = False + indices = list(range(self.n)) + if self.first_improvement: + rng.shuffle(indices) + + best_delta_index = -1 + best_delta_fitness = current_fitness + + for i in indices: + # Try delta evaluation first + delta = self.delta_evaluate(sol, i) + if delta is not None: + new_fitness = current_fitness + delta + else: + # Full evaluation: flip, evaluate, undo + sol[i] = 1 - sol[i] + new_fitness = self.evaluate(sol) + sol[i] = 1 - sol[i] + + if self.is_better(new_fitness, current_fitness): + if self.first_improvement: + # Accept immediately + sol[i] = 1 - sol[i] + current_fitness = new_fitness + improved = True + break + else: + # Track best + if self.is_better(new_fitness, best_delta_fitness): + best_delta_fitness = new_fitness + best_delta_index = i + + if not self.first_improvement and best_delta_index >= 0: + sol[best_delta_index] = 1 - sol[best_delta_index] + current_fitness = best_delta_fitness + improved = True + + return sol, current_fitness + + +class NumberPartitioning(BitstringProblem): + """ + Number Partitioning Problem (NPP). + + Given a set of N positive integers, partition them into two subsets + such that the absolute difference of their sums is minimized. + + A solution is a bitstring of length N. Bit i=0 means item i goes + to subset A, bit i=1 means subset B. + + Fitness = |sum(A) - sum(B)| (minimization, optimal = 0). + + Construction: provide either explicit `weights` or both `k` and + `instance_seed` for random instance generation. + + Args: + n: Number of items. Must be > 0. + k: Hardness parameter. Items drawn uniformly from [1, 2^(n*k)]. + Higher k = harder instances (phase transition around k ~ 1.0). + Required if `weights` is not provided. Must be > 0. 
+ instance_seed: Seed for generating item weights. + Required if `weights` is not provided. + weights: Explicit item weights. If provided, `k` and + `instance_seed` are ignored. Length must equal `n`. + n_perturbation_flips: Number of random flips per perturbation (default: 2). + first_improvement: If True, local search uses first-improvement + hill climbing (stochastic -- scan order randomized each pass). + If False, uses best-improvement (deterministic). Default: True. + """ + + @property + def minimize(self) -> bool: + return True + + def __init__( + self, + n: int, + k: float | None = None, + instance_seed: int | None = None, + weights: list[int] | None = None, + n_perturbation_flips: int = 2, + first_improvement: bool = True, + ): + super().__init__(n, n_perturbation_flips, first_improvement) + if weights is not None: + if k is not None or instance_seed is not None: + warnings.warn( + "Both `weights` and `k`/`instance_seed` were provided. " + "`weights` will be used and `k`/`instance_seed` will be ignored.", + UserWarning, + stacklevel=2, + ) + if len(weights) != n: + raise ValueError(f"weights length ({len(weights)}) must equal n ({n})") + if any(w <= 0 for w in weights): + raise ValueError("All weights must be positive") + self.weights = list(weights) + self.k = None + self.instance_seed = None + elif k is not None and instance_seed is not None: + if k <= 0: + raise ValueError(f"k must be positive, got {k}") + self.k = k + self.instance_seed = instance_seed + # Generate item weights using a separate RNG + rng = np.random.default_rng(instance_seed) + upper = round(math.pow(2, n * k)) + self.weights = rng.integers(1, upper + 1, size=n).tolist() + else: + raise ValueError("Provide either `weights` or both `k` and `instance_seed`") + + def evaluate(self, solution: list[int]) -> float: + cost_a = sum(self.weights[i] for i in range(self.n) if solution[i] == 0) + cost_b = sum(self.weights[i] for i in range(self.n) if solution[i] == 1) + return float(abs(cost_a - 
cost_b)) + + +class OneMax(BitstringProblem): + """ + OneMax problem: maximize the number of 1-bits. + + Fitness = sum(bits). Single global optimum at all-ones. + + Supports O(1) delta evaluation: flipping bit i changes + fitness by -1 (if 1->0) or +1 (if 0->1). + """ + + @property + def minimize(self) -> bool: + return False + + def evaluate(self, solution: list[int]) -> float: + return float(sum(solution)) + + def delta_evaluate(self, solution: list[int], index: int) -> float | None: + """O(1) delta evaluation for OneMax.""" + return -1.0 if solution[index] == 1 else 1.0 diff --git a/src/lonkit/discrete/problems/problem.py b/src/lonkit/discrete/problems/problem.py new file mode 100644 index 0000000..3b801b5 --- /dev/null +++ b/src/lonkit/discrete/problems/problem.py @@ -0,0 +1,148 @@ +from abc import ABC, abstractmethod +from typing import Generic, TypeVar + +import numpy as np + +SolutionT = TypeVar("SolutionT") + + +class DiscreteProblem(ABC, Generic[SolutionT]): + """ + Abstract base class for discrete optimization problems. + + A DiscreteProblem defines the search space, objective function, + neighborhood structure, and perturbation operator. It is **stateless**: + all randomness is injected via an `rng` parameter owned by the caller + (typically the ILS sampler). + + Subclasses must implement all abstract methods to define: + - Solution generation and identity + - Fitness evaluation (pure function) + - Local search (hill climbing with problem-specific neighborhood) + - Perturbation (escape operator for ILS) + + The ILS sampler calls these methods without knowing the solution + representation, neighborhood structure, or move operators. + + Type parameter: + SolutionT: The solution representation type (e.g., list[int] for + bitstrings, list[int] for permutations). + """ + + @property + def minimize(self) -> bool: + """Whether this is a minimization problem. 
Default: True.""" + return True + + def is_better(self, a: float, b: float) -> bool: + """Return True if fitness `a` is strictly better than `b`.""" + return a < b if self.minimize else a > b + + def is_better_or_equal(self, a: float, b: float) -> bool: + """Return True if fitness `a` is better than or equal to `b`.""" + return a <= b if self.minimize else a >= b + + def compare(self, a: float, b: float) -> int: + """ + Compare two fitness values. + + Returns: + 1 if a is better, -1 if b is better, 0 if equal. + """ + if self.is_better(a, b): + return 1 + if self.is_better(b, a): + return -1 + return 0 + + @abstractmethod + def random_solution(self, rng: np.random.Generator) -> SolutionT: + """ + Generate a random initial solution. + + Args: + rng: Random number generator (owned by the caller). + + Returns: + A solution object (representation is problem-specific). + The returned solution does NOT need to be a local optimum. + """ + ... + + @abstractmethod + def evaluate(self, solution: SolutionT) -> float: + """ + Evaluate and return the fitness of a solution. + + This must be a **pure function**: same solution always returns + the same fitness, no side effects. + + Args: + solution: A solution object. + + Returns: + Scalar fitness value. + """ + ... + + @abstractmethod + def local_search( + self, solution: SolutionT, rng: np.random.Generator + ) -> tuple[SolutionT, float]: + """ + Run local search from a starting solution to find a local optimum. + + This encapsulates the full hill-climbing loop: + neighborhood definition, scanning strategy (first/best improvement), + and termination. + + **Stochastic local search note**: When using first-improvement with + randomized scan order, the local optimum reached from a given + starting solution depends on the RNG state. This means basin + identity is sampler-state dependent -- the same starting solution + may reach different local optima across runs. 
This is an intentional + and well-understood property of stochastic local search in LON + construction. + + Args: + solution: Starting solution (not necessarily a local optimum). + rng: Random number generator (owned by the caller). + + Returns: + Tuple of (local_optimum_solution, fitness). + """ + ... + + @abstractmethod + def perturb(self, solution: SolutionT, rng: np.random.Generator) -> SolutionT: + """ + Apply a perturbation to escape the current basin of attraction. + + The perturbation should be strong enough to potentially reach a + different basin but not so strong as to be a random restart. + + Args: + solution: A local optimum solution. + rng: Random number generator (owned by the caller). + + Returns: + A perturbed solution (NOT necessarily a local optimum). + """ + ... + + @abstractmethod + def solution_id(self, solution: SolutionT) -> str: + """ + Return a unique string identifier for this solution. + + Two solutions that represent the same point in the search space + MUST return the same ID. Two different solutions MUST return + different IDs. + + Args: + solution: A solution object. + + Returns: + String identifier (used as node ID in the LON). + """ + ... diff --git a/src/lonkit/discrete/sampling.py b/src/lonkit/discrete/sampling.py new file mode 100644 index 0000000..62d731c --- /dev/null +++ b/src/lonkit/discrete/sampling.py @@ -0,0 +1,275 @@ +from collections.abc import Callable +from dataclasses import dataclass, replace +from typing import Any + +import numpy as np +import pandas as pd + +from lonkit.discrete.problems.problem import DiscreteProblem +from lonkit.lon import LON, LONConfig + + +@dataclass +class ILSSamplerConfig: + """ + Configuration for Iterated Local Search sampling. + + Attributes: + n_runs: Number of independent ILS runs. Default: 100. + n_iter_no_change: Maximum consecutive non-improving iterations + before stopping each run. Use None for no limit. 
+ Setting both n_iter_no_change and max_iter to None will + result in an error. Default: 100. + max_iter: Maximum total iterations per run. Use None for no + limit. Setting both n_iter_no_change and max_iter to None + will result in an error. Default: None. + accept_equal: If True, accept moves to equal-fitness optima + (greedy with equal acceptance). If False, only accept + strictly improving moves. Default: True. + seed: Random seed for reproducibility. Controls ALL search + randomness: initial solution generation, local search + scan order, and perturbation. The problem instance is + stateless and receives the sampler's RNG. Default: None. + """ + + n_runs: int = 100 + n_iter_no_change: int | None = 100 + max_iter: int | None = None + accept_equal: bool = True + seed: int | None = None + + def __post_init__(self) -> None: + if self.n_iter_no_change is not None and self.n_iter_no_change <= 0: + raise ValueError("n_iter_no_change must be positive or None.") + if self.max_iter is not None and self.max_iter <= 0: + raise ValueError("max_iter must be positive or None.") + if self.n_iter_no_change is None and self.max_iter is None: + raise ValueError( + "At least one stopping criterion must be set: " "n_iter_no_change and/or max_iter." + ) + + +@dataclass +class ILSResult: + """ + Result of ILS sampling. + + Attributes: + trace_df: DataFrame with columns [run, fit1, node1, fit2, node2] + representing transitions between local optima. + raw_records: List of dicts with per-iteration data. Each dict + has keys: run, iteration, current_id, current_fitness, + new_id, new_fitness, accepted. + minimize: Whether the problem is a minimization problem. Default: True. + """ + + trace_df: pd.DataFrame + raw_records: list[dict] + minimize: bool = True + + +class ILSSampler: + """ + Iterated Local Search sampler for constructing Local Optima Networks + from discrete optimization problems. 
+ + ILS alternates between perturbation and local search to explore the + space of local optima. Transitions between local optima are recorded + in the same trace format as BasinHoppingSampler, enabling direct use + with LON.from_trace_data(). + + The sampler owns all search randomness via a single `numpy.random.Generator` + instance created from `ILSSamplerConfig.seed`. The problem instance + is stateless — the sampler passes its RNG into every problem method + call (`random_solution`, `local_search`, `perturb`). + + Example: + >>> from lonkit.discrete import NumberPartitioning, ILSSampler, ILSSamplerConfig + >>> problem = NumberPartitioning(n=20, k=0.5, instance_seed=1) + >>> config = ILSSamplerConfig(n_runs=10, n_iter_no_change=100, seed=42) + >>> sampler = ILSSampler(config) + >>> result = sampler.sample(problem) + >>> lon = sampler.sample_to_lon(result) + """ + + def __init__(self, config: ILSSamplerConfig | None = None): + self.config = config or ILSSamplerConfig() + + def sample( + self, + problem: DiscreteProblem[Any], + progress_callback: Callable[[int, int], None] | None = None, + ) -> ILSResult: + """ + Run ILS sampling and construct trace data. + + Args: + problem: A DiscreteProblem instance defining the optimization + problem, local search, and perturbation operators. + The problem must be stateless — the sampler provides + the RNG. + progress_callback: Optional callback(run, total_runs) for + progress reporting. Default: None. + + Returns: + ILSResult with trace DataFrame and raw records. 
+ """ + rng = np.random.default_rng(self.config.seed) + raw_records = [] + + for run in range(1, self.config.n_runs + 1): + if progress_callback: + progress_callback(run, self.config.n_runs) + + run_records = self._ils_run(problem, run, rng) + raw_records.extend(run_records) + + trace_df = self._construct_trace_data(raw_records) + return ILSResult(trace_df=trace_df, raw_records=raw_records, minimize=problem.minimize) + + def _ils_run( + self, + problem: DiscreteProblem[Any], + run: int, + rng: np.random.Generator, + ) -> list[dict]: + """ + Execute a single ILS run. + + The algorithm: + 1. Generate random initial solution + 2. Run local search to find first local optimum + 3. Loop: + a. Perturb current best local optimum + b. Run local search on perturbed solution + c. Record transition (current -> new) + d. Accept if new is better or equal (configurable) + e. Stop after n_iter_no_change or max_iter + + Args: + problem: The discrete problem instance. + run: Run number (1-indexed, for trace DataFrame). + rng: Random number generator (owned by the sampler). + + Returns: + List of raw record dicts for this run. 
+ """ + records = [] + + # Initial solution and local search + initial_sol = problem.random_solution(rng) + current_sol, current_fitness = problem.local_search(initial_sol, rng) + current_id = problem.solution_id(current_sol) + + iters_without_improvement = 0 + iter_index = 0 + + while True: + # Check stopping criteria + if self.config.max_iter is not None and iter_index >= self.config.max_iter: + break + if ( + self.config.n_iter_no_change is not None + and iters_without_improvement >= self.config.n_iter_no_change + ): + break + + # Perturb and local search + perturbed_sol = problem.perturb(current_sol, rng) + new_sol, new_fitness = problem.local_search(perturbed_sol, rng) + new_id = problem.solution_id(new_sol) + + if self.config.accept_equal: + accepted = problem.is_better_or_equal(new_fitness, current_fitness) + else: + accepted = problem.is_better(new_fitness, current_fitness) + + records.append( + { + "run": run, + "iteration": iter_index, + "current_id": current_id, + "current_fitness": current_fitness, + "new_id": new_id, + "new_fitness": new_fitness, + "accepted": accepted, + } + ) + + if problem.is_better(new_fitness, current_fitness): + iters_without_improvement = 0 + else: + iters_without_improvement += 1 + + if accepted: + current_sol = new_sol + current_fitness = new_fitness + current_id = new_id + + iter_index += 1 + + return records + + def _construct_trace_data(self, raw_records: list[dict]) -> pd.DataFrame: + """ + Construct trace data from accepted transitions in raw records. + + Args: + raw_records: List of raw sampling records from ILS. + + Returns: + DataFrame with columns `[run, fit1, node1, fit2, node2]` representing + accepted transitions only. 
+ """ + trace_records = [] + + for rec in raw_records: + if not rec["accepted"]: + continue + + trace_records.append( + { + "run": rec["run"], + "fit1": rec["current_fitness"], + "node1": rec["current_id"], + "fit2": rec["new_fitness"], + "node2": rec["new_id"], + } + ) + + return pd.DataFrame( + trace_records, + columns=["run", "fit1", "node1", "fit2", "node2"], + ) + + def sample_to_lon( + self, + sampler_result: ILSResult, + lon_config: LONConfig | None = None, + ) -> LON: + """ + Construct a LON from an `ILSResult`. + + Convenience wrapper that passes the trace data to + LON.from_trace_data(). Equivalent to calling + LON.from_trace_data(sampler_result.trace_df, config=lon_config). + + Args: + sampler_result: Result returned by `sample()`. + lon_config: LON construction configuration. If `None`, uses + default `LONConfig`. Default: `None`. + + Returns: + `LON` instance constructed from the sampling trace. + """ + trace_df = sampler_result.trace_df + + if trace_df.empty: + return LON(minimize=sampler_result.minimize) + + config = ( + replace(lon_config, minimize=sampler_result.minimize) + if lon_config is not None + else LONConfig(minimize=sampler_result.minimize) + ) + return LON.from_trace_data(trace_df, config=config) diff --git a/src/lonkit/lon.py b/src/lonkit/lon.py index 3925641..1e21502 100644 --- a/src/lonkit/lon.py +++ b/src/lonkit/lon.py @@ -33,6 +33,7 @@ class LONConfig: warn_on_duplicates: bool = True max_fitness_deviation: float | None = None eq_atol: float = DEFAULT_ATOL + minimize: bool = True @dataclass @@ -45,15 +46,17 @@ class LON: Attributes: graph: The underlying `igraph` Graph object. Default: empty directed `ig.Graph`. - best_fitness: The best (minimum) fitness value found. Default: `None`. + best_fitness: The best fitness value found. Default: `None`. final_run_values: `Series` mapping run number to final fitness value. Default: `None`. eq_atol: Tolerance for considering fitness values as equal. Default: `1e-12`. 
+ minimize: Whether this is a minimization problem. Default: `True`. """ graph: ig.Graph = field(default_factory=lambda: ig.Graph(directed=True)) best_fitness: float | None = None final_run_values: pd.Series | None = None eq_atol: float = DEFAULT_ATOL + minimize: bool = True @classmethod def from_trace_data( @@ -152,13 +155,14 @@ def from_trace_data( # Remove self-loops graph = graph.simplify(multiple=False, loops=True) - best = nodes["Fitness"].min() + best = nodes["Fitness"].min() if config.minimize else nodes["Fitness"].max() return cls( graph=graph, best_fitness=best, final_run_values=final_run_values, eq_atol=config.eq_atol, + minimize=config.minimize, ) @property @@ -348,15 +352,17 @@ class CMLON: Attributes: graph: The underlying igraph Graph object. - best_fitness: The best (minimum) fitness value. + best_fitness: The best fitness value. source_lon: Reference to the original LON (optional). eq_atol: Tolerance for considering fitness values as equal. Default: `1e-12`. + minimize: Whether this is a minimization problem. Default: `True`. 
""" graph: ig.Graph = field(default_factory=lambda: ig.Graph(directed=True)) best_fitness: float | None = None source_lon: LON | None = None eq_atol: float = DEFAULT_ATOL + minimize: bool = True @classmethod def from_lon(cls, lon: LON) -> "CMLON": @@ -383,6 +389,7 @@ def from_lon(cls, lon: LON) -> "CMLON": best_fitness=lon.best_fitness, source_lon=lon, eq_atol=lon.eq_atol, + minimize=lon.minimize, ) # Create a working copy @@ -399,11 +406,11 @@ def from_lon(cls, lon: LON) -> "CMLON": edge_types = [] equal_edge_indices = [] for i, (fit1, fit2) in enumerate(zip(f1, f2)): - if fit2 < fit1: - edge_types.append("improving") - elif lon._allclose(fit2, fit1): + if lon._allclose(fit2, fit1): edge_types.append("equal") equal_edge_indices.append(i) + elif (fit2 < fit1) if lon.minimize else (fit2 > fit1): + edge_types.append("improving") else: edge_types.append("worsening") mlon.es["type"] = edge_types @@ -429,6 +436,7 @@ def from_lon(cls, lon: LON) -> "CMLON": best_fitness=lon.best_fitness, source_lon=lon, eq_atol=lon.eq_atol, + minimize=lon.minimize, ) def _allclose(self, f1: float | None, f2: float | None) -> bool: @@ -474,7 +482,9 @@ def get_local_sinks(self) -> list[int]: fits = self.vertex_fitness if self.best_fitness is None: return [] - return [s for s in sinks if fits[s] > self.best_fitness] + if self.minimize: + return [s for s in sinks if fits[s] > self.best_fitness] + return [s for s in sinks if fits[s] < self.best_fitness] def compute_network_metrics(self, known_best: float | None = None) -> dict[str, Any]: """ diff --git a/tests/continuous/__init__.py b/tests/continuous/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_nfev.py b/tests/continuous/test_nfev.py similarity index 100% rename from tests/test_nfev.py rename to tests/continuous/test_nfev.py diff --git a/tests/test_sampling.py b/tests/continuous/test_sampling.py similarity index 100% rename from tests/test_sampling.py rename to tests/continuous/test_sampling.py diff --git 
a/tests/discrete/__init__.py b/tests/discrete/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/discrete/fixtures/__init__.py b/tests/discrete/fixtures/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/discrete/test_exhaustive.py b/tests/discrete/test_exhaustive.py new file mode 100644 index 0000000..141db67 --- /dev/null +++ b/tests/discrete/test_exhaustive.py @@ -0,0 +1,112 @@ +import numpy as np + +from lonkit import ILSSampler, ILSSamplerConfig, NumberPartitioning, OneMax + + +class TestOneMax4Exhaustive: + """Exhaustive verification for OneMax(n=4) with 16 solutions.""" + + def test_onemax_4_single_local_optimum(self): + """OneMax(4) has exactly one local optimum: all-ones. + + From any solution, first-improvement HC with 1-bit-flip + neighborhood always reaches [1,1,1,1] because every + non-optimal solution has at least one 0-bit that can be + flipped to improve fitness. + """ + problem = OneMax(n=4) + rng = np.random.default_rng(42) + + # Enumerate all 16 solutions + all_solutions = [list(map(int, format(i, "04b"))) for i in range(16)] + + local_optima = set() + for sol in all_solutions: + opt, fit = problem.local_search(list(sol), rng) + local_optima.add((problem.solution_id(opt), fit)) + + assert len(local_optima) == 1 + assert local_optima == {("1111", 4.0)} + + def test_onemax_4_sampled_lon_single_node(self): + """Sampled LON for OneMax(4) should have exactly 1 node. + + Since there's only one local optimum, all ILS transitions + are self-loops (1111 -> 1111) which get removed during LON + construction. The resulting LON has 1 node and 0 edges. 
+ """ + problem = OneMax(n=4) + config = ILSSamplerConfig(n_runs=20, n_iter_no_change=10, seed=42) + sampler = ILSSampler(config) + result = sampler.sample(problem) + lon = sampler.sample_to_lon(result) + + assert lon.n_vertices == 1 + assert lon.graph.ecount() == 0 + + +class TestNPP6Exhaustive: + """Exhaustive verification for NumberPartitioning(n=6) with 64 solutions.""" + + def test_npp_6_exhaustive_local_optima(self): + """Verify local optima count for NPP(n=6, k=0.5, instance_seed=1). + + Enumerate all 64 solutions, run local search from each, and verify + that every discovered optimum is truly a local optimum (no improving + single-bit-flip neighbor). + """ + problem = NumberPartitioning(n=6, k=0.5, instance_seed=1) + rng = np.random.default_rng(42) + + all_solutions = [list(map(int, format(i, "06b"))) for i in range(64)] + + local_optima = set() + for sol in all_solutions: + opt, _ = problem.local_search(list(sol), rng) + local_optima.add(problem.solution_id(opt)) + + assert len(local_optima) > 1, "NPP should have multiple local optima" + + # Verify each is truly a local optimum + for sol_id in local_optima: + sol = [int(b) for b in sol_id] + fitness = problem.evaluate(sol) + for i in range(problem.n): + neighbor = list(sol) + neighbor[i] = 1 - neighbor[i] + neighbor_fitness = problem.evaluate(neighbor) + assert not problem.is_better(neighbor_fitness, fitness), ( + f"Solution {sol_id} is not a local optimum: " + f"flip at {i} gives fitness {neighbor_fitness} vs {fitness}" + ) + + def test_npp_6_sampled_lon_converges_to_exhaustive(self): + """With enough runs, sampled LON should discover all local optima.""" + problem = NumberPartitioning(n=6, k=0.5, instance_seed=1) + rng = np.random.default_rng(42) + + # Exhaustive: find all local optima + all_solutions = [list(map(int, format(i, "06b"))) for i in range(64)] + exhaustive_optima = set() + for sol in all_solutions: + opt, _ = problem.local_search(list(sol), rng) + 
exhaustive_optima.add(problem.solution_id(opt)) + + # Sampled: run many ILS runs + config = ILSSamplerConfig(n_runs=200, n_iter_no_change=50, seed=42) + sampler = ILSSampler(config) + sample_problem = NumberPartitioning(n=6, k=0.5, instance_seed=1) + result = sampler.sample(sample_problem) + lon = sampler.sample_to_lon(result) + + sampled_optima = set(lon.graph.vs["name"]) + + # Sampled should be a subset of exhaustive (can't discover non-optima) + assert sampled_optima.issubset(exhaustive_optima) + + # With enough runs, we should discover most (if not all) optima + coverage = len(sampled_optima) / len(exhaustive_optima) + assert coverage == 1.0, ( + f"Only discovered {len(sampled_optima)}/{len(exhaustive_optima)} " + f"optima ({coverage:.0%} coverage)" + ) diff --git a/tests/discrete/test_problems.py b/tests/discrete/test_problems.py new file mode 100644 index 0000000..d528ca1 --- /dev/null +++ b/tests/discrete/test_problems.py @@ -0,0 +1,73 @@ +import numpy as np + +from lonkit.discrete.problems.bitstring import ( + NumberPartitioning, + OneMax, +) + + +class TestOneMax: + def test_evaluate_all_ones(self): + p = OneMax(n=4) + assert p.evaluate([1, 1, 1, 1]) == 4.0 + + def test_evaluate_all_zeros(self): + p = OneMax(n=4) + assert p.evaluate([0, 0, 0, 0]) == 0.0 + + def test_minimize_is_false(self): + p = OneMax(n=4) + assert p.minimize is False + + def test_is_better_higher_is_better(self): + p = OneMax(n=4) + assert p.is_better(4, 3) is True + assert p.is_better(3, 4) is False + assert p.is_better(3, 3) is False + + def test_solution_id(self): + p = OneMax(n=4) + assert p.solution_id([0, 1, 0, 1]) == "0101" + + def test_reaches_global_optimum_from_zeros(self): + p = OneMax(n=4) + rng = np.random.default_rng(42) + sol, fit = p.local_search([0, 0, 0, 0], rng) + assert sol == [1, 1, 1, 1] + assert fit == 4.0 + + def test_perturb_does_not_modify_original(self): + p = OneMax(n=4, n_perturbation_flips=2) + original = [1, 1, 1, 1] + copy = list(original) + rng = 
np.random.default_rng(42) + p.perturb(original, rng) + assert original == copy + + def test_compare(self): + p = OneMax(n=4) + assert p.compare(4, 3) == 1 + assert p.compare(3, 4) == -1 + assert p.compare(3, 3) == 0 + + +class TestNumberPartitioningBasic: + def test_minimize_is_true(self): + p = NumberPartitioning(n=4, weights=[3, 1, 1, 3]) + assert p.minimize is True + + def test_perfect_partition(self): + p = NumberPartitioning(n=4, weights=[3, 1, 1, 3]) + # [0, 1, 0, 1] → A={3,1}=4, B={1,3}=4 → |4-4|=0 (perfect!) + assert p.evaluate([0, 1, 0, 1]) == 0.0 + + def test_is_better_lower_is_better(self): + p = NumberPartitioning(n=4, weights=[3, 1, 1, 3]) + assert p.is_better(0, 5) is True + assert p.is_better(5, 0) is False + + def test_compare_minimization(self): + p = NumberPartitioning(n=4, weights=[1, 1, 1, 1]) + assert p.compare(0, 5) == 1 + assert p.compare(5, 0) == -1 + assert p.compare(3, 3) == 0 diff --git a/tests/discrete/test_properties.py b/tests/discrete/test_properties.py new file mode 100644 index 0000000..71c375a --- /dev/null +++ b/tests/discrete/test_properties.py @@ -0,0 +1,128 @@ +import pytest + +from lonkit import ILSSampler, ILSSamplerConfig, NumberPartitioning + + +class TestLONProperties: + """Properties that must hold for any LON built from ILS trace data.""" + + @pytest.fixture(scope="class") + def npp_result(self): + """Run ILS sampling and return (problem, result, lon) tuple.""" + problem = NumberPartitioning(n=15, k=0.5, instance_seed=1) + config = ILSSamplerConfig(n_runs=20, n_iter_no_change=50, seed=42) + sampler = ILSSampler(config) + result = sampler.sample(problem) + lon = sampler.sample_to_lon(result) + return problem, result, lon + + @pytest.fixture(scope="class") + def npp_problem(self, npp_result): + return npp_result[0] + + @pytest.fixture(scope="class") + def npp_lon(self, npp_result): + return npp_result[2] + + def test_all_nodes_are_local_optima(self, npp_lon, npp_problem): + """ + Every node in the LON should be a local 
optimum. + + Verify by checking that no single-bit-flip neighbor improves fitness. + """ + for name in npp_lon.graph.vs["name"]: + sol = [int(b) for b in name] + fitness = npp_problem.evaluate(sol) + for i in range(npp_problem.n): + neighbor = list(sol) + neighbor[i] = 1 - neighbor[i] + neighbor_fitness = npp_problem.evaluate(neighbor) + assert not npp_problem.is_better( + neighbor_fitness, fitness + ), f"Node {name} is not a local optimum: flip at {i} gives {neighbor_fitness} vs {fitness}" + + def test_best_fitness_is_minimum_vertex_fitness(self, npp_lon): + """best_fitness equals the minimum vertex fitness.""" + assert npp_lon.best_fitness == min(npp_lon.vertex_fitness) + + def test_no_self_loops(self, npp_lon): + """LON should have no self-loops (removed during construction).""" + assert not any(npp_lon.graph.is_loop()) + + def test_trace_only_contains_accepted_transitions(self, npp_result): + """trace_df should only contain accepted transitions.""" + _, result, _ = npp_result + accepted_in_raw = [r for r in result.raw_records if r["accepted"]] + assert len(result.trace_df) == len(accepted_in_raw) + + def test_all_raw_records_have_required_fields(self, npp_result): + """Each raw record must have all required fields.""" + _, result, _ = npp_result + required_fields = { + "run", + "iteration", + "current_id", + "current_fitness", + "new_id", + "new_fitness", + "accepted", + } + for rec in result.raw_records: + assert required_fields.issubset( + rec.keys() + ), f"Missing fields: {required_fields - rec.keys()}" + + def test_n_funnels_equals_n_sinks(self, npp_lon): + """n_funnels metric should equal the number of sinks.""" + metrics = npp_lon.compute_network_metrics() + assert metrics["n_funnels"] == len(npp_lon.get_sinks()) + + def test_n_global_funnels_leq_n_funnels(self, npp_lon): + """Number of global funnels <= total number of funnels.""" + metrics = npp_lon.compute_network_metrics() + assert metrics["n_global_funnels"] <= metrics["n_funnels"] + + +class 
TestCMLONProperties: + """Properties that must hold for any CMLON.""" + + @pytest.fixture(scope="class") + def npp_lon_and_cmlon(self): + """Build LON and CMLON from NPP sampling.""" + problem = NumberPartitioning(n=15, k=0.5, instance_seed=1) + config = ILSSamplerConfig(n_runs=20, n_iter_no_change=50, seed=42) + sampler = ILSSampler(config) + result = sampler.sample(problem) + lon = sampler.sample_to_lon(result) + cmlon = lon.to_cmlon() + return lon, cmlon + + @pytest.fixture(scope="class") + def npp_lon(self, npp_lon_and_cmlon): + return npp_lon_and_cmlon[0] + + @pytest.fixture(scope="class") + def npp_cmlon(self, npp_lon_and_cmlon): + return npp_lon_and_cmlon[1] + + def test_cmlon_has_no_equal_fitness_edges(self, npp_cmlon): + """CMLON should have no edges between equal-fitness nodes.""" + el = npp_cmlon.graph.get_edgelist() + fits = npp_cmlon.vertex_fitness + for src, tgt in el: + assert fits[src] != fits[tgt], ( + f"Equal-fitness edge found: " + f"v{src} (fit={fits[src]}) -> v{tgt} (fit={fits[tgt]})" + ) + + def test_cmlon_fewer_or_equal_vertices(self, npp_lon, npp_cmlon): + """CMLON has at most as many vertices as the source LON.""" + assert npp_cmlon.n_vertices <= npp_lon.n_vertices + + def test_cmlon_preserves_best_fitness(self, npp_lon, npp_cmlon): + """CMLON best_fitness matches source LON best_fitness.""" + assert npp_cmlon.best_fitness == npp_lon.best_fitness + + def test_cmlon_no_self_loops(self, npp_cmlon): + """CMLON should have no self-loops after contraction.""" + assert not any(npp_cmlon.graph.is_loop()) diff --git a/uv.lock b/uv.lock index c8f3276..259cda4 100644 --- a/uv.lock +++ b/uv.lock @@ -876,7 +876,7 @@ wheels = [ [[package]] name = "lonkit" -version = "0.1.0" +version = "0.3.0" source = { editable = "." } dependencies = [ { name = "igraph" },