深度强化学习:通过异步优势动作评价 (A3C) 算法玩 CartPole
文 / 软件工程实习生 Raymond Yuan
在本教程中,我们将学习如何使用深度强化学习来训练模型,使其能够在简单的 CartPole 游戏中获胜。我们会使用 tf.keras 和 OpenAI Gym 并通过被称为异步优势动作评价 (A3C) 的技术来训练智能体。强化学习一直以来备受瞩目,但它到底是什么呢?强化学习是机器学习的一个领域,其中的智能体需要在环境中执行特定操作,从而使奖励最大化或获得某些奖励。
在此过程中,我们会积累关于下列概念的实际经验,并培养对这些概念的理解:
Eager Execution — Eager Execution 是一个由运行定义的命令式接口。一旦用户从 Python 中调用此接口,便可立即执行运算。这让 TensorFlow 的入门变得更加简单,并让研发变得更加直观。
模型子类化 — 模型子类化让用户可以通过将 tf.keras 模型子类化并定义自己的正向传递,来构建可完全自定义的模型。在启用 Eager Execution 时,模型子类化特别有用,因为用户可以命令式地写入正向传递。
自定义训练循环
注:tf.keras 模型链接
https://www.tensorflow.org/api_docs/python/tf/keras/Model
我们会按照以下基本工作流程来进行介绍:
构建主智能体监控器
构建工作器智能体
执行 A3C 算法
训练智能体
直观呈现训练表现
受众:本教程面向所有对强化学习感兴趣的人。虽然我们不会深入探讨机器学习的基础知识,但会大体涵盖策略和值网络等话题。此外,我建议大家阅读一篇论文,即 Volodymyr Mnih 所著的《深度强化学习的异步方法》(Asynchronous Methods for Deep Reinforcement Learning)。这篇论文更详细地介绍了相关算法,非常值得一读。
CartPole 是什么?
Cartpole 是一个游戏,其中有一根杆子通过非驱动关节与小车相连,而小车会沿着无摩擦轨道移动。系统会随机对起始状态(小车位置、小车移动速度、杆子角度,以及杆子顶端的摆动速度)进行初始化(介于 +/-0.05 之间)。我们通过对小车施加 +1 或 -1 的力度来控制系统(向左或向右移动)。杆子一开始是竖直的,而我们的目标是阻止杆子倒下。当杆子保持竖直时,系统在每个时步都会提供 +1 的奖励。如果杆子与垂直方向的夹角大于 15 度,或者小车偏离中心超过 2.4 个单位距离,则游戏结束。
代码
您可以点击此链接获取本文中的完整代码,并在自述文件中找到安装说明。
注:此链接
https://github.com/tensorflow/models/blob/master/research/a3c_blogpost/a3c_cartpole.py
自述文件链接
https://github.com/tensorflow/models/tree/master/research/a3c_blogpost
建立基线
要正确判断您模型的实际表现和您用于评估模型的指标,建立基线通常是非常有用的方法。例如,当您看到返回的分数很高时,您的模型似乎运行良好,但实际上高分或许不能反映算法的好坏或随机操作的结果。在分类示例中,我们可以简单分析类别分布及预测最常见的类别,以建立基线表现。但是,如何建立强化学习的基线呢?为此我们将创建一个随机智能体,使其在我们的环境中只执行随机操作。
对于 CartPole 游戏,我们在 4000 次游戏中平均每次获得大约 20 个奖励。要运行随机智能体,请运行所提供的 py 文件:python a3c_cartpole.py — algorithm=random — max-eps=4000。
何为异步优势动作评价算法?
异步优势动作评价真是晦涩难懂啊!我们首先拆分这个算法的名称,然后再分析算法本身背后的机制。
异步:此为异步算法,可以并行训练多个工作器智能体,其中每个智能体都有自己的模型和环境副本。由于能够并行训练更多工作器,我们的算法可以加快训练速度;此外,由于每个工作器的经验都是独立的,智能体可获得更多样化的训练经验。
优势:优势不但是衡量操作优劣的指标,也是判断结果好坏的因素。这使算法可以侧重于缺少网络预测的方面。直观地说,这让我们可以衡量在指定时步执行操作 a 而非采用策略 π 的优势。
动作评价:此算法的动作评价方面使用的架构可以共享策略和值函数之间的层。
但它有何工作原理呢?
大体来看,A3C 算法使用异步更新协议,会根据固定时步长度的经验来运作。此算法会使用这些时间段来计算奖励的估计量和优势函数。每个工作器都会执行下列工作流程周期:
提取全局网络参数
通过采用最少(t_max,进入终止状态的步数)步数的本地策略与环境交互。
计算值和策略损失
从损失中获取梯度
使用梯度更新全局网络
重复步骤
借助此训练配置,我们有望看到智能体的数量直线上升。但是,您的机器能够支持的智能体数量受到可用 CPU 核心数量的限制。此外,A3C 甚至可以扩展到多个机器,并且某些更新的研究(如 IMPALA)还支持更进一步扩展。但增加更多机器可能会对速度和性能带来不利影响。如要了解更多深入信息,请查看此论文!
注:论文链接
https://arxiv.org/abs/1602.01783
复习策略和值函数
如果您已经熟知策略梯度,那么我建议您跳过这个部分。否则,如果您不知道何为策略/值,或者只是想快速复习一下,就请继续阅读吧!
策略的概念是指在指定某些输入状态的情况下,用参数表示操作的概率分布。我们通过创建网络来完成此操作,该网络会考虑游戏的状态,并决定我们应该做什么。同样地,当智能体在玩游戏时,每当它看到某个状态(或类似状态)时,它都会计算在指定输入状态下每种可用操作的概率,然后根据此概率分布对操作进行抽样。为了更正规地探究数学运算,我们将策略梯度作为更一般的记分函数梯度估计量的特例。一般案例以 Ex p(x | ) [f(x)] 的形式表达,换言之,在我们的案例中,奖励(或优势)函数的期望值为 f, 而在某些策略网络下则为 p。然后,使用对数求导技巧,我们得知如何更新网络参数,从而使操作样本获得更高的奖励,并以 ∇ Ex[f(x)] =Ex[f(x) ∇ log p(x)] 结束。简单来说,这个公式解释了根据奖励函数 f,我们梯度方向中的移位 θ 会如何使分数最大化。
值函数会从本质上判断某个状态的好坏。在形式上,当游戏以状态 s 开始并采用策略 p 时,值函数会定义预计的奖励总数。这便是与算法名称中的 “评价” 相关的部分。智能体会使用估算值(评价)来更新策略(动作)。
执行
首先,我们定义要使用的模型种类。主智能体会拥有全局网络,且每个本地工作器智能体在自己的进程中都会拥有此网络的副本。我们会使用模型子类化对模型进行实例化。虽然模型子类化会使进程更冗长,但却为我们提供了最大的灵活性。
1 class ActorCriticModel(keras.Model):
2 def __init__(self, state_size, action_size):
3 super(ActorCriticModel, self).__init__()
4 self.state_size = state_size
5 self.action_size = action_size
6 self.dense1 = layers.Dense(100, activation='relu')
7 self.policy_logits = layers.Dense(action_size)
8 self.dense2 = layers.Dense(100, activation='relu')
9 self.values = layers.Dense(1)
10
11 def call(self, inputs):
12 # Forward pass
13 x = self.dense1(inputs)
14 logits = self.policy_logits(x)
15 v1 = self.dense2(inputs)
16 values = self.values(v1)
17 return logits, values
正如您从我们的正向传递中看到的,我们的模型会采用输入和返回策略概率的分对数和值。
主智能体 — 主线程
让我们看看运算的控制中心。主智能体拥有一个共享优化器,可以更新其全局网络。该智能体会对全局网络进行实例化,每个工作器智能体及我们用于更新智能体的优化器都会更新。研究表明,A3C 对各种学习率都具有很高的适应性,但针对 CartPole 游戏,我们会使用学习率为 5e-4 的 AdamOptimizer。
1 class MasterAgent():
2 def __init__(self):
3 self.game_name = 'CartPole-v0'
4 save_dir = args.save_dir
5 self.save_dir = save_dir
6 if not os.path.exists(save_dir):
7 os.makedirs(save_dir)
8
9 env = gym.make(self.game_name)
10 self.state_size = env.observation_space.shape[0]
11 self.action_size = env.action_space.n
12 self.opt = tf.train.AdamOptimizer(args.lr, use_locking=True)
13 print(self.state_size, self.action_size)
14
15 self.global_model = ActorCriticModel(self.state_size, self.action_size) # global network
16 self.global_model(tf.convert_to_tensor(np.random.random((1, self.state_size)), dtype=tf.float32))
主智能体会运行训练函数来启动每个智能体并对其进行实例化。主智能体负责对每个智能体进行协调和监管。各个智能体会异步运行。(严格来说,这不是真正的异步,因为在 Python 中,由于 GIL(全局解释器锁)的存在,单一 Python 进程无法并行运行多个线程(利用多个核心),但可以同时运行多个线程(在 I/O 密集型运算期间进行上下文切换)。为了简要清晰地举例说明,我们使用线程执行指令)。
1 def train(self):
2 if args.algorithm == 'random':
3 random_agent = RandomAgent(self.game_name, args.max_eps)
4 random_agent.run()
5 return
6
7 res_queue = Queue()
8
9 workers = [Worker(self.state_size,
10 self.action_size,
11 self.global_model,
12 self.opt, res_queue,
13 i, game_name=self.game_name,
14 save_dir=self.save_dir) for i in range(multiprocessing.cpu_count())]
15
16 for i, worker in enumerate(workers):
17 print("Starting worker {}".format(i))
18 worker.start()
19
20 moving_average_rewards = [] # record episode reward to plot
21 while True:
22 reward = res_queue.get()
23 if reward is not None:
24 moving_average_rewards.append(reward)
25 else:
26 break
27 [w.join() for w in workers]
28
29 plt.plot(moving_average_rewards)
30 plt.ylabel('Moving average ep reward')
31 plt.xlabel('Step')
32 plt.savefig(os.path.join(self.save_dir,
33 '{} Moving Average.png'.format(self.game_name)))
34 plt.show()
存储分类 — 保留我们的经验
此外,为了更容易追踪训练,我们还会实行存储分类。该分类会简单地提供相关功能,以追踪我们在每一步出现的操作、奖励和状态。
1 class Memory:
2 def __init__(self):
3 self.states = []
4 self.actions = []
5 self.rewards = []
6
7 def store(self, state, action, reward):
8 self.states.append(state)
9 self.actions.append(action)
10 self.rewards.append(reward)
11
12 def clear(self):
13 self.states = []
14 self.actions = []
15 self.rewards = []
现在,我们来了解算法的关键:工作器智能体。工作器智能体继承自线程类,而且我们会替换来自线程的运行方法。这使我们可以达成 A3C 的第一个 A,即异步。首先,我们对本地模型进行实例化并设置特定的训练参数。
1 class Worker(threading.Thread):
2 # Set up global variables across different threads
3 global_episode = 0
4 # Moving average reward
5 global_moving_average_reward = 0
6 best_score = 0
7 save_lock = threading.Lock()
8
9 def __init__(self,
10 state_size,
11 action_size,
12 global_model,
13 opt,
14 result_queue,
15 idx,
16 game_name='CartPole-v0',
17 save_dir='/tmp'):
18 super(Worker, self).__init__()
19 self.state_size = state_size
20 self.action_size = action_size
21 self.result_queue = result_queue
22 self.global_model = global_model
23 self.opt = opt
24 self.local_model = ActorCriticModel(self.state_size, self.action_size)
25 self.worker_idx = idx
26 self.game_name = game_name
27 self.env = gym.make(self.game_name).unwrapped
28 self.save_dir = save_dir
29 self.ep_loss = 0.0
运行算法
下一步是执行运行函数。这实际上会运行我们的算法。我们会运行指定全局最多游戏次数的所有线程。这就是 A3C 的第三个 A(动作)在发挥作用。我们的智能体会根据策略函数 “行动”,在操作由 “评价” 评判时变为动作,即我们的值函数。这部分的代码看起来可能很密集,但真正发挥的作用不多。在每次游戏中,代码仅发挥以下作用:
获取我们基于当前框架的策略(操作概率分布)
按步骤执行根据策略选择的操作
如果智能体已采取固定数量的步骤 (args.update_freq) 或智能体已达到终止状态(已结束),则:a.使用本地模型计算的梯度更新全局模型
重复步骤
如何计算损失?
工作器智能体会计算损失,以获得有关其全部网络参数的梯度。这是 A3C 的最后一个 A(优势)在发挥作用。然后,这些损失会应用于全局网络。损失的计算方法如下:
值损失: L=∑(R — V(s))²
策略损失: L = -log(𝝅(s)) * A(s)
其中 R 为折扣奖励,V 为值函数(在输入状态下),𝛑 为策略函数(也在输入状态下),而 A 为优势函数。由于我们不能直接使用 A3C 来确定 Q 的值,我们会使用折扣奖励来估算 Q 的值。
就是这样!工作器智能体会重复以下流程:将网络参数重置为全局网络中的所有参数,并反复与其环境互动、计算损失,然后将梯度应用于全局网络。您可以通过运行以下命令来训练您的算法:python a3c_cartpole.py — train。
测试算法
我们通过启动新环境和仅采用已训练模型的策略输出来测试算法。这会呈现我们的环境和我们模型的策略分布中的示例。
对模型进行训练后,您可以使用以下命令运行算法:python a3c_cartpole.py。
要检验我们移动的平均得分:
我们需要查看分数 >200 的范围内的分数。游戏得到 “解决” 的定义是在超过 100 次连续试验中获得 195.0 的平均奖励。
在新环境中的示例表现如下:
关键知识点
我们的学习内容:
我们通过执行 A3C 解决了 CartPole!
我们通过使用 Eager Execution、模型子类化及自定义训练循环达到了这一目的。
Eager 是开发训练循环的简单方法,由于我们能够直接打印和调试张量,该方法使编码更加简单清楚。
我们学习了使用策略和值网络进行强化学习的基础知识,然后我们综合运用这些知识来执行 A3C。
我们使用 tf. 梯度应用优化的更新规则,以反复更新全局网络。
TensorFlow 社区活动:
· 程序员工作环境大 Battle!(火热进行中...)
☟ 点击 “阅读原文”
参加 TensorFlow 社区活动