其他
【他山之石】“最全PyTorch分布式教程”来了!
“他山之石,可以攻玉”,站在巨人的肩膀才能看得更高,走得更远。在科研的道路上,更需借助东风才能更快前行。为此,我们特别搜集整理了一些实用的代码链接,数据集,软件,编程技巧等,开辟“他山之石”专栏,助你乘风破浪,一路奋勇向前,敬请关注。
地址:https://www.zhihu.com/people/liu-zhao-41-67
先验知识 使用过程框架 代码解析
01
1.DataParallel 和DistributedDataParallel(DDP)
net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])
本文极力推荐DDP方法,下文也都是对DDP的说明:
>>> torch.cuda.set_device(i) # i为0 - N-1
>>> from torch.nn.parallel import DistributedDataParallel
>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
>>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)
2. torch.distributed
import torch.distributed as dist
# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4)
3. DataLoader
DataLoader(dataset, batch_size=1, shuffle=False, sampler=None,
batch_sampler=None, num_workers=0, collate_fn=None,
pin_memory=False, drop_last=False, timeout=0,
worker_init_fn=None)
dataset,即获取的原始数据集,pytorch支持两种不同类型的数据集
map-style datasets:一种映射型的数据集,使用__getitem__() 和 __len__()协议,表示一种从indices/keys(可能为非整型)到数据样本的映射
比如有这样一个数据集,当访问 dataset[idx]时,可以从磁盘上的文件夹读取到第idx个图像以及与它相关的标签。iterable-style datasets这类数据集是 IterableDataset 的子类的一个实例,使用 __iter__()协议,表示可在数据样本上迭代。这种类型的数据集特别适合于很难甚至无法进行随机读取,以及BatchSize的大小取决于获取的数据的情况。
比如调用 iter(dataset)时,可以返回从数据库、远程服务器读取的数据流,甚至实时生成的日志。
sampler,batch_sampler及shuffle
这里主要为关于map-style的介绍。
介绍这几个个参数之前,需要认识另一种类
CLASS torch.utils.data.Sampler(data_source)
同种类型的类还有torch.utils.data.SequentialSampler,torch.utils.data.RandomSampler, torch.utils.data.SubsetRandomSamplertorch.utils.data.WeightedRandomSamplertorch.utils.data.BatchSampler,torch.utils.data.distributed.DistributedSampler。
这些类的实例会作为参数传到DataLoader中。它们用来指定数据加载中使用的indices/keys的顺序,它们是数据集索引上的可迭代对象。
下面是正式的介绍
简单来说,sampler是一个取样器(容器),用来对原始的数据取样,返回原始数据的多个子集,不同的类也对应不同的取样方式。DataLoader会根据参数中的shuffle参数自动构建一个sampler类实例,再传给DataLoader。若shuffle为True,即打乱数据,则参数sampler = torch.utils.data.RandomSampler;若为False,则sampler = torch.utils.data.SequentialSampler。
在分布式训练时用到的是distributed.DistributedSampler。此种方法会根据当前分布式环境(具体说是worldsize)来将原始数据分为几个子集。
batch_sampler的作用是从sampler中进行批处理,即将sampler中的数据分批,它返回的数据为一个batch的数据。具体细节将在下一小节讨论。
distributed.DistributedSampler参数dataset –要进行取样的数据集 num_replicas (int,optional) – 参与分布式训练的进程数量. rank 默认为当前进程组的进程数。 rank (int,optional) –当前进程在num_replicas的Rank,默认 rank从当前分布式组中检索。 shuffle (bool,optional) – If True (default), sampler 会打乱indices。 seed (int,optional) – 在 shuffle=True时,用来打乱采样器的随机种子,这个数字在分布式组中的所有进程之间应该是相同的Default: 0。
batch_size、drop_last以及collate_fn
本小节与上一小节联系很大,建议联系到一起理解。
DataLoader通过参数batch_size、drop_last和batch_sampler自动将获取的单个数据样本排序成批。
如果batch_size(默认是1)的值不是None,数据加载器会生成成批的样本,每一批(batch)的样本数为batch_size的值。drop_last为True时,如果数据集size不能被batch size整除,会丢弃最后一个不完整的batch,此参数默认为False,也就是若不能整除,多出来的部分独占一个batch。若指定了 batch_size, shuffle, sampler和 drop_last中的任何一个(布尔值为True或具体指定)则batch_sampler就不能再指定了,因为会自动根据参数使用相应的类。
batch_size和drop_last参数本质上是用来从sampler中构造batch_sampler的。对于map-style的数据集,sampler可以由用户提供,也可以基于shuffle参数构造,也就是上面说的,它们是互斥的。
collate_fn在批处理和非处理是作用是不同的
若batch_size不是None,则为自动成批模式,此时使用collate_fn参数传递的函数来将一个列表中的样本排列为一个batch。(实际上,batch_sampler和sample作为取样器,返回的是根据规则排列的indices,并非真实的数据,还要使用collate_fn来排列真实数据)。collate_fn每次调用一个列表里的数据样本,它需要将输入样本整理为批,以便从data loader迭代器生成。
例如,如果每个数据样本由一个3通道图像和一个完整的类标签组成,也就是说数据集的每个元素都返回一个元组(image,class_index),默认的collate_fn会将包含这样的元组的列表整理成一个批处理过的图像tensor的单独的元组以及一个批处理过的类标签Tensor。具体来说,collate_fn有以下特点:它总是添加一个新维度作为批处理维度。 它自动将NumPy数组和Python数值转换为PyTorch张量。 它保留了数据结构,例如,如果每个样本是一个字典,它输出具有相同键集但批处理过的张量作为值的字典(如果值不能转换成张量,则值为列表)
当batch_size和batch_sampler都为None (batch_sampler的默认值已经为None)时,为非自动成批模式。此时使用作为collate_fn参数传递的函数来处理从数据集获得的每个示例。这时,这个函数只是将Numpy数组转换维PyTorch的Tensor,其他保持不变。
其他参数
num_workers 用来进行多进程加载数据,注意这里的多进程只是加载数据时的多进程,不同于多进程训练。在此模式下,每当创建一个DataLoader的迭代器时(例如,当调用enumerate(dataLoader)时),会创建 num_workers个工作进程。此时,dataset,collate_fn和worker_init_fn被传你递给每个worker,它们被用于初始化和获取数据。这意味着数据集访问和它的内部IO,以及转换(包括collate_fn)都在工作进程中运行。
也就是说只有对DataLoader迭代时才会得到真实的数据。
pin_memory 为True 会自动将获取的数据张量放到固定的内存中,从而使数据更快地传输到支持cuda的gpu。
02
初始化torch.distributed,这是DDP的依赖项。 加载模型,如model = model() 指定本进程对应的GPU:torch.cuda.set_device(i) i 是当前进程对应的GPU号,以保证当前程在单独的GPU上运行 将模型放到当前设备:model.to(device) 模型并行化:DistributedDataParallel(model,device_id=[i])。 数据处理,首先获取原始数据。 根据分布式情况以及原始数据指定Sampler,作为DataLoader的参数输入。(将原始数据分为两个子集,每个子集有4000个副本) 使用DataLoader包装原始数据,由于传入了Sampler,会使用batch_sampler 在sampler中再进行分批。由于使用了分布式,在此步之前将batch_size除以设备数,得到新的batch_size(8),作为每个GPU的batch_size。因此batch_sampler会根据batch_size和sampler产生 个batch。 在epoch中进行训练。注意,在每个epoch的开端调用sampler.set_epoch(n) n为epoch数。 保存模型
03
import torch.multiprocessing as mp
opt.world_size = opt.ngpus_per_node * opt.world_size
mp.spawn(main_worker, nprocs=opt.ngpus_per_node, args=(opt,))
def main_worker(index, opt):
random.seed(opt.manual_seed)
np.random.seed(opt.manual_seed)
torch.manual_seed(opt.manual_seed)
if index >= 0 and opt.device.type == 'cuda':
opt.device = torch.device(f'cuda:{index}')
opt.dist_rank = opt.dist_rank * opt.ngpus_per_node + index
dist.init_process_group(backend='nccl',
init_method=opt.dist_url,
world_size=opt.world_size,
rank=opt.dist_rank)
opt.batch_size = int(opt.batch_size / opt.ngpus_per_node)
opt.n_threads = int((opt.n_threads + opt.ngpus_per_node - 1) / opt.ngpus_per_node)
opt.is_master_node = not opt.distributed or opt.dist_rank == 0
model = generate_model(opt)
if opt.batchnorm_sync:
assert opt.distributed, 'SyncBatchNorm only supports DistributedDataParallel.'
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
model = make_data_parallel(model, opt.distributed, opt.device)
parameters = model.parameters()
criterion = CrossEntropyLoss().to(opt.device)
(train_loader, train_sampler, train_logger, train_batch_logger,
optimizer, scheduler) = get_train_utils(opt, parameters)
for i in range(opt.begin_epoch, opt.n_epochs + 1):
if not opt.no_train:
if opt.distributed:
train_sampler.set_epoch(i)
current_lr = get_lr(optimizer)
train_epoch(i, train_loader, model, criterion, optimizer,
opt.device, current_lr, train_logger,
train_batch_logger, tb_writer, opt.distributed)
if i % opt.checkpoint == 0 and opt.is_master_node:
save_file_path = opt.result_path / 'save_{}.pth'.format(i)
save_checkpoint(save_file_path, i, opt.arch, model, optimizer,
scheduler)
scheduler.step()
1. 初始化torch.distributed
def main_worker(index, opt):
random.seed(opt.manual_seed)
np.random.seed(opt.manual_seed)
torch.manual_seed(opt.manual_seed)
if index >= 0 and opt.device.type == 'cuda':
opt.device = torch.device(f'cuda:{index}')
opt.dist_rank = opt.dist_rank * opt.ngpus_per_node + index
dist.init_process_group(backend='nccl',
init_method=opt.dist_url,
world_size=opt.world_size,
rank=opt.dist_rank)
opt.batch_size = int(opt.batch_size / opt.ngpus_per_node)
opt.n_threads = int((opt.n_threads + opt.ngpus_per_node - 1) / opt.ngpus_per_node)
opt.is_master_node = opt.dist_rank == 0
2. 加载模型
model = generate_model(opt)
此部分没什么好说的,从其他函数或类中获取模型。
3. 指定本进程对应的GPU
4. 将模型放到当前设备
5. 模型并行化
model = make_data_parallel(model, opt.device)
def make_data_parallel(model, device):
if device.type == 'cuda' and device.index is not None:
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
model.to(device)
model = nn.parallel.DistributedDataParallel(model,device_ids=[device])
6. 数据处理,获取原始数据
train_data = get_training_data(**kwargs)
7. 根据分布式情况以及原始数据指定Sampler,作为DataLoader的参数输入
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_data)
8. 使用DataLoader包装原始数据
train_loader = torch.utils.data.DataLoader(train_data,
batch_size=opt.batch_size,
shuffle=(train_sampler is None),
num_workers=opt.n_threads,
pin_memory=True,
sampler=train_sampler,
worker_init_fn=worker_init_fn)
9. 在epoch中进行训练
for i in range(opt.begin_epoch, opt.n_epochs + 1):
train_sampler.set_epoch(i)
current_lr = get_lr(optimizer)
train_epoch(i, train_loader, model, criterion, optimizer,
opt.device, current_lr, train_logger,
train_batch_logger, tb_writer, opt.distributed)
以上即为本教程的全部内容,虽然没有涵盖训练的每个细节,但是你可以学会在你的代码中适当的位置添加某些内容,从而实现分布式训练。
本教程仅为本人观点,如果有错误之处,欢迎评论!
本文目的在于学术交流,并不代表本公众号赞同其观点或对其内容真实性负责,版权归原作者所有,如有侵权请告知删除。
直播预告
“他山之石”历史文章
c++接口libtorch介绍& vscode+cmake实践
python从零开始构建知识图谱
一文读懂 PyTorch 模型保存与载入
适合PyTorch小白的官网教程:Learning PyTorch With Examples
pytorch量化备忘录
LSTM模型结构的可视化
PointNet论文复现及代码详解
SCI写作常用句型之研究结果&发现
白话生成对抗网络GAN及代码实现
pytorch的余弦退火学习率
Pytorch转ONNX-实战篇(tracing机制)
联邦学习:FedAvg 的 Pytorch 实现
PyTorch实现ShuffleNet-v2亲身实践
训练时显存优化技术——OP合并与gradient checkpoint
浅谈数据标准化与Pytorch中NLLLoss和CrossEntropyLoss损失函数的区别
更多他山之石专栏文章,
请点击文章底部“阅读原文”查看
分享、点赞、在看,给个三连击呗!