查看原文
其他

一个Job在OneFlow中的执行过程—上篇

Lyon OneFlow 2022-05-10

前言

之前,同事成诚创作了三篇文章:

以Top down的视角,介绍了分布式深度学习框架OneFlow的Actor、SBP、流式调度等先进的分布式系统架构设计及理念,本文将建立在前面的基础上,将基于上述文章,以Bottom up的角度,从一个简单的预测任务的demo入手,讲解一个Job(用户定义的训练/预测任务)在Oneflow中的调用入口、数据流转过程、从python端到c++端的代码执行流程,记录学习和梳理的过程。

下一篇文章,将详细梳理Job编译构图、Plan构建融合的过程。

需要特别说明的是,此系列文章仅在于描述流程、重点模块的代码,oneflow正处于快速版本更新和迭代过程,所以很多api在未来可能会有较大幅度的变动。目前在推进的工作有interface 1.0工作,包含了对齐pytorch api、multi client设计、构图编译期优化等,敬请期待!


Job执行流程

下面通过一个简单的静态图Job,描述一下在这个Job执行过程中的整体流程。 这个名为pad_Job的任务很简单,完整代码就10多行,目的是为输入的矩阵x通过reflection pad,得到输出矩阵y(可看做是一个非常简单的预测网络,输入x输出y)。

其主要方法即调用flow.reflection_pad2d()的API实现:

import oneflow as flow
import oneflow.typing as tp
import numpy as np

@flow.global_function()
def pad_Job(x: tp.Numpy.Placeholder((2, 1, 3, 3))) -> tp.Numpy:
    with flow.scope.placement("cpu""0:0"):
        loss = flow.reflection_pad2d(x, padding=1)
        return loss

x = np.arange(18).reshape((2133)).astype(np.float32)
y = pad_Job(x)
print("in:\n", x, "y:\n", y)

输出:

in:
 [[[[ 0.  1.  2.]
   [ 3.  4.  5.]
   [ 6.  7.  8.]]]


 [[[ 9. 10. 11.]
   [12. 13. 14.]
   [15. 16. 17.]]]] 
y:
 [[[[ 4.  3.  4.  5.  4.]
   [ 1.  0.  1.  2.  1.]
   [ 4.  3.  4.  5.  4.]
   [ 7.  6.  7.  8.  7.]
   [ 4.  3.  4.  5.  4.]]]


 [[[13. 12. 13. 14. 13.]
   [10.  9. 10. 11. 10.]
   [13. 12. 13. 14. 13.]
   [16. 15. 16. 17. 16.]
   [13. 12. 13. 14. 13.]]]]

首先,代码进入到global_function()中,由于是一个静态类型的job,故会触发执行_RunLazyJob()方法:

def _RunLazyJob(session, job_func, *args, **kwargs):
    return session.TryInit().LazyRun(job_func, *args, **kwargs)

在其中,会初始化一个session对象,用于负责所有被global_function()修饰的job的执行(此处只定义了一个user job);初始化机器的环境;根据input op、中间op、output op的信息推导出所有op的shape、sbp属性等信息,从而编译job,构建出物理图(graph);根据物理图和集群环境等信息生成执行计划plan,并最终运行整个plan,返回输出数据,销毁session,完成整个job的执行过程。

代码调用过程

下面,还是通过这个简单的pad_Job,来讲解运行时从python到c++层面的代码调用流程,简单梳理下oneflow框架中的各个系统模块及代码调用流程。

import oneflow as flow
import oneflow.typing as tp
import numpy as np

@flow.global_function()
def pad_Job(x: tp.Numpy.Placeholder((2, 1, 3, 3))) -> tp.Numpy:
    with flow.scope.placement("cpu""0:0"):
        loss = flow.reflection_pad2d(x, padding=1)
        return loss

x = np.arange(18).reshape((2133)).astype(np.float32)
y = pad_Job(x)
print("in:\n", x, "y:\n", y)

调用入口

首先,@flow.global_function()作为job调用的入口,会触发lazy_oneflow_function()[1]方法:

