查看原文
其他

Gemma 2 实例分享 | 使用 Dataflow 流式传输 ML 内容

Google 谷歌开发者
2024-09-27
作者 / Reza Rokni, Google Senior Staff, Dataflow, Ravin Kumar, Google Data Scientist. Language Applications


Gemma 2 是 Google 最先进的轻量级开放模型系列,采用了与创建 Gemini 模型相同的研究和技术。像 Gemma 这样的大语言模型 (LLM) 用途广泛,为业务流程提供了诸多的集成可能性。本篇文章探讨了如何使用 Gemma 来评估对话传达出的情绪总结对话的内容,并协助为复杂对话生成响应 (之后可以再由人工审批)。其中一个关键要求是,表达了有急切需求的用户可以近乎实时地得到回应,这意味着我们需要利用流式数据流水线,从而以最低的延迟使用 LLM。


Gemma


Gemma 2 将大小与性能优势相结合,可以实现出色的基准结果,甚至比一些大模型更为出色。模型的小尺寸架构使其能够直接在流式数据处理流水线上部署或嵌入模型,并具有以下优势:
  • 数据局部性,使用本地工作调用而不是将数据通过远程过程调用 (RPC) 发送到另一个系统。

  • 可自动扩展的单一系统,允许使用诸如背压等指标作为自动调节程序的直接信号。

  • 用于在生产环境中进行观察和监控的单一系统。


  • 将大小与性能优势相结合

    https://ai.google.dev/gemma


Dataflow 提供一个可伸缩的统一批处理和流式处理平台。借助 Dataflow,您可以使用 Apache Beam Python SDK 开发流式数据、事件处理流水线。Dataflow 拥有以下优势:

  • Dataflow 为全托管式,可根据需求自动进行拓展和缩减。
  • Apache Beam 提供一套低代码一站式转换功能,可节省您编写通用样板代码的时间、精力和成本。毕竟,最理想的代码是不需要您亲自编写的。
  • Dataflow ML 直接支持通过 GPU 安装必要的驱动程序并提供对一系列 GPU 设备的访问权限。


  • Dataflow

    https://cloud.google.com/dataflow

  • GPU 设备

    https://cloud.google.com/dataflow/docs/gpu/gpu-support#availability


以下示例将说明如何将 Gemma 模型嵌入流式数据流水线中,以便使用 Dataflow 运行推理。



场景


这个场景围绕一家繁忙的连锁餐饮企业展开,通过各种沟通渠道分析和存储大量的客户支持请求。这些互动既包括自动聊天机器人生成的聊天,也包括更多需要即时支持人员关注的微妙对话。为了应对这一挑战,我们制定了宏大的目标:
  • 首先,我们希望通过总结积极的互动来高效地管理和存储聊天数据,以供参考和未来分析。

  • 其次,我们希望部署实时问题检测和解决机制,使用情感分析快速识别感到不满意的客户,并生成量身定制的回复来解决他们的问题。


该解决方案使用流水线来近乎实时地处理已完成的聊天消息。首先,Gemma 会监测并分析这些聊天内容透露出的情绪。然后,模型会对所有聊天数据进行总结,并通过搭配使用 Dataflow 开箱即用的 I/O 连接器,将积极或中性情绪的聊天直接发送到数据平台 BigQuery。对于报告为消极情绪的聊天,我们会要求 Gemma 模型为感到不满意的客户生成符合情境的回复,并将此回复发送给工作人员进行审核,以便他们可以在优化消息之后再将其发送给可能感到不满意的客户。


  • BigQuery

    https://cloud.google.com/bigquery


在此用例中,我们将探讨在流水线中使用 LLM 时一些有趣的方面。例如,考虑到客户可以接受的回复具有非确定性,而 LLM 又必须以代码处理回复,因此存在一些挑战。例如,我们要求 LLM 以 JSON 格式做出回复,但无法保证模型能做到这一点。此请求需要我们解析和验证响应,过程与您以往处理可能没有正确结构化数据的来源时所采用的数据处理过程类似。


