diff --git a/local_pathfinding/objectives.py b/local_pathfinding/objectives.py index 835c4b7..b644f84 100644 --- a/local_pathfinding/objectives.py +++ b/local_pathfinding/objectives.py @@ -1,5 +1,6 @@ """Our custom OMPL optimization objectives.""" +import itertools import math from enum import Enum, auto @@ -63,19 +64,71 @@ class Objective(ob.StateCostIntegralObjective): - This class inherits from the OMPL class StateCostIntegralObjective: https://ompl.kavrakilab.org/classompl_1_1base_1_1StateCostIntegralObjective.html - Camelcase is used for functions that override OMPL functions, as that is their convention. + - Also computes the maximum motion cost for normalization purposes. Attributes: space_information (StateSpacePtr): Contains all the information about the space planning is done in. + max_motion_cost (float): The maximum motion cost between any two states in the state space. """ - def __init__(self, space_information): + def __init__(self, space_information, num_samples: int): super().__init__(si=space_information, enableMotionCostInterpolation=True) self.space_information = space_information + states = self.sample_states(num_samples) + # initialize to 1 so that motionCost is not normalized when finding the maximum motion cost + self.max_motion_cost = 1.0 + self.max_motion_cost = self.find_maximum_motion_cost(states) + def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost: raise NotImplementedError + def find_maximum_motion_cost(self, states: list[ob.SE2StateSpace]) -> float: + """Finds the maximum motion cost between any two states in `states`. + + Args: + states (list[ob.SE2StateSpace]): OMPL states. + + Returns: + float: Maximum motion cost. + """ + return max( + self.motionCost(s1, s2).value() + for s1, s2 in itertools.combinations(iterable=states, r=2) + ) + + def sample_states(self, num_samples: int) -> list[ob.SE2StateSpace]: + """Samples `num_samples` states from the state space. + + Args: + num_samples (int): Number of states to sample. + + Returns: + list[ob.SE2StateSpace]: OMPL states. + """ + sampler = self.space_information.getStateSpace().allocDefaultStateSampler() + + sampled_states = [] + + for _ in range(num_samples): + state = self.space_information.getStateSpace().allocState() + sampler.sampleUniform(state) + sampled_states.append(state) + + return sampled_states + + def normalization(self, cost: float) -> float: + """Normalizes cost using max_motion_cost and caps it at 1. + + Args: + cost (float): motionCost value from an objective function. + Returns: + float: normalized cost between 0 to 1. + """ + normalized_cost = cost / self.max_motion_cost + return min(normalized_cost, 1.0) + class DistanceObjective(Objective): """Generates a distance objective function @@ -94,13 +147,14 @@ def __init__( method: DistanceMethod, reference=HelperLatLon(latitude=0.0, longitude=0.0), ): - super().__init__(space_information) self.method = method if self.method == DistanceMethod.OMPL_PATH_LENGTH: - self.ompl_path_objective = ob.PathLengthOptimizationObjective(self.space_information) + self.ompl_path_objective = ob.PathLengthOptimizationObjective(space_information) elif self.method == DistanceMethod.LATLON: self.reference = reference + super().__init__(space_information, num_samples=100) + def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost: """Generates the distance between two points @@ -109,7 +163,7 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost: s2 (SE2StateInternal): The ending point of the local goal state Returns: - ob.Cost: The distance between two points object + ob.Cost: The normalized distance between two points Raises: ValueError: If the distance method is not supported @@ -118,17 +172,16 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost: s2_xy = cs.XY(s2.getX(), s2.getY()) if self.method == DistanceMethod.EUCLIDEAN: distance = DistanceObjective.get_euclidean_path_length_objective(s1_xy, s2_xy) - cost = ob.Cost(distance) elif self.method == DistanceMethod.LATLON: distance = DistanceObjective.get_latlon_path_length_objective( s1_xy, s2_xy, self.reference ) - cost = ob.Cost(distance) elif self.method == DistanceMethod.OMPL_PATH_LENGTH: - cost = self.ompl_path_objective.motionCost(s1_xy, s2_xy) + distance = self.ompl_path_objective.motionCost(s1, s2).value() else: - ValueError(f"Method {self.method} not supported") - return cost + raise ValueError(f"Method {self.method} not supported") + + return ob.Cost(self.normalization(distance)) @staticmethod def get_euclidean_path_length_objective(s1: cs.XY, s2: cs.XY) -> float: @@ -177,13 +230,8 @@ class MinimumTurningObjective(Objective): """ def __init__( - self, - space_information, - simple_setup, - heading_degrees: float, - method: MinimumTurningMethod, + self, space_information, simple_setup, heading_degrees: float, method: MinimumTurningMethod ): - super().__init__(space_information) self.goal = cs.XY( simple_setup.getGoal().getState().getX(), simple_setup.getGoal().getState().getY() ) @@ -191,6 +239,8 @@ def __init__( self.heading = math.radians(heading_degrees) self.method = method + super().__init__(space_information, num_samples=100) + def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost: """Generates the turning cost between s1, s2, heading or the goal position @@ -199,7 +249,7 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost: s2 (SE2StateInternal): The ending point of the local goal state Returns: - ob.Cost: The minimum turning angle in degrees + ob.Cost: The normalized minimum turning angle in degrees Raises: ValueError: If the minimum turning method is not supported @@ -213,8 +263,9 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost: elif self.method == MinimumTurningMethod.HEADING_PATH: angle = self.heading_path_turn_cost(s1_xy, s2_xy, self.heading) else: - ValueError(f"Method {self.method} not supported") - return ob.Cost(angle) + raise ValueError(f"Method {self.method} not supported") + + return ob.Cost(self.normalization(angle)) @staticmethod def goal_heading_turn_cost(s1: cs.XY, goal: cs.XY, heading: float) -> float: @@ -302,10 +353,11 @@ class WindObjective(Objective): """ def __init__(self, space_information, wind_direction_degrees: float): - super().__init__(space_information) assert -180 < wind_direction_degrees <= 180 self.wind_direction = math.radians(wind_direction_degrees) + super().__init__(space_information, num_samples=100) + def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost: """Generates the cost associated with the upwind and downwind directions of the boat in relation to the wind. @@ -315,11 +367,13 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost: s2 (SE2StateInternal): The ending point of the local goal state Returns: - ob.Cost: The cost of going upwind or downwind + ob.Cost: The normalized cost of going upwind or downwind """ s1_xy = cs.XY(s1.getX(), s1.getY()) s2_xy = cs.XY(s2.getX(), s2.getY()) - return ob.Cost(WindObjective.wind_direction_cost(s1_xy, s2_xy, self.wind_direction)) + wind_cost = WindObjective.wind_direction_cost(s1_xy, s2_xy, self.wind_direction) + + return ob.Cost(self.normalization(wind_cost)) @staticmethod def wind_direction_cost(s1: cs.XY, s2: cs.XY, wind_direction: float) -> float: @@ -425,7 +479,6 @@ def __init__( wind_speed: float, method: SpeedObjectiveMethod, ): - super().__init__(space_information) assert -180 < wind_direction <= 180 self.wind_direction = math.radians(wind_direction) @@ -434,6 +487,7 @@ def __init__( self.wind_speed = wind_speed self.method = method + super().__init__(space_information, num_samples=100) def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost: """Generates the cost associated with the speed of the boat. @@ -443,18 +497,19 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost: s2 (SE2StateInternal): The ending point of the local goal state Returns: - ob.Cost: The cost of going upwind or downwind + ob.Cost: The normalized cost of going upwind or downwind """ s1_xy = cs.XY(s1.getX(), s1.getY()) s2_xy = cs.XY(s2.getX(), s2.getY()) + heading_s1_to_s2 = math.atan2(s2_xy.x - s1_xy.x, s2_xy.y - s1_xy.y) sailbot_speed = self.get_sailbot_speed( - self.heading_direction, self.wind_direction, self.wind_speed + heading_s1_to_s2, self.wind_direction, self.wind_speed ) if sailbot_speed == 0: - return ob.Cost(10000) + return ob.Cost(1.0) if self.method == SpeedObjectiveMethod.SAILBOT_TIME: distance = DistanceObjective.get_euclidean_path_length_objective(s1_xy, s2_xy) @@ -468,7 +523,7 @@ def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost: cost = ob.Cost(self.get_continuous_cost(sailbot_speed)) else: ValueError(f"Method {self.method} not supported") - return cost + return ob.Cost(self.normalization(cost.value())) @staticmethod def get_sailbot_speed(heading: float, wind_direction: float, wind_speed: float) -> float: @@ -571,28 +626,23 @@ def get_sailing_objective( wind_speed: float, ) -> ob.OptimizationObjective: objective = ob.MultiOptimizationObjective(si=space_information) - objective.addObjective( - objective=DistanceObjective(space_information, DistanceMethod.LATLON), - weight=1.0, - ) - objective.addObjective( - objective=MinimumTurningObjective( - space_information, simple_setup, heading_degrees, MinimumTurningMethod.GOAL_HEADING - ), - weight=100.0, + objective_1 = DistanceObjective( + space_information=space_information, method=DistanceMethod.LATLON ) - objective.addObjective( - objective=WindObjective(space_information, wind_direction_degrees), weight=1.0 + objective_2 = MinimumTurningObjective( + space_information, simple_setup, heading_degrees, MinimumTurningMethod.GOAL_HEADING ) - objective.addObjective( - objective=SpeedObjective( - space_information, - heading_degrees, - wind_direction_degrees, - wind_speed, - SpeedObjectiveMethod.SAILBOT_TIME, - ), - weight=1.0, + objective_3 = WindObjective(space_information, wind_direction_degrees) + objective_4 = SpeedObjective( + space_information=space_information, + heading_direction=heading_degrees, + wind_direction=wind_direction_degrees, + wind_speed=wind_speed, + method=SpeedObjectiveMethod.SAILBOT_TIME, ) + objective.addObjective(objective=objective_1, weight=0.25) + objective.addObjective(objective=objective_2, weight=0.25) + objective.addObjective(objective=objective_3, weight=0.25) + objective.addObjective(objective=objective_4, weight=0.25) return objective diff --git a/test/test_objectives.py b/test/test_objectives.py index 9289b65..f9e928d 100644 --- a/test/test_objectives.py +++ b/test/test_objectives.py @@ -1,3 +1,4 @@ +import itertools import math import pytest @@ -27,20 +28,35 @@ ) +""" Tests for distance objective """ + + @pytest.mark.parametrize( - "method", + "method,max_motion_cost", [ - objectives.DistanceMethod.EUCLIDEAN, - objectives.DistanceMethod.LATLON, - objectives.DistanceMethod.OMPL_PATH_LENGTH, + (objectives.DistanceMethod.EUCLIDEAN, 2.5), + (objectives.DistanceMethod.LATLON, 2700), + (objectives.DistanceMethod.OMPL_PATH_LENGTH, 4.0), ], ) -def test_distance_objective(method: objectives.DistanceMethod): +def test_distance_objective(method: objectives.DistanceMethod, max_motion_cost: float): distance_objective = objectives.DistanceObjective( PATH._simple_setup.getSpaceInformation(), method, ) - assert distance_objective is not None + + # test sample_states() + num_samples = 3 + sampled_states = distance_objective.sample_states(num_samples) + assert len(sampled_states) == num_samples + + # test find_maximum_motion_cost() + assert distance_objective.max_motion_cost == pytest.approx(max_motion_cost, rel=1e0) + + # test if the motionCost() is normalized between 0 and 1 for 10 random samples + states = distance_objective.sample_states(10) + for s1, s2 in itertools.combinations(iterable=states, r=2): + assert 0 <= distance_objective.motionCost(s1, s2).value() <= 1 @pytest.mark.parametrize( @@ -86,22 +102,39 @@ def test_get_latlon_path_length_objective(rf: tuple, cs1: tuple, cs2: tuple): ) == pytest.approx(distance_m) +""" Tests for minimum turning objective """ + + @pytest.mark.parametrize( - "method", + "method,heading_degrees,max_motion_cost", [ - objectives.MinimumTurningMethod.GOAL_HEADING, - objectives.MinimumTurningMethod.GOAL_PATH, - objectives.MinimumTurningMethod.HEADING_PATH, + (objectives.MinimumTurningMethod.GOAL_HEADING, 60.0, 174.862), + (objectives.MinimumTurningMethod.GOAL_PATH, 60.0, 179.340), + (objectives.MinimumTurningMethod.HEADING_PATH, 60.0, 179.962), ], ) -def test_minimum_turning_objective(method: objectives.MinimumTurningMethod): +def test_minimum_turning_objective( + method: objectives.MinimumTurningMethod, heading_degrees: float, max_motion_cost: float +): minimum_turning_objective = objectives.MinimumTurningObjective( PATH._simple_setup.getSpaceInformation(), PATH._simple_setup, - PATH.state.heading_direction, + heading_degrees, method, ) - assert minimum_turning_objective is not None + + # test sample_states() + num_samples = 3 + sampled_states = minimum_turning_objective.sample_states(num_samples) + assert len(sampled_states) == num_samples + + # test find_maximum_motion_cost() + assert minimum_turning_objective.max_motion_cost == pytest.approx(max_motion_cost, rel=1e0) + + # test if the motionCost() is normalized between 0 and 1 for 10 random samples + states = minimum_turning_objective.sample_states(10) + for s1, s2 in itertools.combinations(iterable=states, r=2): + assert 0 <= minimum_turning_objective.motionCost(s1, s2).value() <= 1 @pytest.mark.parametrize( @@ -154,6 +187,36 @@ def test_heading_path_turn_cost(cs1: tuple, cs2: tuple, heading_degrees: float, ) == pytest.approx(expected, abs=1e-3) +""" Tests for wind objective """ + + +@pytest.mark.parametrize( + "wind_direction_deg,max_motion_cost", + [ + (60.0, 7593.768), + (45.0, 7763.842), + (0.0, 7579.767), + ], +) +def test_wind_objective(wind_direction_deg: float, max_motion_cost: float): + wind_objective = objectives.WindObjective( + PATH._simple_setup.getSpaceInformation(), wind_direction_deg + ) + + # test sample_states() + num_samples = 3 + sampled_states = wind_objective.sample_states(num_samples) + assert len(sampled_states) == num_samples + + # test find_maximum_motion_cost() + assert wind_objective.max_motion_cost == pytest.approx(max_motion_cost, rel=1e0) + + # test if the motionCost() is normalized between 0 and 1 for 10 random samples + states = wind_objective.sample_states(10) + for s1, s2 in itertools.combinations(iterable=states, r=2): + assert 0 <= wind_objective.motionCost(s1, s2).value() <= 1 + + @pytest.mark.parametrize( "cs1,cs2,wind_direction_deg,expected", [ @@ -225,15 +288,18 @@ def test_angle_between(afir: float, amid: float, asec: float, expected: float): ) +""" Tests for speed objective """ + + @pytest.mark.parametrize( - "method", + "method,max_motion_cost", [ - objectives.SpeedObjectiveMethod.SAILBOT_TIME, - objectives.SpeedObjectiveMethod.SAILBOT_PIECEWISE, - objectives.SpeedObjectiveMethod.SAILBOT_CONTINUOUS, + (objectives.SpeedObjectiveMethod.SAILBOT_TIME, 1.0), + (objectives.SpeedObjectiveMethod.SAILBOT_PIECEWISE, 1.0), + (objectives.SpeedObjectiveMethod.SAILBOT_CONTINUOUS, 1.0), ], ) -def test_speed_objective(method: objectives.SpeedObjectiveMethod): +def test_speed_objective(method: objectives.SpeedObjectiveMethod, max_motion_cost: float): speed_objective = objectives.SpeedObjective( PATH._simple_setup.getSpaceInformation(), PATH.state.heading_direction, @@ -241,7 +307,20 @@ def test_speed_objective(method: objectives.SpeedObjectiveMethod): PATH.state.wind_speed, method, ) - assert speed_objective is not None + + # test sample_states() + num_samples = 3 + sampled_states = speed_objective.sample_states(num_samples) + assert len(sampled_states) == num_samples + + # test find_maximum_motion_cost() + print(speed_objective.max_motion_cost) + assert speed_objective.max_motion_cost == pytest.approx(max_motion_cost, rel=1e0) + + # test if the motionCost() is normalized between 0 and 1 for 10 random samples + states = speed_objective.sample_states(10) + for s1, s2 in itertools.combinations(iterable=states, r=2): + assert 0 <= speed_objective.motionCost(s1, s2).value() <= 1 @pytest.mark.parametrize(