diff --git a/start_test_env.sh b/start_test_env.sh index 475434f0..b97e6f67 100755 --- a/start_test_env.sh +++ b/start_test_env.sh @@ -44,17 +44,17 @@ if [ $? -ne 0 ]; then fi # Change to the ssl-game-controller directory and run the game controller, suppressing output -echo "Starting game controller..." -cd ssl-game-controller/ -./ssl-game-controller > /dev/null 2>&1 & -GAME_CONTROLLER_PID=$! -cd .. +# echo "Starting game controller..." +# cd ssl-game-controller/ +# ./ssl-game-controller > /dev/null 2>&1 & +# GAME_CONTROLLER_PID=$! +# cd .. # Check if the game controller started successfully -if [ $? -ne 0 ]; then - echo "Failed to start game controller. Exiting." - cleanup -fi +# if [ $? -ne 0 ]; then +# echo "Failed to start game controller. Exiting." +# cleanup +# fi # Change to the AutoReferee directory and run the run.sh script, suppressing output echo "Starting AutoReferee..." diff --git a/utama_core/data_processing/receivers/referee_receiver.py b/utama_core/data_processing/receivers/referee_receiver.py index 50a54a21..f16280f7 100644 --- a/utama_core/data_processing/receivers/referee_receiver.py +++ b/utama_core/data_processing/receivers/referee_receiver.py @@ -179,6 +179,9 @@ def _update_data(self, referee_packet: Referee) -> None: if referee_packet.HasField("current_action_time_remaining") else None ), + game_events=list(referee_packet.game_events), + match_type=referee_packet.match_type, + status_message=referee_packet.status_message if referee_packet.status_message else None, ) # add to referee buffer diff --git a/utama_core/data_processing/refiners/referee.py b/utama_core/data_processing/refiners/referee.py index ec1e0f9c..788a0042 100644 --- a/utama_core/data_processing/refiners/referee.py +++ b/utama_core/data_processing/refiners/referee.py @@ -1,3 +1,4 @@ +import dataclasses from typing import Optional from utama_core.data_processing.refiners.base_refiner import BaseRefiner @@ -8,15 +9,30 @@ class RefereeRefiner(BaseRefiner): - def refine(self, game, data): - return game - + def __init__(self): self._referee_records = [] + def refine(self, game_frame, data: Optional[RefereeData]): + """Process referee data and update the game frame. + + Args: + game_frame: Current GameFrame object + data: Referee data to process (None if no referee) + + Returns: + Updated GameFrame with referee data attached, or the original frame if data is None + """ + if data is None: + return game_frame + + # Add to history + self.add_new_referee_data(data) + + # Return a new GameFrame with referee data injected + return dataclasses.replace(game_frame, referee=data) + def add_new_referee_data(self, referee_data: RefereeData) -> None: - if not self._referee_records: - self._referee_records.append(referee_data) - elif referee_data[1:] != self._referee_records[-1][1:]: + if not self._referee_records or referee_data != self._referee_records[-1]: self._referee_records.append(referee_data) def source_identifier(self) -> Optional[str]: diff --git a/utama_core/entities/data/referee.py b/utama_core/entities/data/referee.py index ba4c331a..b97b6c2a 100644 --- a/utama_core/entities/data/referee.py +++ b/utama_core/entities/data/referee.py @@ -1,12 +1,18 @@ -from typing import NamedTuple, Optional, Tuple +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, List, Optional, Tuple -from utama_core.entities.game.team_info import TeamInfo from utama_core.entities.referee.referee_command import RefereeCommand from utama_core.entities.referee.stage import Stage +if TYPE_CHECKING: + from utama_core.entities.game.team_info import TeamInfo + -class RefereeData(NamedTuple): - """Namedtuple for referee data.""" +@dataclass(eq=False) +class RefereeData: + """Dataclass for referee data.""" source_identifier: Optional[str] time_sent: float @@ -36,17 +42,47 @@ class RefereeData(NamedTuple): # * ball placement current_action_time_remaining: Optional[int] = None + # All game events detected since the last RUNNING state (e.g. foul type, ball-out side). + # Stored as raw protobuf GameEvent objects. Cleared when the game resumes. + # Useful for logging and future decision-making; not required for basic compliance. + game_events: List = field(default_factory=list) + + # Meta information about the match type: + # 0 = UNKNOWN_MATCH, 1 = GROUP_PHASE, 2 = ELIMINATION_PHASE, 3 = FRIENDLY + match_type: int = 0 + + # Human-readable message from the referee UI (e.g. reason for a stoppage). + status_message: Optional[str] = None + def __eq__(self, other): if not isinstance(other, RefereeData): return NotImplemented + # game_events, match_type, status_message, source_identifier, and + # timestamps are intentionally excluded from equality so they do not + # trigger spurious re-records in RefereeRefiner. + # TeamInfo has no __eq__ so compare the mutable game-state fields only. return ( self.stage == other.stage and self.referee_command == other.referee_command and self.referee_command_timestamp == other.referee_command_timestamp - and self.yellow_team == other.yellow_team - and self.blue_team == other.blue_team + and self.yellow_team.score == other.yellow_team.score + and self.yellow_team.goalkeeper == other.yellow_team.goalkeeper + and self.blue_team.score == other.blue_team.score + and self.blue_team.goalkeeper == other.blue_team.goalkeeper and self.designated_position == other.designated_position and self.blue_team_on_positive_half == other.blue_team_on_positive_half and self.next_command == other.next_command and self.current_action_time_remaining == other.current_action_time_remaining ) + + def __hash__(self): + return hash( + ( + self.stage, + self.referee_command, + self.referee_command_timestamp, + self.designated_position, + self.blue_team_on_positive_half, + self.next_command, + ) + ) diff --git a/utama_core/entities/game/current_game_frame.py b/utama_core/entities/game/current_game_frame.py index 4ae31849..33b851e6 100644 --- a/utama_core/entities/game/current_game_frame.py +++ b/utama_core/entities/game/current_game_frame.py @@ -18,6 +18,7 @@ def __init__(self, game: GameFrame): object.__setattr__(self, "friendly_robots", game.friendly_robots) object.__setattr__(self, "enemy_robots", game.enemy_robots) object.__setattr__(self, "ball", game.ball) + object.__setattr__(self, "referee", game.referee) object.__setattr__(self, "robot_with_ball", self._set_robot_with_ball(game)) object.__setattr__(self, "proximity_lookup", self._init_proximity_lookup(game)) diff --git a/utama_core/entities/game/game.py b/utama_core/entities/game/game.py index 3bb25e18..44d7054d 100644 --- a/utama_core/entities/game/game.py +++ b/utama_core/entities/game/game.py @@ -53,3 +53,7 @@ def robot_with_ball(self): @property def proximity_lookup(self): return self.current.proximity_lookup + + @property + def referee(self): + return self.current.referee diff --git a/utama_core/entities/game/game_frame.py b/utama_core/entities/game/game_frame.py index d7e62a69..85c8ee2c 100644 --- a/utama_core/entities/game/game_frame.py +++ b/utama_core/entities/game/game_frame.py @@ -3,6 +3,7 @@ from dataclasses import dataclass from typing import Dict, Optional +from utama_core.entities.data.referee import RefereeData from utama_core.entities.game.ball import Ball from utama_core.entities.game.field import Field from utama_core.entities.game.robot import Robot @@ -18,6 +19,7 @@ class GameFrame: friendly_robots: Dict[int, Robot] enemy_robots: Dict[int, Robot] ball: Optional[Ball] + referee: Optional[RefereeData] = None def is_ball_in_goal(self, right_goal: bool) -> bool: ball_pos = self.ball.p diff --git a/utama_core/entities/referee/stage.py b/utama_core/entities/referee/stage.py index 5e92f726..b0693ed2 100644 --- a/utama_core/entities/referee/stage.py +++ b/utama_core/entities/referee/stage.py @@ -26,10 +26,6 @@ def from_id(stage_id: int): except ValueError: raise ValueError(f"Invalid stage ID: {stage_id}") - @property - def name(self): - return self.name - @property def stage_id(self): return self.value diff --git a/utama_core/rsoccer_simulator/src/ssl/envs/standard_ssl.py b/utama_core/rsoccer_simulator/src/ssl/envs/standard_ssl.py index 1eb026da..0ae38885 100644 --- a/utama_core/rsoccer_simulator/src/ssl/envs/standard_ssl.py +++ b/utama_core/rsoccer_simulator/src/ssl/envs/standard_ssl.py @@ -14,6 +14,7 @@ ) from utama_core.entities.data.command import RobotResponse from utama_core.entities.data.raw_vision import RawBallData, RawRobotData, RawVisionData +from utama_core.entities.data.referee import RefereeData from utama_core.global_utils.math_utils import ( deg_to_rad, normalise_heading_deg, @@ -173,10 +174,11 @@ def _frame_to_observations( ) -> Tuple[RawVisionData, RobotResponse, RobotResponse]: """Return observation data that aligns with grSim. There may be Gaussian noise and vanishing added. - Returns (vision_observation, yellow_robot_feedback, blue_robot_feedback) + Returns (vision_observation, yellow_robot_feedback, blue_robot_feedback, referee_data) vision_observation: closely aligned to SSLVision that returns a FramData object yellow_robots_info: feedback from individual yellow robots that returns a List[RobotInfo] blue_robots_info: feedback from individual blue robots that returns a List[RobotInfo] + referee_data: current referee state from embedded referee state machine """ if self.latest_observation[0] == self.steps: @@ -216,6 +218,9 @@ def _frame_to_observations( # note that ball_obs stored in list to standardise with SSLVision # As there is sometimes multiple possible positions for the ball + # Get referee data + # current_time = self.time_step * self.steps + # Camera id as 0, only one camera for RSim result = ( RawVisionData(self.time_step * self.steps, yellow_obs, blue_obs, ball_obs, 0), diff --git a/utama_core/rsoccer_simulator/src/ssl/referee_state_machine.py b/utama_core/rsoccer_simulator/src/ssl/referee_state_machine.py new file mode 100644 index 00000000..caa8ec2a --- /dev/null +++ b/utama_core/rsoccer_simulator/src/ssl/referee_state_machine.py @@ -0,0 +1,289 @@ +"""Embedded referee state machine for RSim environment. + +This module implements a referee system for the RSim SSL environment that +generates RefereeData synchronously with simulation steps, maintaining +interface compatibility with network-based referee systems. +""" + +import logging +from typing import Optional + +import numpy as np + +from utama_core.entities.data.referee import RefereeData +from utama_core.entities.game.team_info import TeamInfo +from utama_core.entities.referee.referee_command import RefereeCommand +from utama_core.entities.referee.stage import Stage +from utama_core.rsoccer_simulator.src.Entities import Frame + +logger = logging.getLogger(__name__) + + +class RefereeStateMachine: + """Manages referee state and generates RefereeData for RSim. + + This class detects game events from simulation state, tracks scores and + timers, and generates valid RefereeData objects compatible with the + network-based referee system used in GRSIM/REAL modes. + + Attributes: + stage: Current game stage (NORMAL_FIRST_HALF, etc.) + command: Current referee command (HALT, STOP, NORMAL_START, etc.) + command_counter: Increments each time command changes + command_timestamp: Timestamp when current command was issued + stage_start_time: When current stage started + stage_duration: Duration of current stage in seconds + yellow_team: Team info for yellow team (score, cards, etc.) + blue_team: Team info for blue team + """ + + def __init__( + self, + n_robots_blue: int, + n_robots_yellow: int, + field, + initial_stage: Stage = Stage.NORMAL_FIRST_HALF_PRE, + initial_command: RefereeCommand = RefereeCommand.HALT, + ): + """Initialize referee state machine. + + Args: + n_robots_blue: Number of blue robots + n_robots_yellow: Number of yellow robots + field: Field object with dimensions + initial_stage: Starting game stage + initial_command: Starting referee command + """ + self.n_robots_blue = n_robots_blue + self.n_robots_yellow = n_robots_yellow + self.field = field + + # Field dimensions (meters) + self.field_half_length = field.length / 2 + self.field_half_width = field.width / 2 + + # State tracking + self.stage = initial_stage + self.command = initial_command + self.command_counter = 0 + self.command_timestamp = 0.0 + + # Timers + self.stage_start_time = 0.0 + self.stage_duration = 300.0 # 5 minutes per half + self.action_timeout = None + + # Team info + self.yellow_team = TeamInfo( + name="Yellow", + score=0, + red_cards=0, + yellow_card_times=[], + yellow_cards=0, + timeouts=4, + timeout_time=300, + goalkeeper=0, + foul_counter=0, + ball_placement_failures=0, + can_place_ball=True, + max_allowed_bots=n_robots_yellow, + bot_substitution_intent=False, + bot_substitution_allowed=True, + bot_substitutions_left=5, + ) + + self.blue_team = TeamInfo( + name="Blue", + score=0, + red_cards=0, + yellow_card_times=[], + yellow_cards=0, + timeouts=4, + timeout_time=300, + goalkeeper=0, + foul_counter=0, + ball_placement_failures=0, + can_place_ball=True, + max_allowed_bots=n_robots_blue, + bot_substitution_intent=False, + bot_substitution_allowed=True, + bot_substitutions_left=5, + ) + + # Game state + self.ball_last_touched_by = None + self.ball_placement_target = None + self.next_command = None + self.goal_scored_by = None + + # Event detection state + self.last_ball_position = None + self.last_goal_time = 0.0 + self.goal_cooldown = 0.5 # seconds before detecting another goal + + logger.info( + "RefereeStateMachine initialized: stage=%s, command=%s", + self.stage.name, + self.command.name, + ) + + def update(self, frame: Frame, current_time: float) -> None: + """Update referee state based on simulation frame. + + This should be called every simulation step. + + Args: + frame: Current simulation frame with ball and robot positions + current_time: Current simulation time in seconds + """ + # Detect and process game events + self._detect_and_process_events(frame, current_time) + + # Update timers + self._update_timers(current_time) + + def _detect_and_process_events(self, frame: Frame, current_time: float) -> None: + """Detect game events and update state accordingly. + + Args: + frame: Current simulation frame + current_time: Current simulation time + """ + # Goal detection (with cooldown to prevent multiple detections) + if current_time - self.last_goal_time > self.goal_cooldown: + if self._is_goal(frame): + self._process_goal(current_time) + + def _is_goal(self, frame: Frame) -> bool: + """Detect if ball is in goal. + + Args: + frame: Current simulation frame + + Returns: + True if goal scored, False otherwise + """ + ball = frame.ball + # goal_depth = 0.2 # meters behind goal line + goal_width = self.field.goal_width / 2 # half width + + # Left goal (yellow defends) - negative x + if ball.x < -self.field_half_length and abs(ball.y) < goal_width: + self.goal_scored_by = "blue" + logger.info("Goal scored by blue team!") + return True + + # Right goal (blue defends) - positive x + if ball.x > self.field_half_length and abs(ball.y) < goal_width: + self.goal_scored_by = "yellow" + logger.info("Goal scored by yellow team!") + return True + + return False + + def _process_goal(self, current_time: float) -> None: + """Process a goal event. + + Updates score, sets STOP command, and prepares kickoff for opposite team. + + Args: + current_time: Time when goal was scored + """ + if self.goal_scored_by == "yellow": + self.yellow_team = self.yellow_team._replace(score=self.yellow_team.score + 1) + self.next_command = RefereeCommand.PREPARE_KICKOFF_BLUE + logger.info("Yellow scored! Score: Yellow %d - Blue %d", self.yellow_team.score, self.blue_team.score) + elif self.goal_scored_by == "blue": + self.blue_team = self.blue_team._replace(score=self.blue_team.score + 1) + self.next_command = RefereeCommand.PREPARE_KICKOFF_YELLOW + logger.info("Blue scored! Score: Yellow %d - Blue %d", self.yellow_team.score, self.blue_team.score) + + # Set STOP command after goal + self.command = RefereeCommand.STOP + self.command_counter += 1 + self.command_timestamp = current_time + self.last_goal_time = current_time + + logger.info( + "Referee command: STOP (after goal), next: %s", self.next_command.name if self.next_command else "None" + ) + + def _update_timers(self, current_time: float) -> None: + """Update stage and action timers. + + Args: + current_time: Current simulation time + """ + # Stage timer automatically counts down based on elapsed time + # No action needed here, calculated in _generate_referee_data() + pass + + def _generate_referee_data(self, current_time: float) -> RefereeData: + """Generate RefereeData from current state. + + Args: + current_time: Current simulation time + + Returns: + RefereeData object with current referee state + """ + stage_time_left = max(0, self.stage_duration - (current_time - self.stage_start_time)) + + return RefereeData( + source_identifier="rsim-embedded", + time_sent=current_time, + time_received=current_time, + referee_command=self.command, + referee_command_timestamp=self.command_timestamp, + stage=self.stage, + stage_time_left=stage_time_left, + blue_team=self.blue_team, + yellow_team=self.yellow_team, + designated_position=self.ball_placement_target, + blue_team_on_positive_half=None, + next_command=self.next_command, + current_action_time_remaining=self.action_timeout, + ) + + def get_referee_data(self, current_time: float) -> RefereeData: + """Get current referee data without updating state. + + Args: + current_time: Current simulation time + + Returns: + RefereeData object with current referee state + """ + return self._generate_referee_data(current_time) + + def set_command(self, command: RefereeCommand, timestamp: float = None) -> None: + """Manually set referee command (for testing/scenarios). + + Args: + command: Referee command to set + timestamp: Optional timestamp, uses current command timestamp if None + """ + self.command = command + self.command_counter += 1 + if timestamp is not None: + self.command_timestamp = timestamp + logger.info("Referee command manually set to: %s", command.name) + + def advance_stage(self, new_stage: Stage, timestamp: float) -> None: + """Manually advance to a new stage. + + Args: + new_stage: Stage to advance to + timestamp: Timestamp when stage change occurs + """ + logger.info("Stage advancing from %s to %s", self.stage.name, new_stage.name) + self.stage = new_stage + self.stage_start_time = timestamp + + # Set appropriate duration for new stage + if new_stage in [Stage.NORMAL_FIRST_HALF, Stage.NORMAL_SECOND_HALF]: + self.stage_duration = 300.0 # 5 minutes + elif new_stage in [Stage.EXTRA_FIRST_HALF, Stage.EXTRA_SECOND_HALF]: + self.stage_duration = 150.0 # 2.5 minutes + else: + self.stage_duration = 0.0 diff --git a/utama_core/run/strategy_runner.py b/utama_core/run/strategy_runner.py index f2b1f602..e0c916ea 100644 --- a/utama_core/run/strategy_runner.py +++ b/utama_core/run/strategy_runner.py @@ -20,9 +20,10 @@ MAX_GAME_HISTORY, TIMESTEP, ) -from utama_core.data_processing.receivers import VisionReceiver +from utama_core.data_processing.receivers import RefereeMessageReceiver, VisionReceiver from utama_core.data_processing.refiners import ( PositionRefiner, + RefereeRefiner, RobotInfoRefiner, VelocityRefiner, ) @@ -150,6 +151,7 @@ def __init__( self.vision_buffers, self.ref_buffer = self._setup_vision_and_referee() assert_valid_bounding_box(self.field_bounds) + self.referee_refiner = RefereeRefiner() self.my, self.opp = self._setup_sides_data( strategy, opp_strategy, filtering, control_scheme, opp_control_scheme @@ -233,25 +235,23 @@ def data_update_listener(self, receiver: VisionReceiver): # Start receiving game data; this will run in a separate thread. receiver.pull_game_data() - def start_threads(self, vision_receiver: VisionReceiver): # , referee_receiver): - """Start background threads for receiving vision (and referee) data. + def start_threads(self, vision_receiver: VisionReceiver, referee_receiver: RefereeMessageReceiver): + """Start background threads for receiving vision and referee data. Starts daemon threads so they do not prevent process exit. Args: vision_receiver: VisionReceiver to run in a background thread. + referee_receiver: RefereeMessageReceiver to run in a background thread. """ - # Start the data receiving in separate threads vision_thread = threading.Thread(target=vision_receiver.pull_game_data) - # referee_thread = threading.Thread(target=referee_receiver.pull_referee_data) + referee_thread = threading.Thread(target=referee_receiver.pull_referee_data) - # Allows the thread to close when the main program exits vision_thread.daemon = True - # referee_thread.daemon = True + referee_thread.daemon = True - # Start both thread vision_thread.start() - # referee_thread.start() + referee_thread.start() def _setup_sides_data( self, @@ -395,10 +395,10 @@ def _setup_vision_and_referee(self) -> Tuple[deque, deque]: """ vision_buffers = [deque(maxlen=1) for _ in range(MAX_CAMERAS)] ref_buffer = deque(maxlen=1) - # referee_receiver = RefereeMessageReceiver(ref_buffer, debug=False) vision_receiver = VisionReceiver(vision_buffers) if self.mode != Mode.RSIM: - self.start_threads(vision_receiver) # , referee_receiver) + referee_receiver = RefereeMessageReceiver(ref_buffer) + self.start_threads(vision_receiver, referee_receiver) return vision_buffers, ref_buffer @@ -774,20 +774,28 @@ def _run_step(self): """ frame_start = time.perf_counter() if self.mode == Mode.RSIM: - vision_frames = [self.rsim_env._frame_to_observations()[0]] + obs = self.rsim_env._frame_to_observations() + if len(obs) == 4: + # New format with referee embedded in observations + vision_frames = [obs[0]] + referee_data = obs[3] + else: + # Standard format — check ref_buffer for externally injected referee data + vision_frames = [obs[0]] + referee_data = self.ref_buffer.popleft() if self.ref_buffer else None else: vision_frames = [buffer.popleft() if buffer else None for buffer in self.vision_buffers] - # referee_frame = ref_buffer.popleft() + referee_data = self.ref_buffer.popleft() if self.ref_buffer else None # alternate between opp and friendly playing if self.toggle_opp_first: if self.opp: - self._step_game(vision_frames, True) - self._step_game(vision_frames, False) + self._step_game(vision_frames, referee_data, True) + self._step_game(vision_frames, referee_data, False) else: - self._step_game(vision_frames, False) + self._step_game(vision_frames, referee_data, False) if self.opp: - self._step_game(vision_frames, True) + self._step_game(vision_frames, referee_data, True) self.toggle_opp_first = not self.toggle_opp_first # --- rate limiting --- @@ -807,8 +815,29 @@ def _run_step(self): if self.elapsed_time >= FPS_PRINT_INTERVAL: fps = self.num_frames_elapsed / self.elapsed_time - # Update the live FPS area (one line, no box) - self._fps_live.update(Text(f"FPS: {fps:.2f}")) + ref = self.referee_refiner + stage_secs = ref.stage_time_left + stage_min = int(stage_secs // 60) + stage_sec = int(stage_secs % 60) + + display = Text() + display.append(f"FPS: {fps:.1f}", style="bold cyan") + display.append(" | ") + display.append(ref.last_command.name, style="bold yellow") + display.append(" | ") + display.append(ref.stage.name.replace("_", " ").title()) + display.append(" | Blue ") + display.append(str(ref.blue_team.score), style="bold blue") + display.append(" - ") + display.append(str(ref.yellow_team.score), style="bold yellow") + display.append(" Yellow") + display.append(f" | {stage_min}:{stage_sec:02d} left") + + last_ref = self.referee_refiner._referee_records[-1] if self.referee_refiner._referee_records else None + if last_ref and last_ref.status_message: + display.append(f" | {last_ref.status_message}", style="dim") + + self._fps_live.update(display) self._fps_live.refresh() self.elapsed_time = 0.0 @@ -817,12 +846,14 @@ def _run_step(self): def _step_game( self, vision_frames: List[RawVisionData], + referee_data, running_opp: bool, ): """Step the game for the robot controller and strategy. Args: vision_frames (List[RawVisionData]): The vision frames. + referee_data: The referee data from RSim or network receiver. running_opp (bool): Whether to run the opponent strategy. """ side = self.opp if running_opp else self.my @@ -834,7 +865,7 @@ def _step_game( new_game_frame = side.position_refiner.refine(side.current_game_frame, vision_frames) new_game_frame = side.velocity_refiner.refine(side.game_history, new_game_frame) # , robot_frame.imu_data) new_game_frame = side.robot_info_refiner.refine(new_game_frame, responses) - # new_game_frame = self.referee_refiner.refine(new_game_frame, responses) + new_game_frame = self.referee_refiner.refine(new_game_frame, referee_data) # Store updated game frame side.current_game_frame = new_game_frame diff --git a/utama_core/strategy/common/abstract_strategy.py b/utama_core/strategy/common/abstract_strategy.py index 5a1a4dd7..975568b1 100644 --- a/utama_core/strategy/common/abstract_strategy.py +++ b/utama_core/strategy/common/abstract_strategy.py @@ -68,7 +68,20 @@ class AbstractStrategy(ABC): ################################################# def __init__(self): - self.behaviour_tree = py_trees.trees.BehaviourTree(self.create_behaviour_tree()) + # Lazy import to break the circular dependency: + # abstract_strategy → referee.tree → referee.conditions → abstract_behaviour + # → strategy.common.__init__ → abstract_strategy + from utama_core.strategy.referee.tree import build_referee_override_tree + + strategy_subtree = self.create_behaviour_tree() + + # Wrap the user's strategy tree with the referee override layer (Option B). + # The root Selector checks referee commands first; if none match (e.g. NORMAL_START + # or FORCE_START), it falls through to the strategy subtree. + root = py_trees.composites.Selector(name="Root", memory=False) + root.add_children([build_referee_override_tree(), strategy_subtree]) + + self.behaviour_tree = py_trees.trees.BehaviourTree(root) ### These attributes are set by the StrategyRunner before the strategy is run. ### self.robot_controller: AbstractRobotController = None diff --git a/utama_core/strategy/referee/__init__.py b/utama_core/strategy/referee/__init__.py new file mode 100644 index 00000000..54ab846e --- /dev/null +++ b/utama_core/strategy/referee/__init__.py @@ -0,0 +1,2 @@ +from utama_core.strategy.referee.conditions import CheckRefereeCommand +from utama_core.strategy.referee.tree import build_referee_override_tree diff --git a/utama_core/strategy/referee/actions.py b/utama_core/strategy/referee/actions.py new file mode 100644 index 00000000..a2d5f049 --- /dev/null +++ b/utama_core/strategy/referee/actions.py @@ -0,0 +1,375 @@ +"""Hardcoded action nodes for each referee game state. + +Each node: + - Reads game state from blackboard.game + - Writes robot commands to blackboard.cmd_map for every friendly robot + - Returns RUNNING (the parent Selector holds here until the command changes) + +All positions are in the ssl-vision coordinate system (metres). +Team side is resolved at tick-time via game.my_team_is_yellow and +game.my_team_is_right so no construction-time team colour is needed. +""" + +import math + +import py_trees + +from utama_core.entities.data.vector import Vector2D +from utama_core.entities.referee.referee_command import RefereeCommand +from utama_core.skills.src.utils.move_utils import empty_command, move +from utama_core.strategy.common.abstract_behaviour import AbstractBehaviour + +# SSL Div B field constants (metres) +_PENALTY_MARK_DIST = 6.0 # distance from goal centre to penalty mark +_HALF_FIELD_X = 4.5 # half field length +_CENTRE_CIRCLE_R = 0.5 # centre circle radius +_BALL_KEEP_DIST = 0.55 # ≥0.5 m required; 5 cm buffer +_PENALTY_BEHIND_OFFSET = 0.4 # robots must be ≥0.4 m behind penalty mark +_OPP_DEF_AREA_KEEP_DIST = 0.25 # ≥0.2 m from opponent defence area; 5 cm buffer + + +def _all_stop(blackboard) -> py_trees.common.Status: + """Send empty_command to every friendly robot and return RUNNING.""" + for robot_id in blackboard.game.friendly_robots: + blackboard.cmd_map[robot_id] = empty_command(False) + return py_trees.common.Status.RUNNING + + +# --------------------------------------------------------------------------- +# HALT — zero velocity, highest priority +# --------------------------------------------------------------------------- + + +class HaltStep(AbstractBehaviour): + """Sends zero-velocity commands to all friendly robots. + + Required: robots must stop immediately on HALT (2-second grace period allowed). + """ + + def update(self) -> py_trees.common.Status: + return _all_stop(self.blackboard) + + +# --------------------------------------------------------------------------- +# STOP — stop in place (≤1.5 m/s; ≥0.5 m from ball) +# Stopping cold satisfies both constraints. +# --------------------------------------------------------------------------- + + +class StopStep(AbstractBehaviour): + """Sends zero-velocity commands to all friendly robots. + + Complies with STOP: robots are stationary, so speed = 0 m/s ≤ 1.5 m/s + and they do not approach the ball. + """ + + def update(self) -> py_trees.common.Status: + return _all_stop(self.blackboard) + + +# --------------------------------------------------------------------------- +# BALL PLACEMENT — ours +# --------------------------------------------------------------------------- + + +class BallPlacementOursStep(AbstractBehaviour): + """Moves the closest friendly robot to the designated_position to place the ball. + + All other robots stop in place. If can_place_ball is False, all robots stop. + + The placing robot drives toward designated_position using the move() skill. + Ball capture and release are handled by the dribbler (future: dribble_subtree). + """ + + def update(self) -> py_trees.common.Status: + game = self.blackboard.game + ref = game.referee + motion_controller = self.blackboard.motion_controller + + # Determine which team is ours + our_team = ref.yellow_team if game.my_team_is_yellow else ref.blue_team + if not getattr(our_team, "can_place_ball", True): + return _all_stop(self.blackboard) + + target = ref.designated_position + if target is None: + return _all_stop(self.blackboard) + + target_pos = Vector2D(target[0], target[1]) + ball = game.ball + + # Pick the placer: robot closest to the ball + placer_id = min( + game.friendly_robots, + key=lambda rid: game.friendly_robots[rid].p.distance_to(ball.p) if ball else float("inf"), + ) + + for robot_id in game.friendly_robots: + if robot_id == placer_id: + # Face the target while approaching + robot = game.friendly_robots[robot_id] + oren = robot.p.angle_to(target_pos) + self.blackboard.cmd_map[robot_id] = move( + game, motion_controller, robot_id, target_pos, oren, dribbling=True + ) + else: + self.blackboard.cmd_map[robot_id] = empty_command(False) + + return py_trees.common.Status.RUNNING + + +# --------------------------------------------------------------------------- +# BALL PLACEMENT — theirs +# --------------------------------------------------------------------------- + + +class BallPlacementTheirsStep(AbstractBehaviour): + """Stops all friendly robots during the opponent's ball placement. + + Robots stopped in place are guaranteed not to approach the ball or interfere + with the placement. Active clearance (move ≥0.5 m from ball) is a future + enhancement. + """ + + def update(self) -> py_trees.common.Status: + return _all_stop(self.blackboard) + + +# --------------------------------------------------------------------------- +# PREPARE_KICKOFF — ours +# --------------------------------------------------------------------------- + +# Kickoff formation positions (own half, outside centre circle). +# Relative x is negative = own half when we are on the right; sign is flipped below. +_KICKOFF_SUPPORT_POSITIONS_RIGHT = [ + Vector2D(-0.8, 0.5), + Vector2D(-0.8, -0.5), + Vector2D(-1.5, 0.8), + Vector2D(-1.5, -0.8), + Vector2D(-2.5, 0.0), +] +_KICKOFF_SUPPORT_POSITIONS_LEFT = [Vector2D(-p.x, p.y) for p in _KICKOFF_SUPPORT_POSITIONS_RIGHT] + + +class PrepareKickoffOursStep(AbstractBehaviour): + """Positions robots for our kickoff. + + Robot with the lowest ID approaches the ball at (0, 0). + All other robots move to own-half support positions outside the centre circle. + """ + + def update(self) -> py_trees.common.Status: + game = self.blackboard.game + motion_controller = self.blackboard.motion_controller + + robot_ids = sorted(game.friendly_robots.keys()) + kicker_id = robot_ids[0] + + # Support positions depend on which side we defend + support_positions = ( + _KICKOFF_SUPPORT_POSITIONS_RIGHT if game.my_team_is_right else _KICKOFF_SUPPORT_POSITIONS_LEFT + ) + + support_idx = 0 + for robot_id in robot_ids: + if robot_id == kicker_id: + # Approach the ball at centre, face the opponent goal + target = Vector2D(0.0, 0.0) + goal_x = _HALF_FIELD_X if not game.my_team_is_right else -_HALF_FIELD_X + oren = math.atan2(0.0 - target.y, goal_x - target.x) + self.blackboard.cmd_map[robot_id] = move(game, motion_controller, robot_id, target, oren) + else: + pos = support_positions[support_idx % len(support_positions)] + support_idx += 1 + self.blackboard.cmd_map[robot_id] = move(game, motion_controller, robot_id, pos, 0.0) + + return py_trees.common.Status.RUNNING + + +# --------------------------------------------------------------------------- +# PREPARE_KICKOFF — theirs +# --------------------------------------------------------------------------- + +_KICKOFF_DEFENCE_POSITIONS_RIGHT = [ + Vector2D(-0.8, 0.4), + Vector2D(-0.8, -0.4), + Vector2D(-1.5, 0.6), + Vector2D(-1.5, -0.6), + Vector2D(-2.5, 0.0), + Vector2D(-1.5, 0.0), +] +_KICKOFF_DEFENCE_POSITIONS_LEFT = [Vector2D(-p.x, p.y) for p in _KICKOFF_DEFENCE_POSITIONS_RIGHT] + + +class PrepareKickoffTheirsStep(AbstractBehaviour): + """Moves all our robots to own half, outside the centre circle, for the opponent kickoff.""" + + def update(self) -> py_trees.common.Status: + game = self.blackboard.game + motion_controller = self.blackboard.motion_controller + + positions = _KICKOFF_DEFENCE_POSITIONS_RIGHT if game.my_team_is_right else _KICKOFF_DEFENCE_POSITIONS_LEFT + + for idx, robot_id in enumerate(sorted(game.friendly_robots.keys())): + pos = positions[idx % len(positions)] + self.blackboard.cmd_map[robot_id] = move(game, motion_controller, robot_id, pos, 0.0) + + return py_trees.common.Status.RUNNING + + +# --------------------------------------------------------------------------- +# PREPARE_PENALTY — ours +# --------------------------------------------------------------------------- + + +class PreparePenaltyOursStep(AbstractBehaviour): + """Positions robots for our penalty kick. + + Kicker (lowest non-keeper ID): moves to our penalty mark, faces goal. + All others: stop on a line 0.4 m behind the penalty mark (on own side). + + Penalty mark is at (opp_goal_x ∓ 6.0, 0), sign depends on which side we attack. + """ + + def update(self) -> py_trees.common.Status: + game = self.blackboard.game + ref = game.referee + motion_controller = self.blackboard.motion_controller + + # Our goalkeeper ID from the referee packet + our_team_info = ref.yellow_team if game.my_team_is_yellow else ref.blue_team + keeper_id = our_team_info.goalkeeper + + # Opponent goal is on the right if we are on the right, else on the left + opp_goal_x = _HALF_FIELD_X if not game.my_team_is_right else -_HALF_FIELD_X + sign = 1 if not game.my_team_is_right else -1 + penalty_mark = Vector2D(opp_goal_x - sign * _PENALTY_MARK_DIST, 0.0) + behind_line_x = penalty_mark.x - sign * _PENALTY_BEHIND_OFFSET + + goal_oren = math.atan2(0.0, opp_goal_x - penalty_mark.x) + + robot_ids = sorted(game.friendly_robots.keys()) + non_keeper_ids = [rid for rid in robot_ids if rid != keeper_id] + kicker_id = non_keeper_ids[0] if non_keeper_ids else robot_ids[0] + + behind_idx = 0 + behind_y_step = 0.35 + for robot_id in robot_ids: + if robot_id == kicker_id: + self.blackboard.cmd_map[robot_id] = move(game, motion_controller, robot_id, penalty_mark, goal_oren) + else: + # Place behind the line, spread in y + offset = (behind_idx - (len(robot_ids) - 1) / 2.0) * behind_y_step + pos = Vector2D(behind_line_x, offset) + self.blackboard.cmd_map[robot_id] = move(game, motion_controller, robot_id, pos, 0.0) + behind_idx += 1 + + return py_trees.common.Status.RUNNING + + +# --------------------------------------------------------------------------- +# PREPARE_PENALTY — theirs +# --------------------------------------------------------------------------- + + +class PreparePenaltyTheirsStep(AbstractBehaviour): + """Positions our robots for the opponent's penalty kick. + + Goalkeeper: moves to our goal line centre. + All others: move to a line 0.4 m behind the penalty mark on our half. + """ + + def update(self) -> py_trees.common.Status: + game = self.blackboard.game + ref = game.referee + motion_controller = self.blackboard.motion_controller + + our_team_info = ref.yellow_team if game.my_team_is_yellow else ref.blue_team + keeper_id = our_team_info.goalkeeper + + # Our goal is on the right if my_team_is_right, else on the left + our_goal_x = _HALF_FIELD_X if game.my_team_is_right else -_HALF_FIELD_X + sign = 1 if game.my_team_is_right else -1 + + # Opponent's penalty mark is in their half attacking our goal + opp_penalty_mark_x = our_goal_x - sign * _PENALTY_MARK_DIST + behind_line_x = opp_penalty_mark_x + sign * _PENALTY_BEHIND_OFFSET + + robot_ids = sorted(game.friendly_robots.keys()) + behind_idx = 0 + behind_y_step = 0.35 + + for robot_id in robot_ids: + if robot_id == keeper_id: + # Keeper on own goal line, facing the incoming ball + keeper_pos = Vector2D(our_goal_x, 0.0) + self.blackboard.cmd_map[robot_id] = move( + game, motion_controller, robot_id, keeper_pos, math.pi if game.my_team_is_right else 0.0 + ) + else: + offset = (behind_idx - (len(robot_ids) - 1) / 2.0) * behind_y_step + pos = Vector2D(behind_line_x, offset) + self.blackboard.cmd_map[robot_id] = move(game, motion_controller, robot_id, pos, 0.0) + behind_idx += 1 + + return py_trees.common.Status.RUNNING + + +# --------------------------------------------------------------------------- +# DIRECT_FREE — ours +# --------------------------------------------------------------------------- + + +class DirectFreeOursStep(AbstractBehaviour): + """Positions our robots for our direct free kick. + + The robot closest to the ball becomes the kicker and drives toward the ball. + All other robots stop in place (they may be repositioned by the strategy tree + after NORMAL_START transitions the override layer to pass-through). + """ + + def update(self) -> py_trees.common.Status: + game = self.blackboard.game + motion_controller = self.blackboard.motion_controller + ball = game.ball + + kicker_id = min( + game.friendly_robots, + key=lambda rid: game.friendly_robots[rid].p.distance_to(ball.p) if ball else float("inf"), + ) + + for robot_id in game.friendly_robots: + if robot_id == kicker_id and ball: + robot = game.friendly_robots[robot_id] + oren = robot.p.angle_to(ball.p) + self.blackboard.cmd_map[robot_id] = move(game, motion_controller, robot_id, ball.p, oren) + else: + self.blackboard.cmd_map[robot_id] = empty_command(False) + + return py_trees.common.Status.RUNNING + + +# --------------------------------------------------------------------------- +# DIRECT_FREE — theirs +# --------------------------------------------------------------------------- + + +class DirectFreeTheirsStep(AbstractBehaviour): + """Stops all our robots during the opponent's direct free kick. + + All robots must remain ≥ 0.5 m from the ball. Stopping in place satisfies this + assuming robots are not already within 0.5 m (future: add active clearance). + """ + + def update(self) -> py_trees.common.Status: + return _all_stop(self.blackboard) + + +# --------------------------------------------------------------------------- +# Helper: resolve bilateral commands +# --------------------------------------------------------------------------- + + +def is_our_command(command: RefereeCommand, our_command: RefereeCommand, their_command: RefereeCommand) -> bool: + """Not used directly — bilateral resolution is done in tree.py via command sets.""" + return command == our_command diff --git a/utama_core/strategy/referee/conditions.py b/utama_core/strategy/referee/conditions.py new file mode 100644 index 00000000..a0c100d1 --- /dev/null +++ b/utama_core/strategy/referee/conditions.py @@ -0,0 +1,31 @@ +from typing import Tuple + +import py_trees + +from utama_core.entities.referee.referee_command import RefereeCommand +from utama_core.strategy.common.abstract_behaviour import AbstractBehaviour + + +class CheckRefereeCommand(AbstractBehaviour): + """Returns SUCCESS if the current referee command matches any of the given expected commands. + + Returns FAILURE if there is no referee data, or if the command does not match. + Used as the first child of each referee subtree's Sequence so the Sequence fails fast + and the parent Selector moves on to the next subtree. + + Args: + expected_commands: One or more RefereeCommand values to match against. + """ + + def __init__(self, *expected_commands: RefereeCommand): + name = "CheckCmd?" + "|".join(c.name for c in expected_commands) + super().__init__(name=name) + self.expected_commands: Tuple[RefereeCommand, ...] = expected_commands + + def update(self) -> py_trees.common.Status: + ref = self.blackboard.game.referee + if ref is None: + return py_trees.common.Status.FAILURE + if ref.referee_command in self.expected_commands: + return py_trees.common.Status.SUCCESS + return py_trees.common.Status.FAILURE diff --git a/utama_core/strategy/referee/tree.py b/utama_core/strategy/referee/tree.py new file mode 100644 index 00000000..01413d68 --- /dev/null +++ b/utama_core/strategy/referee/tree.py @@ -0,0 +1,270 @@ +"""Factory for the RefereeOverride subtree. + +The RefereeOverride Selector sits as the first (highest-priority) child of the root +Selector in AbstractStrategy. Each child is a Sequence: + + Sequence + ├── CheckRefereeCommand(expected_command, ...) ← FAILURE if no match → Selector continues + └── ← RUNNING while command is active + +When no override matches (e.g. NORMAL_START, FORCE_START), the Selector falls through +to the user's strategy subtree. + +Bilateral commands (KICKOFF / PENALTY / FREE_KICK / BALL_PLACEMENT) are split into +"ours" and "theirs" at tick-time: each action node reads my_team_is_yellow from the +game frame, so no construction-time team colour is needed. +""" + +import py_trees + +from utama_core.entities.referee.referee_command import RefereeCommand +from utama_core.strategy.referee.actions import ( + BallPlacementOursStep, + BallPlacementTheirsStep, + DirectFreeOursStep, + DirectFreeTheirsStep, + HaltStep, + PrepareKickoffOursStep, + PrepareKickoffTheirsStep, + PreparePenaltyOursStep, + PreparePenaltyTheirsStep, + StopStep, +) +from utama_core.strategy.referee.conditions import CheckRefereeCommand + + +def _make_subtree(name: str, condition: CheckRefereeCommand, action: py_trees.behaviour.Behaviour): + """Create a Sequence([condition, action]) subtree for one referee command group.""" + seq = py_trees.composites.Sequence(name=name, memory=False) + seq.add_children([condition, action]) + return seq + + +def build_referee_override_tree() -> py_trees.composites.Selector: + """Build and return the RefereeOverride Selector. + + The returned Selector should be added as the *first* child of the root Selector + in AbstractStrategy so that referee compliance always takes priority over strategy. + + Ours-vs-theirs resolution for bilateral commands: + Each pair of action nodes (e.g. BallPlacementOursStep / BallPlacementTheirsStep) + reads my_team_is_yellow from the game frame at tick-time to determine which role + to play. The CheckRefereeCommand condition simply checks which specific command + (YELLOW or BLUE variant) is active; the action node then maps that to our/their role. + + Priority order (top = highest): + 1. HALT — immediate stop, no exceptions + 2. STOP — slowed stop, keep distance from ball + 3. TIMEOUT — idle (same as STOP) + 4. BALL_PLACEMENT — ours or theirs + 5. PREPARE_KICKOFF + 6. PREPARE_PENALTY + 7. DIRECT_FREE + """ + override = py_trees.composites.Selector(name="RefereeOverride", memory=False) + + # 1. HALT + override.add_child( + _make_subtree( + "Halt", + CheckRefereeCommand(RefereeCommand.HALT), + HaltStep(name="HaltStep"), + ) + ) + + # 2. STOP + override.add_child( + _make_subtree( + "Stop", + CheckRefereeCommand(RefereeCommand.STOP), + StopStep(name="StopStep"), + ) + ) + + # 3. TIMEOUT (yellow or blue — same behaviour: idle) + override.add_child( + _make_subtree( + "Timeout", + CheckRefereeCommand(RefereeCommand.TIMEOUT_YELLOW, RefereeCommand.TIMEOUT_BLUE), + StopStep(name="TimeoutStop"), + ) + ) + + # 4a. BALL_PLACEMENT — yellow team places ball + override.add_child( + _make_subtree( + "BallPlacementYellow", + CheckRefereeCommand(RefereeCommand.BALL_PLACEMENT_YELLOW), + _BallPlacementDispatch(is_yellow_command=True, name="BallPlacementYellowStep"), + ) + ) + + # 4b. BALL_PLACEMENT — blue team places ball + override.add_child( + _make_subtree( + "BallPlacementBlue", + CheckRefereeCommand(RefereeCommand.BALL_PLACEMENT_BLUE), + _BallPlacementDispatch(is_yellow_command=False, name="BallPlacementBlueStep"), + ) + ) + + # 5a. PREPARE_KICKOFF — yellow team kicks off + override.add_child( + _make_subtree( + "KickoffYellow", + CheckRefereeCommand(RefereeCommand.PREPARE_KICKOFF_YELLOW), + _KickoffDispatch(is_yellow_command=True, name="KickoffYellowStep"), + ) + ) + + # 5b. PREPARE_KICKOFF — blue team kicks off + override.add_child( + _make_subtree( + "KickoffBlue", + CheckRefereeCommand(RefereeCommand.PREPARE_KICKOFF_BLUE), + _KickoffDispatch(is_yellow_command=False, name="KickoffBlueStep"), + ) + ) + + # 6a. PREPARE_PENALTY — yellow team takes penalty + override.add_child( + _make_subtree( + "PenaltyYellow", + CheckRefereeCommand(RefereeCommand.PREPARE_PENALTY_YELLOW), + _PenaltyDispatch(is_yellow_command=True, name="PenaltyYellowStep"), + ) + ) + + # 6b. PREPARE_PENALTY — blue team takes penalty + override.add_child( + _make_subtree( + "PenaltyBlue", + CheckRefereeCommand(RefereeCommand.PREPARE_PENALTY_BLUE), + _PenaltyDispatch(is_yellow_command=False, name="PenaltyBlueStep"), + ) + ) + + # 7a. DIRECT_FREE — yellow team's free kick + override.add_child( + _make_subtree( + "DirectFreeYellow", + CheckRefereeCommand(RefereeCommand.DIRECT_FREE_YELLOW), + _DirectFreeDispatch(is_yellow_command=True, name="DirectFreeYellowStep"), + ) + ) + + # 7b. DIRECT_FREE — blue team's free kick + override.add_child( + _make_subtree( + "DirectFreeBlue", + CheckRefereeCommand(RefereeCommand.DIRECT_FREE_BLUE), + _DirectFreeDispatch(is_yellow_command=False, name="DirectFreeBlueStep"), + ) + ) + + return override + + +# --------------------------------------------------------------------------- +# Dispatcher nodes +# +# Each dispatcher reads my_team_is_yellow from the game frame at tick-time +# and delegates to the correct Ours/Theirs action node. +# +# Using separate Ours/Theirs classes directly (rather than conditionals in +# a single node) keeps each action node's logic clean and single-purpose. +# The dispatcher is a thin routing layer that composes them. +# --------------------------------------------------------------------------- + +from utama_core.strategy.common.abstract_behaviour import ( # noqa: E402 + AbstractBehaviour, +) + + +class _BallPlacementDispatch(AbstractBehaviour): + """Routes to BallPlacementOursStep or BallPlacementTheirsStep at tick-time.""" + + def __init__(self, is_yellow_command: bool, name: str): + super().__init__(name=name) + self._is_yellow_command = is_yellow_command + self._ours = BallPlacementOursStep(name="BallPlacementOurs") + self._theirs = BallPlacementTheirsStep(name="BallPlacementTheirs") + + def setup_(self): + # Propagate setup to the inner nodes so their blackboards are initialised + self._ours.setup(is_opp_strat=False) + self._theirs.setup(is_opp_strat=False) + + def update(self) -> py_trees.common.Status: + if self._is_yellow_command == self.blackboard.game.my_team_is_yellow: + self._ours.blackboard = self.blackboard + return self._ours.update() + else: + self._theirs.blackboard = self.blackboard + return self._theirs.update() + + +class _KickoffDispatch(AbstractBehaviour): + """Routes to PrepareKickoffOursStep or PrepareKickoffTheirsStep at tick-time.""" + + def __init__(self, is_yellow_command: bool, name: str): + super().__init__(name=name) + self._is_yellow_command = is_yellow_command + self._ours = PrepareKickoffOursStep(name="KickoffOurs") + self._theirs = PrepareKickoffTheirsStep(name="KickoffTheirs") + + def setup_(self): + self._ours.setup(is_opp_strat=False) + self._theirs.setup(is_opp_strat=False) + + def update(self) -> py_trees.common.Status: + if self._is_yellow_command == self.blackboard.game.my_team_is_yellow: + self._ours.blackboard = self.blackboard + return self._ours.update() + else: + self._theirs.blackboard = self.blackboard + return self._theirs.update() + + +class _PenaltyDispatch(AbstractBehaviour): + """Routes to PreparePenaltyOursStep or PreparePenaltyTheirsStep at tick-time.""" + + def __init__(self, is_yellow_command: bool, name: str): + super().__init__(name=name) + self._is_yellow_command = is_yellow_command + self._ours = PreparePenaltyOursStep(name="PenaltyOurs") + self._theirs = PreparePenaltyTheirsStep(name="PenaltyTheirs") + + def setup_(self): + self._ours.setup(is_opp_strat=False) + self._theirs.setup(is_opp_strat=False) + + def update(self) -> py_trees.common.Status: + if self._is_yellow_command == self.blackboard.game.my_team_is_yellow: + self._ours.blackboard = self.blackboard + return self._ours.update() + else: + self._theirs.blackboard = self.blackboard + return self._theirs.update() + + +class _DirectFreeDispatch(AbstractBehaviour): + """Routes to DirectFreeOursStep or DirectFreeTheirsStep at tick-time.""" + + def __init__(self, is_yellow_command: bool, name: str): + super().__init__(name=name) + self._is_yellow_command = is_yellow_command + self._ours = DirectFreeOursStep(name="DirectFreeOurs") + self._theirs = DirectFreeTheirsStep(name="DirectFreeTheirs") + + def setup_(self): + self._ours.setup(is_opp_strat=False) + self._theirs.setup(is_opp_strat=False) + + def update(self) -> py_trees.common.Status: + if self._is_yellow_command == self.blackboard.game.my_team_is_yellow: + self._ours.blackboard = self.blackboard + return self._ours.update() + else: + self._theirs.blackboard = self.blackboard + return self._theirs.update() diff --git a/utama_core/tests/referee/__init__.py b/utama_core/tests/referee/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/utama_core/tests/referee/referee_sim.py b/utama_core/tests/referee/referee_sim.py new file mode 100644 index 00000000..8f2ea5c3 --- /dev/null +++ b/utama_core/tests/referee/referee_sim.py @@ -0,0 +1,163 @@ +"""Visual RSim simulation for verifying referee command behaviour. + +Run with: + pixi run python utama_core/tests/referee/referee_sim.py + +What it does: + - Starts a 3v3 RSim with StartupStrategy as the base strategy. + - The RefereeOverride tree (built automatically by AbstractStrategy) intercepts + referee commands and overrides robot behaviour accordingly. + - A scripted referee cycles through all referee commands every few seconds so + you can watch how robots respond visually in the RSim window. + +Command cycle (each held for SECS_PER_COMMAND seconds): + 1. HALT → all robots stop immediately + 2. STOP → all robots stop in place + 3. TIMEOUT_YELLOW → all robots idle + 4. PREPARE_KICKOFF_YELLOW → robot 0 approaches centre, others fan out to own half + 5. PREPARE_KICKOFF_BLUE → all robots move to own-half defence positions + 6. PREPARE_PENALTY_YELLOW → kicker at penalty mark, others line up behind + 7. PREPARE_PENALTY_BLUE → goalkeeper on goal line, others line up behind mark + 8. DIRECT_FREE_YELLOW → closest robot approaches ball + 9. DIRECT_FREE_BLUE → all robots stop + 10. BALL_PLACEMENT_YELLOW → closest robot drives to designated position + 11. NORMAL_START → pass-through: StartupStrategy runs freely +""" + +import time + +from utama_core.entities.data.referee import RefereeData +from utama_core.entities.game.team_info import TeamInfo +from utama_core.entities.referee.referee_command import RefereeCommand +from utama_core.entities.referee.stage import Stage +from utama_core.run import StrategyRunner +from utama_core.tests.referee.wandering_strategy import WanderingStrategy + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +SECS_PER_COMMAND = 5.0 # seconds to hold each referee command before advancing +MY_TEAM_IS_YELLOW = True +MY_TEAM_IS_RIGHT = True +N_ROBOTS = 3 # robots per side + +# Ball placement designated position (used for BALL_PLACEMENT_* commands) +DESIGNATED_POSITION = (1.0, 0.5) + +# Goalkeeper robot ID used in PREPARE_PENALTY scenarios +GOALKEEPER_ID = 0 + +_COMMAND_SEQUENCE = [ + (RefereeCommand.NORMAL_START, "NORMAL_START — normal play (strategy free)"), + (RefereeCommand.HALT, "HALT — all robots stop immediately"), + (RefereeCommand.STOP, "STOP — all robots idle, keep ball distance"), + (RefereeCommand.TIMEOUT_YELLOW, "TIMEOUT_YELLOW — team idle"), + (RefereeCommand.PREPARE_KICKOFF_YELLOW, "PREPARE_KICKOFF_YELLOW — we kick off"), + (RefereeCommand.PREPARE_KICKOFF_BLUE, "PREPARE_KICKOFF_BLUE — opponent kicks off"), + (RefereeCommand.PREPARE_PENALTY_YELLOW, "PREPARE_PENALTY_YELLOW — our penalty"), + (RefereeCommand.PREPARE_PENALTY_BLUE, "PREPARE_PENALTY_BLUE — opponent penalty"), + (RefereeCommand.DIRECT_FREE_YELLOW, "DIRECT_FREE_YELLOW — our direct free kick"), + (RefereeCommand.DIRECT_FREE_BLUE, "DIRECT_FREE_BLUE — opponent direct free kick"), + (RefereeCommand.BALL_PLACEMENT_YELLOW, "BALL_PLACEMENT_YELLOW — we place the ball"), +] + + +# --------------------------------------------------------------------------- +# Scripted referee state machine +# --------------------------------------------------------------------------- + + +class _ScriptedReferee: + """Cycles through _COMMAND_SEQUENCE, advancing every SECS_PER_COMMAND seconds.""" + + def __init__(self): + self._index = 0 + self._start = time.time() + print("\n=== Referee Visualisation Simulation ===") + print(f"Each command lasts {SECS_PER_COMMAND}s. Press Ctrl+C to stop.\n") + self._print_current() + + def _print_current(self): + cmd, desc = _COMMAND_SEQUENCE[self._index] + print(f" [{self._index + 1}/{len(_COMMAND_SEQUENCE)}] {desc}") + + def current_data(self) -> RefereeData: + now = time.time() + if now - self._start >= SECS_PER_COMMAND: + self._index = (self._index + 1) % len(_COMMAND_SEQUENCE) + self._start = now + self._print_current() + + cmd, _ = _COMMAND_SEQUENCE[self._index] + + is_placement = cmd in ( + RefereeCommand.BALL_PLACEMENT_YELLOW, + RefereeCommand.BALL_PLACEMENT_BLUE, + ) + + yellow_gk = GOALKEEPER_ID if MY_TEAM_IS_YELLOW else GOALKEEPER_ID + blue_gk = GOALKEEPER_ID if not MY_TEAM_IS_YELLOW else GOALKEEPER_ID + + return RefereeData( + source_identifier="scripted", + time_sent=now, + time_received=now, + referee_command=cmd, + referee_command_timestamp=now, + stage=Stage.NORMAL_FIRST_HALF, + stage_time_left=300.0, + blue_team=_team_info(goalkeeper=blue_gk), + yellow_team=_team_info(goalkeeper=yellow_gk), + designated_position=DESIGNATED_POSITION if is_placement else None, + blue_team_on_positive_half=False, + ) + + +def _team_info(goalkeeper: int = 0) -> TeamInfo: + return TeamInfo( + name="Demo", + score=0, + red_cards=0, + yellow_card_times=[], + yellow_cards=0, + timeouts=2, + timeout_time=300_000_000, + goalkeeper=goalkeeper, + ) + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + + +def main(): + scripted_referee = _ScriptedReferee() + + runner = StrategyRunner( + strategy=WanderingStrategy(), + my_team_is_yellow=MY_TEAM_IS_YELLOW, + my_team_is_right=MY_TEAM_IS_RIGHT, + mode="rsim", + exp_friendly=N_ROBOTS, + exp_enemy=N_ROBOTS, + print_real_fps=True, + ) + + # Patch _run_step to push scripted RefereeData into ref_buffer before each + # step. StrategyRunner._run_step now reads ref_buffer in RSim mode when + # _frame_to_observations returns the standard 3-tuple. + original_run_step = runner._run_step + + def _patched_run_step(): + runner.ref_buffer.append(scripted_referee.current_data()) + original_run_step() + + runner._run_step = _patched_run_step + + runner.run() + + +if __name__ == "__main__": + main() diff --git a/utama_core/tests/referee/test_referee_unit.py b/utama_core/tests/referee/test_referee_unit.py new file mode 100644 index 00000000..fd6f40af --- /dev/null +++ b/utama_core/tests/referee/test_referee_unit.py @@ -0,0 +1,556 @@ +"""Unit tests for the referee integration layer. + +Tests cover: + - RefereeData new fields (game_events, match_type, status_message) and custom __eq__ + - RefereeRefiner.refine injects data into GameFrame; deduplication logic + - Game.referee property proxies correctly from CurrentGameFrame + - CheckRefereeCommand condition node (SUCCESS / FAILURE / None-referee guard) + - Dispatcher routing (ours vs. theirs) for bilateral commands + - build_referee_override_tree structure and priority +""" + +from types import SimpleNamespace + +import py_trees +import pytest + +from utama_core.data_processing.refiners.referee import RefereeRefiner +from utama_core.entities.data.referee import RefereeData +from utama_core.entities.data.vector import Vector2D, Vector3D +from utama_core.entities.game.ball import Ball +from utama_core.entities.game.field import Field +from utama_core.entities.game.game import Game +from utama_core.entities.game.game_frame import GameFrame +from utama_core.entities.game.game_history import GameHistory +from utama_core.entities.game.robot import Robot +from utama_core.entities.game.team_info import TeamInfo +from utama_core.entities.referee.referee_command import RefereeCommand +from utama_core.entities.referee.stage import Stage +from utama_core.strategy.referee.conditions import CheckRefereeCommand +from utama_core.strategy.referee.tree import build_referee_override_tree + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _team_info(goalkeeper: int = 0) -> TeamInfo: + return TeamInfo( + name="TestTeam", + score=0, + red_cards=0, + yellow_card_times=[], + yellow_cards=0, + timeouts=0, + timeout_time=0, + goalkeeper=goalkeeper, + ) + + +def _make_referee_data( + command: RefereeCommand = RefereeCommand.HALT, + stage: Stage = Stage.NORMAL_FIRST_HALF, + game_events=None, + match_type: int = 0, + status_message=None, +) -> RefereeData: + return RefereeData( + source_identifier="test", + time_sent=1.0, + time_received=1.0, + referee_command=command, + referee_command_timestamp=1.0, + stage=stage, + stage_time_left=300.0, + blue_team=_team_info(goalkeeper=1), + yellow_team=_team_info(goalkeeper=2), + game_events=game_events if game_events is not None else [], + match_type=match_type, + status_message=status_message, + ) + + +def _robot(robot_id: int, x: float = 0.0, y: float = 0.0) -> Robot: + zv = Vector2D(0, 0) + return Robot(id=robot_id, is_friendly=True, has_ball=False, p=Vector2D(x, y), v=zv, a=zv, orientation=0.0) + + +def _ball(x: float = 0.0, y: float = 0.0) -> Ball: + zv = Vector3D(0, 0, 0) + return Ball(p=Vector3D(x, y, 0), v=zv, a=zv) + + +def _make_game_frame( + friendly_robots=None, + referee=None, + my_team_is_yellow: bool = True, + my_team_is_right: bool = True, +) -> GameFrame: + if friendly_robots is None: + friendly_robots = {0: _robot(0)} + return GameFrame( + ts=0.0, + my_team_is_yellow=my_team_is_yellow, + my_team_is_right=my_team_is_right, + friendly_robots=friendly_robots, + enemy_robots={}, + ball=_ball(), + referee=referee, + ) + + +def _make_game( + friendly_robots=None, + referee=None, + my_team_is_yellow: bool = True, + my_team_is_right: bool = True, +) -> Game: + frame = _make_game_frame(friendly_robots, referee, my_team_is_yellow, my_team_is_right) + history = GameHistory(10) + return Game(past=history, current=frame, field=Field.FULL_FIELD_BOUNDS) + + +def _make_blackboard(game: Game, cmd_map=None): + """Construct a minimal SimpleNamespace blackboard as used by AbstractBehaviour.""" + bb = SimpleNamespace() + bb.game = game + bb.cmd_map = cmd_map if cmd_map is not None else {} + bb.motion_controller = None + return bb + + +# --------------------------------------------------------------------------- +# RefereeData — new fields and __eq__ +# --------------------------------------------------------------------------- + + +class TestRefereeDataNewFields: + def test_default_game_events_is_empty_list(self): + data = _make_referee_data() + assert data.game_events == [] + + def test_default_match_type_is_zero(self): + data = _make_referee_data() + assert data.match_type == 0 + + def test_default_status_message_is_none(self): + data = _make_referee_data() + assert data.status_message is None + + def test_custom_game_events_stored(self): + events = [object(), object()] + data = _make_referee_data(game_events=events) + assert data.game_events is events + + def test_custom_match_type_stored(self): + data = _make_referee_data(match_type=2) + assert data.match_type == 2 + + def test_custom_status_message_stored(self): + data = _make_referee_data(status_message="Foul by blue") + assert data.status_message == "Foul by blue" + + def test_eq_ignores_game_events(self): + """Two records with different game_events but identical core fields must compare equal.""" + a = _make_referee_data(game_events=[]) + b = _make_referee_data(game_events=["something"]) + assert a == b + + def test_eq_ignores_match_type(self): + a = _make_referee_data(match_type=0) + b = _make_referee_data(match_type=3) + assert a == b + + def test_eq_ignores_status_message(self): + a = _make_referee_data(status_message=None) + b = _make_referee_data(status_message="Ball out of bounds") + assert a == b + + def test_eq_sensitive_to_referee_command(self): + a = _make_referee_data(command=RefereeCommand.HALT) + b = _make_referee_data(command=RefereeCommand.STOP) + assert a != b + + def test_eq_sensitive_to_stage(self): + a = _make_referee_data(stage=Stage.NORMAL_FIRST_HALF) + b = _make_referee_data(stage=Stage.NORMAL_SECOND_HALF) + assert a != b + + +# --------------------------------------------------------------------------- +# RefereeRefiner +# --------------------------------------------------------------------------- + + +class TestRefereeRefiner: + def setup_method(self): + self.refiner = RefereeRefiner() + + def test_refine_none_data_returns_original_frame(self): + frame = _make_game_frame() + result = self.refiner.refine(frame, None) + assert result is frame + + def test_refine_injects_referee_into_frame(self): + frame = _make_game_frame(referee=None) + data = _make_referee_data(command=RefereeCommand.STOP) + result = self.refiner.refine(frame, data) + assert result.referee is data + + def test_refine_preserves_other_frame_fields(self): + robots = {0: _robot(0, 1.0, 2.0)} + frame = _make_game_frame(friendly_robots=robots, my_team_is_yellow=False) + data = _make_referee_data() + result = self.refiner.refine(frame, data) + assert result.my_team_is_yellow is False + assert result.friendly_robots == robots + + def test_first_data_is_always_recorded(self): + data = _make_referee_data() + frame = _make_game_frame() + self.refiner.refine(frame, data) + assert len(self.refiner._referee_records) == 1 + + def test_duplicate_data_not_re_recorded(self): + """Records with same core fields (equal by __eq__) are not duplicated.""" + data1 = _make_referee_data() + data2 = _make_referee_data(status_message="different but equal core") + frame = _make_game_frame() + self.refiner.refine(frame, data1) + self.refiner.refine(frame, data2) + assert len(self.refiner._referee_records) == 1 + + def test_changed_command_is_recorded(self): + frame = _make_game_frame() + data1 = _make_referee_data(command=RefereeCommand.HALT) + data2 = _make_referee_data(command=RefereeCommand.STOP) + self.refiner.refine(frame, data1) + self.refiner.refine(frame, data2) + assert len(self.refiner._referee_records) == 2 + + def test_last_command_property(self): + frame = _make_game_frame() + self.refiner.refine(frame, _make_referee_data(command=RefereeCommand.BALL_PLACEMENT_BLUE)) + assert self.refiner.last_command == RefereeCommand.BALL_PLACEMENT_BLUE + + def test_last_command_defaults_to_halt_when_empty(self): + assert self.refiner.last_command == RefereeCommand.HALT + + def test_source_identifier_none_when_empty(self): + assert self.refiner.source_identifier() is None + + def test_source_identifier_after_record(self): + frame = _make_game_frame() + self.refiner.refine(frame, _make_referee_data()) + assert self.refiner.source_identifier() == "test" + + +# --------------------------------------------------------------------------- +# Game.referee property +# --------------------------------------------------------------------------- + + +class TestGameRefereeProperty: + def test_referee_none_when_no_data(self): + game = _make_game(referee=None) + assert game.referee is None + + def test_referee_returns_injected_data(self): + data = _make_referee_data(command=RefereeCommand.STOP) + game = _make_game(referee=data) + assert game.referee is data + + def test_referee_command_accessible(self): + data = _make_referee_data(command=RefereeCommand.PREPARE_KICKOFF_YELLOW) + game = _make_game(referee=data) + assert game.referee.referee_command == RefereeCommand.PREPARE_KICKOFF_YELLOW + + def test_add_game_frame_updates_referee(self): + """After add_game_frame, game.referee reflects the new frame.""" + game = _make_game(referee=None) + assert game.referee is None + + new_data = _make_referee_data(command=RefereeCommand.HALT) + new_frame = _make_game_frame(referee=new_data) + game.add_game_frame(new_frame) + assert game.referee is new_data + + +# --------------------------------------------------------------------------- +# CheckRefereeCommand condition node +# --------------------------------------------------------------------------- + + +def _setup_check_node(*commands: RefereeCommand, game: Game) -> CheckRefereeCommand: + """Build and set up a CheckRefereeCommand node with the given expected commands.""" + # Reset py_trees blackboard between tests + py_trees.blackboard.Blackboard.enable_activity_stream() + node = CheckRefereeCommand(*commands) + node.blackboard = _make_blackboard(game) + return node + + +class TestCheckRefereeCommand: + def test_returns_failure_when_referee_is_none(self): + game = _make_game(referee=None) + node = CheckRefereeCommand(RefereeCommand.HALT) + node.blackboard = _make_blackboard(game) + assert node.update() == py_trees.common.Status.FAILURE + + def test_returns_success_on_matching_single_command(self): + data = _make_referee_data(command=RefereeCommand.HALT) + game = _make_game(referee=data) + node = CheckRefereeCommand(RefereeCommand.HALT) + node.blackboard = _make_blackboard(game) + assert node.update() == py_trees.common.Status.SUCCESS + + def test_returns_failure_on_non_matching_command(self): + data = _make_referee_data(command=RefereeCommand.STOP) + game = _make_game(referee=data) + node = CheckRefereeCommand(RefereeCommand.HALT) + node.blackboard = _make_blackboard(game) + assert node.update() == py_trees.common.Status.FAILURE + + def test_returns_success_on_any_matching_multi_command(self): + for cmd in (RefereeCommand.TIMEOUT_YELLOW, RefereeCommand.TIMEOUT_BLUE): + data = _make_referee_data(command=cmd) + game = _make_game(referee=data) + node = CheckRefereeCommand(RefereeCommand.TIMEOUT_YELLOW, RefereeCommand.TIMEOUT_BLUE) + node.blackboard = _make_blackboard(game) + assert node.update() == py_trees.common.Status.SUCCESS + + def test_returns_failure_when_command_not_in_multi_list(self): + data = _make_referee_data(command=RefereeCommand.HALT) + game = _make_game(referee=data) + node = CheckRefereeCommand(RefereeCommand.TIMEOUT_YELLOW, RefereeCommand.TIMEOUT_BLUE) + node.blackboard = _make_blackboard(game) + assert node.update() == py_trees.common.Status.FAILURE + + def test_node_name_contains_command_names(self): + node = CheckRefereeCommand(RefereeCommand.HALT, RefereeCommand.STOP) + assert "HALT" in node.name + assert "STOP" in node.name + + +# --------------------------------------------------------------------------- +# HaltStep and StopStep — basic output verification +# --------------------------------------------------------------------------- + + +def _make_cmd_map(game: Game) -> dict: + return {rid: None for rid in game.friendly_robots} + + +class TestHaltAndStopStep: + def _run_step(self, step_class, game: Game) -> tuple: + from types import SimpleNamespace + + cmd_map = _make_cmd_map(game) + bb = _make_blackboard(game, cmd_map) + node = step_class(name="TestStep") + node.blackboard = bb + status = node.update() + return status, cmd_map + + def test_halt_returns_running(self): + from utama_core.strategy.referee.actions import HaltStep + + game = _make_game(referee=_make_referee_data(command=RefereeCommand.HALT)) + status, _ = self._run_step(HaltStep, game) + assert status == py_trees.common.Status.RUNNING + + def test_halt_writes_to_all_robots(self): + from utama_core.strategy.referee.actions import HaltStep + + robots = {0: _robot(0), 1: _robot(1)} + game = _make_game(friendly_robots=robots, referee=_make_referee_data()) + status, cmd_map = self._run_step(HaltStep, game) + assert set(cmd_map.keys()) == {0, 1} + for rid in robots: + assert cmd_map[rid] is not None + + def test_stop_returns_running(self): + from utama_core.strategy.referee.actions import StopStep + + game = _make_game(referee=_make_referee_data(command=RefereeCommand.STOP)) + status, _ = self._run_step(StopStep, game) + assert status == py_trees.common.Status.RUNNING + + def test_stop_writes_to_all_robots(self): + from utama_core.strategy.referee.actions import StopStep + + robots = {0: _robot(0), 1: _robot(1), 2: _robot(2)} + game = _make_game(friendly_robots=robots, referee=_make_referee_data()) + status, cmd_map = self._run_step(StopStep, game) + assert set(cmd_map.keys()) == {0, 1, 2} + + +# --------------------------------------------------------------------------- +# build_referee_override_tree — structure checks +# --------------------------------------------------------------------------- + + +class TestRefereeOverrideTreeStructure: + def setup_method(self): + self.tree = build_referee_override_tree() + + def test_root_is_selector(self): + assert isinstance(self.tree, py_trees.composites.Selector) + + def test_root_name(self): + assert self.tree.name == "RefereeOverride" + + def test_has_eleven_children(self): + # HALT, STOP, TIMEOUT, BALL_PLACEMENT×2, KICKOFF×2, PENALTY×2, DIRECT_FREE×2 + assert len(self.tree.children) == 11 + + def test_each_child_is_sequence(self): + for child in self.tree.children: + assert isinstance(child, py_trees.composites.Sequence) + + def test_each_sequence_has_two_children(self): + for child in self.tree.children: + assert len(child.children) == 2 + + def test_each_sequence_first_child_is_check_command(self): + for child in self.tree.children: + assert isinstance(child.children[0], CheckRefereeCommand) + + def test_halt_is_first(self): + first_seq = self.tree.children[0] + condition = first_seq.children[0] + assert RefereeCommand.HALT in condition.expected_commands + + def test_stop_is_second(self): + second_seq = self.tree.children[1] + condition = second_seq.children[0] + assert RefereeCommand.STOP in condition.expected_commands + + def test_timeout_handles_both_colours(self): + timeout_seq = self.tree.children[2] + condition = timeout_seq.children[0] + assert RefereeCommand.TIMEOUT_YELLOW in condition.expected_commands + assert RefereeCommand.TIMEOUT_BLUE in condition.expected_commands + + def test_all_bilateral_commands_covered(self): + """Every bilateral referee command must appear in at least one condition node.""" + covered = set() + for child in self.tree.children: + condition = child.children[0] + covered.update(condition.expected_commands) + + bilateral = { + RefereeCommand.BALL_PLACEMENT_YELLOW, + RefereeCommand.BALL_PLACEMENT_BLUE, + RefereeCommand.PREPARE_KICKOFF_YELLOW, + RefereeCommand.PREPARE_KICKOFF_BLUE, + RefereeCommand.PREPARE_PENALTY_YELLOW, + RefereeCommand.PREPARE_PENALTY_BLUE, + RefereeCommand.DIRECT_FREE_YELLOW, + RefereeCommand.DIRECT_FREE_BLUE, + } + assert bilateral.issubset(covered) + + +# --------------------------------------------------------------------------- +# Dispatcher ours-vs-theirs routing (no actual motion controller required) +# --------------------------------------------------------------------------- + + +def _make_dispatch_blackboard(game: Game) -> SimpleNamespace: + bb = _make_blackboard(game) + return bb + + +class TestDispatcherRouting: + """Verify that dispatcher nodes call the correct Ours/Theirs child based on team colour.""" + + def _tick_dispatcher(self, dispatcher, game: Game) -> py_trees.common.Status: + cmd_map = {rid: None for rid in game.friendly_robots} + bb = _make_blackboard(game, cmd_map) + dispatcher.blackboard = bb + # Propagate blackboard to inner ours/theirs nodes + dispatcher._ours.blackboard = bb + dispatcher._theirs.blackboard = bb + return dispatcher.update() + + def test_ball_placement_yellow_calls_ours_when_yellow(self): + from utama_core.strategy.referee.actions import ( + BallPlacementOursStep, + BallPlacementTheirsStep, + ) + from utama_core.strategy.referee.tree import _BallPlacementDispatch + + data = _make_referee_data(command=RefereeCommand.BALL_PLACEMENT_YELLOW) + # my_team_is_yellow=True, is_yellow_command=True → ours + game = _make_game(referee=data, my_team_is_yellow=True) + dispatcher = _BallPlacementDispatch(is_yellow_command=True, name="test") + + called = [] + # original_ours = dispatcher._ours.update + # original_theirs = dispatcher._theirs.update + dispatcher._ours.update = lambda: called.append("ours") or py_trees.common.Status.RUNNING + dispatcher._theirs.update = lambda: called.append("theirs") or py_trees.common.Status.RUNNING + + self._tick_dispatcher(dispatcher, game) + assert called == ["ours"] + + def test_ball_placement_yellow_calls_theirs_when_blue(self): + from utama_core.strategy.referee.tree import _BallPlacementDispatch + + data = _make_referee_data(command=RefereeCommand.BALL_PLACEMENT_YELLOW) + # my_team_is_yellow=False, is_yellow_command=True → theirs + game = _make_game(referee=data, my_team_is_yellow=False) + dispatcher = _BallPlacementDispatch(is_yellow_command=True, name="test") + + called = [] + dispatcher._ours.update = lambda: called.append("ours") or py_trees.common.Status.RUNNING + dispatcher._theirs.update = lambda: called.append("theirs") or py_trees.common.Status.RUNNING + + self._tick_dispatcher(dispatcher, game) + assert called == ["theirs"] + + def test_kickoff_blue_calls_ours_when_blue(self): + from utama_core.strategy.referee.tree import _KickoffDispatch + + data = _make_referee_data(command=RefereeCommand.PREPARE_KICKOFF_BLUE) + # my_team_is_yellow=False, is_yellow_command=False → ours + game = _make_game(referee=data, my_team_is_yellow=False) + dispatcher = _KickoffDispatch(is_yellow_command=False, name="test") + + called = [] + dispatcher._ours.update = lambda: called.append("ours") or py_trees.common.Status.RUNNING + dispatcher._theirs.update = lambda: called.append("theirs") or py_trees.common.Status.RUNNING + + self._tick_dispatcher(dispatcher, game) + assert called == ["ours"] + + def test_penalty_yellow_calls_theirs_when_blue(self): + from utama_core.strategy.referee.tree import _PenaltyDispatch + + data = _make_referee_data(command=RefereeCommand.PREPARE_PENALTY_YELLOW) + # my_team_is_yellow=False, is_yellow_command=True → theirs + game = _make_game(referee=data, my_team_is_yellow=False) + dispatcher = _PenaltyDispatch(is_yellow_command=True, name="test") + + called = [] + dispatcher._ours.update = lambda: called.append("ours") or py_trees.common.Status.RUNNING + dispatcher._theirs.update = lambda: called.append("theirs") or py_trees.common.Status.RUNNING + + self._tick_dispatcher(dispatcher, game) + assert called == ["theirs"] + + def test_direct_free_blue_calls_ours_when_blue(self): + from utama_core.strategy.referee.tree import _DirectFreeDispatch + + data = _make_referee_data(command=RefereeCommand.DIRECT_FREE_BLUE) + # my_team_is_yellow=False, is_yellow_command=False → ours + game = _make_game(referee=data, my_team_is_yellow=False) + dispatcher = _DirectFreeDispatch(is_yellow_command=False, name="test") + + called = [] + dispatcher._ours.update = lambda: called.append("ours") or py_trees.common.Status.RUNNING + dispatcher._theirs.update = lambda: called.append("theirs") or py_trees.common.Status.RUNNING + + self._tick_dispatcher(dispatcher, game) + assert called == ["ours"] diff --git a/utama_core/tests/referee/wandering_strategy.py b/utama_core/tests/referee/wandering_strategy.py new file mode 100644 index 00000000..3680aa39 --- /dev/null +++ b/utama_core/tests/referee/wandering_strategy.py @@ -0,0 +1,119 @@ +"""WanderingStrategy — base strategy for referee visualisation. + +Each robot cycles through its own list of waypoints on the field indefinitely. +When a referee command fires, the RefereeOverride tree (built into AbstractStrategy) +intercepts before this strategy runs, so you can clearly see robots interrupted +and repositioned by the referee. +""" + +import math + +import py_trees + +from utama_core.entities.data.vector import Vector2D +from utama_core.skills.src.utils.move_utils import move +from utama_core.strategy.common import AbstractBehaviour, AbstractStrategy + +# One waypoint list per robot (by index into sorted robot IDs). +# Robots on the right half defend the right goal, so positions are spread +# across both halves to make motion easy to see. +_WAYPOINT_SETS = [ + # Robot 0 — large figure-8 across the field + [ + Vector2D(-3.0, 1.5), + Vector2D(0.0, 0.0), + Vector2D(3.0, -1.5), + Vector2D(0.0, 0.0), + ], + # Robot 1 — diagonal patrol + [ + Vector2D(-2.0, -2.0), + Vector2D(2.0, 2.0), + ], + # Robot 2 — wide horizontal sweep + [ + Vector2D(-3.5, 0.5), + Vector2D(3.5, 0.5), + Vector2D(3.5, -0.5), + Vector2D(-3.5, -0.5), + ], + # Robot 3 — small loop near centre + [ + Vector2D(1.0, 1.0), + Vector2D(-1.0, 1.0), + Vector2D(-1.0, -1.0), + Vector2D(1.0, -1.0), + ], + # Robot 4 — left-half patrol + [ + Vector2D(-3.0, 0.0), + Vector2D(-1.0, 2.0), + Vector2D(-1.0, -2.0), + ], + # Robot 5 — right-half patrol + [ + Vector2D(3.0, 0.0), + Vector2D(1.0, 2.0), + Vector2D(1.0, -2.0), + ], +] + +_ARRIVAL_THRESHOLD = 0.15 # metres — how close counts as "reached" + + +class WanderingStep(AbstractBehaviour): + """Moves each robot through its waypoint list, advancing when it arrives.""" + + def initialise(self): + # Track waypoint index per robot ID + self._wp_index: dict[int, int] = {} + + def update(self) -> py_trees.common.Status: + game = self.blackboard.game + motion_controller = self.blackboard.motion_controller + + robot_ids = sorted(game.friendly_robots.keys()) + + for slot, robot_id in enumerate(robot_ids): + waypoints = _WAYPOINT_SETS[slot % len(_WAYPOINT_SETS)] + + if robot_id not in self._wp_index: + self._wp_index[robot_id] = 0 + + wp_idx = self._wp_index[robot_id] + target = waypoints[wp_idx] + + robot = game.friendly_robots[robot_id] + dist = robot.p.distance_to(target) + + if dist < _ARRIVAL_THRESHOLD: + # Advance to next waypoint + self._wp_index[robot_id] = (wp_idx + 1) % len(waypoints) + target = waypoints[self._wp_index[robot_id]] + + oren = robot.p.angle_to(target) + self.blackboard.cmd_map[robot_id] = move(game, motion_controller, robot_id, target, oren) + + return py_trees.common.Status.RUNNING + + +class WanderingStrategy(AbstractStrategy): + """Strategy where every robot continuously patrols a set of waypoints. + + Intended for use with the referee visualisation simulation so that referee + commands visibly interrupt robot motion. + """ + + def assert_exp_robots(self, n_runtime_friendly: int, n_runtime_enemy: int) -> bool: + return True + + def assert_exp_goals(self, includes_my_goal_line: bool, includes_opp_goal_line: bool) -> bool: + return True + + def get_min_bounding_zone(self): + return None + + def create_behaviour_tree(self) -> py_trees.behaviour.Behaviour: + root = py_trees.composites.Sequence(name="WanderingRoot", memory=False) + root.add_child(WanderingStep()) + return root