44import genesis as gs
55from genesis .utils .geom import quat_to_xyz , transform_by_quat , inv_quat , transform_quat_by_quat
66
7-
87def gs_rand_float (lower , upper , shape , device ):
98 return (upper - lower ) * torch .rand (size = shape , device = device ) + lower
109
@@ -15,12 +14,14 @@ def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_vie
1514
1615 self .num_envs = num_envs
1716 self .num_obs = obs_cfg ["num_obs" ]
18- # self.num_privileged_obs = obs_cfg.get("num_privileged_obs", None)
1917 self .num_actions = env_cfg ["num_actions" ]
2018 self .num_commands = command_cfg ["num_commands" ]
19+
20+ self .add_noise = obs_cfg .get ("add_noise" , False )
2121
2222 # observation history
23- self .frame_stack = obs_cfg .get ("frame_stack" , 1 )
23+ self .frame_stack = obs_cfg .get ("frame_stack" , 15 )
24+ self .c_frame_stack = obs_cfg .get ("c_frame_stack" , 3 )
2425 self .obs_history = collections .deque (maxlen = self .frame_stack )
2526 self .critic_history = collections .deque (maxlen = self .frame_stack )
2627
@@ -37,7 +38,7 @@ def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_vie
3738 self .reward_scales = reward_cfg ["reward_scales" ]
3839
3940 # privileged observation config
40- self .num_privileged_obs = self . num_commands + self . num_actions * 3 + 3 + 3 + 3 # commands + dof_pos + dof_vel + actions + lin_vel + ang_vel + quat
41+ self .num_privileged_obs = 66
4142
4243 # create scene
4344 self .scene = gs .Scene (
@@ -58,16 +59,32 @@ def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_vie
5859 show_viewer = show_viewer ,
5960 )
6061
61- # add plain
62- self .scene .add_entity (gs .morphs .URDF (file = "urdf/plane/plane.urdf" , fixed = True ))
62+ # add terrain
63+ self .terrain = self .scene .add_entity (gs .morphs .Terrain (
64+ n_subterrains = (3 , 3 ),
65+ subterrain_size = (12.0 , 12.0 ),
66+ horizontal_scale = 0.25 ,
67+ vertical_scale = 0.005 ,
68+ subterrain_types = [
69+ ["flat_terrain" , "random_uniform_terrain" , "stepping_stones_terrain" ],
70+ ["pyramid_sloped_terrain" , "discrete_obstacles_terrain" , "wave_terrain" ],
71+ ["random_uniform_terrain" , "pyramid_stairs_terrain" , "sloped_terrain" ]
72+ ],
73+ visualization = True ,
74+ collision = True
75+ ))
76+
77+ # terrain measurement
78+ self .measured_heights = None
79+ self .height_samples = 64
80+ self .obs_scales ["height_measurements" ] = 1.0
6381
6482 # add robot
6583 self .base_init_pos = torch .tensor (self .env_cfg ["base_init_pos" ], device = self .device )
6684 self .base_init_quat = torch .tensor (self .env_cfg ["base_init_quat" ], device = self .device )
6785 self .inv_base_init_quat = inv_quat (self .base_init_quat )
6886 self .robot = self .scene .add_entity (
6987 gs .morphs .URDF (
70- # file="../resources/stompymicro/robot_fixed.urdf",
7188 file = "sim/resources/stompymicro/robot_fixed.urdf" ,
7289 pos = self .base_init_pos .cpu ().numpy (),
7390 quat = self .base_init_quat .cpu ().numpy (),
@@ -80,6 +97,29 @@ def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_vie
8097 # names to indices
8198 self .motor_dofs = [self .robot .get_joint (name ).dof_idx_local for name in self .env_cfg ["dof_names" ]]
8299
100+ # Initialize legs_joints mapping
101+ self .legs_joints = {
102+ "left_hip_pitch" : self .motor_dofs [0 ],
103+ "left_knee_pitch" : self .motor_dofs [1 ],
104+ "left_ankle_pitch" : self .motor_dofs [2 ],
105+ "right_hip_pitch" : self .motor_dofs [3 ],
106+ "right_knee_pitch" : self .motor_dofs [4 ],
107+ "right_ankle_pitch" : self .motor_dofs [5 ]
108+ }
109+
110+ # Initialize feet indices
111+ self .feet_indices = [
112+ self .robot .get_body_index ("left_foot" ),
113+ self .robot .get_body_index ("right_foot" )
114+ ]
115+
116+ # Initialize observation related buffers
117+ self .contact_forces = torch .zeros ((self .num_envs , len (self .feet_indices ), 3 ), device = self .device )
118+ self .rand_push_force = torch .zeros ((self .num_envs , 3 ), device = self .device )
119+ self .rand_push_torque = torch .zeros ((self .num_envs , 3 ), device = self .device )
120+ self .env_frictions = torch .ones ((self .num_envs ,), device = self .device )
121+ self .body_mass = torch .ones ((self .num_envs ,), device = self .device ) * 30.0
122+
83123 # PD control parameters
84124 self .robot .set_dofs_kp ([self .env_cfg ["kp" ]] * self .num_actions , self .motor_dofs )
85125 self .robot .set_dofs_kv ([self .env_cfg ["kd" ]] * self .num_actions , self .motor_dofs )
@@ -125,6 +165,10 @@ def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_vie
125165 )
126166 self .extras = dict () # extra information for logging
127167
168+ self .noise_scale_vec = self ._get_noise_scale_vec (obs_cfg )
169+
170+
171+
128172 def _resample_commands (self , envs_idx ):
129173 self .commands [envs_idx , 0 ] = gs_rand_float (* self .command_cfg ["lin_vel_x_range" ], (len (envs_idx ),), self .device )
130174 self .commands [envs_idx , 1 ] = gs_rand_float (* self .command_cfg ["lin_vel_y_range" ], (len (envs_idx ),), self .device )
@@ -133,6 +177,8 @@ def _resample_commands(self, envs_idx):
133177
134178 def step (self , actions ):
135179 self .actions = torch .clip (actions , - self .env_cfg ["clip_actions" ], self .env_cfg ["clip_actions" ])
180+ # Update last actions before execution
181+ self .last_actions [:] = self .actions [:]
136182 exec_actions = self .last_actions if self .simulate_action_latency else self .actions
137183 target_dof_pos = exec_actions * self .env_cfg ["action_scale" ] + self .default_dof_pos
138184 self .robot .control_dofs_position (target_dof_pos , self .motor_dofs )
@@ -178,55 +224,166 @@ def step(self, actions):
178224 self .rew_buf += rew
179225 self .episode_sums [name ] += rew
180226
181- # compute observations
182- obs_now = torch .cat (
183- [
184- self .base_ang_vel * self .obs_scales ["ang_vel" ], # 3
185- self .projected_gravity , # 3
186- self .commands * self .commands_scale , # 4
187- (self .dof_pos - self .default_dof_pos ) * self .obs_scales ["dof_pos" ], # 12
188- self .dof_vel * self .obs_scales ["dof_vel" ], # 12
189- self .actions , # 12
190- ],
191- axis = - 1 ,
192- )
193-
194- # privileged observations
195- if self .num_privileged_obs is not None :
196- self .privileged_obs_buf = torch .cat (
197- [
198- self .commands * self .commands_scale , # 4
199- (self .dof_pos - self .default_dof_pos ) * self .obs_scales ["dof_pos" ], # 12
200- self .dof_vel * self .obs_scales ["dof_vel" ], # 12
201- self .actions , # 12
202- self .base_lin_vel * self .obs_scales ["lin_vel" ], # 3
203- self .base_ang_vel * self .obs_scales ["ang_vel" ], # 3
204- self .base_euler * self .obs_scales ["quat" ], # 3
205- ],
206- axis = - 1 ,
207- )
208- self .critic_history .append (self .privileged_obs_buf )
209-
210- # stack observations
211- self .obs_history .append (obs_now )
212- obs_buf_all = torch .stack ([self .obs_history [i ] for i in range (self .frame_stack )], dim = 1 )
213- self .obs_buf = obs_buf_all .reshape (self .num_envs , - 1 )
227+ # Compute observations
228+ self .compute_observations ()
214229
215230 self .last_actions [:] = self .actions [:]
216231 self .last_dof_vel [:] = self .dof_vel [:]
217232
218233 return self .obs_buf , self .rew_buf , self .reset_buf , {
219234 "observations" : {
220- "critic" : self .obs_buf
235+ "critic" : self .privileged_obs_buf
221236 },
222237 ** self .extras
223238 }
224239
225240 def get_observations (self ):
226241 return self .obs_buf , {"observations" : {"critic" : self .obs_buf }}
227242
228- def get_privileged_observations (self ):
229- return None
243+ def _get_phase (self ):
244+ cycle_time = self .env_cfg .get ("cycle_time" , 1.0 )
245+ phase = self .episode_length_buf * self .dt / cycle_time
246+ return phase
247+
248+ def _get_gait_phase (self ):
249+ # return float mask 1 is stance, 0 is swing
250+ phase = self ._get_phase ()
251+ sin_pos = torch .sin (2 * torch .pi * phase )
252+ # Add double support phase
253+ stance_mask = torch .zeros ((self .num_envs , 2 ), device = self .device )
254+ # left foot stance
255+ stance_mask [:, 0 ] = sin_pos >= 0
256+ # right foot stance
257+ stance_mask [:, 1 ] = sin_pos < 0
258+ # Double support phase
259+ stance_mask [torch .abs (sin_pos ) < 0.1 ] = 1
260+ return stance_mask
261+
262+ def compute_ref_state (self ):
263+ phase = self ._get_phase ()
264+ sin_pos = torch .sin (2 * torch .pi * phase )
265+ sin_pos_l = sin_pos .clone ()
266+ sin_pos_r = sin_pos .clone ()
267+ default_clone = self .default_dof_pos .clone ()
268+ self .ref_dof_pos = self .default_dof_pos .repeat (self .num_envs , 1 )
269+
270+ scale_1 = self .env_cfg .get ("target_joint_pos_scale" , 0.1 )
271+ scale_2 = 2 * scale_1
272+ # left foot stance phase set to default joint pos
273+ sin_pos_l [sin_pos_l > 0 ] = 0
274+ self .ref_dof_pos [:, self .legs_joints ["left_hip_pitch" ]] += sin_pos_l * scale_1
275+ self .ref_dof_pos [:, self .legs_joints ["left_knee_pitch" ]] += sin_pos_l * scale_2
276+ self .ref_dof_pos [:, self .legs_joints ["left_ankle_pitch" ]] += sin_pos_l * scale_1
277+
278+ # right foot stance phase set to default joint pos
279+ sin_pos_r [sin_pos_r < 0 ] = 0
280+ self .ref_dof_pos [:, self .legs_joints ["right_hip_pitch" ]] += sin_pos_r * scale_1
281+ self .ref_dof_pos [:, self .legs_joints ["right_knee_pitch" ]] += sin_pos_r * scale_2
282+ self .ref_dof_pos [:, self .legs_joints ["right_ankle_pitch" ]] += sin_pos_r * scale_1
283+
284+ # Double support phase
285+ self .ref_dof_pos [torch .abs (sin_pos ) < 0.1 ] = 0
286+
287+ self .ref_action = 2 * self .ref_dof_pos
288+
289+ def _get_noise_scale_vec (self , cfg ):
290+ """Sets a vector used to scale the noise added to the observations.
291+ [NOTE]: Must be adapted when changing the observations structure
292+
293+ Args:
294+ cfg (Dict): Environment config file
295+
296+ Returns:
297+ [torch.Tensor]: Vector of scales used to multiply a uniform distribution in [-1, 1]
298+ """
299+ num_actions = self .num_actions
300+ noise_vec = torch .zeros (cfg ["num_single_obs" ], device = self .device )
301+ self .add_noise = cfg .get ("add_noise" , False )
302+ noise_scales = cfg ["noise_scales" ]
303+ noise_vec [0 :5 ] = 0.0 # commands
304+ noise_vec [5 : (num_actions + 5 )] = noise_scales .dof_pos * self .obs_scales .dof_pos
305+ noise_vec [(num_actions + 5 ) : (2 * num_actions + 5 )] = noise_scales .dof_vel * self .obs_scales .dof_vel
306+ noise_vec [(2 * num_actions + 5 ) : (3 * num_actions + 5 )] = 0.0 # previous actions
307+ noise_vec [(3 * num_actions + 5 ) : (3 * num_actions + 5 ) + 3 ] = (
308+ noise_scales .ang_vel * self .obs_scales .ang_vel
309+ ) # ang vel
310+ noise_vec [(3 * num_actions + 5 ) + 3 : (3 * num_actions + 5 )] = (
311+ noise_scales .quat * self .obs_scales .quat
312+ ) # euler x,y
313+ return noise_vec
314+
315+ def compute_observations (self ):
316+ phase = self ._get_phase ()
317+ self .compute_ref_state ()
318+
319+ sin_pos = torch .sin (2 * torch .pi * phase ).unsqueeze (1 )
320+ cos_pos = torch .cos (2 * torch .pi * phase ).unsqueeze (1 )
321+
322+ stance_mask = self ._get_gait_phase ()
323+ contact_mask = self .contact_forces [:, self .feet_indices , 2 ] > 5.0
324+
325+ self .command_input = torch .cat ((sin_pos , cos_pos , self .commands [:, :3 ] * self .commands_scale ), dim = 1 )
326+ q = (self .dof_pos - self .default_dof_pos ) * self .obs_scales .dof_pos
327+ dq = self .dof_vel * self .obs_scales .dof_vel
328+
329+ diff = self .dof_pos - self .ref_dof_pos
330+ self .privileged_obs_buf = torch .cat (
331+ (
332+ self .command_input , # 2 + 3
333+ (self .dof_pos - self .default_joint_pd_target ) * self .obs_scales .dof_pos , # 10D
334+ self .dof_vel * self .obs_scales .dof_vel , # 10D
335+ self .actions , # 10D
336+ diff , # 10D
337+ self .base_lin_vel * self .obs_scales .lin_vel , # 3
338+ self .base_ang_vel * self .obs_scales .ang_vel , # 3
339+ self .base_euler_xyz * self .obs_scales .quat , # 3
340+ self .rand_push_force [:, :2 ], # 3
341+ self .rand_push_torque , # 3
342+ self .env_frictions , # 1
343+ self .body_mass / 30.0 , # 1
344+ stance_mask , # 2
345+ contact_mask , # 2
346+ ),
347+ dim = - 1 ,
348+ )
349+
350+ obs_buf = torch .cat (
351+ (
352+ self .command_input , # 5 = 2D(sin cos) + 3D(vel_x, vel_y, aug_vel_yaw)
353+ q , # 10D
354+ dq , # 10D
355+ self .actions , # 10D
356+ self .base_ang_vel * self .obs_scales .ang_vel , # 3
357+ self .base_euler_xyz * self .obs_scales .quat , # 3
358+ self .measured_heights .mean (dim = 1 , keepdim = True ) * self .obs_scales .height_measurements , # 1
359+ self .terrain_difficulty .unsqueeze (1 ), # 1
360+ ),
361+ dim = - 1 ,
362+ )
363+
364+ if self .measure_heights :
365+ heights = (
366+ torch .clip (
367+ self .root_states [:, 2 ].unsqueeze (1 ) - 0.5 - self .measured_heights ,
368+ - 1 ,
369+ 1.0 ,
370+ )
371+ * self .obs_scales .height_measurements
372+ )
373+ self .privileged_obs_buf = torch .cat ((self .obs_buf , heights ), dim = - 1 )
374+
375+ if self .add_noise :
376+ noise_level = self .obs_cfg .get ("noise_level" , 0.1 )
377+ obs_now = obs_buf .clone () + torch .randn_like (obs_buf ) * self .noise_scale_vec * noise_level
378+ else :
379+ obs_now = obs_buf .clone ()
380+ self .obs_history .append (obs_now )
381+ self .critic_history .append (self .privileged_obs_buf )
382+
383+ obs_buf_all = torch .stack ([self .obs_history [i ] for i in range (self .obs_history .maxlen )], dim = 1 ) # N,T,K
384+
385+ self .obs_buf = obs_buf_all .reshape (self .num_envs , - 1 ) # N, T*K
386+ self .privileged_obs_buf = torch .cat ([self .critic_history [i ] for i in range (self .c_frame_stack )], dim = 1 )
230387
231388 def reset_idx (self , envs_idx ):
232389 if len (envs_idx ) == 0 :
@@ -325,3 +482,47 @@ def _reward_energy_efficiency(self):
325482 def _reward_orientation (self ):
326483 # Penalize base orientation away from upright
327484 return torch .exp (- torch .abs (self .base_euler [:, 0 ]) - torch .abs (self .base_euler [:, 1 ])) * self .reward_cfg ["reward_scales" ]["orientation" ]
485+
486+ def _reward_terrain_adaptation (self ):
487+ """Reward for adapting to different terrain types"""
488+ # Calculate foot clearance
489+ foot_clearance = torch .zeros ((self .num_envs , 2 ), device = self .device )
490+ for i , foot_idx in enumerate (self .feet_indices ):
491+ foot_pos = self .robot .get_body_pos (foot_idx )
492+ terrain_height = self .terrain .get_height (foot_pos [:, :2 ])
493+ foot_clearance [:, i ] = foot_pos [:, 2 ] - terrain_height
494+
495+ # Reward for maintaining appropriate foot clearance
496+ target_clearance = 0.05 # 5cm
497+ clearance_error = torch .abs (foot_clearance - target_clearance )
498+ return torch .exp (- torch .mean (clearance_error , dim = 1 ) / 0.02 )
499+
500+ def _reward_terrain_stability (self ):
501+ """Reward for maintaining stability on uneven terrain"""
502+ # Penalize large base orientation changes
503+ base_euler = quat_to_xyz (self .base_quat )
504+ return torch .exp (- torch .abs (base_euler [:, 0 ]) - torch .abs (base_euler [:, 1 ]))
505+
506+ def _reward_terrain_progress (self ):
507+ """Reward for making progress across different terrain types"""
508+ # Calculate forward progress relative to terrain difficulty
509+ forward_vel = self .base_lin_vel [:, 0 ]
510+ terrain_difficulty = self .terrain .get_difficulty (self .base_pos [:, :2 ])
511+
512+ # Difficulty factors based on terrain type
513+ difficulty_factors = {
514+ "flat_terrain" : 0.1 ,
515+ "random_uniform_terrain" : 0.3 ,
516+ "stepping_stones_terrain" : 0.5 ,
517+ "pyramid_sloped_terrain" : 0.7 ,
518+ "discrete_obstacles_terrain" : 0.8 ,
519+ "wave_terrain" : 0.6 ,
520+ "pyramid_stairs_terrain" : 0.9 ,
521+ "sloped_terrain" : 0.4
522+ }
523+
524+ # Get current terrain type
525+ terrain_type = self .terrain .get_type (self .base_pos [:, :2 ])
526+ difficulty = difficulty_factors .get (terrain_type , 0.5 )
527+
528+ return forward_vel / (difficulty + 0.1 )
0 commit comments