[Stones from Other Hills] An Analysis of the autograd Source Code
Author's page: https://www.zhihu.com/people/xiao-yao-81-19-18
01
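The overall flow: the network's forward pass computes the loss; calling backward() on loss (a Tensor or one of its subclasses) runs the backward pass; the gradients of the leaf nodes are computed; finally, the parameters are updated with those gradients. The call chain of backward() is: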
tensor.backward() // tensor.py
||call
torch.autograd.backward() // torch/autograd/__init__.py
||call
Variable._execution_engine.run_backward() // torch/autograd/__init__.py
||return
ImperativeEngine() // variable.py
||the run_backward method actually calls
THPEngine_run_backward   (the core step of backward; its implementation is covered in section 02, Key details) // python_engine.cpp
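To make the four steps of the overview concrete, here is a toy, pure-Python sketch (a hypothetical `Scalar` class, not PyTorch code) that runs a forward pass, backpropagates to the leaf gradients and applies one gradient-descent update. It walks the graph recursively, which is correct but visits shared nodes once per path; the real engine analysed in section 02 schedules work by dependency counts instead.

```python
class Scalar:
    """Hypothetical scalar 'tensor' that remembers how it was computed."""

    def __init__(self, value, parents=(), local_grads=()):
        self.value = value
        self.grad = 0.0                  # accumulated gradient, like Tensor.grad
        self._parents = parents          # inputs this value was computed from
        self._local_grads = local_grads  # d(self)/d(parent), one per parent

    def __add__(self, other):
        return Scalar(self.value + other.value, (self, other), (1.0, 1.0))

    def __mul__(self, other):
        return Scalar(self.value * other.value, (self, other),
                      (other.value, self.value))

    def backward(self, grad=1.0):
        # Chain rule pushed recursively along every path: a node reached by
        # several paths receives (and forwards) one contribution per path.
        self.grad += grad
        for parent, local in zip(self._parents, self._local_grads):
            parent.backward(grad * local)


w = Scalar(3.0)                     # leaf parameter
x, y = Scalar(2.0), Scalar(5.0)     # input and target
diff = w * x + Scalar(-1.0) * y     # forward: w*x - y = 1.0
loss = diff * diff                  # forward: loss = 1.0
loss.backward()                     # backward: d(loss)/dw = 2*(w*x - y)*x = 4.0
lr = 0.1
w.value -= lr * w.grad              # update: w = 3.0 - 0.1 * 4.0
```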
02 Key details
2.1: backward()
def backward(self, gradient=None, retain_graph=None, create_graph=False):
    r"""Computes the gradient of current tensor w.r.t. graph leaves.
    The graph is differentiated using the chain rule. If the tensor is
    non-scalar (i.e. its data has more than one element) and requires
    gradient, the function additionally requires specifying ``gradient``.
    It should be a tensor of matching type and location, that contains
    the gradient of the differentiated function w.r.t. ``self``.
    This function accumulates gradients in the leaves - you might need to zero
    ``.grad`` attributes or set them to ``None`` before calling it.
    See :ref:`Default gradient layouts<default-grad-layouts>`
    for details on the memory layout of accumulated gradients.
    Arguments:
        gradient (Tensor or None): Gradient w.r.t. the
            tensor. If it is a tensor, it will be automatically converted
            to a Tensor that does not require grad unless ``create_graph`` is True.
            None values can be specified for scalar Tensors or ones that
            don't require grad. If a None value would be acceptable then
            this argument is optional.
        retain_graph (bool, optional): If ``False``, the graph used to compute
            the grads will be freed. Note that in nearly all cases setting
            this option to True is not needed and often can be worked around
            in a much more efficient way. Defaults to the value of
            ``create_graph``.
        create_graph (bool, optional): If ``True``, graph of the derivative will
            be constructed, allowing to compute higher order derivative
            products. Defaults to ``False``.
    """
    relevant_args = (self,)
    from torch.overrides import has_torch_function, handle_torch_function
    if type(self) is not Tensor and has_torch_function(relevant_args):
        return handle_torch_function(
            Tensor.backward,
            relevant_args,
            self,
            gradient=gradient,
            retain_graph=retain_graph,
            create_graph=create_graph)
    torch.autograd.backward(self, gradient, retain_graph, create_graph)
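The first half of `Tensor.backward` only decides whether a subclass has overridden `__torch_function__`; plain tensors go straight to `torch.autograd.backward`. A pure-Python sketch of that dispatch pattern, where `Tensor`, `_has_override`, `PlainSubclass` and `LoggingTensor` are hypothetical stand-ins for `torch.Tensor`, `torch.overrides.has_torch_function` and user subclasses:

```python
class Tensor:
    """Hypothetical base class; not torch.Tensor."""

    @classmethod
    def __torch_function__(cls, func, types, args=(), kwargs=None):
        # Default: no interception, just call the function.
        return func(*args, **(kwargs or {}))

    def backward(self, gradient=None):
        # Same shape as the check in Tensor.backward: exact base-class
        # instances skip the override lookup entirely.
        if type(self) is not Tensor and _has_override(self):
            return type(self).__torch_function__(
                Tensor.backward, (type(self),), (self,), {"gradient": gradient})
        return ("autograd.backward", gradient)


def _has_override(obj):
    # Hypothetical stand-in for has_torch_function: true when the object's
    # class replaces the base class's __torch_function__.
    return (type(obj).__torch_function__.__func__
            is not Tensor.__torch_function__.__func__)


class PlainSubclass(Tensor):
    pass


class LoggingTensor(Tensor):
    calls = []

    @classmethod
    def __torch_function__(cls, func, types, args=(), kwargs=None):
        cls.calls.append(func.__name__)  # intercept instead of computing
        return ("intercepted", func.__name__)
```

Unlike PyTorch, this sketch has no guard against an override that calls back into `func`, which would recurse forever.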
2.2: torch.autograd.backward
def backward(
    tensors: _TensorOrTensors,
    grad_tensors: Optional[_TensorOrTensors] = None,
    retain_graph: Optional[bool] = None,
    create_graph: bool = False,
    grad_variables: Optional[_TensorOrTensors] = None,
) -> None:
    r"""Computes the sum of gradients of given tensors w.r.t. graph leaves.
    The graph is differentiated using the chain rule. If any of ``tensors``
    are non-scalar (i.e. their data has more than one element) and require
    gradient, then the Jacobian-vector product would be computed, in this
    case the function additionally requires specifying ``grad_tensors``.
    It should be a sequence of matching length, that contains the "vector"
    in the Jacobian-vector product, usually the gradient of the differentiated
    function w.r.t. corresponding tensors (``None`` is an acceptable value for
    all tensors that don't need gradient tensors).
    This function accumulates gradients in the leaves - you might need to zero
    ``.grad`` attributes or set them to ``None`` before calling it.
    See :ref:`Default gradient layouts<default-grad-layouts>`
    for details on the memory layout of accumulated gradients.
    .. note::
        Using this method with ``create_graph=True`` will create a reference cycle
        between the parameter and its gradient which can cause a memory leak.
        We recommend using ``autograd.grad`` when creating the graph to avoid this.
        If you have to use this function, make sure to reset the ``.grad`` fields of your
        parameters to ``None`` after use to break the cycle and avoid the leak.
    Arguments:
        tensors (sequence of Tensor): Tensors of which the derivative will be
            computed.
        grad_tensors (sequence of (Tensor or None)): The "vector" in the Jacobian-vector
            product, usually gradients w.r.t. each element of corresponding tensors.
            None values can be specified for scalar Tensors or ones that don't require
            grad. If a None value would be acceptable for all grad_tensors, then this
            argument is optional.
        retain_graph (bool, optional): If ``False``, the graph used to compute the grad
            will be freed. Note that in nearly all cases setting this option to ``True``
            is not needed and often can be worked around in a much more efficient
            way. Defaults to the value of ``create_graph``.
        create_graph (bool, optional): If ``True``, graph of the derivative will
            be constructed, allowing to compute higher order derivative products.
            Defaults to ``False``.
    """
    if grad_variables is not None:
        warnings.warn("'grad_variables' is deprecated. Use 'grad_tensors' instead.")
        if grad_tensors is None:
            grad_tensors = grad_variables
        else:
            raise RuntimeError("'grad_tensors' and 'grad_variables' (deprecated) "
                               "arguments both passed to backward(). Please only "
                               "use 'grad_tensors'.")
    tensors = (tensors,) if isinstance(tensors, torch.Tensor) else tuple(tensors)
    grad_tensors_ = _tensor_or_tensors_to_tuple(grad_tensors, len(tensors))
    grad_tensors_ = _make_grads(tensors, grad_tensors_)
    if retain_graph is None:
        retain_graph = create_graph
    Variable._execution_engine.run_backward(
        tensors, grad_tensors_, retain_graph, create_graph,
        allow_unreachable=True)  # allow_unreachable flag
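Before calling the engine, `torch.autograd.backward` normalises its arguments: a lone tensor becomes a 1-tuple, a missing gradient for a scalar output becomes an implicit 1, a missing gradient for a non-scalar output is an error, and `retain_graph` defaults to `create_graph`. A sketch of that logic, assuming a hypothetical `FakeTensor` that carries only `numel` and `requires_grad` (the real `_make_grads` also validates the shapes of user-supplied gradients):

```python
from dataclasses import dataclass


@dataclass
class FakeTensor:
    """Hypothetical stand-in holding only what _make_grads inspects."""
    numel: int
    requires_grad: bool = True


def as_tuple(grads, length):
    # Mirrors _tensor_or_tensors_to_tuple: None -> (None,) * length,
    # a single tensor -> (tensor,), any other iterable -> tuple(...)
    if grads is None:
        return (None,) * length
    if isinstance(grads, FakeTensor):
        return (grads,)
    return tuple(grads)


def make_grads(outputs, grads):
    # Mirrors the None-handling branch of _make_grads (shape checks omitted)
    new_grads = []
    for out, grad in zip(outputs, grads):
        if grad is not None:
            new_grads.append(grad)
        elif out.requires_grad:
            if out.numel != 1:
                raise RuntimeError("grad can be implicitly created only for scalar outputs")
            new_grads.append(1.0)  # stands in for torch.ones_like(out)
        else:
            new_grads.append(None)
    return tuple(new_grads)


def normalize_backward_args(tensors, grad_tensors=None, retain_graph=None,
                            create_graph=False):
    tensors = (tensors,) if isinstance(tensors, FakeTensor) else tuple(tensors)
    grads = make_grads(tensors, as_tuple(grad_tensors, len(tensors)))
    if retain_graph is None:
        retain_graph = create_graph  # the default follows create_graph
    return tensors, grads, retain_graph
```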
2.3: Variable._execution_engine
// torch/autograd/variable.py
from torch._C import _ImperativeEngine as ImperativeEngine
Variable._execution_engine = ImperativeEngine()
// torch/csrc/autograd/python_engine.cpp
static struct PyMethodDef THPEngine_methods[] = {
{(char*)"run_backward", (PyCFunction)(void(*)(void))THPEngine_run_backward, METH_VARARGS | METH_KEYWORDS, nullptr},
{(char*)"queue_callback", (PyCFunction)THPEngine_queue_callback, METH_O, nullptr},
{(char*)"is_checkpoint_valid", (PyCFunction)THPEngine_is_checkpoint_valid, METH_NOARGS, nullptr},
{nullptr}
};
2.4: THPEngine_run_backward
// Implementation of torch._C._EngineBase.run_backward
PyObject *THPEngine_run_backward(THPEngine *self, PyObject *args, PyObject *kwargs)
{
// --------------------- 1 ----------------------------
HANDLE_TH_ERRORS
PyObject *tensors = nullptr;
PyObject *grad_tensors = nullptr;
unsigned char keep_graph = 0;
unsigned char create_graph = 0;
PyObject *inputs = nullptr;
unsigned char allow_unreachable = 0;
const char *accepted_kwargs[] = {
"tensors", "grad_tensors", "keep_graph", "create_graph", "inputs",
"allow_unreachable", nullptr
};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OObb|Ob", (char**)accepted_kwargs,
&tensors, &grad_tensors, &keep_graph, &create_graph, &inputs, &allow_unreachable))
return nullptr;
// --------------------- 2 ----------------------------
THPUtils_assert(PyTuple_Check(tensors), "tensors argument is expected to "
"be a tuple, but got %s", THPUtils_typename(tensors));
THPUtils_assert(PyTuple_Check(grad_tensors), "grad_tensors argument is "
"expected to be a tuple, but got %s", THPUtils_typename(grad_tensors));
Py_ssize_t num_tensors = PyTuple_GET_SIZE(tensors);
Py_ssize_t num_gradients = PyTuple_GET_SIZE(grad_tensors);
THPUtils_assert(num_tensors == num_gradients, "got %ld tensors and %ld "
"gradients", num_tensors, num_gradients);
// The user either called autograd.backward(...) or autograd.grad(...) to get here
bool backward_api_called = inputs == nullptr;
TORCH_CHECK(!backward_api_called || at::impl::VmapMode::current_vmap_level() == 0,
"backward() called inside torch.vmap. This is not supported, "
"please call backward() outside torch.vmap or instead use "
"torch.autograd.grad inside torch.vmap");
// --------------------- 3 ----------------------------
edge_list roots;
roots.reserve(num_tensors);
variable_list grads;
grads.reserve(num_tensors);
for (int i = 0; i < num_tensors; i++) {
PyObject *_tensor = PyTuple_GET_ITEM(tensors, i);
THPUtils_assert(THPVariable_Check(_tensor), "element %d of tensors "
"tuple is not a Tensor", i);
auto& variable = ((THPVariable*)_tensor)->cdata;
TORCH_CHECK(!isBatchedTensor(variable),
"torch.autograd.grad(outputs, inputs, grad_outputs) called inside ",
"torch.vmap. We do not support the case where any outputs are ",
"vmapped tensors (output ", i, " is being vmapped over). Please "
"call autograd.grad() outside torch.vmap or file a bug report "
"with your use case.")
auto gradient_edge = torch::autograd::impl::gradient_edge(variable);
THPUtils_assert(gradient_edge.function,
"element %d of tensors does not require grad and does not have a grad_fn", i);
roots.push_back(std::move(gradient_edge));
PyObject *grad = PyTuple_GET_ITEM(grad_tensors, i);
if (THPVariable_Check(grad)) {
const Variable& grad_var = ((THPVariable*)grad)->cdata;
if (grad_var.has_names()) {
TORCH_WARN(
"Autograd was passed a named grad tensor with dims ", grad_var.names(),
". Autograd does not yet support named tensor semantics, so all names ",
"will be ignored. In practice all computed gradients will still be correct "
"according to regular tensor semantics.");
}
grads.push_back(grad_var);
} else {
THPUtils_assert(grad == Py_None,
"element %d of gradients tuple is not a Tensor or None", i);
THPUtils_assert(!variable.requires_grad(),
"element %d of gradients tuple is None, but the corresponding Tensor requires grad", i);
}
}
// --------------------- 4 ----------------------------
std::vector<Edge> output_edges;
if (!backward_api_called) {
int num_inputs = PyTuple_GET_SIZE(inputs);
output_edges.reserve(num_inputs);
for (int i = 0; i < num_inputs; ++i) {
PyObject *input = PyTuple_GET_ITEM(inputs, i);
THPUtils_assert(THPVariable_Check(input),
"all inputs have to be Tensors, but got %s", THPUtils_typename(input));
THPVariable *input_var = (THPVariable*)input;
TORCH_CHECK(!isBatchedTensor(input_var->cdata),
"torch.autograd.grad(outputs, inputs, grad_outputs) called inside ",
"torch.vmap. We do not support the case where any inputs are ",
"vmapped tensors (input ", i, " is being vmapped over). Please "
"call autograd.grad() outside torch.vmap or file a bug report "
"with your use case.")
const auto output_nr = input_var->cdata.output_nr();
auto grad_fn = input_var->cdata.grad_fn();
if (!grad_fn) {
grad_fn = torch::autograd::impl::try_get_grad_accumulator(input_var->cdata);
}
THPUtils_assert(input_var->cdata.requires_grad(),
"One of the differentiated Tensors does not require grad");
if (!grad_fn) {
output_edges.emplace_back();
} else {
output_edges.emplace_back(grad_fn, output_nr);
}
}
}
// --------------------- 5 ----------------------------
variable_list outputs;
{
pybind11::gil_scoped_release no_gil;
auto& engine = python::PythonEngine::get_python_engine();
outputs = engine.execute(roots, grads, keep_graph, create_graph, output_edges);
}
//--------------------- 6 ----------------------------
if (!backward_api_called) {
int num_inputs = PyTuple_GET_SIZE(inputs);
THPObjectPtr py_outputs {PyTuple_New(num_inputs)};
if (!py_outputs) return nullptr;
for (int i = 0; i < num_inputs; i++) {
THPUtils_assert(allow_unreachable || outputs[i].defined(), "One of the "
"differentiated Tensors appears to not have been used "
"in the graph. Set allow_unused=True if this is the "
"desired behavior.");
PyTuple_SET_ITEM(py_outputs.get(), i, THPVariable_Wrap(outputs[i]));
}
return py_outputs.release();
} else {
Py_RETURN_NONE;
}
END_HANDLE_TH_ERRORS
}
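The six numbered blocks of `THPEngine_run_backward` boil down to: validate the two tuples, turn each output tensor into a root edge plus a seed gradient, build `output_edges` only for `autograd.grad()`, run the engine, and return either `None` or a tuple of gradients. A Python paraphrase of that control flow, where tensors are hypothetical objects with `gradient_edge` and `requires_grad` attributes and `execute` is a hypothetical callable standing in for `PythonEngine::execute`:

```python
from types import SimpleNamespace


def run_backward(tensors, grad_tensors, keep_graph, create_graph,
                 inputs=None, allow_unreachable=False, execute=None):
    # Block 2: both arguments must be tuples of equal length
    if not (isinstance(tensors, tuple) and isinstance(grad_tensors, tuple)):
        raise TypeError("tensors and grad_tensors are expected to be tuples")
    if len(tensors) != len(grad_tensors):
        raise ValueError(f"got {len(tensors)} tensors and {len(grad_tensors)} gradients")
    backward_api_called = inputs is None  # autograd.backward() vs autograd.grad()

    # Block 3: one root edge and one seed gradient per output tensor
    roots, grads = [], []
    for i, (tensor, grad) in enumerate(zip(tensors, grad_tensors)):
        if tensor.gradient_edge is None:
            raise RuntimeError(f"element {i} of tensors does not require grad "
                               "and does not have a grad_fn")
        roots.append(tensor.gradient_edge)
        if grad is not None:
            grads.append(grad)
        elif tensor.requires_grad:
            raise RuntimeError(f"element {i} of gradients tuple is None, but the "
                               "corresponding Tensor requires grad")

    # Block 4: only autograd.grad() asks the engine to capture input gradients
    output_edges = []
    if not backward_api_called:
        for inp in inputs:
            if not inp.requires_grad:
                raise RuntimeError("One of the differentiated Tensors does not require grad")
            output_edges.append(inp.gradient_edge)

    # Block 5: run the engine (the C++ code releases the GIL around this call)
    outputs = execute(roots, grads, keep_graph, create_graph, output_edges)

    # Block 6: backward() returns None, grad() returns one gradient per input
    if backward_api_called:
        return None
    if not allow_unreachable and any(out is None for out in outputs):
        raise RuntimeError("One of the differentiated Tensors appears to not "
                           "have been used in the graph.")
    return tuple(outputs)


def fake_execute(roots, grads, keep_graph, create_graph, output_edges):
    return [2.0] * len(output_edges)  # pretend every input received gradient 2.0


y = SimpleNamespace(gradient_edge=("MulBackward0", 0), requires_grad=True)
x = SimpleNamespace(gradient_edge=("AccumulateGrad", 0), requires_grad=True)
backward_result = run_backward((y,), (1.0,), False, False, execute=fake_execute)
grad_result = run_backward((y,), (1.0,), False, False, inputs=(x,),
                           execute=fake_execute)
```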
// torch/csrc/autograd/function.h
using tensor_list = std::vector<at::Tensor>;
using variable_list = std::vector<Variable>;
using edge_list = std::vector<Edge>;
using saved_variable_list = std::vector<SavedVariable>;
using IndexRange = std::pair<size_t, size_t>;
// torch/csrc/autograd/variable.h
/// `Variable` is exactly the same as `Tensor` (i.e. we have `using Variable = at::Tensor`).
/// This means you can perform all the usual mathematical and other
/// operations you can perform on `Tensor`s also on `Variable`s.
///
/// The only reason we are keeping the `Variable` class is backward compatibility
/// with external user's legacy C++ frontend code. Our intention is to eliminate
/// the `Variable` class in the near future.
using Variable = at::Tensor;
// torch/csrc/autograd/edge.h
/// Represents a particular input of a function.
struct Edge {
Edge() noexcept : function(nullptr), input_nr(0) {}
Edge(std::shared_ptr<Node> function_, uint32_t input_nr_) noexcept
: function(std::move(function_)), input_nr(input_nr_) {}
/// Convenience method to test if an edge is valid.
bool is_valid() const noexcept {
return function != nullptr;
}
// Required for use in associative containers.
bool operator==(const Edge& other) const noexcept {
return this->function == other.function && this->input_nr == other.input_nr;
}
bool operator!=(const Edge& other) const noexcept {
return !(*this == other);
}
/// The function this `Edge` points to.
std::shared_ptr<Node> function;
/// The identifier of a particular input to the function.
uint32_t input_nr;
};
}} // namespace torch::autograd
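`Edge` is just a `(function, input_nr)` pair whose equality compares the `shared_ptr` (that is, node identity) and the input slot. A Python sketch of the same value type, using a hypothetical `Node` class whose default identity-based equality plays the role of pointer comparison:

```python
from dataclasses import dataclass
from typing import Optional


class Node:
    """Hypothetical backward node; compared by identity, like a shared_ptr."""

    def __init__(self, name):
        self.name = name


@dataclass(frozen=True)
class Edge:
    function: Optional[Node] = None  # the Node this edge points to
    input_nr: int = 0                # which input of that Node it feeds

    def is_valid(self) -> bool:
        return self.function is not None


mul = Node("MulBackward0")
edges = {Edge(mul, 0), Edge(mul, 0), Edge(mul, 1)}  # usable as set/dict keys
```

Freezing the dataclass generates `__eq__` and `__hash__` over both fields, which is what the C++ `operator==` (marked "Required for use in associative containers") provides.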
//torch/csrc/autograd/variable.cpp
std::shared_ptr<Node> try_get_grad_accumulator(const Variable& self) {
if (get_autograd_meta(self)) {
return get_autograd_meta(self)->grad_accumulator_.lock();
} else {
return nullptr;
}
}
//torch/csrc/autograd/variable.h
struct TORCH_API AutogradMeta : public c10::AutogradMetaInterface {
std::string name_;
Variable grad_;
std::shared_ptr<Node> grad_fn_;
std::weak_ptr<Node> grad_accumulator_;
std::vector<std::shared_ptr<FunctionPreHook>> hooks_;
std::shared_ptr<hooks_list> cpp_hooks_list;
// Only meaningful on leaf variables (must be false otherwise)
bool requires_grad_;
// Only meaningful on non-leaf variables (must be false otherwise)
bool retains_grad_;
bool is_view_;
// The "output number" of this variable; e.g., if this variable
// was the second output of a function, then output_nr == 1.
// We use this to make sure we can setup the backwards trace
// correctly when this variable is passed to another function.
uint32_t output_nr_;
// Mutex to ensure that concurrent read operations that modify internal
// state are still thread-safe. Used by grad_fn() and
// grad_accumulator().
std::mutex mutex_;
/// Sets the `requires_grad` property of `Variable`. This should be true for
/// leaf variables that want to accumulate gradients, and false for all other
/// variables.
void set_requires_grad(bool requires_grad, at::TensorImpl* self_impl) override {
TORCH_CHECK(
!requires_grad || isDifferentiableType(at::typeMetaToScalarType(self_impl->dtype())),
"Only Tensors of floating point and complex dtype can require gradients");
requires_grad_ = requires_grad;
}
bool requires_grad() const override {
return requires_grad_ || grad_fn_;
}
/// Accesses the gradient `Variable` of this `Variable`.
Variable& mutable_grad() override {
return grad_;
}
const Variable& grad() const override {
return grad_;
}
AutogradMeta(at::TensorImpl* self_impl = nullptr, bool requires_grad = false, Edge gradient_edge = Edge() ) {
grad_fn_ = std::move(gradient_edge.function);
requires_grad_ = false;
retains_grad_ = false;
is_view_ = false;
output_nr_ = gradient_edge.input_nr;
// set_requires_grad also checks error conditions.
if (requires_grad) {
TORCH_INTERNAL_ASSERT(self_impl);
set_requires_grad(requires_grad, self_impl);
}
TORCH_CHECK(
!grad_fn_ || !requires_grad_,
"requires_grad should be false if grad_fn is set");
}
};
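`requires_grad()` is true for a leaf explicitly marked as requiring grad or for any tensor that has a `grad_fn`. The `gradient_edge` helper used in block 3 of `THPEngine_run_backward` is not quoted above; the sketch below assumes its behaviour in `variable.cpp`: a non-leaf tensor yields `(grad_fn, output_nr)` and a leaf yields `(AccumulateGrad node, 0)`. Class names are hypothetical, and unlike the real `grad_accumulator()`, which throws for non-leaf tensors and keeps only a weak reference, this version returns `None` and caches the node strongly:

```python
class Node:
    """Hypothetical backward node (e.g. MulBackward0 or AccumulateGrad)."""

    def __init__(self, name):
        self.name = name


class AutogradMetaSketch:
    def __init__(self, requires_grad=False, grad_fn=None, output_nr=0):
        # Same invariant the C++ constructor enforces with TORCH_CHECK
        if grad_fn is not None and requires_grad:
            raise RuntimeError("requires_grad should be false if grad_fn is set")
        self.requires_grad_ = requires_grad  # only meaningful on leaves
        self.grad_fn = grad_fn
        self.output_nr = output_nr
        self._accumulator = None             # the real field is a weak_ptr

    def requires_grad(self):
        # A non-leaf "requires grad" simply because it has a grad_fn
        return self.requires_grad_ or self.grad_fn is not None

    def grad_accumulator(self):
        # Leaves that require grad get a lazily created, cached AccumulateGrad
        if self.grad_fn is not None or not self.requires_grad_:
            return None
        if self._accumulator is None:
            self._accumulator = Node("AccumulateGrad")
        return self._accumulator


def gradient_edge(meta):
    # Non-leaf: (grad_fn, output_nr); leaf: (AccumulateGrad node, 0)
    if meta.grad_fn is not None:
        return (meta.grad_fn, meta.output_nr)
    return (meta.grad_accumulator(), 0)


leaf = AutogradMetaSketch(requires_grad=True)
mul = Node("MulBackward0")
non_leaf = AutogradMetaSketch(grad_fn=mul, output_nr=1)
constant = AutogradMetaSketch()  # neither grad_fn nor requires_grad
```

A `constant` yields an edge with no function, which is exactly the case block 3 rejects with "does not require grad and does not have a grad_fn".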
//torch/csrc/autograd/python_engine.cpp
Engine& PythonEngine::get_python_engine() {
static PythonEngine engine;
// This is "probably" thread-safe because the flag is set in a fork handler
// before any threads are created, and this function is only called with the
// GIL held. However, using fork + threads is playing with fire so this is
// more of a "best effort" thing. For example, if the fork occurs while the
// backwards threads hold a lock, we'll probably deadlock in the engine
// destructor.
if (_reinitialize_engine) {
engine.release_workers();
engine.~PythonEngine();
new (&engine) torch::autograd::python::PythonEngine();
_reinitialize_engine = false;
}
return engine;
}
//torch/csrc/autograd/python_engine.cpp
variable_list PythonEngine::execute(
const edge_list& roots,
const variable_list& inputs,
bool keep_graph,
bool create_graph,
const edge_list& outputs) {
TORCH_CHECK(!PyGILState_Check(), "The autograd engine was called while holding the GIL. If you are using the C++ "
"API, the autograd engine is an expensive operation that does not require the "
"GIL to be held so you should release it with 'pybind11::gil_scoped_release no_gil;'"
". If you are not using the C++ API, please report a bug to the pytorch team.")
try {
return Engine::execute(roots, inputs, keep_graph, create_graph, outputs);
} catch (python_error& e) {
e.restore();
throw;
}
}
// torch/csrc/autograd/python_engine.h
struct PythonEngine : public Engine {
static Engine& get_python_engine();
void thread_init(int device,
const std::shared_ptr<ReadyQueue>& ready_queue,
bool should_increment) override;
void thread_on_exception(
std::shared_ptr<GraphTask> graph_task,
const std::shared_ptr<Node>& fn,
std::exception& e) override;
variable_list execute(
const edge_list& roots,
const variable_list& inputs,
bool keep_graph,
bool create_graph,
const edge_list& outputs = {}) override;
std::shared_ptr<at::ivalue::Future> execute_with_graph_task(
const std::shared_ptr<GraphTask>& graph_task,
std::shared_ptr<Node> graph_root) override;
std::unique_ptr<AnomalyMetadata> make_anomaly_metadata() override;
private:
PythonEngine();
};
2.5: execute
auto Engine::execute(const edge_list& roots,
const variable_list& inputs,
bool keep_graph,
bool create_graph,
const edge_list& outputs) -> variable_list
//--------------------- 1 ----------------------------
void validate_outputs(
const edge_list& edges,
variable_list& grads,
const std::function<std::string(const std::string&)>& format_error)
//--------------------- 2 ----------------------------
void Engine::init_local_ready_queue(std::shared_ptr<ReadyQueue> ready_queue)
bool not_reentrant_backward_call = worker_device == NO_DEVICE;
auto graph_task = std::make_shared<GraphTask>(
/* keep_graph */ keep_graph,
/* create_graph */ create_graph,
/* depth */ not_reentrant_backward_call ? 0 : total_depth + 1,
/* cpu_ready_queue */ local_ready_queue);
//--------------------- 3 ----------------------------
auto graph_root = std::make_shared<GraphRoot>(roots, inputs);
auto Engine::compute_dependencies(Node* root, GraphTask& task) -> void
//--------------------- 4 ----------------------------
if (!outputs.empty()) {
graph_task->init_to_execute(*graph_root, outputs);
}
void GraphTask::init_to_execute(Node& graph_root, const edge_list& outputs)
//--------------------- 5 ----------------------------
std::shared_ptr<at::ivalue::Future> Engine::execute_with_graph_task(
const std::shared_ptr<GraphTask>& graph_task,
std::shared_ptr<Node> graph_root)
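// From validate_outputs (step 1): reduce a broadcast gradient back to the input's shape, then move it to the input's device
grad = at::sum_to(std::move(grad), metadata.shape());
grad = grad.to(metadata.device());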
/* Computes the number of dependencies for each function which requires grad */
auto Engine::compute_dependencies(Node* root, GraphTask& task) -> void {
// Just to make sure that they will never be added to the queue again
std::unordered_set<Node*> seen;
std::vector<Node*> queue { root };
// Queue contains all nodes that will start propagating gradients.
// We no longer have to expand functions that don't require grad.
auto& dependencies = task.dependencies_;
while (!queue.empty()) {
auto fn = queue.back(); queue.pop_back();
for (const auto& edge : fn->next_edges()) {
if (auto next_ptr = edge.function.get()) {
dependencies[next_ptr] += 1;
const bool was_inserted = seen.insert(next_ptr).second;
if (was_inserted) queue.push_back(next_ptr);
}
}
}
}
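`compute_dependencies` visits every node reachable from the root once and counts its incoming edges; a node may only run after that many gradient contributions have arrived. A Python sketch using a hypothetical adjacency dict in place of `next_edges()`, where `None` plays the role of an invalid edge:

```python
from collections import defaultdict


def compute_dependencies(root, next_edges):
    """Count incoming edges for every node reachable from `root`.

    `next_edges` maps node -> list of successor nodes; None stands for an
    Edge whose function is nullptr.
    """
    dependencies = defaultdict(int)
    seen = set()
    stack = [root]                  # the C++ code also uses a vector as a stack
    while stack:
        fn = stack.pop()
        for nxt in next_edges.get(fn, []):
            if nxt is None:         # skip edges without a function
                continue
            dependencies[nxt] += 1  # one more gradient will flow into nxt
            if nxt not in seen:     # expand each node only once
                seen.add(nxt)
                stack.append(nxt)
    return dict(dependencies)


# Diamond-shaped graph: Root feeds A and B, both feed C, C feeds a leaf.
graph = {"Root": ["A", "B"], "A": ["C"], "B": ["C", None], "C": ["AccumulateGrad"]}
deps = compute_dependencies("Root", graph)
```

`C` gets a count of 2 because both `A` and `B` send gradient into it; the root itself never appears in the result.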
void GraphTask::init_to_execute(Node& graph_root, const edge_list& outputs) {
exec_info_[&graph_root].needed_ = true;
int output_idx = 0;
for (auto & output_edge : outputs) {
Node *output = output_edge.function.get();
auto & info = exec_info_[output];
if (!info.captures_)
info.captures_ = make_unique<std::vector<ExecInfo::Capture>>();
info.captures_->emplace_back(output_edge.input_nr, output_idx++);
}
captured_vars_.resize(output_idx);
// NB: this is an uglier version (recursion replaced with iteration) of the following code:
// is_needed = {}
// def compute_is_needed(fn):
// if fn not in is_needed:
// is_needed[fn] = any(compute_is_needed(next_edge)
// for next_edge in fn.next_edges)
// return is_needed[fn]
struct Frame {
Frame (Node *fn) : fn_(fn), next_next_fn_(0) {}
Node *fn_;
size_t next_next_fn_;
Node* get_next_fn() {
const auto & next = fn_->next_edges();
auto num_next = next.size();
while (next_next_fn_ < num_next) {
auto fn = next[next_next_fn_++].function.get();
if (fn) return fn;
}
return nullptr;
}
};
std::vector<Frame> stack;
std::unordered_set<Node*> seen;
for (const auto & input : graph_root.next_edges()) {
if (seen.count(input.function.get()) > 0) continue;
stack.emplace_back(input.function.get());
while (!stack.empty()) {
auto &frame = stack.back();
if (Node *next_fn = frame.get_next_fn()) {
if (/* bool unseen = */ seen.emplace(next_fn).second) {
stack.emplace_back(next_fn);
continue; // recurse
}
} else {
// NB: if we were using real recursion we could have saved some lookups
// using a return value from recursive call. It would make this manually unrolled
// version a lot more complicated, so I skipped that.
const auto & next_edges = frame.fn_->next_edges();
const bool needed = std::any_of(
next_edges.begin(), next_edges.end(), [&](const Edge& edge) {
auto it = exec_info_.find(edge.function.get());
return it != exec_info_.end() && it->second.should_execute();
});
exec_info_[frame.fn_].needed_ = needed;
stack.pop_back();
}
}
}
}
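For `autograd.grad()`, `init_to_execute` marks which nodes lie on a path to a captured input. Below is the recursive version that the NB comment in the code describes, written in Python against a hypothetical graph. Like the C++ loop, it evaluates every successor instead of stopping at the first one that must execute, and a node "should execute" when it is needed or its input is captured:

```python
def init_to_execute(root, next_edges, captured):
    """needed[fn] is True when some successor of fn must execute.

    `next_edges` is a hypothetical adjacency dict (None = invalid edge);
    `captured` is the set of nodes whose incoming gradient grad() returns.
    """
    needed = {root: True}  # exec_info_[&graph_root].needed_ = true

    def should_execute(fn):
        return compute_needed(fn) or fn in captured

    def compute_needed(fn):
        if fn not in needed:
            flags = [should_execute(n)
                     for n in next_edges.get(fn, []) if n is not None]
            needed[fn] = any(flags)
        return needed[fn]

    for fn in next_edges.get(root, []):
        if fn is not None:
            compute_needed(fn)
    return needed


# grad(loss, [w]): only the branch leading to w's AccumulateGrad must run
graph = {
    "Root": ["Mul", "Exp"],
    "Mul": ["AccW", "AccX"],
    "Exp": ["AccY"],
}
needed = init_to_execute("Root", graph, captured={"AccW"})
```

`AccW` itself is not needed (it has no successors), but its incoming gradient is still wanted; that is why `evaluate_function` records captures before it checks `needed_`.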
//--------------------- 1 ----------------------------
initialize_device_threads_pool()
//--------------------- 2 ----------------------------
ready_queue(graph_task->cpu_ready_queue_, at::kCPU)->push(
NodeTask(graph_task, std::move(graph_root), InputBuffer(0)));
//--------------------- 3 ---------------------------- case: called from the CPU caller thread (not re-entrant)
// We set the worker_device to CPU_DEVICE only if worker_device was previously
// NO_DEVICE. Setting it to CPU afterwards allow us to detect whether this is
// a re-entrant call or not.
set_device(CPU_DEVICE);
// set the graph_task owner to the current device
graph_task->owner_ = worker_device;
// The owning thread start to drive the engine execution with the GraphTask
// that has already been pushed to the current CPU thread's ready_queue
lock.unlock();
thread_main(graph_task);
TORCH_INTERNAL_ASSERT(graph_task->future_result_->completed());
// reset the worker_device after the completion of the graph_task, this is so
// that the initial state of the engine remains the same across every backward()
// or grad() call, we don't need to reset local_ready_queue as we could possibly
// reuse it for new backward calls.
worker_device = NO_DEVICE;
//--------------------- 4 ---------------------------- case: called from another (worker) thread, i.e. re-entrant
// If worker_device is any devices (i.e. CPU, CUDA): this is a re-entrant
// backward call from that device.
graph_task->owner_ = worker_device;
if (current_depth >= max_recursion_depth_) {
// See Note [Reentrant backwards]
// If reached the max depth, switch to a different thread
add_thread_pool_task(graph_task);
} else {
// Total depth needs to be updated only in this codepath, since it is
// not used in the block above (when we call add_thread_pool_task).
// In the codepath above, GraphTask.reentrant_depth_ is used to
// bootstrap total_depth in the other thread.
++total_depth;
// Get back to work while we wait for our new graph_task to
// complete!
++current_depth;
lock.unlock();
thread_main(graph_task);
--current_depth;
--total_depth;
// The graph task should have completed and the associated future should
// be marked completed as well since 'thread_main' above is a call
// blocking an autograd engine thread.
TORCH_INTERNAL_ASSERT(graph_task->future_result_->completed());
}
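Cases 3 and 4 differ in how deep re-entrant calls may go: an inline call bumps `current_depth`, and once `max_recursion_depth_` is reached the graph task is handed to another thread, so nested backward calls cannot overflow one thread's stack. A sketch of that idea with Python threads, using hypothetical names (`MAX_DEPTH`, `run_graph_task`, `nested_backward`) and one fresh thread per hand-off instead of the engine's reusable pool:

```python
import threading

MAX_DEPTH = 8               # hypothetical stand-in for max_recursion_depth_
_state = threading.local()  # per-thread depth, like thread_local current_depth
observed_depths = []


def run_graph_task(task):
    """Run `task` inline, or on another thread once this thread is too deep."""
    depth = getattr(_state, "depth", 0)
    if depth >= MAX_DEPTH:
        # Like add_thread_pool_task(): hand the work to a thread with a fresh
        # stack and block until it is done.
        result = []
        worker = threading.Thread(target=lambda: result.append(run_graph_task(task)))
        worker.start()
        worker.join()
        return result[0]
    _state.depth = depth + 1  # ++current_depth
    observed_depths.append(_state.depth)
    try:
        return task()
    finally:
        _state.depth = depth  # --current_depth


def nested_backward(n):
    # A task that itself calls backward again (re-entrant), n levels deep
    if n == 0:
        return 0
    return 1 + run_graph_task(lambda: nested_backward(n - 1))


total = run_graph_task(lambda: nested_backward(50))
```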
2.6: start_device_threads
// engine.cpp
auto Engine::start_device_threads() -> void {
// See Note [Allocating GPUs to autograd threads]
c10::DeviceIndex num_devices = 0;
for (const auto& impl_atomic : c10::impl::device_guard_impl_registry) {
auto* impl = impl_atomic.load();
if (impl) {
num_devices = std::max(num_devices, impl->deviceCount());
}
}
// allocate one thread for every GPU device (but colocate GPUs of different
// types), and pre-allocate the device_ready_queues_ to ensure safe reading on it.
device_ready_queues_ = std::vector<std::shared_ptr<ReadyQueue>>(num_devices);
for (auto& queue : device_ready_queues_) {
queue.reset(new ReadyQueue());
}
thread_pool_shared_ = std::make_shared<ThreadPoolShared>();
for (int i = 0; i < num_devices; ++i) {
std::thread t(&Engine::thread_init, this, i, device_ready_queues_[i], true);
t.detach();
}
// Wait for the threads to start
{
std::unique_lock<std::mutex> lk(non_reentrant_device_thread_mutex_);
while(non_reentrant_device_thread_count_.load() != num_devices) {
non_reentrant_device_thread_condvar_.wait(lk);
}
}
}
void Engine::thread_init(int device, const std::shared_ptr<ReadyQueue>& ready_queue, bool should_increment) {
if (should_increment) {
increment_non_reentrant_thread_count();
}
at::init_num_threads();
// Note [Allocating GPUs to autograd threads]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// What's our strategy here? Originally, the autograd engine was written
// with only CUDA in mind. We allocate one thread to handle all CPU
// operations, and a thread per CUDA device.
//
// But what if we have OTHER devices? There are two plausible
// strategies:
//
// - We can allocate threads equal to max(num_cuda_devices, num_xla_devices,
// ...) and colocate cuda device 0 with xla device 0
// - We can allocate threads equal to sum(num_cuda_devices, num_xla_devices,
// ...) keeping everyone separate.
//
// We don't have any good reason to prefer one or the other, so we've
// arbitrarily picked to colocate devices. Maybe the other approach is
// better.
set_device(device);
// initialize each device thread's thread local ready queue with the ready queue
// that is created before the thread initialization
init_local_ready_queue(ready_queue);
std::shared_ptr<GraphTask> graph_task = nullptr;
thread_main(graph_task);
if (should_increment) {
// Decrement the count during shutdown if we incremented earlier.
decrement_non_reentrant_thread_count();
}
}
void Engine::init_local_ready_queue(std::shared_ptr<ReadyQueue> ready_queue) {
if (ready_queue) {
// if ready_queue provided in the caller, use the caller's ready_queue to initialize local_ready_queue
local_ready_queue = std::move(ready_queue);
} else if (!local_ready_queue){
// otherwise if local_ready_queue not allocated, allocate a new ready_queue
local_ready_queue = std::make_shared<ReadyQueue>();
}
}
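`start_device_threads`, `thread_init` and `init_local_ready_queue` together give each device its own long-running worker thread bound to its own ready queue, and the starting thread blocks until every worker has checked in. A minimal Python sketch of that startup protocol, assuming a hypothetical `MiniEngine` in which `None` plays the role of a shutdown task:

```python
import queue
import threading


class MiniEngine:
    """Hypothetical sketch: one worker per device, each draining its own queue."""

    def __init__(self, num_devices):
        self.device_ready_queues = [queue.Queue() for _ in range(num_devices)]
        self._local = threading.local()  # like thread_local local_ready_queue
        self._started = 0
        self._cv = threading.Condition()
        for device, ready_queue in enumerate(self.device_ready_queues):
            threading.Thread(target=self._thread_init,
                             args=(device, ready_queue), daemon=True).start()
        with self._cv:                   # "Wait for the threads to start"
            self._cv.wait_for(lambda: self._started == num_devices)

    def _thread_init(self, device, ready_queue):
        with self._cv:                   # increment_non_reentrant_thread_count()
            self._started += 1
            self._cv.notify_all()
        self._local.device = device            # set_device(device)
        self._local.ready_queue = ready_queue  # init_local_ready_queue(...)
        self._thread_main()

    def _thread_main(self):
        while True:
            task = self._local.ready_queue.get()
            if task is None:             # plays the role of a shutdown task
                break
            task(self._local.device)

    def shutdown(self):
        for ready_queue in self.device_ready_queues:
            ready_queue.put(None)


engine = MiniEngine(num_devices=2)
results = queue.Queue()
engine.device_ready_queues[1].put(lambda device: results.put(("ran on", device)))
first_result = results.get(timeout=5)
engine.shutdown()
```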
2.7: thread_main
// NOTE: graph_tasks do not necessarily form a stack. Imagine this
// case:
//
// +----> Eval1
// Root
// +----> Eval2
//
// Once Root is executed, both Eval1 and Eval2 are added to the ready queue.
// Next, Eval1 is run and this causes the worker to enter thread_main again.
// Then, it pops the next task from the queue, but at this point it is Eval2.
// It enters thread_main once again, but now with graph_task of Eval2, which is
// completely unrelated to that of Eval1 (it's not a recursive call).
// It's all ok and is handled right now, but it should be accounted for
// in case this code is to be changed.
//
// thread_main is used by:
// 1). autograd threads for devices (i.e. CUDA, XLA)
// 2). the caller/owning thread of the backward call on CPU (sync mode)
// 3). Reentrant backward that invoked by either 1) or 2)
// The exit conditions are different for the above three cases.
// For 1), we are spinning on running the thread_main on device autograd
// threads throughout the Engine lifetime, thread_main will get
// terminated during Engine destruction by pushing shutdown tasks
// For 2), the owning thread of the backward call drives the thread_main
// synchronously until the graph_task of that owning thread is
// completed and exit the thread_main to continue executing the
// result of caller's code.
// For 3), the reentrant backward that invokes
// thread_main, either from 1) or 2), will not spin and will exit as
// long as graph_task is completed and notify the owning thread as
// needed.
auto Engine::thread_main(const std::shared_ptr<GraphTask>& graph_task) -> void {
// When graph_task is nullptr, this is a long running thread that processes
// tasks (ex: device threads). When graph_task is non-null (ex: reentrant
// backwards, user thread), this function is expected to exit once that
// graph_task complete.
// local_ready_queue should already been initialized when we get into thread_main
TORCH_INTERNAL_ASSERT(local_ready_queue != nullptr);
while (graph_task == nullptr || !graph_task->future_result_->completed()) {
// local_graph_task represents the graph_task we retrieve from the queue.
// The outer graph_task represents the overall graph_task we need to execute
// for reentrant execution.
std::shared_ptr<GraphTask> local_graph_task;
{
// Scope this block of execution since NodeTask is not needed after this
// block and can be deallocated (release any references to grad tensors
// as part of inputs_).
NodeTask task = local_ready_queue->pop();
// This will only work if the worker is running a non backward task
// TODO Needs to be fixed this to work in all cases
if (task.isShutdownTask_) {
C10_LOG_API_USAGE_ONCE("torch.autograd.thread_shutdown");
break;
}
if (!(local_graph_task = task.base_.lock())) {
// GraphTask for function is no longer valid, skipping further
// execution.
continue;
}
if (task.fn_ && !local_graph_task->has_error_.load()) {
AutoGradMode grad_mode(local_graph_task->grad_mode_);
try {
// The guard sets the thread_local current_graph_task on construction
// and restores it on exit. The current_graph_task variable helps
// queue_callback() to find the target GraphTask to append final
// callbacks.
GraphTaskGuard guard(local_graph_task);
NodeGuard ndguard(task.fn_);
evaluate_function(local_graph_task, task.fn_.get(), task.inputs_, local_graph_task->cpu_ready_queue_);
} catch (std::exception& e) {
thread_on_exception(local_graph_task, task.fn_, e);
}
}
}
// Decrement the outstanding tasks.
--local_graph_task->outstanding_tasks_;
// Check if we've completed execution.
if (local_graph_task->completed()) {
local_graph_task->mark_as_completed_and_run_post_processing();
auto base_owner = local_graph_task->owner_;
// The current worker thread finish the graph_task, but the owning thread
// of the graph_task might be sleeping on pop() if it does not have work.
// So we need to send a dummy function task to the owning thread just to
// ensure that it's not sleeping, so that we can exit the thread_main.
// If it has work, it might see that graph_task->outstanding_tasks_ == 0
// before it gets to the task, but it's a no-op anyway.
//
// NB: This is not necessary if the current thread is the owning thread.
if (worker_device != base_owner) {
// Synchronize outstanding_tasks_ with queue mutex
std::atomic_thread_fence(std::memory_order_release);
ready_queue_by_index(local_graph_task->cpu_ready_queue_, base_owner)
->push(NodeTask(local_graph_task, nullptr, InputBuffer(0)));
}
}
}
}
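Put together, `thread_main` pops `NodeTask`s from a ready queue until the graph task's `outstanding_tasks_` drops to zero, while `evaluate_function` (section 2.8) adds each output gradient into the next node's input buffer and enqueues that node once its dependency count reaches zero. A single-threaded Python sketch of that loop, assuming hypothetical dict-based graphs and dependency counts in the form `compute_dependencies` produces:

```python
from collections import defaultdict, deque


def run_engine(root, root_grad, next_edges, backward_fns, dependencies):
    """Single-threaded sketch of thread_main + evaluate_function.

    next_edges[node] lists one successor per output of the node's backward;
    backward_fns[node] maps the summed input gradient to output gradients.
    Returns the gradient each executed node received.
    """
    dependencies = dict(dependencies)        # consumed (decremented) as we go
    buffers = defaultdict(float)             # per-node InputBuffer
    ready_queue = deque([(root, root_grad)])
    outstanding_tasks = 1
    received = {}
    while outstanding_tasks > 0:             # graph task completes at 0
        fn, grad = ready_queue.popleft()
        received[fn] = grad
        outputs = backward_fns.get(fn, lambda g: [])(grad)  # call_function
        for nxt, out in zip(next_edges.get(fn, []), outputs):
            if nxt is None:                  # !next.is_valid()
                continue
            buffers[nxt] += out              # accumulate into nxt's buffer
            dependencies[nxt] -= 1
            if dependencies[nxt] == 0:       # all contributions arrived
                del dependencies[nxt]
                ready_queue.append((nxt, buffers.pop(nxt)))
                outstanding_tasks += 1
        outstanding_tasks -= 1               # --outstanding_tasks_
    return received


# loss = w*x + w*x with w = 3, x = 2: the Add node feeds the same Mul node twice
w, x = 3.0, 2.0
next_edges = {"Add": ["Mul", "Mul"], "Mul": ["AccW", "AccX"]}
backward_fns = {"Add": lambda g: [g, g], "Mul": lambda g: [g * x, g * w]}
deps = {"Mul": 2, "AccW": 1, "AccX": 1}
grads = run_engine("Add", 1.0, next_edges, backward_fns, deps)
```

Because `Mul` waits until both contributions from `Add` have arrived, it runs once with the summed gradient 2.0 rather than once per path.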
2.8: evaluate_function
void Engine::evaluate_function(
    std::shared_ptr<GraphTask>& graph_task,
    Node* func,
    InputBuffer& inputs,
    const std::shared_ptr<ReadyQueue>& cpu_ready_queue) {
  // If exec_info_ is not empty, we have to instrument the execution
  //--------------------- 1 ----------------------------
  auto& exec_info_ = graph_task->exec_info_;
  if (!exec_info_.empty()) {
    auto& fn_info = exec_info_.at(func);
    if (auto* capture_vec = fn_info.captures_.get()) {
      // Lock mutex for writing to graph_task->captured_vars_.
      std::lock_guard<std::mutex> lock(graph_task->mutex_);
      for (const auto& capture : *capture_vec) {
        auto& captured_grad = graph_task->captured_vars_[capture.output_idx_];
        captured_grad = inputs[capture.input_idx_];
        for (auto& hook : capture.hooks_) {
          captured_grad = (*hook)(captured_grad);
        }
      }
    }
    if (!fn_info.needed_) {
      // Skip execution if we don't need to execute the function.
      return;
    }
  }
  //--------------------- 2 ----------------------------
  // Switches to a function's CUDA stream (if applicable) before calling it
  const auto opt_parent_stream = (*func).stream(c10::DeviceType::CUDA);
  c10::OptionalStreamGuard parent_stream_guard{opt_parent_stream};
  auto outputs = call_function(graph_task, func, inputs);
  //--------------------- 3 ----------------------------
  auto& fn = *func;
  if (!graph_task->keep_graph_) {
    fn.release_variables();
  }
  int num_outputs = outputs.size();
  if (num_outputs == 0) { // Note: doesn't acquire the mutex
    // Records leaf stream (if applicable)
    // See note "Streaming backwards"
    if (opt_parent_stream) {
      std::lock_guard<std::mutex> lock(graph_task->mutex_);
      graph_task->leaf_streams.emplace(*opt_parent_stream);
    }
    return;
  }
  if (AnomalyMode::is_enabled()) {
    AutoGradMode grad_mode(false);
    for (int i = 0; i < num_outputs; ++i) {
      auto& output = outputs[i];
      at::OptionalDeviceGuard guard(device_of(output));
      if (output.defined() && isnan(output).any().item<uint8_t>()) {
        std::stringstream ss;
        ss << "Function '" << fn.name() << "' returned nan values in its " << i << "th output.";
        throw std::runtime_error(ss.str());
      }
    }
  }
  //--------------------- 4 ----------------------------
  // Lock mutex for the accesses to GraphTask dependencies_, not_ready_ and cpu_ready_queue_ below
  std::lock_guard<std::mutex> lock(graph_task->mutex_);
  for (int i = 0; i < num_outputs; ++i) {
    auto& output = outputs[i];
    const auto& next = fn.next_edge(i);
    if (!next.is_valid()) continue;
    // Check if the next function is ready to be computed
    bool is_ready = false;
    auto& dependencies = graph_task->dependencies_;
    auto it = dependencies.find(next.function.get());
    if (it == dependencies.end()) {
      auto name = next.function->name();
      throw std::runtime_error(std::string("dependency not found for ") + name);
    } else if (--it->second == 0) {
      dependencies.erase(it);
      is_ready = true;
    }
    auto& not_ready = graph_task->not_ready_;
    auto not_ready_it = not_ready.find(next.function.get());
    if (not_ready_it == not_ready.end()) {
      // Skip functions that aren't supposed to be executed
      if (!exec_info_.empty()) {
        auto it = exec_info_.find(next.function.get());
        if (it == exec_info_.end() || !it->second.should_execute()) {
          continue;
        }
      }
      // No buffers have been allocated for the function
      InputBuffer input_buffer(next.function->num_inputs());
      // Accumulates into buffer
      const auto opt_next_stream = next.function->stream(c10::DeviceType::CUDA);
      input_buffer.add(next.input_nr,
                       std::move(output),
                       opt_parent_stream,
                       opt_next_stream);
      if (is_ready) {
        auto queue = ready_queue(cpu_ready_queue, input_buffer.device());
        queue->push(
            NodeTask(graph_task, next.function, std::move(input_buffer)));
      } else {
        not_ready.emplace(next.function.get(), std::move(input_buffer));
      }
    } else {
      // The function already has a buffer
      auto &input_buffer = not_ready_it->second;
      // Accumulates into buffer
      const auto opt_next_stream = next.function->stream(c10::DeviceType::CUDA);
      input_buffer.add(next.input_nr,
                       std::move(output),
                       opt_parent_stream,
                       opt_next_stream);
      if (is_ready) {
        auto queue = ready_queue(cpu_ready_queue, input_buffer.device());
        queue->push(
            NodeTask(graph_task, next.function, std::move(input_buffer)));
        not_ready.erase(not_ready_it);
      }
    }
  }
}
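Step 4 of evaluate_function is where dependency counting happens. Each output gradient is routed along next_edge(i), and the target's entry in dependencies_ is decremented. The gradient is then accumulated into an InputBuffer kept in not_ready_, and the task is pushed onto a ready queue only when the count reaches zero.

The sketch below replays that bookkeeping single-threaded on the scalar graph y = a * b + a. It is a simplified model, not PyTorch code:

- gradients are plain doubles;
- every node has a single input slot;
- `Node`, `Edge`, `count_dependencies`, `run_backward` and `grads_of_mul_add` are hypothetical names;
- the exec_info_ filtering and CUDA stream handling are left out.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical simplified graph: every gradient is one double and every node
// has a single input slot, so an "InputBuffer" is just a running sum.
struct Node;
struct Edge {
  Node* function = nullptr;  // nullptr plays the role of an invalid edge
};
struct Node {
  // Maps the incoming gradient to one gradient per next edge.
  std::function<std::vector<double>(double)> apply;
  std::vector<Edge> next_edges;
  double leaf_grad = 0.0;  // filled in only by leaf (AccumulateGrad-like) nodes
};

// Count incoming edges per node, in the spirit of the engine's dependency pass.
std::map<Node*, int> count_dependencies(Node* root) {
  std::map<Node*, int> deps;
  std::map<Node*, bool> seen{{root, true}};
  std::vector<Node*> stack{root};
  while (!stack.empty()) {
    Node* fn = stack.back();
    stack.pop_back();
    for (const Edge& e : fn->next_edges) {
      if (!e.function) continue;
      ++deps[e.function];
      if (!seen[e.function]) {
        seen[e.function] = true;
        stack.push_back(e.function);
      }
    }
  }
  return deps;
}

// Mirrors step 4: decrement the dependency count, accumulate into a
// not_ready buffer, and enqueue the node once its count reaches 0.
void run_backward(Node* root, double root_grad) {
  std::map<Node*, int> dependencies = count_dependencies(root);
  std::map<Node*, double> not_ready;                 // Node* -> partial sum
  std::queue<std::pair<Node*, double>> ready_queue;  // (function, buffer)
  ready_queue.push(std::make_pair(root, root_grad));

  while (!ready_queue.empty()) {
    Node* fn = ready_queue.front().first;
    double input = ready_queue.front().second;
    ready_queue.pop();
    std::vector<double> outputs = fn->apply(input);
    for (std::size_t i = 0; i < outputs.size(); ++i) {
      Node* next = fn->next_edges[i].function;
      if (!next) continue;
      // .at() throws if the node is missing, like "dependency not found".
      bool is_ready = (--dependencies.at(next) == 0);
      auto it = not_ready.find(next);
      double buffer = (it == not_ready.end()) ? 0.0 : it->second;
      buffer += outputs[i];  // gradients reaching the same slot are summed
      if (is_ready) {
        if (it != not_ready.end()) not_ready.erase(it);
        ready_queue.push(std::make_pair(next, buffer));
      } else {
        not_ready[next] = buffer;
      }
    }
  }
}

// y = a * b + a with a = 2, b = 3. Returns {dy/da, dy/db} = {b + 1, a}.
std::pair<double, double> grads_of_mul_add() {
  const double a = 2.0, b = 3.0;
  Node acc_a, acc_b, mul, add;
  acc_a.apply = [&acc_a](double g) { acc_a.leaf_grad += g; return std::vector<double>{}; };
  acc_b.apply = [&acc_b](double g) { acc_b.leaf_grad += g; return std::vector<double>{}; };
  mul.apply = [a, b](double g) { return std::vector<double>{g * b, g * a}; };
  mul.next_edges = {Edge{&acc_a}, Edge{&acc_b}};
  add.apply = [](double g) { return std::vector<double>{g, g}; };
  add.next_edges = {Edge{&mul}, Edge{&acc_a}};
  run_backward(&add, 1.0);
  return std::make_pair(acc_a.leaf_grad, acc_b.leaf_grad);
}
```

The leaf for `a` has two incoming edges. Its first gradient (1, from the add) therefore waits in `not_ready` until the multiply contributes 3. Only then is the node enqueued, carrying the summed gradient 4 = b + 1.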
2.9: call_function
static variable_list call_function(
    std::shared_ptr<GraphTask>& graph_task,
    Node* func,
    InputBuffer& inputBuffer) {
  bool prev_checkpoint_valid_state = checkpoint_valid;
  checkpoint_valid =
      graph_task->can_checkpoint() && prev_checkpoint_valid_state;
  auto& fn = *func;
  auto inputs =
      call_pre_hooks(fn, InputBuffer::variables(std::move(inputBuffer)));
  if (!graph_task->keep_graph_) {
    fn.will_release_variables();
  }
  const auto has_post_hooks = !fn.post_hooks().empty();
  variable_list outputs;
  {
    at::ThreadLocalStateGuard guard(graph_task->thread_locals_);
    if (has_post_hooks) {
      // In functions/accumulate_grad.cpp, there is some logic to check the
      // conditions under which the incoming gradient can be stolen directly
      // (which elides a deep copy) instead of cloned. One of these conditions
      // is that the incoming gradient's refcount must be 1 (nothing else is
      // referencing the same data). Stashing inputs_copy here bumps the
      // refcount, so if post hooks are employed, it's actually still ok for
      // accumulate_grad.cpp to steal the gradient if the refcount is 2.
      //
      // "new_grad.use_count() <= 1 + !post_hooks().empty()" in
      // accumulate_grad.cpp accounts for this, but also creates a silent
      // dependency between engine.cpp (ie, this particular engine
      // implementation) and accumulate_grad.cpp.
      //
      // If you change the logic here, make sure it's compatible with
      // accumulate_grad.cpp.
      auto inputs_copy = inputs;
      outputs = fn(std::move(inputs_copy));
    } else {
      outputs = fn(std::move(inputs));
    }
  }
  // ... (the rest of call_function is not shown in this excerpt)
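call_function wraps the node call with hooks. Pre-hooks (call_pre_hooks) may rewrite the incoming gradients. When post hooks are registered, the node receives a copy and the original `inputs` stays alive; the long comment discusses what that extra reference means for accumulate_grad.cpp.

The sketch below models this flow with plain vectors of doubles. `SketchNode`, `PreHook`, `PostHook` and `call_function_sketch` are hypothetical names. The point at which the checkpoint flag is restored and post hooks run is an assumption, because the excerpt stops before the end of the function.

```cpp
#include <functional>
#include <utility>
#include <vector>

using variable_list = std::vector<double>;  // stand-in for autograd's variable_list

// Hypothetical hook signatures; only their roles are taken from call_function().
using PreHook = std::function<variable_list(const variable_list&)>;
using PostHook = std::function<variable_list(const variable_list& outputs,
                                             const variable_list& inputs)>;

struct SketchNode {
  std::function<variable_list(variable_list)> fn;
  std::vector<PreHook> pre_hooks;
  std::vector<PostHook> post_hooks;
};

thread_local bool checkpoint_valid = true;  // same role as the flag in section 03

variable_list call_function_sketch(SketchNode& node, variable_list inputs,
                                   bool graph_can_checkpoint) {
  // Save the thread-local flag and narrow it for the duration of the call.
  bool prev_checkpoint_valid_state = checkpoint_valid;
  checkpoint_valid = graph_can_checkpoint && prev_checkpoint_valid_state;

  // Pre-hooks may rewrite the incoming gradients before the node runs.
  for (const auto& hook : node.pre_hooks) inputs = hook(inputs);

  variable_list outputs;
  const bool has_post_hooks = !node.post_hooks.empty();
  if (has_post_hooks) {
    // Keep `inputs` intact for the post-hooks; the node consumes a copy.
    variable_list inputs_copy = inputs;
    outputs = node.fn(std::move(inputs_copy));
  } else {
    outputs = node.fn(std::move(inputs));
  }

  // Assumption: restore the flag, then run post-hooks on (outputs, inputs).
  checkpoint_valid = prev_checkpoint_valid_state;
  for (const auto& hook : node.post_hooks) outputs = hook(outputs, inputs);
  return outputs;
}

// y = 3 * x: the node maps grad g to 3 * g. A pre-hook doubles the incoming
// grad and a post-hook adds the (pre-hooked) input back: 1 -> 2 -> 6 -> 8.
double demo_hooks() {
  SketchNode node;
  node.fn = [](variable_list in) { return variable_list{in[0] * 3.0}; };
  node.pre_hooks.push_back(
      [](const variable_list& in) { return variable_list{in[0] * 2.0}; });
  node.post_hooks.push_back(
      [](const variable_list& out, const variable_list& in) {
        return variable_list{out[0] + in[0]};
      });
  return call_function_sketch(node, {1.0}, true)[0];
}
```

Saving and restoring `checkpoint_valid` around the call means that a graph task which cannot checkpoint disables checkpointing only while its own node runs.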
03
static thread_local bool checkpoint_valid = true;
static thread_local int current_depth = 0;
static thread_local int total_depth = 0;
static thread_local std::shared_ptr<GraphTask> current_graph_task = nullptr;
static thread_local std::shared_ptr<ReadyQueue> local_ready_queue = nullptr;
Engine& Engine::get_base_engine() {
  static Engine engine;
  return engine;
}
class GraphTaskGuard {
 public:
  explicit GraphTaskGuard(std::shared_ptr<GraphTask> graph_task);
  ~GraphTaskGuard();
  void restore_current_graph_task();
 private:
  std::shared_ptr<GraphTask> last_graph_task_;
};
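Section 03 only declares GraphTaskGuard next to the thread_local `current_graph_task`. The member `last_graph_task_` and the method `restore_current_graph_task()` suggest a save-and-restore RAII guard.

The sketch below implements that behavior under this assumption and shows nested guards on one thread, for example a backward pass started from inside another one. `GraphTask` here is a placeholder struct, and `current_id` and `nested_guard_ids` are hypothetical helpers.

```cpp
#include <memory>
#include <utility>
#include <vector>

struct GraphTask {  // placeholder, not PyTorch's GraphTask
  int id;
};

static thread_local std::shared_ptr<GraphTask> current_graph_task = nullptr;

// Assumed behavior: install `graph_task` as this thread's current task and
// put the previous one back when the guard is destroyed.
class GraphTaskGuard {
 public:
  explicit GraphTaskGuard(std::shared_ptr<GraphTask> graph_task)
      : last_graph_task_(std::move(current_graph_task)) {
    current_graph_task = std::move(graph_task);
  }
  ~GraphTaskGuard() { restore_current_graph_task(); }
  GraphTaskGuard(const GraphTaskGuard&) = delete;
  GraphTaskGuard& operator=(const GraphTaskGuard&) = delete;

  void restore_current_graph_task() {
    current_graph_task = std::move(last_graph_task_);
  }

 private:
  std::shared_ptr<GraphTask> last_graph_task_;
};

// Hypothetical helper: id of the thread's current task, or -1 if none.
int current_id() { return current_graph_task ? current_graph_task->id : -1; }

// Records the current task id at each step of two nested guards.
std::vector<int> nested_guard_ids() {
  std::vector<int> ids;
  {
    GraphTaskGuard outer(std::make_shared<GraphTask>(GraphTask{1}));
    ids.push_back(current_id());
    {
      GraphTaskGuard inner(std::make_shared<GraphTask>(GraphTask{2}));
      ids.push_back(current_id());
    }
    ids.push_back(current_id());  // inner guard restored task 1
  }
  ids.push_back(current_id());  // outer guard restored "no task"
  return ids;
}
```

Because the previous pointer is stored in the guard rather than in a global, each level of nesting restores exactly what it replaced, in reverse order of construction.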
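This article is shared for academic exchange only. Publishing it does not mean this account endorses its views or vouches for the accuracy of its content. Copyright belongs to the original author; please let us know of any infringement and it will be removed.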