【深度强化学习】(5) DDPG 模型解析,附Pytorch完整代码
创始人
2024-06-02 06:11:00
0

大家好,今天和各位分享一下深度确定性策略梯度算法 (Deterministic Policy Gradient,DDPG)。并基于 OpenAI 的 gym 环境完成一个小游戏。完整代码在我的 GitHub 中获得:

https://github.com/LiSir-HIT/Reinforcement-Learning/tree/main/Model


1. 基本原理

深度确定性策略梯度算法是结合确定性策略梯度算法的思想,对 DQN 的一种改进,是一种无模型的深度强化学习算法。

DDPG 算法使用演员-评论家(Actor-Critic)算法作为其基本框架,采用深度神经网络作为策略网络和动作值函数的近似,使用随机梯度法训练策略网络和价值网络模型中的参数。DDPG 算法的原理如下图所示。

DDPG 算法架构中使用双重神经网络架构,对于策略函数和价值函数均使用双重神经网络模型架构(即 Online 网络和 Target 网络),使得算法的学习过程更加稳定,收敛的速度加快。同时该算法引入经验回放机制,Actor 与环境交互生产生的经验数据样本存储到经验池中,抽取批量数据样本进行训练,即类似于 DQN 的经验回放机制,去除样本的相关性和依赖性,使得算法更加容易收敛。 


2. 公式推导

为了便于大家理解 DDPG 的推导过程,算法框架如下图所示:

DDPG 共包含 4 个神经网络,用于对 Q 值函数和策略的近似表示。Critic 目标网络用于近似估计下一时刻的状态-动作的 Q 值函数 Q_{w'}(S_{t+1},\pi _{\theta '}(S_{t+1})),其中,下一动作值是通过 Actor 目标网络近似估计得到的 \pi_{\theta' }(S_{t+1})。于是可以得到当前状态下 Q 值函数的目标值

y_i = r_i + \gamma Q_{w'}(S_{i+1}, \pi _{\theta '}(S_{i+1}))

Critic 训练网络输出当前时刻状态-动作的 Q 值函数 Q_w(S_t, a_t),用于对当前策略评价。为了增加智能体在环境中的探索,DDPG 在行为策略上添加了高斯噪声函数Critic 网络的目标定义为:

y_i - Q_w(S_i,a_i)

通过最小化损失值(均方误差损失)来更新 Critic 网络的参数,Critic 网络更新时的损失函数为:

loss =\frac{1}{N} \sum_i (y_i - Q_w(S_i,a_i))^2

其中,a_i = \pi _{\theta} (S_i) + \varepsilon\varepsilon 代表行为策略上的探索噪声。

Actor 目标网络用于提供下一个状态的策略Actor 训练网络则是提供当前状态的策略,结合 Critic 训练网络的 Q 值函数可以得到 Actor 在参数更新时的策略梯度

\bigtriangledown _ {\pi_\theta} J = \frac{1}{N}\sum_i \bigtriangledown _a Q_w(s,a)|_{s=s_i,a=\pi_\theta(s_i)} \bigtriangledown _{\theta} \pi_{\theta} (s)|_{s_i}

对于目标网络参数 w' 和 \theta ' 的更新,DDPG 通过软更新机制(每次 learn 的时候更新部分参数)保证参数可以缓慢更新,从而提高学习的稳定性:

w' \leftarrow \xi w + (1-\xi )w'

\theta ' =\leftarrow \xi \theta + (1- \xi ) \theta '

DDPG 中既有基于价值函数的方法特征,也有基于策略的方法特征,使深度强化学习可以处理连续动作,并且具有一定的探索能力。 

算法流程图如下:


3. 代码实现

DDPG 的伪代码如下:

模型代码如下:

