Value Iteration and Policy Iteration for Frozen Lake¶
Authors:¶
Yuhan Li and Ruoqing Zhu
Define the Frozen Lake Environment¶
We first define a simple environment called Frozen Lake (see the detailed explanation here). The task is to cross a frozen lake from the start to the goal without falling into any holes. For simplicity, we assume the player always moves in the intended direction (unlike the original, slippery setting). For the reward function, we assign
- $R_t=-1$ when the player falls into the hole
- $R_t=0$ when the player lands on a frozen tile
- $R_t=1$ when the player reaches the goal
We define a 4-by-4 grid, with holes and the goal visualized as below (Figure from gymnasium).
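With the default start, goal, and hole positions used in the code below, the grid is laid out as follows ('S' = start, 'F' = frozen surface, 'H' = hole, 'G' = goal):
SFFF
FHFF
FHFH
HFFG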
import numpy as np
class FrozenLake:
def __init__(self, grid_size=(4, 4), start=(0, 0), goal=None, holes=None):
"""
Initializes the Frozen Lake environment.
:param grid_size: Tuple specifying the dimensions of the lake (default is 4x4).
:param start: Tuple indicating the starting position on the grid (default is top-left corner, (0, 0)).
:param goal: Tuple indicating the goal position (default is bottom-right corner if not specified).
:param holes: List of tuples marking the positions of holes on the grid where the player loses if they step.
"""
self.grid_size = grid_size # Dimensions of the grid
self.start = start # Starting position
self.goal = goal if goal else (grid_size[0]-1, grid_size[1]-1) # Goal position
self.holes = holes if holes else [(1, 1), (2, 1), (2, 3), (3, 0)] # Default hole positions
self.state = start # Current state of the agent
self.done = False # Flag to check if the episode has ended
self.actions = {0: 'LEFT', 1: 'DOWN', 2: 'RIGHT', 3: 'UP'} # Possible actions
self.action_effects = {0: (0, -1), 1: (1, 0), 2: (0, 1), 3: (-1, 0)} # Effects of each action on the state
self.reset() # Reset the game to the start
def reset(self):
"""
Resets the environment to the initial state.
:return: The initial state after resetting.
"""
self.state = self.start
self.done = False
return self.state
def step(self, action):
"""
Execute an action and update the state of the environment.
:param action: Integer representing the direction to move.
:return: A tuple of (new state, reward, done, info).
"""
if self.done:
return self.state, 0, True, {}
# Calculate the new state after the action
next_state = (self.state[0] + self.action_effects[action][0],
self.state[1] + self.action_effects[action][1])
# Check if new state is within the grid bounds
if (0 <= next_state[0] < self.grid_size[0]) and (0 <= next_state[1] < self.grid_size[1]):
self.state = next_state
# Determine the reward and if the game should end
if self.state in self.holes:
reward = -1 # Negative reward for falling into a hole
self.done = True
elif self.state == self.goal:
reward = 1 # Positive reward for reaching the goal
self.done = True
else:
reward = 0 # No reward for other moves
return self.state, reward, self.done, {}
def render(self):
"""
Prints the current state of the grid with the agent's position, holes, start, and goal.
"""
grid = np.array([['F' for _ in range(self.grid_size[1])] for _ in range(self.grid_size[0])])
for hole in self.holes:
grid[hole] = 'H' # Mark holes on the grid
grid[self.start] = 'S' # Mark the start position
grid[self.goal] = 'G' # Mark the goal position
grid[self.state] = 'A' # Mark the agent's current position
print("\n".join(["".join(row) for row in grid])) # Display the grid
We provide a simple example of how the Frozen Lake environment works. 'S' refers to the starting point, 'H' refers to holes, 'G' refers to the goal, 'F' refers to the frozen surface (where the player can move), and 'A' refers to the player's current position.
# Create an instance of the FrozenLake environment
frozen_lake = FrozenLake()
# Print the initial state
print("Initial state:")
frozen_lake.render()
# Define a sequence of actions
actions = [2, 2, 1, 1, 1, 2] # RIGHT, RIGHT, DOWN, DOWN, DOWN, RIGHT
# Execute the actions
for action in actions:
new_state, reward, done, _ = frozen_lake.step(action)
print(f"\nAction: {frozen_lake.actions[action]}")
frozen_lake.render()
print(f"New State: {new_state}, Reward: {reward}, Done: {done}")
if done:
if reward > 0:
print("Reached the goal!")
else:
print("Fell into a hole!")
break
Initial state:
AFFF
FHFF
FHFH
HFFG

Action: RIGHT
SAFF
FHFF
FHFH
HFFG
New State: (0, 1), Reward: 0, Done: False

Action: RIGHT
SFAF
FHFF
FHFH
HFFG
New State: (0, 2), Reward: 0, Done: False

Action: DOWN
SFFF
FHAF
FHFH
HFFG
New State: (1, 2), Reward: 0, Done: False

Action: DOWN
SFFF
FHFF
FHAH
HFFG
New State: (2, 2), Reward: 0, Done: False

Action: DOWN
SFFF
FHFF
FHFH
HFAG
New State: (3, 2), Reward: 0, Done: False

Action: RIGHT
SFFF
FHFF
FHFH
HFFA
New State: (3, 3), Reward: 1, Done: True
Reached the goal!
Value Iteration¶
As introduced in our lectures, value iteration keeps a vector (in this case a matrix) of values, one for each state. Using the Bellman optimality equation, we repeatedly sweep over all states and update the value function until it converges.
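For our deterministic Frozen Lake, each sweep applies the Bellman optimality update
$$V_{k+1}(s) \;=\; \max_{a}\Big[R(s,a) + \gamma\, V_k(s')\Big],$$
where $s'$ is the state reached from $s$ by taking action $a$ and $R(s,a)$ is the corresponding reward; a transition into a hole or the goal contributes only its reward. The sweeps stop once the largest change over all states falls below the threshold $\theta$.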
def value_iteration(env, gamma=0.99, theta=0.0001):
"""
Performs value iteration to find the optimal value function for a given environment.
:param env: An instance of an environment.
:param gamma: Discount factor.
:param theta: A small threshold for determining convergence of the value function.
    :return: A tuple of (the optimal value function, the optimal policy in readable arrow form, the number of iterations performed).
"""
V = np.zeros(env.grid_size) # Initialize value function array with zeros for each state in the environment.
policy = np.zeros(env.grid_size, dtype=int) # Initialize the policy array with zeros for each state in the grid.
iterations = 0 # Initialize counter for iterations
while True:
delta = 0 # Initialize delta, which tracks the maximum change in value function across all states in an iteration.
iterations += 1 # Increment iteration counter
# Loop over each state in the environment's state space
for i in range(env.grid_size[0]):
for j in range(env.grid_size[1]):
current_value = V[i, j] # Store the current value of V at state (i, j)
action_values = [] # List to store the values of taking each action from the current state
# Evaluate each action from the current state
for action in range(4): # Assuming four possible actions: LEFT, DOWN, RIGHT, UP
env.state = (i, j) # Set the current state in the environment
env.done = (i, j) in env.holes or (i, j) == env.goal # Check if the state is terminal
next_state, reward, done, _ = env.step(action) # Take the action and observe the outcome
# Calculate the value of taking the action
if done:
action_values.append(reward) # If next state is terminal, the value is just the reward
else:
# Calculate expected value considering future rewards discounted by gamma
action_values.append(reward + gamma * V[next_state])
# Update the value function for the current state to the best action value
V[i, j] = max(action_values)
# Determine the best action for the current state by finding the action with the maximum value
best_action = np.argmax(action_values)
policy[i, j] = best_action # Update the policy with the best action
# Track the largest change in the value function
delta = max(delta, abs(current_value - V[i, j]))
# If the maximum change in the value function is below the threshold, stop iterating
if delta < theta:
break
# Mapping numeric actions to arrow symbols for readability
action_mappings = {0: '←', 1: '↓', 2: '→', 3: '↑'}
optimal_policy_readable = np.vectorize(action_mappings.get)(policy) # Convert numeric actions to symbols
return V, optimal_policy_readable, iterations
# Create an instance of the FrozenLake environment
env = FrozenLake()
# Run value iteration
optimal_value_function, optimal_policy, num_iterations = value_iteration(env)
print("Final Estimated Value:\n\n", optimal_value_function)
print("\nOptimal Policy:\n\n", optimal_policy)
print("\nTotal number of iterations:\n\n", num_iterations)
Final Estimated Value:

 [[0.95099005 0.96059601 0.970299   0.96059601]
 [0.94148015 0.         0.9801     0.970299  ]
 [0.93206535 0.         0.99       0.        ]
 [0.         0.99       1.         0.        ]]

Optimal Policy:

 [['→' '→' '↓' '←']
 ['↑' '←' '↓' '←']
 ['↑' '←' '↓' '←']
 ['←' '→' '→' '←']]

Total number of iterations:

 7
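As a sanity check on these numbers, the value of the start state is $0.99^5 \approx 0.951$: under the optimal policy the player reaches the goal in six moves, so the terminal reward of $1$ is discounted five times along the way (the final transition into the goal contributes its reward without further discounting).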
Policy Iteration Algorithm¶
We now proceed to policy iteration. Recall that at each iteration we fully evaluate the current policy and then improve it by applying a policy improvement step based on the Bellman optimality equation.
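Concretely, policy evaluation repeatedly applies the update
$$V^{\pi}(s) \;\leftarrow\; R(s,\pi(s)) + \gamma\, V^{\pi}(s'),$$
where $s'$ is the state reached from $s$ under the current policy $\pi$, and policy improvement then replaces $\pi$ with the greedy policy
$$\pi'(s) \;=\; \arg\max_{a}\Big[R(s,a) + \gamma\, V^{\pi}(s')\Big].$$
As before, transitions into a hole or the goal contribute only their reward. The two steps alternate until the policy stops changing.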
def policy_evaluation(policy, env, V, gamma=0.99, theta=0.0001):
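    """
    Evaluates a fixed policy by repeatedly applying the Bellman expectation
    update, in place, until the value function changes by less than theta.
    :return: The converged value function V.
    """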
while True:
delta = 0
for i in range(env.grid_size[0]):
for j in range(env.grid_size[1]):
v = V[i, j]
env.state = (i, j)
env.done = (i, j) in env.holes or (i, j) == env.goal
next_state, reward, done, _ = env.step(policy[i, j])
V[i, j] = reward + gamma * V[next_state] if not done else reward # update value function based on reward and value of next state
delta = max(delta, abs(v - V[i, j])) # update delta to check for convergence
if delta < theta:
break
return V
def policy_improvement(env, V, gamma=0.99):
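    """
    Performs one step of policy improvement: for each state, picks the action
    that is greedy with respect to the current value function V.
    :return: The improved (greedy) policy.
    """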
policy = np.zeros(env.grid_size, dtype=int)
for i in range(env.grid_size[0]):
for j in range(env.grid_size[1]):
action_values = []
for a in range(4):
env.state = (i, j)
env.done = (i, j) in env.holes or (i, j) == env.goal
next_state, reward, done, _ = env.step(a)
action_values.append(reward + gamma * V[next_state] if not done else reward)
best_action = np.argmax(action_values) # choose the action with the highest value
policy[i, j] = best_action # update the policy
return policy
def policy_iteration(env, gamma=0.99, theta=0.0001):
    """
    Alternates policy evaluation and policy improvement until the policy is stable.
    :return: A tuple of (value function, optimal policy, number of iterations).
    """
    policy = np.zeros(env.grid_size, dtype=int)  # start with a policy that always moves LEFT (action 0)
    V = np.zeros(env.grid_size)
    iterations = 0
while True:
        iterations += 1
V = policy_evaluation(policy, env, V, gamma, theta) # evaluate the current policy
new_policy = policy_improvement(env, V, gamma) # update the policy based on the evaluated value function
if np.array_equal(new_policy, policy):
break
policy = new_policy
    return V, policy, iterations
# Initialize the FrozenLake environment
env = FrozenLake()
# Run policy iteration
V, optimal_policy, num_iterations = policy_iteration(env)
# Convert numeric actions to directions for readability
action_mappings = {0: '←', 1: '↓', 2: '→', 3: '↑'}
optimal_policy_readable = np.vectorize(action_mappings.get)(optimal_policy)
# Print the policy iteration results
print("Final Estimated Value:\n\n", V)
print("\nOptimal Policy:\n\n", optimal_policy_readable)
print("\nTotal number of iterations:\n\n", num_iterations)
Final Estimated Value:

 [[0.95099005 0.96059601 0.970299   0.96059601]
 [0.94148015 0.         0.9801     0.970299  ]
 [0.93206535 0.         0.99       0.        ]
 [0.         0.99       1.         0.        ]]

Optimal Policy:

 [['→' '→' '↓' '←']
 ['↑' '←' '↓' '←']
 ['↑' '←' '↓' '←']
 ['←' '→' '→' '←']]

Total number of iterations:

 8
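As a final check, the two methods should agree. The following minimal sketch (reusing the functions and the action_mappings dictionary defined above) compares the greedy policies and the converged value functions:
# Compare value iteration and policy iteration on fresh environments.
# Both should recover the same greedy policy and (numerically) the same values.
vi_V, vi_policy, _ = value_iteration(FrozenLake())
pi_V, pi_policy_numeric, _ = policy_iteration(FrozenLake())
pi_policy = np.vectorize(action_mappings.get)(pi_policy_numeric)
print("Policies identical:", np.array_equal(vi_policy, pi_policy))
print("Largest value difference:", np.max(np.abs(vi_V - pi_V)))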