@enable_if.condition(
    hob.in_normal_mode & ~hob.eager_execution_enabled & ~hob.session_initialized
)
def lazy_oneflow_function(function_config=FunctionConfig()):
    assert isinstance(function_config, FunctionConfig)

    def Decorator(job_func):
        if not hasattr(job_func, "__oneflow_function_signature__"):
            job_func.__oneflow_function_signature__ = inspect.signature(job_func)
        oft_util.CheckGlobalFunctionAnnotation(job_func.__oneflow_function_signature__)
        sess = session_ctx.GetDefaultSession()

        @functools.wraps(job_func)
        def Func(*args, **kwargs):
            return _RunLazyJob(sess, job_func, *args, **kwargs)

        sess.AddJob(_CloneFunctionDesc(function_config.function_desc, job_func))
        for x in dir(job_func):
            if x.startswith("__oneflow_"):
                setattr(Func, x, getattr(job_func, x))
        return Func

    return Decorator

其中,lazy_oneflow_function()定义了一个闭包函数Decorator(),Decorator()中将job添加至session,并返回一个Func函数,当Func()被调用时,实际执行的是_RunLazyJob()[2]

def _RunLazyJob(session, job_func, *args, **kwargs):
    return session.TryInit().LazyRun(job_func, *args, **kwargs)

_RunLazyJob()将通过TryInit()方法将Session类对象初始化,并将用户在global function中定义的user job、系统的push/pull job、model io job等所有job编译成物理图task graph,然后将物理图编译成整体的运行时计划—plan,最后调用LazyRun()触发运行。

注:user job即用户定义的job,通常为训练或者预测任务;push/pull job则是在用户的user job编译为可执行plan时,系统自动添加的用于处理输入输出的系统级job;model io job则是用于保存/加载模型的系统级job。见:CompileAndMergePlanOnMaster()[3]

数据流转

下面,我们跳出代码api层面,从数据层面看一下User Job的运行过程:首先,User Job可能有多个输入、多个输出,oneflow会遍历所有User Job中的Input Op和Return Op,针对每个Input Op,分别构建一个对应的Push Job;针对每个Return Op,分别构建一个对应的Pull Job。 User Job中,数据流转的示意图如下(看流程即可,不用细看数字):系统自动添加的Push Job用于接收输入数据,其ForeignInput Op[4] 内部维护一个buffer,该buffer等待Python端喂数据;Push Job处理完输入数据X1后,由于X1在Push Job和User Job间是内存共享的,可以直接被User Job所消费,从而继续被Op_a、Op_b处理,最后得到输出数据Y1;同样,系统添加的Pull Job专门用于处理输出数据,Pull Job中有一个ForeignOutput Op[5],其内部同样维护一个buffer,当往该buffer内填完数据以后,python端对应的of blob对象中的numpy就拷贝了对应的数据。从而完整整个从输入到输出的数据流转过程。

具体代码调用流程: python端:LaunchUserJob[6] >> push_util.AsyncPush() >> _AsyncPushArg() >> arg_blob_def.CheckAndAsyncPush(session, arg_ndarray) >> _MakePushNdarrayCallback() >> ofblob.CopyFromNdarray() >> oneflow_api.Dtype_GetOfBlobStaticTensorCopyFromBufferFuncName()[7] >> c++端:Dtype_GetOfBlobStaticTensorCopyFromBufferFuncName()[8]

TryInit()

TryInit()主要通过Init()[9]做了一些环境初始化、和静态图编译相关的事情,比如:

  • 1.初始化集群环境
  • 2.初始化静态图session对象
  • 3.编译Job逻辑图
  • 4.启动session,编译Job物理图&Plan生成 
  • 5.初始化model IO

代码如下:

def Init(self):
    assert self.status_ is SessionStatus.OPEN
    self.status_ = SessionStatus.RUNNING
    if not oneflow_api.IsEnvInited():
        oneflow.env.init()
    _TryCompleteConfigProto(self.config_proto)
    self.resource_ = self.config_proto.resource
    if not oneflow_api.EagerExecutionEnabled():
        c_api_util.InitLazyGlobalSession(self.config_proto)
        for job_name, func_desc in self.job_name2function_desc_.items():
            compiler.Compile(self, func_desc, self.config_proto)
            self.existed_module_names_ = set()
        self.job_name2var_name2var_blob_ = dict()
        assert len(self.job_name2function_desc_.items()) > 0
        oneflow_api.StartLazyGlobalSession()
        self.inter_user_job_info_ = c_api_util.GetInterUserJobInfo()
        # Get latest op_attr and job_name after compiler.Compile
        self.UpdateInfo4InterfaceOp()
        if not config_util.api_legacy_model_io_enabled():
            check_point_v2.Init()
    else:
        self.eager_config_proto_ctx_ = oneflow_api.LogicalConfigProtoContext(
            str(self.config_proto)
        )
    return self

