OneFlow源码解析:静态图与运行时
作者|郑建华
更新|许啸宇、张文骁、成诚
OneFlow静态图的训练效率远高于动态图(eager模式)。本文试图通过一个简单例子,结合v0.8.0版本的代码,解读一下静态图和运行时的实现机制。
在开始之前,建议先读一下参考资料中《OneFlow框架的系统设计(https://zhuanlan.zhihu.com/p/337851255)》等系列文章。对静态图、运行时的基本概念和设计理念有基本的了解,会更容易理解代码。
1
代码示例
下面的示例代码来自官方文档(https://docs.oneflow.org/master/basics/08_nn_graph.html),是一个线性模型的前向计算。后续主要基于这段代码进行分析。
import oneflow as flow
import oneflow.nn as nn
class ModuleMyLinear(nn.Module):
def __init__(self, in_features, out_features):
super().__init__()
self.weight = nn.Parameter(flow.randn(in_features, out_features))
self.bias = nn.Parameter(flow.randn(out_features))
def forward(self, input):
return flow.matmul(input, self.weight) + self.bias
linear_model = ModuleMyLinear(4, 3)
class GraphMyLinear(nn.Graph):
def __init__(self):
super().__init__()
# ModuleBlock
self.model = linear_model
def build(self, input):
# ModuleBlock.__call__
return self.model(input)
graph_mylinear = GraphMyLinear()
input = flow.randn(1, 4)
out = graph_mylinear(input)
print(out)
2
oneflow包的初始化
import oneflow在初始化包(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/__init__.py)时,与静态图相关的主要操作如下:
GetEnv(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/__init__.py#L228) EnvGlobalObjectsScope::Init(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/env_global_objects_scope.cpp#L126) 启动各个节点的控制面(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/env_global_objects_scope.cpp#L160-L162)网络连接
初始化VM(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/env_global_objects_scope.cpp#L180)
启动各个节点的数据面网络连接(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/env_global_objects_scope.cpp#L184-L188)
初始化KernelObserver(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/env_global_objects_scope.cpp#L192-L203)
NewDefaultSession(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/__init__.py#L229) RegsiterSession(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/multi_client_session.py#L39) 创建 Session,并注册为 default session(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/session_util.cpp#L89) 创建 Python MultiClientSession 并保存到dict(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/session_context.py#L40),但并不 TryInit 创建 C++ MultiClientSessionContext(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/multi_client_session.py#L41) 但并不 TryInit
EnvGlobalObjectsScope::Init中先创建一个全局的ProcessCtx(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/env_global_objects_scope.cpp#L132)对象。然后根据环境变量等配置,在各个进程间创建gRPC和CommNet的连接,分别负责控制面和数据面的数据传输。其中在Bootstrap过程中会初始化全局的ProcessCtx(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/rpc/lib/grpc.cpp#L42),给每个进程分配一个全局唯一的rank编号(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/rpc/lib/global_process_ctx.cpp#L28)(machine_id(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/rpc/lib/global_process_ctx.cpp#L24))。
本文不涉及网络层面的操作,只讨论同一进程内各线程间的交互。
3
Module类
虽然可以直接用op和tensor构造模型,但是op的粒度太细了,直接用op构造模型会比较繁琐。
Module(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/module.py#L54)是由op和tensor构成的、可复用的子模块。利用Module可以更高效、更快捷的构建复杂模型。oneflow.nn(https://github.com/Oneflow-Inc/oneflow/blob/d825243aa7aff5cba8bd3a901b4cc56c2b1a36af/python/oneflow/nn/__init__.py)模块导出了很多预定义的Module。
Module定义了自己的属性设置逻辑(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/module.py#L262),核心逻辑是
如果value是Parameter类型,就保存到Module._parameters中 如果value是Module类型,就保存到Module._modules中 如果value是Tensor类型,就保存到Module._buffers中 否则按常规属性处理
Module可以包含子Module,形成树结构。因为Module通过setattr将子Module和Parameter都保存到字典结构中,可以方便的遍历所有Module及其参数tensor。
4
Graph类
4.1 构造函数
Graph的构造函数中GetDefaultSession(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L145)得到的session,就是导入oneflow包时NewDefaultSession(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/__init__.py#L229)构建的session。当时没有初始化,而是在Graph构造时进行初始化(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L147)。对应的C++函数是MultiClientSessionContext::TryInit(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/multi_client_session_context.cpp#L67),执行时会创建各种全局的资源管理器,比如:
LazyJobBuildAndInferCtxMgr BufferMgr RegstMgr ActorMsgBus ThreadMgr
4.2 __setattr__: 将Module和Tensor封装为Block
Graph.__setattr__ 支持通过设置属性的方式把一个 Module 添加到 Graph 中,之后改 Module 就可以被 Graph 调用了。添加到 Graph 中的 Module,会被包装到 Block 里面,Block 起到了代理执行的作用,它会给原 Eager 下的 Module 扩展出静态执行需要的一些特殊功能。
添加到 Graph 中的 Module 和原 Module 共享了状态(Parameter、Buffer)和 forward 执行逻辑。共享 forward 执行逻辑使得静态和动态执行计算逻辑相同。共享状态则可以使动态图下的模型状态被静态图复用。基于此,两个 Graph,一个用于训练,一个用于预测,他们都复用统一模型 Module,这样训练和预测 Graph 也就实现了模型共享。
setattr最重要的动作就是对_add_block的调用(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L1332),_add_block中主要是调用get_block_cls并保存结果(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L1326)。get_block_cls(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/block.py#L39)的作用是将Module及其所有Tensor属性都转为对应的Block对象。为什么要做这个动作呢?主要是静态图编译需要借助Block类型来实现代理执行的功能,这些功能不适合直接写到 eager 下的 Module 和 Tensor 上。
这个转换是在ModuleBlock构造时调用set_origin(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/block.py#L131)完成的。对于子Module,会递归调用get_block_cls函数(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/block.py#L145),这样所有子Module及其Tensor属性都会被转换为对应的Block对象。
所以,上述示例代码中,GraphMyLinear实际存储的是ModuleBlock,Graph.build执行时获取的model属性也是ModuleBlock对象,ModuleBlock.origin才是ModuleMyLinear。
Graph.__setattr__不允许将Tensor对象设置为属性(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L1340)。Tensor只能存到Module中,因为 Module 是做状态共享的基本单位,而 Graph 是不允许复用的。
4.3 针对不同任务,定义不同的计算图
根据Oneflow Model Zoo的模型示例(https://github.com/Oneflow-Inc/models/blob/1b291f78d8f60e5f04ee0c5962e4611cc4bab40a/Vision/classification/image/alexnet/graph/train.py),train/eval等阶段可以创建不同的Graph子类。动态图下提供了 Module、Optimizer、Dataloader等模块,这些模型都可以被添加到 Graph 中。不同的组合可以构建不同类型的任务。
在这些不同阶段,Graph构造函数的行为、build函数的输入输出都有各自特点。了解这些,看后续代码时会更容易理解各个参数的具体含义。
构造函数 train阶段,需要添加Module、损失函数、优化器和dataloader eval阶段,只需要添加Module和dataloader build函数 train 导入样本和label 调用Module得到前向计算结果 计算损失 计算梯度 返回loss eval 导入样本和label 调用Module得到预估结果 返回预估结果和label
4.4 小结
上述几个类型的关系如下:
下面描述了GraphMyLinear的构造流程
* `__init__`
* `Graph.__init__`
* self.model = linear_model
* `Graph.__setattr__`
* _add_block
* get_block_cls: 递归地把Module转为ModuleBlock
* `ModuleBlock.__init__`
* ModuleBlock.set_origin
* `ModuleBlock._origin = origin` (Module)
* 对origin的sub modules, parameters, buffers递归调用get_block_cls
* `ModuleBlock.__setattr__`
5
逻辑图的编译
计算机语言的编译,是将高级语言的语句编译为汇编或机器指令。深度学习框架对计算任务的编译,是将用户的特定语句操作转换为DAG图。oneflow中用Job(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/job.proto#L30)描述逻辑的计算图。
不同于eager模式的动态图,静态图在开始执行前可以得到整个计算任务的所有信息,可以对DAG进行多轮优化。每轮优化都是输入一个Job、得到一个新Job。
最后,根据分布式环境配置,将逻辑图Job转换为物理执行的计算图Plan(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/plan.proto#L34)。在物理图中,一个op可能分布在多个节点/进程。
启动DAG计算需要调用Graph.__call__,这个函数的执行主要分以下几个步骤:
__call__ _compile(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L221) if not _is_compiled build_graph(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L741) __build_graph(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L759) finish_complie_and_init_runtime(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L742) __run(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L226)
逻辑图编译主要在__build_graph中进行。finish_complie_and_init_runtime会继续做一些优化pass,然后构建物理图、初始化运行时Actor系统。__run会启动一次DAG的运算。
5.1 graph_build_context: 为逻辑图编译设置基本环境
在 Graph 中,build 函数里面的代码执行都在 graph_build_context 的作用域下,这样实现了动态转静态的功能。
__build_graph中的graph_build_context(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L851)虽然只有一行代码,但却做了几件非常重要的事情。
首先在context作用域内设置全局的lazy_mode为True(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/graph_build_util.py#L46)。在这个context作用域内,所有op都由LazyInterpreter解释执行。
其次,在JobBuildAndInferCtx(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/graph_build_util.py#L47)作用域内,JobBuildAndInferCtx_Open(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/graph_build_util.py#L57)调用类似如下C++代码
// oneflow/api/python/job_build/job_build_and_infer.h
// oneflow/core/job/job_build_and_infer_ctx_mgr.cpp
// 如前所述,LazyJobBuildAndInferCtxMgr 在 MultiClientSessionContext::TryInit 执行时初始化。
// LazyJobBuildAndInferCtxMgr mgr;
mgr.OpenJobBuildAndInferCtx(job_name);
OpenJobBuildAndInferCtx会新建一个Job对象(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/job_build_and_infer_ctx_mgr.cpp#L32)、一个LazyJobBuildAndInferCtx对象(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/job_build_and_infer_ctx_mgr.cpp#L34)。LazyJobBuildAndInferCtx负责根据用户定制的op等操作,修改Job,其中最主要的功能是添加新 Op。
5.2 __build_io:为计算图添加input和output Op
self.__build_io("input", graph_build_util.build_graph_input_arg, *args, **kwargs)
_named_io_args: NamedArg _value: tuple [0]: NamedArg _value: tuple of NamedArg [0]: NamedArg _value: args tensor from Graph.__call__ [1]: NamedArg _value: empty kwargs from Graph.__call__
5.2.1 将op添加到逻辑图
infer_ctx->AddAndInferConsistentOp(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp#L503)
AddAndInferOp(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/job_build_and_infer_ctx.cpp#L563)
ConstructOp(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/job_build_and_infer_ctx.cpp#L580)
CheckAndConstructOp(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/operator/operator.cpp#L1216)
NewObj(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/operator/operator.cpp#L51)
OperatorConf中,多种op配置共享op_type字段(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/operator/op_conf.proto#L412),protobuf oneof的op_type_case常量作为注册NewObj的key。
5.2.2 lazy tensor 和 eager tensor 的区别
5.3 build: 将UserOp和FeedVariableOp添加到逻辑图
-> out = graph_mylinear(input)
/usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(221)__call__()
-> self._compile(*args, **kwargs)
/usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(741)_compile()
-> _, eager_outputs = self.build_graph(*args, **kwargs)
/usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(759)build_graph()
-> outputs = self.__build_graph(*args, **kwargs)
/usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/graph.py(864)__build_graph()
-> outputs = self.build(*lazy_args, **lazy_kwargs)
/mnt/project/machine-learning/oneflow/oneflow/test.py(21)build()
-> return self.model(input)
/usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(234)__call__()
-> result = self.__block_forward(*args, **kwargs)
/usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(266)__block_forward()
-> result = self._origin.__class__.forward(self, *args, **kwargs)
/mnt/project/machine-learning/oneflow/oneflow/test.py(11)forward()
-> return flow.matmul(input, self.weight) + self.bias
/usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(483)__getattr__()
-> p_state = self._get_from_states(name, "_parameters")
/usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(521)_get_from_states()
-> _s_block.try_build()
/usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(679)try_build()
-> self._lazy_origin_builder.try_build(self)
/usr/local/lib64/python3.6/site-packages/oneflow/nn/graph/block.py(627)try_build()
-> self.result = self.method()
> /usr/local/lib64/python3.6/site-packages/oneflow/framework/graph_build_util.py(227)build_graph_state()
-> op_name, var_conf_str, ["in_0"], ["out_0"]
5.4 逻辑图优化
5.5 物理图的编译
5.6 Plan的结构
{
"out": {
"op_name": "System-AutoTick-DstSubsetTick_21",
"blob_name": "out"
},
"in_0": {
"op_name": "System-EagerCriticalSection-Interface-End-Tick-19",
"blob_name": "out"
},
"in_1": {
"op_name": "System-AutoTick-SrcSubsetTick_20",
"blob_name": "out"
}
}
{
"out": "29",
"in_0": "27",
"in_1": "28"
}
6 运行时的初始化
创建Thread 通知Thread创建Actor,Actor会创建Regst和Kernel 给没有输入的source_tasks发送启动信号kStart
6.1 Runtime创建Thread
6.2 Runtime通知Thread创建Actor
System-Src-WaitAndSendIds_16 System-AutoTick-AppendDeviceTick_9 System-EagerCriticalSection-Interface-End-Tick-19 System-EagerCriticalSection-Interface-End-Tick-25
6.3 ActorMsgBus和Thread的消息处理
Actor是自调度的基本单元,接受消息然后工作,工作完后再继续发送消息。 actor_id就是task_id,是在编译Plan时就确定的。task是编译时概念,actor是对等的运行时概念。
task_id有特定的编码格式(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/graph/task_id.cpp#L21-L29),从中可以解析出machine_id(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/graph/task_id.cpp#L73)和thread_id(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/graph/task_id.cpp#L77)。
在跨网络的整个物理图Plan中,actor id相当于地址,通过它可以定位唯一的actor实体。 Actor 通过 ActorMsgBus::SendMsg(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor_message_bus.cpp#L24) 发送 ActorMsg(https://github.com/Oneflow-Inc/oneflow/blob/4856d691051accd72f13f4139d281e411977b297/oneflow/core/lazy/actor/actor_message.h#L34) 消息。 ActorMsg包含源和目的actor id(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor_message.h#L84-L85)。 如果是进程内通讯(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor_message_bus.cpp#L26),将通过 ActorMsgBus::SendMsgWithoutCommNet (https://github.com/Oneflow-Inc/oneflow/blob/4856d691051accd72f13f4139d281e411977b297/oneflow/core/lazy/actor/actor_message_bus.cpp#L49)把 ActorMsg 朝目的 actor 所在的 thread 入队消息(https://github.com/Oneflow-Inc/oneflow/blob/4856d691051accd72f13f4139d281e411977b297/oneflow/core/thread/thread.h#L40)。 Thread::EnqueueActorMsg 会判断当前 thread 是否是 actor thread,如果是则入本地队列,否则则入 actor thead 的 channel 队列。 如果ActorMsg是跨进程消息,ActorMsgBus通过CommNet发送消息(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor_message_bus.cpp#L42-L44),接收方的CommNet应该会根据actor id获得线程id,从ThreadMgr查到Thread,将消息交给Thread处理。 Thread::PollMsgChannel(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.1/oneflow/core/thread/thread.cpp#L60) 负责消息的接收和处理。 如果线程本地队列local_msg_queue_为空,则从thread的channel队列中取出全部ActorMsg放入本地队列(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.1/oneflow/core/thread/thread.cpp#L63)。 从本地队列中取出一个ActorMsg,然后开始处理。 处理一些特殊的kCmdMsg消息(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.1/oneflow/core/thread/thread.cpp#L67-L79),然后普通消息交给Actor自行处理(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.1/oneflow/core/thread/thread.cpp#L83)。 Actor收到消息后,会判断是否满足了Act的条件,如果满足,则会执行Act,从而调用LaunchKernel执行计算,Act执行结束后通过ActorMsgBus发消息通知上下游Actor。
6.4 激活source Actor
7 Actor
7.1 Actor的创建
kNormalForward,比如matmul、add等user op。 kCopyHd kTick kCollectiveBoxingGeneric
7.2 Actor的初始化
7.3 Actor的消息处理
7.4 Act执行的条件
Act执行依赖的数据是否都已经就绪?(IsReadReady) Act生产出来的数据,消费方是否已经用完、并收到ack消息确认?(IsWriteReady)
RegstSlot naive_produced_rs_; RegstSlot inplace_produced_rs_; RegstSlot naive_consumed_rs_; RegstSlot inplace_consumed_rs_;
7.5 Actor上下游之间的通知机制
AsyncSendNaiveProducedRegstMsgToConsumer(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L427) VirtualAsyncSendNaiveProducedRegstMsgToConsumer(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L441) HandleProducedNaiveDataRegstToConsumer(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L446) HandleRegstToConsumer(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L577) EnqueueAsyncMsg(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L523) 如果目标线程是当前线程,ActorMsgBus::SendMsg(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L662) 否则,将消息加入async_msg_queue_(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L664) 增加 total_reading_cnt_(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L526)(这个变量表示已经发消息给下游、但未收到的ack数量) naive_produced_rs_.PopFrontRegsts(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L581) AsyncSendProducedCtrlRegstMsgToConsumer
AsyncSendNaiveConsumedRegstMsgToProducer(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L431)
AsyncRetInplaceConsumedRegstIfNoConsumer(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L432)在Actor::HandlerNormal中收到kRegstMsg消息后,将消息添加到consumed_rs_(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L344),以保证当前Actor在收到所有依赖数据后是ReadReady的。
7.6 Act执行的动作
AsyncLaunchKernel(https://github.com/Oneflow-Inc/oneflow/blob/22f70a1719f371a54512633bb92086580d9c3c89/oneflow/core/lazy/actor/wait_and_send_ids_actor.cpp#L58)
ek.kernel->Launch(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L562),启动Kernel计算
Forward(https://github.com/Oneflow-Inc/oneflow/blob/eae9ff38f074479d79ce24b0f6e0594f82126171/oneflow/core/kernel/kernel.cpp#L52)
ForwardDataContent(https://github.com/Oneflow-Inc/oneflow/blob/eae9ff38f074479d79ce24b0f6e0594f82126171/oneflow/core/kernel/kernel.cpp#L65)
buffer->Pull(https://github.com/Oneflow-Inc/oneflow/blob/b17a9cd6b930b5817c63623fb682bd708377a93b/oneflow/core/kernel/wait_and_send_ids_kernel.cpp#L40)
给regst的存储地址mut_dptr赋值(https://github.com/Oneflow-Inc/oneflow/blob/b17a9cd6b930b5817c63623fb682bd708377a93b/oneflow/core/kernel/wait_and_send_ids_kernel.cpp#L47)
buffer->Pull会等待条件变量的通知(https://github.com/Oneflow-Inc/oneflow/blob/49f60e682518436dfeb37344a15902a959e0e4f2/oneflow/core/common/buffer.h#L60)。现在,看上去所有Actor都已准备就绪,只等发令枪一响就开跑了。
8 启动静态图的计算
RunLazyNNGraph(https://github.com/Oneflow-Inc/oneflow/blob/81edd938826a7ea903174d682348847658b64653/python/oneflow/nn/graph/graph.py#L1076) builder->LaunchLazyJob(https://github.com/Oneflow-Inc/oneflow/blob/8f672eea116cae4a73bb7309e7496b08d7ec9a32/oneflow/core/framework/nn_graph.cpp#L568) LaunchLazyJobInstructionType(https://github.com/Oneflow-Inc/oneflow/blob/8f672eea116cae4a73bb7309e7496b08d7ec9a32/oneflow/core/framework/instructions_builder.cpp#L179) Buffer::Push(https://github.com/Oneflow-Inc/oneflow/blob/8f672eea116cae4a73bb7309e7496b08d7ec9a32/oneflow/core/framework/instructions_builder.cpp#L179)
9 运行时的退出机制
9.1 Actor的退出
NNGraph在析构时,会关闭所有的Buffer对象(https://github.com/Oneflow-Inc/oneflow/blob/8f672eea116cae4a73bb7309e7496b08d7ec9a32/oneflow/core/framework/nn_graph.cpp#L82)。
Buffer在关闭时,会设置is_closed_ = true并通知所有监听者(https://github.com/Oneflow-Inc/oneflow/blob/49f60e682518436dfeb37344a15902a959e0e4f2/oneflow/core/common/buffer.h#L81)。但是Pull会继续处理完已经提交的计算。
所以,Buffer应该是主要用于进程内的通信和异步协调的一个类。 WaitAndSendIdsKernel这时候正在等待新一轮计算开始(https://github.com/Oneflow-Inc/oneflow/blob/b17a9cd6b930b5817c63623fb682bd708377a93b/oneflow/core/kernel/wait_and_send_ids_kernel.cpp#L40),结果收到Pull返回的kBufferStatusErrorClosed(https://github.com/Oneflow-Inc/oneflow/blob/49f60e682518436dfeb37344a15902a959e0e4f2/oneflow/core/common/buffer.h#L61)。 WaitAndSendIdsActor::IsCustomizedReadReady以后就一直返回false(https://github.com/Oneflow-Inc/oneflow/blob/22f70a1719f371a54512633bb92086580d9c3c89/oneflow/core/lazy/actor/wait_and_send_ids_actor.cpp#L68),IsReadReady也返回false(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L533)。 这之后,ActUntilFail只会执行异步消息发送(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L437)(不再进入while循环)
WaitAndSendIdsActor::HandlerNormal仍然会处理其它Actor发来的消息(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L340)。但因为IsCustomizedReadReady返回false,会进入AsyncSendEORDMsgForAllProducedRegstDesc(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L394)执行。它会给每个下游发送kEordMsg消息(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L614)。 Actor在收到上游发来的kEordMsg消息后,递减remaining_eord_cnt_(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L331)。 remaining_eord_cnt_被初始化为Actor的输入regst的数量(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L171)。 total_reading_cnt_是当前Actor生产的、已经发给consumer、但尚未收到ack的消息数量。 Actor目前仍可以正常接收consumer发来的ack消息。 当上述2个变量都为0时(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L395),意味着所有上游都发出了kEordMsg消息,也收到了所有下游的ack消息。Actor就给Thread返回1(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L397)。 如果上述两个变量有不为0的,就修改handler,由HandlerZombie(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L399)处理后续收到的消息。 Thread收到Actor返回的1后(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/thread/thread.cpp#L84),将它从自己的存储中删除(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/thread/thread.cpp#L89),并递减运行Actor的数量。
9.2 Thread的退出
NNGraph重置runtime_导致运行时对象被析构(https://github.com/Oneflow-Inc/oneflow/blob/8f672eea116cae4a73bb7309e7496b08d7ec9a32/oneflow/core/framework/nn_graph.cpp#L83)。 Runtime删除所有Thread(https://github.com/Oneflow-Inc/oneflow/blob/b17a9cd6b930b5817c63623fb682bd708377a93b/oneflow/core/job/runtime.cpp#L117)。 ThreadMgr给所有Thread发送kStopThread消息(https://github.com/Oneflow-Inc/oneflow/blob/c8c6d351fa28c5ebce948d69c06670a783f83f74/oneflow/core/thread/thread_manager.cpp#L64)。同时,重置指针导致Thread析构(https://github.com/Oneflow-Inc/oneflow/blob/c8c6d351fa28c5ebce948d69c06670a783f83f74/oneflow/core/thread/thread_manager.cpp#L66)。 Thread的物理线程退出PollMsgChannel循环(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/thread/thread.cpp#L68)。 Thread等待物理线程结束,关闭channel(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/thread/thread.cpp#L52)。
10 分布式场景的静态图
10.1 示例代码
import oneflow as flow
import oneflow.nn as nn
P0 = flow.placement("cpu", ranks=[0, 1])
a0_sbp = flow.sbp.split(0)
class ModuleMyLinear(nn.Module):
def __init__(self, in_features, out_features):
super().__init__()
self.weight = nn.Parameter(flow.randn(in_features, out_features,
placement=P0, sbp=flow.sbp.broadcast))
self.bias = nn.Parameter(flow.randn(1, out_features,
placement=P0, sbp=flow.sbp.broadcast))
def forward(self, input):
return flow.matmul(input, self.weight) + self.bias
linear_model = ModuleMyLinear(4, 3)
class GraphMyLinear(nn.Graph):
def __init__(self):
super().__init__()
# ModuleBlock
self.model = linear_model
def build(self, input):
# ModuleBlock.__call__
return self.model(input)
graph_mylinear = GraphMyLinear()
input = flow.randn(5, 4, placement=P0, sbp=flow.sbp.split(1))
out = graph_mylinear(input)
print(out)
11 附录
11.1 断点
11.1.1 Python断点示例
# python3 -m pdb test.py
break test.py:25
break oneflow/nn/graph/graph.py:221
break oneflow/nn/graph/graph.py:741
break oneflow/nn/graph/graph.py:745
break oneflow/nn/graph/graph.py:759
break oneflow/nn/graph/graph.py:828
break oneflow/nn/graph/graph.py:777
break oneflow/nn/graph/graph.py:1066
break oneflow/nn/graph/graph.py:1133
break oneflow/framework/graph_build_util.py:227
# python3 -m pdb test.py
break test.py:25
break oneflow/nn/graph/graph.py:221
break oneflow/nn/graph/graph.py:741
break oneflow/nn/graph/graph.py:745
break oneflow/nn/graph/graph.py:759
break oneflow/nn/graph/graph.py:828
break oneflow/nn/graph/graph.py:777
break oneflow/nn/graph/graph.py:1066
break oneflow/nn/graph/graph.py:1133
break oneflow/framework/graph_build_util.py:227
11.1.2 C++断点示例
source /mnt/oneflow/build/source.sh
gdb --args python3 /mnt/oneflow/test.py
# set breakpoints
# run
set breakpoint pending on
break oneflow::ActorMsg::BuildEordMsg
break oneflow/core/common/buffer.h:80
break oneflow::(anonymous namespace)::CheckAndConstructOp
break oneflow::WaitAndSendIdsActor::Act
break oneflow::WaitAndSendIdsActor::HandlerWaitToStart
break oneflow/core/lazy/actor/light_actor.cpp:452
break oneflow/core/lazy/actor/light_actor.cpp:485
break oneflow::ForeignInputKernel::ForwardDataContent
break oneflow::vm::LaunchLazyJobInstructionType::Compute
11.2 静态图的json表示
forward(https://quip.com/OMc4A0HOOr0C) full(https://quip.com/JLaMAHGBLXmK) compiled(https://quip.com/tXjuAiS3J0Ab) plan(https://quip.com/a0DMAAIte6PQ)
11.3 actor type
System-AutoTick-AppendDeviceTick_9
System-AutoTick-DstSubsetTick_12
System-AutoTick-DstSubsetTick_21
System-AutoTick-DstSubsetTick_27
System-AutoTick-Prepend-DeviceTick_7
System-AutoTick-SrcSubsetTick_20
System-AutoTick-SrcSubsetTick_26
System-AutoTick-SrcSubsetTick_8
System-AutoTick-Tick_11
System-AutoTick-Tick_13
System-EagerCriticalSection-Callback-23
System-EagerCriticalSection-Callback-29
System-EagerCriticalSection-Interface-Begin-Tick-18
System-EagerCriticalSection-Interface-Begin-Tick-24
System-EagerCriticalSection-Interface-End-Tick-19
System-EagerCriticalSection-Interface-End-Tick-25
System-EagerCriticalSection-Wait-22
System-EagerCriticalSection-Wait-28
_GraphMyLinear_0_input.0.0_2
_GraphMyLinear_0_output.0.0_2
model.bias
model-broadcast_add-1
model-matmul-0
model.weight
System-AutoTick-SinkTick_15
System-SyncAllRanksSinkTick_14
System-Src-WaitAndSendIds_16
System-Sink-CallbackNotify_17
12 参考资料
oneflow v0.8.0(https://github.com/Oneflow-Inc/oneflow/tree/release/v0.8.0) OneFlow框架的系统设计(上篇)(https://zhuanlan.zhihu.com/p/337851255) OneFlow框架的系统设计(中篇)(https://zhuanlan.zhihu.com/p/338699487) OneFlow框架的系统设计(下篇)(https://zhuanlan.zhihu.com/p/339208452) 一个Job在OneFlow中的执行过程—上篇(https://zhuanlan.zhihu.com/p/344531540) 一个Job在OneFlow中的执行过程—中篇(https://zhuanlan.zhihu.com/p/355654002) 一个Job在OneFlow中的执行过程—下篇(https://zhuanlan.zhihu.com/p/363689736) 静态图模块 nn.Graph(https://docs.oneflow.org/master/basics/08_nn_graph.html) OneFlow系统设计(https://docs.oneflow.org/v0.4.0/basics_topics/essentials_of_oneflow.html) torch.nn.Module(https://pytorch.org/docs/1.10/generated/torch.nn.Module.html)
OneFlow源码解析:自动微分机制 ChatGPT的一小步,NLP范式转变的一大步 李白:你的模型权重很不错,可惜被我没收了 OpenAI掌门Sam Altman:AI下一个发展阶段 32篇年度最佳AI论文;Python编译器Codon开源 比快更快,开源Stable Diffusion刷新作图速度 OneEmbedding:单卡训练TB级推荐模型不是梦