import torch
from torch import nn
from torch.nn import functional as F
import numpy as np
import collections
import random# ------------------------------------- #
# 经验回放池
# ------------------------------------- #class ReplayBuffer:def __init__(self, capacity):  # 经验池的最大容量# 创建一个队列,先进先出self.buffer = collections.deque(maxlen=capacity)# 在队列中添加数据def add(self, state, action, reward, next_state, done):# 以list类型保存self.buffer.append((state, action, reward, next_state, done))# 在队列中随机取样batch_size组数据def sample(self, batch_size):transitions = random.sample(self.buffer, batch_size)# 将数据集拆分开来state, action, reward, next_state, done = zip(*transitions)return np.array(state), action, reward, np.array(next_state), done# 测量当前时刻的队列长度def size(self):return len(self.buffer)# ------------------------------------- #
# 策略网络
# ------------------------------------- #class PolicyNet(nn.Module):def __init__(self, n_states, n_hiddens, n_actions, action_bound):super(PolicyNet, self).__init__()# 环境可以接受的动作最大值self.action_bound = action_bound# 只包含一个隐含层self.fc1 = nn.Linear(n_states, n_hiddens)self.fc2 = nn.Linear(n_hiddens, n_actions)# 前向传播def forward(self, x):x = self.fc1(x)  # [b,n_states]-->[b,n_hiddens]x = F.relu(x)x = self.fc2(x)  # [b,n_hiddens]-->[b,n_actions]x= torch.tanh(x)  # 将数值调整到 [-1,1]x = x * self.action_bound  # 缩放到 [-action_bound, action_bound]return x# ------------------------------------- #
# 价值网络
# ------------------------------------- #class QValueNet(nn.Module):def __init__(self, n_states, n_hiddens, n_actions):super(QValueNet, self).__init__()# self.fc1 = nn.Linear(n_states + n_actions, n_hiddens)self.fc2 = nn.Linear(n_hiddens, n_hiddens)self.fc3 = nn.Linear(n_hiddens, 1)# 前向传播def forward(self, x, a):# 拼接状态和动作cat = torch.cat([x, a], dim=1)  # [b, n_states + n_actions]x = self.fc1(cat)  # -->[b, n_hiddens]x = F.relu(x)x = self.fc2(x)  # -->[b, n_hiddens]x = F.relu(x)x = self.fc3(x)  # -->[b, 1]return x# ------------------------------------- #
# 算法主体
# ------------------------------------- #class DDPG:def __init__(self, n_states, n_hiddens, n_actions, action_bound,sigma, actor_lr, critic_lr, tau, gamma, device):# 策略网络--训练self.actor = PolicyNet(n_states, n_hiddens, n_actions, action_bound).to(device)# 价值网络--训练self.critic = QValueNet(n_states, n_hiddens, n_actions).to(device)# 策略网络--目标self.target_actor = PolicyNet(n_states, n_hiddens, n_actions, action_bound).to(device)# 价值网络--目标self.target_critic = QValueNet(n_states, n_hiddens, n_actions).to(device)# 初始化价值网络的参数,两个价值网络的参数相同self.target_critic.load_state_dict(self.critic.state_dict())# 初始化策略网络的参数,两个策略网络的参数相同self.target_actor.load_state_dict(self.actor.state_dict())# 策略网络的优化器self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)# 价值网络的优化器self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)# 属性分配self.gamma = gamma  # 折扣因子self.sigma = sigma  # 高斯噪声的标准差,均值设为0self.tau = tau  # 目标网络的软更新参数self.n_actions = n_actionsself.device = device# 动作选择def take_action(self, state):# 维度变换 list[n_states]-->tensor[1,n_states]-->gpustate = torch.tensor(state, dtype=torch.float).view(1,-1).to(self.device)# 策略网络计算出当前状态下的动作价值 [1,n_states]-->[1,1]-->intaction = self.actor(state).item()# 给动作添加噪声,增加搜索action = action + self.sigma * np.random.randn(self.n_actions)return action# 软更新, 意思是每次learn的时候更新部分参数def soft_update(self, net, target_net):# 获取训练网络和目标网络需要更新的参数for param_target, param in zip(target_net.parameters(), net.parameters()):# 训练网络的参数更新要综合考虑目标网络和训练网络param_target.data.copy_(param_target.data*(1-self.tau) + param.data*self.tau)# 训练def update(self, transition_dict):# 从训练集中取出数据states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)  # [b,n_states]actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1,1).to(self.device)  # [b,1]rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1,1).to(self.device)  # [b,1]next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)  # [b,next_states]dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1,1).to(self.device)  # [b,1]# 价值目标网络获取下一时刻的动作[b,n_states]-->[b,n_actors]next_q_values = self.target_actor(next_states)# 策略目标网络获取下一时刻状态选出的动作价值 [b,n_states+n_actions]-->[b,1]next_q_values = self.target_critic(next_states, next_q_values)# 当前时刻的动作价值的目标值 [b,1]q_targets = rewards + self.gamma * next_q_values * (1-dones)# 当前时刻动作价值的预测值 [b,n_states+n_actions]-->[b,1]q_values = self.critic(states, actions)# 预测值和目标值之间的均方差损失critic_loss = torch.mean(F.mse_loss(q_values, q_targets))# 价值网络梯度self.critic_optimizer.zero_grad()critic_loss.backward()self.critic_optimizer.step()# 当前状态的每个动作的价值 [b, n_actions]actor_q_values = self.actor(states)# 当前状态选出的动作价值 [b,1]score = self.critic(states, actor_q_values)# 计算损失actor_loss = -torch.mean(score)# 策略网络梯度self.actor_optimizer.zero_grad()actor_loss.backward()self.actor_optimizer.step()# 软更新策略网络的参数  self.soft_update(self.actor, self.target_actor)# 软更新价值网络的参数self.soft_update(self.critic, self.target_critic)

