查看原文
其他

教程 | 如何用百度深度学习框架PaddlePaddle做数据预处理

2017-11-25 机器之心

机器之心经授权转载

作者:胡晓曼


本文主要介绍了百度的深度学习开源框架PaddlePaddle的数据预处理过程,创建一个reader读取数据,一行代码搞定数据的输入、混洗和批量读取。本文作者胡晓曼是一名高级算法工程师,热衷写通俗易懂的深度学习入门文章。


PaddlePaddle 的基本数据格式


根据官网的资料,总结出 PaddlePaddle 支持多种不同的数据格式,包括四种数据类型和三种序列格式:


四种数据类型:


  • dense_vector:稠密的浮点数向量。

  • sparse_binary_vector:稀疏的二值向量,即大部分值为 0,但有值的地方必须为 1。

  • sparse_float_vector:稀疏的向量,即大部分值为 0,但有值的部分可以是任何浮点数。

  • integer:整型格式


api 如下:


  • paddle.v2.data_type.dense_vector(dim, seq_type=0)

  • 说明:稠密向量,输入特征是一个稠密的浮点向量。举个例子,手写数字识别里的输入图片是 28*28 的像素,Paddle 的神经网络的输入应该是一个 784 维的稠密向量。

  • 参数:

  • dim(int) 向量维度

  • seq_type(int) 输入的序列格式

  • 返回类型:InputType

  • paddle.v2.data_type.sparse_binary_vector(dim, seq_type=0)

  • 说明:稀疏的二值向量。输入特征是一个稀疏向量,这个向量的每个元素要么是 0, 要么是 1

  • 参数:同上

  • 返回类型:同上

  • paddle.v2.data_type.sparse_vector(dim, seq_type=0)

  • 说明:稀疏向量,向量里大多数元素是 0,其他的值可以是任意的浮点值

  • 参数:同上

  • 返回类型:同上

  • paddle.v2.data_type.integer_value(value_range, seq_type=0)

  • 说明:整型格式

  • 参数:  

  • seq_type(int):输入的序列格式

  • value_range(int):每个元素的范围

  • 返回类型:InputType


三种序列格式:


  • SequenceType.NO_SEQUENCE:不是一条序列

  • SequenceType.SEQUENCE:是一条时间序列

  • SequenceType.SUB_SEQUENCE: 是一条时间序列,且序列的每一个元素还是一个时间序列。


api 如下:


  • paddle.v2.data_type.dense_vector_sequence(dim, seq_type=0)

  • 说明:稠密向量的序列格式

  • 参数:dim(int):稠密向量的维度

  • 返回类型:InputType

  • paddle.v2.data_type.sparse_binary_vector_sequence(dim, seq_type=0)

  • 说明:稀疏的二值向量序列。每个序列里的元素要么是 0 要么是 1

  • 参数:dim(int):稀疏向量的维度

  • 返回类型:InputType

  • paddle.v2.data_type.sparse_non_value_slot(dim, seq_type=0)

  • 说明:稀疏的向量序列。每个序列里的元素要么是 0 要么是 1

  • 参数:

  • dim(int):稀疏向量的维度

  • seq_type(int):输入的序列格式

  • 返回类型:InputType

  • paddle.v2.data_type.sparse_value_slot(dim, seq_type=0)

  • 说明:稀疏的向量序列,向量里大多数元素是 0,其他的值可以是任意的浮点值

  • 参数:

  • dim(int):稀疏向量的维度

  • seq_type(int):输入的序列格式

  • 返回类型:InputType

  • paddle.v2.data_type.integer_value_sequence(value_range, seq_type=0)

  • 说明:value_range(int):每个元素的范围


不同的数据类型和序列模式返回的格式不同,如下表:


其中 f 表示浮点数,i 表示整数


