查看原文
其他

【他山之石】PyTorch | DDP系列:入门教程、实现原理与源代码解析、实战与技巧

“他山之石,可以攻玉”,站在巨人的肩膀才能看得更高,走得更远。在科研的道路上,更需借助东风才能更快前行。为此,我们特别搜集整理了一些实用的代码链接,数据集,软件,编程技巧等,开辟“他山之石”专栏,助你乘风破浪,一路奋勇向前,敬请关注。

作者:知乎—996黄金一代

地址:https://www.zhihu.com/people/magic-frog-sjtu

DistributedDataParallel(DDP)是一个支持多机多卡、分布式训练的深度学习工程方法。PyTorch现已原生支持DDP,可以直接通过torch.distributed使用,超方便,不再需要难以安装的apex库啦!

Life is short, I love PyTorch

想要让你的PyTorch神经网络在多卡环境上跑得又快又好?那你definitely需要这一篇!

No one knows DDP better than I do!
– – MagicFrog(手动狗头)

01

入门教程
本节主要在下述四个方面展开描述:
  • DDP的原理?
    • 在分类上,DDP属于Data Parallel。简单来讲,就是通过提高batch size来增加并行度。
  • 为什么快?
    • DDP通过Ring-Reduce的数据交换方法提高了通讯效率,并通过启动多个进程的方式减轻Python GIL的限制,从而提高训练速度。
  • DDP有多快?
    • 一般来说,DDP都是显著地比DP快,能达到略低于卡数的加速比(例如,四卡下加速3倍)。所以,其是目前最流行的多机多卡训练方法。
  • 怎么用DDP?
    • 有点长,但是给你一个简单、完整的示例!
请欢快地开始阅读吧!

1.1 Quick Start

不想看原理?给你一个最简单的DDP Pytorch例子!

依赖

PyTorch(gpu)>=1.5,python>=3.6

环境准备

推荐使用官方打好的PyTorch docker,避免乱七八糟的环境问题影响心情。
# Dockerfile# Start FROM Nvidia PyTorch image https://ngc.nvidia.com/catalog/containers/nvidia:pytorchFROM nvcr.io/nvidia/pytorch:20.03-py3

代码

单GPU代码## main.py文件import torch
# 构造模型model = nn.Linear(10, 10).to(local_rank)
# 前向传播outputs = model(torch.randn(20, 10).to(rank))labels = torch.randn(20, 10).to(rank)loss_fn = nn.MSELoss()loss_fn(outputs, labels).backward()# 后向传播optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)optimizer.step()
## Bash运行python main.py加入DDP的代码## main.py文件import torch# 新增:import torch.distributed as dist
# 新增:从外面得到local_rank参数import argparseparser = argparse.ArgumentParser()parser.add_argument("--local_rank", default=-1)FLAGS = parser.parse_args()local_rank = FLAGS.local_rank
# 新增:DDP backend初始化torch.cuda.set_device(local_rank)dist.init_process_group(backend='nccl') # nccl是GPU设备上最快、最推荐的后端
# 构造模型device = torch.device("cuda", local_rank)model = nn.Linear(10, 10).to(device)# 新增:构造DDP modelmodel = DDP(model, device_ids=[local_rank], output_device=local_rank)
# 前向传播outputs = model(torch.randn(20, 10).to(rank))labels = torch.randn(20, 10).to(rank)loss_fn = nn.MSELoss()loss_fn(outputs, labels).backward()# 后向传播optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)optimizer.step()
## Bash运行# 改变:使用torch.distributed.launch启动DDP模式,# 其会给main.py一个local_rank的参数。这就是之前需要"新增:从外面得到local_rank参数"的原因python -m torch.distributed.launch --nproc_per_node 4 main.py

1.2 DDP的基本原理

大白话原理

假如我们有N张显卡,
1.(缓解GIL限制)在DDP模式下,会有N个进程被启动,每个进程在一张卡上加载一个模型,这些模型的参数在数值上是相同的。
2.(Ring-Reduce加速)在模型训练时,各个进程通过一种叫Ring-Reduce的方法与其他进程通讯,交换各自的梯度,从而获得所有进程的梯度;
3.(实际上就是Data Parallelism)各个进程用平均后的梯度更新自己的参数,因为各个进程的初始参数、更新梯度是一致的,所以更新后的参数也是完全相同的。
是不是很简单呢?

与DP模式的不同

那么,DDP对比Data Parallel(DP)模式有什么不同呢?
DP模式是很早就出现的、单机多卡的、参数服务器架构的多卡训练模式,在PyTorch,即是:
model = torch.nn.DataParallel(model)
在DP模式中,总共只有一个进程(受到GIL很强限制)。master节点相当于参数服务器,其会向其他卡广播其参数;在梯度反向传播后,各卡将梯度集中到master节点,master节点对搜集来的参数进行平均后更新参数,再将参数统一发送到其他卡上。这种参数更新方式,会导致master节点的计算任务、通讯量很重,从而导致网络阻塞,降低训练速度。
但是DP也有优点,优点就是代码实现简单。要速度还是要方便,看官可以自行选用噢。

1.3 DDP为什么能加速?

本节对上面出现的几个概念进行一下介绍,看完了你就知道为什么DDP这么快啦!

Python GIL

GIL是个很捉急的东西,如果大家有被烦过的话,相信会相当清楚。如果不了解的同学,可以自行百度一下噢。
这里简要介绍下其最大的特征(缺点):Python GIL的存在使得,一个python进程只能利用一个CPU核心,不适合用于计算密集型的任务。
使用多进程,才能有效率利用多核的计算资源。
而DDP启动多进程训练,一定程度地突破了这个限制。

Ring-Reduce梯度合并