4. 案例演示

基于 OpenAI 的 gym 环境完成一个推车游戏,目标是将小车推到山顶旗子处。动作维度为1,属于连续值;状态维度为 2,分别是 x 坐标和小车速度。

代码如下:

import numpy as np
import torch
import matplotlib.pyplot as plt
import gym
from parsers import args
from RL_brain import ReplayBuffer, DDPG
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')# -------------------------------------- #
# 环境加载
# -------------------------------------- #env_name = "MountainCarContinuous-v0"  # 连续型动作
env = gym.make(env_name, render_mode="human")
n_states = env.observation_space.shape[0]  # 状态数 2
n_actions = env.action_space.shape[0]  # 动作数 1
action_bound = env.action_space.high[0]  # 动作的最大值 1.0# -------------------------------------- #
# 模型构建
# -------------------------------------- ## 经验回放池实例化
replay_buffer = ReplayBuffer(capacity=args.buffer_size)
# 模型实例化
agent = DDPG(n_states = n_states,  # 状态数n_hiddens = args.n_hiddens,  # 隐含层数n_actions = n_actions,  # 动作数action_bound = action_bound,  # 动作最大值sigma = args.sigma,  # 高斯噪声actor_lr = args.actor_lr,  # 策略网络学习率critic_lr = args.critic_lr,  # 价值网络学习率tau = args.tau,  # 软更新系数gamma = args.gamma,  # 折扣因子device = device)# -------------------------------------- #
# 模型训练
# -------------------------------------- #return_list = []  # 记录每个回合的return
mean_return_list = []  # 记录每个回合的return均值for i in range(10):  # 迭代10回合episode_return = 0  # 累计每条链上的rewardstate = env.reset()[0]  # 初始时的状态done = False  # 回合结束标记while not done:# 获取当前状态对应的动作action = agent.take_action(state)# 环境更新next_state, reward, done, _, _ = env.step(action)# 更新经验回放池replay_buffer.add(state, action, reward, next_state, done)# 状态更新state = next_state# 累计每一步的rewardepisode_return += reward# 如果经验池超过容量,开始训练if replay_buffer.size() > args.min_size:# 经验池随机采样batch_size组s, a, r, ns, d = replay_buffer.sample(args.batch_size)# 构造数据集transition_dict = {'states': s,'actions': a,'rewards': r,'next_states': ns,'dones': d,}# 模型训练agent.update(transition_dict)# 保存每一个回合的回报return_list.append(episode_return)mean_return_list.append(np.mean(return_list[-10:]))  # 平滑# 打印回合信息print(f'iter:{i}, return:{episode_return}, mean_return:{np.mean(return_list[-10:])}')# 关闭动画窗格
env.close()# -------------------------------------- #
# 绘图
# -------------------------------------- #x_range = list(range(len(return_list)))plt.subplot(121)
plt.plot(x_range, return_list)  # 每个回合return
plt.xlabel('episode')
plt.ylabel('return')
plt.subplot(122)
plt.plot(x_range, mean_return_list)  # 每回合return均值
plt.xlabel('episode')
plt.ylabel('mean_return')

相关内容

热门资讯

诗人李白的简介 简介李白(701年-762年) ,字太白,号青莲居士,又号“谪仙人”,是唐代伟大的浪漫主义诗人,被后...
咏梅诗词精选 关于咏梅诗词精选有哪一些呢,unjs小编为大家推荐下文,希望可以帮助到你。欢迎大家阅读参考。咏梅诗词...
秦少游诗词 秦少游诗词  1.《江城子·南来飞燕北归鸿》  年代: 宋 作者: 秦观  南来飞燕北归鸿。偶相逢,...
文天祥写的古诗 文天祥写的古诗  在日常学习、工作和生活中,许多人对一些广为流传的古诗都不陌生吧,从格律上看,古诗可...
描写梨花美丽的诗句 描写梨花美丽的诗句  梨花的花语是纯真,代表着唯美纯净的爱情,但是也有谐音“离别”的意思。下面是小编...
霜降的经典古诗词和诗句 关于霜降的经典古诗词和诗句  在日常学习、工作和生活中,大家都接触过古诗吧,汉魏以后的古诗一般以五七...
陶冶情操的诗句 有关陶冶情操的诗句  1、不求金玉重重贵,但愿儿孙个个贤。枯木逢春犹再岁,人无两度再少年。  2、是...
刘禹锡《酬乐天扬州初逢席上见... 刘禹锡《酬乐天扬州初逢席上见赠》全诗翻译赏析  “今日听君歌一曲,暂凭杯酒长精神”的诗意:今天听了你...
描写冬雨的优美诗句 描写冬雨的优美诗句  冬雨,温暖的大气遗留下来的最后一点残迹,已没有了夏日暴风骤雨的.豪情,也没有了...
词人黄庭坚“千年一叹” 词人黄庭坚“千年一叹”  导语:黄庭坚是一位才华洋溢的词人,他一生好酒,也是“江西诗派”的代表诗人。...
描写红梅映雪的诗 描写红梅映雪的诗  无论在学习、工作或是生活中,大家总少不了接触一些耳熟能详的'古诗吧,古诗的篇幅可...
有哪些是李白写的诗 有哪些是李白写的诗  李白,字太白,号青莲居士。中国唐朝诗人,有“诗仙”之称,是伟大的浪漫主义诗人。...
杜甫《赠花卿》译文及注释 杜甫《赠花卿》译文及注释  《赠花卿》是唐代伟大诗人杜甫的作品,约作于唐上元二年(761年)。全诗四...
滁州西涧韦应物注音版 滁州西涧韦应物注音版  滁州西涧湖这一景点,被称为滁州十二景之一。西涧湖就是唐代著名山水诗人韦应物最...
异乡物态与人殊,惟有东风旧相... “异乡物态与人殊,惟有东风旧相识。”出处 出自 宋代 欧阳修 的《春日西湖寄谢法曹韵》“异乡物态与人...
描写春节的古诗句 描写春节的古诗句  在平平淡淡的日常中,大家一定没少看到经典的`古诗吧,古诗有四言、五言、七言、杂言...
常用的描写菊花的诗句 常用的描写菊花的诗句  一树木犀供夜雨,清香移在菊花枝。下面是小编收集的一些常用的描写菊花的诗句,欢...
金陵酒肆留别古诗 金陵酒肆留别古诗  《金陵酒肆留别》  风吹柳花满店香,吴姬压酒唤客尝。  金陵子弟来相送,欲行不行...
苏轼写夏天的诗句古诗 苏轼写夏天的诗句古诗  在夏天来,一切当时,阳光倾洒,万物生气勃勃,愈加灵动。  苏轼写夏天的诗句古...