用Python手把手实现蒙特卡洛强化学习:从网格世界到实战代码
用Python手把手实现蒙特卡洛强化学习从网格世界到实战代码强化学习作为机器学习的重要分支近年来在游戏AI、机器人控制、金融交易等领域展现出惊人潜力。而蒙特卡洛方法作为强化学习中的经典算法以其直观性和有效性备受开发者青睐。本文将带你从零开始用Python实现一个完整的蒙特卡洛强化学习系统通过构建网格世界环境逐步实现智能体的训练过程。1. 环境搭建与基础概念在开始编码之前我们需要明确几个核心概念。蒙特卡洛强化学习Monte Carlo Reinforcement Learning是一种基于完整回合Episode采样来估计价值函数的方法。与动态规划不同它不需要环境的完整模型而是通过实际交互来学习。让我们先搭建一个4×4的网格世界环境import numpy as np import matplotlib.pyplot as plt class GridWorld: def __init__(self, size4): self.size size self.terminal_states [(0,0), (0,size-1), (size-1,0), (size-1,size-1)] self.actions [up, down, left, right] self.state_values np.zeros((size, size)) self.action_values np.zeros((size, size, len(self.actions))) def reset(self): 重置环境返回随机初始状态 while True: state (np.random.randint(self.size), np.random.randint(self.size)) if state not in self.terminal_states: return state def step(self, state, action): 执行动作并返回新状态和奖励 if state in self.terminal_states: return state, 0, True if action up: new_state (max(state[0]-1, 0), state[1]) elif action down: new_state (min(state[0]1, self.size-1), state[1]) elif action left: new_state (state[0], max(state[1]-1, 0)) else: # right new_state (state[0], min(state[1]1, self.size-1)) reward -1 if new_state not in self.terminal_states else 0 done new_state in self.terminal_states return new_state, reward, done这个网格世界环境有几个关键特点四个角落是终止状态每次移动获得-1的奖励鼓励智能体尽快到达终点碰到边界会停留在原地非终止状态可以执行上下左右四个动作提示在实际项目中环境类通常会设计得更通用支持自定义大小、终止状态和奖励函数。这里我们保持简单以专注于算法实现。2. 基础蒙特卡洛预测算法实现蒙特卡洛预测的目标是估计给定策略下的状态价值函数。我们首先实现最简单的首次访问型蒙特卡洛预测算法。def mc_prediction_first_visit(env, num_episodes10000, gamma1.0): 首次访问型蒙特卡洛预测算法 returns_sum np.zeros((env.size, env.size)) returns_count np.zeros((env.size, env.size)) state_values np.zeros((env.size, env.size)) for _ in range(num_episodes): episode [] state env.reset() done False # 生成一个完整回合 while not done: action np.random.choice(env.actions) # 随机策略 next_state, reward, done env.step(state, action) episode.append((state, action, reward)) state next_state # 计算回报并更新状态价值 G 0 visited_states set() for t in reversed(range(len(episode))): state, _, reward episode[t] G gamma * G reward if state not in visited_states: # 首次访问 visited_states.add(state) returns_sum[state] G returns_count[state] 1 state_values[state] returns_sum[state] / returns_count[state] return state_values这个实现有几个关键点需要注意我们使用首次访问型更新即每个状态在一个回合中只更新一次回报G是从后向前计算的考虑了折扣因子gamma采用增量式更新避免存储所有回合数据让我们运行这个算法并可视化结果env GridWorld() state_values mc_prediction_first_visit(env) plt.figure(figsize(8,6)) plt.imshow(state_values, cmaphot, interpolationnearest) plt.colorbar() plt.title(State Values under Random Policy) plt.show()你会看到一个热力图显示了在随机策略下各个状态的价值。靠近终止状态的位置价值较高负值较小因为它们能更快到达终点。3. 蒙特卡洛控制算法进阶仅仅预测状态价值还不够我们的目标是找到最优策略。下面实现蒙特卡洛控制算法逐步优化策略。3.1 探索性起始蒙特卡洛控制def mc_control_exploring_starts(env, num_episodes10000, gamma1.0): 探索性起始蒙特卡洛控制算法 action_values np.zeros((env.size, env.size, len(env.actions))) returns_sum np.zeros((env.size, env.size, len(env.actions))) returns_count np.zeros((env.size, env.size, len(env.actions))) policy np.random.choice(env.actions, size(env.size, env.size)) for _ in range(num_episodes): # 探索性起始随机选择状态和动作 state (np.random.randint(env.size), np.random.randint(env.size)) action_idx np.random.randint(len(env.actions)) action env.actions[action_idx] # 生成完整回合 episode [] done False while not done: next_state, reward, done env.step(state, action) episode.append((state, action, reward)) state next_state if not done: action policy[state] # 按当前策略选择动作 # 更新动作价值函数和策略 G 0 visited_pairs set() for t in reversed(range(len(episode))): state, action, reward episode[t] action_idx env.actions.index(action) G gamma * G reward if (state, action) not in visited_pairs: # 首次访问 visited_pairs.add((state, action)) returns_sum[state][action_idx] G returns_count[state][action_idx] 1 action_values[state][action_idx] returns_sum[state][action_idx] / returns_count[state][action_idx] # 更新策略为贪心策略 best_action_idx np.argmax(action_values[state]) policy[state] env.actions[best_action_idx] return policy, action_values这个算法通过探索性起始确保所有状态-动作对都能被充分探索。每次迭代中随机选择起始状态和动作按当前策略生成完整回合反向计算回报并更新动作价值将策略改进为对当前动作价值的贪心策略3.2 ε-贪心蒙特卡洛控制探索性起始在实际中可能难以实现我们更常用ε-贪心策略def mc_control_epsilon_greedy(env, num_episodes10000, gamma1.0, epsilon0.1): ε-贪心蒙特卡洛控制算法 action_values np.zeros((env.size, env.size, len(env.actions))) returns_sum np.zeros((env.size, env.size, len(env.actions))) returns_count np.zeros((env.size, env.size, len(env.actions))) def choose_action(state): if np.random.rand() epsilon: return np.random.choice(env.actions) else: return env.actions[np.argmax(action_values[state])] for _ in range(num_episodes): state env.reset() done False episode [] # 生成完整回合 while not done: action choose_action(state) next_state, reward, done env.step(state, action) episode.append((state, action, reward)) state next_state # 更新动作价值函数 G 0 visited_pairs set() for t in reversed(range(len(episode))): state, action, reward episode[t] action_idx env.actions.index(action) G gamma * G reward if (state, action) not in visited_pairs: visited_pairs.add((state, action)) returns_sum[state][action_idx] G returns_count[state][action_idx] 1 action_values[state][action_idx] returns_sum[state][action_idx] / returns_count[state][action_idx] # 最终策略是贪心策略 policy np.empty((env.size, env.size), dtypeobject) for i in range(env.size): for j in range(env.size): policy[i,j] env.actions[np.argmax(action_values[i,j])] return policy, action_valuesε-贪心策略在探索和利用之间取得平衡以1-ε概率选择当前最优动作利用以ε概率随机选择动作探索4. 算法优化与性能分析现在我们已经实现了基本的蒙特卡洛算法接下来探讨几个优化方向4.1 增量式实现与常数步长之前的实现使用了增量式更新但采用了算术平均。对于非平稳问题可以使用常数步长alpha 0.1 # 学习率 action_values[state][action_idx] alpha * (G - action_values[state][action_idx])4.2 动态调整ε值训练初期需要更多探索后期可以减少探索initial_epsilon 1.0 min_epsilon 0.01 decay_rate 0.999 epsilon max(min_epsilon, initial_epsilon * (decay_rate ** episode_num))4.3 价值函数可视化让我们可视化训练过程中的价值函数变化def plot_learning_curve(env, num_episodes1000): 绘制学习曲线 action_values np.zeros((env.size, env.size, len(env.actions))) returns_sum np.zeros((env.size, env.size, len(env.actions))) returns_count np.zeros((env.size, env.size, len(env.actions))) history [] for episode in range(1, num_episodes1): # ε-贪心策略生成回合和更新代码同上 ... # 记录每100回合的平均价值 if episode % 100 0: history.append(np.mean(action_values)) plt.plot(np.arange(1, num_episodes1, 100), history) plt.xlabel(Episodes) plt.ylabel(Average Action Value) plt.title(Learning Curve) plt.show()4.4 策略评估训练完成后我们可以评估策略效果def evaluate_policy(env, policy, num_episodes100): 评估策略性能 total_rewards 0 steps_per_episode [] for _ in range(num_episodes): state env.reset() done False steps 0 episode_reward 0 while not done: action policy[state] state, reward, done env.step(state, action) episode_reward reward steps 1 total_rewards episode_reward steps_per_episode.append(steps) avg_reward total_rewards / num_episodes avg_steps np.mean(steps_per_episode) print(fAverage reward per episode: {avg_reward:.2f}) print(fAverage steps per episode: {avg_steps:.2f}) return avg_reward, avg_steps在实际项目中我发现ε值的选择对训练效果影响很大。过大的ε会导致策略难以收敛过小则可能陷入局部最优。一个实用的技巧是开始时使用较大的ε如0.5然后随着训练逐渐衰减到较小的值如0.01。