diff --git a/CHANGELOG.md b/CHANGELOG.md index 30491e7..cdd1862 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,20 +5,29 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). -## [Unreleased] -- YYYY-MM-DD +## [Unreleased] -- 2025-08-22 -(00-changelog)= +(0.0.0a)= ### Added - -(01-changelog)= +- RotationLifter: support to plot 2d frames, improved documentation for rotation conventions +- RotationLifter: implemented rank-d ("bm") lifting +- RangeOnlyLifter: new landmark sampling methods to fill available space +- Documentation instructions in CONTRIBUTING.md +- Unit tests and notebook for RotationLifter +- StateLifter: started support for rank-d instead of rank-1 formulations + +(0.0.0c)= ### Changed +- RangeOnlyLifter: default sampling for landmarks in RO is now the one filling space +- RangeOnlyLifter: theta is sampled at least MIN_DIST away from landmarks -(02-changelog)= +(0.0.0a)= ### Fixed +- Link fixes in documentation ## [0.0.2] - 2025-06-04 -(6-changelog)= +(0.0.2a)= ### Added - RangeOnlyLifter base class for 2D/3D range-only localization (base_lifters/range_only_lifter.py) @@ -26,12 +35,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - RangeOnlySqLifter: specialized lifter for squared-distance-based range-only localization - GitHub Pages deployment step to documentation.yml using peaceiris/actions-gh-pages for automatic documentation publishing -(5-changelog)= +(0.0.2r)= ### Removed - docs/build/ files -(4-changelog)= +(0.0.2f)= ### Fixed - Corrected return type annotations in get_grad and get_hess methods in state_lifter.py from float to np.ndarray diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 1ab723a..4666351 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -74,3 +74,11 @@ curl --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/nektos/ac ``` sudo ./bin/act ``` + +### Running documentation locally + +To generate a live-reloading preview of Sphinx docs while editing, you can run, from this root folder +of the repository: +``` +make doc +``` diff --git a/Makefile b/Makefile index 3a6a301..7892a45 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ doc: - sphinx-autobuild docs/source docs/build + sphinx-autobuild docs/source docs/build --watch popcor/ doctest: sphinx-build -b doctest docs/source docs/build/doctest diff --git a/scripts/rotation_averaging.ipynb b/notebooks/rotation_averaging.ipynb similarity index 63% rename from scripts/rotation_averaging.ipynb rename to notebooks/rotation_averaging.ipynb index dcd99d1..88a3fd7 100644 --- a/scripts/rotation_averaging.ipynb +++ b/notebooks/rotation_averaging.ipynb @@ -44,7 +44,7 @@ "from popcor.examples import RotationLifter\n", "\n", "np.random.seed(2)\n", - "lifter = RotationLifter(d=3, n_meas=3)\n", + "lifter = RotationLifter(d=3, n_abs=1, n_rot=4)\n", "\n", "y = lifter.simulate_y(noise=0.2)\n", "\n", @@ -81,11 +81,12 @@ "A_known = lifter.get_A_known()\n", "constraints = lifter.get_A_b_list(lifter.get_A_known())\n", "\n", - "fig, axs = plt.subplots(1, len(A_known) + 1)\n", - "fig.set_size_inches(3*(len(A_known) + 1), 3)\n", - "for i in range(len(A_known)):\n", - " plot_matrix(A_known[i].toarray(), ax=axs[i], title=f\"A{i} \", colorbar=False)\n", - "fig = plot_matrix(Q.toarray(), ax=axs[i+1], title=\"Q\", colorbar=False)" + "A_plot = A_known[:5]\n", + "fig, axs = plt.subplots(1, len(A_plot) + 1)\n", + "fig.set_size_inches(3 * (len(A_plot) + 1), 3)\n", + "for i in range(len(A_plot)):\n", + " plot_matrix(A_plot[i].toarray(), ax=axs[i], title=f\"A{i} \", colorbar=False)\n", + "fig = plot_matrix(Q.toarray(), ax=axs[i + 1], title=\"Q\", colorbar=False)" ] }, { @@ -103,7 +104,7 @@ "x, info_rank = rank_project(X, p=1)\n", "print(f\"EVR: {info_rank['EVR']:.2e}\")\n", "\n", - "theta_opt = lifter.get_theta(x.flatten()[1:])\n", + "theta_opt = lifter.get_theta(x.flatten())\n", "\n", "estimates = {\"init gt\": theta_gt, \"SDP\": theta_opt}\n", "fig, ax = lifter.plot(estimates=estimates)" @@ -116,8 +117,53 @@ "source": [ "## Conclusion\n", "\n", - "This problem is too easy! No redundant measurements are required for tightness. " + "This problem quite easy! No redundant measurements are required for tightness, even up to very high noise levels." ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "563a21f1-3705-4dc7-99cd-01b916d56184", + "metadata": {}, + "outputs": [], + "source": [ + "data = []\n", + "for noise in np.logspace(-3, 2, 6):\n", + " for i in range(10):\n", + " np.random.seed(i)\n", + " lifter = RotationLifter(d=3, n_abs=1, n_rot=4)\n", + " y = lifter.simulate_y(noise=noise)\n", + " Q = lifter.get_Q_from_y(y=y)\n", + " A_known = lifter.get_A_known()\n", + " constraints = lifter.get_A_b_list(lifter.get_A_known())\n", + " X, info = solve_sdp(Q, constraints, verbose=False)\n", + " x, info_rank = rank_project(X, p=1)\n", + " data.append({\"seed\": i, \"noise\": noise, \"EVR\": info_rank[\"EVR\"]})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "df6b3146", + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import seaborn as sns\n", + "df = pd.DataFrame(data)\n", + "fig, ax =plt.subplots()\n", + "sns.scatterplot(df, x=\"noise\", y=\"EVR\", ax=ax)\n", + "ax.set_xscale(\"log\")\n", + "ax.set_yscale(\"log\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fbf60376", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/popcor/base_lifters/_base_class.py b/popcor/base_lifters/_base_class.py index a39d579..6342d0e 100644 --- a/popcor/base_lifters/_base_class.py +++ b/popcor/base_lifters/_base_class.py @@ -685,7 +685,25 @@ def get_A0(self, var_subset=None): return A0.get_matrix(var_dict) def get_A_b_list(self, A_list, var_subset=None): - return [(self.get_A0(var_subset), 1.0)] + [(A, 0.0) for A in A_list] + """get equality constraint tuples (Ai, bi) s.t. x.t @ Ai @ bi, 0 + + :param A_list: Normally, this is just the list of equality constaints that equal zero. We will add the homogenization. + For certain cases, such as the RotationLifter with level="bm", this is already a tuple of (Ai, bi). + + """ + if var_subset is None: + var_subset = self.var_dict + + if isinstance(A_list, list): + # TODO(FD): do more with var_subset + assert self.HOM in var_subset + return [(self.get_A0(var_subset), 1.0)] + [(A, 0.0) for A in A_list] + else: + assert isinstance(A_list, tuple) + A0_list, b0_list = self.get_A0(var_subset) + return [ + (Ai, bi) for Ai, bi in zip(A0_list + A_list[0], b0_list + A_list[1]) + ] def sample_parameters_landmarks(self, landmarks: np.ndarray): """Used by RobustPoseLifter, RangeOnlyLocLifter: the default way of adding landmarks to parameters.""" diff --git a/popcor/base_lifters/state_lifter.py b/popcor/base_lifters/state_lifter.py index df42d6a..0000f34 100644 --- a/popcor/base_lifters/state_lifter.py +++ b/popcor/base_lifters/state_lifter.py @@ -153,17 +153,24 @@ def get_cost(self, theta, y: np.ndarray | None = None) -> float: print( "Warning: using default get_cost, which may be less efficient than a custom one." ) - x = self.get_x(theta=theta).flatten("C") if y is not None: Q = self.get_Q_from_y(y) else: Q = self.get_Q() - return float(x.T @ Q @ x) + x = self.get_x(theta=theta) + if np.ndim(x) == 1: + return float(x.T @ Q @ x) + elif np.ndim(x) == 2: + return float(np.trace(x.T @ Q @ x)) + else: + raise ValueError( + f"Unexpected shape of x: {x.shape}. Must be 1D or 2D array." + ) def local_solver( self, t0, - y: np.ndarray | list | None = None, + y: np.ndarray | list | dict | None = None, *args, **kwargs, ): @@ -171,9 +178,6 @@ def local_solver( Default local solver that uses IPOPT to solve the QCQP problem defined by Q and the constraints matrices. Consider overwriting this for more efficient solvers. """ - print( - "Warning: using default local_solver, which may be less efficient than a custom one." - ) if len(args): print(f"Warning: ignore args {args}") from cert_tools.sdp_solvers import solve_low_rank_sdp @@ -196,13 +200,16 @@ def local_solver( Constraints = self.get_A_b_list(A_list=self.get_A_known()) x0 = self.get_x(theta=t0) + if np.ndim(x0) == 1: + x0 = x0.reshape((-1, 1)) + X, info = solve_low_rank_sdp( - Q, Constraints=Constraints, rank=1, x_cand=x0, **kwargs + Q, Constraints=Constraints, rank=x0.shape[1], x_cand=x0, **kwargs ) # TODO(FD) identify when the solve is not successful. info["success"] = True try: - theta = self.get_theta(X[1:, 0]) + theta = self.get_theta(X[:, : x0.shape[1]]) except AttributeError: theta = X[1 : 1 + self.d, 0] return theta, info, info["cost"] @@ -239,7 +246,10 @@ def get_theta(self, x): "Warning: got homogenized vector x. The convention is that get_theta should get x[1:]." "Please make sure that you use get_theta as intended." ) - return x.flatten()[: self.d] + if self.HOM in self.var_dict: + return x.flatten()[1 : 1 + self.d] + else: + return x.flatten()[: self.d] def get_valid_samples(self, n_samples): samples = [self.sample_theta().flatten() for _ in range(n_samples)] diff --git a/popcor/examples/example_lifter.py b/popcor/examples/example_lifter.py index 2b5c59c..5daaea9 100644 --- a/popcor/examples/example_lifter.py +++ b/popcor/examples/example_lifter.py @@ -58,7 +58,7 @@ def get_x( return np.array(x_data) def sample_parameters(self, theta: np.ndarray | None = None) -> dict: - pass + return {} def sample_theta(self) -> np.ndarray: - pass + return np.array([]) diff --git a/popcor/examples/rotation_lifter.py b/popcor/examples/rotation_lifter.py index 9df3d14..ff7f5d2 100644 --- a/popcor/examples/rotation_lifter.py +++ b/popcor/examples/rotation_lifter.py @@ -1,6 +1,21 @@ +""" +A class for solving rotation averaging problems. + +Some notes and conventions: +- The rotation matrices are called R_i. +- We consider two different forms of "lifting": + - "no" corresponds to the rank-1 version, where we define x = [1, c_1, ..., c_N] with each c_i = R_i.flatten("C"), x in (d**2 x N + 1) + - "bm" corresponds to the rank-d version, where we define X.T = [R_0.T; R_1.T; ...; R_N.T], X in (d x (N + 1)*d), and R_0 is the world frame. +- According to above conventions, HOM is either 1 or R_0, the world frame. +- Relative measurements are defined such that R_j = R_i @ R_ij + +""" + +from typing import List + import numpy as np from poly_matrix.poly_matrix import PolyMatrix -from scipy.spatial.transform import Rotation as R +from scipy.spatial.transform import Rotation from popcor.base_lifters import StateLifter @@ -9,13 +24,89 @@ min_gradient_norm=1e-7, max_iterations=10000, min_step_size=1e-8, verbosity=1 ) +DEBUG = True + + +def get_orthogonal_constraints(key, hom, d, level): + A_list = [] + b_list = [] + for i in range(d): + # enforce diagonal == 1 for R'R = I + Ei = np.zeros((d, d)) + Ei[i, i] = 1.0 + Ai = PolyMatrix(symmetric=True) + if level == "no": + constraint = np.kron(Ei, np.eye(d)) + Ai[hom, hom] = -1 + Ai[key, key] = constraint + A_list.append(Ai) + b_list.append(0.0) + else: + Ai[key, key] = Ei + A_list.append(Ai) + b_list.append(1.0) + + # enforce off-diagonal == 0 for R'R = I + for i in range(d): + for j in range(i + 1, d): + Ei = np.zeros((d, d)) + Ei[i, j] = 1.0 + Ei[j, i] = 1.0 + Ai = PolyMatrix(symmetric=True) + if level == "no": + constraint = np.kron(Ei, np.eye(d)) + Ai[key, key] = constraint + A_list.append(Ai) + b_list.append(0.0) + else: + Ai[key, key] = Ei + A_list.append(Ai) + b_list.append(0.0) + return A_list, b_list + class RotationLifter(StateLifter): - """Rotation averaging problem.""" + """Rotation averaging problem. + + We solve the following optimization problem: + + .. math:: + f(\\theta) = \\min_{R_0, R_1, \\ldots, R_N \\in \\mathrm{SO}^d} + \\sum_{i,j \\in \\mathcal{E}} || R_i - R_j \\tilde{R}_{ij} ||_F^2 + + \\sum_{i=\\in\\mathcal{A}} || R_i - \\tilde{R}_i ||_F^2 + + where :math:`\\tilde{R}_{ij}` are the relative measurements, :math:`\\tilde{R}_{i}` are the absolute measurements, + and the unknowns are - LEVELS = ["no"] + .. math:: + \\theta = \\begin{bmatrix} R_1 & R_2 & \\ldots & R_N \\end{bmatrix} + + + We can alternatively replace the absolute-measurement terms by + + .. math:: + || R_i - R_w \\tilde{R}_{i} ||_F^2 + + where :math:`R_w` is an arbitrary world frame that we can also optimize over, transforming the solutions + by :math:`R_w^{-1}R_i` after to move the world frame to the origin. Using this formulation, all + measurements are binary factors, which may simplify implementation. + + We consider two different formulations of the problem: + + - level "no" corresponds to the rank-1 version: + + .. math:: + x = \\begin{bmatrix} 1, \\mathrm{vec}(R_1), \\ldots, \\mathrm{vec}(R_N) \\end{bmatrix}^T + + - level "bm" corresponds to the rank-d version (bm=Burer-Monteiro). + + .. math:: + X = \\begin{bmatrix} R_1^\\top \\\\ \\vdots \\\\ R_N^\\top \\end{bmatrix} + """ + + LEVELS = ["no", "bm"] HOM = "h" - VARIABLE_LIST = [["h", "c"]] + VARIABLE_LIST = [["h", "c_0"], ["h", "c_0", "c_1"]] # whether or not to include the determinant constraints in the known constraints. ADD_DETERMINANT = False @@ -23,9 +114,25 @@ class RotationLifter(StateLifter): NOISE = 1e-3 # Add any parameters here that describe the problem (e.g. number of landmarks etc.) - def __init__(self, level="no", param_level="no", d=2, n_meas=2): - self.n_meas = n_meas + def __init__( + self, + level="no", + param_level="no", + d=2, + n_abs=2, + n_rot=1, + n_rel=1, + sparsity="chain", + ): + assert n_rel in [ + 0, + 1, + ], "do not support more than 1 relative measurement per pair currently." + self.n_rot = n_rot + self.n_abs = n_abs + self.n_rel = n_rel self.level = level + self.sparsity = sparsity super().__init__( level=level, param_level=param_level, @@ -34,16 +141,27 @@ def __init__(self, level="no", param_level="no", d=2, n_meas=2): @property def var_dict(self): - return {self.HOM: 1, "c": self.d**2} + if self.level == "no": + # self.HOM is the scalar homogenization variable + var_dict = {self.HOM: 1} + var_dict.update({f"c_{i}": self.d**2 for i in range(self.n_rot)}) + else: + # self.HOM is the world frame, which is d x d + var_dict = {self.HOM: self.d} + var_dict.update({f"c_{i}": self.d for i in range(self.n_rot)}) + return var_dict def sample_theta(self): """Generate a random new feasible point.""" - - if self.d == 2: - angle = np.random.uniform(0, 2 * np.pi) - C = R.from_euler("z", angle).as_matrix()[:2, :2] - elif self.d == 3: - C = R.random().as_matrix() + C = np.empty((self.d, self.n_rot * self.d)) + for i in range(self.n_rot): + if self.d == 2: + angle = np.random.uniform(0, 2 * np.pi) + C[:, i * self.d : (i + 1) * self.d] = Rotation.from_euler( + "z", angle + ).as_matrix()[:2, :2] + elif self.d == 3: + C[:, i * self.d : (i + 1) * self.d] = Rotation.random().as_matrix() return C def get_x(self, theta=None, parameters=None, var_subset=None) -> np.ndarray: @@ -56,42 +174,122 @@ def get_x(self, theta=None, parameters=None, var_subset=None) -> np.ndarray: var_subset = self.var_dict.keys() x_data = [] - for key in var_subset: - if key == self.HOM: - x_data.append(1.0) - elif key == "c": - x_data += list(theta.flatten("C")) - dim_x = self.get_dim_x(var_subset=var_subset) - assert len(x_data) == dim_x - return np.array(x_data) + if self.level == "no": + for key in var_subset: + if key == self.HOM: + x_data.append(1.0) + elif "c" in key: + i = int(key.split("_")[1]) + x_data += list(theta[:, i * self.d : (i + 1) * self.d].flatten("F")) + else: + raise ValueError(f"untreated key {key}") + dim_x = self.get_dim_x(var_subset=var_subset) + assert len(x_data) == dim_x + return np.hstack(x_data) + elif self.level == "bm": + for key in var_subset: + if key == self.HOM: + x_data.append(np.eye(self.d)) + elif "c" in key: + i = int(key.split("_")[1]) + x_data.append(theta[:, i * self.d : (i + 1) * self.d].T) + else: + raise ValueError(f"untreated key {key}") + return np.vstack(x_data) + else: + raise ValueError(f"Unknown level {self.level} for RotationLifter") def get_theta(self, x: np.ndarray) -> np.ndarray: - assert np.ndim(x) == 1 - C_flat = x[: self.d**2] - return C_flat.reshape((self.d, self.d)) + if self.level == "no": + if np.ndim(x) == 2: + assert x.shape[1] == 1 + + C_flat = x[1 : 1 + self.n_rot * self.d**2] + return C_flat.reshape((self.d, self.n_rot * self.d), order="F") + elif self.level == "bm": + # Remember that x is composed of R_0.T, R_1.T, ..., R_N.T + # We want to return theta in the form [R*_1, R*_2, ..., R*_N] + # where each R*_i := R_0.T @ R_i + + # d x d + R0 = x[: self.d, : self.d].T + + # nd x d: [R_1.T; R_2.T; ...; R_N.T] + Ri = np.array(x[self.d : (self.n_rot + 1) * self.d, : self.d]) + + # return d x nd: [R*_1, R*_2, ..., R*_N] + Ri_world = R0.T @ Ri.T + return Ri_world + else: + raise ValueError(f"Unknown level {self.level} for RotationLifter") + + def add_relative_measurement(self, i, j, noise) -> np.ndarray: + """ + || R_j - R_i @ R_ij ||_F^2 + + measurements: R_ij = R_i.T @ R_j + """ + R_i = self.theta[:, i * self.d : (i + 1) * self.d] + R_j = self.theta[:, j * self.d : (j + 1) * self.d] + R_gt = R_i.T @ R_j + + # Generate a random small rotation as noise and apply it + if noise > 0: + noise_rotvec = np.random.normal(scale=noise, size=(self.d,)) + Rnoise = ( + Rotation.from_rotvec(noise_rotvec).as_matrix() + if self.d == 3 + else Rotation.from_euler("z", noise_rotvec[0]).as_matrix()[:2, :2] + ) + return R_gt @ Rnoise + else: + return R_gt - def simulate_y(self, noise: float | None = None) -> list: - if noise is None: - noise = self.NOISE + def add_absolute_measurement(self, i, noise, n_meas=1) -> List[np.ndarray]: + """ + || R_i - R_w @ R_wi || - R_gt = self.theta.reshape((self.d, self.d)) + measurements: + R_wi = R_w.T @ R_i + """ + R_gt = self.theta[:, i * self.d : (i + 1) * self.d] y = [] - for i in range(self.n_meas): + for _ in range(n_meas): # noise model: R_i = R.T @ Rnoise if noise > 0: # Generate a random small rotation as noise and apply it noise_rotvec = np.random.normal(scale=noise, size=(self.d,)) Rnoise = ( - R.from_rotvec(noise_rotvec).as_matrix() + Rotation.from_rotvec(noise_rotvec).as_matrix() if self.d == 3 - else R.from_euler("z", noise_rotvec[0]).as_matrix()[:2, :2] + else Rotation.from_euler("z", noise_rotvec[0]).as_matrix()[:2, :2] ) - Ri = R_gt.T @ Rnoise + Ri = R_gt @ Rnoise else: - Ri = R_gt.T + Ri = R_gt y.append(Ri) return y + def simulate_y(self, noise: float | None = None) -> dict: + if noise is None: + noise = self.NOISE + + y = {} + if self.n_abs > 0: + for i in range(self.n_rot): + y[i] = self.add_absolute_measurement(i, noise, self.n_abs) + else: + y[0] = self.add_absolute_measurement(0, 0.0, 1) # add prior + + if self.n_rel > 0: + if self.sparsity == "chain": + for i in range(self.n_rot - 1): + j = i + 1 + y[(i, j)] = self.add_relative_measurement(i, j, noise) + else: + raise ValueError(f"Unknown sparsity {self.sparsity}") + return y + def get_Q(self, noise: float | None = None, output_poly: bool = False): if noise is None: noise = self.NOISE @@ -101,152 +299,215 @@ def get_Q(self, noise: float | None = None, output_poly: bool = False): return self.get_Q_from_y(self.y_, output_poly=output_poly) def get_Q_from_y(self, y, output_poly=False): - # f(R) = sum_i || R @ R_i - I ||_F^2 - # argmin f(R) = argmin sum_i || R_i.T @ R_i ||^2 - 2 tr(R.T @ R_i) + ||I||_F^2 - # = argmin sum_i -2 tr(R.T @ R_i) + sum_i d - # = argmin sum_i -2 vec(R).T @ vec(R_i.T) + N * d - # sanity check for zero noise: - # || R @ R.T - I ||_F^2 = 0 """param y: list of noisy rotation matrices.""" Q = PolyMatrix() - for Ri in y: - Q[self.HOM, "c"] -= Ri.T.flatten("C")[None, :] - Q[self.HOM, self.HOM] += len(y) * self.d + + for key, R in y.items(): + if isinstance(key, int): + # loop over all absolute measurements of this rotation. + assert isinstance(R, list) + for Rk in R: + if self.level == "no": + # treat unary factors + # Rk is measured + # f(R) = sum_k || R - Rk ||_F^2 + # = sum_k 2tr(I) - 2tr(R'Rk) + Q_test = PolyMatrix() + Q_test[self.HOM, f"c_{key}"] -= Rk.flatten("F")[None, :] + # Not adding below to be consistent with "bm" case, where we cannot + # add a constant term to the cost. + # Q_test[self.HOM, self.HOM] += 2 * self.d + if DEBUG: + x = self.get_x() + cost_x = x.T @ Q_test.get_matrix(self.var_dict) @ x + Ri = self.theta[:, key * self.d : (key + 1) * self.d] + cost_R = np.linalg.norm(Ri - Rk) ** 2 - 2 * self.d + assert abs(cost_x - cost_R) < 1e-10 + Q += Q_test + + elif self.level == "bm": + # in this case, we use self.HOM as a world frame. + # f(R) = sum_i || R - HOM @ Rk || + # compare with below: + # R corresponds to Rj, HOM corresponds to Ri + Q_test = PolyMatrix() + Q_test[self.HOM, f"c_{key}"] -= Rk + if DEBUG: + x = self.get_x() + cost_x = np.trace( + x.T @ Q_test.get_matrix(self.var_dict) @ x + ) + Ri = self.theta[:, key * self.d : (key + 1) * self.d] + cost_R = np.linalg.norm(Ri - Rk) ** 2 - 2 * self.d + assert abs(cost_x - cost_R) < 1e-10 + Q += Q_test + # treat binary factors + # f(R) = sum_ij || Rj - Ri @ Rij ||_F^2 + # = sum_ij tr((Rj - Ri @ Rij)'(Rj - Ri @ Rij)) + # = sum_ij tr(Rj'Rj) - 2 tr(Rj'Ri Rij) + tr(Rij'Ri'RiRij) + # = sum_ij 2tr(I) - 2tr(Rij Rj' Ri) + # = sum_ij 2tr(I) - 2c_i' (Rij kron I) c_j + elif isinstance(key, tuple): + i, j = key + if self.level == "no": + Q_test = PolyMatrix() + + # Not adding below to be consistent with "bm" case, where we cannot + # add a constant term to the cost. + # Q_test[self.HOM, self.HOM] += 2 * self.d + Q_test[f"c_{i}", f"c_{j}"] = -np.kron(R, np.eye(self.d)) + if DEBUG: + x = self.get_x() + assert ( + x[1:].T @ x[1:] - np.trace(self.theta.T @ self.theta) + ) < 1e-10 + Ri = self.theta[:, i * self.d : (i + 1) * self.d] + Rj = self.theta[:, j * self.d : (j + 1) * self.d] + c_j = x[1 + j * self.d**2 : 1 + (j + 1) * self.d**2] + c_i = x[1 + i * self.d**2 : 1 + (i + 1) * self.d**2] + np.testing.assert_allclose(Ri.flatten("F"), c_i, atol=1e-5) + np.testing.assert_allclose(Rj.flatten("F"), c_j, atol=1e-5) + + tr_R = np.trace(R @ Rj.T @ Ri) + tr_c = c_i.T @ (np.kron(R, np.eye(self.d)) @ c_j) + assert abs(tr_R - tr_c) < 1e-10 + + cost_x = x.T @ Q_test.get_matrix(self.var_dict) @ x + cost_R = np.linalg.norm(Rj - Ri @ R) ** 2 - 2 * self.d + assert abs(cost_x - cost_R) < 1e-10 + Q += Q_test + elif self.level == "bm": + Q_test = PolyMatrix() + Q_test[f"c_{i}", f"c_{j}"] = -R + if DEBUG: + x = self.get_x() + Ri = self.theta[:, i * self.d : (i + 1) * self.d] + Rj = self.theta[:, j * self.d : (j + 1) * self.d] + + cost_x = np.trace(x.T @ Q_test.get_matrix(self.var_dict) @ x) + cost_R = np.linalg.norm(Rj - Ri @ R) ** 2 - 2 * self.d + assert abs(cost_x - cost_R) < 1e-9 + Q += Q_test + if output_poly: return Q else: return Q.get_matrix(self.var_dict) - def local_solver_old( - self, t0, y, verbose=False, method=METHOD, solver_kwargs=SOLVER_KWARGS - ): - import pymanopt - from pymanopt.manifolds import SpecialOrthogonalGroup - - if method == "CG": - from pymanopt.optimizers import ConjugateGradient as Optimizer # fastest - elif method == "SD": - from pymanopt.optimizers import SteepestDescent as Optimizer # slow - elif method == "TR": - from pymanopt.optimizers import TrustRegions as Optimizer # okay - else: - raise ValueError(method) - - if verbose: - solver_kwargs["verbosity"] = 2 - else: - solver_kwargs["verbosity"] = 0 - - manifold = SpecialOrthogonalGroup(self.d, k=1) - - @pymanopt.function.autograd(manifold) - def cost(R): - cost = 0 - for Ri in y: - cost += np.sum((R.T @ Ri - np.eye(self.d)) ** 2) - return cost - - euclidean_gradient = None - problem = pymanopt.Problem( - manifold, cost, euclidean_gradient=euclidean_gradient - ) - optimizer = Optimizer(**solver_kwargs) - - res = optimizer.run(problem, initial_point=t0) - theta_hat = res.point - - success = ("min step_size" in res.stopping_criterion) or ( - "min grad norm" in res.stopping_criterion - ) - info = { - "success": success, - "msg": res.stopping_criterion, - } - if success: - return theta_hat, info, cost - - def test_and_add(self, A_list, Ai, output_poly): + def test_and_add(self, A_list, Ai, output_poly, b_list=[], bi=0.0): x = self.get_x() Ai_sparse = Ai.get_matrix(self.var_dict) - err = x.T @ Ai_sparse @ x + err = np.trace(np.atleast_2d(x.T @ Ai_sparse @ x)) - bi assert abs(err) <= 1e-10, err if output_poly: A_list.append(Ai) else: A_list.append(Ai_sparse) + b_list.append(bi) + + def get_A0(self, var_subset=None): + if var_subset is None: + var_subset = self.var_dict + if self.level == "no": + return super().get_A0(var_subset=var_subset) + else: + # using self.HOM, None just to make it clear that the second argument is not used. + A_orth, b_orth = get_orthogonal_constraints( + self.HOM, None, self.d, self.level + ) + return [Ai.get_matrix(var_subset) for Ai in A_orth], b_orth def get_A_known(self, var_dict=None, output_poly=False, add_redundant=False): A_list = [] + b_list = [] if var_dict is None: var_dict = self.var_dict - if "c" in var_dict: - # enforce diagonal == 1 for R'R = I - for i in range(self.d): - Ei = np.zeros((self.d, self.d)) - Ei[i, i] = 1.0 - constraint = np.kron(Ei, np.eye(self.d)) - Ai = PolyMatrix(symmetric=True) - Ai["c", "c"] = constraint - Ai[self.HOM, self.HOM] = -1 - self.test_and_add(A_list, Ai, output_poly=output_poly) - - # enforce off-diagonal == 0 for R'R = I - for i in range(self.d): - for j in range(i + 1, self.d): - Ei = np.zeros((self.d, self.d)) - Ei[i, j] = 1.0 - Ei[j, i] = 1.0 - constraint = np.kron(Ei, np.eye(self.d)) - Ai = PolyMatrix(symmetric=True) - Ai["c", "c"] = constraint - self.test_and_add(A_list, Ai, output_poly=output_poly) - - # enforce that determinant is one. - if self.d == 2 and self.ADD_DETERMINANT: - # C = [a b; c d]; ad - bc - 1 = 0 - # a b c d - # a 1 - # b -1 - # c -1 - # d 1 - Ai = PolyMatrix(symmetric=True) - constraint = np.zeros((self.d**2, self.d**2)) - constraint[0, 3] = constraint[3, 0] = 1.0 - constraint[1, 2] = constraint[2, 1] = -1.0 - Ai[self.HOM, self.HOM] = -2 - Ai["c", "c"] = constraint - self.test_and_add(A_list, Ai, output_poly=output_poly) - elif self.d == 3 and self.ADD_DETERMINANT: - # c11 c12 c13 c21 * c32 - c31 * c22 = c13 - # C = [c21, c22, c23]; c1 x c2 = c3: c31 * c12 - c11 * c12 = c23 - # c31 c32 c33 c11 * c22 - c21 * c12 = c33 - print( - "Warning: consider implementing the determinant constraint for RobustPoseLifter, d=3" + for k in range(self.n_rot): + if f"c_{k}" in var_dict: + A_orth, b_orth = get_orthogonal_constraints( + f"c_{k}", self.HOM, self.d, self.level ) - - if add_redundant and "c" in var_dict: - # enforce diagonal == 1 for RR' = I - for i in range(self.d): - Ei = np.zeros((self.d, self.d)) - Ei[i, i] = 1.0 - constraint = np.kron(np.eye(self.d), Ei) - Ai = PolyMatrix(symmetric=True) - Ai["c", "c"] = constraint - Ai[self.HOM, self.HOM] = -1 - self.test_and_add(A_list, Ai, output_poly=output_poly) - - # enforce off-diagonal == 0 for RR' = I - for i in range(self.d): - for j in range(i + 1, self.d): + for Ai, bi in zip(A_orth, b_orth): + self.test_and_add(A_list, Ai, output_poly, b_list, bi) + + # enforce that determinant is one. + if self.d == 2 and self.ADD_DETERMINANT: + # level "no": + # C = [a b; c d]; ad - bc - 1 = 0 + # a b c d + # a 1 + # b -1 + # c -1 + # d 1 + # level "bm" + # C = [a b + # c d] + # C @ C.T + # [a b] [a c] [a^2 + b^2 a*c + b*d] + # [c d] [b d] = [a*c + b*d c^2 + d^2] + # cannot be implemented... + if self.level == "bm": + raise NotImplementedError( + "Cannot add determinant constraint for level bm" + ) + Ai = PolyMatrix(symmetric=True) + constraint = np.zeros((self.d**2, self.d**2)) + constraint[0, 3] = constraint[3, 0] = 1.0 + constraint[1, 2] = constraint[2, 1] = -1.0 + Ai[self.HOM, self.HOM] = -2 + Ai[f"c_{k}", f"c_{k}"] = constraint + self.test_and_add(A_list, Ai, output_poly, b_list, 0.0) + elif self.d == 3 and self.ADD_DETERMINANT: + # c11 c12 c13 c21 * c32 - c31 * c22 = c13 + # C = [c21, c22, c23]; c1 x c2 = c3: c31 * c12 - c11 * c12 = c23 + # c31 c32 c33 c11 * c22 - c21 * c12 = c33 + print( + "Warning: consider implementing the determinant constraint for RobustPoseLifter, d=3" + ) + + if add_redundant and f"c_{k}" in var_dict: + if self.level == "bm": + print("No known redundant constraints for level bm") + continue + + # enforce diagonal == 1 for RR' = I + for i in range(self.d): Ei = np.zeros((self.d, self.d)) - Ei[i, j] = 1.0 - Ei[j, i] = 1.0 + Ei[i, i] = 1.0 + Ai = PolyMatrix(symmetric=True) + Ai[self.HOM, self.HOM] = -1 constraint = np.kron(np.eye(self.d), Ei) + Ai[f"c_{k}", f"c_{k}"] = constraint + self.test_and_add(A_list, Ai, output_poly, b_list, 0.0) + + if self.d == 2: + # enforce structure: + # [cos(x) -sin(x)] + # [sin(x) cos(x)] + # => [c s -s c] Ai = PolyMatrix(symmetric=True) - Ai["c", "c"] = constraint - self.test_and_add(A_list, Ai, output_poly=output_poly) - return A_list + Ai[self.HOM, f"c_{k}"] = np.array([1.0, 0, 0, -1.0])[None, :] + self.test_and_add(A_list, Ai, output_poly, b_list, 0.0) + + Ai = PolyMatrix(symmetric=True) + Ai[self.HOM, f"c_{k}"] = np.array([0, 1.0, 1.0, 0.0])[None, :] + self.test_and_add(A_list, Ai, output_poly, b_list, 0.0) + + # enforce off-diagonal == 0 for RR' = I + for i in range(self.d): + for j in range(i + 1, self.d): + Ei = np.zeros((self.d, self.d)) + Ei[i, j] = 1.0 + Ei[j, i] = 1.0 + constraint = np.kron(np.eye(self.d), Ei) + Ai = PolyMatrix(symmetric=True) + Ai[f"c_{k}", f"c_{k}"] = constraint + self.test_and_add(A_list, Ai, output_poly, b_list, 0.0) + if self.level == "bm": + return A_list, b_list + else: + return A_list def plot(self, estimates={}): import itertools @@ -256,22 +517,105 @@ def plot(self, estimates={}): from popcor.utils.plotting_tools import plot_frame fig, ax = plt.subplots() - plot_frame(ax=ax, theta=self.theta, label="gt", ls="-", scale=0.5, marker="") - - linestyles = itertools.cycle(["--", "-.", ":"]) - for label, theta in estimates.items(): + label = "gt" + for i in range(self.n_rot): plot_frame( ax=ax, - theta=theta, + theta=self.theta[:, i * self.d : (i + 1) * self.d], label=label, - ls=next(linestyles), - scale=1.0, + ls="-", + scale=0.5, marker="", + r_wc_w=np.hstack([i * 2.0] + [0.0] * (self.d - 1)), # type.ignore ) + label = None + + linestyles = itertools.cycle(["--", "-.", ":"]) + for label, theta in estimates.items(): + for i in range(self.n_rot): + plot_frame( + ax=ax, + theta=theta[:, i * self.d : (i + 1) * self.d], + label=label, + ls=next(linestyles), + scale=1.0, + marker="", + r_wc_w=np.hstack([i * 2.0] + [0.0] * (self.d - 1)), # type.ignore + ) + label = None ax.set_aspect("equal") ax.legend() return fig, ax def __repr__(self): - return f"rotation_lifter{self.d}d" + return f"rotation_lifter{self.d}d_{self.level}" + + +if __name__ == "__main__": + import matplotlib.pyplot as plt + import numpy as np + from cert_tools.linalg_tools import rank_project + from cert_tools.sdp_solvers import solve_sdp + + from popcor.utils.plotting_tools import plot_matrix + + level = "no" + # level = "bm" + + np.random.seed(0) + # lifter = RotationLifter( + # d=2, n_abs=0, n_rel=1, n_rot=4, sparsity="chain", level=level + # ) + lifter = RotationLifter( + d=2, n_abs=1, n_rel=0, n_rot=4, sparsity="chain", level=level + ) + + # add relative measurements between all subsequent rotations. + # lifter.add_relative_measurement(0, 1, noise=1e-3) + # add one absolute measurement to fix Gauge freedom + # lifter.add_absolute_measurement(0, noise=1e-5) + + y = lifter.simulate_y(noise=1e-10) + + x = lifter.get_x() + rank = x.shape[1] if np.ndim(x) == 2 else 1 + + theta_gt, *_ = lifter.local_solver(lifter.theta, y, verbose=False) + estimates = {"init gt": theta_gt} + for i in range(0): + theta_init = lifter.sample_theta() + theta_i, *_ = lifter.local_solver(theta_init, y, verbose=False) + estimates[f"init random {i}"] = theta_i + + fig, ax = lifter.plot(estimates=estimates) + ax.legend() + plt.show(block=False) + + Q = lifter.get_Q_from_y(y=y, output_poly=False) + A_known = lifter.get_A_known(output_poly=False, add_redundant=True) + constraints = lifter.get_A_b_list(A_known) + + fig, axs = plt.subplots(1, len(constraints) + 1) + fig.set_size_inches(3 * (len(constraints) + 1), 3) + for i in range(len(constraints)): + Ai = constraints[i][0] + plot_matrix(Ai.toarray(), ax=axs[i], title=f"A{i} ", colorbar=False) # type: ignore + + fig = plot_matrix(Q.toarray(), ax=axs[i + 1], title="Q", colorbar=False) # type: ignore + plt.show(block=False) + + X, info = solve_sdp(Q, constraints, verbose=False) + + x, info_rank = rank_project(X, p=rank) + print(f"EVR: {info_rank['EVR']:.2e}") + + if rank == 1: + theta_opt = lifter.get_theta(x.flatten()) + else: + theta_opt = lifter.get_theta(x[:, :rank]) + + estimates = {"init gt": theta_gt, "SDP": theta_opt} + fig, ax = lifter.plot(estimates=estimates) + + print("done") diff --git a/popcor/utils/plotting_tools.py b/popcor/utils/plotting_tools.py index 4adc893..070c5d0 100644 --- a/popcor/utils/plotting_tools.py +++ b/popcor/utils/plotting_tools.py @@ -113,12 +113,14 @@ def plot_frame( ls="--", alpha=0.5, d=3, + r_wc_w: np.ndarray | None = None, **kwargs, ): if np.ndim(theta) == 2: # used by rotation averaging C_cw = theta - r_wc_w = np.zeros((theta.shape[0])) + if r_wc_w is None: + r_wc_w = np.zeros((theta.shape[0])) else: try: C_cw, r_wc_c = get_C_r_from_theta(theta, d=d) diff --git a/tests/test_rotation_lifter.py b/tests/test_rotation_lifter.py new file mode 100644 index 0000000..06e3039 --- /dev/null +++ b/tests/test_rotation_lifter.py @@ -0,0 +1,191 @@ +""" +Tests for RotationLifter behaviors including measurement generation, local solvers, +and semidefinite programming (SDP) relaxations. Covers consistency between rank-1 +and rank-d lifters, correctness of the forward measurement model, convergence of +a local solver under varying noise levels, and recovery via an SDP relaxation. +""" + +import math + +import matplotlib.pyplot as plt +import numpy as np +import pytest +import scipy.sparse as sp +from cert_tools.linalg_tools import rank_project +from cert_tools.sdp_solvers import solve_sdp + +from popcor.examples import RotationLifter +from popcor.utils.plotting_tools import plot_matrix + +PLOT = False + + +def plot_matrices(A_known, Q): + n_cols = min(1 + len(A_known), 10) + n_rows = math.ceil((len(A_known) + 1) / n_cols) + fig, axs = plt.subplots(n_rows, n_cols) + fig.set_size_inches(3 * n_cols, 3 * n_rows) + axs = axs.flatten() + for i in range(len(A_known)): + assert isinstance(A_known[i], sp.csc_matrix) + plot_matrix(A_known[i].toarray(), ax=axs[i], title=f"A{i} ", colorbar=False) + fig = plot_matrix(Q.toarray(), ax=axs[i + 1], title="Q", colorbar=False) + [axs[j].axis("off") for j in range(i + 1, len(axs))] + + +@pytest.mark.parametrize("d", [2, 3]) +def test_rank1_vs_rankd(d): + """Ensure rank-1 and rank-d RotationLifter formulations yield identical costs.""" + n_rot = 3 + n_abs = 2 + + np.random.seed(0) + r1 = RotationLifter(level="no", n_abs=n_abs, n_rot=n_rot, d=d) + + np.random.seed(0) + r2 = RotationLifter(level="bm", n_abs=n_abs, n_rot=n_rot, d=d) + + np.testing.assert_array_equal(r1.theta, r2.theta) + + y = r1.simulate_y(noise=1e-5) + + x1 = r1.get_x() + x2 = r2.get_x() + + Q1 = r1.get_Q_from_y(y, output_poly=False) + Q2 = r2.get_Q_from_y(y, output_poly=False) + + cost1 = x1.T @ Q1 @ x1 + cost2 = np.trace(x2.T @ Q2 @ x2) + assert abs(cost1 - cost2) < 1e-10 + + # actual optimal cost should be + # 2 sum_ij tr(I) - 2 sum_ij tr(Ri.T@R_ij@R_j) + cost_optimal = cost1 + 2 * (n_abs * n_rot + n_rot - 1) * d + assert abs(cost_optimal) < 1e-8 + + +@pytest.mark.parametrize("d", [2, 3]) +def test_measurements(d, level="no"): + """Verify that the simulated measurements match the lifter's internal rotations.""" + np.random.seed(0) + n_abs = 2 + n_rot = 3 + d = 3 + lifter = RotationLifter( + d=d, n_abs=n_abs, n_rot=n_rot, sparsity="chain", level=level + ) + y = lifter.simulate_y(noise=1e-10) + + for key, R in y.items(): + if isinstance(key, int): + # unary factor + for Rk in R: + R_i = lifter.theta[:, key * lifter.d : (key + 1) * lifter.d] + np.testing.assert_allclose(R_i, Rk, atol=1e-5) + elif isinstance(key, tuple): + # binary factor + i, j = key + R_i = lifter.theta[:, i * lifter.d : (i + 1) * lifter.d] + R_j = lifter.theta[:, j * lifter.d : (j + 1) * lifter.d] + np.testing.assert_allclose(R_i.T @ R_j, R, atol=1e-5) + + x = lifter.get_x() + theta = lifter.get_theta(x) + np.testing.assert_allclose(theta, lifter.theta) + + +def test_solve_local(): + """Test the local solver's convergence from different initializations and noise levels.""" + d = 3 + n_abs = 1 + n_rot = 4 + for noise in [0.0, 0.2]: + np.random.seed(2) + lifter = RotationLifter( + d=d, n_abs=n_abs, n_rot=n_rot, sparsity="chain", level="no" + ) + + y = lifter.simulate_y(noise=noise) + + theta_gt, *_ = lifter.local_solver(lifter.theta, y, verbose=False) + estimates = {"init gt": theta_gt} + for i in range(10): + theta_init = lifter.sample_theta() + theta_i, *_ = lifter.local_solver(theta_init, y, verbose=False) + + # make sure we are starting relatively far from ground truth + assert np.any(np.abs(theta_init - theta_gt) > 1e-1) + estimates[f"init random {i}"] = theta_i + + # check convergence tolerance depending on noise + if noise == 0.0: + np.testing.assert_allclose(theta_i, lifter.theta, atol=1e-5) + else: + # 0.5 is found empirically + np.testing.assert_allclose(theta_i, lifter.theta, atol=0.5) + + if PLOT: + _, ax = lifter.plot(estimates=estimates) + handles, labels = ax.get_legend_handles_labels() + ax.legend( + handles[:3], labels[:3], loc="lower left", bbox_to_anchor=(0.0, 1.0) + ) + plt.show(block=False) + print("done") + + +def test_solve_sdp(): + """Solve the SDP relaxation and compare the recovered rotations to ground truth.""" + d = 3 + n_abs = 1 + n_rot = 4 + estimates = {} + for noise in [0.0, 0.2]: + np.random.seed(2) + lifter = RotationLifter( + d=d, n_abs=n_abs, n_rot=n_rot, sparsity="chain", level="no" + ) + + y = lifter.simulate_y(noise=noise) + Q = lifter.get_Q_from_y(y=y, output_poly=False) + + assert isinstance(Q, sp.csc_matrix) + A_known = lifter.get_A_known() + constraints = lifter.get_A_b_list(A_known) + + if noise == 0 and PLOT: + plot_matrices(A_known, Q) + + X, info = solve_sdp(Q, constraints, verbose=False) + + x, info_rank = rank_project(X, p=1) + print(f"EVR at noise {noise:.2f}: {info_rank['EVR']:.2e}") + if noise <= 1: + assert info_rank["EVR"] > 1e8 + theta_sdp = lifter.get_theta(x.flatten()) + if noise == 0.0: + np.testing.assert_allclose(theta_sdp, lifter.theta, atol=1e-5) + + estimates.update({"init gt": lifter.theta, f"SDP noise {noise:.2f}": theta_sdp}) + + if PLOT: + fig, ax = lifter.plot(estimates=estimates) + _, ax = lifter.plot(estimates=estimates) + plt.show(block=False) + return + + +if __name__ == "__main__": + test_solve_sdp() + test_solve_local() + + test_measurements(d=2, level="no") + test_measurements(d=2, level="bm") + test_measurements(d=3, level="no") + test_measurements(d=3, level="bm") + + test_rank1_vs_rankd(d=2) + test_rank1_vs_rankd(d=3) + + print("all tests passed")