Value Iteration and Policy Iteration for Frozen Lake¶

Authors:¶

Yuhan Li and Ruoqing Zhu

Define the Frozen Lake Environment¶

We first define a simple environment called Frozen Lake (see the detailed explanation here). Frozen Lake involves crossing a frozen lake from the start to the goal without falling into any holes. For simplicity, we assume the player always moves in the intended direction (different from the original, slippery setting). For the reward function, we assign

  • $R_t=-1$ when the player falls into the hole
  • $R_t=0$ when the player lands on frozen lake
  • $R_t=1$ when the player reaches the goal

We define a 4-by-4 grid, with the holes and the goal visualized below (figure from gymnasium).

Frozen Lake

In [ ]:
import numpy as np

class FrozenLake:
    def __init__(self, grid_size=(4, 4), start=(0, 0), goal=None, holes=None):
        """
        Initializes the Frozen Lake environment.
        :param grid_size: Tuple specifying the dimensions of the lake (default is 4x4).
        :param start: Tuple indicating the starting position on the grid (default is top-left corner, (0, 0)).
        :param goal: Tuple indicating the goal position (default is bottom-right corner if not specified).
        :param holes: List of tuples marking the positions of holes on the grid where the player loses if they step.
        """
        self.grid_size = grid_size  # Dimensions of the grid
        self.start = start  # Starting position
        self.goal = goal if goal else (grid_size[0]-1, grid_size[1]-1)  # Goal position
        self.holes = holes if holes else [(1, 1), (2, 1), (2, 3), (3, 0)]  # Default hole positions
        self.state = start  # Current state of the agent
        self.done = False  # Flag to check if the episode has ended
        self.actions = {0: 'LEFT', 1: 'DOWN', 2: 'RIGHT', 3: 'UP'}  # Possible actions
        self.action_effects = {0: (0, -1), 1: (1, 0), 2: (0, 1), 3: (-1, 0)}  # Effects of each action on the state
        self.reset()  # Reset the game to the start

    def reset(self):
        """
        Resets the environment to the initial state.
        :return: The initial state after resetting.
        """
        self.state = self.start
        self.done = False
        return self.state

    def step(self, action):
        """
        Execute an action and update the state of the environment.
        :param action: Integer representing the direction to move.
        :return: A tuple of (new state, reward, done, info).
        """
        if self.done:
            return self.state, 0, True, {}

        # Calculate the new state after the action
        next_state = (self.state[0] + self.action_effects[action][0],
                      self.state[1] + self.action_effects[action][1])

        # Check if new state is within the grid bounds
        if (0 <= next_state[0] < self.grid_size[0]) and (0 <= next_state[1] < self.grid_size[1]):
            self.state = next_state

        # Determine the reward and if the game should end
        if self.state in self.holes:
            reward = -1  # Negative reward for falling into a hole
            self.done = True
        elif self.state == self.goal:
            reward = 1  # Positive reward for reaching the goal
            self.done = True
        else:
            reward = 0  # No reward for other moves

        return self.state, reward, self.done, {}

    def render(self):
        """
        Prints the current state of the grid with the agent's position, holes, start, and goal.
        """
        grid = np.array([['F' for _ in range(self.grid_size[1])] for _ in range(self.grid_size[0])])
        for hole in self.holes:
            grid[hole] = 'H'  # Mark holes on the grid
        grid[self.start] = 'S'  # Mark the start position
        grid[self.goal] = 'G'  # Mark the goal position
        grid[self.state] = 'A'  # Mark the agent's current position
        print("\n".join(["".join(row) for row in grid]))  # Display the grid

We provide a simple example of how the Frozen Lake environment works. 'S' marks the starting point, 'H' marks holes, 'G' marks the goal, 'F' marks frozen lake (where the player can move safely), and 'A' marks the player's current position.

In [ ]:
# Create an instance of the FrozenLake environment
frozen_lake = FrozenLake()

# Print the initial state
print("Initial state:")
frozen_lake.render()

# Define a sequence of actions
actions = [2, 2, 1, 1, 1, 2]  # RIGHT, RIGHT, DOWN, DOWN, DOWN, RIGHT

# Execute the actions
for action in actions:
    new_state, reward, done, _ = frozen_lake.step(action)
    print(f"\nAction: {frozen_lake.actions[action]}")
    frozen_lake.render()
    print(f"New State: {new_state}, Reward: {reward}, Done: {done}")

    if done:
        if reward > 0:
            print("Reached the goal!")
        else:
            print("Fell into a hole!")
        break
Initial state:
AFFF
FHFF
FHFH
HFFG

Action: RIGHT
SAFF
FHFF
FHFH
HFFG
New State: (0, 1), Reward: 0, Done: False

Action: RIGHT
SFAF
FHFF
FHFH
HFFG
New State: (0, 2), Reward: 0, Done: False

Action: DOWN
SFFF
FHAF
FHFH
HFFG
New State: (1, 2), Reward: 0, Done: False

Action: DOWN
SFFF
FHFF
FHAH
HFFG
New State: (2, 2), Reward: 0, Done: False

Action: DOWN
SFFF
FHFF
FHFH
HFAG
New State: (3, 2), Reward: 0, Done: False

Action: RIGHT
SFFF
FHFF
FHFH
HFFA
New State: (3, 3), Reward: 1, Done: True
Reached the goal!

Value Iteration¶

As we have introduced in our lectures, value iteration keeps a vector (in this case a matrix) of values, one for each state. Using the Bellman Optimality Equation, we repeatedly sweep over all states and update the value function until it converges.
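
Concretely, for a deterministic environment such as this one, each sweep applies the update

$$V(s) \leftarrow \max_{a}\,\big[\,R(s, a) + \gamma\, V(s')\,\big]$$

to every non-terminal state $s$, where $s'$ is the state reached from $s$ by taking action $a$ and $\gamma$ is the discount factor. The sweeps stop once the largest change across all states falls below a small threshold $\theta$.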

In [ ]:
def value_iteration(env, gamma=0.99, theta=0.0001):
    """
    Performs value iteration to find the optimal value function for a given environment.
    :param env: An instance of an environment.
    :param gamma: Discount factor.
    :param theta: A small threshold for determining convergence of the value function.
    :return: A tuple of the optimal value function and the number of iterations performed.
    """
    V = np.zeros(env.grid_size)  # Initialize value function array with zeros for each state in the environment.
    policy = np.zeros(env.grid_size, dtype=int)  # Initialize the policy array with zeros for each state in the grid.
    iterations = 0  # Initialize counter for iterations

    while True:
        delta = 0  # Initialize delta, which tracks the maximum change in value function across all states in an iteration.

        iterations += 1  # Increment iteration counter

        # Loop over each state in the environment's state space
        for i in range(env.grid_size[0]):
            for j in range(env.grid_size[1]):
                current_value = V[i, j]  # Store the current value of V at state (i, j)
                action_values = []  # List to store the values of taking each action from the current state

                # Evaluate each action from the current state
                for action in range(4):  # Assuming four possible actions: LEFT, DOWN, RIGHT, UP
                    env.state = (i, j)  # Set the current state in the environment
                    env.done = (i, j) in env.holes or (i, j) == env.goal  # Check if the state is terminal
                    next_state, reward, done, _ = env.step(action)  # Take the action and observe the outcome

                    # Calculate the value of taking the action
                    if done:
                        action_values.append(reward)  # If next state is terminal, the value is just the reward
                    else:
                        # Calculate expected value considering future rewards discounted by gamma
                        action_values.append(reward + gamma * V[next_state])

                # Update the value function for the current state to the best action value
                V[i, j] = max(action_values)

                # Determine the best action for the current state by finding the action with the maximum value
                best_action = np.argmax(action_values)
                policy[i, j] = best_action  # Update the policy with the best action

                # Track the largest change in the value function
                delta = max(delta, abs(current_value - V[i, j]))

        # If the maximum change in the value function is below the threshold, stop iterating
        if delta < theta:
            break

    # Mapping numeric actions to arrow symbols for readability
    action_mappings = {0: '←', 1: '↓', 2: '→', 3: '↑'}
    optimal_policy_readable = np.vectorize(action_mappings.get)(policy)  # Convert numeric actions to symbols

    return V, optimal_policy_readable, iterations

# Create an instance of the FrozenLake environment
env = FrozenLake()

# Run value iteration
optimal_value_function, optimal_policy, num_iterations = value_iteration(env)
print("Final Estimated Value:\n\n", optimal_value_function)
print("\nOptimal Policy:\n\n", optimal_policy)
print("\nTotal number of iterations:\n\n", num_iterations)
Final Estimated Value:

 [[0.95099005 0.96059601 0.970299   0.96059601]
 [0.94148015 0.         0.9801     0.970299  ]
 [0.93206535 0.         0.99       0.        ]
 [0.         0.99       1.         0.        ]]

Optimal Policy:

 [['→' '→' '↓' '←']
 ['↑' '←' '↓' '←']
 ['↑' '←' '↓' '←']
 ['←' '→' '→' '←']]

Total number of iterations:

 7
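
The values above can be read directly off the discounting: the terminal reward of 1 is received undiscounted on the move into the goal, so a state that is $k$ moves from the goal along the optimal path has value $0.99^{k-1}$; the start state is six moves away, giving $0.99^5 \approx 0.951$. As a quick check, we can also roll the greedy policy out from the start state. Below is a minimal sketch, reusing the env instance and the optimal_policy arrow array from the cell above.

In [ ]:
# Map the arrow symbols back to numeric actions so we can call env.step()
arrow_to_action = {'←': 0, '↓': 1, '→': 2, '↑': 3}

state = env.reset()
trajectory = [state]
done = False
while not done:
    action = arrow_to_action[optimal_policy[state]]  # greedy action at the current state
    state, reward, done, _ = env.step(action)
    trajectory.append(state)

print("Greedy trajectory:", trajectory)
print("Final reward:", reward)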

Policy Iteration Algorithm¶

We now proceed to policy iteration. Recall that at each iteration we fully evaluate the current policy, and then improve it with a greedy policy improvement step based on the Bellman Optimality Equation.
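
In the deterministic case, policy evaluation repeatedly applies

$$V^{\pi}(s) \leftarrow R\big(s, \pi(s)\big) + \gamma\, V^{\pi}(s'),$$

where $s'$ is the state reached from $s$ under action $\pi(s)$, until the values stop changing. Policy improvement then replaces the policy with the greedy one,

$$\pi(s) \leftarrow \arg\max_{a}\,\big[\,R(s, a) + \gamma\, V^{\pi}(s')\,\big].$$

The two steps alternate until the policy no longer changes.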

In [ ]:
def policy_evaluation(policy, env, V, gamma=0.99, theta=0.0001):
    """
    Evaluates a fixed policy by repeatedly applying its action in every state
    until the value function changes by less than theta.
    """
    while True:
        delta = 0
        for i in range(env.grid_size[0]):
            for j in range(env.grid_size[1]):
                v = V[i, j]
                env.state = (i, j)
                env.done = (i, j) in env.holes or (i, j) == env.goal
                next_state, reward, done, _ = env.step(policy[i, j])
                V[i, j] = reward + gamma * V[next_state] if not done else reward   # update value function based on reward and value of next state
                delta = max(delta, abs(v - V[i, j]))   # update delta to check for convergence
        if delta < theta:
            break
    return V

def policy_improvement(env, V, gamma=0.99):
    """
    Returns the policy that acts greedily with respect to the value function V.
    """
    policy = np.zeros(env.grid_size, dtype=int)
    for i in range(env.grid_size[0]):
        for j in range(env.grid_size[1]):
            action_values = []
            for a in range(4):
                env.state = (i, j)
                env.done = (i, j) in env.holes or (i, j) == env.goal
                next_state, reward, done, _ = env.step(a)
                action_values.append(reward + gamma * V[next_state] if not done else reward)
            best_action = np.argmax(action_values)    # choose the action with the highest value
            policy[i, j] = best_action                # update the policy
    return policy

def policy_iteration(env, gamma=0.99, theta=0.0001):
    """
    Alternates policy evaluation and policy improvement until the policy is stable.
    :return: A tuple of the value function, the optimal policy (numeric actions), and the number of iterations.
    """
    policy = np.zeros(env.grid_size, dtype=int)  # start with an initial policy that always moves LEFT
    V = np.zeros(env.grid_size)
    num_iterations = 0
    while True:
        num_iterations += 1
        V = policy_evaluation(policy, env, V, gamma, theta)   # evaluate the current policy
        new_policy = policy_improvement(env, V, gamma)        # improve the policy based on the evaluated value function
        if np.array_equal(new_policy, policy):
            break
        policy = new_policy
    return V, policy, num_iterations

# Initialize the FrozenLake environment
env = FrozenLake()

# Run policy iteration
V, optimal_policy, num_iterations = policy_iteration(env)

# Convert numeric actions to directions for readability
action_mappings = {0: '←', 1: '↓', 2: '→', 3: '↑'}
optimal_policy_readable = np.vectorize(action_mappings.get)(optimal_policy)

# Print the policy iteration results
print("Final Estimated Value:\n\n", V)
print("\nOptimal Policy:\n\n", optimal_policy_readable)
print("\nTotal number of iterations:\n\n", num_iterations)
Final Estimated Value:

 [[0.95099005 0.96059601 0.970299   0.96059601]
 [0.94148015 0.         0.9801     0.970299  ]
 [0.93206535 0.         0.99       0.        ]
 [0.         0.99       1.         0.        ]]

Optimal Policy:

 [['→' '→' '↓' '←']
 ['↑' '←' '↓' '←']
 ['↑' '←' '↓' '←']
 ['←' '→' '→' '←']]

Total number of iterations:

 8
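
Both algorithms recover the same optimal value function and greedy policy on this small deterministic problem. Below is a minimal sketch of a direct check, reusing the two solvers and the action_mappings dictionary defined above.

In [ ]:
# Compare the two solvers side by side on fresh copies of the environment
V_vi, policy_vi_readable, _ = value_iteration(FrozenLake())
V_pi, policy_pi, _ = policy_iteration(FrozenLake())

# policy_iteration returns numeric actions, so convert them to arrows first
policy_pi_readable = np.vectorize(action_mappings.get)(policy_pi)

print("Value functions match:", np.allclose(V_vi, V_pi))
print("Policies match:", np.array_equal(policy_vi_readable, policy_pi_readable))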