【他山之石】Parallel-acceleration tricks for PyTorch/TensorFlow GPU training (with code)
"Stones from other hills may serve to polish jade" — standing on the shoulders of giants lets you see further and go farther, and on the road of research a favorable wind helps you move faster. To that end, we have collected practical code links, datasets, software, and programming tips in this "他山之石" column to help you forge ahead. Stay tuned.
Source: https://www.zhihu.com/people/waitsop
02
Ways of reading data in TF
tf.data API
Several commonly used functions:
- batch(): its argument is the batch size.
- repeat(): its argument is likewise an integer, giving how many times the whole dataset is repeated (the number of epochs); with no argument it repeats indefinitely.
- shuffle(): shuffles the data. Its argument is buffer_size: the dataset fills a buffer with its first buffer_size elements, outputs one element chosen at random from the buffer, then refills the freed slot with the next element of the dataset, in order.
- map(map_func, num_parallel_calls): commonly used for preprocessing such as image decoding. The first argument is a function handle; every element of the dataset passes through it and is replaced by the resulting tensor. The second argument, num_parallel_calls, controls parallel processing of the data on the CPU, distributing the preprocessing work across cores for a parallel speed-up. It is usually set to the number of CPU cores; setting it too high can actually slow things down. Source: https://github.com/tensorflow/tensorflow/blob/b3f630cab8911161e71cf5e0f1add8a8beb9ca49/tensorflow/core/kernels/parallel_map_dataset_op.cc#L136
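The shuffle(buffer_size) behaviour described above can be simulated in plain Python (an illustration of the buffering semantics only, not TF code; `buffered_shuffle` is a hypothetical helper):

```python
import random

def buffered_shuffle(items, buffer_size, seed=0):
    """Simulate tf.data's shuffle(): keep a buffer of `buffer_size`
    elements, emit a random one, refill the slot from the input stream."""
    rng = random.Random(seed)
    it = iter(items)
    buf = []
    # fill the buffer with the first buffer_size elements
    for x in it:
        buf.append(x)
        if len(buf) == buffer_size:
            break
    while buf:
        i = rng.randrange(len(buf))
        yield buf[i]               # emit a random buffered element
        try:
            buf[i] = next(it)      # refill the freed slot, in stream order
        except StopIteration:
            buf.pop(i)             # stream exhausted: drain the buffer

out = list(buffered_shuffle(range(10), buffer_size=4))
print(out)  # a permutation of 0..9; the first element always comes from 0..3
```

Note the consequence: with a small buffer_size the shuffle is only local, which is why buffer_size should be large relative to the dataset for a thorough shuffle.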
TRICK
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func=parse_fn, batch_size=batch_size))

Parallelize Data Extraction
Differences between local and remote storage:
- Time-to-first-byte: remote storage has higher latency.
- Read throughput: remote storage offers large aggregate bandwidth, but reading a single file can use only a small fraction of it.
# dataset = files.interleave(tf.data.TFRecordDataset)
dataset = files.apply(tf.contrib.data.parallel_interleave(tf.data.TFRecordDataset, cycle_length=FLAGS.num_parallel_readers))
The figure below illustrates the effect of supplying cycle_length=2 to the parallel_interleave transformation.
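Ignoring the parallelism, the interleaving itself can be sketched in plain Python for cycle_length=2 (`interleave2` is a hypothetical helper; real parallel_interleave additionally overlaps the file reads):

```python
from itertools import zip_longest

_SKIP = object()

def interleave2(file_a, file_b):
    """cycle_length=2: alternate one record from each of two files,
    continuing with the longer file once the shorter is exhausted."""
    return [r for pair in zip_longest(file_a, file_b, fillvalue=_SKIP)
            for r in pair if r is not _SKIP]

print(interleave2(["a1", "a2", "a3"], ["b1", "b2"]))
# -> ['a1', 'b1', 'a2', 'b2', 'a3']
```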
Pipeline ordering
- Add prefetch(n) (where n is the number of elements/batches consumed by a single training step) to the end of the input pipeline, so that the transformations running on the CPU overlap with training on the GPU.
- Parallelize the map transformation by setting num_parallel_calls; set it to the number of available CPU cores.
- When using large batches, use the fused map_and_batch transformation.
- If the data lives on remote storage and/or needs deserialization, use parallel_interleave to read (and deserialize) from multiple files in parallel.
- Vectorize cheap user-defined functions passed to map, to amortize the overhead of scheduling and executing them.
- If the data fits in memory, use the cache transformation to cache it in memory during the first epoch, so later epochs avoid the cost of reading, parsing, and transforming it.
- If preprocessing increases the data size, apply interleave, prefetch, and shuffle first to reduce memory use.
- Apply the shuffle transformation before repeat; better yet, use the fused shuffle_and_repeat transformation.
- SBR (shuffle → batch → repeat): shuffle all the data, pack it into batches of batch_size, and repeat the whole sequence N times (N epochs).
- SRB (shuffle → repeat → batch): shuffle all the data, repeat it N times, concatenate the repeats, then pack into batches. (Rarely used: a single batch may contain duplicate samples.)
- BRS (batch → repeat → shuffle): pack all the data into batches, repeat the batches N times, then shuffle the batches before output. (The shuffle acts on whole batches; some batches are smaller than batch_size, and because the batches themselves are shuffled, those short batches are not necessarily last.)
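The batch-size consequence of these orderings can be seen with a small list-based sketch (plain Python, `batches` is a hypothetical helper; 10 samples, batch size 4, 2 "epochs"):

```python
import random

def batches(seq, size):
    # split a sequence into consecutive chunks of at most `size` elements
    return [seq[i:i + size] for i in range(0, len(seq), size)]

data = list(range(10))
rng = random.Random(0)

# SBR: shuffle -> batch -> repeat; the short batch is always last in each
# epoch (real tf.data reshuffles every epoch, which this sketch skips)
sbr = batches(rng.sample(data, len(data)), 4) * 2

# BRS: batch -> repeat -> shuffle; whole batches are shuffled, so the
# short 2-element batches can land anywhere in the stream
brs = batches(data, 4) * 2
rng.shuffle(brs)

print([len(b) for b in sbr])  # [4, 4, 2, 4, 4, 2]
print([len(b) for b in brs])  # the two 2s may appear at any position
```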
Running the pipeline
- tf.data.Dataset.make_one_shot_iterator(): needs no explicit initialization by the user, but can iterate over the dataset only once.
- tf.data.Dataset.make_initializable_iterator(): must be explicitly initialized, and at initialization you can feed previously defined placeholders, which allows more flexible use.
- get_next(): fetches the next data tensors from the iterator (in the same form as the from_tensor_slices arguments used to build the dataset).
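As a rough plain-Python analogue of the initializable iterator (purely illustrative; `InitializableIterator` is a hypothetical class, not a TF API):

```python
class InitializableIterator:
    """Plain-Python analogue of make_initializable_iterator():
    it must be (re-)initialized before each pass over the data."""

    def __init__(self):
        self._it = None

    def initializer(self, data):
        # analogue of sess.run(iterator.initializer, feed_dict=...)
        self._it = iter(data)

    def get_next(self):
        if self._it is None:
            raise RuntimeError("iterator not initialized")
        # raises StopIteration at the end of an epoch
        # (tf raises tf.errors.OutOfRangeError instead)
        return next(self._it)

it = InitializableIterator()
it.initializer([1, 2, 3])
vals = [it.get_next() for _ in range(3)]
print(vals)                  # [1, 2, 3]
it.initializer([4, 5])       # re-initialize for the next epoch / new data
print(it.get_next())         # 4
```

A one-shot iterator, by contrast, behaves like a plain generator: convenient, but exhausted after a single pass.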
Experiment
# Hand-rolled numpy batch iterator (the baseline, without tf.data)
import os
import cv2
import numpy as np
import tensorflow as tf

class Dataset(object):
    def __init__(self, dataset_type):
        self.annot_path = './DATA/train.txt' if dataset_type == 'train' else './DATA/test.txt'
        self.batch_size = 8 if dataset_type == 'train' else 1
        self.data_aug = True if dataset_type == 'train' else False
        self.train_input_size = 64
        self.annotations = self.load_annotations(dataset_type)
        self.img_num = len(self.annotations)
        self.step_per_epoch = int(np.ceil(self.img_num / self.batch_size))
        self.step_index_epoch = 0

    def load_annotations(self, dataset_type):
        with open(self.annot_path, 'r') as f:
            txt = f.readlines()
        annotations = [line.strip() for line in txt if len(line.strip().split()[1:]) != 0]
        np.random.shuffle(annotations)
        return annotations

    def __iter__(self):
        return self

    def __next__(self):
        with tf.device('/cpu:0'):
            batch_image = np.zeros((self.batch_size, self.train_input_size, self.train_input_size, 3))
            batch_label = np.zeros((self.batch_size, 1))
            i = 0
            if self.step_index_epoch < self.step_per_epoch:  # in one epoch
                while i < self.batch_size:  # in one batch
                    img_index = self.step_index_epoch * self.batch_size + i
                    if img_index >= self.img_num:  # last batch cannot be rounded up
                        img_index -= self.img_num
                    annotation = self.annotations[img_index]  # txt line ===> annotation
                    image, label = self.parse_annotation(annotation)
                    batch_image[i, :, :, :] = image  # (h, w, c)
                    batch_label[i, :] = label
                    i += 1
                self.step_index_epoch += 1
                return batch_image, batch_label
            else:  # next epoch
                self.step_index_epoch = 0
                np.random.shuffle(self.annotations)
                raise StopIteration

    def parse_annotation(self, annotation):  # get image and label from the txt file
        line = annotation.split()
        image_path = line[0]
        label = line[1]
        if not os.path.exists(image_path):
            print("%s does not exist ..." % image_path)
        image = np.array(cv2.imread(image_path))
        image = cv2.resize(image, (self.train_input_size, self.train_input_size))
        return image, label

    def __len__(self):
        return self.step_per_epoch
for epoch in range(1, self.epoch):
    train_epoch_loss, test_epoch_loss = [], []
    for train_data in Dataset('train'):
        _, train_summary, train_step_loss, train_predict, _, train_step = self.sess.run(
            [self.train_op, self.merge_train_op, self.loss, self.predict, self.acc, self.step],
            feed_dict={self.inputs: train_data[0],
                       self.labels: train_data[1],
                       self.training: True})
        train_epoch_loss.append(train_step_loss)
    for test_data in Dataset('test'):
        test_step_loss, test_predict = self.sess.run(
            [self.loss, self.predict],
            feed_dict={self.inputs: test_data[0],
                       self.labels: test_data[1],
                       self.training: False})
        test_epoch_loss.append(test_step_loss)
    train_epoch_loss, test_epoch_loss = np.mean(train_epoch_loss), np.mean(test_epoch_loss)
Results (times in seconds unless noted otherwise):
def Dataset(file_list, batch_size):
    image_paths, labels, img_num = gen_data(file_list)
    dataset = tf.data.Dataset.from_tensor_slices((image_paths, labels))
    # dataset = tf.data.TFRecordDataset(tf_records_filename)
    dataset = dataset.map(_parse_data, num_parallel_calls=6)
    dataset = dataset.shuffle(buffer_size=1000)
    dataset = dataset.batch(batch_size)
    dataset = dataset.repeat(1)
    dataset = dataset.prefetch(2)
    iterator = dataset.make_one_shot_iterator()
    # iterator = dataset.make_initializable_iterator()
    next_element = iterator.get_next()
    return iterator, next_element, img_num
def _parse_data(image_path, label):
    image_size = 64
    image_channel = 3
    file_contents = tf.read_file(image_path)
    image = tf.image.decode_jpeg(file_contents, channels=image_channel)
    image = tf.image.resize_images(image, (image_size, image_size), method=0)
    # image = tf.image.random_brightness(image, max_delta=60)
    # image = tf.image.random_contrast(image, lower=0.2, upper=1.8)
    image = tf.cast(image, tf.float32)
    image = image / 256.0
    label = tf.expand_dims(label, axis=0)
    return (image, label)
def gen_data(file_list):
    with open(file_list, 'r') as f:
        lines = f.readlines()
    image_paths, labels = [], []
    for line in lines:
        line = line.strip().split()
        image_path = line[0]
        label = line[1]
        image_paths.append(image_path)
        labels.append(label)
    image_paths = np.asarray(image_paths, dtype=np.str)
    labels = np.asarray(labels, dtype=np.float32)
    return image_paths, labels, len(lines)
train_iterator, train_next_element, len_train = Dataset('./DATA/train.txt', batch_size=self.batch_size_train)
test_iterator, test_next_element, len_test = Dataset('./DATA/test.txt', batch_size=self.batch_size_test)
image_batch_train_np, label_batch_train_np = train_next_element
image_batch_test_np, label_batch_test_np = test_next_element
# self.sess.run(test_iterator.initializer)  # only needs initializing once at this stage
for epoch in range(1, self.epoch):
    train_epoch_loss, test_epoch_loss = [], []
    # self.sess.run(train_iterator.initializer)  # initialize once per epoch
    for i in range(int(np.ceil(len_train / self.batch_size_train))):
        image_batch, label_batch = self.sess.run([image_batch_train_np, label_batch_train_np])
        _, train_summary, train_step_loss, train_predict, _, train_step = self.sess.run(
            [self.train_op, self.merge_train_op, self.loss, self.predict, self.acc, self.step],
            feed_dict={self.inputs: image_batch,
                       self.labels: label_batch,
                       self.training: True})
        train_epoch_loss.append(train_step_loss)
    for i in range(int(np.ceil(len_test / self.batch_size_test))):
        image_batch, label_batch = self.sess.run([image_batch_test_np, label_batch_test_np])
        test_step_loss, test_predict = self.sess.run(
            [self.loss, self.predict],
            feed_dict={self.inputs: image_batch,
                       self.labels: label_batch,
                       self.training: False})
        test_epoch_loss.append(test_step_loss)
- num_parallel_calls unset, prefetch(2): 12.15 s per epoch, GPU utilization steady at 96%.
- num_parallel_calls unset, prefetch(1): 11.47 s per epoch, GPU utilization steady at 96%.
- num_parallel_calls=6, prefetch(1): 11.40 s per epoch, GPU utilization steady at 96%.
- num_parallel_calls=6, prefetch(4): 11.44 s per epoch, GPU utilization steady at 96%.
To make the preprocessing load heavier for the prefetch measurements that follow, repeated flips were apparently added to the parsing function:

for i in range(10):
    image = tf.image.flip_up_down(image)
- prefetch unset: train_cost 3.61072, GPU utilization 2-74%
- prefetch = 2: train_cost 3.39067, GPU utilization 2-61%
- prefetch = 3: train_cost 3.44768, GPU utilization 2-67%
- prefetch = 4: train_cost 3.36775, GPU utilization 2-61%
- prefetch unset: train_cost 2.61407, GPU utilization 2-78%
- prefetch = 2: train_cost 2.53957, GPU utilization 2-75%
- prefetch = 3: train_cost 2.54157, GPU utilization 2-74%
- prefetch = 4: train_cost 2.54957, GPU utilization 2-78%
- prefetch = 6: train_cost 2.51907, GPU utilization 2-71%
- prefetch = 10: train_cost 2.47456, GPU utilization 2-71%
- prefetch = 40: train_cost 2.24550, GPU utilization 1-74%
- prefetch = 80: train_cost 1.85542, GPU utilization 36-64%
- prefetch = 120: train_cost 1.61136, GPU utilization 56-61%
- prefetch = 240: train_cost 1.57235, GPU utilization 58-62%
- prefetch = 360: train_cost 1.58837, GPU utilization 59-62%
- prefetch = 480: train_cost 1.69238, GPU utilization 60-63%
Estimator
Advantages
- Estimator-based models can run on a local host or in a distributed multi-server environment without changing the model, and on CPU, GPU, or TPU without recoding it.
- Estimators make it easy to share implementations between model developers.
- Model creation is simple: Estimator itself is built on tf.layers, which simplifies customization.
- Estimators build the graph for you, and create and manage the Graph and Session objects.
- Estimators provide a safe distributed training loop that controls: building the graph, initializing variables, starting queues, handling exceptions, creating checkpoint files and recovering from failures, and saving summaries for TensorBoard.
- When writing an application with Estimators, you must separate the input pipeline from the model; this separation simplifies experimenting with different datasets.
Steps
The input function must return two objects:
- a dict in which the keys are feature names and the values are tensors (or SparseTensors) containing the corresponding feature data;
- a tensor containing one or more labels.
def input_fn(features, labels, training=True, batch_size=256):
    """An input function for training or evaluating."""
    # Convert the inputs to a Dataset.
    dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
    # Shuffle and repeat if in training mode.
    if training:
        dataset = dataset.shuffle(1000).repeat()
    return dataset.batch(batch_size)
# Define three numeric feature columns.
population = tf.feature_column.numeric_column('population')
crime_rate = tf.feature_column.numeric_column('crime_rate')
median_education = tf.feature_column.numeric_column(
    'median_education',
    normalizer_fn=lambda x: x - global_education_mean)

# Instantiate an estimator, passing the feature columns.
estimator = tf.estimator.LinearClassifier(
    feature_columns=[population, crime_rate, median_education],
)
train_spec = tf.estimator.TrainSpec(
    input_fn=lambda: input_fn('data/kaggle_movie_reviews/train.tsv',
                              mode=tf.estimator.ModeKeys.TRAIN,
                              num_epochs=config.epochs,
                              batch_size=config.batch_size),
    max_steps=TOTAL_STEPS,
    hooks=None
)

eval_spec = tf.estimator.EvalSpec(
    input_fn=lambda: input_fn('data/kaggle_movie_reviews/train.tsv',
                              mode=tf.estimator.ModeKeys.EVAL,
                              batch_size=config.batch_size),
    exporters=[tf.estimator.LatestExporter(name="predict",
                                           serving_input_receiver_fn=serving_input_fn,
                                           exports_to_keep=1,
                                           as_text=True)],
    steps=TEST_STEPS,
    throttle_secs=EVAL_AFTER_SEC
)
if not RESUME_TRAINING:
    print("Removing previous artifacts...")
    shutil.rmtree(model_dir, ignore_errors=True)
else:
    print("Resuming training...")

tf.logging.set_verbosity(tf.logging.INFO)
time_start = datetime.utcnow()
print("Experiment started at {}".format(time_start.strftime("%H:%M:%S")))

estimator = create_estimator(run_config, config)
tf.estimator.train_and_evaluate(estimator=estimator,
                                train_spec=train_spec,
                                eval_spec=eval_spec)
03
DataLoader
dataloader = DataLoader(
    xxdataset,
    batch_size=args.train_batchsize,
    shuffle=True,
    num_workers=args.workers,
    drop_last=False,
    pin_memory=False)
- num_workers = 0: 10 training steps took 17.54 s
- num_workers = 1: 10 training steps took 12.47 s
- num_workers = 2: 10 training steps took 7.39 s
- num_workers = 4: 10 training steps took 5.22 s
- num_workers = 6: 10 training steps took 3.91 s
- num_workers = 8: 10 training steps took 4.58 s
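The speed-up from num_workers is the generic win of overlapping preprocessing with training; a plain-Python sketch of the idea (`fake_decode` is a hypothetical stand-in for cv2.imread + resize, and real DataLoader workers are separate processes, not threads):

```python
from concurrent.futures import ThreadPoolExecutor

def fake_decode(path):
    # stand-in for image decoding + resizing
    return sum(ord(c) for c in path) % 256

paths = [f"img_{i}.jpg" for i in range(8)]

# num_workers=0: decode in the main process, one item at a time
serial = [fake_decode(p) for p in paths]

# num_workers=4: decode in a worker pool while the GPU would be busy
with ThreadPoolExecutor(max_workers=4) as pool:
    parallel = list(pool.map(fake_decode, paths))

assert serial == parallel  # same data, produced concurrently
```

Past the point where the workers saturate the CPU (here around 6), adding more only adds scheduling overhead, matching the num_workers=8 slowdown above.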
torch.multiprocessing
# Training a model using multiple processes:
import torch.nn as nn
import torch.multiprocessing as mp

def train(model):
    # data_loader, optimizer and loss_fn are assumed to be defined elsewhere
    for data, labels in data_loader:
        optimizer.zero_grad()
        loss_fn(model(data), labels).backward()
        optimizer.step()  # this will update the shared parameters

model = nn.Sequential(nn.Linear(n_in, n_h1),
                      nn.ReLU(),
                      nn.Linear(n_h1, n_out))
model.share_memory()  # required for the 'fork' start method to work

processes = []
for i in range(4):  # number of processes
    p = mp.Process(target=train, args=(model,))
    p.start()
    processes.append(p)
for p in processes:
    p.join()
Gradient accumulation: effective batch sizes beyond GPU memory
# clear gradients left over from the last update
optimizer.zero_grad()

# accumulate gradients over 16 forward/backward passes
scaled_loss = 0
for accumulated_step_i in range(16):
    out = model.forward()
    loss = some_loss(out, y)
    loss.backward()  # gradients accumulate in the .grad buffers
    scaled_loss += loss.item()

# update weights once after 16 steps; effective batch = 16 x per-step batch
optimizer.step()

# scaled_loss is now the sum over the accumulated batches
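Why this matches one big batch: for a mean loss over equal-sized micro-batches, averaging the micro-batch gradients reproduces the full-batch gradient exactly. A tiny worked example with a 1-D linear model (`grad` is a hypothetical helper, plain Python standing in for autograd):

```python
# Gradient of mean-squared loss L(w) = mean((w*x - y)^2) w.r.t. w
# is mean(2*x*(w*x - y)).
def grad(w, xs, ys):
    return sum(2 * x * (w * x - y) for x, y in zip(xs, ys)) / len(xs)

xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]
w = 0.5

full = grad(w, xs, ys)  # one big batch of 4

# two micro-batches of 2: accumulate the gradients, then average
acc = (grad(w, xs[:2], ys[:2]) + grad(w, xs[2:], ys[2:])) / 2

print(full, acc)  # both are -22.5
```

This is also why each micro-loss is typically divided by the number of accumulation steps (or the summed loss reported as scaled): the update should correspond to the mean over the large effective batch, not the sum.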
Lightning
In Lightning all of the above is done for you; just set accumulate_grad_batches=16:
trainer = Trainer(accumulate_grad_batches=16)
trainer.fit(model)
from pytorch_lightning import Trainer
model = LightningModule(…)
trainer = Trainer()
trainer.fit(model)
A related tip: when logging, store loss.item() rather than the loss tensor, so the computation graph is not kept alive:

losses.append(loss.item())
16-bit precision
# enable 16-bit on the model and the optimizer (NVIDIA apex)
model, optimizers = amp.initialize(model, optimizers, opt_level='O2')

# when doing .backward, let amp do it so it can scale the loss
with amp.scale_loss(loss, optimizer) as scaled_loss:
    scaled_loss.backward()

trainer = Trainer(amp_level='O2', use_amp=True)
trainer.fit(model)
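The reason amp scales the loss: float16 underflows for magnitudes below roughly 6e-8, so very small gradients silently become zero; scaling the loss up before backward and unscaling in float32 before the optimizer step preserves them. A numpy sketch of the effect (an illustration only, not the apex implementation):

```python
import numpy as np

# fp16 underflow: a tiny gradient value simply vanishes
tiny_grad = 1e-8
assert np.float16(tiny_grad) == 0.0

# loss scaling: multiply by a large factor before backward,
# so the gradient stays representable in fp16 ...
scale = 1024.0
scaled = np.float16(tiny_grad * scale)
assert scaled != 0.0

# ... then unscale in fp32 before the optimizer step
recovered = float(scaled) / scale
print(recovered)  # approximately 1e-8 again
```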
This article is shared for academic exchange only; it does not imply that this official account endorses its views or takes responsibility for the authenticity of its content. Copyright belongs to the original author; if there is any infringement, please notify us for removal.