Gemma 2 实例分享 | 使用 Dataflow 流式传输 ML 内容
数据局部性,使用本地工作调用而不是将数据通过远程过程调用 (RPC) 发送到另一个系统。
可自动扩展的单一系统,允许使用诸如背压等指标作为自动调节程序的直接信号。
用于在生产环境中进行观察和监控的单一系统。
将大小与性能优势相结合
https://ai.google.dev/gemma
Dataflow 提供一个可伸缩的统一批处理和流式处理平台。借助 Dataflow,您可以使用 Apache Beam Python SDK 开发流式数据、事件处理流水线。Dataflow 拥有以下优势:
Dataflow 为全托管式,可根据需求自动进行拓展和缩减。 Apache Beam 提供一套低代码一站式转换功能,可节省您编写通用样板代码的时间、精力和成本。毕竟,最理想的代码是不需要您亲自编写的。 Dataflow ML 直接支持通过 GPU 安装必要的驱动程序并提供对一系列 GPU 设备的访问权限。
Dataflow
https://cloud.google.com/dataflow
GPU 设备
https://cloud.google.com/dataflow/docs/gpu/gpu-support#availability
以下示例将说明如何将 Gemma 模型嵌入流式数据流水线中,以便使用 Dataflow 运行推理。
首先,我们希望通过总结积极的互动来高效地管理和存储聊天数据,以供参考和未来分析。
其次,我们希望部署实时问题检测和解决机制,使用情感分析快速识别感到不满意的客户,并生成量身定制的回复来解决他们的问题。
该解决方案使用流水线来近乎实时地处理已完成的聊天消息。首先,Gemma 会监测并分析这些聊天内容透露出的情绪。然后,模型会对所有聊天数据进行总结,并通过搭配使用 Dataflow 开箱即用的 I/O 连接器,将积极或中性情绪的聊天直接发送到数据平台 BigQuery。对于报告为消极情绪的聊天,我们会要求 Gemma 模型为感到不满意的客户生成符合情境的回复,并将此回复发送给工作人员进行审核,以便他们可以在优化消息之后再将其发送给可能感到不满意的客户。
BigQuery
https://cloud.google.com/bigquery
流水线如下所示:
1. 读取来自 Pub/Sub (我们的事件消息源) 的评论数据。此数据包含作为 JSON 负载的聊天 ID 和聊天记录,此负载在流水线中进行处理。
2. 流水线将该信息中的文本传递给 Gemma,并给出提示要求完成两个任务。
使用以下三个值,为消息附加情感评分: 1 表示积极情绪的聊天,0 表示中性情绪的聊天,-1 表示消极情绪的聊天。 用一句话总结聊天内容。
如果分数为 1 或 0,包含总结的聊天内容将被发送到我们的数据分析系统,用于存储和未来分析。 如果分数为 -1,我们会要求 Gemma 提供回复。然后,模型将此回复与聊天信息一起发送到事件消息传递系统,该系统充当流水线与其他应用之间的桥梁。在此步骤允许特定人员审查内容。
Pub/Sub
https://cloud.google.com/pubsub
设置
访问和下载 Gemma
在我们的示例中,我们通过 KerasNLP 使用 Gemma,并且还使用了 Kaggle "指令调整" gemma2_keras_gemma2_instruct_2b_en 变体。您必须下载模型并将其存储在流水线中可以访问的位置。
KerasNLP
https://keras.io/keras_nlp/
Kaggle
https://www.kaggle.com/models/google/gemma
gemma2_keras_gemma2_instruct_2b_en 变体
https://www.kaggle.com/models/google/gemma-2/keras
虽然可以使用 CPU 进行测试和开发,但考虑到推理时间,对于生产系统而言我们需要在 Dataflow ML 服务上使用 GPU。自定义容器有助于将 GPU 与 Dataflow 搭配使用,有关此设置的详细信息,请访问 Dataflow GPU 支持。我们建议您按照本地开发指南进行开发,以便快速测试流水线。您还可以参考关于在 Dataflow 上使用 Gemma 的指南,其中包括示例 Docker 文件的链接。
Dataflow GPU 支持 https://cloud.google.com/dataflow/docs/gpu/gpu-support 本地开发 https://cloud.google.com/dataflow/docs/gpu/develop-with-gpus 关于在 Dataflow 上使用 Gemma 的指南 https://cloud.google.com/dataflow/docs/machine-learning/gemma
Apache Beam 中的 RunInference 转换是此解决方案的核心,可使用模型处理程序进行配置,并使用户从生产所需的样板代码中抽象出来。大多数模型类型仅支持使用 Beam 内置的模型处理程序进行配置,但对于 Gemma,本文介绍的是如何使用自定义模型处理程序进行配置,这让我们在使用 RunInference 提供的所有处理机制的同时,还能完全控制与模型的交互。流水线 custom_model_gemma.py 有一个供您使用的示例 GemmModelHandler。请注意,务必使用来自 GemmModelHandler 的 model.generate() 调用中所使用的 max_length 值。该值控制 Gemma 的最大查询回复长度,并且需要更改才能匹配用例的需求。在本文中,我们使用的值是 512。
custom_model_gemma.py https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/dataflow/gemma/custom_model_gemma.py
构建流水线
流水线中的第一步是建立事件处理系统的标准: 我们需要读取上游系统创建的 JSON 消息,将聊天信息打包成一个包含聊天 ID 的简单结构。
chats = ( pipeline | "Read Topic" >>
beam.io.ReadFromPubSub(subscription=args.messages_subscription)
| "Decode" >> beam.Map(lambda x: x.decode("utf-8")
)
以下示例显示了其中一条 JSON 消息,以及关于菠萝和披萨的重要讨论 (其中 ID 221 是我们的客户)。
{
"id": 1,
"user_id": 221,
"chat_message": "\\nid 221: Hay I am really annoyed that your menu includes a pizza with pineapple on it! \\nid 331: Sorry to hear that , but pineapple is nice on pizza\\nid 221: What a terrible thing to say! Its never ok, so unhappy right now! \\n"
}
我们现在有包含各种 python 聊天对象的 PCollection。在下一步,我们将从这些聊天消息中提取所需的值,并将其整合到提示中,以传递给经过指令调优的 LLM。为此,我们会创建一个为模型提供指令的提示模板。
prompt_template = """
<prompt>
Provide the results of doing these two tasks on the chat history provided below for the user {}
task 1 : assess if the tone is happy = 1 , neutral = 0 or angry = -1
task 2 : summarize the text with a maximum of 512 characters
Output the results as a json with fields [sentiment, summary]
@@@{}@@@
<answer>
"""
<prompt>
Provide the results of doing these two tasks on the chat history provided below for the user 221
task 1 : assess if the tone is happy = 1 , neutral = 0 or angry = -1
task 2 : summarize the text with a maximum of 512 characters
Output the results as a json with fields [sentiment, summary]
@@@"\\nid 221: Hay I am really annoyed that your menu includes a pizza with pineapple on it! \\nid 331: Sorry to hear that , but pineapple is nice on pizza\\nid 221: What a terrible thing to say! Its never ok, so unhappy right now! \\n"@@@
<answer>
1. 本提示示例仅供参考。对于您自己的提示,请使用应用的指示性数据运行完整的分析。
对于原型设计,您可以使用 http://aistudio.google.com/ 来快速测试 Gemma 和 Gemini 行为。如果您想以编程方式进行测试,还可以使用一键式 API 密钥。
2. 对于尺寸较小、功能较弱的模型,可以通过将指令简化为单个任务并针对模型进行多次调用,或许能获得更好的回复。
3. 我们将聊天消息摘要限制为最多 512 个字符。此值与 max_length 配置中提供的值相匹配,以便 Gemma 生成调用。
4. 使用 "@@@" 符号,可以便于我们在处理后从消息中提取原始聊天内容。除此之外,还可采取以下方法完成此任务:
将整个聊天消息用作键值对中的键。
将结果连接回原始数据,这种方法需要进行重排。
5. 由于需要处理代码中的回复,所以我们要求 LLM 创建一个 JSON 格式的提示,并包含两个字段: 情感和总结。
# Create the prompt using the information from the chat
class CreatePrompt(beam.DoFn):
def process(self, element, *args, **kwargs):
user_chat = json.loads(element)
chat_id = user_chat['id']
user_id = user_chat['user_id']
messages = user_chat['chat_message']
yield (chat_id, prompt_template.format(user_id, messages))
prompts = chats | "Create Prompt" >> beam.ParDo(CreatePrompt())
keyed_model_handler = KeyedModelHandler(GemmaModelHandler('gemma2'))
results = prompts | "RunInference-Gemma" >> RunInference(keyed_model_handler)
结果集合现在包含来自 LLM 调用的结果。有一点十分有趣: 虽然 LLM 调用的是代码,但与仅调用另一个函数不同,结果具有不确定性!这包括我们的提示请求 "将结果输出为带有字段 [情感、总结] 的 JSON 文件" 的最后一点。一般来说,回复应符合该格式,但并不能保证这一点。因此,我们需要采取一些防范措施,并验证我们的输入。如果验证失败,我们会将结果输出到错误集合。在此示例中,我们对这些值予以保留。对于生产流水线,您可能希望让 LLM 重新尝试并再次在 RunInference 中运行错误集合结果,然后将回复与结果一起展平处理。由于 Beam 流水线为有向无环图,因此我们无法在此处创建循环。
现在,我们采用结果集合并处理 LLM 输出。为了处理 RunInference 的结果,我们创建了一个新的 SentimentAnalysis 和函数 extract_model_reply。在此步骤,模型会返回类型 PredictionResult 的对象:
def extract_model_reply(model_inference):
match = re.search(r"(\{[\s\S]*?\})", model_inference)
json_str = match.group(1)
result = json.loads(json_str)
if all(key in result for key in ['sentiment', 'summary']):
return result
raise Exception('Malformed model reply')
class SentimentAnalysis(beam.DoFn):
def process(self, element):
key = element[0]
match = re.search(r"@@@([\s\S]*?)@@@", element[1].example)
chats = match.group(1)
try:
# The result will contain the prompt, replace the prompt with ""
result = extract_model_reply(element[1].inference.replace(element[1].example, ""))
processed_result = (key, chats, result['sentiment'], result['summary'])
if (result['sentiment'] <0):
output = beam.TaggedOutput('negative', processed_result)
else:
output = beam.TaggedOutput('main', processed_result)
except Exception as err:
print("ERROR!" + str(err))
output = beam.TaggedOutput('error', element)
yield output
不妨花几分钟时间来研究对 extract_model_reply() 的需求。由于模型为自托管式,我们无法保证文本会是 JSON 输出,所以需要运行多次检查。使用 Gemini API 的一个好处是,它包含一项被称为约束解码的功能,可确保输出始终是 JSON 格式。
约束解码
https://ai.google.dev/gemini-api/docs/api-overview
现在,让我们在流水线中使用这些函数:
filtered_results = (results | "Process Results" >> beam.ParDo(SentimentAnalysis()).with_outputs('main','negative','error'))
使用 with_outputs 在 filtered_results 中创建多个可访问的集合。主集合包含积极和中性评价的情感及摘要,而错误包含 LLM 中无法解析的回复。您可以使用写入转换功能,将这些集合发送到其他来源,例如 BigQuery。此示例中未演示此步骤,但我们希望在此流水线中实现更多与负面评价集合相关的功能。
负面情绪处理
确保客户满意对于留住客户至关重要。虽然我们在刚才示例中使用了菠萝披萨这样一个简单的例子,但在与客户直接互动时,相关的各个部门应始终努力做到有同理心并积极回复。在此阶段,我们会将聊天内容转交给一位训练有素的工作人员,但我们仍然可以查看 LLM 是否能够帮助该工作人员缩短解决问题的时间。
对于此步骤,我们会调用模型并要求其生成回复。在代码中,我们再次使用 Gemma 2B 模型进行此调用。
generated_responses = (results.negative
| "Generate Response" >> beam.Map(lambda x: ((x[0], x[3]), "<prompt>Generate an apology response for the user in this chat text: " + x[1] + "<answer>"))
| "Gemma-Response" >> RunInference(keyed_model_handler)
一般来说,您可以将提示创建代码封装在 DoFn 中,但也可以在流水线代码本身中使用简单的 lambda。在此处,我们会生成一个提示,其中包含从 SentimentAnalysis 函数中提取出来的原始聊天消息。
对于本地运行和测试,我们可以使用一些简单的 print 语句来查看各种 PCollection 上的输出:
generated_responses | "Print Response" >> beam.Map(print)
filtered_results.main | "Print Main" >> beam.Map(print)
filtered_results.error | "Print Errors" >> beam.Map(print)
运行流水线
步骤 1: 情感分析和总结
步骤 2: 生成的回复
微调
https://ai.google.dev/gemma/docs/distributed_tuning
在生成步骤中,我们将所有负面情感的聊天传递给 Gemma 2B 模型。如果想将集合的一部分发送到另一个模型,则可以结合使用 Beam 中的 Partition 函数与 filtered_responses.negative 集合。通过将一些客户消息定向到不同的模型,并在发送之前让工作人员为生成的回复打分,可以了解到有关回复质量的反馈,以帮助我们更好地改进。
Partition
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.Partition
谷歌开发者特别招募活动进行中
诚邀热爱技术的你加入
通过多种形式 (文章/视频/coding 等) 创作与 Google 技术相关的讲解分享、实践案例或活动感受等内容,以及分享您应用 AI 技术的故事经历与成果。我们将为您提供平台和资源,助力您在分享中提升技能。更有惊喜权益等您领取,快来报名参与吧!