diff --git a/maro/cli/data_pipeline/citi_bike.py b/maro/cli/data_pipeline/citi_bike.py index b8b18166f..b4b74cc51 100644 --- a/maro/cli/data_pipeline/citi_bike.py +++ b/maro/cli/data_pipeline/citi_bike.py @@ -8,7 +8,6 @@ from enum import Enum import geopy.distance -import numpy as np import pandas as pd from yaml import safe_load @@ -320,7 +319,7 @@ def _process_distance(self, station_info: pd.DataFrame): 0, index=station_info["station_index"], columns=station_info["station_index"], - dtype=np.float, + dtype=float, ) look_up_df = station_info[["latitude", "longitude"]] return distance_adj.apply( @@ -617,7 +616,7 @@ def _gen_distance(self, station_init: pd.DataFrame): 0, index=station_init["station_index"], columns=station_init["station_index"], - dtype=np.float, + dtype=float, ) look_up_df = station_init[["latitude", "longitude"]] distance_df = distance_adj.apply( diff --git a/maro/rl/model/abs_net.py b/maro/rl/model/abs_net.py index a559d1124..b1814191e 100644 --- a/maro/rl/model/abs_net.py +++ b/maro/rl/model/abs_net.py @@ -47,6 +47,19 @@ def get_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]: loss.backward() return {name: param.grad for name, param in self.named_parameters()} + def get_kl_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]: + """Get the gradients with respect to all parameters according to the given loss. + + Args: + loss (torch.tensor): Loss used to compute gradients. + + Returns: + Gradients (Dict[str, torch.Tensor]): A dict that contains gradients for all parameters. + """ + self.optim.zero_grad() + loss.backward(retain_graph=True) + return {name: param.grad for name, param in self.named_parameters()} + def apply_gradients(self, grad: Dict[str, torch.Tensor]) -> None: """Apply gradients to the net to update all parameters. 
diff --git a/maro/rl/policy/abs_policy.py b/maro/rl/policy/abs_policy.py index 14b0bb3a9..105a5f613 100644 --- a/maro/rl/policy/abs_policy.py +++ b/maro/rl/policy/abs_policy.py @@ -190,6 +190,18 @@ def get_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]: """ raise NotImplementedError + @abstractmethod + def get_kl_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]: + """Get the gradients with respect to all parameters of the internal nets according to the given loss. + + Args: + loss (torch.tensor): Loss used to update the model. + + Returns: + grad (Dict[str, torch.Tensor]): A dict that contains gradients of the internal nets for all parameters. + """ + raise NotImplementedError + @abstractmethod def apply_gradients(self, grad: dict) -> None: """Apply gradients to the net to update all parameters. diff --git a/maro/rl/policy/discrete_rl_policy.py b/maro/rl/policy/discrete_rl_policy.py index 567e9d054..b050402e2 100644 --- a/maro/rl/policy/discrete_rl_policy.py +++ b/maro/rl/policy/discrete_rl_policy.py @@ -286,6 +286,9 @@ def train_step(self, loss: torch.Tensor) -> None: def get_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]: return self._policy_net.get_gradients(loss) + def get_kl_gradients(self, loss: torch.Tensor) -> Dict[str, torch.Tensor]: + return self._policy_net.get_kl_gradients(loss) + def apply_gradients(self, grad: dict) -> None: self._policy_net.apply_gradients(grad) diff --git a/maro/rl/training/algorithms/base/ac_ppo_base.py b/maro/rl/training/algorithms/base/ac_ppo_base.py index 3227437be..478c4ec58 100644 --- a/maro/rl/training/algorithms/base/ac_ppo_base.py +++ b/maro/rl/training/algorithms/base/ac_ppo_base.py @@ -309,3 +309,7 @@ async def train_step_as_task(self) -> None: for _ in range(self._params.grad_iters): if self._ops.update_actor_with_grad(await self._ops.get_actor_grad(batch)): # early stop break + + + + diff --git a/maro/rl/training/algorithms/base/trpo_base.py 
b/maro/rl/training/algorithms/base/trpo_base.py new file mode 100644 index 000000000..d206dd1f9 --- /dev/null +++ b/maro/rl/training/algorithms/base/trpo_base.py @@ -0,0 +1,45 @@ +import torch +import torch.autograd as autograd +import torch.nn as nn + +class Policy(nn.Module): + def __init__(self, num_inputs, num_outputs): + super(Policy, self).__init__() + self.affine1 = nn.Linear(num_inputs, 64) + self.affine2 = nn.Linear(64, 64) + + self.action_mean = nn.Linear(64, num_outputs) + self.action_mean.weight.data.mul_(0.1) + self.action_mean.bias.data.mul_(0.0) + + self.action_log_std = nn.Parameter(torch.zeros(1, num_outputs)) + + self.saved_actions = [] + self.rewards = [] + self.final_value = 0 + + def forward(self, x): + x = torch.tanh(self.affine1(x)) + x = torch.tanh(self.affine2(x)) + + action_mean = self.action_mean(x) + action_log_std = self.action_log_std.expand_as(action_mean) + action_std = torch.exp(action_log_std) + + return action_mean, action_log_std,action_std + +class Value(nn.Module): + def __init__(self, num_inputs): + super(Value, self).__init__() + self.affine1 = nn.Linear(num_inputs, 64) + self.affine2 = nn.Linear(64, 64) + self.value_head = nn.Linear(64, 1) + self.value_head.weight.data.mul_(0.1) + self.value_head.bias.data.mul_(0.0) + + def forward(self, x): + x = torch.tanh(self.affine1(x)) + x = torch.tanh(self.affine2(x)) + + state_values = self.value_head(x) + return state_values diff --git a/maro/rl/training/algorithms/trpo.py b/maro/rl/training/algorithms/trpo.py new file mode 100644 index 000000000..f4a778d02 --- /dev/null +++ b/maro/rl/training/algorithms/trpo.py @@ -0,0 +1,468 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. 
+ +from abc import ABCMeta +from dataclasses import dataclass +from typing import Callable, Dict, Optional, Tuple, cast, Union, Any + +import numpy as np + +from maro.rl.model import VNet +from maro.rl.policy import ContinuousRLPolicy, DiscretePolicyGradient, RLPolicy +from maro.rl.training import AbsTrainOps, BaseTrainerParams, FIFOReplayMemory, RemoteOps, SingleAgentTrainer, remote +from maro.rl.utils import TransitionBatch, discount_cumsum, get_torch_device, ndarray_to_tensor +from maro.rl.training.algorithms.base.trpo_base import Policy +import warnings +from typing import Any, Dict, List, Optional, Type, Union +from torch.distributions import Independent, Normal, Distribution +import torch +import torch.nn.functional as F +from torch.autograd import Variable +from torch import nn, Tensor +from torch.distributions import kl_divergence + + +@dataclass +class TRPOParams(BaseTrainerParams, metaclass=ABCMeta): + """ + Parameter bundle for Actor-Critic based algorithms (Actor-Critic & PPO) + + get_v_critic_net_func (Callable[[], VNet]): Function to get V critic net. + grad_iters (int, default=1): Number of iterations to calculate gradients. + critic_loss_cls (Callable, default=None): Critic loss function. If it is None, use MSE. + lam (float, default=0.9): Lambda value for generalized advantage estimation (TD-Lambda). + min_logp (float, default=float("-inf")): Lower bound for clamping logP values during learning. + This is to prevent logP from becoming very large in magnitude and causing stability issues. + """ + + get_v_critic_net_func: Callable[[], VNet] + grad_iters: int = 1 + critic_loss_cls: Optional[Callable] = None + + clip_ratio: Optional[float] = None + max_kl: float = 0.01 + backtrack_coeff: float = 0.8 + max_backtracks: int = 10 + + +class TRPOOps(AbsTrainOps): + """Base class of Actor-Critic algorithm implementation.
Reference: https://tinyurl.com/2ezte4cr""" + + def __init__( + self, + name: str, + policy: ContinuousRLPolicy, + params: TRPOParams, + dist_fn: Type[torch.distributions.Distribution], + reward_discount: float = 0.9, + parallelism: int = 1, + deterministic_eval: bool = False, + + ) -> None: + super(TRPOOps, self).__init__( + name=name, + policy=policy, + parallelism=parallelism, + ) + + assert isinstance(self._policy, (ContinuousRLPolicy, DiscretePolicyGradient)) + + self._reward_discount = reward_discount + self._critic_loss_func = params.critic_loss_cls() if params.critic_loss_cls is not None else torch.nn.MSELoss() + self._clip_ratio = params.clip_ratio + self._v_critic_net = params.get_v_critic_net_func() + self._is_discrete_action = isinstance(self._policy, DiscretePolicyGradient) + self._max_backtracks = params.max_backtracks + self._deterministic_eval = deterministic_eval + self.dist_fn = dist_fn + self.policy_net = Policy(self.policy_state_dim, self.policy_action_dim) + self._delta = 0.005 + + def _get_critic_loss(self, batch: TransitionBatch) -> torch.Tensor: + """Compute the critic loss of the batch. + + Args: + batch (TransitionBatch): Batch. + + Returns: + loss (torch.Tensor): The critic loss of the batch. + """ + states = ndarray_to_tensor(batch.states, device=self._device) + returns = ndarray_to_tensor(batch.returns, device=self._device) + + self._v_critic_net.train() + state_values = self._v_critic_net.v_values(states) + + return self._critic_loss_func(state_values, returns) + + @remote + def get_critic_grad(self, batch: TransitionBatch) -> Dict[str, torch.Tensor]: + """Compute the critic network's gradients of a batch. + + Args: + batch (TransitionBatch): Batch. + + Returns: + grad (torch.Tensor): The critic gradient of the batch. + """ + return self._v_critic_net.get_gradients(self._get_critic_loss(batch)) + + def update_critic(self, batch: TransitionBatch) -> None: + """Update the critic network using a batch. 
+ + Args: + batch (TransitionBatch): Batch. + """ + self._v_critic_net.step(self._get_critic_loss(batch)) + + def update_critic_with_grad(self, grad_dict: dict) -> None: + """Update the critic network with remotely computed gradients. + + Args: + grad_dict (dict): Gradients. + """ + self._v_critic_net.train() + self._v_critic_net.apply_gradients(grad_dict) + + # def _get_flat_grad( + # self, y: torch.Tensor, model: nn.Module, **kwargs: Any + # ) -> torch.Tensor: + # grads = torch.autograd.grad(y, model.parameters(), **kwargs) # type: ignore + # return torch.cat([grad.reshape(-1) for grad in grads]) + + def get_kl(self, states): + mean1, log_std1, std1 = self.policy_net(Variable(states)) + mean0 = Variable(mean1.data) + log_std0 = Variable(log_std1.data) + std0 = Variable(std1.data) + kl = log_std1 - log_std0 + (std0.pow(2) + (mean0 - mean1).pow(2)) / (2.0 * std1.pow(2)) - 0.5 + return kl.sum(1, keepdim=True) + + def _get_flat_grad( + self, y: torch.Tensor, model: nn.Module, **kwargs: Any + ) -> torch.Tensor: + grads = torch.autograd.grad(y, model.parameters(), **kwargs) # type: ignore + return torch.cat([grad.reshape(-1) for grad in grads]) + + def kl_div(self, p, q): + return (p * torch.log(p / q)).sum().mean() + + def _MVP(self, v: torch.Tensor, flat_kl_grad: torch.Tensor) -> torch.Tensor: + """Matrix vector product.""" + # caculate second order gradient of kl with respect to theta + kl_v = (flat_kl_grad * v).sum() + kl_v.requires_grad_(True) + + flat_grads = self._policy.get_kl_gradients(kl_v) + flat_grad_list = [y for x, y in flat_grads.items()] + flat_kl_grad_grad = torch.cat([grad.reshape(-1) for grad in flat_grad_list]).detach() + + return flat_kl_grad_grad + v * 0.1 + + def _conjugate_gradients( + self, + minibatch: torch.Tensor, + flat_kl_grad: torch.Tensor, + nsteps: int = 10, + residual_tol: float = 1e-10 + ) -> torch.Tensor: + x = torch.zeros_like(minibatch) + r, p = minibatch.clone(), minibatch.clone() + # Note: should be 'r, p = minibatch - MVP(x)', 
but for x=0, MVP(x)=0. + # Change if doing warm start. + rdotr = r.dot(r) + for _ in range(nsteps): + z = self._MVP(p, flat_kl_grad) + alpha = rdotr / p.dot(z) + x += alpha * p + r -= alpha * z + new_rdotr = r.dot(r) + if new_rdotr < residual_tol: + break + p = r + new_rdotr / rdotr * p + rdotr = new_rdotr + return x + + def _set_from_flat_params( + self, model: nn.Module, flat_params: torch.Tensor + ) -> nn.Module: + prev_ind = 0 + for param in model.parameters(): + flat_size = int(np.prod(list(param.size()))) + param.data.copy_( + flat_params[prev_ind:prev_ind + flat_size].view(param.size()) + ) + prev_ind += flat_size + return model + + def _get_actor_loss(self, batch: TransitionBatch) -> Tuple[torch.Tensor, bool]: + """Compute the actor loss of the batch. + + Args: + batch (TransitionBatch): Batch. + + Returns: + loss (torch.Tensor): The actor loss of the batch. + early_stop (bool): Early stop indicator. + """ + assert isinstance(self._policy, DiscretePolicyGradient) or isinstance(self._policy, ContinuousRLPolicy) + self._policy.train() + states = ndarray_to_tensor(batch.states, device=self._device) + actions = ndarray_to_tensor(batch.actions, device=self._device) + advantages = ndarray_to_tensor(batch.advantages, device=self._device) + logps_old = ndarray_to_tensor(batch.old_logps, device=self._device) + + reward = torch.Tensor(actions.size(0), 1) + # ratio: + if self._is_discrete_action: + actions = actions.long() + logps = self._policy.get_states_actions_logps(states, actions) + ratio = torch.exp(logps - logps_old) + ratio = ratio.reshape(ratio.size(0), -1).transpose(0, 1) + # actor_loss + actor_loss = -(ratio * reward).mean() + + # flat_gard + flat_grads = self._policy.get_gradients(actor_loss) + flat_grad_list = [y for x, y in flat_grads.items()] + flat_grads = torch.cat([grad.reshape(-1) for grad in flat_grad_list]) + + # calculate natural gradient + with torch.no_grad(): + self._policy.train() + old_nograd_logps = 
self._policy.get_states_actions_logps(states, actions) + + # kl + """ + The tensor types differ, so torch's kl_divergence is not used here. + get_kl is taken from https://github.com/ikostrikov/pytorch-trpo/blob/master/trpo.py + math type : p * torch.log(p / q).sum() + (logps * torch.log(logps / logps)).sum() + """ + + kl = self.kl_div(old_nograd_logps, logps_old) + kl.requires_grad_(True) + + # kl flat_grad + flat_kl_grad = self._policy.get_kl_gradients(kl) + flat_kl_grad_list = [y for x, y in flat_kl_grad.items()] + flat_kl_grad = torch.cat([grad.reshape(-1) for grad in flat_kl_grad_list]) + + search_direction = -self._conjugate_gradients( + flat_grads, + flat_kl_grad, + nsteps=10 + ) + + # stepsize: calculate max stepsize constrained by kl bound + step_size = torch.sqrt( + 2 * self._delta / + (search_direction * + self._MVP(search_direction, flat_kl_grad)).sum(0, keepdim=True) + ) + + # stepsize: linesearch stepsize + with torch.no_grad(): + + flat_params = torch.cat( + [param.data.view(-1) for param in self._policy.policy_net.parameters()] + ) + + for i in range(self._max_backtracks): + # print(flat_params) + # print(step_size * search_direction) + new_flat_params = flat_params + step_size * search_direction + + self._set_from_flat_params(self._policy.policy_net, new_flat_params) + # calculate kl and if in bound, loss actually down + new_logps = self._policy.get_states_actions_logps(states, actions) + new_ratio = torch.exp(new_logps - logps_old) + + new_ratio = new_ratio.reshape(ratio.size(0), -1).transpose(0, 1) + new_actor_loss = -(new_ratio * reward).mean() + + # kl = (old_nograd_logps * torch.log(old_nograd_logps / new_logps)).sum().mean() + kl = self.kl_div(old_nograd_logps,ndarray_to_tensor(batch.old_logps, device=self._device)) + kl.requires_grad_(True) + if kl < self._delta and new_actor_loss < actor_loss: + if i > 0: + warnings.warn(f"Backtracking to step {i}.") + break + elif i < self._max_backtracks - 1: + step_size = step_size * 0.8 + else: + self._set_from_flat_params(self._policy.policy_net,
new_flat_params) + step_size = torch.tensor([0.0]) + warnings.warn( + "Line search failed! It seems hyperparamters" + " are poor and need to be changed." + ) + print("--------------------------------------") + print(actor_loss) + return actor_loss + + @remote + def get_actor_grad(self, batch: TransitionBatch) -> Dict[str, Tensor]: + """Compute the actor network's gradients of a batch. + + Args: + batch (TransitionBatch): Batch. + + Returns: + grad (torch.Tensor): The actor gradient of the batch. + early_stop (bool): Early stop indicator. + """ + loss = self._get_actor_loss(batch) + return self._policy.get_gradients(loss) + + def update_actor(self, batch: TransitionBatch) -> bool: + """Update the actor network using a batch. + + Args: + batch (TransitionBatch): Batch. + + Returns: + early_stop (bool): Early stop indicator. + """ + loss = self._get_actor_loss(batch) + self._policy.train_step(loss) + + def update_actor_with_grad(self, grad_dict_and_early_stop: Tuple[dict, bool]) -> bool: + """Update the actor network with remotely computed gradients. + + Args: + grad_dict_and_early_stop (Tuple[dict, bool]): Gradients and early stop indicator. + + Returns: + early stop indicator + """ + self._policy.train() + self._policy.apply_gradients(grad_dict_and_early_stop[0]) + return grad_dict_and_early_stop[1] + + def get_non_policy_state(self) -> dict: + return { + "critic": self._v_critic_net.get_state(), + } + + def set_non_policy_state(self, state: dict) -> None: + self._v_critic_net.set_state(state["critic"]) + + def preprocess_batch(self, batch: TransitionBatch) -> TransitionBatch: + """Preprocess the batch to get the returns & advantages. + + Args: + batch (TransitionBatch): Batch. + + Returns: + The updated batch. 
+ """ + assert isinstance(batch, TransitionBatch) + + # Preprocess advantages + states = ndarray_to_tensor(batch.states, device=self._device) # s + actions = ndarray_to_tensor(batch.actions, device=self._device) # a + if self._is_discrete_action: + actions = actions.long() + + with torch.no_grad(): + self._v_critic_net.eval() + self._policy.eval() + values = self._v_critic_net.v_values(states).detach().cpu().numpy() + values = np.concatenate([values, np.zeros(1)]) + rewards = np.concatenate([batch.rewards, np.zeros(1)]) + deltas = rewards[:-1] + self._reward_discount * values[1:] - values[:-1] # r + gamma * v(s') - v(s) + batch.returns = discount_cumsum(rewards, self._reward_discount)[:-1] + batch.old_logps = self._policy.get_states_actions_logps(states, actions).detach().cpu().numpy() + + return batch + + def debug_get_v_values(self, batch: TransitionBatch) -> np.ndarray: + states = ndarray_to_tensor(batch.states, device=self._device) # s + with torch.no_grad(): + values = self._v_critic_net.v_values(states).detach().cpu().numpy() + return values + + def to_device(self, device: str = None) -> None: + self._device = get_torch_device(device) + self._policy.to_device(self._device) + self._v_critic_net.to(self._device) + + +class TRPOTrainer(SingleAgentTrainer): + """Base class of Actor-Critic algorithm implementation. 
+ + References: + https://github.com/openai/spinningup/tree/master/spinup/algos/pytorch + https://towardsdatascience.com/understanding-actor-critic-methods-931b97b6df3f + """ + + def __init__( + self, + name: str, + params: TRPOParams, + replay_memory_capacity: int = 10000, + batch_size: int = 128, + data_parallelism: int = 1, + reward_discount: float = 0.9, + ) -> None: + super(TRPOTrainer, self).__init__( + name, + replay_memory_capacity, + batch_size, + data_parallelism, + reward_discount, + ) + self._params = params + + def _register_policy(self, policy: RLPolicy) -> None: + assert isinstance(policy, (ContinuousRLPolicy, DiscretePolicyGradient)) + self._policy = policy + + + def build(self) -> None: + self._ops = cast(TRPOOps, self.get_ops()) + self._replay_memory = FIFOReplayMemory( + capacity=self._replay_memory_capacity, + state_dim=self._ops.policy_state_dim, + action_dim=self._ops.policy_action_dim, + ) + + def _preprocess_batch(self, transition_batch: TransitionBatch) -> TransitionBatch: + return self._ops.preprocess_batch(transition_batch) + + def get_local_ops(self) -> AbsTrainOps: + return TRPOOps( + name=self._policy.name, + policy=self._policy, + parallelism=self._data_parallelism, + reward_discount=self._reward_discount, + params=self._params, + dist_fn=Type[torch.distributions.Distribution], + ) + + def _get_batch(self) -> TransitionBatch: + batch = self._replay_memory.sample(-1) + # RuntimeError: gather_out_cpu(): Expected dtype int64 for index + np.seterr(divide='ignore', invalid='ignore') + batch.advantages = (batch.advantages - batch.advantages.mean()) / batch.advantages.std() + return batch + + def train_step(self) -> None: + assert isinstance(self._ops, TRPOOps) + batch = self._get_batch() + + for _ in range(self._params.grad_iters): + self._ops.update_critic(batch) + self._ops.update_actor(batch) + + async def train_step_as_task(self) -> None: + assert isinstance(self._ops, RemoteOps) + + batch = self._get_batch() + for _ in 
range(self._params.grad_iters): + self._ops.update_critic_with_grad(await self._ops.get_critic_grad(batch)) + + for _ in range(self._params.grad_iters): + if self._ops.update_actor_with_grad(await self._ops.get_actor_grad(batch)): # early stop + break diff --git a/maro/rl/training/replay_memory.py b/maro/rl/training/replay_memory.py index 8d8f6e5e4..3e4f573e0 100644 --- a/maro/rl/training/replay_memory.py +++ b/maro/rl/training/replay_memory.py @@ -204,7 +204,7 @@ def __init__( self._states = np.zeros((self._capacity, self._state_dim), dtype=np.float32) self._actions = np.zeros((self._capacity, self._action_dim), dtype=np.float32) self._rewards = np.zeros(self._capacity, dtype=np.float32) - self._terminals = np.zeros(self._capacity, dtype=np.bool) + self._terminals = np.zeros(self._capacity, dtype=bool) self._next_states = np.zeros((self._capacity, self._state_dim), dtype=np.float32) self._returns = np.zeros(self._capacity, dtype=np.float32) self._advantages = np.zeros(self._capacity, dtype=np.float32) @@ -373,7 +373,7 @@ def __init__( self._actions = [np.zeros((self._capacity, action_dim), dtype=np.float32) for action_dim in self._action_dims] self._rewards = [np.zeros(self._capacity, dtype=np.float32) for _ in range(self.agent_num)] self._next_states = np.zeros((self._capacity, self._state_dim), dtype=np.float32) - self._terminals = np.zeros(self._capacity, dtype=np.bool) + self._terminals = np.zeros(self._capacity, dtype=bool) assert len(agent_states_dims) == self.agent_num self._agent_states_dims = agent_states_dims diff --git a/maro/streamit/client/metric.py b/maro/streamit/client/metric.py index 43ca9c4d7..2cdc6e7e4 100644 --- a/maro/streamit/client/metric.py +++ b/maro/streamit/client/metric.py @@ -61,7 +61,7 @@ def is_float_type(v_type: type): Returns: bool: True if an float type. 
""" - return v_type is float or v_type is np.float or v_type is np.float32 or v_type is np.float64 + return v_type is float or v_type is np.float16 or v_type is np.float32 or v_type is np.float64 def parse_value(value: object): diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..f894a62c1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,21 @@ +torch~=1.6.0 +numpy~=1.20.3 +scipy~=1.10.0 +pyzmq~=25.0.0 +pyyaml~=6.0 +tornado~=6.2 +redis~=4.4.2 +cryptography~=39.0.0 +requests~=2.28.2 +pyjwt~=2.6.0 +pandas~=1.5.3 +altair~=4.2.2 +streamlit~=1.17.0 +tqdm~=4.64.1 +jinja2~=3.1.2 +paramiko~=3.0.0 +ptvsd~=4.3.2 +tabulate~=0.9.0 +python-dateutil~=2.8.2 +holidays~=0.19 +setuptools~=65.6.3 \ No newline at end of file diff --git a/tests/requirements.test.txt b/tests/requirements.test.txt index e44b03370..c7ac76f45 100644 --- a/tests/requirements.test.txt +++ b/tests/requirements.test.txt @@ -6,7 +6,7 @@ deepdiff>=5.7.0 geopy>=2.0.0 holidays>=0.10.3 kubernetes>=21.7.0 -numpy>=1.19.5,<1.24.0 +numpy>=1.19.5 pandas>=0.25.3 paramiko>=2.9.2 pytest>=7.1.2 diff --git a/tests/test_frame.py b/tests/test_frame.py index 2d81cce97..a06713e50 100644 --- a/tests/test_frame.py +++ b/tests/test_frame.py @@ -311,7 +311,7 @@ def test_append_nodes(self): self.assertListEqual([0.0, 0.0, 0.0, 0.0, 9.0], list(states)[0:5]) # 2 padding (NAN) in the end - self.assertTrue((states[-2:].astype(np.int) == 0).all()) + self.assertTrue((states[-2:].astype(int) == 0).all()) states = static_snapshot[1::"a3"] diff --git a/try_maro.py b/try_maro.py new file mode 100644 index 000000000..58d5f0114 --- /dev/null +++ b/try_maro.py @@ -0,0 +1,242 @@ +# Import necessary packages +from typing import Any, Dict, List, Tuple, Union, Optional + +import numpy as np +import torch +from torch.optim import Adam, RMSprop + +from maro.rl.model import DiscreteACBasedNet, FullyConnected, VNet +from maro.rl.policy import DiscretePolicyGradient +from maro.rl.rl_component.rl_component_bundle import 
RLComponentBundle +from maro.rl.rollout import AbsEnvSampler, CacheElement, ExpElement +from maro.rl.training import TrainingManager +from maro.rl.training.algorithms.trpo import TRPOTrainer, TRPOParams +from maro.rl.training.algorithms.ppo import PPOTrainer, PPOParams +from maro.simulator import Env +from maro.simulator.scenarios.cim.common import Action, ActionType, DecisionEvent +from maro.rl.training.algorithms.ppo import PPOTrainer, PPOParams + + +class CIMEnvSampler(AbsEnvSampler): + def _get_global_and_agent_state_impl( + self, event: DecisionEvent, tick: int = None, + ) -> Tuple[Union[None, np.ndarray, List[object]], Dict[Any, Union[np.ndarray, List[object]]]]: + tick = self._env.tick + vessel_snapshots, port_snapshots = self._env.snapshot_list["vessels"], self._env.snapshot_list["ports"] + port_idx, vessel_idx = event.port_idx, event.vessel_idx + ticks = [max(0, tick - rt) for rt in range(state_shaping_conf["look_back"] - 1)] + future_port_list = vessel_snapshots[tick: vessel_idx: 'future_stop_list'].astype('int') + state = np.concatenate([ + port_snapshots[ticks: [port_idx] + list(future_port_list): port_attributes], + vessel_snapshots[tick: vessel_idx: vessel_attributes] + ]) + return state, {port_idx: state} + + def _translate_to_env_action( + self, action_dict: Dict[Any, Union[np.ndarray, List[object]]], event: DecisionEvent, + ) -> Dict[Any, object]: + action_space = action_shaping_conf["action_space"] + finite_vsl_space = action_shaping_conf["finite_vessel_space"] + has_early_discharge = action_shaping_conf["has_early_discharge"] + + port_idx, model_action = list(action_dict.items()).pop() + + vsl_idx, action_scope = event.vessel_idx, event.action_scope + vsl_snapshots = self._env.snapshot_list["vessels"] + vsl_space = vsl_snapshots[self._env.tick:vsl_idx:vessel_attributes][2] if finite_vsl_space else float("inf") + + percent = abs(action_space[model_action[0]]) + zero_action_idx = len(action_space) / 2 # index corresponding to value zero. 
+ if model_action < zero_action_idx: + action_type = ActionType.LOAD + actual_action = min(round(percent * action_scope.load), vsl_space) + elif model_action > zero_action_idx: + action_type = ActionType.DISCHARGE + early_discharge = vsl_snapshots[self._env.tick:vsl_idx:"early_discharge"][0] if has_early_discharge else 0 + plan_action = percent * (action_scope.discharge + early_discharge) - early_discharge + actual_action = round(plan_action) if plan_action > 0 else round(percent * action_scope.discharge) + else: + actual_action, action_type = 0, None + + return {port_idx: Action(vsl_idx, int(port_idx), actual_action, action_type)} + + def _get_reward(self, env_action_dict: Dict[Any, object], event: DecisionEvent, tick: int) -> Dict[Any, float]: + start_tick = tick + 1 + ticks = list(range(start_tick, start_tick + reward_shaping_conf["time_window"])) + + # Get the ports that took actions at the given tick + ports = [int(port) for port in list(env_action_dict.keys())] + port_snapshots = self._env.snapshot_list["ports"] + future_fulfillment = port_snapshots[ticks:ports:"fulfillment"].reshape(len(ticks), -1) + future_shortage = port_snapshots[ticks:ports:"shortage"].reshape(len(ticks), -1) + + decay_list = [reward_shaping_conf["time_decay"] ** i for i in range(reward_shaping_conf["time_window"])] + rewards = np.float32( + reward_shaping_conf["fulfillment_factor"] * np.dot(future_fulfillment.T, decay_list) + - reward_shaping_conf["shortage_factor"] * np.dot(future_shortage.T, decay_list) + ) + return {agent_id: reward for agent_id, reward in zip(ports, rewards)} + + def _post_step(self, cache_element: CacheElement) -> None: + self._info["env_metric"] = self._env.metrics + + def _post_eval_step(self, cache_element: CacheElement) -> None: + self._post_step(cache_element) + + def post_collect(self, info_list: list, ep: int) -> None: + # print the env metric from each rollout worker + for info in info_list: + print(f"env summary (episode {ep}): {info['env_metric']}") + + # 
print the average env metric + if len(info_list) > 1: + metric_keys, num_envs = info_list[0]["env_metric"].keys(), len(info_list) + avg_metric = {key: sum(info["env_metric"][key] for info in info_list) / num_envs for key in metric_keys} + print(f"average env summary (episode {ep}): {avg_metric}") + + def post_evaluate(self, info_list: list, ep: int) -> None: + self.post_collect(info_list, ep) + + +class MyActorNet(DiscreteACBasedNet): + def __init__(self, state_dim: int, action_num: int) -> None: + super(MyActorNet, self).__init__(state_dim=state_dim, action_num=action_num) + self._actor = FullyConnected(input_dim=state_dim, output_dim=action_num, **actor_net_conf) + self._optim = Adam(self._actor.parameters(), lr=actor_learning_rate) + + def _get_action_probs_impl(self, states: torch.Tensor) -> torch.Tensor: + return self._actor(states) + + +class MyCriticNet(VNet): + def __init__(self, state_dim: int) -> None: + super(MyCriticNet, self).__init__(state_dim=state_dim) + self._critic = FullyConnected(input_dim=state_dim, **critic_net_conf) + self._optim = RMSprop(self._critic.parameters(), lr=critic_learning_rate) + + def _get_v_values(self, states: torch.Tensor) -> torch.Tensor: + return self._critic(states).squeeze(-1) + + +def get_trpo_trainer(state_dim: int, name: str) -> TRPOTrainer: + return TRPOTrainer( + name=name, + reward_discount=.0, + params=TRPOParams( + get_v_critic_net_func=lambda: MyCriticNet(state_dim), + grad_iters=10, + critic_loss_cls=torch.nn.SmoothL1Loss, + clip_ratio=0.1, + ), + ) + + +def get_ppo_trainer(state_dim: int, name: str) -> PPOTrainer: + return PPOTrainer( + name=name, + reward_discount=.0, + params=PPOParams( + get_v_critic_net_func=lambda: MyCriticNet(state_dim), + grad_iters=10, + critic_loss_cls=torch.nn.SmoothL1Loss, + lam=.0, + clip_ratio=0.1, + ), + ) + + +# env and shaping config +reward_shaping_conf = { + "time_window": 99, + "fulfillment_factor": 1.0, + "shortage_factor": 1.0, + "time_decay": 0.97, +} +state_shaping_conf = 
{ + "look_back": 7, + "max_ports_downstream": 2, +} +port_attributes = ["empty", "full", "on_shipper", "on_consignee", "booking", "shortage", "fulfillment"] +vessel_attributes = ["empty", "full", "remaining_space"] +action_shaping_conf = { + "action_space": [(i - 10) / 10 for i in range(21)], + "finite_vessel_space": True, + "has_early_discharge": True, +} +state_dim = ( + (state_shaping_conf["look_back"] + 1) * (state_shaping_conf["max_ports_downstream"] + 1) * len(port_attributes) + + len(vessel_attributes) +) +action_num = len(action_shaping_conf["action_space"]) + +actor_net_conf = { + "hidden_dims": [256, 128, 64], + "activation": torch.nn.Tanh, + "softmax": True, + "batch_norm": False, + "head": True, +} +critic_net_conf = { + "hidden_dims": [256, 128, 64], + "output_dim": 1, + "activation": torch.nn.LeakyReLU, + "softmax": False, + "batch_norm": True, + "head": True, +} + +actor_learning_rate = 0.001 +critic_learning_rate = 0.001 + +learn_env = Env(scenario="cim", topology="toy.4p_ssdd_l0.0", durations=500) +test_env = learn_env +num_agents = len(learn_env.agent_idx_list) +agent2policy = {agent: f"ppo_{agent}.policy" for agent in learn_env.agent_idx_list} +policies = [DiscretePolicyGradient(name=f"ppo_{i}.policy", policy_net=MyActorNet(state_dim, action_num)) for i in + range(num_agents)] +trainers = [get_trpo_trainer(state_dim, f"ppo_{i}") for i in range(num_agents)] + +rl_component_bundle = RLComponentBundle( + env_sampler=CIMEnvSampler( + learn_env=learn_env, + test_env=test_env, + policies=policies, + agent2policy=agent2policy, + reward_eval_delay=reward_shaping_conf["time_window"], + ), + agent2policy=agent2policy, + policies=policies, + trainers=trainers, +) + +env_sampler = rl_component_bundle.env_sampler + +num_episodes = 30 +eval_schedule = [5, 10, 15, 20, 25, 30] +eval_point_index = 0 + +training_manager = TrainingManager(rl_component_bundle=rl_component_bundle) + +# main loop +for ep in range(1, num_episodes + 1): + result = env_sampler.sample() + 
experiences: List[List[ExpElement]] = result["experiences"] + + info_list: List[dict] = result["info"] + + print("Collecting result:") + env_sampler.post_collect(info_list, ep) + print() + + training_manager.record_experiences(experiences) + training_manager.train_step() + + if ep == eval_schedule[eval_point_index]: + eval_point_index += 1 + result = env_sampler.eval() + + print("Evaluation result:") + env_sampler.post_evaluate(result["info"], ep) + print() + +training_manager.exit()