一个Job在OneFlow中的执行过程—下篇
本文为《一个Job在OneFlow中的执行过程》系列文章的下篇,也是最后一篇。在之前的文章《一个Job在OneFlow中的执行过程—上篇》中,从bottom up的角度,简单讲解了一个Job(用户定义的训练/预测任务)在Oneflow中的调用入口、数据流转过程、从python端到c++端的代码执行流程;在《一个Job在OneFlow中的执行过程—中篇》中重点介绍了编译期过程,详细梳理编译期过程细节、代码执行流程。
本文则重点讲解Runtime运行时过程,主要内容分为两部分:
1.Runtime运行时相关概念2.Runtime各个系统及代码梳理
其中,1.Runtime运行时相关概念主要围绕Plan、Actor、Task、Thread等展开;2.运行时各个系统主要包含:
运行时Actor 内存管理系统Memory Allocator和RegstMgr 线程管理系统Thread Manager 数据/消息通信系统CommNet 集合通信管理系统CollectiveBoxingExecutor
等系统/模块,由于每个部分都很庞大,故本文无法一一覆盖,重点在于通过代码梳理和介绍各个系统的作用及其相互关系,以及从Runtime启动到任务执行、结束、Runtime下线的完整过程。日后会推出系列文章,详细介绍OneFlow系统中各个模块的功能及设计,敬请期待!
需要特别说明的是,此系列文章仅在于描述流程、重点模块的代码,oneflow正处于快速版本更新和迭代过程,所以很多api在未来可能会有较大幅度的变动。目前在推进的工作有interface 1.0工作,包含了对齐pytorch api、multi client设计、构图编译期优化等,敬请期待!
1.运行时相关概念
1.1 Task、Actor和Plan
Plan是运行时计划,同时它也是由Jobs经过编译后得到的物理计算图,其包含Runtime运行时任务执行所需的所有信息。一个Plan拓扑图的示意如下:
Task通过oneflow/oneflow/core/job/task.proto[1]中的TaskProto定义了一个任务所需的基本信息:
message TaskProto {
// common
required TaskType task_type = 1;
required int64 machine_id = 2;
required int64 thrd_id = 3;
required int64 task_id = 4;
required int64 job_id = 5;
required TaskSetInfo task_set_info = 6;
required ExecSequence exec_sequence = 7;
map<string, RegstDescProto> produced_regst_desc = 8;
map<string, RegstDescIdSet> consumed_regst_desc_id = 9;
// compute task
optional ParallelContext parallel_ctx = 1000; // CompTask
};
其中,根据TaskType中的枚举类型,我们可以定义不同类型的Task,如:
普通前向任务kNormalForward 负责将host memory内存搬运到device上的kCopyHd; 可重入锁相关的kReentrantLock; 各种Tick信号任务相关的kDeviceTick、kSourceTick、kAccTick; 集合通信boxing相关的kCollectiveBoxingGeneric等
enum TaskType {
kInvalid = 0;
kNormalForward = 1;
kDecode = 5;
kCopyHd = 12;
kCopyCommNet = 13;
kPrint = 15;
kRecordLoad = 16;
kDeviceTick = 27;
kDecodeRandom = 29;
kPack = 30;
kUnpack = 32;
kRepeat = 34;
kAcc = 37;
kSourceTick = 40;
kTick = 41;
kAccTick = 42;
kCase = 43;
kEsac = 44;
kWaitAndSendIds = 45;
kReentrantLock = 46;
kCallbackNotify = 47;
kForeignInput = 48;
kForeignOutput = 49;
kDistributeConcat = 55;
kDistributeSplit = 56;
kSliceBoxing = 57;
kCollectiveBoxingGeneric = 58;
kBoxingIdentity = 59;
kDecodeH2D = 60;
kBoxingS2SAll2AllPack = 61;
kBoxingS2SAll2AllUnpack = 62;
kSspVariableProxy = 63;
kBoxingZeros = 64;
};
除了任务类型外,我们看到,还有map<string, RegstDescProto> produced_regst_desc = 8;
map<string, RegstDescIdSet> consumed_regst_desc_id = 9;
这些变量记录了Task运行期间消费/生产的Regst内存块信息,这些信息记录了任务间的执行顺序、消费关系的拓扑关系。
1.2 Thread和Message
Thread即线程,由全局的ThreadMgr管理,其作用主要负责Actor的生命周期:创建、运行、销毁等。 Thread和Actor是一对多的关系,一个Thread负责管理1~多个Actor,每个Actor都有其对应的唯一Thread进行管理。
Message即消息,因为Actor运行时虽然是去中心化的,不过通常本机Actor之间或和其他机器的Actor间,也需要通过消息传递信息,这时就需要用ActorMsgBus来进行消息的通信和传递。关于消息类型、以及负责消息传递的ActorMsgBus会在下文中详细说明。
之前同事成诚在文章《仅此一文让您掌握OneFlow框架的系统设计(下篇)》[2]中描述的很详细,我这里就直接搬运了:)
消息的类型
在oneflow/core/actor/actor_message.h[3]中,我们可以看到消息类型分为以下几种:
enum class ActorMsgType { kRegstMsg = 0, kEordMsg, kCmdMsg };
kRegstMsg: 表示这个ActorMsg包含了一个Regst。这是运行时Actor之间通信最主要的消息,生产者生产一个Regst通知下游消费者的消息、消费者使用完Regst返还给生产者说我用完了,都是RegstMsg。可以从ActorMsg的 regst()
接口中拿到该Regst。需要注意的是,无论是生产者通知消费者的消息,还是消费者用完的Ack消息,都是同一种消息。OneFlow的Actor通信中是不需要指明“Ack”的。各个Actor在处理ActorMsg的时候都可以从Regst中得知是不是Ack。kCmdMsg: 一些控制指令信号。不包含数据。如kConstructActor(Thread直接处理的消息,用于Thread创建Actor);kStart,Actor启动并开始工作。运行时靠着Start消息的传染,整个计算图开始工作。 kEordMsg: 表示任务结束,Actor可以切换到Zombie状态。运行时靠着Eord消息的传染,整个计算图中的Actor均切换到Zombie状态,等待销毁和RunTime下线。运行时的结束不是一下子就结束的,有可能计算图的源节点已经发出了Eord的信号,并将自己切换成Zombie状态,而计算图中的后半部分还在工作中。
消息的路由
ActorMsgBus相当于一个消息的路由,会判断该消息的目的地是否是本机,如果是本机,则通过ThreadMgr找到对应的Thread,然后EnqueueActorMsg。如果消息的目的地是其他机器,则通过Global对象CommNet将该消息发送给其他机器。其他机器的Global
Actor间的消息传递
当一个Actor需要给另一个Actor发消息时,会判断接收者Actor:
是否是本线程内: 如果是,则ActorMsgBus会找到本机内的对应线程Thread,传入到该Thread的Msg channel中 否则:调用本机器的CommNet对象传输该消息。 接收者所在机器的CommNet对象收到消息后会转给该机器的ActorMsgBus处理。该机器的ActorMsgBus会找到对应的线程Thread将该消息传入线程的MsgChannel中 如果是,则直接压入Thread的LocalMsgQueue中 (最快) 否则:调用本机器的ActorMsgBus传输数据。 ActorMsgBus会判断接收者是否在本机内 Thread会不断轮询自己的LocalMsgQueue,取出对应的消息找到对应的Actor去处理该消息。 如果LocalMsgQueue为空,则尝试去从MsgChannel中取消息放到LocalMsgQueue中。
1.3 Regst和多级内存体系
上面主要介绍了Actor、Thread、Message相关概念,下面说说Regst和OneFlow中的多级内存体系,毕竟Actor执行最多的还是矩阵计算相关的任务,而这些任务往往涉及到内存的使用和释放,这就涉及到RegstMgr和Regst。简单来说,Regst是OneFlow运行时的使用内存的最小单元;RegstMgr则是全局的Regst管理者。在Runtime初始化后,会通过NewAllGlobal(plan, total_piece_num, is_experiment_phase);
创建系统所需的各种全局所需的Global对象,如ThreadMgr、RegstMgr等。
RegstMgr
RegstMgr在初始化时就会根据Plan申请所有的本机上的内存:主机内存HostMemory、HostPinnedMemory(For CUDA CopyH2D)、设备内存DeviceMemory、锁页内存LockedMemory(For RDMA)等。并根据Plan中的Regst配置信息分配相应的内存地址给Regst。Regst的内存地址是固定的,直到运行时结束Regst的内存地址和大小都不会变化。OneFlow的静态内存管理是Runtime启动时统一分配,Runtime结束时统一销毁。运行时的内存调度开销是0。
Regst
Regst是OneFlow运行时的基本内存单元,也是基本的消息单元,Actor之间的通信、所有的数据生产、消费、回收都是Regst。由于OneFlow是静态内存分配,内存的分时复用调度是编译期的内存复用算法已经做好了(通过控制边+offset方式),所以运行时仅需要按照编译期生成的MemChunk、MemBlock、Regst的配置描述(RegstDescProto[4])信息一次性申请内存,并分配给对应的Regst即可。
Regst存储了两类信息:
生产者Actor id和消费者 Actor ids。一个Regst的生产者是唯一的,消费者可能有多个。 Blob的信息
多级内存体系
MemCaseRegst通过MemCase[5]标记了自己所属的内存类型,如果是GPU上的显存,还需要标记自己所属的DeviceId。如果是CPU上的主存,会标记该Regst是否是被CopyHD或CommNet所使用的。Regst通过MemBlockId和MemBlockOffset标记了自己所属于哪个MemBlock以及对应的偏移量。MemoryAllocator根据MemCase和Size申请对应大小和类型的内存块,返回内存块首地址; 根据内存地址回收内存。在Lazy情况下,仅在Runtime的启动/结束时(RegstMgr的构造函数和析构函数里)才会申请/释放内存。MemBlock与Chunk这是OneFlow的多级内存设计:Chunk -> MemBlock -> Regst
MemBlock: 同一个Chain(MemChain,通常是GPU上的前后向的所有activation regsts在一个MemChain中,Optimizer子图部分的Regst在各自的MemChain中)内的Regst根据分时复用的原则共用一个MemBlock的不同段,通过size和offset标记。内存复用算法会尽可能让MemBlock的Size小,同时满足互斥的Regst(生命周期有重叠的)不会有内存区域的重叠。 Chunk:一个Job内在同一块GPU上的MemBlock的合集称为一个Chunk。Chunk的Size是所有内部MemBlock的Size之和。(即同一个Chunk内部的MemBlock之间没有复用内存) 多个Job在同一个块GPU上的Chunk,会根据Job之间的互斥关系,完整复用一个大的Chunk(取最大值)作为最终的Chunk。如TrainJob和EvalJob互斥,所以TrainJob的所有可复用的Regst的总Chunk跟Eval的总Chunk合并复用一块内存。通常情况下,Eval只有前向,比TrainJob计算图要小,可以完全被TrainJob的Chunk所包含。即新增一个EvalJob不会新增任何内存。
2.Runtime各个系统及代码梳理
启动Runtime
在前面的文章中我们说过,session init后,会通过Oneflow::Init()[6]方法中的CompileAndMergePlanOnMaster()完成整个Job逻辑图、物理图的编译及Plan生成的过程:
Maybe<void> Oneflow::Init(const oneflow::JobSet& job_set) {
OF_PROFILER_RANGE_GUARD("Oneflow::Init");
// Runtime
OF_PROFILER_RANGE_PUSH("CompileAndMergePlanOnMaster");
JUST(CompileAndMergePlanOnMaster(job_set.job(), &plan_));
OF_PROFILER_RANGE_POP(); // CompileAndMergePlanOnMaster
if (Global<MachineCtx>::Get()->IsThisMachineMaster()) {
runtime_buffers_scope_.reset(new RuntimeBuffersScope(plan_));
}
OF_PROFILER_RANGE_PUSH("new Runtime");
runtime_.reset(new Runtime(plan_, GetMaxVal<size_t>(), false));
OF_PROFILER_RANGE_POP(); // new Runtime
return Maybe<void>::Ok();
}
Plan生成后,就会创建新的Runtime运行时。Runtime部分的代码在oneflow/core/job/runtime.cpp#L63[7]
需要注意的是Runtime创建后,各个任务不立刻执行,而是等待tick信号触发后才会执行。
Runtime::Runtime(const Plan& plan, size_t total_piece_num, bool is_experiment_phase) {
NewAllGlobal(plan, total_piece_num, is_experiment_phase);
std::vector<const TaskProto*> source_tasks;
std::vector<const TaskProto*> other_tasks;
int64_t this_machine_task_num = 0;
for (const TaskProto& task : plan.task()) {
if (task.machine_id() != Global<MachineCtx>::Get()->this_machine_id()) { continue; }
if (!HasNonCtrlConsumedRegstDescId(task)) {
source_tasks.push_back(&task);
} else {
other_tasks.push_back(&task);
}
this_machine_task_num += 1;
}
RuntimeCtx* runtime_ctx = Global<RuntimeCtx>::Get();
runtime_ctx->NewCounter("constructing_actor_cnt", this_machine_task_num);
HandoutTasks(source_tasks);
HandoutTasks(other_tasks);
runtime_ctx->WaitUntilCntEqualZero("constructing_actor_cnt");
LOG(INFO) << "Actors on this machine constructed";
OF_SESSION_BARRIER();
LOG(INFO) << "Actors on every machine constructed";
if (Global<CommNet>::Get()) { Global<CommNet>::Get()->RegisterMemoryDone(); }
OF_SESSION_BARRIER();
runtime_ctx->NewCounter("running_actor_cnt", this_machine_task_num);
SendCmdMsg(source_tasks, ActorCmd::kStart);
}
Runtime部分的代码主要做了以下几件事情:
创建系统Global对象 遍历Task构建Actor对象 更改Actor工作状态(为待执行)
下面,我们将分别梳理这3个部分所涉及到的系统及代码。
2.1 创建系统Global对象
在Runtime启动后,首先通过NewAllGlobal(plan, total_piece_num, is_experiment_phase);
创建了系统所需的各种全局所需的Global对象,如CommNet、MemoryAllocator、RegstMgr、ActorMsgBus、ThreadMgr等。这些Global对象的作用,在文章《仅此一文让您掌握OneFlow框架的系统设计(下篇)》[8]中有介绍:
CommNet[9]: CommNet是OneFlow分布式训练中负责多机数据传输和消息通信的模块。底层有基于Epoll的实现和基于RDMA的实现。 boxing::collective::CollectiveBoxingExecutor & boxing::collective::CollectiveBoxingDeviceCtxPoller:负责执行集合通信操作(NCCL) MemoryAllocator[10]: 负责内存(Host内存 和 GPU显存)的申请与释放 RegstMgr[11]:负责创建所有的Regst (Mgr是Manager的缩写) ActorMsgBus[12]: 负责运行时Actor之间的消息通信 (Msg是Message的缩写) ThreadMgr[13]:负责创建和管理所有的Thread
2.2 遍历Task构建Actor对象
创建完各种运行时所需的Global对象后,会遍历Plan中的所有Task,并构建与其一一对应的Actor。
遍历Tasks
在Runtime中,通过plan.task()方法拿到运行时plan所包含的所有tasks,对其中将在本机执行的tasks分为source tasks和other tasks两类。
判断一个task是否为源task,主要通过HasNonCtrlConsumedRegstDescId()方法,简单来说,就是通过Task的
consumed_regst_desc_id
属性,若key-value对中不存在“in_ctrl”的key,则表示其为源task。
遍历后的Task分别插入对应的task vector中,这一部分代码如下:
std::vector<const TaskProto*> source_tasks;
std::vector<const TaskProto*> other_tasks;
int64_t this_machine_task_num = 0;
for (const TaskProto& task : plan.task()) {
if (task.machine_id() != Global<MachineCtx>::Get()->this_machine_id()) { continue; }
if (!HasNonCtrlConsumedRegstDescId(task)) {
source_tasks.push_back(&task);
} else {
other_tasks.push_back(&task);
}
this_machine_task_num += 1;
}
HandoutTasks
task区分开以后,针对source_tasks和other_tasks分别构造对应的Actor,并通过HandoutTasks方法触发source_tasks和other_tasks初始化相应的Actor:
RuntimeCtx* runtime_ctx = Global<RuntimeCtx>::Get();
runtime_ctx->NewCounter("constructing_actor_cnt", this_machine_task_num);
HandoutTasks(source_tasks);
HandoutTasks(other_tasks);
runtime_ctx->WaitUntilCntEqualZero("constructing_actor_cnt");
LOG(INFO) << "Actors on this machine constructed";
OF_SESSION_BARRIER();
LOG(INFO) << "Actors on every machine constructed";
具体的,HandoutTasks主要通过SendCmdMsg()方法来向消息路由器ActorMsgBus发送Actor初始化的指令:kConstructActor:
void SendCmdMsg(const std::vector<const TaskProto*>& tasks, ActorCmd cmd) {
for (const TaskProto* task : tasks) {
ActorMsg msg = ActorMsg::BuildCommandMsg(task->task_id(), cmd);
Global<ActorMsgBus>::Get()->SendMsg(msg);
}
}
void HandoutTasks(const std::vector<const TaskProto*>& tasks) {
for (const TaskProto* task : tasks) {
Global<ThreadMgr>::Get()->GetThrd(task->thrd_id())->AddTask(*task);
}
SendCmdMsg(tasks, ActorCmd::kConstructActor);
}
ActorMsgBus负责将Actor初始化的信息发送到相应机器上(本机/其他机器),最终Thread都会通过轮训本地的消息队列(local_msg_queue_)获取到Actor初始化到信息,并通过ConstructActor()方法,触发对应id的Actor完成初始化Init。
void Thread::PollMsgChannel(const ThreadCtx& thread_ctx) {
while (true) {
if (local_msg_queue_.empty()) {
CHECK_EQ(msg_channel_.ReceiveMany(&local_msg_queue_), kChannelStatusSuccess);
}
ActorMsg msg = std::move(local_msg_queue_.front());
local_msg_queue_.pop();
if (msg.msg_type() == ActorMsgType::kCmdMsg) {
if (msg.actor_cmd() == ActorCmd::kStopThread) {
CHECK(id2actor_ptr_.empty());
break;
} else if (msg.actor_cmd() == ActorCmd::kConstructActor) {
ConstructActor(msg.dst_actor_id(), thread_ctx);
continue;
} else {
// do nothing
}
}
.....
}
}
Actor初始化
Actor Init时主要做了以下事情:
根据ThreadCtx创建DeviceCtx。 运行时的Context有三级: ThreadCtx->DeviceCtx->KernelCtx 。
构造Kernel(ConstructKernel)
创建Regst(NewRegsts) 在调用RegstMgr->NewRegsts之前,RegstMgr已经给所有的Regst都申请好了内存,NewRegsts更应该像是GetRegsts。对于同一个RegstDesc,根据其regst_num会有多个Regst实例
处理下游消费的Regst(RegstDescId)以及Regst之间的Inplace
虚接口VirtualActorInit,供各个子类Actor自己重载自定义的初始化内容
初始化相关的代码在:oneflow/oneflow/core/actor/actor.cpp[14],当Actor初始化完毕以后,Actor就进入了等待状态。在Actor收到Eord信号并销毁之前,Actor一直都在等待状态和执行状态之间切换。
Actor内部有多种MsgHandler来处理消息(HandlerNormal和HandlerZombie)。在Actor正常运行过程中都使用HandlerNormal来处理消息。HandlerZombie用于Actor在有序退出时的消息管理。
2.3 更改Actor工作状态
Runtime()的最后一步,当所有的Actor初始化完毕后,通过:SendCmdMsg(source_tasks, ActorCmd::kStart);
往消息队列中发送kStart启动信号,触发source task相关的Actor处于启动状态,之后,整个运行时系统便处于待命状态,等待接受到任务启动信号后,才会正式执行。BufferMgr中获取数据,驱动相关actor任务的执行、展开。
2.4 任务执行
经过上面Runtime启动、Actor完成初始化后,整个运行时系统便处于待命状态,当python前端实际发出job执行的指令后,数据便开始从BufferMgr灌入相关的actor,正式开始任务的执行、数据流转和传递,直到整个Plan执行完成。
在文章《一个Job在OneFlow中的执行过程—上篇》[16]中,我们知道通过:session.TryInit().LazyRun(job_func, *args, **kwargs)[17]中的TryInit()完成了Job任务的逻辑图、物理图编译、生成运行时计划Plan,Plan生成后还会进一步完成Actor的初始化以及Runtime()的启动。在这之后,通过LazyRun()实际触发了Runtime的执行。
LazyRun[18]代码如下:
def LazyRun(self, job_func, *arg):
print("enter oneflow/python/framework/session_util.py >> LazyRun()")
assert self.status_ is SessionStatus.RUNNING
remote_blobs = self.LaunchUserJob(job_func, *arg)
if remote_blobs is None:
return
future_blob = LazyFutureRemoteBlobs(self).SetResult(remote_blobs).Inited()
annotation = inspect.signature(job_func).return_annotation
return oft_util.TransformGlobalFunctionResult(future_blob, annotation)
LazyRun的核心为LaunchUserJob:
def LaunchUserJob(self, job_func, *arg):
assert self.status_ is SessionStatus.RUNNING
print("enter method >>>>>>>>>>>>>>>>>>>>>>>>> LaunchUserJob()")
job_name = job_func.__name__
push_util.AsyncPush(self, job_func, *arg)
print("enter method >>>>>>>>>>>>>>>>>>>>>>>>> LaunchJob():", job_name)
self.LaunchJob(job_instance_util.MakeUserJobInstance(job_name))
return job_func.__oneflow_output_remote_blobs__
LaunchUserJob顾名思义,即启动用户定义的user job。在实际执行LaunchJob前,会通过push_util.AsyncPush添加并触发执行系统的push job使得数据从BufferMgr灌入相应actor,之后通过self.LaunchJob开启整个任务的实际执行。
我们在如下示例代码中,可以看见LaunchJob的过程依次为push job >> user job(pad_Job) >> pull job:
import oneflow as flow
import oneflow.typing as tp
import numpy as np
@flow.global_function()
def pad_Job(x: tp.Numpy.Placeholder((2, 1, 3, 3))) -> tp.Numpy:
with flow.scope.placement("cpu", "0:0"):
loss = flow.reflection_pad2d(x, padding=1)
return loss
x = np.arange(18).reshape((2, 1, 3, 3)).astype(np.float32)
print(" >>>>>>>>>>>>>>>>>>>>> pad_Job begin")
y = pad_Job(x)
print(" >>>>>>>>>>>>>>>>>>>>> pad_Job done!")
print("in:\n", x, "y:\n", y)
输出:
>>>>>>>>>>>>>>>>>>>>> pad_Job begin
E0409 21:45:57.386700 1368235 env_global_objects_scope.cpp:96] using rpc backend: local
enter oneflow/python/framework/session_util.py >> LazyRun()
enter method >>>>>>>>>>>>>>>>>>>>>>>>> LaunchUserJob()
enter method >>>>>>>>>>>>>>>>>>>>>>>>> AsyncPush.LaunchJob: System-Push-Input_0
enter method >>>>>>>>>>>>>>>>>>>>>>>>> LaunchJob(): pad_Job
enter oneflow/python/framework/pull_util.py >> SetResult()
enter method >>>>>>>>>>>>>>>>>>>>>>>>> AsyncPull.LaunchJob: System-Pull-Return_2
>>>>>>>>>>>>>>>>>>>>> pad_Job done!
in:
[[[[ 0. 1. 2.]
[ 3. 4. 5.]
[ 6. 7. 8.]]]
[[[ 9. 10. 11.]
[12. 13. 14.]
[15. 16. 17.]]]] y:
[[[[ 4. 3. 4. 5. 4.]
[ 1. 0. 1. 2. 1.]
[ 4. 3. 4. 5. 4.]
[ 7. 6. 7. 8. 7.]
[ 4. 3. 4. 5. 4.]]]
[[[13. 12. 13. 14. 13.]
[10. 9. 10. 11. 10.]
[13. 12. 13. 14. 13.]
[16. 15. 16. 17. 16.]
[13. 12. 13. 14. 13.]]]]
2.5 任务结束
当所有的actor依次执行并完成(running_actor_cnt变为0)后,标志Plan中所有任务已完成(结束),同时也触发了Runtime()的析构[19],所有系统全局对象依次销毁,整个Plan的生命周期完结:
Runtime::~Runtime() {
Global<RuntimeCtx>::Get()->WaitUntilCntEqualZero("running_actor_cnt");
OF_SESSION_BARRIER();
DeleteAllGlobal();
}
参考资料
oneflow/oneflow/core/job/task.proto: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/task.proto
[2]《仅此一文让您掌握OneFlow框架的系统设计(下篇)》: https://zhuanlan.zhihu.com/p/339208452
[3]oneflow/core/actor/actor_message.h: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/actor/actor_message.h#L24
[4]RegstDescProto: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/register/register_desc.proto%23L32
[5]MemCase: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/memory/memory_case.proto%23L17
[6]Oneflow::Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/oneflow.cpp#L991
[7]oneflow/core/job/runtime.cpp#L63: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/runtime.cpp#L63
[8]《仅此一文让您掌握OneFlow框架的系统设计(下篇)》: https://zhuanlan.zhihu.com/p/339208452
[9]CommNet: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/comm_network/comm_network.h%23L36
[10]MemoryAllocator: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/memory/memory_allocator.h%23L24
[11]RegstMgr: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/register/register_manager.h%23L32
[12]ActorMsgBus: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/actor/actor_message_bus.h%23L24
[13]ThreadMgr: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/thread/thread_manager.h%23L28
[14]oneflow/oneflow/core/actor/actor.cpp: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/actor/actor.cpp#L39
[15]ProcessMsg: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/actor/actor.h#L44
[16]《一个Job在OneFlow中的执行过程—上篇》: https://www.yuque.com/zhaoluyang/ai/ss1gc9#2t74Q
[17]session.TryInit().LazyRun(job_func, *args, **kwargs)
: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/function_util.py#L221
LazyRun: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L277
[19]Runtime()的析构: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/runtime.cpp#L91
网页端点击阅读原文,获得更好的阅读体验