1.初始化集群环境

主要通过oneflow.env.init() 初始化EnvProto对象,包含machine、ctrl_port、data_port、cpp_logging_conf这几个对象,用于记录集群环境中机器的环境信息(控制端口、数据端口)、log配置等信息。

2.初始化静态图session对象

python代码中的c_api_util.InitLazyGlobalSession()会调用session.h[10]中定义的InitLazyGlobalSession()[11]实际地创建一个c++的session对象。

inline Maybe<void> InitLazyGlobalSession(const std::string& config_proto_str) {
  CHECK_NOTNULL_OR_RETURN(Global<EnvDesc>::Get()) << "env not found";
  CHECK_OR_RETURN(Global<MachineCtx>::Get()->IsThisMachineMaster());

  ClusterInstruction::MasterSendSessionStart();

  ConfigProto config_proto;
  CHECK_OR_RETURN(TxtString2PbMessage(config_proto_str, &config_proto))
      << "failed to parse config_proto: " << config_proto_str;
  FixCpuDeviceNum(&config_proto);
  Global<CtrlClient>::Get()->PushKV("config_proto", config_proto);

  CHECK_ISNULL_OR_RETURN(Global<SessionGlobalObjectsScope>::Get());
  Global<SessionGlobalObjectsScope>::SetAllocated(new SessionGlobalObjectsScope());
  JUST(Global<SessionGlobalObjectsScope>::Get()->Init(config_proto));
  LOG(INFO) << "NewGlobal " << typeid(SessionGlobalObjectsScope).name();
  return Maybe<void>::Ok();
}

首先,会通过Global::Get()->IsThisMachineMaster()来检查当前节点是否为master节点,是则继续否则return跳出。然后将string类型的config转换为protobuf的配置对象—config_proto,再通过:Global<CtrlClient>::Get()->PushKV("config_proto", config_proto);将config_proto和"config_proto"这个key绑定,用于后续的传递。最后初始化一个全局的session对象(SessionGlobalObjectsScope),拥有唯一的id。

3.编译Job逻辑图

主要通过compiler.Compile(self, func_desc, self.config_proto) 中的 _CompileJob[12]()来编译user_job、推导输入输出的op shape、sbp信息、构建Job基于op的逻辑图等,代码如下:

def Compile(session, function_desc, config_proto):
    with InterpretScope(session, function_desc, config_proto):
        _CompileJob(function_desc)
        oneflow_api.CurJobBuildAndInferCtx_Complete()
        
 
def _CompileJob(function_desc):
    func = function_desc.job_func
    parameters = func.__oneflow_function_signature__.parameters
    if len(parameters) == 0:
        func.__oneflow_input_blob_defs__ = ()
    elif all(p.annotation is inspect._empty for _, p in parameters.items()):
        func.__oneflow_input_blob_defs__ = _GetArgDefault(func)
    elif all(p.annotation is not inspect._empty for _, p in parameters.items()):
        func.__oneflow_input_blob_defs__ = _MakeInputBlobDefFromParameterSignature(
            parameters
        )
    else:
        raise NotImplementedError(
            "All parameters of global function should be annotated"
        )
    inputs = _RecursiveMakeInputBlobs(func.__oneflow_input_blob_defs__)
    ret = func(*inputs)
    return_annotation = func.__oneflow_function_signature__.return_annotation
    oft_util.CheckReturnByAnnotation(func.__name__, ret, return_annotation)
    func.__oneflow_output_remote_blobs__ = _RecursiveMakeRetRemoteBlobs(
        ret, allow_cpu_return_op=function_desc.function_attribute.allow_cpu_return_op
    )

其中,JobBuildAndInferCtx会保存已经加入的Op及其OpConf(SBP、shape等),并通过_RecursiveMakeInputBlobs()和_RecursiveMakeRetRemoteBlobs()  为userjob的输入输出自动插入相应的blob op,其中_RecursiveMakeInputBlobs会自动插入Input op用于数据输入;_RecursiveMakeRetRemoteBlobs将自动插入Return op用于数据输出。

_RecursiveMakeInputBlobs >> InputOpByArgBlobDef >>>>>>>>>>>>>>>>>>>>
op_conf >>>>>>>>>>>>>>>>>>>> 
 name: "Input_0"