借助此解决方案,客户可以体验更快的响应,并在出现问题时获得个性化的回复。将积极聊天内容总结为自动化回复,为工作人员节省了时间,让他们可以专注于更复杂的沟通。此外,深入分析聊天数据可以推动基于数据驱动的决策制定,而系统的可扩展性使模型可以轻松应对不断增加的聊天量,且不会对响应质量造成影响。


数据处理流水线


流水线如下所示:

以下是高级流水线的相关描述:

1. 读取来自 Pub/Sub (我们的事件消息源) 的评论数据。此数据包含作为 JSON 负载的聊天 ID 和聊天记录,此负载在流水线中进行处理。

2. 流水线将该信息中的文本传递给 Gemma,并给出提示要求完成两个任务。

  • 使用以下三个值,为消息附加情感评分: 1 表示积极情绪的聊天,0 表示中性情绪的聊天,-1 表示消极情绪的聊天。
  • 用一句话总结聊天内容。
3. 接下来,根据情感评分会出现以下分支:
  • 如果分数为 1 或 0,包含总结的聊天内容将被发送到我们的数据分析系统,用于存储和未来分析。
  • 如果分数为 -1,我们会要求 Gemma 提供回复。然后,模型将此回复与聊天信息一起发送到事件消息传递系统,该系统充当流水线与其他应用之间的桥梁。在此步骤允许特定人员审查内容。

  • Pub/Sub

    https://cloud.google.com/pubsub



流水线代码


设置


访问和下载 Gemma

在我们的示例中,我们通过 KerasNLP 使用 Gemma,并且还使用了 Kaggle "指令调整" gemma2_keras_gemma2_instruct_2b_en 变体。您必须下载模型并将其存储在流水线中可以访问的位置。


  • KerasNLP

    https://keras.io/keras_nlp/

  • Kaggle

    https://www.kaggle.com/models/google/gemma

  • gemma2_keras_gemma2_instruct_2b_en 变体

    https://www.kaggle.com/models/google/gemma-2/keras


使用 Dataflow 服务

虽然可以使用 CPU 进行测试和开发,但考虑到推理时间,对于生产系统而言我们需要在 Dataflow ML 服务上使用 GPU。自定义容器有助于将 GPU 与 Dataflow 搭配使用,有关此设置的详细信息,请访问 Dataflow GPU 支持。我们建议您按照本地开发指南进行开发,以便快速测试流水线。您还可以参考关于在 Dataflow 上使用 Gemma 的指南,其中包括示例 Docker 文件的链接。


  • Dataflow GPU 支持
    https://cloud.google.com/dataflow/docs/gpu/gpu-support
  • 本地开发
    https://cloud.google.com/dataflow/docs/gpu/develop-with-gpus
  • 关于在 Dataflow 上使用 Gemma 的指南
    https://cloud.google.com/dataflow/docs/machine-learning/gemma

Gemma 自定义模型处理程序

Apache Beam 中的 RunInference 转换是此解决方案的核心,可使用模型处理程序进行配置,并使用户从生产所需的样板代码中抽象出来。大多数模型类型仅支持使用 Beam 内置的模型处理程序进行配置,但对于 Gemma,本文介绍的是如何使用自定义模型处理程序进行配置,这让我们在使用 RunInference 提供的所有处理机制的同时,还能完全控制与模型的交互。流水线 custom_model_gemma.py 有一个供您使用的示例 GemmModelHandler。请注意,务必使用来自 GemmModelHandlermodel.generate() 调用中所使用的 max_length 值。该值控制 Gemma 的最大查询回复长度,并且需要更改才能匹配用例的需求。在本文中,我们使用的值是 512。


  • custom_model_gemma.py
    https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/dataflow/gemma/custom_model_gemma.py


