查看原文
其他

深度强化学习:通过异步优势动作评价 (A3C) 算法玩 CartPole

Google TensorFlow 2018-11-07

文 / 软件工程实习生 Raymond Yuan


在本教程中,我们将学习如何使用深度强化学习来训练模型,使其能够在简单的 CartPole 游戏中获胜。我们会使用 tf.keras 和 OpenAI Gym 并通过被称为异步优势动作评价 (A3C) 的技术来训练智能体。强化学习一直以来备受瞩目,但它到底是什么呢?强化学习是机器学习的一个领域,其中的智能体需要在环境中执行特定操作,从而使奖励最大化或获得某些奖励。


在此过程中,我们会积累关于下列概念的实际经验,并培养对这些概念的理解:

  • Eager Execution — Eager Execution 是一个由运行定义的命令式接口。一旦用户从 Python 中调用此接口,便可立即执行运算。这让 TensorFlow 的入门变得更加简单,并让研发变得更加直观。

  • 模型子类化 — 模型子类化让用户可以通过将 tf.keras 模型子类化并定义自己的正向传递,来构建可完全自定义的模型。在启用 Eager Execution 时,模型子类化特别有用,因为用户可以命令式地写入正向传递。

  • 自定义训练循环

注:tf.keras 模型链接

https://www.tensorflow.org/api_docs/python/tf/keras/Model


我们会按照以下基本工作流程来进行介绍:

  • 构建主智能体监控器

  • 构建工作器智能体

  • 执行 A3C 算法

  • 训练智能体

  • 直观呈现训练表现


受众:本教程面向所有对强化学习感兴趣的人。虽然我们不会深入探讨机器学习的基础知识,但会大体涵盖策略和值网络等话题。此外,我建议大家阅读一篇论文,即 Volodymyr Mnih 所著的《深度强化学习的异步方法》(Asynchronous Methods for Deep Reinforcement Learning)。这篇论文更详细地介绍了相关算法,非常值得一读。



CartPole 是什么?

Cartpole 是一个游戏,其中有一根杆子通过非驱动关节与小车相连,而小车会沿着无摩擦轨道移动。系统会随机对起始状态(小车位置、小车移动速度、杆子角度,以及杆子顶端的摆动速度)进行初始化(介于 +/-0.05 之间)。我们通过对小车施加 +1 或 -1 的力度来控制系统(向左或向右移动)。杆子一开始是竖直的,而我们的目标是阻止杆子倒下。当杆子保持竖直时,系统在每个时步都会提供 +1 的奖励。如果杆子与垂直方向的夹角大于 15 度,或者小车偏离中心超过 2.4 个单位距离,则游戏结束。


代码

您可以点击此链接获取本文中的完整代码,并在自述文件中找到安装说明。

注:此链接

https://github.com/tensorflow/models/blob/master/research/a3c_blogpost/a3c_cartpole.py

自述文件链接

https://github.com/tensorflow/models/tree/master/research/a3c_blogpost



建立基线

要正确判断您模型的实际表现和您用于评估模型的指标,建立基线通常是非常有用的方法。例如,当您看到返回的分数很高时,您的模型似乎运行良好,但实际上高分或许不能反映算法的好坏或随机操作的结果。在分类示例中,我们可以简单分析类别分布及预测最常见的类别,以建立基线表现。但是,如何建立强化学习的基线呢?为此我们将创建一个随机智能体,使其在我们的环境中只执行随机操作。


对于 CartPole 游戏,我们在 4000 次游戏中平均每次获得大约 20 个奖励。要运行随机智能体,请运行所提供的 py 文件:python a3c_cartpole.py — algorithm=random — max-eps=4000。



何为异步优势动作评价算法?

异步优势动作评价真是晦涩难懂啊!我们首先拆分这个算法的名称,然后再分析算法本身背后的机制。


  • 异步:此为异步算法,可以并行训练多个工作器智能体,其中每个智能体都有自己的模型和环境副本。由于能够并行训练更多工作器,我们的算法可以加快训练速度;此外,由于每个工作器的经验都是独立的,智能体可获得更多样化的训练经验。

  • 优势:优势不但是衡量操作优劣的指标,也是判断结果好坏的因素。这使算法可以侧重于缺少网络预测的方面。直观地说,这让我们可以衡量在指定时步执行操作 a 而非采用策略 π 的优势。

  • 动作评价:此算法的动作评价方面使用的架构可以共享策略和值函数之间的层。



但它有何工作原理呢?

大体来看,A3C 算法使用异步更新协议,会根据固定时步长度的经验来运作。此算法会使用这些时间段来计算奖励的估计量和优势函数。每个工作器都会执行下列工作流程周期:


  • 提取全局网络参数

  • 通过采用最少(t_max,进入终止状态的步数)步数的本地策略与环境交互。

  • 计算值和策略损失

  • 从损失中获取梯度

  • 使用梯度更新全局网络

  • 重复步骤


