Horovod分布式计算原理与代码实现
背景介绍
近些年,深度学习已在图像处理、语音识别、机器翻译等领域取得了令人瞩目的成就。2016年,AlphaGo击败人类顶级棋手刷新了人们对机器智能的认知;最近,AlphaFold成功预测了98.5%的人类蛋白质结构,更是展现了深度学习在科学研究领域的巨大潜力。而我们也应当注意到,如今深度学习的蓬勃发展离不开高性能计算机硬件的支持,尤其是GPU(图形处理器)。与传统的CPU相比,GPU拥有更多的计算单元,能够更高效、并行地处理神经网络中的各种张量(Tensor)运算。因此,GPU逐渐成为了深度学习领域的基本硬件配置,各式主流深度学习框架(如TensorFlow,PyTorch)也都开发了相应的GPU版本。
然而随着学习任务变得复杂、精度要求变高,涉及的训练集也越来越大,对应的模型参数也越来越多。例如著名的图像数据集ImageNet拥有超过140万张图片,基于该数据集的ResNet50模型拥有超过两千万的参数,而最近的模型甚至超过了数十亿参数。很显然,要想在如此大的数据集上训练如此复杂的模型,单个GPU的计算能力是远远不够的。因此,必须要使用一些分布式计算策略。大体来说,深度学习的分布式策略可分为两种。第一种为数据并行(Data Parallelism),指的是将数据分为若干份,然后让不同GPU在不同的数据上进行计算;第二种为模型并行(Model Parallelism),也就是将神经网络模型的分成若干个部分,分别放在不同的GPU上。本文主要关注数据并行策略。
数据并行策略中,较为经典的一种框架为参数服务器(Parameter Server,下文简称PS)框架。以图1为例,可以看到这是由1个PS和3个worker构成的一个中心化框架。其中,PS是用来更新参数的,而worker是用来在不同数据集上计算梯度的。具体而言,该框架下的分布式训练大致有如下流程:
PS初始化模型的参数; PS将当前参数值广播给各个worker; 每个worker读取各自的训练数据并计算当前参数下的梯度,然后发送给PS; PS将各个worker返回的梯度做平均,并利用该梯度更新参数; 重复2~4步若干轮。
Ring-Allreduce计算原理
上面提到,PS框架中的PS负责与每个worker进行通讯,当worker数较多时整个系统会变得低效。因此,为了提高通讯效率,一个直接的想法为:能不能将通讯量分担到每个worker上?答案是肯定的,2017年百度[2]提出了一种新的分布式框架,称为Ring-Allreduce,它可以让每个worker都参与到计算平均梯度的过程中。2018年,Uber[1]基于该想法开发了基于TensorFlow的开源工具,也就是我们今天的主角:Horovod。下面我们简单介绍Ring-Allreduce计算原理。
从上述流程中不难总结,对于一个拥有M个worker的系统,每个worker需要与相邻的两个worker进行2(M-1)次通讯:前M-1次是为了计算梯度求和,后M-1次是为了将求和后的梯度广播给其他worker。注意到,每次通讯的数据量仅为p/M(即示意图中的小方格)。因此,每个worker的总通讯量为2(M-1)*(p/M)。假设每个worker的通讯速度(带宽)依然是B,那么完成一轮参数迭代所需的通讯时间为2(M-1)*(p/M)/B = O(p/B)。由于所有worker同时在进行通讯,所以总的通讯时间依然是O(p/B)。从这里我们可以看到,通讯时间与worker数M无关。因此,在worker数非常多时,我们期待Ring-Allreduce框架的表现要优于PS框架。具体性能对比可以参考图4。
代码实现
Horovod是一个融合了Ring-Allreduce框架的深度学习工具。它具有简单易上手的优点,仅需在原来的代码上修改若干行,就可以调用多GPU进行分布式训练。下面我们以一个简单例子来讲解如何调用Horovod来进行神经网络的分布式训练。代码运行环境为Python 3.8,CUDA 11.0,TensorFlow 2.4,Horovod 0.22.1,共使用了4个GPU(型号为P100-16GB)。如果读者觉得配置运行环境太复杂,可以考虑租赁云服务器,例如矩池云、阿里云等云计算平台。我们使用的数据为手写数字数据集MNIST,包含了60000个训练样本以及10000个测试样本。下面是代码详解。
import tensorflow as tf
import horovod.tensorflow.keras as hvd
import tensorflow.keras as keras
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Conv2D,Dense,Flatten,Input,MaxPooling2D
from tensorflow.keras import Model
# 初始化horovod
hvd.init()
# 为每个GPU分配一个单独的进程(用于并行计算)
## 找到所有物理设备(此处为GPU)
gpus = tf.config.experimental.list_physical_devices('GPU')
print(f'{len(gpus)} GPUs are used!')
## 启用内存增长,防止全部内存被分配给单个GPU
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
## 设置当前程序可见的设备范围,保证了每个进程使用不同的GPU
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
首先,导入各种所需软件包,包含了horovod以及搭建神经网络所需的各种组件。为了调用Horovod,我们得先通过hvd.init()
对Horovod进行初始化,然后为每个GPU分配一个单独的进程(Process),用于并行计算。
# 导入数据集,并进行预处理
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32')/255.0
x_test = x_test.astype('float32')/255.0
# 创建模型(LeNet5)
input_layer = Input([28,28,1])
x = input_layer
x = Conv2D(6,[5,5],padding = "same", activation = 'relu')(x)
x = MaxPooling2D(pool_size = [2,2], strides = [2,2])(x)
x = Conv2D(16,[5,5],padding = "valid", activation = 'relu')(x)
x = MaxPooling2D(pool_size = [2,2], strides = [2,2])(x)
x = Flatten()(x)
x = Dense(120,activation = 'relu')(x)
x = Dense(84,activation = 'relu')(x)
x = Dense(10,activation = 'softmax')(x)
output_layer=x
model=Model(input_layer,output_layer)
#model.summary()
接着,我们导入MNIST数据集并进行预处理。然后,我们创建了一个经典的卷积神经网络LeNet5,它共有61706个可训练参数。注意,此处只是为了简单演示如何调用Horovod,所以完全可以换成其他数据集和模型。
# 选择优化器,并设定学习率(此处学习率随GPU个数增加而增加,可自行设定)
opt = tf.optimizers.SGD(0.01 * hvd.size())
# 【将优化器封装成分布式优化器】
opt = hvd.DistributedOptimizer(opt)
# 对模型进行编译
## 设定参数`experimental_run_tf_function=False`让TensorFlow使用hvd.DistributedOptimizer()来计算梯度
model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
optimizer=opt,
metrics=['accuracy'],
experimental_run_tf_function=False)
接下来,我们需要选择一个优化器,并对模型进行编译。需要注意的是,选择好优化器后,还需调用函数hvd.DistributedOptimizer()
对它进行修饰。hvd.size()
是调用的GPU个数,所以在当前环境下,它的值为4(因为有4个GPU)。此外,编译模型时,需要指定参数experimental_run_tf_function=False
让TensorFlow采用分布式的策略来计算梯度。
# 【开始训练时,需要让所有GPU上模型的参数具有相同的初始值。】
# 例如在下面的回调(callbacks)中,将rank=0的GPU的参数值广播给其他GPU
callbacks = [
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# 在rank=0的GPU上设置检查点(checkpoint)定时保存训练参数,防止其他GPU训练时出错
if hvd.rank() == 0:
# 每5个epoch保存一次当前的参数值
callbacks.append(keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5', save_freq='epoch', period = 5))
# 模型训练
model.fit(x_train, y_train, validation_data=(x_test, y_test),
# 【注意】:此处设定的是每个GPU的batch size,
batch_size = 1000,
callbacks=callbacks,
epochs=20, # 训练20个epoch(总共经历的minibatch数为20*(60000/1000)*4
verbose=1 if hvd.rank() == 0 else 0) # 只在rank=0的GPU上展示训练过程
最后,我们开始训练模型。这地方我们需要知道Horovod为每个GPU分配了一个编号,称为rank。例如我们有4个GPU,那么这些GPU的rank就分别为0,1,2,3,我们可以通过hvd.rank()
来确认当前进程调用的是哪个GPU。此外,由于Ring-Allreduce框架中没有PS,所有worker都是独立更新参数的。所以,我们需要保证每个worker上的参数初始值相同。这可以在回调(callback)中加入hvd.callbacks.BroadcastGlobalVariablesCallback(0)
来实现。它的作用是,在训练开始前,先将rank为0的GPU上的参数值发送给所有其他GPU。此外,为了防止其他GPU训练时出错,我们定期让rank为0的GPU将当前参数值进行保存。这样的话,即使训练出错,我们也可以从最近的检查点开始训练,而无需重新开始。
值得注意的是,batch_size = 1000
表示单个GPU的每次读取的小批次样本量(minibatch size)为1000。由于训练数据是被随机打乱的(shuffled),所以4个GPU每次读取的小批次样本重合度较小,因此每次计算梯度的有效样本量约为1000*4=4000。注意到我们总共有60000个训练样本,所以每个GPU进行一个epoch训练需要经历60000/1000=60个小批次(minibatch)样本。由于有4个GPU,所以实际上一个epoch总共经历了60*4=240个小批次样本。
我们将上面的所有代码写入文件mnist_horovod.py中,并打开终端(Terminal)输入以下命令:
cd ‘mnist_horovod.py所在目录’
horovodrun -np 4 python mnist_horovod.py
第二行命令中的‘4’为调用的GPU个数,由于我们只有4个GPU,所以调用的个数不能超过4。等终端显示训练已经开始后,我们可以在另一个终端窗口中输入nvidia-smi -l 2
命令,查看各个GPU的运行情况。如果程序正常运转,可以看到每个GPU都有一定的负载,见图5。需要注意的是,不能直接在jupyter notebook或python解释器中运行以上代码,否则无法成功调用所有GPU。
总结
本文介绍了Horovod进行分布式计算的基本原理,并通过一个简单的例子具体展示了如何调用Horovod对神经网络进行分布式训练。关于Horovod的更多细节可以参考文献[1]以及官方文档(地址为https://horovod.readthedocs.io/en/stable/index.html)。希望大家都可以自己动手调用多个GPU,训练更多有趣的深度学习模型!