device_tag: "gpu"
scope_symbol_id: 4611686018427420670
input_conf {
  out: "out"
  blob_conf {
    shape {
      dim: 2
      dim: 1
      dim: 3
      dim: 3
    }
    data_type: kFloat
    split_axis {
      value: 0
    }
    batch_axis {
      value: 0
    }
    is_dynamic: false
    is_tensor_list: false
  }
}


_RecursiveMakeRetRemoteBlobs >> LazyReturnRemoteBlob >>>>>>>>>>>>>>>>>>>>
op_conf >>>>>>>>>>>>>>>>>>>> 
 name: "Return_2"
device_tag: "cpu"
return_conf {
  in: "Reflection_Pad2d1/y_0"
  out: "out"
}

在经过上述操作后,Job推导时需要用到的所有op都已经齐全,将在后续的CurJobBuildAndInferCtx_Complete()[13]中调用c++的LazyJobBuildAndInferCtx::Complete()[14],开始Job逻辑图编译的主过程:

Maybe<void> LazyJobBuildAndInferCtx::Complete() {
  CHECK_NOTNULL(Global<JobDesc>::Get());
  Global<JobDesc>::Delete();
  if (job().job_conf().has_train_conf()) {
    CHECK_OR_RETURN(job().job_conf().train_conf().has_model_update_conf());
    CHECK_OR_RETURN(job().job_conf().train_conf().has_primary_lr());
  }
  auto scope = std::make_unique<GlobalJobDescScope>(mut_job()->job_conf(), job_id());
  JobPassCtx job_pass_ctx(GlobalJobDesc());
  auto DoPass = [&](const std::string& pass_name "&") -> Maybe<void> {
    return JobPass4Name(pass_name)(mut_job(), &job_pass_ctx);
  };
  if (GlobalJobDesc().Bool("__is_user_function__")) {
    JUST(DoPass("CompleteOfrecordDecoder"));
    JUST(DoPass("SetDefaultVariableConf"));
#ifdef WITH_CUDA
    JUST(DoPass("AutoMixedPrecision"));
#endif
    JUST(DoPass("OptimizerPlacementOptimizationPass"));
    JUST(DoPass("DynamicLossScaleSchedulePass"));
    JUST(DoPass("AutoTrainStep"));
    JUST(DoPass("AutoLearningRate"));
    JUST(DoPass("GenerateBackwardAndOptimizerOpConfs"));
    JUST(DoPass("AddSspVariableProxy"));
    JUST(DoPass("CheckpointingPass"));
    JUST(DoPass("CudnnFusedNormalizationAddReluPass"));
    JUST(DoPass("PruneCastToStaticShapeOpsPass"));
    JUST(DoPass("FuseAddToOutputPass"));
    JUST(DoPass("IndexedSlicesOptimizerRewritePass"));
    JUST(DoPass("SplitSparseSoftmaxCrossEntropyOpPass"));
    JUST(DoPass("DoParallelCastBeforeWideningTypeCast"));
    JUST(DoPass("AddLbiDiffWatcherOpConfs"));
    JUST(DoPass("FuseCastScalePass"));
    JUST(DoPass("PruneParallelCastOpsPass"));
    JUST(DoPass("FuseUpdateOpsPass"));
    JUST(DoPass("DumpVariableInfoPass"));
  }
  JUST(DoPass("DumpTimeShapeAndBlobParallelConfPass"));
  return Maybe<void>::Ok();
}

此过程主要基于由op节点(OpNode)构成的Job逻辑图(OpGragh),进行了一系列pass的系统优化过程,每个pass对逻辑图进行了一次图修改/重写,最终完成User Job的第一个阶段的编译过程。编译的具体过程,将在下一篇文章中详细说明。

4.启动session,编译Job物理图&Plan生成

这一步将会对之前编译完成的、由各种op节点构成的User Job(jobset)进一步处理,触发Oneflow::Init()过程,该过程主要内容为生成最终Jobs,编译Jobs物理图生成一个最终的运行时执行计划—Plan。

首先,通过oneflow_api.StartLazyGlobalSession()[15],启动session:

