查看原文
其他

Horovod分布式计算原理与代码实现

高原 狗熊会 2023-08-15

背景介绍

近些年,深度学习已在图像处理、语音识别、机器翻译等领域取得了令人瞩目的成就。2016年,AlphaGo击败人类顶级棋手刷新了人们对机器智能的认知;最近,AlphaFold成功预测了98.5%的人类蛋白质结构,更是展现了深度学习在科学研究领域的巨大潜力。而我们也应当注意到,如今深度学习的蓬勃发展离不开高性能计算机硬件的支持,尤其是GPU(图形处理器)。与传统的CPU相比,GPU拥有更多的计算单元,能够更高效、并行地处理神经网络中的各种张量(Tensor)运算。因此,GPU逐渐成为了深度学习领域的基本硬件配置,各式主流深度学习框架(如TensorFlow,PyTorch)也都开发了相应的GPU版本。

然而随着学习任务变得复杂、精度要求变高,涉及的训练集也越来越大,对应的模型参数也越来越多。例如著名的图像数据集ImageNet拥有超过140万张图片,基于该数据集的ResNet50模型拥有超过两千万的参数,而最近的模型甚至超过了数十亿参数。很显然,要想在如此大的数据集上训练如此复杂的模型,单个GPU的计算能力是远远不够的。因此,必须要使用一些分布式计算策略。大体来说,深度学习的分布式策略可分为两种。第一种为数据并行(Data Parallelism),指的是将数据分为若干份,然后让不同GPU在不同的数据上进行计算;第二种为模型并行(Model Parallelism),也就是将神经网络模型的分成若干个部分,分别放在不同的GPU上。本文主要关注数据并行策略。

图 1 参数服务器框架示意图

数据并行策略中,较为经典的一种框架为参数服务器(Parameter Server,下文简称PS)框架。以图1为例,可以看到这是由1个PS和3个worker构成的一个中心化框架。其中,PS是用来更新参数的,而worker是用来在不同数据集上计算梯度的。具体而言,该框架下的分布式训练大致有如下流程:

  1. PS初始化模型的参数;
  2. PS将当前参数值广播给各个worker;
  3. 每个worker读取各自的训练数据并计算当前参数下的梯度,然后发送给PS;
  4. PS将各个worker返回的梯度做平均,并利用该梯度更新参数;
  5. 重复2~4步若干轮。
从这个过程中可以看到,每个worker都需要和PS进行信息交互,所以PS所承担的通讯成本非常高,尤其是worker数非常多时。比方说,假设待训练的模型共有p个参数,通讯速度(带宽)为B,共有M个GPU作为worker。那么,每进行一次参数迭代,PS接收梯度、发送参数的总通讯量为2Mp,因此所需的通讯时间为O(Mp/B)。可以看到,此时的通讯时间是随worker数线性增长的。当worker非常多时,通讯时间甚至会超过计算所需时间,此时整个系统的效率非常之低。例如图2所示,当GPU个数达到128时,实际计算效率甚至达不到理论效率的一半。因此,我们急需一种更加通讯有效的框架,避免大量的宝贵时间浪费在通讯上。

图 2 PS框架下,每秒处理的图片数与GPU个数的关系图

Ring-Allreduce计算原理

上面提到,PS框架中的PS负责与每个worker进行通讯,当worker数较多时整个系统会变得低效。因此,为了提高通讯效率,一个直接的想法为:能不能将通讯量分担到每个worker上?答案是肯定的,2017年百度[2]提出了一种新的分布式框架,称为Ring-Allreduce,它可以让每个worker都参与到计算平均梯度的过程中。2018年,Uber[1]基于该想法开发了基于TensorFlow的开源工具,也就是我们今天的主角:Horovod。下面我们简单介绍Ring-Allreduce计算原理。

图3为使用Ring-Allreduce计算梯度的示意图,可以看到这是一个由3个worker组成的非中心化框架(不再需要PS)。具体来说,假设模型共有6个参数,我们把参数分成3个部分(因为有3个worker),那么每个部分有2个参数。在第①步中,每个worker先分别根据训练数据计算梯度,对应于黄色方框中的6个数值。在②~③步中,每个worker将三块梯度中一个发送给邻近的worker,同时将接收到的梯度与本地相应位置的梯度相加。经过这两步,每个worker上各有一个块梯度综合了3个worker的梯度(见③中绿色小方格)。在④~⑤步中,每个worker将完整的那块梯度广播给其他worker。从⑤中可以看到,此时每个worker上的梯度都是所有worker上梯度的求和。之后,每个worker对梯度求均值,再据此更新参数即可。

