How a Job Executes in OneFlow (Part 1)
Preface
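Previously, my colleague Cheng Cheng (成诚) wrote three articles that took a top-down view of the distributed deep learning framework OneFlow and introduced its advanced distributed-system architecture and design ideas, such as Actors, SBP, and streaming scheduling. Building on those articles, this post takes a bottom-up view. Starting from a simple inference demo, it explains how a Job (a user-defined training or inference task) is invoked in OneFlow, how data flows through the system, and how code executes from the Python side down to the C++ side. It is also a record of my own process of learning and organizing this material.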
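The next article will cover in detail how a Job is compiled into a graph and how Plans are built and merged.
Note that this series only describes the overall flow and the code of the key modules. OneFlow is iterating rapidly, so many APIs may change substantially in the future. Ongoing work includes interface 1.0, which covers aligning with the PyTorch API, a multi-client design, and compile-time graph optimizations. Stay tuned!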
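Job execution flow
Below, a simple static-graph Job is used to describe the overall flow of its execution. This task, named pad_Job, is very simple: the complete code is just over 10 lines. It applies reflection padding to the input matrix x to produce the output matrix y (you can think of it as a very simple inference network that takes x as input and produces y as output).
Its core is a call to the flow.reflection_pad2d() API: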
import oneflow as flow
import oneflow.typing as tp
import numpy as np


@flow.global_function()
def pad_Job(x: tp.Numpy.Placeholder((2, 1, 3, 3))) -> tp.Numpy:
    with flow.scope.placement("cpu", "0:0"):
        loss = flow.reflection_pad2d(x, padding=1)
    return loss


x = np.arange(18).reshape((2, 1, 3, 3)).astype(np.float32)
y = pad_Job(x)
print("in:\n", x, "y:\n", y)
Output:
in:
[[[[ 0. 1. 2.]
[ 3. 4. 5.]
[ 6. 7. 8.]]]
[[[ 9. 10. 11.]
[12. 13. 14.]
[15. 16. 17.]]]]
y:
[[[[ 4. 3. 4. 5. 4.]
[ 1. 0. 1. 2. 1.]
[ 4. 3. 4. 5. 4.]
[ 7. 6. 7. 8. 7.]
[ 4. 3. 4. 5. 4.]]]
[[[13. 12. 13. 14. 13.]
[10. 9. 10. 11. 10.]
[13. 12. 13. 14. 13.]
[16. 15. 16. 17. 16.]
[13. 12. 13. 14. 13.]]]]
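To make the padding semantics concrete, here is a pure-Python sketch (no OneFlow required) of 2D reflection padding applied to a single HxW channel. It assumes the "reflect" convention in which the border element itself is not repeated, which is what the output above shows; the helper names are hypothetical and this is not OneFlow's kernel.

```python
def reflect_index(i, n):
    """Map an out-of-range index back into [0, n) by mirroring around the edges.

    The edge element is not repeated, so index -1 maps to 1 and index n maps to n - 2.
    Only valid when the overshoot is smaller than n (i.e. padding < n).
    """
    if i < 0:
        return -i
    if i >= n:
        return 2 * (n - 1) - i
    return i


def reflection_pad2d(mat, pad):
    """Reflection-pad a 2D list of lists by `pad` on every side (hypothetical helper)."""
    h, w = len(mat), len(mat[0])
    if pad >= h or pad >= w:
        raise ValueError("padding must be smaller than each spatial dimension")
    return [
        [mat[reflect_index(r - pad, h)][reflect_index(c - pad, w)] for c in range(w + 2 * pad)]
        for r in range(h + 2 * pad)
    ]


channel0 = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]  # x[0, 0] from the example above
for row in reflection_pad2d(channel0, 1):
    print(row)
```

Each printed row matches the first 5x5 block of y; applying the same function to x[1, 0] reproduces the second block.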
First, execution enters global_function(). Because this is a static-graph (lazy) job, the _RunLazyJob() method is triggered:
def _RunLazyJob(session, job_func, *args, **kwargs):
    return session.TryInit().LazyRun(job_func, *args, **kwargs)
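Inside it, a session object is initialized that is responsible for executing all jobs decorated with global_function() (only one user job is defined here). The machine environment is initialized; the shape, SBP attributes, and other properties of every op are inferred from the information of the input ops, intermediate ops, and output ops, so that the job can be compiled and its physical graph built. An execution plan is then generated from the physical graph and the cluster environment. Finally, the whole plan is run, the output data is returned, and the session is destroyed, which completes the execution of the job.
Code call flow
Next, still using the simple pad_Job, we walk through the call flow at run time from Python down to C++, and briefly go over the system modules of the OneFlow framework and how the code calls into them.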
Entry point
First, @flow.global_function() serves as the entry point for invoking a job and triggers the lazy_oneflow_function()[1] method:
@enable_if.condition(
    hob.in_normal_mode & ~hob.eager_execution_enabled & ~hob.session_initialized
)
def lazy_oneflow_function(function_config=FunctionConfig()):
    assert isinstance(function_config, FunctionConfig)

    def Decorator(job_func):
        if not hasattr(job_func, "__oneflow_function_signature__"):
            job_func.__oneflow_function_signature__ = inspect.signature(job_func)
        oft_util.CheckGlobalFunctionAnnotation(job_func.__oneflow_function_signature__)
        sess = session_ctx.GetDefaultSession()

        @functools.wraps(job_func)
        def Func(*args, **kwargs):
            return _RunLazyJob(sess, job_func, *args, **kwargs)

        sess.AddJob(_CloneFunctionDesc(function_config.function_desc, job_func))
        for x in dir(job_func):
            if x.startswith("__oneflow_"):
                setattr(Func, x, getattr(job_func, x))
        return Func

    return Decorator
Here, lazy_oneflow_function() defines a closure, Decorator(). Decorator() adds the job to the session and returns a function Func; when Func() is called, what actually runs is _RunLazyJob()[2]:
def _RunLazyJob(session, job_func, *args, **kwargs):
    return session.TryInit().LazyRun(job_func, *args, **kwargs)
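_RunLazyJob() first calls TryInit() to initialize the Session object. TryInit() compiles all jobs (the user jobs defined with global_function, the system push/pull jobs, the model IO jobs, and so on) into a physical task graph, and then compiles the physical graph into a single runtime plan, the Plan. Finally, LazyRun() is called to trigger execution.
Note: a user job is a job defined by the user, usually a training or inference task. Push/pull jobs are system-level jobs that are added automatically when the user job is compiled into an executable plan, to handle input and output. Model IO jobs are system-level jobs for saving and loading models. See CompileAndMergePlanOnMaster()[3].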
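The register-at-decoration, compile-on-first-call pattern described above can be illustrated with a self-contained toy. ToySession, global_function, try_init and lazy_run below are hypothetical stand-ins, not OneFlow APIs; the "compilation" step is only a placeholder that snapshots the registered functions.

```python
import functools


class ToySession:
    """Hypothetical stand-in for OneFlow's Session, for illustration only."""

    def __init__(self):
        self.job_funcs = {}   # job name -> job function, filled at decoration time
        self.compiled = None  # set once by try_init()
        self.compile_count = 0

    def add_job(self, job_func):
        self.job_funcs[job_func.__name__] = job_func

    def try_init(self):
        # Compile every registered job exactly once, on the first call.
        if self.compiled is None:
            self.compile_count += 1
            self.compiled = dict(self.job_funcs)  # placeholder for "compile jobs into a plan"
        return self

    def lazy_run(self, job_func, *args):
        return self.compiled[job_func.__name__](*args)


default_session = ToySession()


def global_function(job_func):
    """Hypothetical decorator mirroring the shape of lazy_oneflow_function's Decorator."""
    sess = default_session

    @functools.wraps(job_func)
    def Func(*args):
        return sess.try_init().lazy_run(job_func, *args)

    sess.add_job(job_func)  # registration happens at decoration time, not at call time
    return Func


@global_function
def double_job(x):
    return [2 * v for v in x]


print(default_session.compile_count)  # 0: nothing compiled yet
print(double_job([1, 2, 3]))          # first call triggers try_init()
print(double_job([4]))
print(default_session.compile_count)  # 1: compiled only once
```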
The concrete call chain when input data is pushed:
Python side: LaunchUserJob[6] >> push_util.AsyncPush() >> _AsyncPushArg() >> arg_blob_def.CheckAndAsyncPush(session, arg_ndarray) >> _MakePushNdarrayCallback() >> ofblob.CopyFromNdarray() >> oneflow_api.Dtype_GetOfBlobStaticTensorCopyFromBufferFuncName()[7]
C++ side: Dtype_GetOfBlobStaticTensorCopyFromBufferFuncName()[8]
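Judging by its name, the last hop of this chain returns the name of a dtype-specific copy function, which the Python side then looks up. Below is a hedged, self-contained sketch of that name-based dispatch pattern; fake_api, the naming scheme "OfBlob_CopyFromBuffer_<dtype>" and the helper functions are all hypothetical and only illustrate the idea, not OneFlow's actual function names.

```python
import array
import types

# Hypothetical module standing in for the C++ extension: one copy function per dtype.
fake_api = types.SimpleNamespace()


def _make_copy_fn(typecode):
    def copy_from_buffer(dst, src_values):
        # Replace the destination buffer's contents, converting values to `typecode`.
        dst[:] = array.array(typecode, src_values)
    return copy_from_buffer


for dtype_name, typecode in [("float", "f"), ("double", "d"), ("int32", "i")]:
    setattr(fake_api, "OfBlob_CopyFromBuffer_" + dtype_name, _make_copy_fn(typecode))


def get_copy_from_buffer_func_name(dtype_name):
    # Plays the role of Dtype_GetOfBlobStaticTensorCopyFromBufferFuncName: dtype -> name.
    return "OfBlob_CopyFromBuffer_" + dtype_name


def copy_from_values(dst, src_values, dtype_name):
    method_name = get_copy_from_buffer_func_name(dtype_name)
    copy_method = getattr(fake_api, method_name)  # resolve the function by its name
    copy_method(dst, src_values)


dst = array.array("f")
copy_from_values(dst, [0, 1, 2], "float")
print(dst.tolist())
```

Resolving the function by name keeps the Python side free of per-dtype branching: adding a dtype only means exposing one more function under the expected name.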
TryInit()
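TryInit() mainly calls Init()[9] to perform environment initialization and static-graph compilation, namely:
1. Initializing the cluster environment
2. Initializing the static-graph session object
3. Compiling the Job logical graph
4. Starting the session, compiling the Job physical graph, and generating the Plan
5. Initializing model IO
The code is as follows: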
def Init(self):
    assert self.status_ is SessionStatus.OPEN
    self.status_ = SessionStatus.RUNNING
    if not oneflow_api.IsEnvInited():
        oneflow.env.init()
    _TryCompleteConfigProto(self.config_proto)
    self.resource_ = self.config_proto.resource
    if not oneflow_api.EagerExecutionEnabled():
        c_api_util.InitLazyGlobalSession(self.config_proto)
        for job_name, func_desc in self.job_name2function_desc_.items():
            compiler.Compile(self, func_desc, self.config_proto)
            self.existed_module_names_ = set()
        self.job_name2var_name2var_blob_ = dict()
        assert len(self.job_name2function_desc_.items()) > 0
        oneflow_api.StartLazyGlobalSession()
        self.inter_user_job_info_ = c_api_util.GetInterUserJobInfo()
        # Get latest op_attr and job_name after compiler.Compile
        self.UpdateInfo4InterfaceOp()
        if not config_util.api_legacy_model_io_enabled():
            check_point_v2.Init()
    else:
        self.eager_config_proto_ctx_ = oneflow_api.LogicalConfigProtoContext(
            str(self.config_proto)
        )
    return self
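1. Initializing the cluster environment
oneflow.env.init() initializes the EnvProto object, which contains the machine, ctrl_port, data_port, and cpp_logging_conf fields. These record information about the machines in the cluster (control port, data port), the logging configuration, and so on.
2. Initializing the static-graph session object
In the Python code, c_api_util.InitLazyGlobalSession() calls InitLazyGlobalSession()[11], defined in session.h[10], which actually creates the C++ session object.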
inline Maybe<void> InitLazyGlobalSession(const std::string& config_proto_str) {
  CHECK_NOTNULL_OR_RETURN(Global<EnvDesc>::Get()) << "env not found";
  CHECK_OR_RETURN(Global<MachineCtx>::Get()->IsThisMachineMaster());

  ClusterInstruction::MasterSendSessionStart();

  ConfigProto config_proto;
  CHECK_OR_RETURN(TxtString2PbMessage(config_proto_str, &config_proto))
      << "failed to parse config_proto: " << config_proto_str;
  FixCpuDeviceNum(&config_proto);
  Global<CtrlClient>::Get()->PushKV("config_proto", config_proto);

  CHECK_ISNULL_OR_RETURN(Global<SessionGlobalObjectsScope>::Get());
  Global<SessionGlobalObjectsScope>::SetAllocated(new SessionGlobalObjectsScope());
  JUST(Global<SessionGlobalObjectsScope>::Get()->Init(config_proto));
  LOG(INFO) << "NewGlobal " << typeid(SessionGlobalObjectsScope).name();
  return Maybe<void>::Ok();
}
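After checking that the environment exists and that this machine is the master, and after parsing config_proto from its text form, the function calls Global<CtrlClient>::Get()->PushKV("config_proto", config_proto); to bind config_proto to the key "config_proto" so that it can be passed along later. Finally, it creates a global session object (SessionGlobalObjectsScope), which has a unique id.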
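PushKV publishes a value under a key so that other participants can fetch it later. The sketch below models such a blocking key-value rendezvous inside a single process with threads; ToyCtrlStore, push_kv and pull_kv are hypothetical names, and the real CtrlClient communicates with a control server rather than sharing memory in one process.

```python
import threading


class ToyCtrlStore:
    """Hypothetical in-process key-value rendezvous, loosely modeled on PushKV."""

    def __init__(self):
        self._data = {}
        self._cond = threading.Condition()

    def push_kv(self, key, value):
        with self._cond:
            self._data[key] = value
            self._cond.notify_all()  # wake up anyone waiting for this key

    def pull_kv(self, key, timeout=None):
        with self._cond:
            # Block until the key has been pushed (or the timeout expires).
            if not self._cond.wait_for(lambda: key in self._data, timeout=timeout):
                raise TimeoutError(key)
            return self._data[key]


store = ToyCtrlStore()
received = []

# A "worker" waits for the master to publish the config before it can proceed.
worker = threading.Thread(
    target=lambda: received.append(store.pull_kv("config_proto", timeout=5))
)
worker.start()
store.push_kv("config_proto", "<serialized config_proto>")  # the "master" side
worker.join()
print(received)
```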
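3. Compiling the Job logical graph
This step is mainly carried out by _CompileJob()[12], called inside compiler.Compile(self, func_desc, self.config_proto). It compiles the user job, infers the shape and SBP information of the input and output ops, and builds the op-based logical graph of the Job. The code is as follows: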
def Compile(session, function_desc, config_proto):
    with InterpretScope(session, function_desc, config_proto):
        _CompileJob(function_desc)
        oneflow_api.CurJobBuildAndInferCtx_Complete()


def _CompileJob(function_desc):
    func = function_desc.job_func
    parameters = func.__oneflow_function_signature__.parameters

    if len(parameters) == 0:
        func.__oneflow_input_blob_defs__ = ()
    elif all(p.annotation is inspect._empty for _, p in parameters.items()):
        func.__oneflow_input_blob_defs__ = _GetArgDefault(func)
    elif all(p.annotation is not inspect._empty for _, p in parameters.items()):
        func.__oneflow_input_blob_defs__ = _MakeInputBlobDefFromParameterSignature(
            parameters
        )
    else:
        raise NotImplementedError(
            "All parameters of global function should be annotated"
        )

    inputs = _RecursiveMakeInputBlobs(func.__oneflow_input_blob_defs__)
    ret = func(*inputs)
    return_annotation = func.__oneflow_function_signature__.return_annotation
    oft_util.CheckReturnByAnnotation(func.__name__, ret, return_annotation)
    func.__oneflow_output_remote_blobs__ = _RecursiveMakeRetRemoteBlobs(
        ret, allow_cpu_return_op=function_desc.function_attribute.allow_cpu_return_op
    )
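JobBuildAndInferCtx stores the ops that have been added together with their OpConfs (SBP, shape, and so on). _RecursiveMakeInputBlobs() and _RecursiveMakeRetRemoteBlobs() automatically insert the corresponding blob ops for the user job's inputs and outputs: _RecursiveMakeInputBlobs inserts an Input op for data input, and _RecursiveMakeRetRemoteBlobs inserts a Return op for data output. The resulting op_confs for pad_Job are: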
_RecursiveMakeInputBlobs >> InputOpByArgBlobDef, op_conf:
name: "Input_0"
device_tag: "gpu"
scope_symbol_id: 4611686018427420670
input_conf {
  out: "out"
  blob_conf {
    shape {
      dim: 2
      dim: 1
      dim: 3
      dim: 3
    }
    data_type: kFloat
    split_axis {
      value: 0
    }
    batch_axis {
      value: 0
    }
    is_dynamic: false
    is_tensor_list: false
  }
}

_RecursiveMakeRetRemoteBlobs >> LazyReturnRemoteBlob, op_conf:
name: "Return_2"
device_tag: "cpu"
return_conf {
  in: "Reflection_Pad2d1/y_0"
  out: "out"
}
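Note that the names Input_0, Reflection_Pad2d1 and Return_2 share one increasing counter. Assuming a single shared op-name counter, the sketch below shows how a builder could wrap the user's ops with an automatically inserted Input op and Return op. ToyJobBuilder, build_job and the dict-based op confs are hypothetical and far simpler than JobBuildAndInferCtx.

```python
import itertools


class ToyJobBuilder:
    """Hypothetical job builder: records op confs and names them with one shared counter."""

    def __init__(self):
        self._counter = itertools.count()
        self.op_confs = []

    def _unique_name(self, prefix):
        return f"{prefix}{next(self._counter)}"

    def add_op(self, prefix, op_type, **attrs):
        conf = {"name": self._unique_name(prefix), "type": op_type, **attrs}
        self.op_confs.append(conf)
        return conf["name"] + "/out"  # logical blob name produced by this op


def build_job(builder, user_fn, input_shape):
    # 1) Automatically insert an Input op for the job's argument.
    x = builder.add_op("Input_", "input", shape=input_shape)
    # 2) Trace the user's function, which adds its own ops.
    y = user_fn(builder, x)
    # 3) Automatically insert a Return op that consumes the user's output.
    builder.add_op("Return_", "return", inp=y)
    return builder.op_confs


def pad_job(builder, x):
    return builder.add_op("Reflection_Pad2d", "reflection_pad2d", inp=x, padding=1)


for conf in build_job(ToyJobBuilder(), pad_job, (2, 1, 3, 3)):
    print(conf["name"], conf["type"])
```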
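After these steps, all the ops needed for Job inference are in place. The subsequent CurJobBuildAndInferCtx_Complete()[13] calls the C++ LazyJobBuildAndInferCtx::Complete()[14], which starts the main compilation of the Job logical graph: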
Maybe<void> LazyJobBuildAndInferCtx::Complete() {
  CHECK_NOTNULL(Global<JobDesc>::Get());
  Global<JobDesc>::Delete();
  if (job().job_conf().has_train_conf()) {
    CHECK_OR_RETURN(job().job_conf().train_conf().has_model_update_conf());
    CHECK_OR_RETURN(job().job_conf().train_conf().has_primary_lr());
  }
  auto scope = std::make_unique<GlobalJobDescScope>(mut_job()->job_conf(), job_id());
  JobPassCtx job_pass_ctx(GlobalJobDesc());
  // Look up a registered pass by name and apply it to the job in place.
  auto DoPass = [&](const std::string& pass_name) -> Maybe<void> {
    return JobPass4Name(pass_name)(mut_job(), &job_pass_ctx);
  };
  if (GlobalJobDesc().Bool("__is_user_function__")) {
    JUST(DoPass("CompleteOfrecordDecoder"));
    JUST(DoPass("SetDefaultVariableConf"));
#ifdef WITH_CUDA
    JUST(DoPass("AutoMixedPrecision"));
#endif
    JUST(DoPass("OptimizerPlacementOptimizationPass"));
    JUST(DoPass("DynamicLossScaleSchedulePass"));
    JUST(DoPass("AutoTrainStep"));
    JUST(DoPass("AutoLearningRate"));
    JUST(DoPass("GenerateBackwardAndOptimizerOpConfs"));
    JUST(DoPass("AddSspVariableProxy"));
    JUST(DoPass("CheckpointingPass"));
    JUST(DoPass("CudnnFusedNormalizationAddReluPass"));
    JUST(DoPass("PruneCastToStaticShapeOpsPass"));
    JUST(DoPass("FuseAddToOutputPass"));
    JUST(DoPass("IndexedSlicesOptimizerRewritePass"));
    JUST(DoPass("SplitSparseSoftmaxCrossEntropyOpPass"));
    JUST(DoPass("DoParallelCastBeforeWideningTypeCast"));
    JUST(DoPass("AddLbiDiffWatcherOpConfs"));
    JUST(DoPass("FuseCastScalePass"));
    JUST(DoPass("PruneParallelCastOpsPass"));
    JUST(DoPass("FuseUpdateOpsPass"));
    JUST(DoPass("DumpVariableInfoPass"));
  }
  JUST(DoPass("DumpTimeShapeAndBlobParallelConfPass"));
  return Maybe<void>::Ok();
}
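This process operates on the Job logical graph (OpGraph), which is made up of op nodes (OpNode), and runs a series of system optimization passes over it. Each pass modifies or rewrites the logical graph once, and together they complete the first stage of compiling the user job. The details of this compilation will be covered in the next article.
4. Starting the session, compiling the Job physical graph, and generating the Plan
This step further processes the previously compiled user jobs (the job set), which are made up of op nodes, and triggers Oneflow::Init(). The main work of Oneflow::Init() is to produce the final set of Jobs and compile their physical graphs into the final runtime execution plan, the Plan.
First, the session is started via oneflow_api.StartLazyGlobalSession()[15]: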
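The DoPass lambda looks up a pass by name and applies it to the job in place, with some passes reserved for user functions and one pass run for every job. A hypothetical, much simplified Python model of that registry-plus-pipeline structure follows; the pass names and the dict-based "job" are made up for illustration, and real passes rewrite an OpGraph rather than a list.

```python
PASS_REGISTRY = {}


def register_pass(name):
    """Decorator that records a pass function under a string name."""
    def wrap(fn):
        PASS_REGISTRY[name] = fn
        return fn
    return wrap


@register_pass("PruneIdentityOps")
def prune_identity_ops(job, ctx):
    # Hypothetical rewrite: drop ops whose type is "identity".
    job["ops"] = [op for op in job["ops"] if op["type"] != "identity"]


@register_pass("DumpOpNames")
def dump_op_names(job, ctx):
    ctx.setdefault("log", []).append([op["name"] for op in job["ops"]])


def complete(job, is_user_function):
    ctx = {}  # plays the role of JobPassCtx, shared by all passes

    def do_pass(pass_name):
        PASS_REGISTRY[pass_name](job, ctx)  # look up by name, mutate the job in place

    if is_user_function:
        do_pass("PruneIdentityOps")
    do_pass("DumpOpNames")  # runs for every job, like the final Dump...Pass above
    return ctx


job = {"ops": [{"name": "Input_0", "type": "input"},
               {"name": "id_1", "type": "identity"},
               {"name": "Return_2", "type": "return"}]}
print(complete(job, is_user_function=True)["log"])
```

Keeping passes in a name-keyed registry lets the pipeline be expressed as a readable list of names, which is the same shape as the C++ code above.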
inline Maybe<void> StartLazyGlobalSession() {
  CHECK_NOTNULL_OR_RETURN(Global<SessionGlobalObjectsScope>::Get()) << "session not found";
  CHECK_OR_RETURN(Global<MachineCtx>::Get()->IsThisMachineMaster());
  const JobSet& job_set = Global<LazyJobBuildAndInferCtxMgr>::Get()->job_set();
  if (Global<ResourceDesc, ForSession>::Get()->enable_debug_mode()) {
    TeePersistentLogStream::Create("job_set.prototxt")->Write(job_set);
  }
  if (job_set.job().empty()) { return Error::JobSetEmptyError() << "no function defined"; }
  CHECK_ISNULL_OR_RETURN(Global<Oneflow>::Get());
  Global<CtrlClient>::Get()->PushKV("session_job_set", job_set);
  Global<const InterJobReuseMemStrategy>::New(job_set.inter_job_reuse_mem_strategy());
  Global<Oneflow>::New();
  JUST(Global<Oneflow>::Get()->Init(job_set));
  return Maybe<void>::Ok();
}
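Global<SessionGlobalObjectsScope>::Get() is used to check whether the SessionGlobalObjectsScope object created earlier was successfully initialized. If it was, and this machine is the master, the following steps are performed:
1. Get this session's job_set through LazyJobBuildAndInferCtxMgr (an empty job_set returns JobSetEmptyError);
2. Bind job_set in the CtrlClient key-value store under the key session_job_set (via PushKV);
3. Create an InterJobReuseMemStrategy object, the strategy for memory reuse between jobs;
4. Create a Oneflow object and call its Oneflow::Init()[16] method, which triggers the main process of compiling the jobs into a plan and produces the final executable Plan.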
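5. Initializing model IO
When legacy model IO is not enabled, check_point_v2.Init()[17] initializes model IO, which is used to load and save model files. At this point, the complete flow from compiling the job to generating the runtime plan is essentially done.
Compiling the Job physical graph & merging Plans
This happens mainly in Oneflow::Init()[18], whose code is: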
Maybe<void> Oneflow::Init(const oneflow::JobSet& job_set) {
  OF_PROFILER_RANGE_GUARD("Oneflow::Init");
  // Runtime
  OF_PROFILER_RANGE_PUSH("CompileAndMergePlanOnMaster");
  JUST(CompileAndMergePlanOnMaster(job_set.job(), &plan_));
  OF_PROFILER_RANGE_POP();  // CompileAndMergePlanOnMaster
  if (Global<MachineCtx>::Get()->IsThisMachineMaster()) {
    runtime_buffers_scope_.reset(new RuntimeBuffersScope(plan_));
  }
  OF_PROFILER_RANGE_PUSH("new Runtime");
  runtime_.reset(new Runtime(plan_, GetMaxVal<size_t>(), false));
  OF_PROFILER_RANGE_POP();  // new Runtime
  return Maybe<void>::Ok();
}
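The main steps are:
CompileAndMergePlanOnMaster(): compiles the jobs into physical graphs and generates the runtime plan;
new RuntimeBuffersScope(): on the master machine only, creates a runtime buffers scope object that manages runtime buffers;
new Runtime(): creates the runtime object from the final executable plan.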
CompileAndMergePlanOnMaster()[19]
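In CompileAndMergePlanOnMaster()[20], the system dynamically adds, for each user job, a push job that handles data input and a pull job that handles data output; if enable_legacy_model_io is turned on, the model IO jobs are added to the jobs as well. Then a series of functions,
CompileCurJobOnMaster(), MergeSubPlanWithoutGenNetTopo(), MakeMainJob(), CompileMainJob(), LinkMainPlan(),
compiles the physical graphs of the jobs and merges them into the Plan. Finally, new Runtime() creates the runtime object: runtime_.reset(new Runtime(plan_, GetMaxVal<size_t>(), false));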
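The job-set expansion described above can be sketched as follows. It only models which jobs end up in the list, following the description in the text (one push job and one pull job per user job, plus model IO jobs when legacy model IO is enabled); the job-name strings are hypothetical placeholders, not the names OneFlow actually generates.

```python
def expand_job_set(user_jobs, enable_legacy_model_io):
    """Return the full list of job names to compile (hypothetical naming)."""
    jobs = []
    for name in user_jobs:
        jobs.append(name)
        jobs.append(f"push:{name}")  # feeds input data into the user job
        jobs.append(f"pull:{name}")  # fetches output data from the user job
    if enable_legacy_model_io:
        jobs += ["model_init", "model_load", "model_save"]  # hypothetical model IO jobs
    return jobs


print(expand_job_set(["pad_Job"], enable_legacy_model_io=False))
```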
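However, the Runtime[21] does not start executing immediately; it waits for a tick signal. Typically, the ForeignInput Op[22] in the push job that the system adds automatically maintains a buffer that waits for the Python side to feed data. Once data arrives at this op, a tick signal is fired, which starts the run of the whole Plan.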
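The wait-for-data behavior can be modeled with a blocking queue: a runtime thread is created up front but does nothing until Python pushes data into the input buffer. This is a hypothetical analogy using queue.Queue and threading; it is not how OneFlow's actors or buffers are implemented, and the None sentinel exists only to shut the toy down.

```python
import queue
import threading

input_buffer = queue.Queue()   # stands in for the ForeignInput op's buffer
output_buffer = queue.Queue()  # stands in for the buffer the pull side reads from


def runtime_loop():
    """Hypothetical runtime: sleeps until data arrives, then runs one 'plan' step."""
    while True:
        batch = input_buffer.get()  # blocks here: no data, no tick
        if batch is None:           # sentinel used to shut the toy runtime down
            break
        output_buffer.put([v * 10 for v in batch])  # pretend this is the whole plan


runtime = threading.Thread(target=runtime_loop)
runtime.start()  # created up front, but idle

input_buffer.put([1, 2, 3])  # Python side feeds data -> the "tick" that starts a run
print(output_buffer.get(timeout=5))
input_buffer.put(None)
runtime.join()
```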
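In summary, compiling each job into a runtime plan and merging all the plans consists of the following steps:
1) Merge the Plan generated by each Job (its SubPlan) into one large MergedPlan;
2) Memory reuse and memory sharing between Jobs;
3) Compute the CriticalSections[23];
4) Generate the MainJob[24];
5) Compile the MainJob to obtain the MainPlan;
6) Link the MainPlan with the SubPlan of each Job in the MergedPlan to obtain the final MergedPlan.
For the design and description of CriticalSection[25], see: Master the System Design of the OneFlow Framework in One Article (Part 1) (仅此一文让您掌握OneFlow框架的系统设计(上篇))
Appendix: Generating Plan diagrams (png/svg)
Enable debug mode: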
export ONEFLOW_DEBUG_MODE=1
Once debug mode is enabled, logs are saved and merged_plan.dot appears under the log/dot directory. Install graphviz; on Ubuntu you can use:
sudo apt-get install graphviz
Then create an svg or png image from merged_plan.dot:
dot -Tsvg merged_plan.dot > merged_plan.svg
or
dot -Tpng merged_plan.dot > merged_plan.png
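To try the dot workflow before running a real job, a tiny script can write a .dot file in the same Graphviz format. The graph below is a made-up toy task graph, not a real merged_plan.dot, and to_dot is a hypothetical helper; render the file it writes with the same dot commands.

```python
import os
import tempfile


def to_dot(edges, graph_name="toy_plan"):
    """Render a list of (src, dst) task-name pairs as Graphviz dot text."""
    lines = [f"digraph {graph_name} {{"]
    for src, dst in edges:
        lines.append(f'  "{src}" -> "{dst}";')
    lines.append("}")
    return "\n".join(lines)


edges = [("ForeignInput", "ReflectionPad2d"), ("ReflectionPad2d", "ForeignOutput")]
dot_text = to_dot(edges)

path = os.path.join(tempfile.gettempdir(), "toy_plan.dot")
with open(path, "w") as f:
    f.write(dot_text)
print(dot_text)
# Then, for example: dot -Tsvg toy_plan.dot > toy_plan.svg
```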
LazyRun()
After TryInit(), the whole execution plan has been built, and session.TryInit().LazyRun(job_func, *args, **kwargs)[26] sends the tick signal that triggers the run of the plan. LazyRun() is shown below[27]:
def LazyRun(self, job_func, *arg):
    assert self.status_ is SessionStatus.RUNNING
    remote_blobs = self.LaunchUserJob(job_func, *arg)
    if remote_blobs is None:
        return
    future_blob = LazyFutureRemoteBlobs(self).SetResult(remote_blobs).Inited()
    annotation = inspect.signature(job_func).return_annotation
    return oft_util.TransformGlobalFunctionResult(future_blob, annotation)
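In essence, the input data is passed into the user-defined user job, which computes the final remote_blobs (remote_blobs = self.LaunchUserJob(job_func, *arg)); these are wrapped in LazyFutureRemoteBlobs and converted into the output data according to the job's return annotation.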
def LaunchUserJob(self, job_func, *arg):
    assert self.status_ is SessionStatus.RUNNING
    job_name = job_func.__name__
    push_util.AsyncPush(self, job_func, *arg)
    self.LaunchJob(job_instance_util.MakeUserJobInstance(job_name))
    return job_func.__oneflow_output_remote_blobs__
The runtime Plan is actually triggered through LaunchUserJob; see the data flow section for details.
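LazyRun() wraps the launched job's outputs in a future-like object (LazyFutureRemoteBlobs), and the return annotation decides how the result is handed back. A hedged standard-library analogy uses concurrent.futures: the launch returns immediately, and the caller either blocks for the value or keeps the future. launch_user_job, lazy_run and the "numpy"/"future" annotation strings are hypothetical stand-ins.

```python
from concurrent.futures import ThreadPoolExecutor
import time

executor = ThreadPoolExecutor(max_workers=1)  # stands in for the running plan


def launch_user_job(x):
    """Hypothetical: push input, launch the job, and return a future for the output."""
    def run_plan():
        time.sleep(0.01)  # pretend the plan takes some time
        return [v + 1 for v in x]
    return executor.submit(run_plan)  # returns immediately


def lazy_run(x, annotation):
    future = launch_user_job(x)
    # Like TransformGlobalFunctionResult: the annotation decides what the caller gets.
    if annotation == "numpy":
        return future.result()  # block until the output is ready
    return future               # hand the future itself back (asynchronous style)


print(lazy_run([1, 2], "numpy"))
fut = lazy_run([5], "future")
print(fut.result(timeout=5))
executor.shutdown()
```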
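Summary
Overall, the execution of a user-defined job in OneFlow is divided into compile time and run time. At compile time, each job is compiled into a runtime plan; at run time, the plan is actually executed.
The above is only a rough summary based on the article Master the System Design of the OneFlow Framework in One Article (Part 1) (仅此一文让您掌握OneFlow框架的系统设计(上篇)) and on the code. OneFlow's system design goes far beyond this, for example: flow control, Actors, Registers, the SBP abstraction, the CommNet network module that supports epoll/RDMA communication, the memory management module, the ID addressing system, the design of Op/Kernel, and more.
For more, see:
References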
[1]lazy_oneflow_function(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/function_util.py#L150
[2]_RunLazyJob(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/function_util.py#L220
[3]CompileAndMergePlanOnMaster(): https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L897
[4]ForeignInput Op: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/kernel/foreign_input_kernel.cpp#L27
[5]ForeignOutput Op: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/kernel/foreign_output_kernel.cpp#L27
[6]LaunchUserJob: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L298
[7]oneflow_api.Dtype_GetOfBlobStaticTensorCopyFromBufferFuncName(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/ofblob.py#L113
[8]Dtype_GetOfBlobStaticTensorCopyFromBufferFuncName(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/api/python/ofblob/ofblob.e.h#L80
[9]Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L207
[10]session.h: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/api/python/session/session.h#L41
[11]InitLazyGlobalSession(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/api/python/session/session.h#L41
[12]_CompileJob: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/compiler.py#L86
[13]CurJobBuildAndInferCtx_Complete(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/compiler.py#L48
[14]LazyJobBuildAndInferCtx::Complete(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/job_build_and_infer_ctx.cpp#L939
[15]oneflow_api.StartLazyGlobalSession(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L221
[16]Oneflow::Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/oneflow.cpp#L991
[17]check_point_v2.Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L226
[18]Oneflow::Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/oneflow.cpp#L991
[19]CompileAndMergePlanOnMaster(): https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L983
[20]CompileAndMergePlanOnMaster(): https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L983
[21]Runtime: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/runtime.cpp#L63
[22]ForeignInput Op: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/kernel/foreign_input_kernel.cpp#L27
[23]CriticalSection: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L737
[24]MainJob: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L573
[25]CriticalSection: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L737
[26]session.TryInit().LazyRun(job_func, *args, **kwargs): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/function_util.py#L221
[27]LazyRun(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L277