其他
【强基固本】目标跟踪初探(DeepSORT)
“强基固本,行稳致远”,科学研究离不开理论基础,人工智能学科更是需要数学、物理、神经科学等基础学科提供有力支撑,为了紧扣时代脉搏,我们推出“强基固本”专栏,讲解AI领域的基础知识,为你的科研学习提供助力,夯实理论基础,提升原始创新能力,敬请关注。
01
目前主流的目标跟踪算法都是基于Tracking-by-Detecton策略,即基于目标检测的结果来进行目标跟踪。DeepSORT运用的就是这个策略,上面的视频是DeepSORT对人群进行跟踪的结果,每个bbox左上角的数字是用来标识某个人的唯一ID号。
02
首先,先介绍一下什么是分配问题(Assignment Problem):假设有N个人和N个任务,每个任务可以任意分配给不同的人,已知每个人完成每个任务要花费的代价不尽相同,那么如何分配可以使得总的代价最小。
import numpy as np
from sklearn.utils.linear_assignment_ import linear_assignment
from scipy.optimize import linear_sum_assignment
cost_matrix = np.array([
[15,40,45],
[20,60,35],
[20,40,25]
])
matches = linear_assignment(cost_matrix)
print('sklearn API result:\n', matches)
matches = linear_sum_assignment(cost_matrix)
print('scipy API result:\n', matches)
"""Outputs
sklearn API result:
[[0 1]
[1 0]
[2 2]]
scipy API result:
(array([0, 1, 2], dtype=int64), array([1, 0, 2], dtype=int64))
"""
# linear_assignment.py
def min_cost_matching(distance_metric, max_distance, tracks, detections,
track_indices=None, detection_indices=None):
...
# 计算代价矩阵
cost_matrix = distance_metric(tracks, detections, track_indices, detection_indices)
cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
# 执行匈牙利算法,得到匹配成功的索引对,行索引为tracks的索引,列索引为detections的索引
row_indices, col_indices = linear_assignment(cost_matrix)
matches, unmatched_tracks, unmatched_detections = [], [], []
# 找出未匹配的detections
for col, detection_idx in enumerate(detection_indices):
if col not in col_indices:
unmatched_detections.append(detection_idx)
# 找出未匹配的tracks
for row, track_idx in enumerate(track_indices):
if row not in row_indices:
unmatched_tracks.append(track_idx)
# 遍历匹配的(track, detection)索引对
for row, col in zip(row_indices, col_indices):
track_idx = track_indices[row]
detection_idx = detection_indices[col]
# 如果相应的cost大于阈值max_distance,也视为未匹配成功
if cost_matrix[row, col] > max_distance:
unmatched_tracks.append(track_idx)
unmatched_detections.append(detection_idx)
else:
matches.append((track_idx, detection_idx))
return matches, unmatched_tracks, unmatched_detections
03
卡尔曼滤波被广泛应用于无人机、自动驾驶、卫星导航等领域,简单来说,其作用就是基于传感器的测量值来更新预测值,以达到更精确的估计。
# kalman_filter.py
def predict(self, mean, covariance):
"""Run Kalman filter prediction step.
Parameters
----------
mean: ndarray, the 8 dimensional mean vector of the object state at the previous time step.
covariance: ndarray, the 8x8 dimensional covariance matrix of the object state at the previous time step.
Returns
-------
(ndarray, ndarray), the mean vector and covariance matrix of the predicted state.
Unobserved velocities are initialized to 0 mean.
"""
std_pos = [
self._std_weight_position * mean[3],
self._std_weight_position * mean[3],
1e-2,
self._std_weight_position * mean[3]]
std_vel = [
self._std_weight_velocity * mean[3],
self._std_weight_velocity * mean[3],
1e-5,
self._std_weight_velocity * mean[3]]
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel])) # 初始化噪声矩阵Q
mean = np.dot(self._motion_mat, mean) # x' = Fx
covariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) + motion_cov # P' = FPF(T) + Q
return mean, covariance
# kalman_filter.py
def project(self, mean, covariance):
"""Project state distribution to measurement space.
Parameters
----------
mean: ndarray, the state's mean vector (8 dimensional array).
covariance: ndarray, the state's covariance matrix (8x8 dimensional).
Returns
-------
(ndarray, ndarray), the projected mean and covariance matrix of the given state estimate.
"""
std = [self._std_weight_position * mean[3],
self._std_weight_position * mean[3],
1e-1,
self._std_weight_position * mean[3]]
innovation_cov = np.diag(np.square(std)) # 初始化噪声矩阵R
mean = np.dot(self._update_mat, mean) # 将均值向量映射到检测空间,即Hx'
covariance = np.linalg.multi_dot((
self._update_mat, covariance, self._update_mat.T)) # 将协方差矩阵映射到检测空间,即HP'H^T
return mean, covariance + innovation_cov
def update(self, mean, covariance, measurement):
"""Run Kalman filter correction step.
Parameters
----------
mean: ndarra, the predicted state's mean vector (8 dimensional).
covariance: ndarray, the state's covariance matrix (8x8 dimensional).
measurement: ndarray, the 4 dimensional measurement vector (x, y, a, h), where (x, y) is the
center position, a the aspect ratio, and h the height of the bounding box.
Returns
-------
(ndarray, ndarray), the measurement-corrected state distribution.
"""
# 将mean和covariance映射到检测空间,得到Hx'和S
projected_mean, projected_cov = self.project(mean, covariance)
# 矩阵分解(这一步没看懂)
chol_factor, lower = scipy.linalg.cho_factor(projected_cov, lower=True, check_finite=False)
# 计算卡尔曼增益K(这一步没看明白是如何对应上公式5的,求线代大佬指教)
kalman_gain = scipy.linalg.cho_solve(
(chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
check_finite=False).T
# z - Hx'
innovation = measurement - projected_mean
# x = x' + Ky
new_mean = mean + np.dot(innovation, kalman_gain.T)
# P = (I - KH)P'
new_covariance = covariance - np.linalg.multi_dot((kalman_gain, projected_cov, kalman_gain.T))
return new_mean, new_covariance
04
Frame 1:检测器又检测到了3个detections,对于Frame 0中的tracks,先进行预测得到新的tracks,然后使用匈牙利算法将新的tracks与detections进行匹配,得到(track, detection)匹配对,最后用每对中的detection更新对应的track
# demo_yolo3_deepsort.py
def detect(self):
while self.vdo.grab():
...
bbox_xcycwh, cls_conf, cls_ids = self.yolo3(im) # 检测到的bbox[cx,cy,w,h],置信度,类别id
if bbox_xcycwh is not None:
# 筛选出人的类别
mask = cls_ids == 0
bbox_xcycwh = bbox_xcycwh[mask]
bbox_xcycwh[:, 3:] *= 1.2
cls_conf = cls_conf[mask]
...
# deep_sort.py
def update(self, bbox_xywh, confidences, ori_img):
self.height, self.width = ori_img.shape[:2]
# 提取每个bbox的feature
features = self._get_features(bbox_xywh, ori_img)
# [cx,cy,w,h] -> [x1,y1,w,h]
bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
# 过滤掉置信度小于self.min_confidence的bbox,生成detections
detections = [Detection(bbox_tlwh[i], conf, features[i]) for i,conf in enumerate(confidences) if conf > self.min_confidence]
# NMS (这里self.nms_max_overlap的值为1,即保留了所有的detections)
boxes = np.array([d.tlwh for d in detections])
scores = np.array([d.confidence for d in detections])
indices = non_max_suppression(boxes, self.nms_max_overlap, scores)
detections = [detections[i] for i in indices]
...
# track.py
def predict(self, kf):
"""Propagate the state distribution to the current time step using a
Kalman filter prediction step.
Parameters
----------
kf: The Kalman filter.
"""
self.mean, self.covariance = kf.predict(self.mean, self.covariance) # 预测
self.age += 1 # 该track自出现以来的总帧数加1
self.time_since_update += 1 # 该track自最近一次更新以来的总帧数加1
# tracker.py
def _match(self, detections):
def gated_metric(racks, dets, track_indices, detection_indices):
"""
基于外观信息和马氏距离,计算卡尔曼滤波预测的tracks和当前时刻检测到的detections的代价矩阵
"""
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices]
# 基于外观信息,计算tracks和detections的余弦距离代价矩阵
cost_matrix = self.metric.distance(features, targets)
# 基于马氏距离,过滤掉代价矩阵中一些不合适的项 (将其设置为一个较大的值)
cost_matrix = linear_assignment.gate_cost_matrix(self.kf, cost_matrix, tracks,
dets, track_indices, detection_indices)
return cost_matrix
# 区分开confirmed tracks和unconfirmed tracks
confirmed_tracks = [i for i, t in enumerate(self.tracks) if t.is_confirmed()]
unconfirmed_tracks = [i for i, t in enumerate(self.tracks) if not t.is_confirmed()]
# 对confirmd tracks进行级联匹配
matches_a, unmatched_tracks_a, unmatched_detections = \
linear_assignment.matching_cascade(
gated_metric, self.metric.matching_threshold, self.max_age,
self.tracks, detections, confirmed_tracks)
# 对级联匹配中未匹配的tracks和unconfirmed tracks中time_since_update为1的tracks进行IOU匹配
iou_track_candidates = unconfirmed_tracks + [k for k in unmatched_tracks_a if
self.tracks[k].time_since_update == 1]
unmatched_tracks_a = [k for k in unmatched_tracks_a if
self.tracks[k].time_since_update != 1]
matches_b, unmatched_tracks_b, unmatched_detections = \
linear_assignment.min_cost_matching(
iou_matching.iou_cost, self.max_iou_distance, self.tracks,
detections, iou_track_candidates, unmatched_detections)
# 整合所有的匹配对和未匹配的tracks
matches = matches_a + matches_b
unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
return matches, unmatched_tracks, unmatched_detections
# 级联匹配源码 linear_assignment.py
def matching_cascade(distance_metric, max_distance, cascade_depth, tracks, detections,
track_indices=None, detection_indices=None):
...
unmatched_detections = detection_indice
matches = []
# 由小到大依次对每个level的tracks做匹配
for level in range(cascade_depth):
# 如果没有detections,退出循环
if len(unmatched_detections) == 0:
break
# 当前level的所有tracks索引
track_indices_l = [k for k in track_indices if
tracks[k].time_since_update == 1 + level]
# 如果当前level没有track,继续
if len(track_indices_l) == 0:
continue
# 匈牙利匹配
matches_l, _, unmatched_detections = min_cost_matching(distance_metric, max_distance, tracks, detections,
track_indices_l, unmatched_detections)
matches += matches_l
unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
return matches, unmatched_tracks, unmatched_detections
# tracker.py
def update(self, detections):
"""Perform measurement update and track management.
Parameters
----------
detections: List[deep_sort.detection.Detection]
A list of detections at the current time step.
"""
# 得到匹配对、未匹配的tracks、未匹配的dectections
matches, unmatched_tracks, unmatched_detections = self._match(detections)
# 对于每个匹配成功的track,用其对应的detection进行更新
for track_idx, detection_idx in matches:
self.tracks[track_idx].update(self.kf, detections[detection_idx])
# 对于未匹配的成功的track,将其标记为丢失
for track_idx in unmatched_tracks:
self.tracks[track_idx].mark_missed()
# 对于未匹配成功的detection,初始化为新的track
for detection_idx in unmatched_detections:
self._initiate_track(detections[detection_idx])
...
参考
本文目的在于学术交流,并不代表本公众号赞同其观点或对其内容真实性负责,版权归原作者所有,如有侵权请告知删除。
“强基固本”历史文章
科普帖:深度学习中GPU和显存分析
深度学习笔记:常用的模型评估指标
三次样条(cubic spline)插值
运动规划丨轨迹规划丨基于改进Dijkstra算法的轨迹平滑方法
神经网络15分钟入门!足够通俗易懂了吧
5分钟理解Focal Loss与GHM——解决样本不平衡利器
详解残差网络
MMD:最大均值差异
目标跟踪系列--KCF算法
神经网络常用卷积总结
从零开始的自然语言处理(基于隐马尔可夫模型的词性标注)
写给新手炼丹师:2021版调参上分手册
机器学习-嵌入Embedding
各种Normalization
更多强基固本专栏文章,
请点击文章底部“阅读原文”查看
分享、点赞、在看,给个三连击呗!