In this article, I implemented a simple example of value and policy iteration from scratch in Python.
This article contains fewer explanations than usual. It is meant as a reference for other students getting into the subject who are looking for a simple-as-possible reference implementation. The code can be improved quite a lot, but I wanted to share these examples rather than keep them on my computer only.
Because we implement the concepts from scratch, the only import we’ll use is NumPy.

import numpy as np
1 MDP
In order to do value and policy iteration, we’ll want to model the MDP. In this case, our “world” will be a 2D grid where each cell has a reward value.
To make life easier, I’ll create a wrapper around the NumPy array that contains the data and add some convenience functions that simplify the rest of the code.
class Grid:
    def __init__(self, matrix):
        self.matrix = np.array(matrix)

    def get_val(self, state):
        r, c = self.coord_2_index(state)
        return self.matrix[r, c]

    def set_val(self, state, value):
        r, c = self.coord_2_index(state)
        self.matrix[r, c] = value

    def coord_2_index(self, state):
        # States are 1-indexed (x, y) coordinates with the origin in the
        # bottom-left corner; convert them to NumPy row/column indices.
        x, y = state
        return len(self.matrix) - y, x - 1

    def equals(self, grid):
        return np.array_equal(self.matrix, grid.matrix)

    def display(self):
        print(self.matrix)
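To make the coordinate convention concrete, here is a small usage sketch (my own addition, not part of the original code): states are 1-indexed (x, y) pairs with the origin in the bottom-left corner of the grid.

g = Grid([[1, 2],
          [3, 4]])
print(g.get_val((1, 1)))  # bottom-left cell -> 3
print(g.get_val((2, 2)))  # top-right cell   -> 2
g.set_val((1, 2), 9)      # overwrite the top-left cell
g.display()               # [[9 2]
                          #  [3 4]]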
To perform the updates using the Bellman equations, I’ll create another class modelling the MDP.
class Mdp:
    def __init__(self, states, actions, gamma):
        self.states = states
        self.actions = actions
        self.gamma = gamma

    def T(self, s1, a, s2):
        pass

    def R(self, s1, a, s2):
        pass
Our grid-world example has four possible transitions (N, E, S, W) and a simple reward function. So I’ll create a subclass that can model a grid world of any size provided by the user.
class MdpGrid(Mdp):
    def __init__(self, grid, terminal_states, gamma):
        self.grid = grid
        self.terminal_states = terminal_states
        # x indexes columns, y indexes rows (counted from the bottom).
        states = [(x, y) for x in range(1, len(self.grid.matrix[0]) + 1)
                         for y in range(1, len(self.grid.matrix) + 1)]
        actions = ["N", "E", "S", "W"]
        super().__init__(states, actions, gamma)

    def T(self, s1, a, s2):
        if s1 in self.terminal_states:
            return 0
        transitions = {
            "N": (0, 1),
            "E": (1, 0),
            "S": (0, -1),
            "W": (-1, 0),
        }
        x, y = s1
        dx, dy = transitions[a]  # Deterministic actions assumed
        return int((x + dx, y + dy) == s2)

    def R(self, s1, a, s2):
        return self.grid.get_val(s2)
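As a quick illustration (my own addition, not part of the original code), the transition function is deterministic: it returns 1 only for the neighbouring cell in the chosen direction and 0 otherwise, and moves that would leave the grid simply match no successor state. A tiny 2x2 world with a single +1 terminal in the top-right corner:

tiny = MdpGrid(Grid([[0, 1],
                     [0, 0]]), terminal_states=[(2, 2)], gamma=0.9)
print(tiny.T((1, 1), "N", (1, 2)))  # 1: north from (1, 1) lands in (1, 2)
print(tiny.T((1, 1), "E", (1, 2)))  # 0: east from (1, 1) would land in (2, 1)
print(tiny.R((1, 1), "E", (2, 2)))  # 1: the reward is read from the cell entered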
In this article, we’ll experiment with a simple 4x4 grid. It has two rewards, +1 and -1, and an episode ends as soon as the agent collects one of them.
class MdpGrid4x4(MdpGrid):
    def __init__(self, gamma):
        matrix = [
            [0, 0, 0, 1],
            [0, 0, 0, -1],
            [0, 0, 0, 0],
            [0, 0, 0, 0],
        ]
        grid = Grid(matrix)
        super().__init__(grid, [(4, 4), (4, 3)], gamma)
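A quick sanity check (my addition) that the terminal states passed to the parent class are exactly the cells holding the +1 and -1 rewards:

check = MdpGrid4x4(0.9)
print(check.grid.get_val((4, 4)))    # 1
print(check.grid.get_val((4, 3)))    # -1
print(check.T((4, 4), "S", (4, 3)))  # 0: terminal states have no outgoing transitions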
2 Value Iteration
\[ V_{k+1}(s) \leftarrow \max_a \sum_{s'} T(s, a, s') \left[ R(s, a, s') + \gamma V_k(s') \right] \]
class ValueIteration():
    def __init__(self, mdp):
        self.mdp = mdp

    def V(self, s, k):
        if k <= 0:
            return 0
        values = []
        for s2 in self.mdp.states:
            for a in self.mdp.actions:
                # Calculate components
                t = self.mdp.T(s, a, s2)
                r = self.mdp.R(s, a, s2)
                v2 = self.V(s2, k - 1) if t > 0 else 0

                # Insert components into formula
                v = t * (r + self.mdp.gamma * v2)

                # Save to results
                values.append(v)
        return max(values)

    def run(self, k):
        shape = self.mdp.grid.matrix.shape
        values = Grid(np.zeros(shape))
        for s in self.mdp.states:
            values.set_val(s, np.around(self.V(s, k), 2))
        return values
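The recursive formulation above is easy to match to the Bellman equation, but it recomputes V_k(s') from scratch for every caller, so its cost grows exponentially with the horizon k. A minimal iterative sketch (my own addition, assuming the same Mdp and Grid interfaces as above) keeps the previous sweep’s values in a Grid and updates every state once per sweep:

class ValueIterationSweeps():
    def __init__(self, mdp):
        self.mdp = mdp

    def run(self, k):
        shape = self.mdp.grid.matrix.shape
        values = Grid(np.zeros(shape))  # V_0 = 0 everywhere
        for _ in range(k):
            new_values = Grid(np.zeros(shape))
            for s in self.mdp.states:
                candidates = []
                for a in self.mdp.actions:
                    # Expected one-step return of action a under the previous values
                    total = 0
                    for s2 in self.mdp.states:
                        t = self.mdp.T(s, a, s2)
                        if t > 0:
                            total += t * (self.mdp.R(s, a, s2)
                                          + self.mdp.gamma * values.get_val(s2))
                    candidates.append(total)
                new_values.set_val(s, max(candidates))
            values = new_values
        return values

The class name ValueIterationSweeps is mine; the per-sweep update is exactly the formula above, evaluated for all states against the previous sweep’s values.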
3 Policy Iteration
\[ V^{\pi_{i}}_{k+1}(s) \leftarrow \sum_{s'} T(s, \pi_{i}(s), s') \left[ R(s, \pi_{i}(s), s') + \gamma V^{\pi_{i}}_{k}(s') \right] \]
class PolicyIteration():
    def __init__(self, mdp):
        self.mdp = mdp

    def policy_evaluation(self, s, pi, k):
        if k <= 0:
            return 0
        value_sum = 0
        for s2 in self.mdp.states:
            a = pi.get_val(s)
            t = self.mdp.T(s, a, s2)
            r = self.mdp.R(s, a, s2)
            v2 = self.policy_evaluation(s2, pi, k - 1) if t > 0 else 0
            value_sum += t * (r + self.mdp.gamma * v2)
        return value_sum

    def policy_improvement(self, pi, eval_k):
        new_pi = Grid(pi.matrix.copy())
        for s in self.mdp.states:
            action_values = []
            for a in self.mdp.actions:
                pi_copy = Grid(new_pi.matrix.copy())
                pi_copy.set_val(s, a)
                v = self.policy_evaluation(s, pi_copy, eval_k)
                action_values.append((v, a))
            best_action = max(action_values, key=lambda x: x[0])[1]
            new_pi.set_val(s, best_action)
        return new_pi

    def run(self, pi, policy_iterations, value_iterations):
        iter_pi = pi
        for i in range(policy_iterations):
            new_pi = self.policy_improvement(iter_pi, value_iterations)
            if new_pi.equals(iter_pi):
                break
            iter_pi = new_pi
        return iter_pi
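Policy improvement above is a one-step greedy lookahead with respect to the evaluated values. The same lookahead can be used to read a greedy policy directly off a value Grid produced by value iteration. A minimal sketch (my own addition; the helper greedy_policy is hypothetical and not part of the code above):

def greedy_policy(mdp, values):
    # For every state, pick the action with the highest expected one-step
    # return under the given value Grid.
    pi = Grid(np.full(values.matrix.shape, "N"))
    for s in mdp.states:
        best_action = max(
            mdp.actions,
            key=lambda a: sum(
                mdp.T(s, a, s2) * (mdp.R(s, a, s2) + mdp.gamma * values.get_val(s2))
                for s2 in mdp.states
            ),
        )
        pi.set_val(s, best_action)
    return pi

For instance, it could be applied to the value grid computed in the example section below.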
4 Examples
Let’s initialize a new grid world and run both value and policy iteration on it to see the results.
mdp_4x4 = MdpGrid4x4(0.95)

value_iter = ValueIteration(mdp_4x4)
resulting_values = value_iter.run(5)
resulting_values.display()
[[0.9  0.95 1.   0.  ]
 [0.86 0.9  0.95 0.  ]
 [0.81 0.86 0.9  0.86]
 [0.   0.81 0.86 0.81]]
policy_iter = PolicyIteration(mdp_4x4)

my_pi_matrix = [["N" for _ in range(4)] for _ in range(4)]
my_pi = Grid(my_pi_matrix)

resulting_policy = policy_iter.run(my_pi, 15, 10)
resulting_policy.display()
[['E' 'E' 'E' 'N']
 ['N' 'N' 'N' 'N']
 ['N' 'N' 'N' 'W']
 ['N' 'N' 'N' 'N']]