
【他山之石】Dissecting the PyTorch autograd Source Code


Author: adamYao (Zhihu)

Profile: https://www.zhihu.com/people/xiao-yao-81-19-18


01

Overall Framework
During training, one iteration goes through the following steps:
  1. The network's forward pass computes the loss;
  2. backward() is called on the loss (a Tensor or a Tensor subclass);
  3. The gradients of the leaf nodes are computed;
  4. The gradients are used to update the parameters.
In PyTorch, this training loop is driven by the autograd module. The Python-level tensor.backward() call enters torch.autograd.backward, which starts the execution engine; the engine itself is implemented in the autograd code under torch/csrc, and the actual backward computation happens inside it.
The main call chain is as follows (a minimal usage example follows the chain):
tensor.backward()                                    // torch/tensor.py
  -> torch.autograd.backward()                       // torch/autograd/__init__.py
    -> Variable._execution_engine.run_backward()     // torch/autograd/__init__.py
       (Variable._execution_engine is an ImperativeEngine())   // torch/autograd/variable.py
      -> run_backward actually calls THPEngine_run_backward,   // torch/csrc/autograd/python_engine.cpp
         which is the core step of backward; see "02: Key Details".
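To tie these four steps to concrete code, here is a minimal, self-contained sketch; the model, data and learning rate are made up for illustration and are not from the original post:

import torch

# Hypothetical toy setup: a linear model with two leaf parameters.
w = torch.randn(3, 1, requires_grad=True)   # leaf
b = torch.zeros(1, requires_grad=True)      # leaf
x = torch.randn(4, 3)                       # input, no grad needed
target = torch.randn(4, 1)

# 1. forward: compute the loss
loss = ((x @ w + b - target) ** 2).mean()

# 2. backward on the loss Tensor; this ends up in torch.autograd.backward
loss.backward()

# 3. gradients have been accumulated into the leaves
print(w.grad.shape, b.grad.shape)

# 4. update the parameters with the gradients (plain SGD step)
with torch.no_grad():
    w -= 0.1 * w.grad
    b -= 0.1 * b.grad
    w.grad.zero_()
    b.grad.zero_()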

02

Key Details

2.1: backward()

The function is defined in torch/tensor.py as follows:
def backward(self, gradient=None, retain_graph=None, create_graph=False):
    r"""Computes the gradient of current tensor w.r.t. graph leaves.

    The graph is differentiated using the chain rule. If the tensor is
    non-scalar (i.e. its data has more than one element) and requires
    gradient, the function additionally requires specifying ``gradient``.
    It should be a tensor of matching type and location, that contains
    the gradient of the differentiated function w.r.t. ``self``.

    This function accumulates gradients in the leaves - you might need to zero
    ``.grad`` attributes or set them to ``None`` before calling it.
    See :ref:`Default gradient layouts<default-grad-layouts>`
    for details on the memory layout of accumulated gradients.

    Arguments:
        gradient (Tensor or None): Gradient w.r.t. the tensor. If it is a
            tensor, it will be automatically converted to a Tensor that does
            not require grad unless ``create_graph`` is True. None values can
            be specified for scalar Tensors or ones that don't require grad.
            If a None value would be acceptable then this argument is optional.
        retain_graph (bool, optional): If ``False``, the graph used to compute
            the grads will be freed. Note that in nearly all cases setting this
            option to True is not needed and often can be worked around in a
            much more efficient way. Defaults to the value of ``create_graph``.
        create_graph (bool, optional): If ``True``, graph of the derivative
            will be constructed, allowing to compute higher order derivative
            products. Defaults to ``False``.
    """
    relevant_args = (self,)
    from torch.overrides import has_torch_function, handle_torch_function
    if type(self) is not Tensor and has_torch_function(relevant_args):
        return handle_torch_function(
            Tensor.backward,
            relevant_args,
            self,
            gradient=gradient,
            retain_graph=retain_graph,
            create_graph=create_graph)
    torch.autograd.backward(self, gradient, retain_graph, create_graph)
In the code above, the type of the object on which backward was called is checked first: if the type is not exactly Tensor and has_torch_function (which checks for __torch_function__ overrides) returns true, the call is routed through handle_torch_function; otherwise (the vast majority of cases) it proceeds to torch.autograd.backward.
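To see the docstring's requirements in action, here is a small illustrative snippet (not from the original post): a scalar output needs no arguments, a non-scalar output needs a gradient argument, and gradients accumulate in the leaves:

import torch

x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)

# Scalar output: backward() needs no arguments.
y = (x * x).sum()
y.backward()
print(x.grad)              # tensor([2., 4., 6.])

# Gradients accumulate in the leaves: zero them before the next backward.
x.grad = None

# Non-scalar output: a `gradient` argument (the "vector" in the
# vector-Jacobian product) must be supplied.
z = x * x
z.backward(gradient=torch.ones_like(z))
print(x.grad)              # tensor([2., 4., 6.]) again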

2.2: torch.autograd.backward

The function lives in torch/autograd/__init__.py and is defined as follows:
def backward(
    tensors: _TensorOrTensors,
    grad_tensors: Optional[_TensorOrTensors] = None,
    retain_graph: Optional[bool] = None,
    create_graph: bool = False,
    grad_variables: Optional[_TensorOrTensors] = None,
) -> None:
    r"""Computes the sum of gradients of given tensors w.r.t. graph leaves.

    The graph is differentiated using the chain rule. If any of ``tensors``
    are non-scalar (i.e. their data has more than one element) and require
    gradient, then the Jacobian-vector product would be computed, in this
    case the function additionally requires specifying ``grad_tensors``.
    It should be a sequence of matching length, that contains the "vector"
    in the Jacobian-vector product, usually the gradient of the differentiated
    function w.r.t. corresponding tensors (``None`` is an acceptable value for
    all tensors that don't need gradient tensors).

    This function accumulates gradients in the leaves - you might need to zero
    ``.grad`` attributes or set them to ``None`` before calling it.
    See :ref:`Default gradient layouts<default-grad-layouts>`
    for details on the memory layout of accumulated gradients.

    .. note::
        Using this method with ``create_graph=True`` will create a reference cycle
        between the parameter and its gradient which can cause a memory leak.
        We recommend using ``autograd.grad`` when creating the graph to avoid this.
        If you have to use this function, make sure to reset the ``.grad`` fields of
        your parameters to ``None`` after use to break the cycle and avoid the leak.

    Arguments:
        tensors (sequence of Tensor): Tensors of which the derivative will be
            computed.
        grad_tensors (sequence of (Tensor or None)): The "vector" in the
            Jacobian-vector product, usually gradients w.r.t. each element of
            corresponding tensors. None values can be specified for scalar
            Tensors or ones that don't require grad. If a None value would be
            acceptable for all grad_tensors, then this argument is optional.
        retain_graph (bool, optional): If ``False``, the graph used to compute
            the grad will be freed. Note that in nearly all cases setting this
            option to ``True`` is not needed and often can be worked around in
            a much more efficient way. Defaults to the value of ``create_graph``.
        create_graph (bool, optional): If ``True``, graph of the derivative will
            be constructed, allowing to compute higher order derivative products.
            Defaults to ``False``.
    """
    if grad_variables is not None:
        warnings.warn("'grad_variables' is deprecated. Use 'grad_tensors' instead.")
        if grad_tensors is None:
            grad_tensors = grad_variables
        else:
            raise RuntimeError("'grad_tensors' and 'grad_variables' (deprecated) "
                               "arguments both passed to backward(). Please only "
                               "use 'grad_tensors'.")

    tensors = (tensors,) if isinstance(tensors, torch.Tensor) else tuple(tensors)

    grad_tensors_ = _tensor_or_tensors_to_tuple(grad_tensors, len(tensors))
    grad_tensors_ = _make_grads(tensors, grad_tensors_)
    if retain_graph is None:
        retain_graph = create_graph

    Variable._execution_engine.run_backward(
        tensors, grad_tensors_, retain_graph, create_graph,
        allow_unreachable=True)  # allow_unreachable flag
The first part of the function sets up the arguments (their meanings are spelled out in the docstring); it then calls Variable._execution_engine.run_backward to perform the actual backward pass.
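As a quick illustration of this entry point (a small sketch using the public API, not code from the post), torch.autograd.backward can be called directly, and the docstring's advice to prefer autograd.grad with create_graph=True for higher-order derivatives looks like this:

import torch

x = torch.tensor(2.0, requires_grad=True)
y = x ** 3

# Equivalent to y.backward(): call the module-level function directly.
torch.autograd.backward(y)           # grad_tensors defaults to ones for a scalar
print(x.grad)                        # tensor(12.)  (3 * x**2)

# For higher-order derivatives the docstring recommends autograd.grad
# together with create_graph=True (avoids the .grad reference cycle).
x.grad = None
y = x ** 3
(g,) = torch.autograd.grad(y, x, create_graph=True)   # dy/dx  = 3*x**2 = 12
(g2,) = torch.autograd.grad(g, x)                      # d2y/dx2 = 6*x   = 12
print(g, g2)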

2.3 Variable._execution_engine

It is defined in torch/autograd/variable.py; the relevant code is:
# torch/autograd/variable.py
from torch._C import _ImperativeEngine as ImperativeEngine
Variable._execution_engine = ImperativeEngine()
As you can see, Variable._execution_engine is simply an instance of ImperativeEngine from torch._C. torch._C was covered in detail in my previous post; its source lives in torch/csrc, written in C++ but callable from Python.
THPEngine_initModule in torch/csrc/autograd/python_engine.cpp calls PyModule_AddObject(module, "_ImperativeEngine", (PyObject *)&THPEngineType), which adds THPEngineType to the module under the name _ImperativeEngine. THPEngine_initModule and PyModule_AddObject are described in detail in the key-details part of the earlier <torch._C> post.
Variable._execution_engine.run_backward therefore dispatches to THPEngineType's run_backward: THPEngineType's tp_methods is THPEngine_methods, which is defined as follows:
static struct PyMethodDef THPEngine_methods[] = {
  {(char*)"run_backward", (PyCFunction)(void(*)(void))THPEngine_run_backward,
   METH_VARARGS | METH_KEYWORDS, nullptr},
  {(char*)"queue_callback", (PyCFunction)THPEngine_queue_callback, METH_O, nullptr},
  {(char*)"is_checkpoint_valid", (PyCFunction)THPEngine_is_checkpoint_valid, METH_NOARGS, nullptr},
  {nullptr}
};
So run_backward ultimately calls THPEngine_run_backward.
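A quick sanity probe of this binding from the Python side (illustrative only; it assumes a PyTorch build matching the sources quoted here):

import torch
from torch.autograd import Variable
from torch._C import _ImperativeEngine

# Variable._execution_engine is the ImperativeEngine instance created in
# torch/autograd/variable.py, i.e. the Python-facing handle to the C++ engine.
print(isinstance(Variable._execution_engine, _ImperativeEngine))   # True
print(hasattr(Variable._execution_engine, "run_backward"))         # True
print(hasattr(Variable._execution_engine, "queue_callback"))       # True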

2.4 THPEngine_run_backward

// Implementation of torch._C._EngineBase.run_backward
PyObject *THPEngine_run_backward(THPEngine *self, PyObject *args, PyObject *kwargs)
{
  // --------------------- 1 ----------------------------
  HANDLE_TH_ERRORS
  PyObject *tensors = nullptr;
  PyObject *grad_tensors = nullptr;
  unsigned char keep_graph = 0;
  unsigned char create_graph = 0;
  PyObject *inputs = nullptr;
  unsigned char allow_unreachable = 0;
  const char *accepted_kwargs[] = {
      "tensors", "grad_tensors", "keep_graph", "create_graph", "inputs",
      "allow_unreachable", nullptr
  };
  if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OObb|Ob", (char**)accepted_kwargs,
        &tensors, &grad_tensors, &keep_graph, &create_graph, &inputs, &allow_unreachable))
    return nullptr;

  // --------------------- 2 ----------------------------
  THPUtils_assert(PyTuple_Check(tensors), "tensors argument is expected to "
      "be a tuple, but got %s", THPUtils_typename(tensors));
  THPUtils_assert(PyTuple_Check(grad_tensors), "grad_tensors argument is "
      "expected to be a tuple, but got %s", THPUtils_typename(grad_tensors));

  Py_ssize_t num_tensors = PyTuple_GET_SIZE(tensors);
  Py_ssize_t num_gradients = PyTuple_GET_SIZE(grad_tensors);
  THPUtils_assert(num_tensors == num_gradients, "got %ld tensors and %ld "
      "gradients", num_tensors, num_gradients);

  // The user either called autograd.backward(...) or autograd.grad(...) to get here
  bool backward_api_called = inputs == nullptr;
  TORCH_CHECK(!backward_api_called || at::impl::VmapMode::current_vmap_level() == 0,
      "backward() called inside torch.vmap. This is not supported, "
      "please call backward() outside torch.vmap or instead use "
      "torch.autograd.grad inside torch.vmap");

  // --------------------- 3 ----------------------------
  edge_list roots;
  roots.reserve(num_tensors);
  variable_list grads;
  grads.reserve(num_tensors);
  for (int i = 0; i < num_tensors; i++) {
    PyObject *_tensor = PyTuple_GET_ITEM(tensors, i);
    THPUtils_assert(THPVariable_Check(_tensor), "element %d of tensors "
        "tuple is not a Tensor", i);
    auto& variable = ((THPVariable*)_tensor)->cdata;
    TORCH_CHECK(!isBatchedTensor(variable),
        "torch.autograd.grad(outputs, inputs, grad_outputs) called inside ",
        "torch.vmap. We do not support the case where any outputs are ",
        "vmapped tensors (output ", i, " is being vmapped over). Please "
        "call autograd.grad() outside torch.vmap or file a bug report "
        "with your use case.")
    auto gradient_edge = torch::autograd::impl::gradient_edge(variable);
    THPUtils_assert(gradient_edge.function,
        "element %d of tensors does not require grad and does not have a grad_fn", i);
    roots.push_back(std::move(gradient_edge));

    PyObject *grad = PyTuple_GET_ITEM(grad_tensors, i);
    if (THPVariable_Check(grad)) {
      const Variable& grad_var = ((THPVariable*)grad)->cdata;
      if (grad_var.has_names()) {
        TORCH_WARN(
            "Autograd was passed a named grad tensor with dims ", grad_var.names(),
            ". Autograd does not yet support named tensor semantics, so all names ",
            "will be ignored. In practice all computed gradients will still be correct "
            "according to regular tensor semantics.");
      }
      grads.push_back(grad_var);
    } else {
      THPUtils_assert(grad == Py_None,
          "element %d of gradients tuple is not a Tensor or None", i);
      THPUtils_assert(!variable.requires_grad(),
          "element %d of gradients tuple is None, but the corresponding Tensor requires grad");
    }
  }

  // --------------------- 4 ----------------------------
  std::vector<Edge> output_edges;
  if (!backward_api_called) {
    int num_inputs = PyTuple_GET_SIZE(inputs);
    output_edges.reserve(num_inputs);
    for (int i = 0; i < num_inputs; ++i) {
      PyObject *input = PyTuple_GET_ITEM(inputs, i);
      THPUtils_assert(THPVariable_Check(input),
          "all inputs have to be Tensors, but got %s", THPUtils_typename(input));
      THPVariable *input_var = (THPVariable*)input;
      TORCH_CHECK(!isBatchedTensor(input_var->cdata),
          "torch.autograd.grad(outputs, inputs, grad_outputs) called inside ",
          "torch.vmap. We do not support the case where any inputs are ",
          "vmapped tensors (input ", i, " is being vmapped over). Please "
          "call autograd.grad() outside torch.vmap or file a bug report "
          "with your use case.")
      const auto output_nr = input_var->cdata.output_nr();
      auto grad_fn = input_var->cdata.grad_fn();
      if (!grad_fn) {
        grad_fn = torch::autograd::impl::try_get_grad_accumulator(input_var->cdata);
      }
      THPUtils_assert(input_var->cdata.requires_grad(),
          "One of the differentiated Tensors does not require grad");
      if (!grad_fn) {
        output_edges.emplace_back();
      } else {
        output_edges.emplace_back(grad_fn, output_nr);
      }
    }
  }

  // --------------------- 5 ----------------------------
  variable_list outputs;
  {
    pybind11::gil_scoped_release no_gil;
    auto& engine = python::PythonEngine::get_python_engine();
    outputs = engine.execute(roots, grads, keep_graph, create_graph, output_edges);
  }

  // --------------------- 6 ----------------------------
  if (!backward_api_called) {
    int num_inputs = PyTuple_GET_SIZE(inputs);
    THPObjectPtr py_outputs {PyTuple_New(num_inputs)};
    if (!py_outputs) return nullptr;
    for (int i = 0; i < num_inputs; i++) {
      THPUtils_assert(allow_unreachable || outputs[i].defined(), "One of the "
                      "differentiated Tensors appears to not have been used "
                      "in the graph. Set allow_unused=True if this is the "
                      "desired behavior.");
      PyTuple_SET_ITEM(py_outputs.get(), i, THPVariable_Wrap(outputs[i]));
    }
    return py_outputs.release();
  } else {
    Py_RETURN_NONE;
  }
  END_HANDLE_TH_ERRORS
}
1): The input arguments are parsed with PyArg_ParseTupleAndKeywords and assigned to tensors, grad_tensors, keep_graph, create_graph, inputs and allow_unreachable.
2): Type and length checks are performed on tensors and grad_tensors.
3): roots and grads are built from tensors and grad_tensors.
4): output_edges is built: for each element of inputs, first look for its cdata's grad_fn; if there is none, fall back to torch::autograd::impl::try_get_grad_accumulator(input_var->cdata). If the resulting grad_fn is still nullptr, an empty Edge is pushed onto output_edges; otherwise (grad_fn, output_nr) for that input is pushed.
5): The PythonEngine singleton is obtained and its execute function is called to run the backward pass and produce outputs.
6): If backward_api_called is false (the autograd.grad path), outputs is wrapped into py_outputs and returned; otherwise None is returned (see the sketch below for the observable difference).
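The inputs/allow_unreachable arguments are what distinguish the two Python entry points. A small sketch of the observable difference (illustrative only):

import torch

x = torch.tensor(1.0, requires_grad=True)
unused = torch.tensor(1.0, requires_grad=True)
y = x * 2

# autograd.backward: inputs is nullptr on the C++ side, backward_api_called
# is true, gradients are accumulated into .grad and None is returned (step 6).
torch.autograd.backward(y)
print(x.grad)                      # tensor(2.)

# autograd.grad: inputs is passed, so output_edges are built (step 4) and the
# computed gradients are wrapped and returned instead of being accumulated.
y = x * 2
grads = torch.autograd.grad(y, (x, unused), allow_unused=True)
print(grads)                       # (tensor(2.), None) -- `unused` is unreachable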
Returning to step 3): roots and grads have the types edge_list and variable_list respectively; tracing the definitions from the top down:
// torch/csrc/autograd/function.h
using tensor_list = std::vector<at::Tensor>;
using variable_list = std::vector<Variable>;
using edge_list = std::vector<Edge>;
using saved_variable_list = std::vector<SavedVariable>;
using IndexRange = std::pair<size_t, size_t>;
// torch/csrc/autograd/variable.h
/// `Variable` is exactly the same as `Tensor` (i.e. we have `using Variable = at::Tensor`).
/// This means you can perform all the usual mathematical and other
/// operations you can perform on `Tensor`s also on `Variable`s.
///
/// The only reason we are keeping the `Variable` class is backward compatibility
/// with external user's legacy C++ frontend code. Our intention is to eliminate
/// the `Variable` class in the near future.
using Variable = at::Tensor;
// torch/csrc/autograd/edge.h
/// Represents a particular input of a function.
struct Edge {
  Edge() noexcept : function(nullptr), input_nr(0) {}

  Edge(std::shared_ptr<Node> function_, uint32_t input_nr_) noexcept
      : function(std::move(function_)), input_nr(input_nr_) {}

  /// Convenience method to test if an edge is valid.
  bool is_valid() const noexcept {
    return function != nullptr;
  }

  // Required for use in associative containers.
  bool operator==(const Edge& other) const noexcept {
    return this->function == other.function && this->input_nr == other.input_nr;
  }

  bool operator!=(const Edge& other) const noexcept {
    return !(*this == other);
  }

  /// The function this `Edge` points to.
  std::shared_ptr<Node> function;

  /// The identifier of a particular input to the function.
  uint32_t input_nr;
};
}} // namespace torch::autograd
In step 4), output_edges has type std::vector<Edge>. For each element of inputs whose grad_fn is missing (a leaf tensor), torch::autograd::impl::try_get_grad_accumulator is used instead. Its definition is:
// torch/csrc/autograd/variable.cpp
std::shared_ptr<Node> try_get_grad_accumulator(const Variable& self) {
  if (get_autograd_meta(self)) {
    return get_autograd_meta(self)->grad_accumulator_.lock();
  } else {
    return nullptr;
  }
}
// torch/csrc/autograd/variable.h
struct TORCH_API AutogradMeta : public c10::AutogradMetaInterface {
  std::string name_;

  Variable grad_;
  std::shared_ptr<Node> grad_fn_;
  std::weak_ptr<Node> grad_accumulator_;

  std::vector<std::shared_ptr<FunctionPreHook>> hooks_;
  std::shared_ptr<hooks_list> cpp_hooks_list;

  // Only meaningful on leaf variables (must be false otherwise)
  bool requires_grad_;

  // Only meaningful on non-leaf variables (must be false otherwise)
  bool retains_grad_;

  bool is_view_;

  // The "output number" of this variable; e.g., if this variable
  // was the second output of a function, then output_nr == 1.
  // We use this to make sure we can setup the backwards trace
  // correctly when this variable is passed to another function.
  uint32_t output_nr_;

  // Mutex to ensure that concurrent read operations that modify internal
  // state are still thread-safe. Used by grad_fn() and
  // grad_accumulator().
  std::mutex mutex_;

  /// Sets the `requires_grad` property of `Variable`. This should be true for
  /// leaf variables that want to accumulate gradients, and false for all other
  /// variables.
  void set_requires_grad(bool requires_grad, at::TensorImpl* self_impl) override {
    TORCH_CHECK(
      !requires_grad || isDifferentiableType(at::typeMetaToScalarType(self_impl->dtype())),
      "Only Tensors of floating point and complex dtype can require gradients");
    requires_grad_ = requires_grad;
  }

  bool requires_grad() const override {
    return requires_grad_ || grad_fn_;
  }

  /// Accesses the gradient `Variable` of this `Variable`.
  Variable& mutable_grad() override {
    return grad_;
  }

  const Variable& grad() const override {
    return grad_;
  }

  AutogradMeta(at::TensorImpl* self_impl = nullptr, bool requires_grad = false, Edge gradient_edge = Edge()) {
    grad_fn_ = std::move(gradient_edge.function);
    requires_grad_ = false;
    retains_grad_ = false;
    is_view_ = false;
    output_nr_ = gradient_edge.input_nr;

    // set_requires_grad also checks error conditions.
    if (requires_grad) {
      TORCH_INTERNAL_ASSERT(self_impl);
      set_requires_grad(requires_grad, self_impl);
    }
    TORCH_CHECK(
        !grad_fn_ || !requires_grad_,
        "requires_grad should be false if grad_fn is set");
  }
};
As can be seen, torch::autograd::impl::try_get_grad_accumulator simply retrieves the grad_accumulator_ stored in the tensor's AutogradMeta.
5): PythonEngine's execute function is the core of the whole backward pass; it runs backward and produces outputs.
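This leaf/non-leaf split is visible from Python: a leaf tensor has no grad_fn and receives its gradient through the grad accumulator, while a non-leaf tensor has a grad_fn. A small probe (illustrative only):

import torch

x = torch.tensor([1.0, 2.0], requires_grad=True)   # leaf: created by the user
y = x * 3                                           # non-leaf: produced by an op

print(x.is_leaf, x.grad_fn)      # True  None   -> gradient arrives via the grad accumulator
print(y.is_leaf, y.grad_fn)      # False <MulBackward0 ...>

y.sum().backward()
print(x.grad)                    # tensor([3., 3.])
print(y.grad)                    # None (non-leaf .grad is not retained by default)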
// torch/csrc/autograd/python_engine.cpp
Engine& PythonEngine::get_python_engine() {
  static PythonEngine engine;
  // This is "probably" thread-safe because the flag is set in a fork handler
  // before any threads are created, and this function is only called with the
  // GIL held. However, using fork + threads is playing with fire so this is
  // more of a "best effort" thing. For example, if the fork occurs while the
  // backwards threads hold a lock, we'll probably deadlock in the engine
  // destructor.
  if (_reinitialize_engine) {
    engine.release_workers();
    engine.~PythonEngine();
    new (&engine) torch::autograd::python::PythonEngine();
    _reinitialize_engine = false;
  }
  return engine;
}
First, the Engine singleton is obtained via PythonEngine::get_python_engine; then its execute function is called.
// torch/csrc/autograd/python_engine.cpp
variable_list PythonEngine::execute(
    const edge_list& roots,
    const variable_list& inputs,
    bool keep_graph,
    bool create_graph,
    const edge_list& outputs) {
  TORCH_CHECK(!PyGILState_Check(),
      "The autograd engine was called while holding the GIL. If you are using the C++ "
      "API, the autograd engine is an expensive operation that does not require the "
      "GIL to be held so you should release it with 'pybind11::gil_scoped_release no_gil;'"
      ". If you are not using the C++ API, please report a bug to the pytorch team.")
  try {
    return Engine::execute(roots, inputs, keep_graph, create_graph, outputs);
  } catch (python_error& e) {
    e.restore();
    throw;
  }
}
PythonEngine inherits from Engine, and execute overrides Engine::execute.
struct PythonEngine : public Engine {
  static Engine& get_python_engine();
  void thread_init(int device,
      const std::shared_ptr<ReadyQueue>& ready_queue,
      bool should_increment) override;
  void thread_on_exception(
      std::shared_ptr<GraphTask> graph_task,
      const std::shared_ptr<Node>& fn,
      std::exception& e) override;
  variable_list execute(
      const edge_list& roots,
      const variable_list& inputs,
      bool keep_graph,
      bool create_graph,
      const edge_list& outputs = {}) override;

  std::shared_ptr<at::ivalue::Future> execute_with_graph_task(
      const std::shared_ptr<GraphTask>& graph_task,
      std::shared_ptr<Node> graph_root) override;

  std::unique_ptr<AnomalyMetadata> make_anomaly_metadata() override;

 private:
  PythonEngine();
};

2.5: execute

auto Engine::execute(const edge_list& roots,
                     const variable_list& inputs,
                     bool keep_graph,
                     bool create_graph,
                     const edge_list& outputs) -> variable_list

// --------------------- 1 ----------------------------
void validate_outputs(
    const edge_list& edges,
    variable_list& grads,
    const std::function<std::string(const std::string&)>& format_error)

// --------------------- 2 ----------------------------
void Engine::init_local_ready_queue(std::shared_ptr<ReadyQueue> ready_queue)

bool not_reentrant_backward_call = worker_device == NO_DEVICE;

auto graph_task = std::make_shared<GraphTask>(
    /* keep_graph */ keep_graph,
    /* create_graph */ create_graph,
    /* depth */ not_reentrant_backward_call ? 0 : total_depth + 1,
    /* cpu_ready_queue */ local_ready_queue);

// --------------------- 3 ----------------------------
auto graph_root = std::make_shared<GraphRoot>(roots, inputs);

auto Engine::compute_dependencies(Node* root, GraphTask& task) -> void

// --------------------- 4 ----------------------------
if (!outputs.empty()) {
  graph_task->init_to_execute(*graph_root, outputs);
}

void GraphTask::init_to_execute(Node& graph_root, const edge_list& outputs)

// --------------------- 5 ----------------------------
std::shared_ptr<at::ivalue::Future> Engine::execute_with_graph_task(
    const std::shared_ptr<GraphTask>& graph_task,
    std::shared_ptr<Node> graph_root)
The execution of execute breaks down into five parts.
1): validate_outputs: checks that roots and the incoming grads match in shape, device and size, and performs some simple conversions, e.g. around lines 603 and 615 of engine.cpp:
grad = at::sum_to(std::move(grad), metadata.shape());
grad = grad.to(metadata.device());
2): Initialize the thread-local ready queue, or reuse the existing one; once the thread-local ready queue is available, create the GraphTask (handing that queue to it).
3): Create the GraphRoot and call compute_dependencies to count, for every node participating in the gradient computation, how many dependencies it has (implemented as a BFS from the GraphRoot over the nodes of the GraphTask; a small Python sketch of the same idea follows the C++ code below).
/* Computes the number of dependencies for each function which requires grad */
auto Engine::compute_dependencies(Node* root, GraphTask& task) -> void {
  // Just to make sure that they will never be added to the queue again
  std::unordered_set<Node*> seen;
  std::vector<Node*> queue { root };

  // Queue contains all nodes that will start propagating gradients.
  // We no longer have to expand functions that don't require grad.
  auto& dependencies = task.dependencies_;
  while (!queue.empty()) {
    auto fn = queue.back(); queue.pop_back();
    for (const auto& edge : fn->next_edges()) {
      if (auto next_ptr = edge.function.get()) {
        dependencies[next_ptr] += 1;
        const bool was_inserted = seen.insert(next_ptr).second;
        if (was_inserted) queue.push_back(next_ptr);
      }
    }
  }
}
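The same dependency-counting idea, rewritten as a throwaway Python sketch over a toy graph (purely illustrative, not PyTorch code):

from collections import defaultdict

def compute_dependencies(root, next_edges):
    """Count, for every node reachable from `root`, how many incoming
    edges it has along the backward graph (the engine's dependencies_)."""
    dependencies = defaultdict(int)
    seen = {root}
    queue = [root]
    while queue:
        fn = queue.pop()
        for nxt in next_edges.get(fn, []):
            dependencies[nxt] += 1
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return dict(dependencies)

# Toy backward graph: the root feeds two branches that both reach 'AccumulateGrad(w)'.
toy = {
    "GraphRoot": ["MulBackward", "AddBackward"],
    "MulBackward": ["AccumulateGrad(w)"],
    "AddBackward": ["AccumulateGrad(w)"],
}
print(compute_dependencies("GraphRoot", toy))
# {'MulBackward': 1, 'AddBackward': 1, 'AccumulateGrad(w)': 2}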
4): Based on graph_root and outputs, create an ExecInfo entry for each element of outputs, resize the GraphTask's std::vector<Variable> captured_vars_ accordingly, then run a DFS over graph_root's next_edges to set exec_info_ for every reachable node.
void GraphTask::init_to_execute(Node& graph_root, const edge_list& outputs) {
  exec_info_[&graph_root].needed_ = true;

  int output_idx = 0;
  for (auto & output_edge : outputs) {
    Node *output = output_edge.function.get();
    auto & info = exec_info_[output];
    if (!info.captures_)
      info.captures_ = make_unique<std::vector<ExecInfo::Capture>>();
    info.captures_->emplace_back(output_edge.input_nr, output_idx++);
  }
  captured_vars_.resize(output_idx);

  // NB: this is an uglier version (recursion replaced with iteration) of the following code:
  // is_needed = {}
  // def compute_is_needed(fn):
  //   if fn not in is_needed:
  //     is_needed[fn] = any(compute_is_needed(next_edge)
  //                         for next_edge in fn.next_edges)
  //   return is_needed[fn]
  struct Frame {
    Frame (Node *fn) : fn_(fn), next_next_fn_(0) {}
    Node *fn_;
    size_t next_next_fn_;

    Node* get_next_fn() {
      const auto & next = fn_->next_edges();
      auto num_next = next.size();
      while (next_next_fn_ < num_next) {
        auto fn = next[next_next_fn_++].function.get();
        if (fn) return fn;
      }
      return nullptr;
    }
  };
  std::vector<Frame> stack;
  std::unordered_set<Node*> seen;
  for (const auto & input : graph_root.next_edges()) {
    if (seen.count(input.function.get()) > 0) continue;
    stack.emplace_back(input.function.get());
    while (!stack.empty()) {
      auto &frame = stack.back();
      if (Node *next_fn = frame.get_next_fn()) {
        if (/* bool unseen = */ seen.emplace(next_fn).second) {
          stack.emplace_back(next_fn);
          continue; // recurse
        }
      } else {
        // NB: if we were using real recursion we could have saved some lookups
        // using a return value from recursive call. It would make this manually unrolled
        // version a lot more complicated, so I skipped that.
        const auto & next_edges = frame.fn_->next_edges();
        const bool needed = std::any_of(
            next_edges.begin(), next_edges.end(), [&](const Edge& edge) {
              auto it = exec_info_.find(edge.function.get());
              return it != exec_info_.end() && it->second.should_execute();
            });
        exec_info_[frame.fn_].needed_ = needed;
        stack.pop_back();
      }
    }
  }
}
5): execute_with_graph_task
// --------------------- 1 ----------------------------
initialize_device_threads_pool();

// --------------------- 2 ----------------------------
ready_queue(graph_task->cpu_ready_queue_, at::kCPU)->push(
    NodeTask(graph_task, std::move(graph_root), InputBuffer(0)));

// --------------------- 3 ---------------------------- (the calling thread is the CPU thread)
// We set the worker_device to CPU_DEVICE only if worker_device was previously
// NO_DEVICE. Setting it to CPU afterwards allow us to detect whether this is
// a re-entrant call or not.
set_device(CPU_DEVICE);

// set the graph_task owner to the current device
graph_task->owner_ = worker_device;

// The owning thread start to drive the engine execution with the GraphTask
// that has already been pushed to the current CPU thread's ready_queue
lock.unlock();
thread_main(graph_task);
TORCH_INTERNAL_ASSERT(graph_task->future_result_->completed());
// reset the worker_device after the completion of the graph_task, this is so
// that the initial state of the engine remains the same across every backward()
// or grad() call, we don't need to reset local_ready_queue as we could possibly
// reuse it for new backward calls.
worker_device = NO_DEVICE;

// --------------------- 4 ---------------------------- (the calling thread is some other worker thread)
// If worker_device is any devices (i.e. CPU, CUDA): this is a re-entrant
// backward call from that device.
graph_task->owner_ = worker_device;
if (current_depth >= max_recursion_depth_) {
  // See Note [Reentrant backwards]
  // If reached the max depth, switch to a different thread
  add_thread_pool_task(graph_task);
} else {
  // Total depth needs to be updated only in this codepath, since it is
  // not used in the block above (when we call add_thread_pool_task).
  // In the codepath above, GraphTask.reentrant_depth_ is used to
  // bootstrap total_depth in the other thread.
  ++total_depth;

  // Get back to work while we wait for our new graph_task to
  // complete!
  ++current_depth;
  lock.unlock();
  thread_main(graph_task);
  --current_depth;
  --total_depth;

  // The graph task should have completed and the associated future should
  // be marked completed as well since 'thread_main' above is a call
  // blocking an autograd engine thread.
  TORCH_INTERNAL_ASSERT(graph_task->future_result_->completed());
}
5.1): Initialize the device thread pool; std::call_once together with a std::once_flag guarantees that start_device_threads is invoked only once.
5.2): Push the NodeTask wrapping graph_root into the ready queue associated with the CPU.
5.3): If the current thread is the CPU (calling) thread, run the graph_task by calling thread_main(graph_task), taking care to set and then reset worker_device.
5.4): If the current thread is not a CPU thread, this is a re-entrant backward; it also runs thread_main(graph_task), except that when current_depth >= max_recursion_depth_ the graph_task is handed to a different thread via add_thread_pool_task.

2.6: start_device_threads

initialize_device_threads_pool in turn calls start_device_threads:
// engine.cpp
auto Engine::start_device_threads() -> void {
  // See Note [Allocating GPUs to autograd threads]
  c10::DeviceIndex num_devices = 0;
  for (const auto& impl_atomic : c10::impl::device_guard_impl_registry) {
    auto* impl = impl_atomic.load();
    if (impl) {
      num_devices = std::max(num_devices, impl->deviceCount());
    }
  }

  // allocate one thread for every GPU device (but colocate GPUs of different
  // types), and pre-allocate the device_ready_queues_ to ensure safe reading on it.
  device_ready_queues_ = std::vector<std::shared_ptr<ReadyQueue>>(num_devices);
  for (auto& queue : device_ready_queues_) {
    queue.reset(new ReadyQueue());
  }

  thread_pool_shared_ = std::make_shared<ThreadPoolShared>();

  for (int i = 0; i < num_devices; ++i) {
    std::thread t(&Engine::thread_init, this, i, device_ready_queues_[i], true);
    t.detach();
  }
  // Wait for the threads to start
  {
    std::unique_lock<std::mutex> lk(non_reentrant_device_thread_mutex_);
    while (non_reentrant_device_thread_count_.load() != num_devices) {
      non_reentrant_device_thread_condvar_.wait(lk);
    }
  }
}
The core of start_device_threads is to allocate num_devices ReadyQueues and spawn num_devices threads, each of which runs thread_init on its own ReadyQueue.
void Engine::thread_init(int device, const std::shared_ptr<ReadyQueue>& ready_queue, bool should_increment) {
  if (should_increment) {
    increment_non_reentrant_thread_count();
  }

  at::init_num_threads();

  // Note [Allocating GPUs to autograd threads]
  // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  // What's our strategy here? Originally, the autograd engine was written
  // with only CUDA in mind. We allocate one thread to handle all CPU
  // operations, and a thread per CUDA device.
  //
  // But what if we have OTHER devices? There are two plausible
  // strategies:
  //
  //  - We can allocate threads equal to max(num_cuda_devices, num_xla_devices,
  //    ...) and colocate cuda device 0 with xla device 0
  //  - We can allocate threads equal to sum(num_cuda_devices, num_xla_devices,
  //    ...) keeping everyone separate.
  //
  // We don't have any good reason to prefer one or the other, so we've
  // arbitrarily picked to colocate devices. Maybe the other approach is
  // better.
  set_device(device);

  // initialize each device thread's thread local ready queue with the ready queue
  // that is created before the thread initialization
  init_local_ready_queue(ready_queue);

  std::shared_ptr<GraphTask> graph_task = nullptr;
  thread_main(graph_task);
  if (should_increment) {
    // Decrement the count during shutdown if we incremented earlier.
    decrement_non_reentrant_thread_count();
  }
}
The comment describes two possible schemes for the number of worker threads relative to the number of devices (CPU aside): allocate max(num_cuda_devices, num_xla_devices, ...) threads, or allocate sum(num_cuda_devices, num_xla_devices, ...). Neither has a decisive advantage over the other; PyTorch arbitrarily picked the first (colocating devices).
If a ready_queue instance is passed in, it becomes the thread's thread-local ready queue; otherwise a new ReadyQueue is created as the thread-local ready queue:
void Engine::init_local_ready_queue(std::shared_ptr<ReadyQueue> ready_queue) {
  if (ready_queue) {
    // if ready_queue provided in the caller, use the caller's ready_queue to initialize local_ready_queue
    local_ready_queue = std::move(ready_queue);
  } else if (!local_ready_queue) {
    // otherwise if local_ready_queue not allocated, allocate a new ready_queue
    local_ready_queue = std::make_shared<ReadyQueue>();
  }
}
After that, thread_main(graph_task) is called.
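As a rough mental model of these device threads spinning on their own ready queues, here is a deliberately simplified Python analogy (illustrative only; the real engine's NodeTask, ReadyQueue and shutdown handling are far richer than this):

import queue
import threading

NUM_DEVICE_THREADS = 2            # stand-in for num_devices
ready_queues = [queue.Queue() for _ in range(NUM_DEVICE_THREADS)]

def thread_main(ready_q):
    # Each worker spins on its own thread-local ready queue, executing
    # NodeTask-like work items until it receives a shutdown sentinel.
    while True:
        task = ready_q.get()
        if task is None:          # "shutdown task"
            break
        fn, args = task
        fn(*args)                 # stand-in for evaluate_function

workers = [threading.Thread(target=thread_main, args=(q,), daemon=True)
           for q in ready_queues]
for w in workers:
    w.start()

# Push some work, then shut the workers down.
ready_queues[0].put((print, ("running task on device thread 0",)))
ready_queues[1].put((print, ("running task on device thread 1",)))
for q in ready_queues:
    q.put(None)
for w in workers:
    w.join()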

2.7: thread_main

// NOTE: graph_tasks do not necessarily form a stack. Imagine this
// case:
//
//    +----> Eval1
//  Root
//    +----> Eval2
//
// Once Root is executed, both Eval1 and Eval2 are added to the ready queue.
// Next, Eval1 is run and this causes the worker to enter thread_main again.
// Then, it pops the next task from the queue, but at this point it is Eval2.
// It enters thread_main once again, but now with graph_task of Eval2, which is
// completely unrelated to that of Eval1 (it's not a recursive call).
// It's all ok and is handled right now, but it should be accounted for
// in case this code is to be changed.
//
// thread_main is used by:
// 1). autograd threads for devices (i.e. CUDA, XLA)
// 2). the caller/owning thread of the backward call on CPU (sync mode)
// 3). Renetrant backward that invoked by either 1) or 2)
// The exit conditions are different for the above three cases.
// For 1), we are spinning on running the thread_main on device autograd
//         threads throughout the Engine lifetime, thread_main will get
//         terminated during Engine destruction by pushing shutdown tasks
// For 2), the owning thread of the backward call drives the thread_main
//         synchronously until the graph_task of that owning thread is
//         completed and exit the thread_main to continue executing the
//         result of caller's code.
// For 3), the reentrant backward that invokes
//         thread_main, either from 1) or 2), will not spin and will exit as
//         long as graph_task is completed and notify the owning thread as
//         needed.
auto Engine::thread_main(const std::shared_ptr<GraphTask>& graph_task) -> void {
  // When graph_task is nullptr, this is a long running thread that processes
  // tasks (ex: device threads). When graph_task is non-null (ex: reentrant
  // backwards, user thread), this function is expected to exit once that
  // graph_task complete.

  // local_ready_queue should already been initialized when we get into thread_main
  TORCH_INTERNAL_ASSERT(local_ready_queue != nullptr);
  while (graph_task == nullptr || !graph_task->future_result_->completed()) {
    // local_graph_task represents the graph_task we retrieve from the queue.
    // The outer graph_task represents the overall graph_task we need to execute
    // for reentrant execution.
    std::shared_ptr<GraphTask> local_graph_task;
    {
      // Scope this block of execution since NodeTask is not needed after this
      // block and can be deallocated (release any references to grad tensors
      // as part of inputs_).
      NodeTask task = local_ready_queue->pop();
      // This will only work if the worker is running a non backward task
      // TODO Needs to be fixed this to work in all cases
      if (task.isShutdownTask_) {
        C10_LOG_API_USAGE_ONCE("torch.autograd.thread_shutdown");
        break;
      }

      if (!(local_graph_task = task.base_.lock())) {
        // GraphTask for function is no longer valid, skipping further
        // execution.
        continue;
      }

      if (task.fn_ && !local_graph_task->has_error_.load()) {
        AutoGradMode grad_mode(local_graph_task->grad_mode_);
        try {
          // The guard sets the thread_local current_graph_task on construction
          // and restores it on exit. The current_graph_task variable helps
          // queue_callback() to find the target GraphTask to append final
          // callbacks.
          GraphTaskGuard guard(local_graph_task);
          NodeGuard ndguard(task.fn_);
          evaluate_function(local_graph_task, task.fn_.get(), task.inputs_, local_graph_task->cpu_ready_queue_);
        } catch (std::exception& e) {
          thread_on_exception(local_graph_task, task.fn_, e);
        }
      }
    }

    // Decrement the outstanding tasks.
    --local_graph_task->outstanding_tasks_;

    // Check if we've completed execution.
    if (local_graph_task->completed()) {
      local_graph_task->mark_as_completed_and_run_post_processing();

      auto base_owner = local_graph_task->owner_;
      // The current worker thread finish the graph_task, but the owning thread
      // of the graph_task might be sleeping on pop() if it does not have work.
      // So we need to send a dummy function task to the owning thread just to
      // ensure that it's not sleeping, so that we can exit the thread_main.
      // If it has work, it might see that graph_task->outstanding_tasks_ == 0
      // before it gets to the task, but it's a no-op anyway.
      //
      // NB: This is not necessary if the current thread is the owning thread.
      if (worker_device != base_owner) {
        // Synchronize outstanding_tasks_ with queue mutex
        std::atomic_thread_fence(std::memory_order_release);
        ready_queue_by_index(local_graph_task->cpu_ready_queue_, base_owner)
            ->push(NodeTask(local_graph_task, nullptr, InputBuffer(0)));
      }
    }
  }
}
Apart from asserting that local_ready_queue has been initialized, the entire body of this function is a loop; the loop keeps running as long as the passed-in graph_task is null or not yet completed. Inside the loop it mainly does the following:
1) Pop a NodeTask from local_ready_queue and call evaluate_function to execute it.
2) Decrement outstanding_tasks_ on the GraphTask that the NodeTask from 1) belongs to.
3) If that GraphTask has finished, call local_graph_task->mark_as_completed_and_run_post_processing() to mark it complete and run post-processing; in addition, if the current thread is not the GraphTask's owning thread, push a dummy function task to the owning thread so that it wakes up from its sleeping pop().

2.8: evaluate_function

void Engine::evaluate_function(
    std::shared_ptr<GraphTask>& graph_task,
    Node* func,
    InputBuffer& inputs,
    const std::shared_ptr<ReadyQueue>& cpu_ready_queue) {
  // If exec_info_ is not empty, we have to instrument the execution
  // --------------------- 1 ----------------------------
  auto& exec_info_ = graph_task->exec_info_;
  if (!exec_info_.empty()) {
    auto& fn_info = exec_info_.at(func);
    if (auto* capture_vec = fn_info.captures_.get()) {
      // Lock mutex for writing to graph_task->captured_vars_.
      std::lock_guard<std::mutex> lock(graph_task->mutex_);
      for (const auto& capture : *capture_vec) {
        auto& captured_grad = graph_task->captured_vars_[capture.output_idx_];
        captured_grad = inputs[capture.input_idx_];
        for (auto& hook : capture.hooks_) {
          captured_grad = (*hook)(captured_grad);
        }
      }
    }
    if (!fn_info.needed_) {
      // Skip execution if we don't need to execute the function.
      return;
    }
  }

  // --------------------- 2 ----------------------------
  // Switches to a function's CUDA stream (if applicable) before calling it
  const auto opt_parent_stream = (*func).stream(c10::DeviceType::CUDA);
  c10::OptionalStreamGuard parent_stream_guard{opt_parent_stream};

  auto outputs = call_function(graph_task, func, inputs);

  // --------------------- 3 ----------------------------
  auto& fn = *func;
  if (!graph_task->keep_graph_) {
    fn.release_variables();
  }

  int num_outputs = outputs.size();
  if (num_outputs == 0) { // Note: doesn't acquire the mutex
    // Records leaf stream (if applicable)
    // See note "Streaming backwards"
    if (opt_parent_stream) {
      std::lock_guard<std::mutex> lock(graph_task->mutex_);
      graph_task->leaf_streams.emplace(*opt_parent_stream);
    }
    return;
  }

  if (AnomalyMode::is_enabled()) {
    AutoGradMode grad_mode(false);
    for (int i = 0; i < num_outputs; ++i) {
      auto& output = outputs[i];
      at::OptionalDeviceGuard guard(device_of(output));
      if (output.defined() && isnan(output).any().item<uint8_t>()) {
        std::stringstream ss;
        ss << "Function '" << fn.name() << "' returned nan values in its " << i << "th output.";
        throw std::runtime_error(ss.str());
      }
    }
  }

  // --------------------- 4 ----------------------------
  // Lock mutex for the accesses to GraphTask dependencies_, not_ready_ and cpu_ready_queue_ below
  std::lock_guard<std::mutex> lock(graph_task->mutex_);
  for (int i = 0; i < num_outputs; ++i) {
    auto& output = outputs[i];
    const auto& next = fn.next_edge(i);

    if (!next.is_valid()) continue;

    // Check if the next function is ready to be computed
    bool is_ready = false;
    auto& dependencies = graph_task->dependencies_;
    auto it = dependencies.find(next.function.get());

    if (it == dependencies.end()) {
      auto name = next.function->name();
      throw std::runtime_error(std::string("dependency not found for ") + name);
    } else if (--it->second == 0) {
      dependencies.erase(it);
      is_ready = true;
    }

    auto& not_ready = graph_task->not_ready_;
    auto not_ready_it = not_ready.find(next.function.get());
    if (not_ready_it == not_ready.end()) {
      // Skip functions that aren't supposed to be executed
      if (!exec_info_.empty()) {
        auto it = exec_info_.find(next.function.get());
        if (it == exec_info_.end() || !it->second.should_execute()) {
          continue;
        }
      }
      // No buffers have been allocated for the function
      InputBuffer input_buffer(next.function->num_inputs());

      // Accumulates into buffer
      const auto opt_next_stream = next.function->stream(c10::DeviceType::CUDA);
      input_buffer.add(next.input_nr, std::move(output), opt_parent_stream, opt_next_stream);

      if (is_ready) {
        auto queue = ready_queue(cpu_ready_queue, input_buffer.device());
        queue->push(NodeTask(graph_task, next.function, std::move(input_buffer)));
      } else {
        not_ready.emplace(next.function.get(), std::move(input_buffer));
      }
    } else {
      // The function already has a buffer
      auto &input_buffer = not_ready_it->second;

      // Accumulates into buffer
      const auto opt_next_stream = next.function->stream(c10::DeviceType::CUDA);
      input_buffer.add(next.input_nr, std::move(output), opt_parent_stream, opt_next_stream);
      if (is_ready) {
        auto queue = ready_queue(cpu_ready_queue, input_buffer.device());
        queue->push(NodeTask(graph_task, next.function, std::move(input_buffer)));
        not_ready.erase(not_ready_it);
      }
    }
  }
}
1): Look up the Node's ExecInfo and run the associated capture hooks (the captured gradients are written into graph_task->captured_vars_); execution is skipped if the node is not needed.
2): Call call_function to run the node's backward function together with the hooks registered on it.
3): Record the leaf stream if applicable; if AnomalyMode is enabled, check the outputs for NaNs.
4): After the Node has been computed, decrement the dependency count of every Node attached to its next_edges:
4.1): If a Node's dependency count drops to 0, remove it from graph_task->dependencies_ and mark it ready;
4.2): If the Node on the next_edge is not yet in not_ready_, it is being reached for the first time: allocate an InputBuffer sized to its num_inputs and add this output into it; if the Node is ready, push it onto a ready_queue, otherwise record it in not_ready_.
4.3): If the Node is already in not_ready_, accumulate the output into its existing InputBuffer; if the Node is now ready, push it onto a ready_queue and erase it from not_ready_.
Which device thread's ready_queue a Node is pushed to is determined by the device of its input_buffer.
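The keep_graph_ flag consulted in step 3) is what retain_graph controls from Python: when it is false, each node releases its saved tensors (fn.release_variables()) as soon as it has run, so a second backward over the same graph fails. A small illustration:

import torch

x = torch.tensor(3.0, requires_grad=True)
y = x * x

y.backward(retain_graph=True)   # saved tensors are kept
print(x.grad)                   # tensor(6.)

y.backward()                    # works once more, the graph is freed afterwards
print(x.grad)                   # tensor(12.)  (gradients accumulate)

try:
    y.backward()                # the buffers were released -> RuntimeError
except RuntimeError as e:
    print("second-time failure:", str(e)[:60], "...")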

2.9: call_function

This function invokes the node's backward function along with the hooks registered on it, in the order: pre-hooks, the backward function itself, then post-hooks.
static variable_list call_function(
    std::shared_ptr<GraphTask>& graph_task,
    Node* func,
    InputBuffer& inputBuffer) {
  bool prev_checkpoint_valid_state = checkpoint_valid;
  checkpoint_valid = graph_task->can_checkpoint() && prev_checkpoint_valid_state;
  auto& fn = *func;
  auto inputs = call_pre_hooks(fn, InputBuffer::variables(std::move(inputBuffer)));

  if (!graph_task->keep_graph_) {
    fn.will_release_variables();
  }

  const auto has_post_hooks = !fn.post_hooks().empty();
  variable_list outputs;

  {
    at::ThreadLocalStateGuard guard(graph_task->thread_locals_);
    if (has_post_hooks) {
      // In functions/accumulate_grad.cpp, there is some logic to check the
      // conditions under which the incoming gradient can be stolen directly
      // (which elides a deep copy) instead of cloned. One of these conditions
      // is that the incoming gradient's refcount must be 1 (nothing else is
      // referencing the same data). Stashing inputs_copy here bumps the
      // refcount, so if post hooks are employed, it's actually still ok for
      // accumulate_grad.cpp to steal the gradient if the refcount is 2.
      //
      // "new_grad.use_count() <= 1 + !post_hooks().empty()" in
      // accumulate_grad.cpp accounts for this, but also creates a silent
      // dependency between engine.cpp (ie, this particular engine
      // implementation) and accumulate_grad.cpp.
      //
      // If you change the logic here, make sure it's compatible with
      // accumulate_grad.cpp.
      auto inputs_copy = inputs;
      outputs = fn(std::move(inputs_copy));
    } else {
      outputs = fn(std::move(inputs));
    }
  }
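(The excerpt above ends where the original post's quotation ends; in the full source the function then validates the outputs, restores checkpoint_valid, runs the post hooks when present, and returns the outputs.)
At the Python level, the easiest way to observe these hooks is Tensor.register_hook, which lets a user function inspect or replace a gradient while the engine is running backward. A small illustration (which internal hook list the hook lands on is an implementation detail; only the observable behavior is shown):

import torch

x = torch.tensor([1.0, 2.0], requires_grad=True)
y = (x * x).sum()

seen = []

def scale_hook(grad):
    # Called during backward with the gradient w.r.t. x; the return value
    # replaces the gradient that is accumulated into x.grad.
    seen.append(grad.clone())
    return grad * 10

x.register_hook(scale_hook)

y.backward()
print(seen[0])    # tensor([2., 4.])   -- gradient as originally computed
print(x.grad)     # tensor([20., 40.]) -- the hook's return value replaced it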

03

Techniques Involved
1): thread local: a variable marked with the thread_local keyword has thread storage duration: it is allocated when the thread starts and deallocated when the thread ends.
For example, in engine.cpp:
static thread_local int worker_device = NO_DEVICE;

static thread_local bool checkpoint_valid = true;

static thread_local int current_depth = 0;

static thread_local int total_depth = 0;

static thread_local std::shared_ptr<GraphTask> current_graph_task = nullptr;

static thread_local std::shared_ptr<ReadyQueue> local_ready_queue = nullptr;
2): singleton: when constructing the execution engine, this guarantees that only one Engine instance exists for the whole lifetime of the program.
For example, in engine.cpp:
Engine& Engine::get_base_engine() {
  static Engine engine;
  return engine;
}
3): guard: take GraphTaskGuard as an example:
class GraphTaskGuard {
 public:
  explicit GraphTaskGuard(std::shared_ptr<GraphTask> graph_task);
  ~GraphTaskGuard();

  void restore_current_graph_task();

 private:
  std::shared_ptr<GraphTask> last_graph_task_;
};
When constructed, the guard saves the previous graph task and installs the passed-in graph_task as the current one; when destroyed, it restores the previous task as the current one. This implements set/restore of the current task (see the Python analogy sketched below).
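For readers more at home in Python, the same set/restore idiom can be written as a context manager; the sketch below is only an analogy with made-up names, not PyTorch code:

import contextlib

_current_graph_task = None      # stand-in for the thread_local current_graph_task

@contextlib.contextmanager
def graph_task_guard(task):
    # "Constructor": remember the previous task and install the new one.
    global _current_graph_task
    previous, _current_graph_task = _current_graph_task, task
    try:
        yield
    finally:
        # "Destructor": restore the previous task, even if an exception was raised.
        _current_graph_task = previous

with graph_task_guard("task-A"):
    print(_current_graph_task)      # task-A
print(_current_graph_task)          # back to None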
4): weak_ptr: used in AutogradMeta (grad_accumulator_) among other places; this smart pointer breaks the reference cycles that shared_ptr alone would create.
5): move semantics: used in torch.autograd.function, THPEngine_run_backward, variable_list, call_function, evaluate_function, init_local_ready_queue, execute_with_graph_task, validate_outputs and elsewhere to trigger move assignment or the corresponding move overloads.
6): std::call_once: together with std::once_flag, it guarantees that a function is invoked only once in a multi-threaded environment; used in initialize_device_threads_pool.
7): RAII: a technique for managing resources and avoiding leaks: acquiring and releasing a resource are bound to the construction and destruction of an object, which guarantees that the object is constructed before the resource is used and that the resource is released when the object is finally destroyed.
For example, evaluate_function binds a std::mutex to a std::lock_guard: std::lock_guard<std::mutex> lock(graph_task->mutex_)
The lock_guard calls mutex.lock() in its constructor and mutex.unlock() in its destructor, so there is no way to forget the mutex.unlock() that must follow mutex.lock().
8): DFS/BFS: used in init_to_execute and compute_dependencies to traverse from graph_root to the other nodes of the graph.
9): Many aspects of multi-threaded programming.