注意:对 sparse_binary_vector 和 sparse_float_vector,PaddlePaddle 存的是有值位置的索引。例如,


  • 对一个 5 维非序列的稀疏 01 向量 [0, 1, 1, 0, 0],类型是 sparse_binary_vector,返回的是 [1, 2]。(因为只有第 1 位和第 2 位有值)

  • 对一个 5 维非序列的稀疏浮点向量 [0, 0.5, 0.7, 0, 0],类型是 sparse_float_vector,返回的是 [(1, 0.5), (2, 0.7)]。(因为只有第一位和第二位有值,分别是 0.5 和 0.7)


PaddlePaddle 的数据读取方式


我们了解了上文的四种基本数据格式和三种序列模式后,在处理自己的数据时可以根据需求选择,但是处理完数据后如何把数据放到模型里去训练呢?我们知道,基本的方法一般有两种:


  • 一次性加载到内存:模型训练时直接从内存中取数据,不需要大量的 IO 消耗,速度快,适合少量数据。

  • 加载到磁盘/HDFS/共享存储等:这样不用占用内存空间,在处理大量数据时一般采取这种方式,但是缺点是每次数据加载进来也是一次 IO 的开销,非常影响速度。


在 PaddlePaddle 中我们可以有三种模式来读取数据:分别是 reader、reader creator 和 reader decorator, 这三者有什么区别呢?


  • reader:从本地、网络、分布式文件系统 HDFS 等读取数据,也可随机生成数据,并返回一个或多个数据项。

  • reader creator:一个返回 reader 的函数。

  • reader decorator:装饰器,可组合一个或多个 reader。


Reader


我们先以 reader 为例,为房价数据(斯坦福吴恩达的公开课第一课举例的数据)创建一个 reader:


1. 创建一个 reader,实质上是一个迭代器,每次返回一条数据(此处以房价数据为例)


  1. reader = paddle.dataset.uci_housing.train()


2. 创建一个 shuffle_reader,把上一步的 reader 放进去,配置 buf_size 就可以读取 buf_size 大小的数据自动做 shuffle,让数据打乱,随机化


  1. shuffle_reader = paddle.reader.shuffle(reader,buf_size= 100)


3. 创建一个 batch_reader,把上一步混洗好的 shuffle_reader 放进去,给定 batch_size,即可创建。


  1. batch_reader = paddle.batch(shuffle_reader,batch_size = 2)


这三种方式也可以组合起来放一块:


  1. reader = paddle.batch(

  2.    paddle.reader.shuffle(

  3.        uci_housing.train(),

  4.    buf_size = 100),

  5.    batch_size=2)    


可以以一个直观的图来表示:



从图中可以看到,我们可以直接从原始数据集里拿去数据,用 reader 读取,一条条灌倒 shuffle_reader 里,在本地随机化,把数据打乱,做 shuffle,然后把 shuffle 后的数据,一个 batch 一个 batch 的形式,批量的放到训练器里去进行每一步的迭代和训练。流程简单,而且只需要使用一行代码即可实现整个过程。


Reader creator


如果想要生成一个简单的随机数据,以 reader creator 为例:


  1. def reader_creator():

  2.    def reader():

  3.        while True:

  4.            yield numpy.random.uniform(-1,1,size=784)

  5.    return reader


源码见 creator.py, 支持四种格式:np_array,text_file,RecordIO 和 cloud_reader


Code:https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/reader/creator.py

 

Reader decorator