提示: 对于此文,我们发现使用 jax keras 后端的效果明显更好。要启用此后端,DockerFile 必须包含指令 ENV KERAS_BACKEND="jax"。您必须在工作器启动 Beam (即导入 Keras) 之前在容器中完成这项设置。


构建流水线


流水线中的第一步是建立事件处理系统的标准: 我们需要读取上游系统创建的 JSON 消息,将聊天信息打包成一个包含聊天 ID 的简单结构。

chats = ( pipeline | "Read Topic" >> beam.io.ReadFromPubSub(subscription=args.messages_subscription)| "Decode" >> beam.Map(lambda x: x.decode("utf-8")   )

以下示例显示了其中一条 JSON 消息,以及关于菠萝和披萨的重要讨论 (其中 ID 221 是我们的客户)。

{"id": 1, "user_id": 221, "chat_message": "\\nid 221: Hay I am really annoyed that your menu includes a pizza with pineapple on it! \\nid 331: Sorry to hear that , but pineapple is nice on pizza\\nid 221: What a terrible thing to say! Its never ok, so unhappy right now! \\n"}

我们现在有包含各种 python 聊天对象的 PCollection。在下一步,我们将从这些聊天消息中提取所需的值,并将其整合到提示中,以传递给经过指令调优的 LLM。为此,我们会创建一个为模型提供指令的提示模板。

prompt_template = """<prompt>Provide the results of doing these two tasks on the chat history provided below for the user {}task 1 : assess if the tone is happy = 1 , neutral = 0 or angry = -1task 2 : summarize the text with a maximum of 512 charactersOutput the results as a json with fields [sentiment, summary]
@@@{}@@@<answer>"""
以下是向模型发送的提示示例:
<prompt>Provide the results of doing these two tasks on the chat history provided below for the user 221task 1 : assess if the tone is happy = 1 , neutral = 0 or angry = -1task 2 : summarize the text with a maximum of 512 charactersOutput the results as a json with fields [sentiment, summary]
@@@"\\nid 221: Hay I am really annoyed that your menu includes a pizza with pineapple on it! \\nid 331: Sorry to hear that , but pineapple is nice on pizza\\nid 221: What a terrible thing to say! Its never ok, so unhappy right now! \\n"@@@<answer>

有关提示的一些注意事项:

1. 本提示示例仅供参考。对于您自己的提示,请使用应用的指示性数据运行完整的分析。

  • 对于原型设计,您可以使用 http://aistudio.google.com/ 来快速测试 Gemma 和 Gemini 行为。如果您想以编程方式进行测试,还可以使用一键式 API 密钥。

2. 对于尺寸较小、功能较弱的模型,可以通过将指令简化为单个任务并针对模型进行多次调用,或许能获得更好的回复。

3. 我们将聊天消息摘要限制为最多 512 个字符。此值与 max_length 配置中提供的值相匹配,以便 Gemma 生成调用。

4. 使用 "@@@" 符号,可以便于我们在处理后从消息中提取原始聊天内容。除此之外,还可采取以下方法完成此任务:

  • 将整个聊天消息用作键值对中的键。

  • 将结果连接回原始数据,这种方法需要进行重排。

5. 由于需要处理代码中的回复,所以我们要求 LLM 创建一个 JSON 格式的提示,并包含两个字段: 情感和总结。


要创建提示,我们需要解析源 JSON 消息中的信息,然后将其插入模板中。我们将此过程封装在 Beam DoFN 中,并在我们的流水线中使用。在 yield 语句中,我们构建了一个键值结构,其中聊天 ID 充当键。这种结构让我们能够在调用模型时将聊天与推理相匹配。
# Create the prompt using the information from the chatclass CreatePrompt(beam.DoFn): def process(self, element, *args, **kwargs): user_chat = json.loads(element) chat_id = user_chat['id'] user_id = user_chat['user_id'] messages = user_chat['chat_message'] yield (chat_id, prompt_template.format(user_id, messages))
prompts = chats |  "Create Prompt" >> beam.ParDo(CreatePrompt())
现在,我们可以调用自己的模型了。得益于 RunInference 机制,此步骤非常简单。我们将 GemmaModelHandler 封装在 KeyedModelhandler 中,并告诉 RunInference 接受传入的数据作为键值对元组。在开发和测试期间,模型存储在 gemma2 目录中。在 Dataflow ML 服务上运行模型时,模型存储在 Google Cloud Storage 中,URI 格式为 gs://<your_bucket>/gemma-directory
keyed_model_handler = KeyedModelHandler(GemmaModelHandler('gemma2'))results = prompts | "RunInference-Gemma" >> RunInference(keyed_model_handler)

结果集合现在包含来自 LLM 调用的结果。有一点十分有趣: 虽然 LLM 调用的是代码,但与仅调用另一个函数不同,结果具有不确定性!这包括我们的提示请求 "将结果输出为带有字段 [情感、总结] 的 JSON 文件" 的最后一点。一般来说,回复应符合该格式,但并不能保证这一点。因此,我们需要采取一些防范措施,并验证我们的输入。如果验证失败,我们会将结果输出到错误集合。在此示例中,我们对这些值予以保留。对于生产流水线,您可能希望让 LLM 重新尝试并再次在 RunInference 中运行错误集合结果,然后将回复与结果一起展平处理。由于 Beam 流水线为有向无环图,因此我们无法在此处创建循环。


现在,我们采用结果集合并处理 LLM 输出。为了处理 RunInference 的结果,我们创建了一个新的 SentimentAnalysis 和函数 extract_model_reply。在此步骤,模型会返回类型 PredictionResult 的对象:

https://beam.apache.org/releases/pydoc/2.56.0/_modules/apache_beam/ml/inference/base.html#PredictionResult


def extract_model_reply(model_inference): match = re.search(r"(\{[\s\S]*?\})", model_inference) json_str = match.group(1) result = json.loads(json_str) if all(key in result for key in ['sentiment', 'summary']): return result raise Exception('Malformed model reply')
class SentimentAnalysis(beam.DoFn): def process(self, element): key = element[0] match = re.search(r"@@@([\s\S]*?)@@@", element[1].example) chats = match.group(1) try: # The result will contain the prompt, replace the prompt with "" result = extract_model_reply(element[1].inference.replace(element[1].example, "")) processed_result = (key, chats, result['sentiment'], result['summary']) if (result['sentiment'] <0): output = beam.TaggedOutput('negative', processed_result) else: output = beam.TaggedOutput('main', processed_result)
except Exception as err: print("ERROR!" + str(err)) output = beam.TaggedOutput('error', element) yield output

不妨花几分钟时间来研究对 extract_model_reply() 的需求。由于模型为自托管式,我们无法保证文本会是 JSON 输出,所以需要运行多次检查。使用 Gemini API 的一个好处是,它包含一项被称为约束解码的功能,可确保输出始终是 JSON 格式。


  • 约束解码

    https://ai.google.dev/gemini-api/docs/api-overview


现在,让我们在流水线中使用这些函数:

filtered_results = (results | "Process Results" >> beam.ParDo(SentimentAnalysis()).with_outputs('main','negative','error'))

使用 with_outputsfiltered_results 中创建多个可访问的集合。主集合包含积极和中性评价的情感及摘要,而错误包含 LLM 中无法解析的回复。您可以使用写入转换功能,将这些集合发送到其他来源,例如 BigQuery。此示例中未演示此步骤,但我们希望在此流水线中实现更多与负面评价集合相关的功能。


负面情绪处理


确保客户满意对于留住客户至关重要。虽然我们在刚才示例中使用了菠萝披萨这样一个简单的例子,但在与客户直接互动时,相关的各个部门应始终努力做到有同理心并积极回复。在此阶段,我们会将聊天内容转交给一位训练有素的工作人员,但我们仍然可以查看 LLM 是否能够帮助该工作人员缩短解决问题的时间。


对于此步骤,我们会调用模型并要求其生成回复。在代码中,我们再次使用 Gemma 2B 模型进行此调用。

generated_responses = (results.negative | "Generate Response" >> beam.Map(lambda x: ((x[0], x[3]), "<prompt>Generate an apology response for the user in this chat text: " + x[1] + "<answer>"))       | "Gemma-Response" >> RunInference(keyed_model_handler)

一般来说,您可以将提示创建代码封装在 DoFn 中,但也可以在流水线代码本身中使用简单的 lambda。在此处,我们会生成一个提示,其中包含从 SentimentAnalysis 函数中提取出来的原始聊天消息。


对于本地运行和测试,我们可以使用一些简单的 print 语句来查看各种 PCollection 上的输出:

generated_responses | "Print Response" >> beam.Map(print)filtered_results.main | "Print Main" >> beam.Map(print)filtered_results.error | "Print Errors" >> beam.Map(print)
当然,在实际使用中,这些输出将被发送到各种接收器,如 Pub/Sub 和 BigQuery。

运行流水线


我们来看看模型如何处理前面的 JSON 消息:


步骤 1: 情感分析和总结

"情感" : -1,
"总结" : "用户 221 对披萨上有菠萝非常不满。"


2B 模型生成的回复也不错。情感分析正确无误,并且由于总结的结果更加主观,所以回复的正确性取决于下游对此信息的使用情况。

步骤 2: 生成的回复

"我知道您对菠萝披萨不满意。虽然这是非常个人的偏好,但也很抱歉没能让您感到满意。我们致力于提供多样化的菜单,以满足各种口味需求,同时也随时欢迎反馈意见。您愿意分享更多您对菠萝披萨的看法吗?"

这些回复是否可以接受?在这个阶段,我们打算将整个数据包发送给工作人员进行分析,如果他们满意,则可以按原文发送,也可以进行一些编辑和调整。


后续步骤


也许在这个阶段,我们希望使用有更多参数的模型,例如 Gemma 2 9B 或 27B。或者使用一个足够大的模型,需要通过 API 调用外部服务 (如 Gemini),而不是加载到工作器上。毕竟,我们使用较小的模型作为过滤器,以减少发送数据到这些较大模型所需完成的工作。做出这些选择不仅是一项技术决策,也是一项商业决策,因为需要衡量成本和收益。我们可以再次利用 Dataflow 来更轻松地设置 A/B 测试。


您还可以选择根据用例对模型进行自定义微调。这是改变模型 "作用" 以满足您需求的一种方式。


  • 微调

    https://ai.google.dev/gemma/docs/distributed_tuning



A/B 测试


在生成步骤中,我们将所有负面情感的聊天传递给 Gemma 2B 模型。如果想将集合的一部分发送到另一个模型,则可以结合使用 Beam 中的 Partition 函数与 filtered_responses.negative 集合。通过将一些客户消息定向到不同的模型,并在发送之前让工作人员为生成的回复打分,可以了解到有关回复质量的反馈,以帮助我们更好地改进。


  • Partition

    https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.Partition



总结


利用这几行代码,我们构建了一个能够高速、灵活处理客户情感数据的系统。通过利用 "在同等大小情况下有更卓越性能" 的 Gemma 2 开放模型,我们能够将这一强大的 LLM 纳入流处理用例中,从而为客户打造更好的体验。




谷歌开发者特别招募活动进行中

诚邀热爱技术的你加入


通过多种形式 (文章/视频/coding 等) 创作与 Google 技术相关的讲解分享、实践案例或活动感受等内容,以及分享您应用 AI 技术的故事经历与成果。我们将为您提供平台和资源,助力您在分享中提升技能。更有惊喜权益等您领取,快来报名参与吧!





 点击屏末 |  | 即刻体验 Gemma 2

继续滑动看下一个
谷歌开发者
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存