# Reinforcement Learning: From DQN to PPO
## 1. Reinforcement Learning Basics

Reinforcement learning (RL) is an important branch of machine learning. It studies how an agent learns an optimal behavior policy by interacting with an environment, with the goal of maximizing cumulative reward.

### 1.1 Basic Concepts

- **Agent**: the entity that takes actions
- **Environment**: the external world the agent interacts with
- **State**: the current situation of the environment
- **Action**: an operation the agent can perform
- **Reward**: the environment's feedback on the agent's action
- **Policy**: the rule by which the agent selects actions
- **Value function**: estimates the value of a state or a state-action pair
- **Q function**: estimates the value of taking a specific action in a specific state

### 1.2 Types of Reinforcement Learning

- **Value-based methods**: learn a value function (e.g., Q-Learning, DQN)
- **Policy-based methods**: learn the policy directly (e.g., Policy Gradient, PPO)
- **Actor-critic methods**: combine a value function with a policy (e.g., A2C, DDPG)

## 2. Q-Learning and DQN

### 2.1 Q-Learning

Q-Learning is a value-based reinforcement learning algorithm: it learns a Q function and selects the action with the highest Q value.

```python
import numpy as np

class QLearning:
    def __init__(self, state_size, action_size, learning_rate=0.1,
                 discount_factor=0.99, exploration_rate=1.0,
                 exploration_decay=0.995, exploration_min=0.01):
        self.q_table = np.zeros((state_size, action_size))
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.exploration_decay = exploration_decay
        self.exploration_min = exploration_min

    def choose_action(self, state):
        # Epsilon-greedy action selection
        if np.random.rand() < self.exploration_rate:
            return np.random.randint(self.q_table.shape[1])
        return np.argmax(self.q_table[state, :])

    def learn(self, state, action, reward, next_state, done):
        old_value = self.q_table[state, action]
        next_max = np.max(self.q_table[next_state, :])

        if done:
            target = reward
        else:
            target = reward + self.discount_factor * next_max

        self.q_table[state, action] = old_value + self.learning_rate * (target - old_value)

        # Decay the exploration rate
        self.exploration_rate = max(self.exploration_min,
                                    self.exploration_rate * self.exploration_decay)
```
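A minimal training loop for the `QLearning` class might look as follows. This is only a sketch: the environment (`FrozenLake-v1`), the episode count, and the older gym step/reset API used elsewhere in this article are illustrative assumptions, not part of the original text.

```python
import gym

# Toy usage of the QLearning class on a small discrete environment.
env = gym.make("FrozenLake-v1", is_slippery=False)
agent = QLearning(state_size=env.observation_space.n,
                  action_size=env.action_space.n)

for episode in range(2000):
    state = env.reset()
    done = False
    while not done:
        action = agent.choose_action(state)
        next_state, reward, done, _ = env.step(action)
        agent.learn(state, action, reward, next_state, done)
        state = next_state

env.close()
```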
### 2.2 DQN (Deep Q-Network)

DQN brings deep neural networks into Q-Learning, overcoming the limitations of tabular Q-Learning in high-dimensional state spaces.

```python
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

class DQNAgent:
    def __init__(self, state_size, action_size, learning_rate=0.001,
                 discount_factor=0.99, batch_size=64, memory_size=10000):
        self.state_size = state_size
        self.action_size = action_size
        self.discount_factor = discount_factor
        self.batch_size = batch_size
        self.memory = deque(maxlen=memory_size)

        # Online (policy) network and target network
        self.policy_net = DQN(state_size, action_size)
        self.target_net = DQN(state_size, action_size)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)
        self.criterion = nn.MSELoss()

        self.exploration_rate = 1.0
        self.exploration_decay = 0.995
        self.exploration_min = 0.01

    def choose_action(self, state):
        if np.random.rand() < self.exploration_rate:
            return np.random.randint(self.action_size)
        with torch.no_grad():
            state = torch.FloatTensor(state).unsqueeze(0)
            return self.policy_net(state).argmax().item()

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def learn(self):
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions).unsqueeze(1)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones)

        # Current Q values
        current_q = self.policy_net(states).gather(1, actions).squeeze(1)

        # Target Q values
        with torch.no_grad():
            next_q = self.target_net(next_states).max(1)[0]
            target_q = rewards + (1 - dones) * self.discount_factor * next_q

        # Loss
        loss = self.criterion(current_q, target_q)

        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay the exploration rate
        self.exploration_rate = max(self.exploration_min,
                                    self.exploration_rate * self.exploration_decay)

    def update_target_network(self):
        self.target_net.load_state_dict(self.policy_net.state_dict())
```

### 2.3 Improvements to DQN

- **Double DQN**: addresses Q-value overestimation (see the sketch after this list)
- **Dueling DQN**: decomposes the Q value into a state value and an advantage function
- **Prioritized Experience Replay**: samples important transitions more often
- **Noisy Networks**: replaces epsilon-greedy exploration with noisy layers
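Of these, Double DQN is the smallest change: the online network selects the next action and the target network evaluates it. A sketch of the replacement target computation inside `DQNAgent.learn()` (variable names follow the code above; this is not part of the original implementation) might look like this:

```python
# Double DQN target: action selection by the online (policy) network,
# action evaluation by the target network.
with torch.no_grad():
    next_actions = self.policy_net(next_states).argmax(1, keepdim=True)       # select
    next_q = self.target_net(next_states).gather(1, next_actions).squeeze(1)  # evaluate
    target_q = rewards + (1 - dones) * self.discount_factor * next_q
```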
## 3. Policy Gradient Methods

### 3.1 Basic Policy Gradient

Policy Gradient methods optimize the policy function directly, which makes them well suited to continuous action spaces and stochastic policies.

```python
import torch
import torch.nn as nn
import torch.optim as optim

class PolicyNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return torch.softmax(self.fc3(x), dim=-1)

class PolicyGradientAgent:
    def __init__(self, state_size, action_size, learning_rate=0.001, discount_factor=0.99):
        self.policy_net = PolicyNetwork(state_size, action_size)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)
        self.discount_factor = discount_factor
        self.rewards = []
        self.actions = []
        self.states = []

    def choose_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.policy_net(state)
        action = torch.multinomial(action_probs, 1).item()
        self.states.append(state)
        self.actions.append(action)
        return action

    def remember(self, reward):
        self.rewards.append(reward)

    def learn(self):
        # Compute discounted returns
        returns = []
        G = 0
        for r in reversed(self.rewards):
            G = r + self.discount_factor * G
            returns.insert(0, G)
        returns = torch.FloatTensor(returns)

        # Policy gradient loss
        loss = 0
        for i in range(len(self.states)):
            state = self.states[i]
            action = self.actions[i]
            action_prob = self.policy_net(state)[0, action]
            loss += -torch.log(action_prob) * returns[i]

        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Clear the episode buffers
        self.rewards = []
        self.actions = []
        self.states = []
```

### 3.2 The REINFORCE Algorithm

REINFORCE is the classic Policy Gradient algorithm: it estimates returns with Monte Carlo rollouts, exactly as the `learn()` method above does.

### 3.3 Actor-Critic Methods

Actor-Critic methods combine a value function with the policy gradient, using the value function as a baseline to reduce variance.

```python
class ValueNetwork(nn.Module):
    def __init__(self, state_size):
        super(ValueNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

class ActorCriticAgent:
    def __init__(self, state_size, action_size, learning_rate=0.001, discount_factor=0.99):
        self.actor = PolicyNetwork(state_size, action_size)
        self.critic = ValueNetwork(state_size)
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=learning_rate)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=learning_rate)
        self.discount_factor = discount_factor

    def choose_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.actor(state)
        action = torch.multinomial(action_probs, 1).item()
        return action, action_probs

    def learn(self, state, action, reward, next_state, done):
        state = torch.FloatTensor(state).unsqueeze(0)
        next_state = torch.FloatTensor(next_state).unsqueeze(0)
        reward = torch.FloatTensor([reward])
        done = torch.FloatTensor([done])

        # Advantage estimate (TD error)
        value = self.critic(state)
        next_value = self.critic(next_state)
        target = reward + (1 - done) * self.discount_factor * next_value
        advantage = target - value

        # Update the critic
        critic_loss = advantage.pow(2).mean()
        self.critic_optimizer.zero_grad()
        critic_loss.backward(retain_graph=True)
        self.critic_optimizer.step()

        # Update the actor
        action_probs = self.actor(state)
        action_prob = action_probs[0, action]
        actor_loss = -torch.log(action_prob) * advantage.detach()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
```

## 4. PPO (Proximal Policy Optimization)

PPO is a widely used Policy Gradient algorithm that improves training stability by limiting how far the policy can move in a single update.

### 4.1 Core Ideas of PPO

- **Clipped objective**: a clipped surrogate objective limits the size of each policy update (written out after this list)
- **Importance sampling**: importance weights correct for the distribution shift between the old and new policy
- **Multiple epochs**: the same batch of collected data is reused for several update passes
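For reference, the clipped surrogate objective that the implementation in 4.2 maximizes can be written as

$$L^{\mathrm{CLIP}}(\theta) = \mathbb{E}_t\left[\min\big(r_t(\theta)\hat{A}_t,\ \operatorname{clip}(r_t(\theta),\,1-\epsilon,\,1+\epsilon)\,\hat{A}_t\big)\right], \qquad r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\mathrm{old}}}(a_t \mid s_t)},$$

where $\hat{A}_t$ is the advantage estimate and $\epsilon$ is the clipping range (0.2 by default in the code below).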
### 4.2 PPO Implementation

```python
class PPOAgent:
    def __init__(self, state_size, action_size, learning_rate=0.0003,
                 discount_factor=0.99, epsilon=0.2, epochs=4, batch_size=64):
        self.actor = PolicyNetwork(state_size, action_size)
        self.critic = ValueNetwork(state_size)
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=learning_rate)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=learning_rate)
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epochs = epochs
        self.batch_size = batch_size
        self.memory = []

    def choose_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.actor(state)
        action = torch.multinomial(action_probs, 1).item()
        return action, action_probs[0, action].item()

    def remember(self, state, action, action_prob, reward, next_state, done):
        self.memory.append((state, action, action_prob, reward, next_state, done))

    def learn(self):
        states, actions, old_action_probs, rewards, next_states, dones = zip(*self.memory)

        # Compute returns and advantages
        returns = []
        advantages = []
        G = 0
        for i in reversed(range(len(rewards))):
            G = rewards[i] + (1 - dones[i]) * self.discount_factor * G
            returns.insert(0, G)
            state = torch.FloatTensor(states[i]).unsqueeze(0)
            value = self.critic(state).item()
            advantages.insert(0, G - value)

        returns = torch.FloatTensor(returns)
        advantages = torch.FloatTensor(advantages)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # Convert to tensors
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        old_action_probs = torch.FloatTensor(old_action_probs)

        # Several update epochs over the same batch of data
        for _ in range(self.epochs):
            # Shuffle into mini-batches
            indices = torch.randperm(len(states))
            for i in range(0, len(states), self.batch_size):
                batch_indices = indices[i:i + self.batch_size]
                batch_states = states[batch_indices]
                batch_actions = actions[batch_indices]
                batch_old_action_probs = old_action_probs[batch_indices]
                batch_returns = returns[batch_indices]
                batch_advantages = advantages[batch_indices]

                # New action probabilities
                new_action_probs = self.actor(batch_states).gather(
                    1, batch_actions.unsqueeze(1)).squeeze(1)

                # Importance sampling ratio
                ratio = new_action_probs / batch_old_action_probs

                # Clipped surrogate objective
                surr1 = ratio * batch_advantages
                surr2 = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon) * batch_advantages
                actor_loss = -torch.min(surr1, surr2).mean()

                # Value function loss
                values = self.critic(batch_states).squeeze(1)
                critic_loss = (batch_returns - values).pow(2).mean()

                # Optimize
                self.actor_optimizer.zero_grad()
                actor_loss.backward()
                self.actor_optimizer.step()

                self.critic_optimizer.zero_grad()
                critic_loss.backward()
                self.critic_optimizer.step()

        # Clear the rollout memory
        self.memory = []
```

## 5. Algorithm Comparison

| Algorithm | Type | Strengths | Weaknesses | Typical use cases |
|---|---|---|---|---|
| Q-Learning | Value-based | Simple, easy to implement | Discrete action spaces only | Simple discrete-action environments |
| DQN | Value-based | Handles high-dimensional state spaces | Discrete action spaces only | Discrete-action environments with high-dimensional states |
| Policy Gradient | Policy-based | Handles continuous action spaces | High variance, slow convergence | Continuous action spaces |
| Actor-Critic | Hybrid | Lower variance, faster convergence | More complex to implement | A wide range of environments |
| PPO | Policy-based | Stable, sample-efficient | Sensitive to hyperparameters | A wide range of environments, especially complex ones |

## 6. Training Tips

### 6.1 Hyperparameter Tuning

- Learning rate: typically between 1e-4 and 1e-3
- Batch size: tune to available memory, typically 32-256
- Discount factor: typically 0.99
- PPO clipping epsilon: typically 0.2
- Number of training episodes: depends on environment complexity

### 6.2 Experience Replay

- Plain experience replay: sample transitions uniformly at random
- Prioritized experience replay: sample important transitions more often
- Batched experience replay: process experience in mini-batches

### 6.3 Exploration Strategies

- Epsilon-greedy: balances exploration and exploitation
- Noisy networks: exploration through noisy layers
- Parameter noise: add noise to the policy parameters

### 6.4 Regularization

- L2 regularization: prevents overfitting
- Entropy regularization: encourages exploration (a minimal sketch follows this list)
- KL-divergence regularization: limits how far the policy can change
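As a minimal sketch of entropy regularization, the PPO actor loss from section 4.2 can be extended with an entropy bonus. The use of `torch.distributions.Categorical` and the coefficient 0.01 are illustrative choices and not part of the original implementation; `batch_states`, `surr1`, and `surr2` refer to the variables inside `PPOAgent.learn()`.

```python
# Entropy bonus added to the PPO actor loss (sketch).
dist = torch.distributions.Categorical(probs=self.actor(batch_states))
entropy_bonus = dist.entropy().mean()
actor_loss = -torch.min(surr1, surr2).mean() - 0.01 * entropy_bonus
```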
## 7. Practical Applications

### 7.1 Game AI

```python
import gym

# Train DQN on the CartPole environment
env = gym.make("CartPole-v1")
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)

episodes = 1000
for episode in range(episodes):
    state = env.reset()
    done = False
    total_reward = 0
    while not done:
        action = agent.choose_action(state)
        next_state, reward, done, _ = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        agent.learn()
        state = next_state
        total_reward += reward
    if episode % 100 == 0:
        agent.update_target_network()
        print(f"Episode {episode}, Reward: {total_reward}")

# Test
state = env.reset()
done = False
total_reward = 0
while not done:
    action = agent.choose_action(state)
    state, reward, done, _ = env.step(action)
    total_reward += reward
print(f"Test Reward: {total_reward}")

env.close()
```

### 7.2 Robot Control

Note that `Pendulum-v1` has a continuous action space, while the `PPOAgent` above outputs a discrete distribution; to use it here as written, the action space would need to be discretized or the actor replaced with a Gaussian policy head. The loop below keeps the structure of the original example.

```python
# Train a PPO controller on the Pendulum environment
env = gym.make("Pendulum-v1")
state_size = env.observation_space.shape[0]
action_size = env.action_space.shape[0]
agent = PPOAgent(state_size, action_size)

episodes = 2000
for episode in range(episodes):
    state = env.reset()
    done = False
    total_reward = 0
    while not done:
        action, action_prob = agent.choose_action(state)
        next_state, reward, done, _ = env.step(action)
        agent.remember(state, action, action_prob, reward, next_state, done)
        state = next_state
        total_reward += reward
    agent.learn()
    if episode % 100 == 0:
        print(f"Episode {episode}, Reward: {total_reward}")

# Test
state = env.reset()
done = False
total_reward = 0
while not done:
    action, _ = agent.choose_action(state)
    state, reward, done, _ = env.step(action)
    total_reward += reward
print(f"Test Reward: {total_reward}")

env.close()
```

### 7.3 Financial Trading

```python
# A simplified trading environment
class TradingEnv:
    def __init__(self, data):
        self.data = data
        self.current_step = 0
        self.balance = 10000
        self.shares = 0

    def reset(self):
        self.current_step = 0
        self.balance = 10000
        self.shares = 0
        return self._get_state()

    def _get_state(self):
        return [self.balance, self.shares, self.data[self.current_step]]

    def step(self, action):
        price = self.data[self.current_step]

        if action == 0:  # Buy
            if self.balance >= price:
                self.shares += 1
                self.balance -= price
        elif action == 1:  # Sell
            if self.shares > 0:
                self.shares -= 1
                self.balance += price
        # action == 2: hold

        self.current_step += 1
        done = self.current_step >= len(self.data) - 1

        # Reward: total assets relative to the starting balance
        total_asset = self.balance + self.shares * price
        reward = total_asset - 10000

        return self._get_state(), reward, done, {}

# Train a trading policy
data = [100, 102, 98, 105, 110, 108, 115, 120, 118, 125]
env = TradingEnv(data)
state_size = 3
action_size = 3  # buy, sell, hold

agent = DQNAgent(state_size, action_size)

episodes = 1000
for episode in range(episodes):
    state = env.reset()
    done = False
    total_reward = 0
    while not done:
        action = agent.choose_action(state)
        next_state, reward, done, _ = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        agent.learn()
        state = next_state
        total_reward += reward
    if episode % 100 == 0:
        agent.update_target_network()
        print(f"Episode {episode}, Reward: {total_reward}")

# Test
state = env.reset()
done = False
total_reward = 0
while not done:
    action = agent.choose_action(state)
    state, reward, done, _ = env.step(action)
    total_reward += reward
print(f"Test Reward: {total_reward}")
```

## 8. Performance Evaluation

### 8.1 Evaluation Metrics

- Average return: mean reward during evaluation (a small evaluation-loop sketch follows this list)
- Success rate: fraction of episodes in which the task is completed
- Learning curve: reward as a function of training time
- Stability: variance in performance across random seeds
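A small sketch of computing the average return over several evaluation episodes, assuming an agent whose `choose_action` returns a single action (as in `DQNAgent`) and the older gym API used throughout this article; the default episode count is illustrative:

```python
def evaluate(agent, env, episodes=10):
    """Average undiscounted return over a number of evaluation episodes."""
    total = 0.0
    for _ in range(episodes):
        state = env.reset()
        done = False
        while not done:
            action = agent.choose_action(state)
            state, reward, done, _ = env.step(action)
            total += reward
    return total / episodes
```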
### 8.2 Comparison Experiments

```python
import matplotlib.pyplot as plt

# Compare the performance of different algorithms
algorithms = ["DQN", "Actor-Critic", "PPO"]
rewards = {
    "DQN": [10, 20, 30, 40, 45, 48, 49, 50],
    "Actor-Critic": [5, 15, 25, 35, 42, 46, 48, 49],
    "PPO": [8, 22, 38, 45, 48, 49, 50, 50],
}

plt.figure(figsize=(10, 6))
for algo in algorithms:
    plt.plot(rewards[algo], label=algo)
plt.xlabel("Epochs")
plt.ylabel("Average Reward")
plt.title("Algorithm Comparison")
plt.legend()
plt.savefig("algorithm_comparison.png")
plt.show()
```

## 9. Common Problems and Solutions

### 9.1 Unstable Training

- Problem: rewards fluctuate heavily during training and convergence is difficult
- Solutions: use a smaller learning rate; add regularization; use experience replay; adjust the batch size

### 9.2 Overfitting

- Problem: the model performs well in the training environment but poorly in the test environment
- Solutions: add dropout; use L2 regularization; increase environment randomness; use data augmentation

### 9.3 Insufficient Exploration

- Problem: the agent converges prematurely to a suboptimal policy
- Solutions: increase the exploration rate; use noisy networks; apply parameter noise; design a better reward function

### 9.4 Low Sample Efficiency

- Problem: a large number of samples is needed before convergence
- Solutions: use prioritized experience replay; reuse each batch for multiple updates as PPO does; leverage demonstration data; use transfer learning

## 10. Future Directions

### 10.1 Model Architectures

- Transformers in RL: self-attention for sequential decision making
- Model compression: making RL models suitable for edge devices
- Multi-task learning: one model handling multiple tasks

### 10.2 Algorithmic Improvements

- Offline reinforcement learning: learning from static datasets
- Hierarchical reinforcement learning: decomposing tasks into subtasks
- Meta reinforcement learning: fast adaptation to new environments
- Multi-agent reinforcement learning: cooperative learning among multiple agents

### 10.3 Application Domains

- Autonomous driving: decision making and control
- Robotics: complex task execution
- Finance: trading strategies
- Games: game AI
- Healthcare: personalized treatment plans

## 11. Code Example: A Complete Reinforcement Learning Framework

```python
"""A complete reinforcement learning framework built around PPO."""
import torch
import torch.nn as nn
import torch.optim as optim
import gym


class PolicyNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return torch.softmax(self.fc3(x), dim=-1)


class ValueNetwork(nn.Module):
    def __init__(self, state_size):
        super(ValueNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)


class PPOAgent:
    def __init__(self, state_size, action_size, learning_rate=0.0003,
                 discount_factor=0.99, epsilon=0.2, epochs=4, batch_size=64):
        self.actor = PolicyNetwork(state_size, action_size)
        self.critic = ValueNetwork(state_size)
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=learning_rate)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=learning_rate)
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epochs = epochs
        self.batch_size = batch_size
        self.memory = []

    def choose_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.actor(state)
        action = torch.multinomial(action_probs, 1).item()
        return action, action_probs[0, action].item()

    def remember(self, state, action, action_prob, reward, next_state, done):
        self.memory.append((state, action, action_prob, reward, next_state, done))

    def learn(self):
        if len(self.memory) < self.batch_size:
            return

        states, actions, old_action_probs, rewards, next_states, dones = zip(*self.memory)

        # Compute returns and advantages
        returns = []
        advantages = []
        G = 0
        for i in reversed(range(len(rewards))):
            G = rewards[i] + (1 - dones[i]) * self.discount_factor * G
            returns.insert(0, G)
            state = torch.FloatTensor(states[i]).unsqueeze(0)
            value = self.critic(state).item()
            advantages.insert(0, G - value)

        returns = torch.FloatTensor(returns)
        advantages = torch.FloatTensor(advantages)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # Convert to tensors
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        old_action_probs = torch.FloatTensor(old_action_probs)

        # Several update epochs over the same batch of data
        for _ in range(self.epochs):
            # Shuffle into mini-batches
            indices = torch.randperm(len(states))
            for i in range(0, len(states), self.batch_size):
                batch_indices = indices[i:i + self.batch_size]
                batch_states = states[batch_indices]
                batch_actions = actions[batch_indices]
                batch_old_action_probs = old_action_probs[batch_indices]
                batch_returns = returns[batch_indices]
                batch_advantages = advantages[batch_indices]

                # New action probabilities
                new_action_probs = self.actor(batch_states).gather(
                    1, batch_actions.unsqueeze(1)).squeeze(1)

                # Importance sampling ratio
                ratio = new_action_probs / batch_old_action_probs

                # Clipped surrogate objective
                surr1 = ratio * batch_advantages
                surr2 = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon) * batch_advantages
                actor_loss = -torch.min(surr1, surr2).mean()

                # Value function loss
                values = self.critic(batch_states).squeeze(1)
                critic_loss = (batch_returns - values).pow(2).mean()

                # Optimize
                self.actor_optimizer.zero_grad()
                actor_loss.backward()
                self.actor_optimizer.step()

                self.critic_optimizer.zero_grad()
                critic_loss.backward()
                self.critic_optimizer.step()

        # Clear the rollout memory
        self.memory = []


def train_ppo():
    """Train PPO on the CartPole environment."""
    env = gym.make("CartPole-v1")
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    agent = PPOAgent(state_size, action_size)

    episodes = 1000
    rewards = []
    for episode in range(episodes):
        state = env.reset()
        done = False
        total_reward = 0
        while not done:
            action, action_prob = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.remember(state, action, action_prob, reward, next_state, done)
            state = next_state
            total_reward += reward
        agent.learn()
        rewards.append(total_reward)
        if episode % 100 == 0:
            avg_reward = sum(rewards[-100:]) / len(rewards[-100:])
            print(f"Episode {episode}, Average Reward: {avg_reward}")

    # Test
    state = env.reset()
    done = False
    total_reward = 0
    while not done:
        action, _ = agent.choose_action(state)
        state, reward, done, _ = env.step(action)
        total_reward += reward
    print(f"Test Reward: {total_reward}")

    env.close()


if __name__ == "__main__":
    train_ppo()
```

## 12. Summary

Reinforcement learning is a powerful branch of machine learning. From Q-Learning to DQN, and on to Policy Gradient methods and PPO, the algorithms have evolved steadily. PPO, one of the most widely used RL algorithms today, is valued for its stability and sample efficiency and is applied to many complex tasks.

Key points:

- **Algorithm selection**: choose an algorithm that matches the characteristics of the task
- **Hyperparameter tuning**: set the learning rate, batch size, and other hyperparameters sensibly
- **Training techniques**: use experience replay, exploration strategies, and similar tricks to improve training
- **Evaluation**: assess model performance with multiple metrics
- **Practical deployment**: adapt the algorithm to the specific application scenario

Looking ahead, reinforcement learning has broad prospects in autonomous driving, robotics, finance, and other fields. As model architectures improve and algorithms advance, RL will play an increasingly important role in complex tasks; in particular, combined with deep learning and other machine learning techniques, it is expected to tackle more real-world challenges.