-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path_mdp_single_device.py
More file actions
224 lines (184 loc) · 10.6 KB
/
_mdp_single_device.py
File metadata and controls
224 lines (184 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import os
os.environ['OPENBLAS_NUM_THREADS'] = '1'
import numpy as np
import math
import random
class SingleDevice:
    """Single-device environment with state ``(x, e)`` and three actions.

    The state is a dict with keys ``'x'``, ``'e'`` and ``'index'``:

    * ``x`` is a counter in ``1..M`` that is reset to 1 when a processing
      action completes (presumably the age of information, judging by the
      ``aoi`` naming used in ``step_old`` -- confirm against the callers).
    * ``e`` is the energy level, bounded above by ``B`` and allowed to go
      negative down to ``min_energy`` after a failed local processing.
    * ``index`` is the flat state index given by ``compute_state_index``.

    Actions: ``0`` = wait, ``1`` = local processing, ``2`` = remote
    processing (the action shared with the other players, subject to a
    congestion penalty).

    ``p_C`` and ``p_H`` must be assigned by the caller before ``step`` is
    used; they are the sampling weights of the processing cost and of the
    harvested energy, over the values ``0..len(p) - 1``.
    """

    def __init__(
        self,
        M,
        B,
        total_players,
        congestion_penalty_multiplier=None,
        congestion_penalty_exponent=None,
        gamma=0.99,
        eps_0=0,
        exploration_rate=0,
        mean_value_normal_distr=3,
        sigma_normal_distr=3,
        lambda_harvesting_distribution=1,
        delta=1,
    ):
        """Store the environment parameters.

        Args:
            M: maximum value of ``x`` (cast to ``int``).
            B: maximum energy level (cast to ``int``).
            total_players: number of agents, used by
                ``reward_with_interactions``.
            congestion_penalty_multiplier: factor of the congestion penalty.
            congestion_penalty_exponent: exponent of the congestion penalty.
            gamma: stored as ``self.gamma``; not used inside this class.
            eps_0: stored as ``self.eps0``; not used inside this class.
            exploration_rate: stored; not used inside this class.
            mean_value_normal_distr: accepted for compatibility, unused here.
            sigma_normal_distr: accepted for compatibility, unused here.
            lambda_harvesting_distribution: accepted for compatibility,
                unused here.
            delta: stored as ``self.delta``; only read by ``step_old``.
        """
        self.M = int(M)
        self.B = int(B)
        self.total_players = total_players
        self.eps0 = eps_0
        self.gamma = gamma
        self.h = 1  # constant harvesting amount, only read by step_old
        self.exploration_rate = exploration_rate
        self.delta = delta

        # Lowest energy level represented in the state space. The energy can
        # become negative when local processing is chosen and the sampled
        # cost exceeds the available energy.
        self.min_energy = -min(10, self.B - 1)

        # Sampling weights for the processing cost (p_C) and the harvested
        # energy (p_H); they have to be assigned before calling step().
        self.p_C = None
        self.p_H = None

        self.lagrange_multiplier = 0
        self.max_episodes_steps = 100
        self.harvesting_rate_vector = []
        self.processing_rate_vector = []  # costs sampled by step(), action 1

        # One state per (x, e) pair with x in 1..M and e in min_energy..B.
        self.num_states = self.M * (-self.min_energy + self.B + 1)
        self.action_space_size = 3

        # Read by reward_function for action 2; expected to be updated from
        # outside this class.
        self.average_constraint = 0
        self.sum_average_constraints = 0
        self.congestion_penalty_parameters = {
            'multiplier': congestion_penalty_multiplier,
            'exponent': congestion_penalty_exponent,
        }

    def _congestion_cost(self, num_agents):
        """Return ``multiplier * (num_agents - 1) ** exponent``."""
        params = self.congestion_penalty_parameters
        return params['multiplier'] * (num_agents - 1) ** params['exponent']

    def _sample(self, weights):
        """Draw one value from ``0..len(weights) - 1`` using ``weights``."""
        return random.choices(np.arange(len(weights)), weights=weights)[0]

    def compute_state_index(self, x, e):
        """Map ``(x, e)`` to a flat 0-based index in ``0..num_states - 1``.

        Valid for ``x`` in ``1..M`` and ``e`` in ``min_energy..B``.
        """
        width = self.B - self.min_energy + 1  # number of energy levels
        return (x - 1) * width + (e - self.min_energy)

    def compute_state_coordinates(self, index):
        """Inverse of ``compute_state_index``: return ``(x, e)`` for ``index``.

        The previous formula was shifted by one, so indices that are a
        multiple of the number of energy levels (i.e. ``e == min_energy``)
        decoded to the invalid pair ``(x - 1, B + 1)``. Every other index
        decodes exactly as before.
        """
        width = self.B - self.min_energy + 1
        x = index // width + 1
        e = index % width + self.min_energy
        return x, e

    def reward_function(self, state, action, training=False, other_noise=0):
        """Return the single-agent reward for taking ``action`` in ``state``.

        Args:
            state: dict with at least the keys ``'x'`` and ``'e'``.
            action: 0 (wait), 1 (local processing) or 2 (remote processing).
            training: when truthy, the Lagrange multiplier is added to the
                reward of action 2.
            other_noise: extra term added to the estimated number of agents
                choosing action 2.

        Raises:
            ValueError: if no branch matches (e.g. ``action > 2`` with
                non-negative energy).
        """
        x, e = state['x'], state['e']
        if e < 0 and action == 0:
            # Penalty equal to the energy deficit, i.e. how many units are
            # still missing before the pending processing can conclude.
            return x - e
        if e < 0 and action > 0:
            # Only waiting is allowed while the energy is negative.
            return 1000
        if action == 0 and (x >= self.M or e >= self.B):
            # Waiting at the boundary of the state space is heavily penalised.
            return x + 1000
        if action <= 1 and e >= 0:
            return x
        if action == 2:
            # The number of agents on the shared action is estimated from the
            # aggregated averages, excluding this agent's own contribution.
            estimated_agents = (
                self.sum_average_constraints + other_noise
                - self.average_constraint + 1
            )
            return (
                x
                + self._congestion_cost(estimated_agents)
                + training * self.lagrange_multiplier
            )
        print(state, action)
        raise ValueError("Error in reward function")

    def reward_with_interactions(self, global_state, global_action,
                                 return_vector_reward=False):
        """Return the summed reward of all players for a joint action.

        Args:
            global_state: per-player state dicts, indexable by
                ``0..total_players - 1``.
            global_action: per-player actions (list or numpy array).
            return_vector_reward: when true, also return the per-player
                rewards as a numpy array.

        Returns:
            ``global_reward`` or ``(global_reward, vector_reward)``.

        Raises:
            ValueError: if a player's state/action matches no branch.
        """
        # Converting to an array makes the comparison element-wise for plain
        # lists too; ``some_list == 2`` is a single False, which made the
        # count always 0. The count includes the agent itself and
        # _congestion_cost subtracts one, so only the others are charged.
        agents_on_shared_action = np.count_nonzero(
            np.asarray(global_action) == 2
        )

        global_reward = 0
        vector_reward = np.zeros(self.total_players)
        for i in range(self.total_players):
            x, e = global_state[i]['x'], global_state[i]['e']
            action = global_action[i]
            if e < 0:
                # Penalty equal to the energy deficit, whatever the action.
                reward = x - e
            elif action == 0 and (x >= self.M or e >= self.B):
                reward = x + 1000
            elif action <= 1 and e >= 0:
                reward = x
            elif action == 2:
                reward = x + self._congestion_cost(agents_on_shared_action)
            else:
                # Previously printed undefined names, raising NameError.
                print(global_state[i], global_action[i])
                raise ValueError("Error in reward function")
            vector_reward[i] = reward
            global_reward += reward

        if return_vector_reward:
            return global_reward, vector_reward
        return global_reward

    def cost_function(self, state, action):
        """Return 1 when the shared action (2) is chosen, 0 otherwise."""
        return 1 if action == 2 else 0

    def step(self, state, action, training=False):
        """Apply ``action`` to ``state`` and sample the next state.

        Args:
            state: dict with keys ``'x'`` and ``'e'``.
            action: 0 (wait), 1 (local processing) or 2 (remote processing).
            training: forwarded to ``reward_function``.

        Returns:
            ``(next_state, reward, episode_ended, info)`` where
            ``episode_ended`` is always ``False`` and ``info`` is
            ``{'cost': cost_function(state, action)}``.

        Raises:
            ValueError: from ``reward_function``, or if ``action`` is not
                0, 1 or 2 (previously a meaningless ``x = 0, e = 0`` state
                was returned in that case).
        """
        next_state = {'x': 0, 'e': 0, 'index': 0}
        reward = self.reward_function(state, action, training=training)
        cost = self.cost_function(state, action)

        if action == 1:
            # Local processing: sample its cost first, then the harvest (the
            # draw order is kept so seeded runs are reproducible).
            cost_of_action = self._sample(self.p_C)
            self.processing_rate_vector.append(cost_of_action)
            harvesting_rate = self._sample(self.p_H)
            remaining = state['e'] + harvesting_rate - cost_of_action
            if cost_of_action <= state['e'] + harvesting_rate:
                # Action completed.
                next_state['x'] = 1
                next_state['e'] = min(self.B, remaining)
            else:
                # Action failed: the energy becomes negative and has to be
                # harvested back before the processing can conclude.
                # NOTE(review): nothing clamps this to min_energy; presumably
                # p_C is chosen so that it cannot go below -- confirm.
                next_state['x'] = min(state['x'] + 1, self.M)
                next_state['e'] = remaining
        elif action == 0:
            # Wait: only harvest.
            harvesting_rate = self._sample(self.p_H)
            next_state['e'] = min(state['e'] + harvesting_rate, self.B)
            if state['e'] < 0 and next_state['e'] >= 0:
                # The deficit is repaid, so the pending processing concludes.
                next_state['x'] = 1
            else:
                next_state['x'] = min(state['x'] + 1, self.M)
        elif action == 2:
            # Remote processing: always succeeds and costs no energy.
            next_state['x'] = 1
            harvesting_rate = self._sample(self.p_H)
            transmission_cost = 0
            next_state['e'] = min(
                self.B, state['e'] + harvesting_rate - transmission_cost
            )
        else:
            raise ValueError("Unknown action in step: {!r}".format(action))

        next_state['index'] = self.compute_state_index(
            next_state['x'], next_state['e']
        )
        episode_ended = False
        return next_state, reward, episode_ended, {'cost': cost}

    # step function when h is constant
    def step_old(self, state, action):
        """Legacy transition for an integer state with a constant harvest.

        The logic is kept unchanged from the original.

        NOTE(review): as written this cannot run against this class. It
        subscripts ``self.reward_function`` (a method here) and uses
        ``self.p_z``, ``self.disadvantage_function`` and
        ``self.compute_index``, none of which are defined in this class.
        Presumably it targets an older version of the model or a subclass
        that provides them -- confirm before calling or removing it.
        """
        # obtain energy, aoi and server availability from state index
        server_available = state // (self.M*(self.B+1))
        state = state - server_available*self.M*(self.B+1)
        aoi = (state // (self.B+1)) + 1
        energy = state % (self.B + 1)
        done = False
        success = False
        if server_available:
            # action is always 1
            new_aoi = 1
            new_energy = min(energy + self.h, self.B)
            reward = self.reward_function[min(aoi-1+self.delta, self.M-1)]
            success = True
        else:
            if action == 1:
                # sample a cost: if it is at most the available energy the
                # action completes, otherwise it fails
                cost_of_action = np.random.choice(np.arange(self.B + self.h + 2), p=self.p_C)
                if cost_of_action <= energy + self.h:
                    # action completed
                    reward = self.reward_function[aoi-1]
                    new_aoi = 1
                    new_energy = min(self.B, energy + self.h - cost_of_action)
                    done = True
                    success = True
                else:
                    new_aoi = 1
                    new_energy = 0
                    reward = self.reward_function[aoi - 1] - self.disadvantage_function(energy + self.h - cost_of_action)
                    done = True
            elif action == 0:
                new_aoi = min(aoi + 1, self.M)
                new_energy = min(energy + self.h, self.B)
                reward = self.reward_function[aoi - 1]
        new_server_available = np.random.binomial(1, self.p_z)
        # computation of next_state index
        next_state = self.compute_index(new_aoi, new_energy, new_server_available)
        return next_state, reward, done

    def reset(self, training=False):
        """Return the initial state ``x = 1, e = 0`` with its flat index.

        ``training`` is accepted for interface compatibility and is
        currently unused (random initial states are disabled).
        """
        initial_state = {'x': 1, 'e': 0, 'index': 0}
        initial_state['index'] = self.compute_state_index(
            initial_state['x'], initial_state['e']
        )
        return initial_state