inline Maybe<void> StartLazyGlobalSession() {
  CHECK_NOTNULL_OR_RETURN(Global<SessionGlobalObjectsScope>::Get()) << "session not found";
  CHECK_OR_RETURN(Global<MachineCtx>::Get()->IsThisMachineMaster());
  const JobSet& job_set = Global<LazyJobBuildAndInferCtxMgr>::Get()->job_set();
  if (Global<ResourceDesc, ForSession>::Get()->enable_debug_mode()) {
    TeePersistentLogStream::Create("job_set.prototxt")->Write(job_set);
  }
  if (job_set.job().empty()) { return Error::JobSetEmptyError() << "no function defined"; }
  CHECK_ISNULL_OR_RETURN(Global<Oneflow>::Get());
  Global<CtrlClient>::Get()->PushKV("session_job_set", job_set);
  Global<const InterJobReuseMemStrategy>::New(job_set.inter_job_reuse_mem_strategy());
  Global<Oneflow>::New();
  JUST(Global<Oneflow>::Get()->Init(job_set));
  return Maybe<void>::Ok();
}

通过SessionGlobalObjectsScope的Get()方法检查之前的SessionGlobalObjectsScope对象是否成功Init,如果成功Init且本机为master节点,则进行后续操作:

  • 通过LazyJobBuildAndInferCtxMgr获取本session的 job_set;
  • 将job_set绑定到protobuf的key-value中,key为session_job_set;
  • 创建一个job间内存复用管理策略的对象InterJobReuseMemStrategy;
  • 创建一个Oneflow对象,并通过Oneflow::Init()[16]方法,触发job编译成plan的主过程并生成最终的可执行计划—Plan;

5.初始化model IO

主要通过check_point_v2.Init()[17]初始化model IO,用于加载/保存模型文件。至此基本完成了整个job从编译至生成运行时plan的完整流程。

编译Job物理图&Plan融合

104751691-b0ea8a00-5790-11eb-94f3-8dff215b13eb.png

此过程主要发生在Oneflow::Init()[18]方法中,Init过程的代码如下:

Maybe<void> Oneflow::Init(const oneflow::JobSet& job_set) {
  OF_PROFILER_RANGE_GUARD("Oneflow::Init");
  // Runtime
  OF_PROFILER_RANGE_PUSH("CompileAndMergePlanOnMaster");
  JUST(CompileAndMergePlanOnMaster(job_set.job(), &plan_));
  OF_PROFILER_RANGE_POP();  // CompileAndMergePlanOnMaster
  if (Global<MachineCtx>::Get()->IsThisMachineMaster()) {
    runtime_buffers_scope_.reset(new RuntimeBuffersScope(plan_));
  }
  OF_PROFILER_RANGE_PUSH("new Runtime");
  runtime_.reset(new Runtime(plan_, GetMaxVal<size_t>(), false));
  OF_PROFILER_RANGE_POP();  // new Runtime
  return Maybe<void>::Ok();
}

主要过程为:

  • CompileAndMergePlanOnMaster() 将jobs编译成物理图并生成运行时计划—plan
  • new RuntimeBuffersScope() 新建一个运行时buffer scope对象用于内存管理
  • new Runtime() 根据生成的最终可执行plan,新建运行时—runtime对象

CompileAndMergePlanOnMaster()[19]

CompileAndMergePlanOnMaster()[20]的过程中,系统将动态为每个user job添加一个push job用于处理数据输入;添加一个pull job用于处理数据输出,如果开启enable_legacy_model_io则添加model io相关的job至jobs中; 最后通过

  • CompileCurJobOnMaster()
  • MergeSubPlanWithoutGenNetTopo()
  • MakeMainJob()
  • CompileMainJob()
  • LinkMainPlan()

等一系列方法将jobs物理图编译融合至Plan中,最后new Runtime()会创建一个运行时的runtimeruntime``_.reset(new Runtime(plan_``, GetMaxVal<size_t>(), false));不过Runtime[21]并没有立即执行,而是会等待一个tick信号触发。通常,系统自动添加的Push Job中的ForeignInput Op[22] 内部维护一个buffer,该buffer等待Python端喂数据,一旦有数据输入此op,将触发tick信号,开启整个Plan的运行过程。

总结来说,编译时将每个job编译成运行时计划plan,并将所有plan融合的过程,分为几步:1) 将每个Job生成的Plan(SubPlan)合并到一个大的MergedPlan中 2) Job之间的内存复用和内存共享 3) 计算CriticalSection[23]4) 生成MainJob[24]5) 编译MainJob得到MainPlan 6) 将MainPlan和MergedPlan中每个Job生成的SubPlan进行link,得到最终的MergedPlan 关于CriticalSection[25]的设计和描述,请参考:仅此一文让您掌握OneFlow框架的系统设计(上篇)

