一站式解决:隐马尔可夫模型(HMM)全过程推导及实现
【导读】隐马尔可夫模型(Hidden Markov Model,HMM)是关于时许的概率模型,是一个生成模型,描述由一个隐藏的马尔科夫链随机生成不可观测的状态序列,每个状态生成一个观测,而由此产生一个观测序列。
定义抄完了,下面我们从一个简单的生成过程入手,顺便引出HMM的参数。
假设有4个盒子,每个盒子里面有不同数量的红、白两种颜色的球,具体如下表:
现在从这些盒子中抽取若干(
开始时,按照一个初始概率分布随机选择第一个盒子,这里将第一个盒子用
将
将
进一步,可以用一个矩阵(称为观测概率矩阵,也有资料叫做发射矩阵)来表示该表
其中
其中的
例如该分布是均匀分布的话,对应的向量就是
记录抽取的球的颜色后将其放回,然后在按照如下规则选择下一个盒子(
如果当前是盒子1,则选择盒子2 如果当前是盒子2或3,则分布以概率0.4和0.6选择前一个或后一个盒子 如果当前是盒子4,则各以0.5的概率停留在盒子4或者选择盒子3
import numpy as np
class HMM(object):
def __init__(self, N, M, pi=None, A=None, B=None):
self.N = N
self.M = M
self.pi = pi
self.A = A
self.B = B
def get_data_with_distribute(self, dist): # 根据给定的概率分布随机返回数据(索引)
r = np.random.rand()
for i, p in enumerate(dist):
if r < p: return i
r -= p
def generate(self, T: int):
'''
根据给定的参数生成观测序列
T: 指定要生成数据的数量
'''
z = self.get_data_with_distribute(self.pi) # 根据初始概率分布生成第一个状态
x = self.get_data_with_distribute(self.B[z]) # 生成第一个观测数据
result = [x]
for _ in range(T-1): # 依次生成余下的状态和观测数据
z = self.get_data_with_distribute(self.A[z])
x = self.get_data_with_distribute(self.B[z])
result.append(x)
return result
if __name__ == "__main__":
pi = np.array([.25, .25, .25, .25])
A = np.array([
[0, 1, 0, 0],
[.4, 0, .6, 0],
[0, .4, 0, .6],
[0, 0, .5, .5]])
B = np.array([
[.5, .5],
[.3, .7],
[.6, .4],
[.8, .2]])
hmm = HMM(4, 2, pi, A, B)
print(hmm.generate(10)) # 生成10个数据
# 生成结果如下
[0, 0, 1, 1, 1, 1, 0, 1, 0, 0] # 0代表红球,1代表白球
齐次马尔可夫假设:即任意时刻的状态只依赖于前一时刻的状态,与其他时刻的状态无关(当然,初始时刻的状态由参数π决定): 观测独立假设:即任意时刻的观测只依赖于该时刻的状态,与其他无关:概率计算算法
概率计算算法
即使不考虑内部的计算,这起码也是
也就是下图中标记的那一部分
接着,根据(2)式,还可以得到:
由此公式,遍历
class HMM(object):
def evaluate(self, X):
'''
根据给定的参数计算条件概率
X: 观测数据
'''
alpha = self.pi * self.B[:,X[0]]
for x in X[1:]:
# alpha_next = np.empty(self.N)
# for j in range(self.N):
# alpha_next[j] = np.sum(self.A[:,j] * alpha * self.B[j,x])
# alpha = alpha_next
alpha = np.sum(self.A * alpha.reshape(-1,1) * self.B[:,x].reshape(1,-1), axis=0)
return alpha.sum()
print(hmm.evaluate([0,0,1,1,0])) # 0.026862016
def evaluate_backward(self, X):
beta = np.ones(self.N)
for x in X[:0:-1]:
beta_next = np.empty(self.N)
for i in range(self.N):
beta_next[i] = np.sum(self.A[i,:] * self.B[:,x] * beta)
beta = beta_next
return np.sum(beta * self.pi * self.B[:,X[0]])
学习算法
最终得到:
同样,使用拉格朗日乘数法,构造目标函数
然后化简:
代入(7)式,得到
M是矩阵B的列数,前面已经定义过的,构造拉格朗日函数:
class HMM(object):
def fit(self, X):
'''
根据给定观测序列反推参数
'''
# 初始化参数 pi, A, B
self.pi = np.random.sample(self.N)
self.A = np.ones((self.N,self.N)) / self.N
self.B = np.ones((self.N,self.M)) / self.M
self.pi = self.pi / self.pi.sum()
T = len(X)
for _ in range(50):
# 按公式计算下一时刻的参数
alpha, beta = self.get_something(X)
gamma = alpha * beta
for i in range(self.N):
for j in range(self.N):
self.A[i,j] = np.sum(alpha[:-1,i]*beta[1:,j]*self.A[i,j]*self.B[j,X[1:]]) / gamma[:-1,i].sum()
for j in range(self.N):
for k in range(self.M):
self.B[j,k] = np.sum(gamma[:,j]*(X == k)) / gamma[:,j].sum()
self.pi = gamma[0] / gamma[-1].sum()
def get_something(self, X):
'''
根据给定数据与参数,计算所有时刻的前向概率和后向概率
'''
T = len(X)
alpha = np.zeros((T,self.N))
alpha[0,:] = self.pi * self.B[:,X[0]]
for i in range(T-1):
x = X[i+1]
alpha[i+1,:] = np.sum(self.A * alpha[i].reshape(-1,1) * self.B[:,x].reshape(1,-1), axis=0)
beta = np.ones((T,self.N))
for j in range(T-1,0,-1):
for i in range(self.N):
beta[j-1,i] = np.sum(self.A[i,:] * self.B[:,X[j]] * beta[j])
return alpha, beta
if __name__ == "__main__":
import matplotlib.pyplot as plt
def triangle_data(T): # 生成三角波形状的序列
data = []
for x in range(T):
x = x % 6
data.append(x if x <= 3 else 6-x)
return data
data = np.array(triangle_data(30))
hmm = HMM(10, 4)
hmm.fit(data) # 先根据给定数据反推参数
gen_obs = hmm.generate(30) # 再根据学习的参数生成数据
x = np.arange(30)
plt.scatter(x, gen_obs, marker='*', color='r')
plt.plot(x, data, color='g')
plt.show()
预测算法
class HMM(object):
def decode(self, X):
T = len(X)
x = X[0]
delta = self.pi * self.B[:,x]
varphi = np.zeros((T, self.N), dtype=int)
path = [0] * T
for i in range(1, T):
delta = delta.reshape(-1,1) # 转成一列方便广播
tmp = delta * self.A
varphi[i,:] = np.argmax(tmp, axis=0)
delta = np.max(tmp, axis=0) * self.B[:,X[i]]
path[-1] = np.argmax(delta)
# 回溯最优路径
for i in range(T-1,0,-1):
path[i-1] = varphi[i,path[i]]
return path
◆
精彩推荐
◆
推荐阅读
不足 20 行 Python 代码,高效实现 k-means 均值聚类算法
巨头垂涎却不能染指,loT 数据库风口已至
【建议收藏】数据中心服务器基础知识大全
从4个维度深度剖析闪电网络现状,在CKB上实现闪电网络的理由 | 博文精选
身边程序员同事竟说自己敲代码速度快!Ctrl+C、Ctrl+V ?
100 美元一行代码,开源软件到底咋赚钱?
你点的每个“在看”,我都认真当成了AI