借助此训练配置,我们有望看到智能体的数量直线上升。但是,您的机器能够支持的智能体数量受到可用 CPU 核心数量的限制。此外,A3C 甚至可以扩展到多个机器,并且某些更新的研究(如 IMPALA)还支持更进一步扩展。但增加更多机器可能会对速度和性能带来不利影响。如要了解更多深入信息,请查看此论文

注:论文链接

https://arxiv.org/abs/1602.01783



复习策略和值函数

如果您已经熟知策略梯度,那么我建议您跳过这个部分。否则,如果您不知道何为策略/值,或者只是想快速复习一下,就请继续阅读吧!


策略的概念是指在指定某些输入状态的情况下,用参数表示操作的概率分布。我们通过创建网络来完成此操作,该网络会考虑游戏的状态,并决定我们应该做什么。同样地,当智能体在玩游戏时,每当它看到某个状态(或类似状态)时,它都会计算在指定输入状态下每种可用操作的概率,然后根据此概率分布对操作进行抽样。为了更正规地探究数学运算,我们将策略梯度作为更一般的记分函数梯度估计量的特例。一般案例以 Ex p(x | ) [f(x)] 的形式表达,换言之,在我们的案例中,奖励(或优势)函数的期望值为 f, 而在某些策略网络下则为 p。然后,使用对数求导技巧,我们得知如何更新网络参数,从而使操作样本获得更高的奖励,并以 ∇ Ex[f(x)] =Ex[f(x) ∇ log p(x)] 结束。简单来说,这个公式解释了根据奖励函数 f,我们梯度方向中的移位 θ 会如何使分数最大化。


值函数会从本质上判断某个状态的好坏。在形式上,当游戏以状态 s 开始并采用策略 p 时,值函数会定义预计的奖励总数。这便是与算法名称中的 “评价” 相关的部分。智能体会使用估算值(评价)来更新策略(动作)。



执行

首先,我们定义要使用的模型种类。主智能体会拥有全局网络,且每个本地工作器智能体在自己的进程中都会拥有此网络的副本。我们会使用模型子类化对模型进行实例化。虽然模型子类化会使进程更冗长,但却为我们提供了最大的灵活性。


1    class ActorCriticModel(keras.Model):    

2        def __init__(self, state_size, action_size):    

3            super(ActorCriticModel, self).__init__()    

4            self.state_size = state_size    

5            self.action_size = action_size    

6            self.dense1 = layers.Dense(100, activation='relu')    

7            self.policy_logits = layers.Dense(action_size)

           self.dense2 = layers.Dense(100, activation='relu')    

           self.values = layers.Dense(1) 

10

11        def call(self, inputs):    

12            # Forward pass    

13            x = self.dense1(inputs)    

14            logits = self.policy_logits(x)    

15            v1 = self.dense2(inputs)    

16            values = self.values(v1)    

17            return logits, values 


正如您从我们的正向传递中看到的,我们的模型会采用输入和返回策略概率的分对数和值。



主智能体 — 主线程

让我们看看运算的控制中心。主智能体拥有一个共享优化器,可以更新其全局网络。该智能体会对全局网络进行实例化,每个工作器智能体及我们用于更新智能体的优化器都会更新。研究表明,A3C 对各种学习率都具有很高的适应性,但针对 CartPole 游戏,我们会使用学习率为 5e-4 的 AdamOptimizer。


   class MasterAgent():  

2        def __init__(self):    

3            self.game_name = 'CartPole-v0'    

           save_dir = args.save_dir    

           self.save_dir = save_dir   

6            if not os.path.exists(save_dir):    

               os.makedirs(save_dir)    

8

           env = gym.make(self.game_name)   

10            self.state_size = env.observation_space.shape[0]    

11            self.action_size = env.action_space.n    

12            self.opt = tf.train.AdamOptimizer(args.lr, use_locking=True)   

13           print(self.state_size, self.action_size)   

14

15            self.global_model = ActorCriticModel(self.state_size, self.action_size)  # global network   

16            self.global_model(tf.convert_to_tensor(np.random.random((1, self.state_size)), dtype=tf.float32))


主智能体会运行训练函数来启动每个智能体并对其进行实例化。主智能体负责对每个智能体进行协调和监管。各个智能体会异步运行。(严格来说,这不是真正的异步,因为在 Python 中,由于 GIL(全局解释器锁)的存在,单一 Python 进程无法并行运行多个线程(利用多个核心),但可以同时运行多个线程(在 I/O 密集型运算期间进行上下文切换)。为了简要清晰地举例说明,我们使用线程执行指令)。


1    def train(self):    

2            if args.algorithm == 'random':    

3                    random_agent = RandomAgent(self.game_name, args.max_eps)    

                   random_agent.run()    

5                    return    

6

7                res_queue = Queue()

8

9                workers = [Worker(self.state_size,    

10                                        self.action_size,    

11                                        self.global_model,    

12                                        self.opt, res_queue,    

13                                        i, game_name=self.game_name,    

14                                        save_dir=self.save_dir) for i in range(multiprocessing.cpu_count())]    

15

16            for i, worker in enumerate(workers):    

17                print("Starting worker {}".format(i))    

18                worker.start() 

19

20            moving_average_rewards = []  # record episode reward to plot    