附:Plan流程图(png/svg)制作

  1. 开启debug模式export ONEFLOW_DEBUG_MODE=1开启后,会保存log,log/dot目录下会有merged_plan.dot

  2. 安装graphviz ubuntu下可通过sudo apt-get install graphviz命令

  3. 根据merged_plan.dot创建svg/png图dot -Tsvg merged_plan.dot > merged_plan.svgdot -Tpng merged_plan.dot > merged_plan.png

LazyRun()

TryInit()之后,整个执行计划plan构建完成,通过session.TryInit().LazyRun(job_func, *args, **kwargs)[26]发送tick信号,触发整个plan的运行。LazyRun()方法如下:[27]

def LazyRun(self, job_func, *arg):
        assert self.status_ is SessionStatus.RUNNING
        remote_blobs = self.LaunchUserJob(job_func, *arg)
        if remote_blobs is None:
            return
        future_blob = LazyFutureRemoteBlobs(self).SetResult(remote_blobs).Inited()
        annotation = inspect.signature(job_func).return_annotation
        return oft_util.TransformGlobalFunctionResult(future_blob, annotation)

整个过程即传入输入数据至用户定义的user job中,计算得到最终的remote_blobs(remote_blobs = self.LaunchUserJob(job_func, *arg)),然后通过转换得到输出数据。

def LaunchUserJob(self, job_func, *arg):
        assert self.status_ is SessionStatus.RUNNING
        job_name = job_func.__name__
        push_util.AsyncPush(self, job_func, *arg)
        self.LaunchJob(job_instance_util.MakeUserJobInstance(job_name))
        return job_func.__oneflow_output_remote_blobs__

其中,运行时Plan的触发是通过LaunchUserJob触发的,具体请看数据流转部分。

总结

总体来说,在oneflow中一个用户自定义的job执行过程,主要分为编译时运行时。编译时将每个job编译成运行时计划plan;运行时,则是plan的实际执行过程。

以上,只是基于文章:仅此一文让您掌握OneFlow框架的系统设计(上篇)仅此一文让您掌握OneFlow框架的系统设计(上篇)")和代码,梳理出的粗浅总结,Oneflow的系统设计远不止上述这些,譬如:流控机制、Actor、Register、SBP抽象、支持epoll/RDMA通信的网络模块CommNet、内存管理模块、ID编址系统等、Op/Kernel的设计...

更多介绍,请参考:

参考资料

[1]

lazy_oneflow_function(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/function_util.py#L150

[2]

_RunLazyJob(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/function_util.py#L220

[3]

CompileAndMergePlanOnMaster(): https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L897

[4]

ForeignInput Op: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/kernel/foreign_input_kernel.cpp%23L27

[5]

ForeignOutput Op: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/kernel/foreign_output_kernel.cpp%23L27

[6]

LaunchUserJob: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L298

[7]

oneflow_api.Dtype_GetOfBlobStaticTensorCopyFromBufferFuncName(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/ofblob.py#L113

[8]

Dtype_GetOfBlobStaticTensorCopyFromBufferFuncName(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/api/python/ofblob/ofblob.e.h#L80

[9]

Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L207

[10]

session.h: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/api/python/session/session.h#L41

[11]

InitLazyGlobalSession(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/api/python/session/session.h#L41

[12]

_CompileJob: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/compiler.py#L86

[13]

CurJobBuildAndInferCtx_Complete(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/compiler.py#L48

[14]

LazyJobBuildAndInferCtx::Complete(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/job_build_and_infer_ctx.cpp#L939

[15]

oneflow_api.StartLazyGlobalSession(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L221

[16]

Oneflow::Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/oneflow.cpp#L991

[17]

check_point_v2.Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L226

[18]

Oneflow::Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/oneflow.cpp#L991

[19]

CompileAndMergePlanOnMaster(): https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L983

[20]

CompileAndMergePlanOnMaster(): https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L983

[21]

Runtime: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/runtime.cpp#L63

[22]

ForeignInput Op: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/kernel/foreign_input_kernel.cpp%23L27

[23]

CriticalSection: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L737

[24]

MainJob: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L573

[25]

CriticalSection: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L737

[26]

session.TryInit().LazyRun(job_func, *args, **kwargs): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/function_util.py#L221

[27]

LazyRun()方法如下:: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L277


网页端点击阅读原文,获得更好的阅读体验

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存