DCRNN Model Source Code Walkthrough
Full code:
https://github.com/liyaguang/DCRNN/blob/master/model/dcrnn_cell.py
How to use it: copy the code from the file above into your own project, and the DCGRUCell class is defined. It can then be used just like an ordinary RNNCell, for example initialized and stacked with tf.contrib.rnn.MultiRNNCell, as shown in the Encoder-Decoder code at the end of this post.
Note on input shapes: a plain RNN takes inputs of shape [batch_size, seq_len, hidden_size]. DCRNN feeds the series of all nodes at once, so at each time step the cell consumes a (batch_size, num_nodes * input_dim) tensor; the full sequence input is therefore [batch_size, seq_len, num_nodes * input_dim], which you can think of as batch_size * num_nodes sequences being processed together.
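For example, here is a minimal usage sketch (the hyperparameter values are illustrative, the import path assumes you copied dcrnn_cell.py into your project, and adj_mx stands for the (num_nodes, num_nodes) adjacency matrix of the sensor graph, which you would load from your own data):

import numpy as np
import tensorflow as tf
from dcrnn_cell import DCGRUCell  # the file linked above, copied into your project

batch_size, seq_len, num_nodes, input_dim, rnn_units = 64, 12, 207, 2, 64
# Placeholder adjacency matrix; use the real sensor-graph adjacency matrix in practice.
adj_mx = np.random.uniform(size=(num_nodes, num_nodes)).astype(np.float32)

x = tf.placeholder(tf.float32, shape=(batch_size, seq_len, num_nodes, input_dim))

cell = DCGRUCell(rnn_units, adj_mx, max_diffusion_step=2,
                 num_nodes=num_nodes, filter_type="dual_random_walk")
stacked_cell = tf.contrib.rnn.MultiRNNCell([cell] * 2, state_is_tuple=True)

# At every time step the cell consumes a (batch_size, num_nodes * input_dim) tensor,
# so the (batch_size, seq_len, num_nodes, input_dim) input is flattened and unstacked over time.
inputs = tf.unstack(tf.reshape(x, (batch_size, seq_len, num_nodes * input_dim)), axis=1)
outputs, final_state = tf.contrib.rnn.static_rnn(stacked_cell, inputs, dtype=tf.float32)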
It is worth noting that many later spatio-temporal open-source projects are built on top of the DCRNN codebase, including its data preprocessing and its basic graph convolution implementation. This post focuses on the implementation logic inside the DCRNN model itself.
The DCRNN network is built out of individual DCRNN units (DCGRUCell). DCGRUCell inherits from RNNCell; by overriding RNNCell's call method (and a few related methods) we get a working DCRNN unit. The call method is the core: it defines the computation of the cell at every time step, i.e. how the current input features and the previous state are turned into the next state and the current output. The overall skeleton of the DCRNN cell is shown below; if you want to implement your own RNN variant, this structure is a good starting point:
class DCGRUCell(RNNCell):
    """Graph Convolution Gated Recurrent Unit cell.
    """

    def call(self, inputs, **kwargs):
        pass

    def compute_output_shape(self, input_shape):
        pass

    def __init__(self, num_units):
        pass

    @property
    def state_size(self):
        return self._num_nodes * self._num_units

    @property
    def output_size(self):
        output_size = self._num_nodes * self._num_units
        if self._num_proj is not None:
            output_size = self._num_nodes * self._num_proj
        return output_size

    def __call__(self, inputs, state, scope=None):
        pass
The call method is the core per-time-step logic of the RNN. In a plain RNN, the inputs at each step are the current features and the previous state; the RNN runs a series of computations over these two and produces the current output and the state passed on to the next step.
DCRNN follows the same pattern and only differs in the concrete computation. The main body of the __call__ method is:
    def __call__(self, inputs, state, scope=None):
        """Gated recurrent unit (GRU) with Graph Convolution.
        :param inputs: (B, num_nodes * input_dim)
        :return
        - Output: A `2-D` tensor with shape `[batch_size x self.output_size]`.
        - New state: Either a single `2-D` tensor, or a tuple of tensors matching
            the arity and shapes of `state`
        """
        with tf.variable_scope(scope or "dcgru_cell"):
            with tf.variable_scope("gates"):  # Reset gate and update gate.
                output_size = 2 * self._num_units
                # We start with bias of 1.0 to not reset and not update.
                if self._use_gc_for_ru:
                    fn = self._gconv
                else:
                    fn = self._fc
                value = tf.nn.sigmoid(fn(inputs, state, output_size, bias_start=1.0))
                value = tf.reshape(value, (-1, self._num_nodes, output_size))
                r, u = tf.split(value=value, num_or_size_splits=2, axis=-1)
                r = tf.reshape(r, (-1, self._num_nodes * self._num_units))
                u = tf.reshape(u, (-1, self._num_nodes * self._num_units))
            with tf.variable_scope("candidate"):
                c = self._gconv(inputs, r * state, self._num_units)
                if self._activation is not None:
                    c = self._activation(c)
            output = new_state = u * state + (1 - u) * c
            if self._num_proj is not None:
                with tf.variable_scope("projection"):
                    w = tf.get_variable('w', shape=(self._num_units, self._num_proj))
                    batch_size = inputs.get_shape()[0].value
                    output = tf.reshape(new_state, shape=(-1, self._num_units))
                    output = tf.reshape(tf.matmul(output, w), shape=(batch_size, self.output_size))
        return output, new_state

DCRNN is built on top of the GRU. The flag _use_gc_for_ru controls whether graph convolution is used when computing the gates from the input. A plain GRU is straightforward: the previous state and the current input are concatenated and passed through a fully connected layer with a sigmoid to produce the reset gate (r) and the update gate (u). The reset gate rescales the previous state, which is then combined with the current input to form the candidate state for the current step; the update gate then decides how much of the previous state versus the candidate state is kept. The basic GRU equations are:
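With \(\sigma\) the sigmoid, \(\odot\) element-wise multiplication, \(x^{(t)}\) the current input and \(h^{(t-1)}\) the previous state, the computation in the code above corresponds to:

\[
\begin{aligned}
r^{(t)} &= \sigma\big(W_r\,[x^{(t)},\,h^{(t-1)}] + b_r\big) \\
u^{(t)} &= \sigma\big(W_u\,[x^{(t)},\,h^{(t-1)}] + b_u\big) \\
c^{(t)} &= \tanh\big(W_c\,[x^{(t)},\,r^{(t)} \odot h^{(t-1)}] + b_c\big) \\
h^{(t)} &= u^{(t)} \odot h^{(t-1)} + \big(1 - u^{(t)}\big) \odot c^{(t)}
\end{aligned}
\]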
If every _gconv in the code above is replaced by _fc, i.e. a plain fully connected layer, you get an ordinary GRU. The main difference between DCRNN and GRU is that graph convolution can be applied both when computing the gates from the input and when computing the candidate state for the current step. The core function is _gconv, whose computation is as follows:
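In the notation of the DCRNN paper, with \(W\) the weighted adjacency matrix, \(D_O\) and \(D_I\) the out- and in-degree matrices, \(K\) the maximum diffusion step (max_diffusion_step in the code) and \(\theta\) the filter parameters, the diffusion convolution applied to a graph signal \(X\) is:

\[
X_{:,p} \star_{\mathcal{G}} f_{\theta} \;=\; \sum_{k=0}^{K} \Big(\theta_{k,1}\big(D_O^{-1}W\big)^{k} + \theta_{k,2}\big(D_I^{-1}W^{\top}\big)^{k}\Big) X_{:,p}
\]

In the code, each support corresponds to one of these transition matrices (or a scaled Laplacian), and the successive powers are built iteratively with the Chebyshev-style recurrence x_k = 2·S·x_{k-1} − x_{k-2} that appears in the _gconv loop below.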
The biggest differences between DCRNN and GRU are therefore the input and the choice between _fc and _gconv. On the input side, a GRU takes [batch_size, seq_len, hidden_size], while DCRNN feeds a whole batch of node series at once, which can be viewed as [batch_size * num_nodes, seq_len, hidden_size]. With the plain _fc path, the first dimension is simply not split apart: every (sample, node) row is processed independently by a fully connected layer. The _fc code is as follows:
    def _fc(self, inputs, state, output_size, bias_start=0.0):
        dtype = inputs.dtype
        batch_size = inputs.get_shape()[0].value
        inputs = tf.reshape(inputs, (batch_size * self._num_nodes, -1))
        state = tf.reshape(state, (batch_size * self._num_nodes, -1))
        inputs_and_state = tf.concat([inputs, state], axis=-1)
        input_size = inputs_and_state.get_shape()[-1].value
        weights = tf.get_variable(
            'weights', [input_size, output_size], dtype=dtype,
            initializer=tf.contrib.layers.xavier_initializer())
        value = tf.nn.sigmoid(tf.matmul(inputs_and_state, weights))
        biases = tf.get_variable("biases", [output_size], dtype=dtype,
                                 initializer=tf.constant_initializer(bias_start, dtype=dtype))
        value = tf.nn.bias_add(value, biases)
        return value

To upgrade this to DCRNN, the batch_size and num_nodes dimensions of the input have to be separated so that graph convolution can be applied across the nodes. The code is as follows:
    def _gconv(self, inputs, state, output_size, bias_start=0.0):
        """Graph convolution between input and the graph matrix.
        :param args: a 2D Tensor or a list of 2D, batch x n, Tensors.
        :param output_size:
        :param bias:
        :param bias_start:
        :param scope:
        :return:
        """
        # Reshape input and state to (batch_size, num_nodes, input_dim/state_dim)
        batch_size = inputs.get_shape()[0].value
        # Split out the num_nodes dimension so graph convolution can be applied per node.
        inputs = tf.reshape(inputs, (batch_size, self._num_nodes, -1))
        state = tf.reshape(state, (batch_size, self._num_nodes, -1))
        inputs_and_state = tf.concat([inputs, state], axis=2)
        input_size = inputs_and_state.get_shape()[2].value
        dtype = inputs.dtype

        x = inputs_and_state
        x0 = tf.transpose(x, perm=[1, 2, 0])  # (num_nodes, total_arg_size, batch_size)
        x0 = tf.reshape(x0, shape=[self._num_nodes, input_size * batch_size])
        x = tf.expand_dims(x0, axis=0)

        scope = tf.get_variable_scope()
        with tf.variable_scope(scope):
            if self._max_diffusion_step == 0:
                pass
            else:
                # Each support is a sparse matrix built from the adjacency matrix,
                # corresponding to one graph convolution variant.
                for support in self._supports:
                    x1 = tf.sparse_tensor_dense_matmul(support, x0)
                    x = self._concat(x, x1)
                    # Iterate up to max_diffusion_step to simulate the diffusion process.
                    for k in range(2, self._max_diffusion_step + 1):
                        x2 = 2 * tf.sparse_tensor_dense_matmul(support, x1) - x0
                        x = self._concat(x, x2)
                        x1, x0 = x2, x1

            num_matrices = len(self._supports) * self._max_diffusion_step + 1  # Adds for x itself.
            x = tf.reshape(x, shape=[num_matrices, self._num_nodes, input_size, batch_size])
            x = tf.transpose(x, perm=[3, 1, 2, 0])  # (batch_size, num_nodes, input_size, order)
            x = tf.reshape(x, shape=[batch_size * self._num_nodes, input_size * num_matrices])

            weights = tf.get_variable(
                'weights', [input_size * num_matrices, output_size], dtype=dtype,
                initializer=tf.contrib.layers.xavier_initializer())
            x = tf.matmul(x, weights)  # (batch_size * self._num_nodes, output_size)

            biases = tf.get_variable("biases", [output_size], dtype=dtype,
                                     initializer=tf.constant_initializer(bias_start, dtype=dtype))
            x = tf.nn.bias_add(x, biases)
        # Reshape res back to 2D: (batch_size, num_node, state_dim) -> (batch_size, num_node * state_dim)
        return tf.reshape(x, [batch_size, self._num_nodes * output_size])

Here support stands for the different graph convolution variants, all of which are implemented in the repository; this part can quite easily be reused in other models:
supports = []
if filter_type == "laplacian":
    supports.append(utils.calculate_scaled_laplacian(adj_mx, lambda_max=None))
elif filter_type == "random_walk":
    supports.append(utils.calculate_random_walk_matrix(adj_mx).T)
elif filter_type == "dual_random_walk":
    supports.append(utils.calculate_random_walk_matrix(adj_mx).T)
    supports.append(utils.calculate_random_walk_matrix(adj_mx.T).T)
else:
    supports.append(utils.calculate_scaled_laplacian(adj_mx))
for support in supports:
    self._supports.append(self._build_sparse_matrix(support))

The final model uses the DCRNN cell as its building block to set up a complete Encoder-Decoder architecture; the main code is below. Because DCGRUCell inherits from RNNCell, it can be passed directly to tf.contrib.rnn.MultiRNNCell to build a multi-layer RNN, to tf.contrib.rnn.static_rnn to unroll the encoder, and to legacy_seq2seq.rnn_decoder to build the Encoder-Decoder structure that produces the final outputs.
# Input (batch_size, timesteps, num_sensor, input_dim)
self._inputs = tf.placeholder(tf.float32, shape=(batch_size, seq_len, num_nodes, input_dim), name='inputs')
# Labels: (batch_size, timesteps, num_sensor, input_dim), same format with input except the temporal dimension.
self._labels = tf.placeholder(tf.float32, shape=(batch_size, horizon, num_nodes, input_dim), name='labels')

# GO_SYMBOL = tf.zeros(shape=(batch_size, num_nodes * input_dim))
GO_SYMBOL = tf.zeros(shape=(batch_size, num_nodes * output_dim))

cell = DCGRUCell(rnn_units, adj_mx, max_diffusion_step=max_diffusion_step, num_nodes=num_nodes,
                 filter_type=filter_type)
cell_with_projection = DCGRUCell(rnn_units, adj_mx, max_diffusion_step=max_diffusion_step, num_nodes=num_nodes,
                                 num_proj=output_dim, filter_type=filter_type)
encoding_cells = [cell] * num_rnn_layers
decoding_cells = [cell] * (num_rnn_layers - 1) + [cell_with_projection]
encoding_cells = tf.contrib.rnn.MultiRNNCell(encoding_cells, state_is_tuple=True)
decoding_cells = tf.contrib.rnn.MultiRNNCell(decoding_cells, state_is_tuple=True)

global_step = tf.train.get_or_create_global_step()
# Outputs: (batch_size, timesteps, num_nodes, output_dim)
with tf.variable_scope('DCRNN_SEQ'):
    inputs = tf.unstack(tf.reshape(self._inputs, (batch_size, seq_len, num_nodes * input_dim)), axis=1)
    labels = tf.unstack(
        tf.reshape(self._labels[..., :output_dim], (batch_size, horizon, num_nodes * output_dim)), axis=1)
    labels.insert(0, GO_SYMBOL)

    def _loop_function(prev, i):
        if is_training:
            # Return either the model's prediction or the previous ground truth in training.
            if use_curriculum_learning:
                c = tf.random_uniform((), minval=0, maxval=1.)
                threshold = self._compute_sampling_threshold(global_step, cl_decay_steps)
                result = tf.cond(tf.less(c, threshold), lambda: labels[i], lambda: prev)
            else:
                result = labels[i]
        else:
            # Return the prediction of the model in testing.
            result = prev
        return result

    _, enc_state = tf.contrib.rnn.static_rnn(encoding_cells, inputs, dtype=tf.float32)
    outputs, final_state = legacy_seq2seq.rnn_decoder(labels, enc_state, decoding_cells,
                                                      loop_function=_loop_function)

# Project the output to output_dim.
outputs = tf.stack(outputs[:-1], axis=1)
self._outputs = tf.reshape(outputs, (batch_size, horizon, num_nodes, output_dim), name='outputs')
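The _loop_function above implements scheduled sampling: at decoding step i during training, the decoder is fed either the ground-truth label or its own previous prediction, with the choice governed by a threshold that decays as training progresses. The helper _compute_sampling_threshold referenced in the code follows an inverse-sigmoid decay; a minimal standalone sketch of that schedule (where k plays the role of cl_decay_steps) looks like this:

import tensorflow as tf

def compute_sampling_threshold(global_step, k):
    # Inverse sigmoid decay eps = k / (k + exp(global_step / k)):
    # close to 1 early in training (mostly feed ground-truth labels to the decoder),
    # decaying towards 0 as global_step grows (mostly feed the model's own predictions).
    return tf.cast(k / (k + tf.exp(global_step / k)), tf.float32)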