Ring-Reduce是一种分布式程序的通讯方法。
  • 因为提高通讯效率,Ring-Reduce比DP的parameter server快。
    • 其避免了master阶段的通讯阻塞现象,n个进程的耗时是o(n)。
  • 详细的介绍:ring allreduce和tree allreduce的具体区别是什么?(https://www.zhihu.com/question/57799212/answer/612786337)
简单说明
图片来自:https://www.zhihu.com/question/57799212/answer/612786337
  • 各进程独立计算梯度。
  • 每个进程将梯度依次传递给下一个进程,之后再把从上一个进程拿到的梯度传递给下一个进程。循环n次(进程数量)之后,所有进程就可以得到全部的梯度了。
    • 可以看到,每个进程只跟自己上下游两个进程进行通讯,极大地缓解了参数服务器的通讯阻塞现象!

并行计算

图片来自:https://medium.com/@esaliya/model-parallelism-in-deep-learning-is-not-what-you-think-94d2f81e82ed
统一来讲,神经网络中的并行有以下三种形式:
  1. Data Parallelism
    1. 这是最常见的形式,通俗来讲,就是增大batch size。
      1. 平时我们看到的多卡并行就属于这种。比如DP、DDP都是。这能让我们方便地利用多卡计算资源。
    2. 能加速。
  2. Model Parallelism
    1. 把模型放在不同GPU上,计算是并行的。
    2. 有可能是加速的,看通讯效率。
  3. Workload Partitioning
    1. 把模型放在不同GPU上,但计算是串行的。
    2. 不能加速。


1.4 如何在PyTorch中使用DDP

看到这里,你应该对DDP是怎么运作的,为什么能加速有了一定的了解,下面就让我们学习一下怎么使用DDP吧!

1.5 如何在PyTorch中使用DDP:DDP模式

DDP有不同的使用模式。DDP的官方最佳实践是,每一张卡对应一个单独的GPU模型(也就是一个进程),在下面介绍中,都会默认遵循这个pattern。
举个例子:我有两台机子,每台8张显卡,那就是2x8=16个进程,并行数是16。
但是,我们也是可以给每个进程分配多张卡的。总的来说,分为以下三种情况:
  1. 每个进程一张卡。这是DDP的最佳使用方法。
  2. 每个进程多张卡,复制模式。一个模型复制在不同卡上面,每个进程都实质等同于DP模式。这样做是能跑得通的,但是,速度不如上一种方法,一般不采用。
  3. 每个进程多张卡,并行模式。一个模型的不同部分分布在不同的卡上面。例如,网络的前半部分在0号卡上,后半部分在1号卡上。这种场景,一般是因为我们的模型非常大,大到一张卡都塞不下batch size = 1的一个模型。
在本文中,先不会讲每个进程多张卡要怎么操作,免得文章过于冗长。在这里,只是让你知道有这个东西,用的时候再查阅文档。


1.6 如何在PyTorch中使用DDP:概念

下面介绍一些PyTorch分布式编程的基础概念。

基本概念

在16张显卡,16的并行数下,DDP会同时启动16个进程。下面介绍一些分布式的概念。
group
即进程组。默认情况下,只有一个组。这个可以先不管,一直用默认的就行。
world size
表示全局的并行数,简单来讲,就是2x8=16。
# 获取world size,在不同进程里都是一样的,得到16torch.distributed.get_world_size()
rank
表现当前进程的序号,用于进程间通讯。对于16的world sizel来说,就是0,1,2,…,15。
注意:rank=0的进程就是master进程。
# 获取rank,每个进程都有自己的序号,各不相同torch.distributed.get_rank()
local_rank
又一个序号。这是每台机子上的进程的序号。机器一上有0,1,2,3,4,5,6,7,机器二上也有0,1,2,3,4,5,6,7
# 获取local_rank。一般情况下,你需要用这个local_rank来手动设置当前模型是跑在当前机器的哪块GPU上面的。torch.distributed.local_rank()


1.7 如何在PyTorch中使用DDP:详细流程

精髓

DDP的使用非常简单,因为它不需要修改你网络的配置。其精髓只有一句话
model = DDP(model, device_ids=[local_rank], output_device=local_rank)
原本的model就是你的PyTorch模型,新得到的model,就是你的DDP模型。
最重要的是,后续的模型关于前向传播、后向传播的用法,和原来完全一致!DDP把分布式训练的细节都隐藏起来了,不需要暴露给用户,非常*优雅*!
(对于有时间的人,如果你想知道DDP的实现方式,请看DDP第二节进阶部分)

准备工作

但是,在套`model = DDP(model)`之前,我们还是需要做一番准备功夫,把环境准备好的。
这里需要注意的是,我们的程序虽然会在16个进程上跑起来,但是它们跑的是同一份代码,所以在写程序的时候要处理好不同进程的关系。
## main.py文件import torchimport argparse
# 新增1:依赖import torch.distributed as distfrom torch.nn.parallel import DistributedDataParallel as DDP
# 新增2:从外面得到local_rank参数,在调用DDP的时候,其会自动给出这个参数,后面还会介绍。所以不用考虑太多,照着抄就是了。# argparse是python的一个系统库,用来处理命令行调用,如果不熟悉,可以稍微百度一下,很简单!parser = argparse.ArgumentParser()parser.add_argument("--local_rank", default=-1)FLAGS = parser.parse_args()local_rank = FLAGS.local_rank
# 新增3:DDP backend初始化# a.根据local_rank来设定当前使用哪块GPUtorch.cuda.set_device(local_rank)# b.初始化DDP,使用默认backend(nccl)就行。如果是CPU模型运行,需要选择其他后端。dist.init_process_group(backend='nccl')
# 新增4:定义并把模型放置到单独的GPU上,需要在调用`model=DDP(model)`前做哦。# 如果要加载模型,也必须在这里做哦。device = torch.device("cuda", local_rank)model = nn.Linear(10, 10).to(device)# 可能的load模型...
# 新增5:之后才是初始化DDP模型model = DDP(model, device_ids=[local_rank], output_device=local_rank)

前向与后向传播

有一个很重要的概念,就是数据的并行化。
我们知道,DDP同时起了很多个进程,但是他们用的是同一份数据,那么就会有数据上的冗余性。也就是说,你平时一个epoch如果是一万份数据,现在就要变成1*16=16万份数据了。
那么,我们需要使用一个特殊的sampler,来使得各个进程上的数据各不相同,进而让一个epoch还是1万份数据。
幸福的是,DDP也帮我们做好了!
my_trainset = torchvision.datasets.CIFAR10(root='./data', train=True)# 新增1:使用DistributedSampler,DDP帮我们把细节都封装起来了。用,就完事儿!# sampler的原理,后面也会介绍。train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)# 需要注意的是,这里的batch_size指的是每个进程下的batch_size。也就是说,总batch_size是这里的batch_size再乘以并行数(world_size)。trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=batch_size, sampler=train_sampler)
for epoch in range(num_epochs): # 新增2:设置sampler的epoch,DistributedSampler需要这个来维持各个进程之间的相同随机数种子 trainloader.sampler.set_epoch(epoch) # 后面这部分,则与原来完全一致了。 for data, label in trainloader: prediction = model(data) loss = loss_fn(prediction, label) loss.backward() optimizer = optim.SGD(ddp_model.parameters(), lr=0.001) optimizer.step()

其他需要注意的地方

  • 保存参数
# 1. save模型的时候,和DP模式一样,有一个需要注意的点:保存的是model.module而不是model。# 因为model其实是DDP model,参数是被`model=DDP(model)`包起来的。# 2. 我只需要在进程0上保存一次就行了,避免多次保存重复的东西。if dist.get_rank() == 0: torch.save(model.module, "saved_model.ckpt")
  • 理论上,DDP性能和单卡Gradient Accumulation性能是完全一致的。
    • 并行度为8的DDP 等于 Gradient Accumulation Step为8的单卡
    • 速度上,DDP当然比Graident Accumulation的单卡快;
      • 但是还有加速空间。请见DDP系列第三节:实战。
    • 如果要对齐性能,需要确保喂进去的数据,在DDP下和在单卡Gradient Accumulation下是一致的。
      • 这个说起来简单,但对于复杂模型,可能是相当困难的。

调用方式

像我们在QuickStart里面看到的,DDP模型下,python源代码的调用方式和原来的不一样了。现在,需要用torch.distributed.launch来启动训练。
  • 作用
    • 在这里,我们给出分布式训练的重要参数:
      • 有多少台机器?
        • --nnodes
      • 当前是哪台机器?
        • --node_rank
      • 每台机器有多少个进程?
        • --nproc_per_node
      • 高级参数(可以先不看,多机模式才会用到)
        • 通讯的address
        • 通讯的port
  • 实现方式
    • 我们需要在每一台机子(总共m台)上都运行一次torch.distributed.launch
    • 每个torch.distributed.launch会启动n个进程,并给每个进程一个--local_rank=i的参数
      • 这就是之前需要"新增:从外面得到local_rank参数"的原因
    • 这样我们就得到n*m个进程,world_size=n*m
单机模式
## Bash运行# 假设我们只在一台机器上运行,可用卡数是8python -m torch.distributed.launch --nproc_per_node 8 main.py
多机模式
复习一下,master进程就是rank=0的进程。
在使用多机模式前,需要介绍两个参数:
  • 通讯的address
    • --master_address
    • 也就是master进程的网络地址
    • 默认是:127.0.0.1,只能用于单机。
  • 通讯的port
    • --master_port
    • 也就是master进程的一个端口,要先确认这个端口没有被其他程序占用了哦。一般情况下用默认的就行
    • 默认是:29500
## Bash运行# 假设我们在2台机器上运行,每台可用卡数是8# 机器1:python -m torch.distributed.launch --nnodes=2 --node_rank=0 --nproc_per_node 8 \ --master_adderss $my_address --master_port $my_port main.py# 机器2:python -m torch.distributed.launch --nnodes=2 --node_rank=1 --nproc_per_node 8 \ --master_adderss $my_address --master_port $my_port main.py
小技巧
# 假设我们只用4,5,6,7号卡CUDA_VISIBLE_DEVICES="4,5,6,7" python -m torch.distributed.launch --nproc_per_node 4 main.py# 假如我们还有另外一个实验要跑,也就是同时跑两个不同实验。# 这时,为避免master_port冲突,我们需要指定一个新的。这里我随便敲了一个。CUDA_VISIBLE_DEVICES="4,5,6,7" python -m torch.distributed.launch --nproc_per_node 4 \ --master_port 53453 main.py

mp.spawn调用方式

PyTorch引入了torch.multiprocessing.spawn,可以使得单卡、DDP下的外部调用一致,即不用使用torch.distributed.launch。python main.py一句话搞定DDP模式。
给一个mp.spawn的文档:代码文档(https://pytorch.org/docs/stable/_modules/torch/multiprocessing/spawn.html#spawn)
下面给一个简单的demo:
def demo_fn(rank, world_size): dist.init_process_group("nccl", rank=rank, world_size=world_size) # lots of code. ...
def run_demo(demo_fn, world_size): mp.spawn(demo_fn, args=(world_size,), nprocs=world_size, join=True)
mp.spawn与launch各有利弊,请按照自己的情况选用。
按照笔者个人经验,如果算法程序是提供给别人用的,那么mp.spawn更方便,因为不用解释launch的用法;但是如果是自己使用,launch更有利,因为你的内部程序会更简单,支持单卡、多卡DDP模式也更简单。


1.8 总结

在本节中,我们介绍了DDP的加速原理,和基本用法。如果你能充分理解文章内容,那么,你可以说对DDP初步入门了,可以开始改造你的算法程序,来吃掉多卡训练速度提升这波红利了!
最后让我们来总结一下所有的代码,这份是一份能直接跑的代码,推荐收藏!
################## main.py文件import argparsefrom tqdm import tqdmimport torchimport torchvisionimport torch.nn as nnimport torch.nn.functional as F# 新增:import torch.distributed as distfrom torch.nn.parallel import DistributedDataParallel as DDP
### 1. 基础模块 ### # 假设我们的模型是这个,与DDP无关class ToyModel(nn.Module): def __init__(self): super(ToyModel, self).__init__() self.conv1 = nn.Conv2d(3, 6, 5) self.pool = nn.MaxPool2d(2, 2) self.conv2 = nn.Conv2d(6, 16, 5) self.fc1 = nn.Linear(16 * 5 * 5, 120) self.fc2 = nn.Linear(120, 84) self.fc3 = nn.Linear(84, 10) def forward(self, x): x = self.pool(F.relu(self.conv1(x))) x = self.pool(F.relu(self.conv2(x))) x = x.view(-1, 16 * 5 * 5) x = F.relu(self.fc1(x)) x = F.relu(self.fc2(x)) x = self.fc3(x) return x# 假设我们的数据是这个def get_dataset(): transform = torchvision.transforms.Compose([ torchvision.transforms.ToTensor(), torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) ]) my_trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform) # DDP:使用DistributedSampler,DDP帮我们把细节都封装起来了。 # 用,就完事儿!sampler的原理,第二节中有介绍。 train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset) # DDP:需要注意的是,这里的batch_size指的是每个进程下的batch_size。 # 也就是说,总batch_size是这里的batch_size再乘以并行数(world_size)。 trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=16, num_workers=2, sampler=train_sampler) return trainloader
### 2. 初始化我们的模型、数据、各种配置 ##### DDP:从外部得到local_rank参数parser = argparse.ArgumentParser()parser.add_argument("--local_rank", default=-1, type=int)FLAGS = parser.parse_args()local_rank = FLAGS.local_rank
# DDP:DDP backend初始化torch.cuda.set_device(local_rank)dist.init_process_group(backend='nccl') # nccl是GPU设备上最快、最推荐的后端
# 准备数据,要在DDP初始化之后进行trainloader = get_dataset()
# 构造模型model = ToyModel().to(local_rank)# DDP: Load模型要在构造DDP模型之前,且只需要在master上加载就行了。ckpt_path = Noneif dist.get_rank() == 0 and ckpt_path is not None: model.load_state_dict(torch.load(ckpt_path))# DDP: 构造DDP modelmodel = DDP(model, device_ids=[local_rank], output_device=local_rank)
# DDP: 要在构造DDP model之后,才能用model初始化optimizer。optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
# 假设我们的loss是这个loss_func = nn.CrossEntropyLoss().to(local_rank)
### 3. 网络训练 ###model.train()iterator = tqdm(range(100))for epoch in iterator: # DDP:设置sampler的epoch,DistributedSampler需要这个来维持各个进程之间的相同随机数种子 trainloader.sampler.set_epoch(epoch) # 后面这部分,则与原来完全一致了。 for data, label in trainloader: data, label = data.to(local_rank), label.to(local_rank) optimizer.zero_grad() prediction = model(data) loss = loss_func(prediction, label) loss.backward() iterator.desc = "loss = %0.3f" % loss optimizer.step() # DDP: # 1. save模型的时候,和DP模式一样,有一个需要注意的点:保存的是model.module而不是model。 # 因为model其实是DDP model,参数是被`model=DDP(model)`包起来的。 # 2. 只需要在进程0上保存一次就行了,避免多次保存重复的东西。 if dist.get_rank() == 0: torch.save(model.module.state_dict(), "%d.ckpt" % epoch)

################## Bash运行# DDP: 使用torch.distributed.launch启动DDP模式# 使用CUDA_VISIBLE_DEVICES,来决定使用哪些GPU# CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node 2 main.py

1.9 Citation

1. 很全面的知乎上的文章:会飞的闲鱼:Pytorch 分布式训练

2. pytorch 官方入门:https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

3. pytorch 官方设计笔记:https://pytorch.org/docs/master/notes/ddp.html

4. 关于并行的介绍:https://medium.com/@esaliya/model-parallelism-in-deep-learning-is-not-what-you-think-94d2f81e82ed


02

实现原理与源代码解析

https://medium.com/@esaliya/model-parallelism-in-deep-learning-is-not-what-you-think-94d2f81e82ed

本节主要聚焦于DDP原理和源代码解析。

虽然是进阶篇,但是本节力求做到简单易懂,涉及的新概念都会有讲解、引用。看完这节后,你的收获将是:

  1. 了解分布式计算的概念

  2. 了解PyTorch模型的状态表示和构成

  3. 学习DDP的精巧的实现技巧

  4. 学会如何debug你的DDP模型

请欢快地开始阅读吧!

依赖

pytorch(gpu)>=1.5,python>=3.6


2.1 背景概念

在正式介绍之前,我们先认识一些基本概念,打好基础。地基是很重要的,请各位同学认真学习哦!

分布式编程

一个分布式系统,相对于单机系统,其最大的特征就是,其数据、处理是分布在不同地方的。与此相伴的是,各节点间有交换数据的需求,为此需要定义交换数据的规范、接口。在此基础上,才能构建起分布式计算的大框架。比如很有名的google大数据三驾马车之一的`map-reduce`概念,简要地描述,就是将数据分开成N份map到N个地方,并行进行处理;处理完成后,再将结果reduce到一起。

为了满足分布式编程的需求,PyTorch提供了一些分布式基本接口,在torch.distributed中。有兴趣的可以自己翻阅:文档 and 代码

下图阐述了PyTorch实现的分布式接口:

记住我们使用的是最常用的NCCL后端,是GPU上优化做得最好的后端。

在DDP这里,我们重点介绍一下最重要的实现,all_reduce。

  1. 所谓的reduce,就是不同节点各有一份数据,把这些数据汇总到一起。在这里,我们规定各个节点上的这份数据有着相同的shape和data type,并规定汇总的方法是相加。简而言之,就是把各个节点上的一份相同规范的数据相加到一起。

  2. 所谓的all_reduce,就是在reduce的基础上,把最终的结果发回到各个节点上。

DDP利用all_reduce,来进行不同进程上的梯度的平均操作。PyTorch提供了几个all_reduce的版本,下面这个就是Ring-Reduce版本(我们在前面阐述了为什么Ring-Reduce是一个更好的版本):

def all_reduce(tensor, op=ReduceOp.SUM, group=group.WORLD, async_op=False): """ Reduces the tensor data across all machines in such a way that all get the final result. After the call ``tensor`` is going to be bitwise identical in all processes. Arguments: tensor (Tensor): Input and output of the collective. The function operates in-place. op (optional): One of the values from ``torch.distributed.ReduceOp`` enum. Specifies an operation used for element-wise reductions. group (ProcessGroup, optional): The process group to work on async_op (bool, optional): Whether this op should be an async op Returns: Async work handle, if async_op is set to True. None, if not async_op or if not part of the group """

PyTorch 数据结构基础

DDP到底和什么数据结构打交道呢?我们要首先解决这些问题:

  1. 我们知道,DDP下各进程不同步参数而是同步参数的变化量,所以各进程的模型的状态同一性是非常重要的。那么模型的状态由什么构成呢?

  2. DDP是怎么做到,无论是什么模型进来,一个简单的model = DDP(model)就可以解决问题呢?它的逻辑是怎么嵌入到模型中的?

buffer

解决第一个问题,需要了解buffer的概念。
在PyTorch中,所有的模型都会继承module类。可以说,一个CNN模型,其就是由一系列module组合而成的。要了解模型,就必须从module下手。下面是module的初始化代码,可以看到,它定义了一系列变量。可以说,这些变量就组成了一个module的基本要素。

代码

# torch.nn.modules.py. line 71. Class module: def __init__(self): """ Initializes internal Module state, shared by both nn.Module and ScriptModule. """ torch._C._log_api_usage_once("python.nn_module")
self.training = True self._parameters = OrderedDict() self._buffers = OrderedDict() self._backward_hooks = OrderedDict() self._forward_hooks = OrderedDict() self._forward_pre_hooks = OrderedDict() self._state_dict_hooks = OrderedDict() self._load_state_dict_pre_hooks = OrderedDict() self._modules = OrderedDict()

总的来说,module的基本要素可以分为2组,一组是状态,一组是各种各样的hooks。状态有以下4个东西:

  • self.training

    • 指的是网络是否在训练状态中。这是个非常宏观的状态,大家都知道这个是啥,可以略过。

  • self._modules

    • modules是下属的模块,相当于迭代地定义了self.trainig, self._modules, self._parameters等一系列变量

  • self._parameters

    • 指的就是网络的参数

  • self._buffers

    • 不是参数,但也对网络很重要,会被持久化保存的数据。

    • 举个例子,BatchNorm中的moving mean and variance就是buffer,其优化不是通过梯度反向传播而是通过其他途径。

从本质上讲,当一个模型的网络结构被定义后,其状态就是由parameter和buffer的迭代组合表示的。当我们保存模型,调用model.staic_dict()的时候,我们同时会得到模型的parameter和buffer;也就是说,在DDP中,如果我们要在不同进程中维持相同的状态,我们不光要传递parameter的梯度,也要传递buffer。事实上,DDP就是这么做的。当每次网络传播开始前,其都会把master节点上的buffer广播给其他节点,维持状态的统一。

hook

回答第二个问题,需要了解hook的概念。
hook的中文是`钩子`,是一种技术概念。用形象的话讲,hook提供了这么一种机制:程序提供hook接口,用户可以写一个hook函数,然后钩在hook接口,即程序的主体上从而可以插入到中间执行。DDP使用hook技术把自己的逻辑插入到module的训练过程中去。

在前一节文章中,曾经讲过

在模型训练时,各个进程通过一种叫Ring-Reduce的方法与其他进程通讯,从而获得所有进程的梯度;

那么,Ring-Reduce机制是怎么插入到module中去的呢?这归功于PyTorch提供了很多个hook接口!
其中,就有一个是,parameter在反向梯度计算结束后提供了一个hook接口。DDP把Ring-Reduce的代码写成一个hook函数,插入到这里。每次parameter的反向梯度计算结束后,程序就会调用这个hook函数,从而开启Ring-Reduce流程。因为所有模型都用到parameter,所以DDP模型用hook函数就解决了所有模型的梯度平均问题了!

下面,我们来看看其具体的代码实现

torch.nn.parameter

torch.nn.parameter只是torch.Tensor上的一层概念封装,没什么时候特别的。hook机制也是定义在torch.Tensor中的。

torch.tensor.Tensor

有一点需要说明,DDP的关键代码(即梯度平均)是用C++实现的。但是,在C++、python代码中Tensor都给出了hook接口,实现相似的功能。所以我们可以看下Tensor的python hook接口的文档,来理解下hook这个概念。

# line 200. Class Tensor. def register_hook(self, hook): r"""Registers a backward hook. The hook will be called every time a gradient with respect to the Tensor is computed. The hook should have the following signature:: hook(grad) -> Tensor or None The hook should not modify its argument, but it can optionally return a new gradient which will be used in place of :attr:`grad`. This function returns a handle with a method ``handle.remove()`` that removes the hook from the module. Example:: >>> v = torch.tensor([0., 0., 0.], requires_grad=True) >>> h = v.register_hook(lambda grad: grad * 2) # double the gradient >>> v.backward(torch.tensor([1., 2., 3.])) >>> v.grad 2 4 6 [torch.FloatTensor of size (3,)] >>> h.remove() # removes the hook """


2.2 DDP内部实现

Finally,经过一系列铺垫,终于要来讲DDP是怎么实现的了。在读到这里的时候,你应该对DDP的大致原理、PyTorch是怎么训练的有一定的了解。现在就来了解一下最底层的细节吧!
下面,我们会给出具体源代码的URL,复习一下不同的DDP模式,给出一份DDP训练流程的伪代码,最后总结一下易错的注意事项。

代码位置

DDP的代码主要在以下几个地方:

https://github.com/pytorch/pytorch/blob/v1.5.0/torch/nn/parallel/distributed.py

https://github.com/pytorch/pytorch/blob/v1.5.0/torch/distributed/distributed_c10d.py

https://github.com/pytorch/pytorch/blob/v1.5.0/torch/csrc/distributed/c10d/reducer.h

同时推荐一个官方设计笔记(https://pytorch.org/docs/stable/notes/ddp.html),讲得很详细,有兴趣可以看看。

DDP模式

之前我们介绍过DDP模式。在这里,我们复习一下。因为,在接下来的DDP流程介绍中,我们是要处理不同的模式的。

1. 每个进程一张卡。这是DDP的最佳使用方法。
2. 每个进程多张卡,复制模式。一个模型复制在不同卡上面,每个进程都实质等同于DP模式。这样做是能跑得通的,但是,速度不如上一种方法,一般不采用。
3. 每个进程多张卡,并行模式。一个模型的不同部分分布在不同的卡上面。例如,网络的前半部分在0号卡上,后半部分在1号卡上。这种场景,一般是因为我们的模型非常大,大到一张卡都塞不下batch size = 1的一个模型。

正篇!正篇!DDP流程的伪代码

我们总结了一个DDP模型在训练过程中的伪代码,来清晰地描述DDP的细节。
DDP很简单,但是流程并不简单。额外的代码主要是在,处理不同的DDP模式以及加速。刨去这些,主体其实是很简单的,所以不要害怕,大胆看完!

准备阶段

  1. 环境准备(就是init_process_group这一步)。各个进程会在这一步,与master节点进行握手,建立连接。

    1. 注释:如果连接上的进程数量不足约定的 word_size,进程会一直等待。也就是说,如果你约定了world_size=64,但是只开了6台8卡机器,那么程序会一直暂停在这个地方。

  2. DDP初始化(也就是model = DDP(model)这一步)

    1. 把parameter,buffer从master节点传到其他节点,使所有进程上的状态一致。

      1. 注释:DDP通过这一步保证所有进程的初始状态一致。所以,请确保在这一步之后,你的代码不会再修改模型的任何东西了,包括添加、修改、删除parameter和buffer!

    2. (可能)如果有每个节点有多卡,则在每张卡上创建模型(类似DP)

    3. 把parameter进行分组,每一组称为一个bucket。临近的parameter在同一个bucket。

      1. 注释:这是为了加速,在梯度通讯时,先计算、得到梯度的bucket会马上进行通讯,不必等到所有梯度计算结束才进行通讯。后面会详细介绍。

    4. 创建管理器reducer,给每个parameter注册梯度平均的hook。

      1. 注释:这一步的具体实现是在C++代码里面的,即reducer.h文件。

    5. (可能)为可能的SyncBN层做准备

正式训练阶段

在每个step中,DDP模型都会做下面的事情:

  1. 采样数据,从dataloader得到一个batch的数据,用于当前计算(for data, label in dataloader)。

    1. 注释:因为我们的dataloader使用了DistributedSampler,所以各个进程之间的数据是不会重复的。如果要确保DDP性能和单卡性能一致,这边需要保证在数据上,DDP模式下的一个epoch和单卡下的一个epoch是等效的。

  2. 进行网络的前向计算(prediction = model(data))

    1. 同步各进程状态

      1. (可能)对单进程多卡复制模式,要在进程内同步多卡之间的parameter和buffer

      2. 同步各进程之间的buffer。

    2. 接下来才是进行真正的前向计算

    3. (可能)当DDP参数find_unused_parameter为true时,其会在forward结束时,启动一个回溯,标记出所有没被用到的parameter,提前把这些设定为ready。

      1. 注释:find_unused_parameter的默认值是false,因为其会拖慢速度。

  3. 计算梯度(loss.backward())

    1. reducer外面:各个进程各自开始反向地计算梯度。

      1. 注释:梯度是反向计算的,所以最后面的参数反而是最先得到梯度的。

    2. reducer外面:当某个parameter的梯度计算好了的时候,其之前注册的grad hook就会被触发,在reducer里把这个parameter的状态标记为ready。

    3. reducer里面:当某个bucket的所有parameter都是ready状态时,reducer会开始对这个bucket的所有parameter都开始一个异步的all-reduce梯度平均操作。

      1. 注释:

        1. bucket的执行过程也是有顺序的,其顺序与parameter是相反的,即最先注册的parameter的bucket在最后面。

        2. 所以,我们在创建module的时候,请务必把先进行计算的parameter注册在前面,后计算的在后面。不然,reducer会卡在某一个bucket等待,使训练时间延长!

          1. 所谓的参数注册,其实就是创建网络层。也就是要求按照网络计算顺序,依次创建网络层。

    4. reducer里面:当所有bucket的梯度平均都结束后,reducer才会把得到的平均grad结果正式写入到parameter.grad里面。

      1. 注释:这一步,感觉没有必要等全部结束之后才进行。可能得对照一下源码。

  4. 优化器optimizer应用gradient,更新参数(optimizer.step())。

    1. 注释:这一步,是和DDP没关系的。

虽然DDP的实现代码与optimizer没有关系,但是关于optimizer有个额外的东西需要说明。更新后的参数最终能在各进程间保持一致,是由以下因素保证的:

  1. 参数初始值相同

  2. 参数更新值相同

    1. 更新值相同又是由以下因素保证的:

      1. optimizer初始状态相同

      2. 每次opimizer.step()时的梯度相同。

我们可以看到,因为optimizer和DDP是没有关系的,所以optimizer初始状态的同一性是不被DDP保证的!

大多数官方optimizer,其实现能保证从同样状态的model初始化时,其初始状态是相同的。所以这边我们只要保证在DDP模型创建后才初始化optimizer,就不用做额外的操作。但是,如果自定义optimizer,则需要你自己来保证其统一性!

回顾一下文章最开始的代码,你会发现,optimizer确实是在DDP之后定义的。这个时候的模式已经是被初始化为相同的参数,所以能够保证优化器的初始状态是相同的。

# 新增:构造DDP modelmodel = DDP(model, device_ids=[local_rank], output_device=local_rank)
# 优化器:要在构造DDP model之后,才能初始化model。optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.8)

为什么速度没怎么提升/性能下降

很多同学可能有这么一个问题,我加入了DDP,为什么速度没怎么提升/性能下降了呢?我给大家准备了一个check list。

  • 是否遵循了“单进程单卡”这样的最佳工程实践?

    • “单进程多卡复制模式”在速度上不是最优的,而且不被PyTorch社区优先支持,避免使用。

  • 是否使用了默认的NCCL后端?

    • 用就完事。

  • 各进程的模型是否相同?

    • 用户必须保证,不同进程里的模型都是相同结构的;保证parameter(你可以理解为网络层)的创建顺序是一致的。

  • 模型的parameter创建顺序是否与真实计算顺序一致?

    • 这涉及到bucket的通讯效率优化

  • 产生DDP模型后,是否手动动了它的参数?

    • 不允许在产生DDP后,新增、减少、随机修改、替换参数,会造成梯度reduce出错、各进程间的参数不相同、丢失hook机制。

  • DDP模式下的一个epoch的数据和单卡下的一个epoch的数据是否是等效的?

    • 实际上,n卡的DDP模式,理论上可以等价于n次gradient accumulation的单卡模式。所以,确保你的数据,也是这样的。

    • 如果出现性能下降,切记数据是最有可能出现问题的地方!

  • 是否保证初始状态的同一性?

    • parameter、buffer初始状态同一性

    • optimizer初始状态同一性


2.3 DistributedSampler机制

最后,我们额外介绍一下DDP的DistributedSampler机制。

不知道你有没有好奇,为什么给dataloader加一个DistributedSampler,就可以无缝对接DDP模式呢?其实原理很简单,就是给不同进程分配数据集的不重叠、不交叉部分。那么问题来了,每次epoch我们都会随机shuffle数据集,那么,不同进程之间要怎么保持shuffle后数据集的一致性呢?DistributedSampler的实现方式是,不同进程会使用一个相同的随机数种子,这样shuffle出来的东西就能确保一致。

具体实现上,DistributedSampler使用当前epoch作为随机数种子,从而使得不同epoch下有不同的shuffle结果。所以,记得每次epoch开始前都要调用一下sampler的set_epoch方法,这样才能让数据集随机shuffle起来。

下面看一下DistributedSampler的核心源代码:

代码

# line 56 def __iter__(self): # deterministically shuffle based on epoch g = torch.Generator() g.manual_seed(self.epoch) if self.shuffle: indices = torch.randperm(len(self.dataset), generator=g).tolist() else: indices = list(range(len(self.dataset)))

# add extra samples to make it evenly divisible indices += indices[:(self.total_size - len(indices))] assert len(indices) == self.total_size
# subsample indices = indices[self.rank:self.total_size:self.num_replicas] assert len(indices) == self.num_samples
return iter(indices)# line 79 def set_epoch(self, epoch): self.epoch = epoch


2.4 总结

在本节中,我们详细介绍了DDP的原理和底层代码实现。如果你能完全理解,相信你对深度学习中的并行加速、分布式计算会有更深入的认识。知己知彼,方能百战不殆,对DDP有透彻的了解,才能让你的模型以最快的速度跑起来,加快实验迭代速度,极大地提高产出!


03

实战与技巧

在上两节中,我们已经对DDP的理论、代码进行了充分、详细的介绍,相信大家都已经了然在胸。但是,实践也是很重要的。正所谓理论联系实践,如果只掌握理论而不进行实践,无疑是纸上谈兵。

在这节文章里,我们通过几个实战例子,来给大家介绍一下DDP在实际生产中的应用。希望能对大家有所帮助!

  1. 在DDP中引入SyncBN

  2. DDP下的Gradient Accumulation的进一步加速

  3. 多机多卡环境下的inference加速

  4. 保证DDP性能:确保数据的一致性

  5. 和DDP有关的小技巧

    1. 控制不同进程的执行顺序

    2. 避免DDP带来的冗余输出

请欢快地开始阅读吧!

依赖:pytorch(gpu)>=1.5,python>=3.6


3.1 在DDP中引入SyncBN

什么是Batch Normalization(BN)? 这里就不多加以介绍。附上BN文章(https://arxiv.org/abs/1502.03167)。接下来,让我们来深入了解下BN在多级多卡环境上的完整实现:SyncBN。

什么是SyncBN?

SyncBN就是Batch Normalization(BN)。其跟一般所说的普通BN的不同在于工程实现方式:SyncBN能够完美支持多卡训练,而普通BN在多卡模式下实际上就是单卡模式。

我们知道,BN中有moving mean和moving variance这两个buffer,这两个buffer的更新依赖于当前训练轮次的batch数据的计算结果。但是在普通多卡DP模式下,各个模型只能拿到自己的那部分计算结果,所以在DP模式下的普通BN被设计为只利用主卡上的计算结果来计算moving mean和moving variance,之后再广播给其他卡。这样,实际上BN的batch size就只是主卡上的batch size那么大。当模型很大、batch size很小时,这样的BN无疑会限制模型的性能。

为了解决这个问题,PyTorch新引入了一个叫SyncBN的结构,利用DDP的分布式计算接口来实现真正的多卡BN。

SyncBN的原理

SyncBN的原理很简单:SyncBN利用分布式通讯接口在各卡间进行通讯,从而能利用所有数据进行BN计算。为了尽可能地减少跨卡传输量,SyncBN做了一个关键的优化,即只传输各自进程的各自的 小batch mean和 小batch variance,而不是所有数据。具体流程请见下面:

  1. 前向传播

    1. 在各进程上计算各自的 小batch mean和小batch variance

    2. 各自的进程对各自的 小batch mean和小batch variance进行all_gather操作,每个进程都得到s的全局量。

      1. 注释:只传递mean和variance,而不是整体数据,可以大大减少通讯量,提高速度。

    3. 每个进程分别计算总体mean和总体variance,得到一样的结果

      1. 注释:在数学上是可行的,有兴趣的同学可以自己推导一下。

    4. 接下来,延续正常的BN计算。

      1. 注释:因为从前向传播的计算数据中得到的batch mean和batch variance在各卡间保持一致,所以,running_mean和running_variance就能保持一致,不需要显式地同步了!

  2. 后向传播:和正常的一样

贴一下关键代码,有兴趣的同学可以研究下:pytorch源码(https://github.com/pytorch/pytorch/blob/release/1.5/torch/nn/modules/_functions.py#L5)

SyncBN与DDP的关系

一句话总结,当前PyTorch SyncBN只在DDP单进程单卡模式中支持。SyncBN用到 all_gather这个分布式计算接口,而使用这个接口需要先初始化DDP环境。

复习一下DDP的伪代码中的准备阶段中的DDP初始化阶段

d. 创建管理器reducer,给每个parameter注册梯度平均的hook。
i. 注释:这一步的具体实现是在C++代码里面的,即reducer.h文件。
e. (可能)为可能的SyncBN层做准备

这里有三个点需要注意:

  • 这里的为可能的SyncBN层做准备,实际上就是检测当前是否是DDP单进程单卡模式,如果不是,会直接停止。

  • 这告诉我们,SyncBN需要在DDP环境初始化后初始化,但是要在DDP模型前就准备好。

  • 为什么当前PyTorch SyncBN只支持DDP单进程单卡模式?

    • 从SyncBN原理中我们可以看到,其强依赖了all_gather计算,而这个分布式接口当前是不支持单进程多卡或者DP模式的。当然,不排除未来也是有可能支持的。

怎么用SyncBN?

怎么样才能在我们的代码引入SyncBN呢?很简单:

# DDP initdist.init_process_group(backend='nccl')
# 按照原来的方式定义模型,这里的BN都使用普通BN就行了。model = MyModel()# 引入SyncBN,这句代码,会将普通BN替换成SyncBN。model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)
# 构造DDP模型model = DDP(model, device_ids=[local_rank], output_device=local_rank)

又是熟悉的模样,像DDP一样,一句代码就解决了问题。这是怎么做到的呢?

convert_sync_batchnorm的原理:

torch.nn.SyncBatchNorm.convert_sync_batchnorm会搜索model里面的每一个module,如果发现这个module是、或者继承了torch.nn.modules.batchnorm._BatchNorm类,就把它替换成SyncBN。也就是说,如果你的Normalization层是自己定义的特殊类,没有继承过_BatchNorm类,那么convert_sync_batchnorm是不支持的,需要你自己实现一个新的SyncBN!

下面给一下convert_sync_batchnorm的源码(https://github.com/pytorch/pytorch/blob/v1.5.0/torch/nn/modules/batchnorm.py#L474),可以看到convert的过程中,新的SyncBN复制了原来的BN层的所有参数:

@classmethod def convert_sync_batchnorm(cls, module, process_group=None): r"""Helper function to convert all :attr:`BatchNorm*D` layers in the model to :class:`torch.nn.SyncBatchNorm` layers. """ module_output = module if isinstance(module, torch.nn.modules.batchnorm._BatchNorm): module_output = torch.nn.SyncBatchNorm(module.num_features, module.eps, module.momentum, module.affine, module.track_running_stats, process_group) if module.affine: with torch.no_grad(): module_output.weight = module.weight module_output.bias = module.bias module_output.running_mean = module.running_mean module_output.running_var = module.running_var module_output.num_batches_tracked = module.num_batches_tracked for name, child in module.named_children(): module_output.add_module(name, cls.convert_sync_batchnorm(child, process_group)) del module return module_output


3.2 DDP下的Gradient Accumulation的进一步加速

什么是Gradient Accmulation?

Gradient Accumulation,即梯度累加,相信大家都有所了解,是一种增大训练时batch size的技术,造福了无数硬件条件窘迫的我等穷人。不了解的同学请看这个知乎链接。

为什么还能进一步加速?

我们仔细思考一下DDP下的gradient accumulation。

# 单卡模式,即普通情况下的梯度累加for 每次梯度累加循环 optimizer.zero_grad() for 每个小step prediction = model(data) loss_fn(prediction, label).backward() # 积累梯度,不应用梯度改变 optimizer.step() # 应用梯度改变

我们知道,DDP的gradient all_reduce阶段发生在loss_fn(prediction, label).backward()。这意味着,在梯度累加的情况下,假设一次梯度累加循环有K个step,每次梯度累加循环会进行K次 all_reduce!但事实上,每次梯度累加循环只会有一次 optimizer.step(),即只应用一次参数修改,这意味着在每一次梯度累加循环中,我们其实只要进行一次gradient all_reduce即可满足要求,有K-1次 all_reduce被浪费了!而每次 all_reduce的时间成本是很高的!

如何加速

解决问题的思路在于,对前K-1次step取消其梯度同步。幸运的是,DDP给我们提供了一个暂时取消梯度同步的context函数 no_sync()(源代码)(https://github.com/pytorch/pytorch/blob/master/torch/nn/parallel/distributed.py#L548)。在这个context下,DDP不会进行梯度同步。

所以,我们可以这样实现加速:

model = DDP(model)
for 每次梯度累加循环 optimizer.zero_grad() # 前K-1个step,不进行梯度同步,累积梯度。 for K-1个小step: with model.no_sync(): prediction = model(data) loss_fn(prediction, label).backward() # 第K个step,进行梯度同步 prediction = model(data) loss_fn(prediction, label).backward() optimizer.step()

给一个优雅写法(同时兼容单卡、DDP模式哦):

from contextlib import nullcontext# 如果你的python版本小于3.7,请注释掉上面一行,使用下面这个:# from contextlib import suppress as nullcontext
if local_rank != -1: model = DDP(model)
optimizer.zero_grad()for i, (data, label) in enumerate(dataloader): # 只在DDP模式下,轮数不是K整数倍的时候使用no_sync my_context = model.no_sync if local_rank != -1 and i % K != 0 else nullcontext with my_context(): prediction = model(data) loss_fn(prediction, label).backward() if i % K == 0: optimizer.step() optimizer.zero_grad()

是不是很漂亮!


3.3 多机多卡环境下的inference加速

问题

有一些非常现实的需求,相信大家肯定碰到过:

  1. 一般,训练中每几个epoch我们会跑一下inference、测试一下模型性能。在DDP多卡训练环境下,能不能利用多卡来加速inference速度呢?

  2. 我有一堆数据要跑一些网络推理,拿到inference结果。DP下多卡加速比太低,能不能利用DDP多卡来加速呢?

解法

这两个问题实际是同一个问题。答案肯定是可以的,但是,没有现成、省力的方法。

测试和训练的不同在于:

  1. 测试的时候不需要进行梯度反向传播,inference过程中各进程之间不需要通讯。

  2. 测试的时候,不同模型的inference结果、性能指标的类型多种多样,没有统一的形式。

    1. 我们很难定义一个统一的框架,像训练时model=DDP(model)那样方便地应用DDP多卡加速。

解决问题的思路很简单,就是各个进程中各自进行单卡的inference,然后把结果收集到一起。单卡inference很简单,我们甚至可以直接用DDP包装前的模型。问题其实只有两个:

  • 我们要如何把数据split到各个进程中

  • 我们要如何把结果合并到一起

如何把数据split到各个进程中:新的data sampler

大家肯定还记得,在训练的时候,我们用的 torch.utils.data.distributed.DistributedSampler帮助我们把数据不重复地分到各个进程上去。但是,其分的方法是:每段连续的N个数据,拆成一个一个,分给N个进程,所以每个进程拿到的数据不是连续的。这样,不利于我们在inference结束的时候将结果合并到一起。

所以,这里我们需要实现一个新的data sampler。它的功能,是能够连续地划分数据块,不重复地分到各个进程上去。直接给代码:

# 来源:https://github.com/huggingface/transformers/blob/447808c85f0e6d6b0aeeb07214942bf1e578f9d2/src/transformers/trainer_pt_utils.pyclass SequentialDistributedSampler(torch.utils.data.sampler.Sampler): """ Distributed Sampler that subsamples indicies sequentially, making it easier to collate all results at the end. Even though we only use this sampler for eval and predict (no training), which means that the model params won't have to be synced (i.e. will not hang for synchronization even if varied number of forward passes), we still add extra samples to the sampler to make it evenly divisible (like in `DistributedSampler`) to make it easy to `gather` or `reduce` resulting tensors at the end of the loop. """
def __init__(self, dataset, batch_size, rank=None, num_replicas=None): if num_replicas is None: if not torch.distributed.is_available(): raise RuntimeError("Requires distributed package to be available") num_replicas = torch.distributed.get_world_size() if rank is None: if not torch.distributed.is_available(): raise RuntimeError("Requires distributed package to be available") rank = torch.distributed.get_rank() self.dataset = dataset self.num_replicas = num_replicas self.rank = rank self.batch_size = batch_size self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.batch_size / self.num_replicas)) * self.batch_size self.total_size = self.num_samples * self.num_replicas
def __iter__(self): indices = list(range(len(self.dataset))) # add extra samples to make it evenly divisible indices += [indices[-1]] * (self.total_size - len(indices)) # subsample indices = indices[self.rank * self.num_samples : (self.rank + 1) * self.num_samples] return iter(indices)
def __len__(self): return self.num_samples

如何把结果合并到一起: all_gather

通过torch.distributed提供的分布式接口all_gather,我们可以把各个进程的prediction结果集中到一起。

难点就在这里。因为世界上存在着千奇百怪的神经网络模型,有着千奇百怪的输出,所以,把数据集中到一起不是一件容易的事情。但是,如果你的网络输出在不同的进程中有着一样的大小,那么这个问题就好解多了。下面给一个方法,其要求网络的prediction结果在各个进程中的大小是一模一样的:

# 合并结果的函数# 1. all_gather,将各个进程中的同一份数据合并到一起。# 和all_reduce不同的是,all_reduce是平均,而这里是合并。# 2. 要注意的是,函数的最后会裁剪掉后面额外长度的部分,这是之前的SequentialDistributedSampler添加的。# 3. 这个函数要求,输入tensor在各个进程中的大小是一模一样的。def distributed_concat(tensor, num_total_examples): output_tensors = [tensor.clone() for _ in range(torch.distributed.get_world_size())] torch.distributed.all_gather(output_tensors, tensor) concat = torch.cat(output_tensors, dim=0) # truncate the dummy elements added by SequentialDistributedSampler return concat[:num_total_examples]

完整的流程

结合上面的介绍,我们可以得到下面这样一个完整的流程。

## 构造测试集# 假定我们的数据集是这个transform = torchvision.transforms.Compose([ torchvision.transforms.ToTensor(), torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) ])my_testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)# 使用我们的新samplertest_sampler = SequentialDistributedSampler(my_testset, batch_size=16)testloader = torch.utils.data.DataLoader(my_testset, batch_size=16, sampler=test_sampler)
# DDP和模型初始化,略。# ......
# 正式训练和evaluationfor epoch in range(total_epoch_size): # 训练代码,略 # ....... # 开始测试 with torch.no_grad(): # 1. 得到本进程的prediction predictions = [] labels = [] for data, label in testloader: data, label = data.to(local_rank), label.to(local_rank) predictions.append(model(data)) labels.append(label) # 进行gather predictions = distributed_concat(torch.concat(predictions, dim=0), len(test_sampler.dataset)) labels = distributed_concat(torch.concat(labels, dim=0), len(test_sampler.dataset)) # 3. 现在我们已经拿到所有数据的predictioin结果,进行evaluate! my_evaluate_func(predictions, labels)

更简化的解法

  1. 如果我们的目的只是得到性能数字,那么,我们甚至可以直接在各个进程中计算各自的性能数字,然后再合并到一起。上面给的解法,是为了更通用的情景。一切根据你的需要来定!

  2. 我们可以单向地把predictions、labels集中到 rank=0的进程,只在其进行evaluation并输出。PyTorch也提供了相应的接口(链接(https://pytorch.org/docs/stable/distributed.html),send和recv)。


3.4 保证DDP性能:确保数据的一致性

性能期望

从原理上讲,当没有开启SyncBN时,(或者更严格地讲,没有BN层;但一般有的话影响也不大),以下两种方法训练出来的模型应该是性能相似的:

  • 进程数为N的DDP训练

  • accumulation为N、其他配置完全相同的单卡训练

如果我们发现性能对不上,那么,往往是DDP中的某些设置出了问题。在DDP系列第二节中,我们介绍过一个check list,可以根据它检查下自己的配置。其中,在造成性能对不齐的原因中,最有可能的是数据方面出现了问题。

DDP训练时,数据的一致性必须被保证:各个进程拿到的数据,要像是accumulation为N、其他配置完全相同的单卡训练中同个accumulation循环中不同iteration拿到的数据。想象一下,如果各个进程拿到的数据是一样的,或者分布上有任何相似的地方,那么,这就会造成训练数据质量的下降,最终导致模型性能下降。

容易错的点:随机数种子

为保证实验的可复现性,一般我们会在代码在开头声明一个固定的随机数种子,从而使得同一个配置下的实验,无论启动多少次,都会拿到同样的结果。

import randomimport numpy as npimport torch
def init_seeds(seed=0, cuda_deterministic=True): random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) # Speed-reproducibility tradeoff https://pytorch.org/docs/stable/notes/randomness.html if cuda_deterministic: # slower, more reproducible cudnn.deterministic = True cudnn.benchmark = False else: # faster, less reproducible cudnn.deterministic = False cudnn.benchmark = True

def main(): # 一般都直接用0作为固定的随机数种子。 init_seeds(0)

但是在DDP训练中,如果还是像以前一样,使用0作为随机数种子,不做修改,就会造成以下后果:

  1. DDP的N个进程都使用同一个随机数种子

  2. 在生成数据时,如果我们使用了一些随机过程的数据扩充方法,那么,各个进程生成的数据会带有一定的同态性。

    1. 比如说,YOLOv5会使用mosaic数据增强(从数据集中随机采样3张图像与当前的拼在一起,组成一张里面有4张小图的大图)。这样,因为各卡使用了相同的随机数种子,你会发现,各卡生成的图像中,除了原本的那张小图,其他三张小图都是一模一样的!

  3. 同态性的数据,降低了训练数据的质量,也就降低了训练效率!最终得到的模型性能,很有可能是比原来更低的。

所以,我们需要给不同的进程分配不同的、固定的随机数种子:

def main(): rank = torch.distributed.get_rank() # 问题完美解决! init_seeds(1 + rank)


3.5 和DDP有关的小技巧

控制不同进程的执行顺序

一般情况下,各个进程是各自执行的,速度有快有慢,只有在gradient all-reduce的时候,快的进程才会等一下慢的进程,也就是进行同步。那么,如果我们需要在其他地方进行同步呢?比如说,在加载数据前,如果数据集不存在,我们要下载数据集:

  1. 我们只需要在唯一一个进程中开启一次下载

  2. 我们需要让其他进程等待其下载完成,再去加载数据

怎么解决这个问题呢?torch.distributed提供了一个barrier()的接口,利用它我们可以同步各个DDP中的各个进程!当使用barrier函数时,DDP进程会在函数的位置进行等待,知道所有的进程都跑到了 barrier函数的位置,它们才会再次向下执行。

只在某进程执行,无须同步:

这是最简单的,只需要一个简单的判断,用不到barrier()

if rank == 0: code_only_run_in_rank_0()

简单的同步:

没什么好讲的,只是一个示范

code_before()# 在这一步同步torch.distributed.barrier()code_after()

在某个进程中执行A操作,其他进程等待其执行完成后再执行B操作:

也简单。

if rank == 0: do_A() torch.distributed.barrier()else: do_B() torch.distributed.barrier()

在某个进程中优先执行A操作,其他进程等待其执行完成后再执行A操作:

这个值得深入讲一下,因为这个是非常普遍的需求。利用contextlib.contextmanager,我们可以把这个逻辑给优雅地包装起来!

from contextlib import contextmanager
@contextmanagerdef torch_distributed_zero_first(rank: int): """Decorator to make all processes in distributed training wait for each local_master to do something. """ if rank not in [-1, 0]: torch.distributed.barrier() # 这里的用法其实就是协程的一种哦。 yield if rank == 0: torch.distributed.barrier()

然后我们就可以这样骚操作:

with torch_distributed_zero_first(rank): if not check_if_dataset_exist(): download_dataset() load_dataset()

优雅地解决了需求!

避免DDP带来的冗余输出

问题:

当我们在自己的模型中加入DDP模型时,第一的直观感受肯定是,终端里的输出变成了N倍了。这是因为我们现在有N个进程在同时跑整个程序。这不光是对有洁癖的同学造成困扰,其实对所有人都会造成困扰。因为各个进程的速度并不一样快,在茫茫的输出海洋中,我们难以debug、把控实验状态。

解法:

那么,有什么办法能避免这个现象呢?下面,笔者给一个可行的方法:logging模块+输出信息等级控制。即用logging输出代替所有print输出,并给不同进程设置不同的输出等级,只在0号进程保留低等级输出。举一个例子:

import logging
# 给主要进程(rank=0)设置低输出等级,给其他进程设置高输出等级。logging.basicConfig(level=logging.INFO if rank in [-1, 0] else logging.WARN)# 普通log,只会打印一次。logging.info("This is an ordinary log.")# 危险的warning、error,无论在哪个进程,都会被打印出来,从而方便debug。logging.error("This is a fatal log!")

simple but powerful!


3.6 总结

既然看到了这里,不妨点个赞/喜欢吧!

不畏浮云遮望眼,只缘身在最高层。

现在你已经系统地学习了DDP多机多卡加速的原理、源码实现、实战技巧,相信,在DDP上面,已经没有什么问题能够难倒你了。请为勤学苦练的自己鼓个掌!


本文目的在于学术交流,并不代表本公众号赞同其观点或对其内容真实性负责,版权归原作者所有,如有侵权请告知删除。


“他山之石”历史文章


更多他山之石专栏文章,

请点击文章底部“阅读原文”查看



分享、点赞、在看,给个三连击呗!

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存