图 3 Ring-Allreduce的计算流程示意图

从上述流程中不难总结,对于一个拥有M个worker的系统,每个worker需要与相邻的两个worker进行2(M-1)次通讯:前M-1次是为了计算梯度求和,后M-1次是为了将求和后的梯度广播给其他worker。注意到,每次通讯的数据量仅为p/M(即示意图中的小方格)。因此,每个worker的总通讯量为2(M-1)*(p/M)。假设每个worker的通讯速度(带宽)依然是B,那么完成一轮参数迭代所需的通讯时间为2(M-1)*(p/M)/B = O(p/B)。由于所有worker同时在进行通讯,所以总的通讯时间依然是O(p/B)。从这里我们可以看到,通讯时间与worker数M无关。因此,在worker数非常多时,我们期待Ring-Allreduce框架的表现要优于PS框架。具体性能对比可以参考图4。

图 4 PS框架与Ring-Allreduce框架性能对比图

代码实现

Horovod是一个融合了Ring-Allreduce框架的深度学习工具。它具有简单易上手的优点,仅需在原来的代码上修改若干行,就可以调用多GPU进行分布式训练。下面我们以一个简单例子来讲解如何调用Horovod来进行神经网络的分布式训练。代码运行环境为Python 3.8,CUDA 11.0,TensorFlow 2.4,Horovod 0.22.1,共使用了4个GPU(型号为P100-16GB)。如果读者觉得配置运行环境太复杂,可以考虑租赁云服务器,例如矩池云、阿里云等云计算平台。我们使用的数据为手写数字数据集MNIST,包含了60000个训练样本以及10000个测试样本。下面是代码详解。

import tensorflow as tf
import horovod.tensorflow.keras as hvd
import tensorflow.keras as keras
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Conv2D,Dense,Flatten,Input,MaxPooling2D 
from tensorflow.keras import Model
# 初始化horovod
hvd.init()
# 为每个GPU分配一个单独的进程(用于并行计算)
## 找到所有物理设备(此处为GPU)
gpus = tf.config.experimental.list_physical_devices('GPU'
print(f'{len(gpus)} GPUs are used!')
## 启用内存增长,防止全部内存被分配给单个GPU
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
## 设置当前程序可见的设备范围,保证了每个进程使用不同的GPU
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

首先,导入各种所需软件包,包含了horovod以及搭建神经网络所需的各种组件。为了调用Horovod,我们得先通过hvd.init()对Horovod进行初始化,然后为每个GPU分配一个单独的进程(Process),用于并行计算。

# 导入数据集,并进行预处理
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32')/255.0
x_test = x_test.astype('float32')/255.0
# 创建模型(LeNet5)
input_layer = Input([28,28,1])
x = input_layer
x = Conv2D(6,[5,5],padding = "same", activation = 'relu')(x) 
x = MaxPooling2D(pool_size = [2,2], strides = [2,2])(x)    
x = Conv2D(16,[5,5],padding = "valid", activation = 'relu')(x) 
x = MaxPooling2D(pool_size = [2,2], strides = [2,2])(x)
x = Flatten()(x)   
x = Dense(120,activation = 'relu')(x)
x = Dense(84,activation = 'relu')(x)
x = Dense(10,activation = 'softmax')(x)
output_layer=x
model=Model(input_layer,output_layer)
#model.summary()

接着,我们导入MNIST数据集并进行预处理。然后,我们创建了一个经典的卷积神经网络LeNet5,它共有61706个可训练参数。注意,此处只是为了简单演示如何调用Horovod,所以完全可以换成其他数据集和模型。

# 选择优化器,并设定学习率(此处学习率随GPU个数增加而增加,可自行设定)
opt = tf.optimizers.SGD(0.01 * hvd.size())
# 【将优化器封装成分布式优化器】
opt = hvd.DistributedOptimizer(opt)
# 对模型进行编译
## 设定参数`experimental_run_tf_function=False`让TensorFlow使用hvd.DistributedOptimizer()来计算梯度
model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
                    optimizer=opt,
                    metrics=['accuracy'],
                    experimental_run_tf_function=False)

