-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: TF_Gridworld.py
More file actions
198 lines (165 loc) · 6.93 KB
/
TF_Gridworld.py
File metadata and controls
198 lines (165 loc) · 6.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# TF-Agents tensorflow environment boilerplate from https://www.tensorflow.org/agents/tutorials/2_environments_tutorial
from typing import Optional, Text

import numpy as np
import pygame
import scipy
import scipy.linalg  # explicit: `import scipy` alone does not guarantee the linalg submodule is loaded
import tf_agents.typing.types as types
from pygame.color import Color
from tf_agents.environments import py_environment
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step as ts
# Module-level display flag read by TFGridWorld.render(): the agent cell is
# drawn red when 0 and blue otherwise ("in-option" per the render comment).
# Presumably toggled by external option-learning code -- TODO confirm caller.
agent_mode = 0
class TFGridWorld(py_environment.PyEnvironment):
    """A simple grid-world TF-Agents PyEnvironment.

    States are flat indices into a rows*cols grid.  The observation is a
    one-hot int32 vector of length rows*cols with a 1 at the agent's cell.
    Actions 0..3 index into ``self._actions`` (down, right, up, left as
    flat-index deltas).  Every transition yields reward -1 except entering
    the terminal cell, which yields ``reward_value`` and ends the episode.
    """

    def __init__(self, rows=5, cols=5, terminal_idx=24, walls=(), agent_start_idx=0, reward_value=10):
        """Build the grid.

        Args:
            rows: number of grid rows.
            cols: number of grid columns.
            terminal_idx: flat index of the goal cell; -1 means no goal on the map.
            walls: iterable of flat indices that are impassable.
            agent_start_idx: flat index where the agent starts (and resets to).
            reward_value: reward granted on reaching the terminal cell.

        Raises:
            ValueError: if the goal or the agent start lies inside a wall.
        """
        if terminal_idx in walls:
            raise ValueError("Goal state is contained within a wall...")
        if agent_start_idx in walls:
            raise ValueError("Agent would start in a wall...")
        super().__init__()
        self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=3, name='action'
        )
        self._observation_spec = array_spec.BoundedArraySpec(
            shape=(rows * cols,), dtype=np.int32, minimum=0, maximum=1, name='observation'
        )
        self._state = np.zeros(rows * cols, dtype=np.int32)
        self._agent_pos = agent_start_idx
        self._state[agent_start_idx] = 1
        self._agent_start_idx = agent_start_idx
        self._episode_ended = False
        self._terminal_idx = terminal_idx
        # Default reward is -1 everywhere; the goal cell (if any) overrides it.
        self._rewards = np.zeros((rows * cols), dtype=np.int32) - 1
        if terminal_idx >= 0:  # allow the user to set terminal_idx = -1 if no goal is on map.
            self._rewards[terminal_idx] = reward_value
        # Flat-index deltas for actions 0..3.
        # NOTE(review): vertical moves use +/-rows, but the bounds checks in
        # apply_delta treat the grid as having `cols` columns.  These agree
        # only when rows == cols -- confirm intent before using a non-square grid.
        self._actions = np.array([rows, 1, -rows, -1])
        self._cols = np.int32(cols)
        self._rows = np.int32(rows)
        self._walls = walls
        self._window = None  # lazily created pygame display surface (see render)

    def action_spec(self) -> types.NestedArraySpec:
        """Return the scalar int32 action spec (values 0..3)."""
        return self._action_spec

    def observation_spec(self) -> types.NestedArraySpec:
        """Return the one-hot observation spec of shape (rows*cols,)."""
        return self._observation_spec

    def _reset(self) -> ts.TimeStep:
        """Move the agent back to its start cell and begin a new episode."""
        self._state[:] = 0
        self._state[self._agent_start_idx] = 1
        self._agent_pos = self._agent_start_idx
        self._episode_ended = False
        return ts.restart(observation=self._state)

    def _step(self, action):
        """Apply one action; return a transition, or a termination at the goal.

        If the previous step ended the episode, the environment resets
        instead (standard TF-Agents PyEnvironment contract).
        """
        if self._episode_ended:
            return self.reset()
        c_pos = self._agent_pos
        new_pos = self.apply_delta(c_pos, self._actions[action])
        self._episode_ended = self._terminal_idx == new_pos
        # Maintain the one-hot observation in place.
        self._state[c_pos] = 0
        self._state[new_pos] = 1
        self._agent_pos = new_pos
        if self._episode_ended:
            return ts.termination(observation=self._state,
                                  reward=self._rewards[new_pos])
        return ts.transition(observation=self._state,
                             reward=self._rewards[new_pos],
                             discount=1.0)

    def render(self, mode: Text = 'rgb_array') -> Optional[types.NestedArray]:
        """Draw the grid in a pygame window; no-op when mode == 'none'.

        Cell colors: agent red (blue when the global ``agent_mode`` is
        nonzero, i.e. in-option), goal green, start brown, walls black,
        other cells alternating grey/white.  A finished episode blanks
        the window to black.
        """
        global agent_mode
        if mode == 'none':
            return
        window_width = window_height = 800
        if self._window is None:
            self._window = pygame.display.set_mode((window_width, window_height))
        w = self._window
        if self._episode_ended:
            w.fill((0, 0, 0))
            pygame.display.update()
            return None
        green = Color('green')
        grey = Color('grey')
        black = Color('black')
        white = Color('white')
        red = Color('red')
        blue = Color('blue')
        brown = Color('brown')
        c_w = window_width / self._cols   # column width in pixels
        r_w = window_height / self._rows  # row height in pixels
        on_grey = True
        for i in range(self._rows):
            for j in range(self._cols):
                # Map screen cell (i, j) to a flat state index (bottom-up layout).
                idx = (self._rows * self._cols) + i - ((j + 1) * self._cols)
                # pygame fill rects are (left, top, width, height); the original
                # passed (x1, y1, x2, y2) corner coordinates, producing
                # cumulative, overlapping fills.
                c_box = (i * c_w, j * r_w, c_w, r_w)
                if idx == self._agent_pos:
                    w.fill(red if agent_mode == 0 else blue, c_box)  # blue = in-option
                elif idx == self._terminal_idx:
                    w.fill(green, c_box)
                elif idx == self._agent_start_idx:
                    w.fill(brown, c_box)
                elif idx in self._walls:
                    w.fill(black, c_box)
                else:
                    w.fill(grey if on_grey else white, c_box)
                on_grey = not on_grey
        pygame.display.update()
        return None

    def get_shape(self):
        """Return (rows, cols) of the grid."""
        return self._rows, self._cols

    def game_over(self):
        """True once the agent has reached the terminal cell this episode."""
        return self._episode_ended

    def apply_delta(self, pos, delta) -> int:
        """Return the cell reached from ``pos`` by ``delta``, or ``pos`` if blocked.

        A move is blocked when it would wrap around the left/right edge,
        leave the grid vertically, or land in a wall.
        """
        # Left-most column: don't wrap left.
        if pos % self._cols == 0 and delta == -1:
            return pos
        # Right-most column: don't wrap right.
        if (pos % self._cols) == self._cols - 1 and delta == 1:
            return pos
        # Vertical moves must stay on the board.
        if not 0 <= pos + delta < self._cols * self._rows:
            return pos
        # Walls are impassable.
        if pos + delta in self._walls:
            return pos
        return pos + delta

    def get_adjacent_cells(self, pos):
        """Return the 4 neighbors of ``pos`` (blocked moves map back to ``pos``).

        Wall cells themselves have no neighbors (empty list).
        """
        if pos in self._walls:
            return []
        adjacents = np.zeros_like(self._actions)
        for idx, delta in enumerate(self._actions):
            adjacents[idx] = self.apply_delta(pos, delta)
        return adjacents

    def get_degree_matrix(self):
        """Return a vector of node degrees: count of neighbors != the cell itself."""
        n_states = int(self._cols * self._rows)
        degree_matrix = np.zeros(n_states, dtype=np.int32)
        for idx in range(n_states):
            degree_matrix[idx] = np.sum(self.get_adjacent_cells(idx) != idx)
        return degree_matrix

    def get_adjacency_matrix(self):
        """Return the symmetric 0/1 adjacency matrix of the grid graph."""
        matrix_width = int(self._cols * self._rows)
        adjacency_matrix = np.zeros((matrix_width, matrix_width), dtype=np.int32)
        for i in range(matrix_width):
            # Hoisted out of the inner loop: the original recomputed the
            # neighbor list once per (i, j) pair.
            neighbors = self.get_adjacent_cells(i)
            for j in range(matrix_width):
                if i != j and j in neighbors:
                    adjacency_matrix[i, j] = 1
        return adjacency_matrix

    def get_laplacian(self):
        """Return the graph Laplacian L = D - A of the grid graph."""
        deg = np.diag(self.get_degree_matrix())
        adj = self.get_adjacency_matrix()
        return deg - adj

    def get_eigenvalues(self, framework='scipy'):
        """Eigendecompose the Laplacian with numpy or scipy.

        Raises:
            ValueError: for an unknown ``framework`` (the original silently
            returned None).
        """
        lap = self.get_laplacian()
        if framework == 'numpy':
            return np.linalg.eig(lap)
        if framework == 'scipy':
            return scipy.linalg.eig(lap)
        raise ValueError(f"Unknown framework: {framework!r} (expected 'numpy' or 'scipy')")

    def test_matrix_functions(self):
        """Sanity check: each node's degree equals its adjacency row sum."""
        deg = self.get_degree_matrix()
        adj = self.get_adjacency_matrix()
        for i in range(self._rows * self._cols):
            assert deg[i] == np.sum(adj[i])

    def get_walls(self):
        """Return the wall indices as passed to the constructor."""
        return self._walls