如果想要读取同时读取两部分的数据,那么可以定义两个 reader,合并后对其进行 shuffle。如我想读取所有用户对比车系的数据和浏览车系的数据,可以定义两个 reader,分别为 contrast() 和 view(),然后通过预定义的 reader decorator 缓存并组合这些数据,在对合并后的数据进行乱序操作。源码见 decorator.py


  1. data = paddle.reader.shuffle(

  2.        paddle.reader.compose(

  3.            paddle.reader(contradt(contrast_path),buf_size = 100),

  4.            paddle.reader(view(view_path),buf_size = 200),

  5.            500)


这样有一个很大的好处,就是组合特征来训练变得更容易了!传统的跑模型的方法是,确定 label 和 feature,尽可能多的找合适的 feature 扔到模型里去训练,这样我们就需要做一张大表,训练完后我们可以分析某些特征的重要性然后重新增加或减少一些 feature 来进行训练,这样我们有需要对原来的 label-feature 表进行修改,如果数据量小没啥影响,就是麻烦点,但是数据量大的话需要每一次增加 feature,和主键、label 来 join 的操作都会很耗时,如果采取这种方式的话,我们可以对某些同一类的特征做成一张表,数据存放的地址存为一个变量名,每次跑模型的时候想选取几类特征,就创建几个 reader,用 reader decorator 组合起来,最后再 shuffle 灌倒模型里去训练。这!样!是!不!是!很!方!便!


如果没理解,我举一个实例,假设我们要预测用户是否会买车,label 是买车 or 不买车,feature 有浏览车系、对比车系、关注车系的功能偏好等等 20 个,传统的思维是做成这样一张表:



如果想要减少 feature_2, 看看 feature_2 对模型的准确率影响是否很大,那么我们需要在这张表里去掉这一列,想要增加一个 feature 的话,也需要在 feature 里增加一列,如果用 reador decorator 的话,我们可以这样做数据集:



把相同类型的 feature 放在一起,不用频繁的 join 减少时间,一共做四个表,创建 4 个 reador:


  1. data = paddle.reader.shuffle(

  2.            paddle.reader.compose(

  3.                paddle.reader(table1(table1_path),buf_size = 100),

  4.                paddle.reader(table2(table2_path),buf_size = 100),

  5.                paddle.reader(table3(table3_path),buf_size = 100),

  6.                paddle.reader(table4(table4_path),buf_size = 100),

  7.            500)


如果新发现了一个特征,想尝试这个特征对模型提高准确率有没有用,可以再单独把这个特征数据提取出来,再增加一个 reader,用 reader decorator 组合起来,shuffle 后放入模型里跑就行了。


PaddlePaddle 的数据预处理实例


还是以手写数字为例,对数据进行处理后并划分 train 和 test,只需要 4 步即可:


1. 指定数据地址


  1. import paddle.v2.dataset.common

  2. import subprocess

  3. import numpy

  4. import platform

  5. __all__ = ['train', 'test', 'convert']

  6. URL_PREFIX = 'http://yann.lecun.com/exdb/mnist/'

  7. TEST_IMAGE_URL = URL_PREFIX + 't10k-images-idx3-ubyte.gz'

  8. TEST_IMAGE_MD5 = '9fb629c4189551a2d022fa330f9573f3'

  9. TEST_LABEL_URL = URL_PREFIX + 't10k-labels-idx1-ubyte.gz'

  10. TEST_LABEL_MD5 = 'ec29112dd5afa0611ce80d1b7f02629c'

  11. TRAIN_IMAGE_URL = URL_PREFIX + 'train-images-idx3-ubyte.gz'

  12. TRAIN_IMAGE_MD5 = 'f68b3c2dcbeaaa9fbdd348bbdeb94873'

  13. TRAIN_LABEL_URL = URL_PREFIX + 'train-labels-idx1-ubyte.gz'

  14. TRAIN_LABEL_MD5 = 'd53e105ee54ea40749a09fcbcd1e9432'


2. 创建 reader creator


  1. def reader_creator(image_filename, label_filename, buffer_size):

  2.     # 创建一个reader

  3.     def reader():

  4.         if platform.system() == 'Darwin':

  5.             zcat_cmd = 'gzcat'

  6.         elif platform.system() == 'Linux':

  7.             zcat_cmd = 'zcat'

  8.         else:

  9.             raise NotImplementedError()

  10.         m = subprocess.Popen([zcat_cmd, image_filename], stdout=subprocess.PIPE)

  11.         m.stdout.read(16)  

  12.         l = subprocess.Popen([zcat_cmd, label_filename], stdout=subprocess.PIPE)

  13.         l.stdout.read(8)  

  14.         try:  # reader could be break.

  15.             while True:

  16.                 labels = numpy.fromfile(

  17.                     l.stdout, 'ubyte', count=buffer_size).astype("int")

  18.                 if labels.size != buffer_size:

  19.                     break  # numpy.fromfile returns empty slice after EOF.

  20.                 images = numpy.fromfile(

  21.                     m.stdout, 'ubyte', count=buffer_size * 28 * 28).reshape(

  22.                         (buffer_size, 28 * 28)).astype('float32')

  23.                 images = images / 255.0 * 2.0 - 1.0

  24.                 for i in xrange(buffer_size):

  25.                     yield images[i, :], int(labels[i])

  26.         finally:

  27.             m.terminate()

  28.             l.terminate()

  29.     return reader


3. 创建训练集和测试集


  1. def train():

  2.     """

  3.     创建mnsit的训练集 reader creator

  4.     返回一个reador creator,每个reader里的样本都是图片的像素值,在区间[0,1]内,label为0~9

  5.     返回:training reader creator

  6.     """

  7.     return reader_creator(

  8.         paddle.v2.dataset.common.download(TRAIN_IMAGE_URL, 'mnist',

  9.                                           TRAIN_IMAGE_MD5),

  10.         paddle.v2.dataset.common.download(TRAIN_LABEL_URL, 'mnist',

  11.                                           TRAIN_LABEL_MD5), 100)

  12. def test():

  13.     """

  14.     创建mnsit的测试集 reader creator

  15.     返回一个reador creator,每个reader里的样本都是图片的像素值,在区间[0,1]内,label为0~9

  16.     返回:testreader creator

  17.     """

  18.     return reader_creator(

  19.         paddle.v2.dataset.common.download(TEST_IMAGE_URL, 'mnist',

  20.                                           TEST_IMAGE_MD5),

  21.         paddle.v2.dataset.common.download(TEST_LABEL_URL, 'mnist',

  22.                                           TEST_LABEL_MD5), 100)


4. 下载数据并转换成相应格式


  1. def fetch():

  2.     paddle.v2.dataset.common.download(TRAIN_IMAGE_URL, 'mnist', TRAIN_IMAGE_MD5)

  3.     paddle.v2.dataset.common.download(TRAIN_LABEL_URL, 'mnist', TRAIN_LABEL_MD5)

  4.     paddle.v2.dataset.common.download(TEST_IMAGE_URL, 'mnist', TEST_IMAGE_MD5)

  5.     paddle.v2.dataset.common.download(TEST_LABEL_URL, 'mnist', TRAIN_LABEL_MD5)

  6. def convert(path):

  7.     """

  8.     将数据格式转换为 recordio format

  9.     """

  10.     paddle.v2.dataset.common.convert(path, train(), 1000, "minist_train")

  11.     paddle.v2.dataset.common.convert(path, test(), 1000, "minist_test")


如果想换成自己的训练数据,只需要按照步骤改成自己的数据地址,创建相应的 reader creator(或者 reader decorator)即可。


这是图像的例子,如果我们想训练一个文本模型,做一个情感分析,这个时候如何处理数据呢?步骤也很简单。


假设我们有一堆数据,每一行为一条样本,以 \t 分隔,第一列是类别标签,第二列是输入文本的内容,文本内容中的词语以空格分隔。以下是两条示例数据:


  1. positive        今天终于试了自己理想的车 外观太骚气了 而且中控也很棒

  2. negative       这台车好贵 而且还费油 性价比太低了


现在开始做数据预处理


1. 创建 reader


  1. def train_reader(data_dir, word_dict, label_dict):

  2.     def reader():

  3.         UNK_ID = word_dict["<UNK>"]

  4.         word_col = 0

  5.         lbl_col = 1

  6.         for file_name in os.listdir(data_dir):

  7.             with open(os.path.join(data_dir, file_name), "r") as f:

  8.                 for line in f:

  9.                     line_split = line.strip().split("\t")

  10.                     word_ids = [

  11.                         word_dict.get(w, UNK_ID)

  12.                         for w in line_split[word_col].split()

  13.                     ]

  14.                     yield word_ids, label_dict[line_split[lbl_col]]

  15.     return reader


返回类型为: paddle.data_type.integer_value_sequence(词语在字典的序号)和 paddle.data_type.integer_value(类别标签)


2. 组合读取方式


  1. train_reader = paddle.batch(

  2.         paddle.reader.shuffle(

  3.             reader.train_reader(train_data_dir, word_dict, lbl_dict),

  4.             buf_size=1000),

  5.         batch_size=batch_size)


完整的代码如下(加上了划分 train 和 test 部分):


  1. import os

  2. def train_reader(data_dir, word_dict, label_dict):

  3.     """

  4.    创建训练数据reader

  5.     :param data_dir: 数据地址.

  6.     :type data_dir: str

  7.     :param word_dict: 词典地址,

  8.         词典里必须有 "UNK" .

  9.     :type word_dict:python dict

  10.     :param label_dict: label 字典的地址

  11.     :type label_dict: Python dict

  12.     """

  13.     def reader():

  14.         UNK_ID = word_dict["<UNK>"]

  15.         word_col = 1

  16.         lbl_col = 0

  17.         for file_name in os.listdir(data_dir):

  18.             with open(os.path.join(data_dir, file_name), "r") as f:

  19.                 for line in f:

  20.                     line_split = line.strip().split("\t")

  21.                     word_ids = [

  22.                         word_dict.get(w, UNK_ID)

  23.                         for w in line_split[word_col].split()

  24.                     ]

  25.                     yield word_ids, label_dict[line_split[lbl_col]]

  26.     return reader

  27. def test_reader(data_dir, word_dict):

  28.     """

  29.     创建测试数据reader

  30.     :param data_dir: 数据地址.

  31.     :type data_dir: str

  32.     :param word_dict: 词典地址,

  33.         词典里必须有 "UNK" .

  34.     :type word_dict:python dict

  35.     """

  36.     def reader():

  37.         UNK_ID = word_dict["<UNK>"]

  38.         word_col = 1

  39.         for file_name in os.listdir(data_dir):

  40.             with open(os.path.join(data_dir, file_name), "r") as f:

  41.                 for line in f:

  42.                     line_split = line.strip().split("\t")

  43.                     if len(line_split) < word_col: continue

  44.                     word_ids = [

  45.                         word_dict.get(w, UNK_ID)

  46.                         for w in line_split[word_col].split()

  47.                     ]

  48.                     yield word_ids, line_split[word_col]

  49.     return reader


总结


这篇文章主要讲了在 paddlepaddle 里如何加载自己的数据集,转换成相应的格式,并划分 train 和 test。我们在使用一个框架的时候通常会先去跑几个简单的 demo,但是如果不用常见的 demo 的数据,自己做一个实际的项目,完整的跑通一个模型,这才代表我们掌握了这个框架的基本应用知识。跑一个模型第一步就是数据预处理,在 paddlepaddle 里,提供的方式非常简单,但是有很多优点:


  • shuffle 数据非常方便

  • 可以将数据组合成 batch 训练

  • 可以利用 reader decorator 来组合多个 reader,提高组合特征运行模型的效率

  • 可以多线程读取数据


而我之前使用过 mxnet 来训练车牌识别的模型,50w 的图片数据想要一次训练是非常慢的,这样的话就有两个解决方法:一是批量训练,这一点大多数的框架都会有,二是转换成 mxnet 特有的 rec 格式,提高读取效率,可以通过 im2rec.py 将图片转换,比较麻烦,如果是 tesnorflow,也有相对应的特定格式 tfrecord,这几种方式各有优劣,从易用性上,paddlepaddle 是比较简单的。 


参考文章:


1. 官网说明:http://doc.paddlepaddle.org/develop/doc_cn/getstarted/concepts/use_concepts_cn.html



本文为机器之心专栏,转载请联系原作者获得授权。

✄------------------------------------------------

加入机器之心(全职记者/实习生):hr@jiqizhixin.com

投稿或寻求报道:content@jiqizhixin.com

广告&商务合作:bd@jiqizhixin.com

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存