接下来,我们需要选择一个优化器,并对模型进行编译。需要注意的是,选择好优化器后,还需调用函数hvd.DistributedOptimizer()对它进行修饰。hvd.size()是调用的GPU个数,所以在当前环境下,它的值为4(因为有4个GPU)。此外,编译模型时,需要指定参数experimental_run_tf_function=False让TensorFlow采用分布式的策略来计算梯度。

# 【开始训练时,需要让所有GPU上模型的参数具有相同的初始值。】
# 例如在下面的回调(callbacks)中,将rank=0的GPU的参数值广播给其他GPU
callbacks = [
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# 在rank=0的GPU上设置检查点(checkpoint)定时保存训练参数,防止其他GPU训练时出错
if hvd.rank() == 0:
    # 每5个epoch保存一次当前的参数值
    callbacks.append(keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5', save_freq='epoch', period = 5))  
# 模型训练
model.fit(x_train, y_train, validation_data=(x_test, y_test),
          # 【注意】:此处设定的是每个GPU的batch size,
          batch_size = 1000,
          callbacks=callbacks,
          epochs=20, # 训练20个epoch(总共经历的minibatch数为20*(60000/1000)*4
          verbose=1 if hvd.rank() == 0 else 0) # 只在rank=0的GPU上展示训练过程

最后,我们开始训练模型。这地方我们需要知道Horovod为每个GPU分配了一个编号,称为rank。例如我们有4个GPU,那么这些GPU的rank就分别为0,1,2,3,我们可以通过hvd.rank()来确认当前进程调用的是哪个GPU。此外,由于Ring-Allreduce框架中没有PS,所有worker都是独立更新参数的。所以,我们需要保证每个worker上的参数初始值相同。这可以在回调(callback)中加入hvd.callbacks.BroadcastGlobalVariablesCallback(0)来实现。它的作用是,在训练开始前,先将rank为0的GPU上的参数值发送给所有其他GPU。此外,为了防止其他GPU训练时出错,我们定期让rank为0的GPU将当前参数值进行保存。这样的话,即使训练出错,我们也可以从最近的检查点开始训练,而无需重新开始。

值得注意的是,batch_size = 1000表示单个GPU的每次读取的小批次样本量(minibatch size)为1000。由于训练数据是被随机打乱的(shuffled),所以4个GPU每次读取的小批次样本重合度较小,因此每次计算梯度的有效样本量约为1000*4=4000。注意到我们总共有60000个训练样本,所以每个GPU进行一个epoch训练需要经历60000/1000=60个小批次(minibatch)样本。由于有4个GPU,所以实际上一个epoch总共经历了60*4=240个小批次样本。

我们将上面的所有代码写入文件mnist_horovod.py中,并打开终端(Terminal)输入以下命令:

cd ‘mnist_horovod.py所在目录’
horovodrun -np 4 python mnist_horovod.py

第二行命令中的‘4’为调用的GPU个数,由于我们只有4个GPU,所以调用的个数不能超过4。等终端显示训练已经开始后,我们可以在另一个终端窗口中输入nvidia-smi -l 2命令,查看各个GPU的运行情况。如果程序正常运转,可以看到每个GPU都有一定的负载,见图5。需要注意的是,不能直接在jupyter notebook或python解释器中运行以上代码,否则无法成功调用所有GPU。

图 5 各GPU运行负载情况

总结

本文介绍了Horovod进行分布式计算的基本原理,并通过一个简单的例子具体展示了如何调用Horovod对神经网络进行分布式训练。关于Horovod的更多细节可以参考文献[1]以及官方文档(地址为https://horovod.readthedocs.io/en/stable/index.html)。希望大家都可以自己动手调用多个GPU,训练更多有趣的深度学习模型!

参考文献
[1] Sergeev A, Del Balso M. Horovod: fast and easy distributed deep learning in TensorFlow[J]. arXiv preprint arXiv:1802.05799, 2018.
[2] Gibiansky A. Bringing HPC techniques to deep learning[J]. Baidu Research, Tech. Rep., 2017.
- END -


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存