In this article, I implement a simple example of value and policy iteration from scratch in Python.
This article contains fewer explanations than usual. It is meant as a reference for other students getting into the subject, who are looking for a simple-as-possible reference implementation. The code can be improved quite a lot, but I wanted to provide these examples nonetheless instead of keeping them on my computer only.
Because we implement the concepts from scratch, the only import we’ll use is NumPy.
import numpy as np
1 MDP
In order to do value and policy iteration, we’ll want to model the MDP. In this case, our “world” will be a 2D grid, where each cell has a reward value.
To make life easier, I’ll create a wrapper around the NumPy array that contains the data, introducing some convenience functions that simplify the rest of the code.
class Grid:
    def __init__(self, matrix):
        self.matrix = np.array(matrix)
    def get_val(self, state):
        r, c = self.coord_2_index(state)
        return self.matrix[r, c]
    def set_val(self, state, value):
        r, c = self.coord_2_index(state)
        self.matrix[r, c] = value
    def coord_2_index(self, state):
        # States are 1-indexed (x, y) pairs with y counted from the bottom row,
        # so (1, 1) maps to the bottom-left cell of the matrix.
        x, y = state
        return len(self.matrix) - y, x - 1
    def equals(self, grid):
        return np.array_equal(self.matrix, grid.matrix)
    def display(self):
        print(self.matrix)
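As a quick sanity check of the coordinate convention: states are 1-indexed (x, y) pairs with y counted from the bottom row, so (1, 1) is the bottom-left cell and (4, 4) the top-right of a 4x4 grid. Something along these lines should confirm it (the variable names are just for illustration):
grid = Grid([
    [0, 0, 0, 1],
    [0, 0, 0, -1],
    [0, 0, 0, 0],
    [0, 0, 0, 0],
])
print(grid.get_val((4, 4)))  # top-right cell -> 1
print(grid.get_val((4, 3)))  # cell just below it -> -1
grid.set_val((1, 1), 5)      # bottom-left cell
grid.display()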
To perform the updates using the Bellman equations, I’ll create another class modelling the MDP.
class Mdp:
    def __init__(self, states, actions, gamma):
        self.states = states
        self.actions = actions
        self.gamma = gamma
    def T(self, s1, a, s2):
        # Transition function: probability of ending up in s2 after taking action a in s1
        pass
    def R(self, s1, a, s2):
        # Reward function: reward for the transition from s1 to s2 via action a
        pass

Our grid-world example has a few specific transitions (NESW) and a simple reward function. So I’ll create a new class that can model a grid-world of any size provided by the user.
class MdpGrid(Mdp):
    def __init__(self, grid, terminal_states, gamma):
        self.grid = grid
        self.terminal_states = terminal_states
        height, width = self.grid.matrix.shape
        # x runs over columns (1..width), y over rows (1..height), matching the (x, y) state convention
        states = [(x, y) for x in range(1, width + 1) for y in range(1, height + 1)]
        actions = ["N", "E", "S", "W"]
        super().__init__(states, actions, gamma)
    def T(self, s1, a, s2):
        if s1 in self.terminal_states:
            return 0
        transitions = {
            "N": (0, 1),
            "E": (1, 0),
            "S": (0, -1),
            "W": (-1, 0),
        }
        x, y = s1
        dx, dy = transitions[a]  # Deterministic actions assumed
        return int((x + dx, y + dy) == s2)
    def R(self, s1, a, s2):
        return self.grid.get_val(s2)

In this notebook, we’ll experiment with a simple 4x4 grid. It has two rewards and ends as soon as the agent collects one of them.

Figure: the MdpGrid4x4 world.

class MdpGrid4x4(MdpGrid):
    def __init__(self, gamma):
        matrix = [
            [0, 0, 0, 1],
            [0, 0, 0, -1],
            [0, 0, 0, 0],
            [0, 0, 0, 0],
        ]
        grid = Grid(matrix)
        super().__init__(grid, [(4, 4), (4, 3)], gamma)
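With the 4x4 world defined, the transition and reward functions can be spot-checked. A few calls along these lines should show the expected deterministic behaviour (the discount factor here is arbitrary):
mdp = MdpGrid4x4(0.9)
print(mdp.T((3, 4), "E", (4, 4)))  # east from (3, 4) lands in (4, 4) -> 1
print(mdp.T((3, 4), "N", (4, 4)))  # wrong direction -> 0
print(mdp.R((3, 4), "E", (4, 4)))  # reward for entering the +1 cell -> 1
print(mdp.T((4, 4), "S", (4, 3)))  # (4, 4) is terminal, so no transitions out -> 0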
2 Value Iteration

\[ V_{k+1}(s) \leftarrow \max_a \sum_{s'} T(s, a, s') \left[ R(s, a, s') + \gamma V_k(s') \right] \]
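As a concrete instance: starting with all values at zero, the first backup for the cell just west of the +1 reward, state (3, 4), is

\[ V_{1}((3,4)) = \max_a \sum_{s'} T((3,4), a, s') \left[ R((3,4), a, s') + \gamma \cdot 0 \right] = 1 \]

which is achieved by the action E, moving the agent into the terminal cell (4, 4).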
class ValueIteration():
    def __init__(self, mdp):
        self.mdp = mdp
    def V(self, s, k):
        if k <= 0: return 0
        action_values = []
        for a in self.mdp.actions:
            # Sum over successor states s2 for this action
            value_sum = 0
            for s2 in self.mdp.states:
                # Calculate components
                t = self.mdp.T(s, a, s2)
                r = self.mdp.R(s, a, s2)
                v2 = self.V(s2, k - 1) if t > 0 else 0
                # Insert components into formula
                value_sum += t * (r + self.mdp.gamma * v2)
            action_values.append(value_sum)
        # Bellman optimality: take the best action value
        return max(action_values)
    def run(self, k):
        shape = self.mdp.grid.matrix.shape
        values = Grid(np.zeros(shape))
        for s in self.mdp.states:
            values.set_val(s, np.around(self.V(s,k), 2))
        return values

3 Policy Iteration
\[ V^{\pi_{i}}_{k+1}(s) \leftarrow \sum_{s'} T(s, \pi_{i}(s), s') \left[ R(s, \pi_{i}(s), s') + \gamma V^{\pi_{i}}_{k}(s') \right] \]
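As a concrete instance: for a policy that picks E in state (3, 4), and with all values initialised to zero, one evaluation step gives

\[ V^{\pi}_{1}((3,4)) = T((3,4), E, (4,4)) \left[ R((3,4), E, (4,4)) + \gamma \cdot 0 \right] = 1 \]

since (4, 4) is the only successor state with a non-zero transition probability.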
class PolicyIteration():
    def __init__(self, mdp):
        self.mdp = mdp
    def policy_evaluation(self, s, pi, k):
        if k <= 0:
            return 0
        a = pi.get_val(s)  # The policy fixes the action taken in state s
        value_sum = 0
        for s2 in self.mdp.states:
            t = self.mdp.T(s, a, s2)
            r = self.mdp.R(s, a, s2)
            v2 = self.policy_evaluation(s2, pi, k - 1) if t > 0 else 0
            value_sum += t * (r + self.mdp.gamma * v2)
        return value_sum
    def policy_improvement(self, pi, eval_k):
        new_pi = Grid(pi.matrix.copy())
        for s in self.mdp.states:
            action_values = []
            for a in self.mdp.actions:
                # Evaluate the policy that takes action a in s and follows new_pi elsewhere
                pi_copy = Grid(new_pi.matrix.copy())
                pi_copy.set_val(s, a)
                v = self.policy_evaluation(s, pi_copy, eval_k)
                action_values.append((v, a))
            # Greedily keep the action with the highest evaluated value
            best_action = max(action_values, key=lambda x: x[0])[1]
            new_pi.set_val(s, best_action)
        return new_pi
    def run(self, pi, policy_iterations, value_iterations):
        iter_pi = pi
        for i in range(policy_iterations):
            new_pi = self.policy_improvement(iter_pi, value_iterations)
            if new_pi.equals(iter_pi):
                # Stop early once the policy is stable
                break
            iter_pi = new_pi
        return iter_pi

4 Examples
Let’s initialize a new grid world and run both value iteration and policy iteration on it to see the results.
mdp_4x4 = MdpGrid4x4(0.95)

value_iter = ValueIteration(mdp_4x4)
resulting_values = value_iter.run(5)
resulting_values.display()

[[0.9  0.95 1.   0.  ]
 [0.86 0.9  0.95 0.  ]
 [0.81 0.86 0.9  0.86]
 [0.   0.81 0.86 0.81]]

policy_iter = PolicyIteration(mdp_4x4)
my_pi_matrix = [["N" for _ in range(4)] for _ in range(4)]
my_pi = Grid(my_pi_matrix)
resulting_policy = policy_iter.run(my_pi, 15, 10)
resulting_policy.display()

[['E' 'E' 'E' 'N']
 ['N' 'N' 'N' 'N']
 ['N' 'N' 'N' 'W']
 ['N' 'N' 'N' 'N']]
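Reading the policy with the same orientation as the value grid (the top row is y = 4), the agent moves east along the top row into the +1 cell, the states below it head north, and the cell directly under the -1 terminal state steps west to route around it.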