AIGC 利器 Ray 云原生探索之路--分布式构建本地知识库
在 ChatGPT 火爆全网之后,各行各业都在围绕它来进行构建自己场景的应用。这里以本地知识库为场景来聊聊怎么结合 Ray Core、Ray Serve、KubeRay、LangChain、Embeddings、向量数据库、LLM、Kubernetes 和 GPU 方案来构建属于自己的本地的知识库。对整体方案和 Ray 相关感兴趣的可以阅读前面发布的一些文章,《AIGC 利器-Ray 云原生探索之路--总览篇》,《AIGC 利器 Ray 云原生探索之路--Ray Core 篇 (上)》,《AIGC 利器 Ray 云原生探索之路--Ray Core 篇 (下)》。
基本概念
Embeddings:在自然语言处理中,Embeddings 通常指的是将单词或短语映射到连续向量空间中的向量表示。
向量索引:向量索引(Vector Indexing)是一种用于快速检索相似向量的技术。它是基于向量空间模型(Vector Space Model)的思想,通过构建索引结构来加速相似向量的查询。在向量索引中,每个向量都被映射到一个高效的数据结构中,使得可以根据向量之间的相似度进行快速检索。如 FAISS,LlamaIndex 等。
向量数据库:支持向量数据的存储和查询的数据库就可以理解成是向量数据库,目前也有很多基于原有的数据库实现了向量插件的数据库,比如 Postgresql,ElasticSearch 等等。
LLM:Large Language Model 大语言模型。LLM 是一种基于深度学习的人工智能模型,具备强大的自然语言处理能力。它通过大规模的预训练过程,学习了广泛的语言知识和语言模式,并可以根据输入的文本生成连贯、合理的回复。
Prompt:在自然语言处理中,Prompt 是指用户给出的一段文字或指令,用于引导或提示语言模型生成特定的回复或完成特定的任务。Prompt 可以是一个问题、一个描述性的语句、一句指令,甚至是一个完整的上下文对话。它作为输入提供给语言模型,以指导模型生成相关的回答或执行相应的任务。Prompt 的设计因为直接影响语言模型的输出,所以非常重要。一个设计合理的 Prompt 可以引导模型生成更准确、更有针对性的回答。通过在 Prompt 中提供所需的背景信息、明确的要求或限制条件,可以控制模型的回答风格、内容的方向性,或者针对特定任务的执行。
LangChain:提高了一个基于 LLM 构建应用的框架。支持 6 个主要的场景支持,包括 LLM/Prompt(和大语言模型对接,提高大语言模型生成内容的质量),Chains(提供链式的方式运行各种任务),Data Augmented Generation(数据增强的生成能力),Agents(通过大语言模型结合外部工具,完成定制化能力的任务),Memory(提供大语言模型聊天的历史上下文能力),Evaluation(生成式模型很难用传统指标来评估。评估它们的一种新方法是使用语言模型本身。LangChain 提供了一些 prompts/chains 来帮助实现这一点)。
Ray Core:是 Ray 框架的底层框架,提供了一整套的分布式计算的框架,可以将普通的应用转化成分布式的系统。这里主要以 Python 语言的程序为主,具体可以参看前面的文章。
Ray Serve:是一个可扩展的模型服务库,用于构建在线推理 API。Serve 是框架无关的,所以可以使用一个工具包来服务所有的东西,从使用 PyTorch、Tensorflow 和 Keras 等框架构建的深度学习模型,到 Scikit-Learn 模型,再到任意的 Python 业务逻辑。Serve 特别适合于模型组合,使您能够构建由多个 ML 模型和业务逻辑组成的复杂推理服务(全部使用 Python 代码)。Ray Serve 是建立在 Ray Core 之上的,所以它很容易扩展到许多机器上,并提供灵活的调度支持。
Fine-Tune:基于数据对模型进行微调。这个过程也是涉及到对 LLM 的训练。目前,社区也有不少开放出来的模型都是基于这种方式的。至于是不要对模型进行调整,可以根据你的需求,如果你的任务和 LLM 本身是有不同的,是想基于 LLM 构建自己任务的大模型,那就需要微调模型。如果 LLM 本身和你的任务是一致的,比如只是为了让 LLM 知道更多知识,那也不一定要调整模型本身,用原来的基础模型就可以。
Search-Ask 方法:是一种用于问答系统的策略,结合了基于检索和基于生成的方法。它旨在充分利用检索和生成两种不同的技术,以提供更全面和准确的答案。在 Search-Ask 方法中,首先进行搜索(Search)阶段。用户的查询被发送到一个检索系统,该系统在预先建立的知识库或文档集合中执行搜索,并返回与查询相关的文档或候选答案。然后是询问(Ask)阶段。在此阶段,通过使用基于生成的方法,从检索到的文档或候选答案中提取或生成最终的答案。这可能涉及文本摘要、实体识别、关系抽取等技术来提取关键信息,或者使用生成模型(如语言模型)来生成具有上下文和语义连贯性的答案。通过结合搜索和生成两个阶段,Search-Ask 方法旨在通过检索提供初始的答案候选集,并通过生成进行下一步的筛选和精炼。这种方法的优势在于,检索阶段可以快速找到相关的文档或信息,而生成阶段可以通过理解上下文和语义生成更准确和丰富的答案。需要注意的是,具体实施 Search-Ask 方法的细节可以因应用场景和具体需求而有所不同。这种方法可以根据实际情况进行调整和定制,以达到最佳的问答效果。
02
本地向量处理
在 OpenAI 的 API 服务中提供了 embeddings 的 API,就是用于根据提供的文本输入,生成向量。但是这种是需要 OpenAI 的账号以及可以连接到 OpenAI 的 API 访问地址,而且还是收费的 API。所以,在内网或者国内的环境,如果有离线的方式也可以完成类似工作,那就是更好的选择。答案是肯定的。
在线的 API:
curl https://api.openai.com/v1/embeddings \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
"input": "Your text string goes here",
"model": "text-embedding-ada-002"
}'
离线的方式:
以下 code sample 中使用了 HuggingFace 的 Embeddings 的模型 “text2vec-large-chinese” 来完成这个能力。其中 lHuggingFaceEmbeddings 有两个方法。一个是 embed_documents 方法,用于在构建 embeddings 的时候被使用。还有一个是 embed_query 方法,是在提问的整个环境中,以及在和 LLM 通信前,需要被使用到的方法,来完成对提问的文本的向量化处理。
from langchain.embeddings import HuggingFaceEmbeddings
embeddings = HuggingFaceEmbeddings(model_name="/root/ray/docker/text2vec-large-chinese" ,model_kwargs={'device': "cuda"})
使用 embeddings 对文档进行向量化处理:
基于 pgvector 完成向量处理和向量数据的保存:
PGVECTOR_CONNECTION_STRING = PGVector.connection_string_from_db_params(
driver=os.environ.get("PGVECTOR_DRIVER", "psycopg2"),
host=os.environ.get("PGVECTOR_HOST", "xxx.xxx.xxx.xxx"),
port=int(os.environ.get("PGVECTOR_PORT", "5432")),
database=os.environ.get("PGVECTOR_DATABASE", "testpg"),
user=os.environ.get("PGVECTOR_USER", "test"),
password=os.environ.get("PGVECTOR_PASSWORD", "xxx"),
)
embeddings = HuggingFaceEmbeddings(model_name="/root/ray/docker/text2vec-large-chinese" ,model_kwargs={'device': "cuda"})
PGVector.from_documents(
embedding=embeddings,
documents=shard,
collection_name="langchain_pg_collection01",
connection_string=PGVECTOR_CONNECTION_STRING,
distance_strategy=DistanceStrategy.COSINE,
pre_delete_collection=False
)
基于 elasticsearch 完成向量处理和向量数据的保存:
elastic_host = "xxx.xxx.xxx.xxx"
elasticsearch_url = f"http://elastic:McGX42vk5y705TQoA8c126bh@{elastic_host}:30003"
index_name="test_index"
embeddings = HuggingFaceEmbeddings(model_name="/root/ray/docker/text2vec-large-chinese" ,model_kwargs={'device': "cuda"})
ElasticVectorSearch.from_documents(shard, embeddings, index_name=index_name, elasticsearch_url=elasticsearch_url)
03
串行向量化
import os
import time
from typing import List
import numpy as np
from langchain.embeddings.base import Embeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.pgvector import PGVector
from langchain.vectorstores.pgvector import DistanceStrategy
from tqdm import tqdm
from langchain.document_loaders import PyPDFLoader
from langchain.embeddings import HuggingFaceEmbeddings
PGVECTOR_CONNECTION_STRING = PGVector.connection_string_from_db_params(
driver=os.environ.get("PGVECTOR_DRIVER", "psycopg2"),
host=os.environ.get("PGVECTOR_HOST", "xxx.xxx.xxx.xxx"),
port=int(os.environ.get("PGVECTOR_PORT", "5432")),
database=os.environ.get("PGVECTOR_DATABASE", "xxx"),
user=os.environ.get("PGVECTOR_USER", "xxx"),
password=os.environ.get("PGVECTOR_PASSWORD", "xxx"),
)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=100,
length_function=len,
)
# Put your directory containing PDFs here
directory = '/root/ray/docker/know'
pdf_documents = [os.path.join(directory, filename) for filename in os.listdir(directory)]
print(f"pdf_documents is {pdf_documents}")
langchain_documents = []
for document in tqdm(pdf_documents):
try:
loader = PyPDFLoader(document)
data = loader.load()
langchain_documents.extend(data)
except Exception:
continue
print("Num pages: ", len(langchain_documents))
print("Splitting all documents")
chunks = text_splitter.split_documents(langchain_documents)
# Stage two: embed the docs.
print(f"Loading chunks into vector store ... using {db_shards} shards")
st = time.time()
embeddings = HuggingFaceEmbeddings(model_name="/root/ray/docker/text2vec-large-chinese" ,model_kwargs={'device': "cuda"})
PGVector.from_documents(
embedding=embeddings,
documents=chunks,
collection_name="langchain_pg_collection01",
connection_string=PGVECTOR_CONNECTION_STRING,
distance_strategy=DistanceStrategy.COSINE,
pre_delete_collection=False
)
et = time.time() - st
print(f"Shard processing complete. Time taken: {et} seconds.")
第一个阶段:主要是读取数据文件,并进行文本的拆分。
第二个阶段:使用拆分出来的文本,使用 Embeddings 模型进行向量化处理,这里使用的是 “text2vec-large-chinese” 模型,可以根据需要替换不同的 Embeddings 模型, 然后持久化到 pg 这个类型的向量数据库中。
04
并行向量化
并行指的是在处理的过程中有并发多任务处理能力,有 n 个 worker 并行的方式去运行各种任务。如果在数据量很大的情况下,整个数据的向量化处理能力,会随着可用资源的增多,有很明显的提升。能充分的利用好整个集群的可用资源去处理相关的任务。
import os
import time
from typing import List
import numpy as np
import ray
from langchain.embeddings.base import Embeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.pgvector import PGVector
from langchain.vectorstores.pgvector import DistanceStrategy
from tqdm import tqdm
from langchain.document_loaders import PyPDFLoader
from langchain.embeddings import HuggingFaceEmbeddings
PGVECTOR_CONNECTION_STRING = PGVector.connection_string_from_db_params(
driver=os.environ.get("PGVECTOR_DRIVER", "psycopg2"),
host=os.environ.get("PGVECTOR_HOST", "10.29.26.43"),
port=int(os.environ.get("PGVECTOR_PORT", "5432")),
database=os.environ.get("PGVECTOR_DATABASE", "ats_dev"),
user=os.environ.get("PGVECTOR_USER", "ats"),
password=os.environ.get("PGVECTOR_PASSWORD", "ats.123456"),
)
db_shards = 8
ray.init()
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=100,
length_function=len,
)
# Put your directory containing PDFs here
directory = '/root/ray/docker/know'
pdf_documents = [os.path.join(directory, filename) for filename in os.listdir(directory)]
print(f"pdf_documents is {pdf_documents}")
langchain_documents = []
for document in tqdm(pdf_documents):
try:
loader = PyPDFLoader(document)
data = loader.load()
langchain_documents.extend(data)
except Exception:
continue
print("Num pages: ", len(langchain_documents))
print("Splitting all documents")
chunks = text_splitter.split_documents(langchain_documents)
@ray.remote(num_gpus=1)
def process_shard(shard):
print(f"Starting process_shard of {len(shard)} chunks.")
st = time.time()
embeddings = HuggingFaceEmbeddings(model_name="/root/ray/docker/text2vec-large-chinese" ,model_kwargs={'device': "cuda"})
et = time.time() - st
print(f"Loading embeddings took {et} seconds.")
st = time.time()
db = PGVector.from_documents(
embedding=embeddings,
documents=shard,
collection_name="langchain_pg_collection01",
connection_string=PGVECTOR_CONNECTION_STRING,
distance_strategy=DistanceStrategy.COSINE,
pre_delete_collection=False
)
et = time.time() - st
print(f"Shard completed in {et} seconds.")
return 1
# Stage two: embed the docs.
print(f"Loading chunks into vector store ... using {db_shards} shards")
st = time.time()
shards = np.array_split(chunks, db_shards)
futures = [process_shard.remote(shards[i]) for i in range(db_shards)]
results = ray.get(futures)
et = time.time() - st
print(f"Shard processing complete. Time taken: {et} seconds.")
以上的 sample code 主要分为三个阶段:
第一个阶段:主要是读取数据文件,并进行文本的拆分,为后续的并行处理提供分组数据。这里将数据分成了 8 个分组,这里是根据 db_shards 设置的数值来决定的。
第二个阶段:使用拆分出来的文本,将拆分出来的数据进行分组,每组数据进行并行的向量化的处理。这里关键方法是@ray.remote(num_gpus=1) def process_shard(shard),这里使用到了 Ray Core 的 Task 的能力,Task 是可以充分利用好集群的资源,分布式的调度和运行。由于每一个负责分组处理的 Task 需要完成 Embeddings,而 Embeddings 是需要模型参与的,这里使用的是“text2vec-large-chinese”模型,可以根据需要替换不同的 Embeddings 模型, 这里是给每一个 Task 都分配一个 GPU 去独立的处理。这样的并行效率是很高的。虽然有一些模型是可以用 CPU 去运行的,但是效率还是非常低的。
第三个阶段:最后,将每组向量化后的数据进行合并后,再写入向量数据库,这样完成了整个并行化向量处理的能力。
05
串行知识问答
06
并行知识问答
import os
import time
from typing import List
import requests
from ray import serve
from starlette.requests import Request
import numpy as np
import ray
from langchain.document_loaders import ReadTheDocsLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.base import Embeddings
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel
from langchain.prompts import PromptTemplate
from langchain.vectorstores.pgvector import PGVector
from langchain.vectorstores.pgvector import DistanceStrategy
from langchain.embeddings import HuggingFaceEmbeddings
PGVECTOR_CONNECTION_STRING = PGVector.connection_string_from_db_params(
driver=os.environ.get("PGVECTOR_DRIVER", "psycopg2"),
host=os.environ.get("PGVECTOR_HOST", "xxx.xxx.xxx.xxx"),
port=int(os.environ.get("PGVECTOR_PORT", "5432")),
database=os.environ.get("PGVECTOR_DATABASE", "xxx"),
user=os.environ.get("PGVECTOR_USER", "xxx"),
password=os.environ.get("PGVECTOR_PASSWORD", "xxx"),
)
template = """
<|SYSTEM|># chatglm2-6b
- You are a helpful, polite, fact-based agent for answering questions.
- Your answers include enough detail for someone to follow through on your suggestions.
<|USER|>
If you don't know the answer, just say that you don't know. Don't try to make up an answer.
Please answer the following query using the context provided.
CONTEXT:
{context}
=========
QUESTION: {query}
ANSWER: <|ASSISTANT|>"""
PROMPT = PromptTemplate(template=template, input_variables=["context", "query"])
class chatGLM():
def __init__(self, model_name) -> None:
self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
self.model = AutoModel.from_pretrained(model_name, trust_remote_code=True).half().cuda().eval()
def __call__(self, prompt) -> any:
max_length = 10000
temperature = 0.01
response, _ = self.model.chat(self.tokenizer , prompt, max_length=max_length, temperature=temperature)
return response
class ChatglmChain():
def __init__(self, llm, prompt) -> None:
self.llm = llm
self.prompt = prompt
def run(self, query, context=None) -> any:
if context is not None:
prompt = self.prompt.format(query=query, context=context)
else:
prompt = self.prompt.format(query=query)
print("query=%s -> prompt=%s"%(query, prompt))
print("*"*60)
response = self.llm(prompt)
return response
@serve.deployment(num_replicas=1, ray_actor_options={"num_gpus": 1})
class VectorSearchDeployment:
def __init__(self):
st = time.time()
self.embeddings = HuggingFaceEmbeddings(model_name="/root/ray/docker/text2vec-large-chinese" ,model_kwargs={'device': "cuda"})
self.db = PGVector(
connection_string=PGVECTOR_CONNECTION_STRING,
embedding_function=self.embeddings,
collection_name="langchain_pg_collection01",
distance_strategy=DistanceStrategy.COSINE
)
et = time.time() - st
print(f"Loading database took {et} seconds.")
self.llm = chatGLM(model_name="/root/ray/docker/chatglm2-6b-int4")
st = time.time()
self.chain = ChatglmChain(llm=self.llm, prompt=PROMPT)
et = time.time() - st
print(f"Loading HF model took {et} seconds.")
print("===== end =====")
def search(self, query):
print(f"query is: {query}")
search_results = self.db.similarity_search_with_score(query, k=4)
context = ""
for pack in search_results:
doc, socre = pack
content = doc.page_content
print("检索到的知识=%s, from=%s, socre=%.3f"%(content, doc.metadata.get("from"), socre))
context += content
response = self.chain.run(query=query, context=context)
print(f"Result is: {response}")
return response
async def __call__(self, request: Request) -> List[str]:
return self.search(request.query_params["query"])
deployment = VectorSearchDeployment.bind()
以上的 sample code 主要分为五个阶段:
第一个阶段:首先定义了 QADeployment 类,这个类中完成整体的代码的封装和逻辑。这里将这个类标记成 Ray 的 Serve,只要将类使用相关的标注 “@serve.deployment(ray_actor_options={"num_gpus": 1})”。这里使用到了 Ray Core 的 Actor,以及使用 GPU 的资源来提供推理服务。同时在处理请求的时候,使用了 Ray Core 的 async 能力,让其可以支持异步方式处理请求。
第二个阶段:在__init__过程中,首先会从向量数据库中加载向量数据,这里使用了支持 pgvector 的 postgresql 数据库作为向量数据库;其次会设置用于对问题进行向量化处理的 embeddings 的模型,这里使用的是 “text2vec-large-chinese” 模型;然后指定 LLM,这里使用的是 “chatglm2-6b-int4” 模型;最后,使用 LangChain 定义 chain 对象,将 LLM,Prompt 等关联起来。至此,Serve 的初始化环节就结束了。
第三个阶段:在接收提问的过程中,首先会使用问题进行向量化处理后,再使用向量的相似度查询去获取问题相关的上下文,使用问题以及问题相关的上下文结合 PromptTemplate 完成 LLM 的输入内容的封装,最后去传递给 LLM 去做最后的问题的回答,得到 LLM 的回答。
第四个阶段:最后使用 deployment = QADeployment.bind()的方法定义 Ray Serve 应用,然后通过 serve run serve:deployment 的方式启动 serve 应用,对外提供推理服务。
第五个阶段:server 应用在本地启动的默认的监听的 port 是 8000。接下来就可以去 call 这个地址去完成问答了。
07
离线模型
在企业内部建议下载离线的模型,使用本地的方式加载模型,因为一般模型都很大,在运行的时候在线下载模型不是很稳定,耗时也很久。所以建议下载到本地。可以到https://huggingface.co/上去 search 到自己需要的模型,然后下载所有的相关文件到本地就可以。在构建 Dockerfile 的时候,将这些文件 COPY 到指定的容器的文件夹下,然后程序在加载 model 的时候,用包含模型文件的文件夹即可,包括 Embeddings 和 LLM 模型等。
08
镜像准备
向量构建相关:
from rayproject/ray:2.5.0.128cf3-py310
RUN sudo apt-get update
RUN sudo apt-get -y install libpq-dev python3-dev
COPY requirements.txt /tmp
RUN pip install -r /tmp/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
COPY text2vec-large-chinese /tmp/text2vec-large-chinese
RUN rm /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
COPY pgvector.py /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
COPY know.tar.gz /tmp
RUN tar -zxvf /tmp/know.tar.gz -C /tmp
from rayproject/ray-ml:2.5.0.87d3e6-py310-gpu
RUN sudo apt-get update
RUN sudo apt-get -y install libpq-dev python3-dev
COPY requirements.txt /tmp
RUN pip install -r /tmp/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
COPY text2vec-large-chinese /tmp/text2vec-large-chinese
RUN rm /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
COPY pgvector.py /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
COPY know.tar.gz /tmp
RUN tar -zxvf /tmp/know.tar.gz -C /tmp
问答系统相关:
from rayproject/ray:2.5.0.128cf3-py310
RUN sudo apt-get update
RUN sudo apt-get -y install libpq-dev python3-dev
COPY requirements.txt /tmp
RUN pip install -r /tmp/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
COPY text2vec-large-chinese /tmp/text2vec-large-chinese
COPY chatglm2-6b-int4 /tmp/chatglm2-6b-int4
RUN rm /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
COPY pgvector.py /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
RUN sudo mkdir /home/ray/qa
COPY qa.zip /home/ray/qa/
from rayproject/ray-ml:2.5.0.87d3e6-py310-gpu
RUN sudo apt-get update
RUN sudo apt-get -y install libpq-dev python3-dev
COPY requirements.txt /tmp
RUN pip install -r /tmp/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
COPY text2vec-large-chinese /tmp/text2vec-large-chinese
COPY chatglm2-6b-int4 /tmp/chatglm2-6b-int4
RUN rm /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
COPY pgvector.py /home/ray/anaconda3/lib/python3.10/site-packages/langchain/vectorstores/pgvector.py
RUN sudo mkdir /home/ray/qa
COPY qa.zip /home/ray/qa/
09
实践--向量构建
apiVersion: v1
kind: ConfigMap
metadata:
name: buildpgvector
data:
buildpgvector.py: |
import os
import time
from typing import List
import numpy as np
import ray
from langchain.embeddings.base import Embeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.pgvector import PGVector
from langchain.vectorstores.pgvector import DistanceStrategy
from tqdm import tqdm
from langchain.document_loaders import PyPDFLoader
from langchain.embeddings import HuggingFaceEmbeddings
PGVECTOR_CONNECTION_STRING = PGVector.connection_string_from_db_params(
driver=os.environ.get("PGVECTOR_DRIVER", "psycopg2"),
host=os.environ.get("PGVECTOR_HOST", "xxx.xxx.xxx.xxx"),
port=int(os.environ.get("PGVECTOR_PORT", "5432")),
database=os.environ.get("PGVECTOR_DATABASE", "xxx"),
user=os.environ.get("PGVECTOR_USER", "xxx"),
password=os.environ.get("PGVECTOR_PASSWORD", "xxx"),
)
db_shards = 1
ray.init()
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=100,
length_function=len,
)
# Put your directory containing PDFs here
directory = '/tmp/know'
pdf_documents = [os.path.join(directory, filename) for filename in os.listdir(directory)]
print(f"pdf_documents is {pdf_documents}")
langchain_documents = []
for document in tqdm(pdf_documents):
try:
loader = PyPDFLoader(document)
data = loader.load()
langchain_documents.extend(data)
except Exception:
continue
print("Num pages: ", len(langchain_documents))
print("Splitting all documents")
chunks = text_splitter.split_documents(langchain_documents)
@ray.remote(num_gpus=1)
def process_shard(shard):
print(f"Starting process_shard of {len(shard)} chunks.")
st = time.time()
embeddings = HuggingFaceEmbeddings(model_name="/tmp/text2vec-large-chinese" ,model_kwargs={'device': "cuda"})
et = time.time() - st
print(f"Loading embeddings took {et} seconds.")
st = time.time()
db = PGVector.from_documents(
embedding=embeddings,
documents=shard,
collection_name="langchain_pg_collection01",
connection_string=PGVECTOR_CONNECTION_STRING,
distance_strategy=DistanceStrategy.COSINE,
pre_delete_collection=False
)
et = time.time() - st
print(f"Shard completed in {et} seconds.")
return 1
# Stage two: embed the docs.
print(f"Loading chunks into vector store ... using {db_shards} shards")
st = time.time()
shards = np.array_split(chunks, db_shards)
futures = [process_shard.remote(shards[i]) for i in range(db_shards)]
results = ray.get(futures)
et = time.time() - st
print(f"Shard processing complete. Time taken: {et} seconds.")
---
apiVersion: ray.io/v1alpha1
kind: RayJob
metadata:
name: buildpgvector
spec:
entrypoint: python /home/ray/samples/buildpgvector.py
runtimeEnv: ewogICAgInBpcCI6IFsKICAgIF0sCiAgICAiZW52X3ZhcnMiOiB7ImNvdW50ZXJfbmFtZSI6ICJ0ZXN0X2NvdW50ZXIifQp9
rayClusterSpec:
rayVersion: '2.5.0'
headGroupSpec:
rayStartParams:
dashboard-host: '0.0.0.0'
num-gpus: "0"
serviceType: NodePort
template:
spec:
containers:
- name: ray-head
image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-cpu:0.1.12
ports:
- containerPort: 6379
name: gcs-server
- containerPort: 8265
name: dashboard
- containerPort: 10001
name: client
- containerPort: 8000
name: serve
resources:
requests:
cpu: "1000m"
volumeMounts:
- mountPath: /home/ray/samples
name: buildpgvector
securityContext:
privileged: true
volumes:
- name: buildpgvector
configMap:
name: buildpgvector
items:
- key: buildpgvector.py
path: buildpgvector.py
workerGroupSpecs:
- replicas: 1
minReplicas: 1
maxReplicas: 5
groupName: small-group
rayStartParams: {}
template:
spec:
containers:
- name: ray-worker
image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-gpu:0.1.12
lifecycle:
preStop:
exec:
command: [ "/bin/sh","-c","ray stop" ]
resources:
limits:
nvidia.com/gpu: "1"
requests:
cpu: "1000m"
nvidia.com/gpu: "1"
securityContext:
privileged: true
查看 Ray Cluster 中的 Ray Job 的运行情况(使用 1 张 GPU 卡):从运行的日志看的出,在一个 GPU 的情况下使用了 43s 左右的时间处理了所有的数据。
构建向量索引(2 张 GPU):这里实用 kuberay 项目的 RayJob CRD 来完成向量的构建,它会负责创建一个 Ray Cluster,同时在 Ray Cluster 启动成功之后,将 configmap 中挂载的 Job 提交给 Ray Cluster,当 Job 中的所有任务运行结束之后,向量构建就算完成了。同时支持多个 GPU 并行进行文件的向量化处理, 这里 db_shards=2,是因为这里测试的是两张 GPU 卡的Serve 能力,所以设置成了 2,这样就可以完全并行起来了。
apiVersion: v1
kind: ConfigMap
metadata:
name: buildpgvector
data:
buildpgvector.py: |
import os
import time
from typing import List
import numpy as np
import ray
from langchain.embeddings.base import Embeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.pgvector import PGVector
from langchain.vectorstores.pgvector import DistanceStrategy
from tqdm import tqdm
from langchain.document_loaders import PyPDFLoader
from langchain.embeddings import HuggingFaceEmbeddings
PGVECTOR_CONNECTION_STRING = PGVector.connection_string_from_db_params(
driver=os.environ.get("PGVECTOR_DRIVER", "psycopg2"),
host=os.environ.get("PGVECTOR_HOST", "xxx.xxx.xxx.xxx"),
port=int(os.environ.get("PGVECTOR_PORT", "5432")),
database=os.environ.get("PGVECTOR_DATABASE", "xxx"),
user=os.environ.get("PGVECTOR_USER", "xxx"),
password=os.environ.get("PGVECTOR_PASSWORD", "xxx"),
)
db_shards = 2
ray.init()
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=100,
length_function=len,
)
# Put your directory containing PDFs here
directory = '/tmp/know'
pdf_documents = [os.path.join(directory, filename) for filename in os.listdir(directory)]
print(f"pdf_documents is {pdf_documents}")
langchain_documents = []
for document in tqdm(pdf_documents):
try:
loader = PyPDFLoader(document)
data = loader.load()
langchain_documents.extend(data)
except Exception:
continue
print("Num pages: ", len(langchain_documents))
print("Splitting all documents")
chunks = text_splitter.split_documents(langchain_documents)
@ray.remote(num_gpus=1)
def process_shard(shard):
print(f"Starting process_shard of {len(shard)} chunks.")
st = time.time()
embeddings = HuggingFaceEmbeddings(model_name="/tmp/text2vec-large-chinese" ,model_kwargs={'device': "cuda"})
et = time.time() - st
print(f"Loading embeddings took {et} seconds.")
st = time.time()
db = PGVector.from_documents(
embedding=embeddings,
documents=shard,
collection_name="langchain_pg_collection01",
connection_string=PGVECTOR_CONNECTION_STRING,
distance_strategy=DistanceStrategy.COSINE,
pre_delete_collection=False
)
et = time.time() - st
print(f"Shard completed in {et} seconds.")
return 1
# Stage two: embed the docs.
print(f"Loading chunks into vector store ... using {db_shards} shards")
st = time.time()
shards = np.array_split(chunks, db_shards)
futures = [process_shard.remote(shards[i]) for i in range(db_shards)]
results = ray.get(futures)
et = time.time() - st
print(f"Shard processing complete. Time taken: {et} seconds.")
---
apiVersion: ray.io/v1alpha1
kind: RayJob
metadata:
name: buildpgvector
spec:
entrypoint: python /home/ray/samples/buildpgvector.py
runtimeEnv: ewogICAgInBpcCI6IFsKICAgIF0sCiAgICAiZW52X3ZhcnMiOiB7ImNvdW50ZXJfbmFtZSI6ICJ0ZXN0X2NvdW50ZXIifQp9
rayClusterSpec:
rayVersion: '2.5.0'
headGroupSpec:
rayStartParams:
dashboard-host: '0.0.0.0'
num-gpus: "0"
serviceType: NodePort
template:
spec:
containers:
- name: ray-head
image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-cpu:0.1.12
ports:
- containerPort: 6379
name: gcs-server
- containerPort: 8265
name: dashboard
- containerPort: 10001
name: client
- containerPort: 8000
name: serve
resources:
requests:
cpu: "1000m"
volumeMounts:
- mountPath: /home/ray/samples
name: buildpgvector
securityContext:
privileged: true
volumes:
- name: buildpgvector
configMap:
name: buildpgvector
items:
- key: buildpgvector.py
path: buildpgvector.py
workerGroupSpecs:
- replicas: 2
minReplicas: 1
maxReplicas: 5
groupName: small-group
rayStartParams: {}
template:
spec:
containers:
- name: ray-worker
image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-gpu:0.1.12
lifecycle:
preStop:
exec:
command: [ "/bin/sh","-c","ray stop" ]
resources:
limits:
nvidia.com/gpu: "1"
requests:
cpu: "1000m"
nvidia.com/gpu: "1"
securityContext:
privileged: true
查看 Ray Cluster 中的 Ray Job 的运行情况(使用 2 张 GPU 卡):分析运行时间:这里还是相同的数据,但是使用的时间为,一个 task 用了 15s,一个 task 用了 25s,因为这里的两张 GPU 卡型号不同,性能不同。
10
实践--问答服务
问答服务(一张 GPU 卡)。这里使用了 RayService CRD 来完成 serve 环节。其中 RayService 也是会启动一个 Ray Cluster 集群,同时根据设置的 runtimeEnv 的 working_dir 的 code 所在路径作为工作路径,基于 importPath 的设置启动 Ray Serve。启动成功之后,会对外提供一个 NodePort 类型的 Kubernetes 的 Service,通过这个 Service 可以访问部署出来的基于 Embeddings 和 LLM 的 serve 了。访问的方式就是基于 http 的方式请求就可以。head group 使用 CPU 类型的镜像,worker group 使用 GPU 类型的镜像。这里测试使用一张 GPU 卡的情况,所以 Serve 的副本数只有 1 个,在 cr 的 numReplicas 中设置为 1。
apiVersion: v1
kind: ConfigMap
metadata:
name: query
data:
query.py: |
import sys
import requests
query = sys.argv[1]
response = requests.post(f"http://localhost:8000/?query={query}")
print(response.content.decode())
---
apiVersion: ray.io/v1alpha1
kind: RayService
metadata:
name: qallmpg
spec:
serviceUnhealthySecondThreshold: 3000
deploymentUnhealthySecondThreshold: 3000
serveConfig:
importPath: qa.deployment
runtimeEnv: |
working_dir: "file:///home/ray/qa/qa.zip"
deployments:
- name: VectorSearchDeployment
numReplicas: 1
rayActorOptions:
numGpus: 1 rayClusterConfig:
rayVersion: '2.5.0'
headGroupSpec:
rayStartParams:
dashboard-host: '0.0.0.0'
num-gpus: "0"
serviceType: NodePort
template:
spec:
containers:
- name: ray-head
image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-cpu:0.1.12
ports:
- containerPort: 6379
name: gcs-server
- containerPort: 8265
name: dashboard
- containerPort: 10001
name: client
- containerPort: 8000
name: serve
resources:
requests:
cpu: "1000m"
securityContext:
privileged: true
volumeMounts:
- mountPath: /home/ray/query
name: query
volumes:
- name: query
configMap:
name: query
items:
- key: query.py
path: query.py
workerGroupSpecs:
- replicas: 1
minReplicas: 1
maxReplicas: 5
groupName: small-group
rayStartParams: {}
template:
spec:
containers:
- name: ray-worker
image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-gpu:0.1.12
lifecycle:
preStop:
exec:
command: [ "/bin/sh","-c","ray stop" ]
resources:
limits:
nvidia.com/gpu: "1"
requests:
cpu: "1000m"
nvidia.com/gpu: "1"
securityContext:
privileged: true
volumeMounts:
- mountPath: /home/ray/query
name: query
volumes:
- name: query
configMap:
name: query
items:
- key: query.py
path: query.py
查看 Ray Cluster 的运行情况和资源使用情况:一个 head 节点,一个 GPU 的 worker 节点。
查看 Ray Cluster 中的 Ray Serve 的运行情况:这里启动了一个副本进行提供服务。
apiVersion: v1
kind: ConfigMap
metadata:
name: query
data:
query.py: |
import sys
import requests
query = sys.argv[1]
response = requests.post(f"http://localhost:8000/?query={query}")
print(response.content.decode())
---
apiVersion: ray.io/v1alpha1
kind: RayService
metadata:
name: qallmpg
spec:
serviceUnhealthySecondThreshold: 3000
deploymentUnhealthySecondThreshold: 3000
serveConfig:
importPath: qa.deployment
runtimeEnv: |
working_dir: "file:///home/ray/qa/qa.zip"
deployments:
- name: VectorSearchDeployment
numReplicas: 2
rayActorOptions:
numGpus: 1
rayClusterConfig:
rayVersion: '2.5.0'
headGroupSpec:
rayStartParams:
dashboard-host: '0.0.0.0'
num-gpus: "0"
serviceType: NodePort
template:
spec:
containers:
- name: ray-head
image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-cpu:0.1.12
ports:
- containerPort: 6379
name: gcs-server
- containerPort: 8265
name: dashboard
- containerPort: 10001
name: client
- containerPort: 8000
name: serve
resources:
requests:
cpu: "1000m"
securityContext:
privileged: true
volumeMounts:
- mountPath: /home/ray/query
name: query
volumes:
- name: query
configMap:
name: query
items:
- key: query.py
path: query.py
workerGroupSpecs:
- replicas: 2
minReplicas: 1
maxReplicas: 5
groupName: small-group
rayStartParams: {}
template:
spec:
containers:
- name: ray-worker
image: rayproject/ray-langchain-chatglm6b-text2veclarge-pg-es-gpu:0.1.12
lifecycle:
preStop:
exec:
command: [ "/bin/sh","-c","ray stop" ]
resources:
limits:
nvidia.com/gpu: "1"
requests:
cpu: "1000m"
nvidia.com/gpu: "1"
securityContext:
privileged: true
volumeMounts:
- mountPath: /home/ray/query
name: query
volumes:
- name: query
configMap:
name: query
items:
- key: query.py
path: query.py
查看 Ray Cluster 的运行情况和资源使用情况:一个 head 节点,两个 worker 节点,每个 worker 节点占用一个 GPU。
测试问答系统:以下脚本是打包在镜像中的,可以在容器里/home/ray/query 路径下使用 query.py 去测试,如果是外部测试的话,需要根据 RayService 派生出来的 serve 的 Kubernetes Service 对应的 NodePort 去修改 localhost:8000 这个地址就可以。
kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.233.0.1 <none> 443/TCP 27d
qallmpg-head-svc NodePort 10.233.26.237 <none> 10001:31581/TCP,8265:31199/TCP,52365:32145/TCP,6379:31982/TCP,8080:31512/TCP,8000:30937/TCP 35s
qallmpg-raycluster-rpr9z-dashboard-svc NodePort 10.233.17.34 <none> 52365:30241/TCP 115s
qallmpg-raycluster-rpr9z-head-svc NodePort 10.233.22.76 <none> 10001:31899/TCP,8265:32540/TCP,52365:30413/TCP,6379:31509/TCP,8080:30152/TCP,8000:30252/TCP 115s
qallmpg-serve-svc NodePort 10.233.11.80 <none> 8000:31223/TCP 35s
vi query-external.py
import sys
import requests
query = sys.argv[1]
response = requests.post(f"http://10.29.26.99:31223/?query={query}")
print(response.content.decode())
提问:
1.一张 GPU 卡(性能较差的卡)
1.1 每次只问一个问题:
1.2. 每次同时问两个问题:
问题 1: karmada 包含哪些组件?
2. 两张 GPU 卡(一张性能较差的卡,一张性能较好的卡)
2.1. 每次只问同一个问题,但是请求被 LB 到不同的卡上的测试效果:
问题: karmada 包含哪些组件?
第一次 耗时:69.1 s。
第二次 耗时:14.5 s。
耗时:85.2 s。
11
总结
结合 Ray Core、Ray Serve、KubeRay、LangChain、Embeddings、向量数据库、LLM、Kubernetes 和 GPU 方案来构建知识库可以更大程度的充分利用资源,也可以更大程度的提升整体的灵活性和效率。
参考链接:
https://docs.ray.io/en/latest/
https://github.com/ray-project/kuberay
https://github.com/ray-project/langchain-ray
本文作者
现任「DaoCloud 道客」技术合伙人兼云原生技术专家
热门推荐
访问以下网址,或点击文末【阅读原文】直接下载
DaoCloud 公司简介
「DaoCloud 道客」云原生领域的创新领导者,成立于 2014 年底,拥有自主知识产权的核心技术,致力于打造开放的云操作系统为企业数字化转型赋能。产品能力覆盖云原生应用的开发、交付、运维全生命周期,并提供公有云、私有云和混合云等多种交付方式。成立迄今,公司已在金融科技、先进制造、智能汽车、零售网点、城市大脑等多个领域深耕,标杆客户包括交通银行、浦发银行、上汽集团、东风汽车、海尔集团、屈臣氏、金拱门(麦当劳)等。目前,公司已完成了 D 轮超亿元融资,被誉为科技领域准独角兽企业。公司在北京、南京、武汉、深圳、成都设立多家分公司及合资公司,总员工人数超过 350 人,是上海市高新技术企业、上海市“科技小巨人”企业和上海市“专精特新”企业,并入选了科创板培育企业名单。网址:www.daocloud.io
邮件:info@daocloud.io
电话:400 002 6898