21            while True:    

22                reward = res_queue.get()    

23                if reward is not None:    

24                 moving_average_rewards.append(reward)

25                else:    

26                    break    

27            [w.join() for w in workers]

28

29            plt.plot(moving_average_rewards)    

30            plt.ylabel('Moving average ep reward')    

31            plt.xlabel('Step')    

32            plt.savefig(os.path.join(self.save_dir,    

33                                                        '{} Moving Average.png'.format(self.game_name)))    

34            plt.show()    



存储分类 — 保留我们的经验

此外,为了更容易追踪训练,我们还会实行存储分类。该分类会简单地提供相关功能,以追踪我们在每一步出现的操作、奖励和状态。


1    class Memory:    

2        def __init__(self):    

           self.states = []    

4            self.actions = []    

5            self.rewards = []   

6 

7        def store(self, state, action, reward):    

8            self.states.append(state)    

9            self.actions.append(action)    

10          self.rewards.append(reward)    

11

12    def clear(self):    

13        self.states = []    

14        self.actions = []    

15        self.rewards = [] 


现在,我们来了解算法的关键:工作器智能体。工作器智能体继承自线程类,而且我们会替换来自线程的运行方法。这使我们可以达成 A3C 的第一个 A,即异步。首先,我们对本地模型进行实例化并设置特定的训练参数。


1    class Worker(threading.Thread):    

2        # Set up global variables across different threads

       global_episode = 0    

4        # Moving average reward    

5        global_moving_average_reward = 0    

6        best_score = 0    

       save_lock = threading.Lock()

 

9        def __init__(self,    

10                            state_size,    

11                            action_size,    

12                            global_model,    

13                            opt,    

14                            result_queue,    

15                            idx,    

16                            game_name='CartPole-v0',    

17                            save_dir='/tmp'):    

18            super(Worker, self).__init__()    

19            self.state_size = state_size    

20            self.action_size = action_size    

21            self.result_queue = result_queue    

22            self.global_model = global_model    

23            self.opt = opt    

24            self.local_model = ActorCriticModel(self.state_size, self.action_size)    

25            self.worker_idx = idx    

26            self.game_name = game_name    

27            self.env = gym.make(self.game_name).unwrapped    

28            self.save_dir = save_dir    

29            self.ep_loss = 0.0 



运行算法

下一步是执行运行函数。这实际上会运行我们的算法。我们会运行指定全局最多游戏次数的所有线程。这就是 A3C 的第三个 A(动作)在发挥作用。我们的智能体会根据策略函数 “行动”,在操作由 “评价” 评判时变为动作,即我们的值函数。这部分的代码看起来可能很密集,但真正发挥的作用不多。在每次游戏中,代码仅发挥以下作用:


  • 获取我们基于当前框架的策略(操作概率分布)

  • 按步骤执行根据策略选择的操作

  • 如果智能体已采取固定数量的步骤 (args.update_freq) 或智能体已达到终止状态(已结束),则:a.使用本地模型计算的梯度更新全局模型

  • 重复步骤



如何计算损失?

工作器智能体会计算损失,以获得有关其全部网络参数的梯度。这是 A3C 的最后一个 A(优势)在发挥作用。然后,这些损失会应用于全局网络。损失的计算方法如下:


  • 值损失: L=∑(R — V(s))²

  • 策略损失: L = -log(𝝅(s)) * A(s)


其中 R 为折扣奖励,V 为值函数(在输入状态下),𝛑 为策略函数(也在输入状态下),而 A 为优势函数。由于我们不能直接使用 A3C 来确定 Q 的值,我们会使用折扣奖励来估算 Q 的值。


就是这样!工作器智能体会重复以下流程:将网络参数重置为全局网络中的所有参数,并反复与其环境互动、计算损失,然后将梯度应用于全局网络。您可以通过运行以下命令来训练您的算法:python a3c_cartpole.py — train。



测试算法

我们通过启动新环境和仅采用已训练模型的策略输出来测试算法。这会呈现我们的环境和我们模型的策略分布中的示例。


对模型进行训练后,您可以使用以下命令运行算法:python a3c_cartpole.py。


要检验我们移动的平均得分:


我们需要查看分数 >200 的范围内的分数。游戏得到 “解决” 的定义是在超过 100 次连续试验中获得 195.0 的平均奖励。


在新环境中的示例表现如下:



关键知识点

我们的学习内容:

  • 我们通过执行 A3C 解决了 CartPole!

  • 我们通过使用 Eager Execution、模型子类化及自定义训练循环达到了这一目的。

  • Eager 是开发训练循环的简单方法,由于我们能够直接打印和调试张量,该方法使编码更加简单清楚。

  • 我们学习了使用策略和值网络进行强化学习的基础知识,然后我们综合运用这些知识来执行 A3C。

  • 我们使用 tf. 梯度应用优化的更新规则,以反复更新全局网络。


TensorFlow 社区活动:

· 程序员工作环境大 Battle!(火热进行中...)


☟ 点击 “阅读原文” 

参加 TensorFlow 社区活动

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存