突发!员工跳楼!只拿低保工资!央企设计院集体罢工!

突发!北京某院集体罢工!

淄博向东,惠泊向西:在人民与人民币之间,惠泊停车选择了人民币

【少儿禁】马建《亮出你的舌苔或空空荡荡》

10部适合女性看的唯美情色电影

生成图片,分享到微信朋友圈

自由微信安卓APP发布,立即下载! | 提交文章网址
查看原文

机器学习实战之隐马尔可夫模型

糖甜甜甜 DataGo数据狗 2022-07-01

1 HMM模型介绍

隐马尔可夫模型是可用于标注问题的统计学习模型,描述有隐藏的马尔可夫链随机生成观测序列的过程,属于生成模型,隐马尔可夫模型在语音识别、自然语言处理、生物信息、模式识别等领域有着广泛的应用。具体的推导过程参考李航的《统计学习方法》。

2 算法流程

针对HMM中的三个问题,提出以下三个算法解决:

(1) 评估问题:前向算法与后向算法;

(2) 学习问题:Baum-Welch算法(EM算法);

(3) 解码问题:Viterbi算法。

3 代码

import numpy as npclass HMM():    """    hidden markov model    attributes:    --------------------    A:numpy.ndarray    state transition probability matrix    B: numpy.ndarray    output emission probability matrix with shape(N, number of output type)    pi:numpy.ndarray    initial state probability vector    """    def __init__(self, A, B, pi):        self.A = A        self.B = B        self.pi = pi    def simulate(self, T):        def draw_from(probs):            return np.where(np.random.multinomial(1, probs) == 1)[0][0]        observations = np.zeros(T, dtype=int)        states = np.zeros(T, dtype=int)        states[0] = draw_from(self.pi)        observations[0] = draw_from(self.B[states[0], :])        for t in range(1, T):            states[t] = draw_from(self.A[states[t-1], :])            observations[t] = draw_from(self.B[states[t], :])        return states, observations    def forward(self, obs_seq):        """前向算法"""        N = self.A.shape[0]        T = len(obs_seq)        F = np.zeros((N, T))        F[:, 0] = self.pi * self.B[:, obs_seq[0]]        for t in range(1, T):            for n in range(N):                F[n, t] = np.dot(F[:, t-1], (self.A[:, n])) * self.B[n, obs_seq[t]]        return F    def backward(self, obs_seq):        """后向算法"""        N = self.A.shape[0]        T = len(obs_seq)        M = np.zeros((N, T))        M[:, T-1] = 1        # 或者M[:, -1:] = 1,列上表示最后一行        for t in reversed(range(T-1)):            for n in range(N):                M[n, t] = np.dot(self.A[n, :], M[:, t+1]) * self.B[n, obs_seq[t+1]]        return M    def EM(self, observation, criterion=0.05):        """EM算法进行参数学习"""        n_state = self.A.shape[0]        n_sample = len(observation)        done = 1        while done:            Alpha = self.forward(observation)            Beta = self.backward(observation)            xi = np.zeros((n_state, n_state, n_sample-1))            gamma = np.zeros((n_state, n_sample))            for t in range(n_sample-1):                denom = np.sum(np.dot(Alpha[:, t].T, self.A) * self.B[:, observation[t+1]].T * Beta[:, t+1].T)                sum_gamma1 = np.sum(Alpha[:, t] * Beta[:, t])                for i in range(n_state):                    numer = Alpha[i, t] * self.A[i, :] * self.B[:, observation[t+1]].T * Beta[:, t+1].T                    xi[i, :, t] = numer/denom                gamma[i, t] = Alpha[i, t] * Beta[i, t] / sum_gamma1            last_col = Alpha[:, n_sample-1].T * Beta[:, n_sample-1]            gamma[:, n_sample-1] = last_col / np.sum(last_col)            # 更新参数            n_pi = gamma[:, 0]            n_A = np.sum(xi, 2) / np.sum(gamma[:, :-1], 1)            n_B = np.copy(self.B)            num_level = self.B.shape[1]            sum_gamma = 0            a = 0            for lev in range(num_level):                for h in range(n_state):                    for t in range(n_sample):                        sum_gamma = sum_gamma + gamma[h, t]                        if observation[t] == lev:                            a = a + gamma[h, t]                    n_B[h, lev] = a / sum_gamma                    a = 0            # 检查阈值            if np.max(np.abs(self.pi-n_pi)) < criterion and np.max(np.abs(self.B-n_B)) < criterion \                    and np.max(np.abs(self.A-n_A)) < criterion:                done = 0            self.A, self.B, self.pi = n_A, n_B, n_pi        return self.A, self.B, self.pi    def viterbi(self, obs_seq):        """viterbi算法预测状态序列"""        N = self.A.shape[0]        T = len(obs_seq)        P = np.zeros((N, T))        prev_point = np.zeros((N, T))        prev_point[:, 0] = 0        P[:, 0] = self.pi * self.B[:, obs_seq[0]]        for t in range(1, T):            for n in range(N):                P[n, t] = np.max(P[:, t - 1] * self.A[:, n]) * self.B[n, obs_seq[t]]                prev_point[n, t] = np.argmax(P[:, t - 1] * self.A[:, n] * self.B[n, obs_seq[t]])        return P, prev_point    def build_path(self, obs_seq):        """return the optimal path"""        P, prev_point = self.viterbi(obs_seq)        T = len(obs_seq)        opt_path = np.zeros(T)        last_state = np.argmax(P[:, T-1])        opt_path[T-1] = last_state        for t in reversed(range(T-1)):            opt_path[t] = prev_point[int(opt_path[t+1]), t+1]        last_path = reversed(opt_path)        return last_pathif __name__ == '__main__':    # 用《统计学习方法》中的案例进行测试    A = np.array([[0.5, 0.2, 0.3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]])    B = np.array([[0.5, 0.5], [0.4, 0.6], [0.7, 0.3]])    pi = np.array([0.2, 0.4, 0.4])    test1 = HMM(A, B, pi)    obs_seq = [0, 1, 0]    print(test1.forward(obs_seq))    print(test1.backward(obs_seq))    print(test1.viterbi(obs_seq))    print(test1.build_path(obs_seq))    print(test1.EM(obs_seq))

文章有问题?点此查看未经处理的缓存