技术内幕 | StarRocks Pipeline 执行框架(下)
作者:冉攀峰,StarRocks 核心研发,知乎账号 satanson
导读:欢迎来到 StarRocks 技术内幕系列文章,我们将为你全方位揭晓 StarRocks 背后的技术原理和实践细节,助你快速上手这款明星开源数据库产品。本期 StarRocks 技术内幕将主要介绍 StarRocks Pipeline 执行框架的基本概念、原理及代码逻辑。
StarRocks Pipeline 执行框架(上)篇中,主要为大家讲解了 Pipeline 执行引擎想解决的问题及一般性原理。关于 Pipeline 执行引擎的实现, BE 端拆分 Pipeline 的逻辑,以及 Pipeline 实例 PipelineDriver 的调度执行逻辑,将在本篇中继续与大家分享。
#01
背景介绍
—
详见 StarRocks Pipeline 执行框架(上)篇
#02
基本概念
—
详见 StarRocks Pipeline 执行框架(上)篇
#03
源码解析
—
章节二的基本概念输入完以后,我们开始从以下几个方面解析 StarRocks 的源码:
BE 初始化全局对象的方法主要有两种:
将全局对象定义在 ExecEnv 对象中,参考 be/src/runtime/exec_env.h,be/src/runtime/exec_env.cpp文件。
定义全局性的单例(Singleton)对象,例如 be/src/exec/pipeline/query_context.cpp,如果对象本身可以独立完成初始化、不依赖参数设置、不依赖于其他对象的初始化顺序,则可以定义为单例。
Pipeline 执行引擎的全局性对象
PipelineDriver 执行器
定义为 ExecEnv::_driver_executor,类型为 pipeline::GlobalDriverExecutor,主要由执行线程池和轮询线程构成。其中执行线程的数量默认为机器的硬件核数,轮询线程数量为 1。
// 源码文件: be/src/runtime/exec_env.cpp
// 函数: Status ExecEnv::_init(const std::vector<StorePath>& store_paths)
std::unique_ptr<ThreadPool> driver_executor_thread_pool;
auto max_thread_num = std::thread::hardware_concurrency();
if (config::pipeline_exec_thread_pool_thread_num > 0) {
max_thread_num = config::pipeline_exec_thread_pool_thread_num;
}
LOG(INFO) << strings::Substitute("[PIPELINE] Exec thread pool: thread_num=$0", max_thread_num);
RETURN_IF_ERROR(ThreadPoolBuilder("pip_executor") // pipeline executor
.set_min_threads(0)
.set_max_threads(max_thread_num)
.set_max_queue_size(1000)
.set_idle_timeout(MonoDelta::FromMilliseconds(2000))
.build(&driver_executor_thread_pool));
_driver_executor = new pipeline::GlobalDriverExecutor(std::move(driver_executor_thread_pool), false);
_driver_executor->initialize(max_thread_num)
pipeline::GlobalDriverExecutor 的结构如下:
Pipeline IO 线程池
config::pipeline_scan_thread_pool_queue_size config::pipeline_scan_thread_pool_thread_num
// 源码文件: be/src/runtime/exec_env.cpp
// 函数: Status ExecEnv::_init(const std::vector<StorePath>& store_paths)
int num_io_threads = config::pipeline_scan_thread_pool_thread_num <= 0
? std::thread::hardware_concurrency()
: config::pipeline_scan_thread_pool_thread_num;
_pipeline_scan_io_thread_pool =
new PriorityThreadPool("pip_scan_io", // pipeline scan io
num_io_threads, config::pipeline_scan_thread_pool_queue_size);
定义为 ExecEnv._wg_driver_executor,WorkGroup 用于 Pipeline 执行引擎的资源隔离,主要的设计动机是为了把不同业务场景的 Workload 划分到相应的 WorkGroup中,每个 WorkGroup 有自己的 CPU、Memory 和并行数量资源 Quota。 每个 WorkGroup 按照资源 Quota 的限制复用计算资源,从而实现隔离性。(WorkGroup 的详细源码分析请参考专门解析文档,此处提及,是为了保证本文的完整性。) WorkGroup 执行器和 PipelineDriver 执行器功能类似,实现了基于 WorkGroup 的调度逻辑。
定义为ExecEnv._scan_executor,也用于 WorkGroup 功能,类似 Pipeline IO 线程池,该执行器可以根据 WorkGroup 的资源 Quota 限制,执行 ScanOperator 提交的异步化 IO 任务。
QueryContext 管理一个查询在某台执行节点上的全体 Fragment Instance,QueryContextManager 顾名思义就是对 QueryContext 进行操作,主要用于其生命周期的管理。参考源码文件:be/src/exec/pipeline/query_context.cpp。
QueryContext 和 FragmentContext
计算节点 BE 为查询维护下列对象:
QueryContext:在 QueryContextManager 中注册,拥有 FragmentContextManager 对象管理 Fragment Instance。
FragmentContext:在QueryContext.fragment_mgr 中注册,每个 Fragment Instance 对应一个 FragmentContext。
Pipelines:FragmentContext 包含一组 Pipeline,来源于 Fragment Instance 的执行子树的拆解。
Drivers:FragmentContext 包含一组 PipelineDriver,PipelineDriver 通过 Pipeline 创建,来自同一个 Pipeline 的 PipelineDriver 的数量,取决于 Pipeline 并行度。
MorselQueues:ScanOperator 和 MorselQueue 的映射表,MorselQueue 包含一组 Morsel,Morsel 是 ScanOperator 读取数据的分片。
Fragment Instance 执行逻辑的入口
BE 收到来自 FE 的 exec_plan_fragment 后,创建 FragmentExecutor 执行该 Fragment Instance,代码如下:
// 文件:/home/grakra/workspace/sr/be/src/service/internal_service.cpp
template <typename T>
void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController* cntl_base,
const PExecPlanFragmentRequest* request,
PExecPlanFragmentResult* response, google::protobuf::Closure* done) {
ClosureGuard closure_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
auto st = _exec_plan_fragment(cntl);
if (!st.ok()) {
LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg();
}
st.to_protobuf(response->mutable_status());
}
PInternalServiceImpl::exec_plan_fragment 调用 _exec_plan_fragment:
// 文件:/home/grakra/workspace/sr/be/src/service/internal_service.cpp
template <typename T>
Status PInternalServiceImpl<T>::_exec_plan_fragment(brpc::Controller* cntl) {
auto ser_request = cntl->request_attachment().to_string();
TExecPlanFragmentParams t_request;
{
const uint8_t* buf = (const uint8_t*)ser_request.data();
uint32_t len = ser_request.size();
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, TProtocolType::BINARY, &t_request));
}
bool is_pipeline = t_request.__isset.is_pipeline && t_request.is_pipeline;
LOG(INFO) << "exec plan fragment, fragment_instance_id=" << print_id(t_request.params.fragment_instance_id)
<< ", coord=" << t_request.coord << ", backend=" << t_request.backend_num
<< ", is_pipeline=" << is_pipeline << ", chunk_size=" << t_request.query_options.batch_size;
if (is_pipeline) {
auto fragment_executor = std::make_unique<starrocks::pipeline::FragmentExecutor>();
auto status = fragment_executor->prepare(_exec_env, t_request);
if (status.ok()) {
return fragment_executor->execute(_exec_env);
} else {
return status.is_duplicate_rpc_invocation() ? Status::OK() : status;
}
} else {
return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
}
}
调用 FragmentExecutor::prepare 函数, 初始化 Fragment Instance 的执行环境,创建和注册 QueryContex、FragmentContext,将Fragment Instance 拆分成 Pipelines,创建 PipelineDrivers。
调用 FragmentExecutor::execute 函数,向 Pipeline 执行线程提交 PipelineDrivers 运行。
FragmentExecutor::prepare 函数
参考 be/src/exec/pipeline/fragment_executor.cpp,主要的逻辑如下:
1. 判断 Fragment Instance 是否为重复投递,如果是,直接返回错误状态 Status::DuplicateRpcInvocation。
2. 注册或获得已有的 QueryContext,处理 Query 的第一个 Fragment Instance 时,注册 QueryContext,后续到达的 Fragment Instance 复用已注册的 QueryContext。设置 QueryContext 需要处理的 Fragment Instance 的数量和 Query 过期时间等参数,Query 过期时间用于自动取消长期得不到执行的 Query;如果 Query 有大量的 Fragment Instance,先到达的部分 Fragment Instance 完成执行而退出,在没有活跃的 Fragment Instance 的情况下,QueryContext 依然需要保留一段时间,等到后续 Fragment Instance 全部达到或者自动过期而取消执行。
3. 创建和初始化 FragmentContext 对象,FragmentContext 需要注册到 QueryContext.fragment_mgr 中,注册的时机为 FragmentContext::prepare 函数的末尾。因为有的异步逻辑(比如 Global Runtime Filter 的投递),需要访问 FragmentContext 的成员变量,在 FragmentContext 未完成所有的初始化之前注册,会对异步逻辑暴露 FragmentContext,导致访问未初始化的成员变量而出错。
4. 调用函数 Exec::create_tree 生成 Non-pipeline 执行树,使用 PipelineBuilder 将执行树拆解成为 Pipeline。
5. 调用 convert_scan_range_to_morsel 函数将 ScanNode 需要访问的 TScanRangeParams 转换为 ScanOperator 可访问的 Morsel。
6. 将 Non-pipeline 执行引擎的 DataSink 转换为 Pipeline 引擎的 SinkOperator,调用 _decompose_data_sink_to_operator
7. 根据 DOP(degree-of-parallelism),为 Pipeline 创建 PipelineDriver,并且将 MorselQueue 和相应的 ScanOperator 关联。
FragmentExecutor::execute函数
//文件: be/src/exec/pipeline/fragment_executor.cpp
Status FragmentExecutor::execute(ExecEnv* exec_env) {
for (const auto& driver : _fragment_ctx->drivers()) {
RETURN_IF_ERROR(driver->prepare(_fragment_ctx->runtime_state()));
}
if (_fragment_ctx->enable_resource_group()) {
for (const auto& driver : _fragment_ctx->drivers()) {
exec_env->wg_driver_executor()->submit(driver.get());
}
} else {
for (const auto& driver : _fragment_ctx->drivers()) {
exec_env->driver_executor()->submit(driver.get());
}
}
return Status::OK();
}
FragmentExecutor::execute 函数的主要操作如下:
1. 变量 FragmentContext 中的所有 PipelineDriver,执行 PipelineDriver::prepare 函数。该函数主要完成 PipelineDriver 范围的 profile 注册、调用每个算子的 prepare 函数、设置 Driver 之间前置等待条件,比如 HashJoin 左侧的 PipelineDriver 需要等待右侧 PipelineDriver 完成,消费 RuntimeFilter 的 PipelineDriver 需要等待生产 RuntimeFilter 的 PipelineDriver 完成。
2. 把 PipelineDriver 提交给 Pipeline 执行线程。PipelineDriver 提交后,FragmentExecutor 的生命周期结束,FragmentExecutor 是临时性的,禁止在 FragmentExecutor 中定义 PipelineDriver 可引用的对象。
BE 上的 PipelineBuilder 会把 PlanFragment 拆分成多个 Pipeline,拆分过程中,PlanFragment 中物理算子会转化为 Pipeline 算子。
物理算子
另外,在 Fragment Instance 中,一般用 DataSink 的子类描述该 Fragment Instance 计算结果的去向,比如 DataStreamSink 会把计算结果发给下游 Fragment Instance的ExchangeNode。在 Pipeline 执行引擎中,DataStreamSink 和 ExchangeNode 会分别转化为 ExchangeSinkOperator 和 ExchangeSourceOperator。
Pipeline 算子
pull_chunk:从算子中拉取 chunk,一般计算时,从一对算子的前置算子拉取 chunk,然后推给后继算子。
push_chunk:向算子推 chunk。
has_output:表示状态,当前算子可输出,可以执行 pull_chunk。
need_input:表示状态,当前算子可输入,可以执行 push_chunk。
is_finished:当前算子已经结束,不能执行 pull_chunk/push_chunk。
prepare:prepare 和 open 表达式和调用其他内部数据结构的 prepare 函数。
close:close 表达式和调用其他内部数据结构的 close 函数。
set_finishing:关闭输入,执行 set_finishing 之后,算子的 need_input 始终返回 false,不可调用 push_chunk,但算子内部可能有缓存的计算结果,has_output 可能返回 true,可以调用 pull_chunk。
set_finished:关闭输入和输出,调用后,is_finished、has_output、need_input 都返回 false,pull_chunk 和 push_chunk 不可调用,当 Pipeline 中 HashJoinProbeOperator 和 LimitOperator 算子产生短路并且提前结束时,需要调用前置算子 set_finished 函数。如果两个算子之间通过专门的 Context 交换数据,则 set_finished 函数中,需要正确地重置 Context 状态,一个算子需要感知到另外一个算子的 set_finished 函数调用。比如 LocalExchangeSinkOperator 和 LocalExchangeSourceOperator。
set_canceled:类似 set_finished,但表示算子异常结束,如果算子需要区分正常或者异常结束,则需要重载 set_canceled 函数,目前只有 ExchangeSinkOperator 用到该函数。
pending_finish:表示状态,当算子实现了异步化,算子结束时,异步化任务尚未完成,算子需要等待异步化任务结束后,才能销毁所在的 PipelineDriver。提前销毁 PipelineDriver 可能会导致异步化任务延后执行引用算子中的已销毁对象。
一个算子会经过 prepare -> finishing -> finished -> [cancelled] -> closed 的转换,Pipeline 执行引擎根据算子的状态, 执行相应的接口。
Pipeline 执行引擎中,每个算子有一个 OperatorFactory 类,Pipeline 由 OperatorFactory 组成,PipelineDriver 是 Pipeline 的实例,PipelineDriver 由 Operator 构成,OperatorFactory 创建 Operator 对象,从 Pipeline 创建 PipelineDriver 时,遍历 Pipeline 中的 OperatorFactory,调用 OperatorFactory::create 方法。
// 文件:be/src/exec/pipeline/pipeline.h
// 函数: Pipeline::create_operators
Operators create_operators(int32_t degree_of_parallelism, int32_t i) {
Operators operators;
for (const auto& factory : _op_factories) {
operators.emplace_back(factory->create(degree_of_parallelism, i));
}
return operators;
}
OperatorFactory 如下:
如果表达式不可重入,在计算时,算子在多个线程中执行,存在线程不安全问题。因此新添加的表达式,要保证:
2. 线程不安全的表达式,不在 OperatorFactory 中共享,每个算子有自己私有的副本。
Pipeline 拆分
// file: be/src/exec/pipeline/fragment_executor.cpp
// function: FragmentExecutor::prepare
ExecNode* plan = nullptr;
RETURN_IF_ERROR(ExecNode::create_tree(runtime_state, obj_pool, fragment.plan, *desc_tbl, &plan))
// ...
PipelineBuilderContext context(_fragment_ctx, degree_of_parallelism);
PipelineBuilder builder(context);
_fragment_ctx->set_pipelines(builder.build(*_fragment_ctx, plan))
首先通过 ExecNode::create_tree 函数获得 PlanFragment 的物理算子构成执行树。
初始化 PipelineBuilderContext 对象,传入 degree_of_parallelism 参数。
构建 PipelineBuilder 对象,调用 PipelineBuilder::build 拆分 Pipeline。
PipelineBuilder::build 主要从执行树 root 节点开始,递归调用 decompose_to_pipeline 函数。
Pipelines PipelineBuilder::build(const FragmentContext& fragment, ExecNode* exec_node) {
pipeline::OpFactories operators = exec_node->decompose_to_pipeline(&_context);
_context.add_pipeline(operators);
_context.get_pipelines().back()->set_root();
return _context.get_pipelines();
}
物理算子需要重载 ExecNode 的 decompose_to_pipeline 函数。
decompose_to_pipeline 函数递归地调用,完成算子的拆分。以 ProjectNode 为例,ProjectNode 调用decompose_to_pipeline 函数对 _children[0] 先完成 Pipeline 拆解,并返回 OperatorFactory 数组,然后 ProjectNode 自身转变为 ProjectOperatorFactory,追加 OperatorFactory 数组的末尾,参考下面代码:
pipeline::OpFactories ProjectNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
using namespace pipeline;
OpFactories operators = _children[0]->decompose_to_pipeline(context);
// Create a shared RefCountedRuntimeFilterCollector
auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(1, std::move(this->runtime_filter_collector()));
operators.emplace_back(std::make_shared<ProjectOperatorFactory>(
context->next_operator_id(), id(), std::move(_slot_ids), std::move(_expr_ctxs),
std::move(_type_is_nullable), std::move(_common_sub_slot_ids), std::move(_common_sub_expr_ctxs)));
// Initialize OperatorFactory's fields involving runtime filters.
this->init_runtime_filter_for_operator(operators.back().get(), context, rc_rf_probe_collector);
if (limit() != -1) {
operators.emplace_back(std::make_shared<LimitOperatorFactory>(context->next_operator_id(), id(), limit()));
}
return operators;
}
复杂算子的拆分可能会用到 LocalExchange 算子,目前 LocalExchange 算子支持 Passthrough、broadcast 和 shuffle 模式。更多复杂的算子拆分可以参考针对这些算子的详细源码解析。
DataSink 的拆分和 ExecNode 不同,可以参考函数 FragmentExecutor::_decompose_data_sink_to_operator,此处不再赘述。
PipelineDriver 的调度主要涉及下面几个函数:
GlobalDriverExecutor::worker_thread:Pipeline 引擎执行线程的入口函数,该函数持续从就绪 Driver 队列获取 PipelineDriver,执行 PipelineDriver::process 函数。在 PipelineDriver 阻塞或者时间片用完时,主动 yield,换其他就绪 PipelineDriver 执行。
PipelineDriver::process 函数:调用 Operator::pull_chunk/push_chunk 函数进行计算,判断 PipelineDriver 是否阻塞或者需要 yield。
PipelineDriverPoller::run_internal:阻塞 PipelineDriver 的轮询线程的函数,遍历阻塞 PipelineDriver,将已经解除阻塞的 PipelineDriver 放回就绪队列。
GlobalDriverExecutor::worker_thread
从就绪队列取 PipelineDriver。 执行 PipelineDriver::process 函数 PipelineDriver 执行完一轮之后,判断 PipelineDriver 的当前状态
PipelineDriver 正常结束,异常结束或者计算出错,则调用 PipelineDriver::finalize_driver 函数完成 PipelineDriver 的清理; PipelineDriver 仍然处于 RUNNING 状态,则设置其状态为 READY,放回就绪 Driver 队列; PipelineDriver 处于阻塞状态,则调用 PipelineDriverPoller->add_blocked_driver 函数,将 PipelineDriver 加入到阻塞 Driver 队列中。
就绪 Driver 队列采用多级反馈队列(mlfq)实现,小查询优先调度,同时避免大查询饥饿。 请参考 https://github.com/StarRocks/starrocks/blob/main/be/src/exec/pipeline/pipeline_driver_executor.cpp
PipelineDriver::process
遍历 PipelineDriver 中的相邻算子对,只有当两个算子 is_finished() 返回 false,前置算子 has_output() 返回 true,后置算子 need_input() 返回 true 时,调用前置算子的 pull_chunk 获得 chunk,调用后置算子的 push_chunk,将 chunk 推给它,从而完成 chunk 的转移。请参考: https://github.com/StarRocks/starrocks/blob/main/be/src/exec/pipeline/pipeline_driver.cpp
当 PipelineDriver 中转移的 chunk 数量超过 100 个,本轮累积执行时间超过 100ms,则主动 yield,退出当前 process,返回就绪队列,换其他 PipelineDriver 执行。
当 PipelineDriver 中当前无 chunk 可以移动,则说明 PipelineDriver 处于阻塞状态,退出当前 process,放回阻塞队列。
PipelineDriverPoller::run_internal
#04
总结
—
本文主要讲解了 Pipeline 执行引擎想解决的问题及一般性原理。针对 Pipeline 执行引擎的实现,着重说明了 BE 端拆分 Pipeline 的逻辑,以及 Pipeline 实例 PipelineDriver 的调度执行逻辑。
想要深入学习 StarRocks 的执行引擎,还需要研究 MPP 调度和向量化执行,后续我们会继续撰文与大家分享。
读到这里,好学的你是不是又产生了一些新思考与启发?
扫描下方用户群二维码加入 StarRocks 社区一起自由交流!
关于 StarRocks
面世两年多来,StarRocks 一直专注打造世界顶级的新一代极速全场景 MPP 数据库,帮助企业建立“极速统一”的数据分析新范式,助力企业全面数字化经营。
2021 年 9 月,StarRocks 源代码开放,在 GitHub 上的星数已超过 3400 个。StarRocks 的全球社区飞速成长,至今已有超百位贡献者,社群用户突破 7000 人,吸引几十家国内外行业头部企业参与共建。
StarRocks 技术内幕:
👇 阅读原文了解 StarRocks 产品详细信息