使用 Google Cloud 上的 tf.Transform 对 TensorFlow 管道模式进行预处理
文 / Matthias Feys,ML6 首席技术官
机器学习模型需要数据来训练,但是通常需要对这些数据进行预处理,以便在训练模型时发挥作用。这种预处理(通常称为 “特征工程”)采用多种形式,例如:规范化和缩放数据,将分类值编码为数值,形成词汇表,以及连续数值的分级。
在生产过程中利用机器学习时,为了确保在模型的离线培训期间应用的特征工程步骤与使用模型用于预测时应用的特征工程步骤保持相同,这往往就成为一项极具挑战性的任务。此外,放眼当今世界,机器学习模型会在超大型的数据集上进行训练,因此在训练期间应用的预处理步骤将会在大规模分布式计算框架(例如 Google Cloud Dataflow 或 Apache Spark)上实现。由于训练环境通常与服务环境大相径庭,在训练和服务期间执行的特征工程之间可能会产生不一致的情况。
幸运的是,我们现在有了 tf.Transform,这是一个 TensorFlow 库,它提供了一个优雅的解决方案,以确保在训练和服务期间特征工程步骤的一致性。 在这篇文章中,我们将提供在 Google Cloud Dataflow 上使用 tf.Transform,以及在 Cloud ML Engine 上进行模型训练和服务的具体示例。
注:tf.Transform 链接
https://github.com/tensorflow/transform
应用于机器模拟上的变换用例
ecc.ai 是一个有助于优化机器配置的平台。 我们模拟物理机器(例如瓶灌装机或饼干机)以便找到更优化的参数设置。 由于每个模拟的物理机器的目标是具有与实际机器相同的输入/输出特性,我们称之为 “数字孪生”。
这篇文章将展示这个 “数字孪生” 的设计和实现过程。 在最后一段中,您可以找到有关我们之后如何使用这些数字孪生来优化机器配置的更多信息。
注:ecc.ai 链接
https://ecc.ai/
tf.Transform 释义
tf.Transform 是 TensorFlow 的一个库,它允许用户定义预处理管道模式并使用大规模数据处理框架运行这些管道模式,同时还以可以作为 TensorFlow 图形的一部分运行的方式导出管道。 用户通过组合模块化 Python 函数来定义管道,然后 tf.Transform 随着 Apache Beam 一起运行。 tf.Transform 导出的 TensorFlow 图形可以在使用训练模型进行预测时复制预处理步骤,比如在使用 TensorFlow Serving 服务模型时。
注:Apache Beam 链接
https://beam.apache.org/
TensorFlow Serving 链接
https://ai.googleblog.com/2016/02/running-your-models-in-production-with.html
tf.Transform 允许用户定义预处理管道。 用户可以实现预处理数据以用于 TensorFlow 训练,还可以将转换编码为 TensorFlow 图形后导出。然后将该变换图形结合到用于推断的模型图中
tf.Transform 建立数字孪生
数字双模型的目标是能够根据其输入预测机器的所有输出参数。 为了训练这个模型,我们分析了包含这种关系的观察记录历史的日志数据。 由于日志的数据量可能会相当广泛,理想的情况是应该以分布式方式运行此步骤。 此外,必须在训练和服务的时间之间使用相同的概念和代码,这样对预处理代码的改动最小。
开发伊始,我们在任何现有的开源项目中都找不到此功能。 因此,我们开始构建用于 Apache Beam 预处理的自定义工具,这使我们能够分配我们的工作负载并轻松地在多台机器之间切换。 但是不太幸运的是,这种方法不允许我们在服务时(即在生产环境中使用训练模型时)重复使用相同的代码作为 TensorFlow 图形的一部分运行。
在实践中,我们必须在 Apache Beam 中编写自定义分析步骤,计算并保存每个变量所需的元数据,以便在后续步骤中进行实际的预处理。 我们在训练期间使用 Apache Beam 执行后续预处理步骤,并在服务期间作为 API 的一部分执行。 不幸的是,由于它不是 TensorFlow 图形的一部分,我们不能简单地使用 ML Engine 将我们的模型部署为 API,而我们的 API 总是由预处理部分和模型部分组成,这使得统一升级变得更加困难。而且,对于所有想要使用的那些已有的和全新的转换,我们需要为此实施和维护分析并转换步骤。
TensorFlow Transform 解决了这些问题。 自发布以来,我们将其直接整合为我们完整管道模式的主要构建块。
简化数字孪生示例流程
我们现在将专注于构建和使用特定机器的数字孪生。 举个例子,我们假设有一个布朗尼面团机器。 这台机器对不同的原料进行加热、搅拌,直到面团产生完美的质地。 我们将从批次问题开始,这意味着数据在完整的生产批次中进行汇总,而不是在连续不断的生产线上进行汇总。
数据
我们有两种类型的数据:
输入数据:原料描述(绿色)和布朗尼面团机(蓝色)的设置。 您可以在下面找到列名称和 3 个示例行。
输出数据:带有这些原料的机器设置结果:消耗的能量,输出的质量度量和输出量。 您可以在下面找到列名称和 3 个示例行。
制作数字孪生
在这里,我们在云存储中根据两种不同类型文件的历史日志数据来训练系统的数字孪生。 该数字孪生能够基于输入数据预测输出数据。上图显示我们在此流程中使用的 Google 服务。
预处理
使用 tf.Transform 函数,Apache Beam 将完成预处理(制作训练示例)。
预处理阶段包括 4 个步骤,代码如下:
1. 组合输入/输出数据,并制作原始数据 PCollection。
1 raw_data_input = (
2 p
3 | 'ReadInputData' >> textio.ReadFromText(train_data_file)
4 | 'ParseInputCSV'>> beam.Map(converter_input.decode)
5 | 'ExtractBatchKeyIn'>> beam.Map(extract_batchkey))
6
7 raw_data_output = (
8 p
9 | 'ReadOutputData' >> textio.ReadFromText(train_data_file)
10 | 'ParseOutputCSV'>> beam.Map(converter_output.decode)
11 | 'ExtractBatchKeyOut'>> beam.Map(extract_batchkey))
12
13 raw_data = (
14 (raw_data_input, raw_data_output)
15 | 'JoinData' >> CoGroupByKey()
16 | 'RemoveKeys'>> beam.Map(remove_keys))
2. 定义将预处理原始数据的预处理功能。 此函数将组合多个 TF-Transform 函数,以生成 TensorFlow Estimators 的示例。
Language: Python
1 def preprocessing_fn(inputs):
2 """Preprocess input columns into transformed columns."""
3 outputs = {}
4 # Encode categorical column:
5 outputs['Mixing Speed'] = tft.string_to_int(inputs['Mixing Speed'])
6 # Calculate Derived Features:
7 outputs['Total Mass'] = inputs['Butter Mass'] + inputs['Sugar Mass'] + inputs['Flour Mass']
8 for ingredient in ['Butter', 'Sugar', 'Flour']:
9 ingredient_percentage = inputs['{} Mass'.format(ingredient)] / outputs['Total Mass']
10 outputs['Norm {} perc'.format(ingredient)] = tft.scale_to_z_score(ingredient_percentage)
11 # Keep absolute numeric columns
12 for key in ['Total Volume', 'Energy']:
13 outputs[key]=inputs[key]
14 # Normalize other numeric columns
15 for key in [
16 'Butter Temperature',
17 'Sugar Humidity',
18 'Flour Humidity'
19 'Heating Time',
20 'Mixing Time',
21 'Density',
22 'Temperature',
23 'Humidity',
24 ]:
25 outputs[key] = tft.scale_to_z_score(inputs[key]) 26 # Extract Specific Problems
27 chunks_detected_str = tf.regex_replace(
28 inputs['Problems'],
29 '.*chunk.*'
30 'chunk',
31 name='Detect Chunk')
32 outputs['Chunks']=tf.equal(chunks_detected_str,'chunk')
33 return outputs
3. 使用预处理功能分析和转换整个数据集。这部分代码将采用预处理功能,首先分析数据集,即完整传递数据集以计算分类列的词汇表,然后计算平均值和标准化列的标准偏差。 接下来,Analyze 步骤的输出用于转换整个数据集。
1 transform_fn = raw_data | AnalyzeDataset(preprocessing_fn)
2 transformed_data = (raw_data, transform_fn) | TransformDataset()
4. 保存数据并将 TransformFn 和元数据文件序列化。
1 transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(
2 transformed_eval_data_base,
3 coder=example_proto_coder.ExampleProtoCoder(transformed_metadata))
4
5 _ = (
6 transform_fn
7 | "WriteTransformFn" >>
8 transform_fn_io.WriteTransformFn(working_dir))
9
10
11 transformed_metadata | 'WriteMetadata' >> beam_metadata_io.WriteMetadata(
12 transformed_metadata_file, pipeline=p)
训练
使用预处理数据作为 `TFRecords`,我们现在可以使用 Estimators 轻松训练带有标准 TensorFlow 代码的 TensorFlow 模型。
导出训练的模型
在分析数据集的结构化方法旁边,tf.Transform 的实际功能在于可以导出预处理图。 您可以导出 TensorFlow 模型,该模型包含与训练数据完全相同的预处理步骤。
为此,我们只需要使用 tf.Transform 输入函数导出训练模型:
1 tf_transform_output = tft.TFTransformOutput(working_dir)
2 serving_input_fn = _make_serving_input_fn(tf_transform_output)
3 exported_model_dir = os.path.join(working_dir, EXPORTED_MODEL_DIR)
4 estimator.export_savedmodel(exported_model_dir, serving_input_fn)
_make_serving_input_fn 函数是一个非常通用的函数,不管项目的逻辑如何,您都可以简单地在不同项目之间重用:
Language: Python
1 def _make_serving_input_fn(tf_transform_output):
2 raw_feature_spec = RAW_DATA_METADATA.schema.as_feature_spec()
3 raw_feature_spec.pop(LABEL_KEY)
4
5 def serving_input_fn():
6 raw_input_fn = input_fn_utils.build_parsing_serving_input_fn(
7 raw_feature_spec)
8 raw_features, _, default_inputs = raw_input_fn()
9 transformed_features = tf_transform_output.transform_raw_features(
10 raw_features)
11 return input_fn_utils.InputFnOps(transformed_features, None, default_inputs)
12
13 return serving_input_fn
使用数字孪生
数字孪生示例流程的最后一部分使用保存的模型根据输入预测系统的输出。 这是我们可以充分利用 tf.Transform 的地方,因为这使得在 Cloud ML Engine 上部署 “TrainedModel”(包括预处理)变得非常容易。
要部署训练模型,您只需运行 2 个命令:
1 gcloud ml-engine models create MODEL_NAME
2 gcloud ml-engine versions create VERSION --model=MODEL_NAME --origin=ORIGIN
现在,我们可以使用以下代码轻松地与我们的数字孪生进行交互
1 def get_predictions(project, model, instances, version=None):
2 service = discovery.build('ml', 'v1')
3 name = 'projects/{}/models/{}'.format(project, model)
4
5 if version is not None:
6 name += '/versions/{}'.format(version)
7
8 response = service.projects().predict(
9 name=name,
10 body={'instances': instances}
11 ).execute()
12
13 if 'error' in response:
14 raise RuntimeError(response['error'])
15
16 return response['predictions']
17
18
19 if __name__ == "__main__":
20 predictions = get_predictions(
21 project="<project_id>",
22 model="<model_name>",
23 instances=[
24 {
25 "Butter Mass": 121,
26 "Butter Temperature": 20,
27 "Sugar Mass": 200,
28 "Sugar Humidity": 0.22,
29 "Flour Mass ": 50,
30 "Flour Humidity": 0.23,
31 "Heating Time": 50,
32 "Mixing Speed": "Max Speed",
33 "Mixing Time": 200
34 }]
35 )
在 ecc.ai,我们使用数字孪生来优化物理机器的参数。
简而言之,我们的方法包括 3 个步骤(如下图 1 所示):
使用历史机器数据创建模拟环境。机器的这种 “数字孪生” 则将作为能够允许增强代理来学习最佳控制策略的环境。
利用数字孪生使用我们的强化学习(RL)代理查找(新的)最佳参数设置。
使用 RL 代理配置真实机器的参数。
总结
通过 tf.Transform,我们现在已将我们的模型部署在 ML Engine 上作为一个 API,成为特定布朗尼面团机的数字孪生:它采用原始输入功能(成分描述和机器设置),并将反馈机器的预测输出。
好处是我们不需要维护 API 并且包含所有内容 - 因为预处理是服务图形的一部分。 如果我们需要更新 API,只需要使用最新的版本来刷新模型,所有相关的预处理步骤将会自动为您更新。
此外,如果我们需要为另一个布朗尼面团机器(使用相同数据格式的机器)制作数字孪生模型,但是是在不同的工厂或设置中运行,我们也可以轻松地重新运行相同的代码,无需手动调整预处理代码或执行自定义分析步骤。
您可以在 GitHub 上找到这篇文章的代码。
注:GitHub 链接
https://github.com/Fematich/tftransform-demo
更多 AI 相关阅读:
· TensorFlow Hub, 给您带来全新的 Web 体验
· 使用 TensorFlow Probability 对金融模型中的误差进行介绍性分析