查看原文
其他

NLP.TM[34] | 纠错:pycorrector的候选召回

机智的叉烧 CS的陋室 2022-08-08

【NLP.TM】


本人有关自然语言处理和文本挖掘方面的学习和笔记,欢迎大家关注。


往期回顾


上一期和大家分享了pycorrector的纠错中的错误检测,这次让我来和大家分享一下pycorrector的召回和排序。

对于这个东西不太了解的,可以先看看我上期的文章。NLP.TM[33] | 纠错:pycorrector的错误检测

召回

召回阶段在这里实现的:

def correct(self, text, include_symbol=True, num_fragment=1, threshold=57, **kwargs):
    """
    句子改错
    :param text: str, query 文本
    :param include_symbol: bool, 是否包含标点符号
    :param num_fragment: 纠错候选集分段数, 1 / (num_fragment + 1)
    :param threshold: 语言模型纠错ppl阈值
    :param kwargs: ...
    :return: text (str)改正后的句子, list(wrong, right, begin_idx, end_idx)
    """

    text_new = ''
    details = []
    self.check_corrector_initialized()
    # 编码统一,utf-8 to unicode
    text = convert_to_unicode(text)
    # 长句切分为短句
    blocks = self.split_2_short_text(text, include_symbol=include_symbol)
    for blk, idx in blocks:
        maybe_errors = self.detect_short(blk, idx)
        for cur_item, begin_idx, end_idx, err_type in maybe_errors:
            # 纠错,逐个处理
            before_sent = blk[:(begin_idx - idx)]
            after_sent = blk[(end_idx - idx):]

            # 困惑集中指定的词,直接取结果
            if err_type == ErrorType.confusion:
                corrected_item = self.custom_confusion[cur_item]
            else:
                # 取得所有可能正确的词
                candidates = self.generate_items(cur_item, fragment=num_fragment)
                if not candidates:
                    continue
                corrected_item = self.get_lm_correct_item(cur_item, candidates, before_sent, after_sent,
                                                          threshold=threshold)
            # output
            if corrected_item != cur_item:
                blk = before_sent + corrected_item + after_sent
                detail_word = [cur_item, corrected_item, begin_idx, end_idx]
                details.append(detail_word)
        text_new += blk
    details = sorted(details, key=operator.itemgetter(2))
    return text_new, details

主要就是通过candidates = self.generate_items(cur_item, fragment=num_fragment)来召回可能是正确答案的候选集。然后来看看怎么做的。

这块的实现代码不短,但是相似的东西倒是有不少,我们先来看一个完整版的:

def generate_items(self, word, fragment=1):
    """
    生成纠错候选集
    :param word:
    :param fragment: 分段
    :return:
    """

    self.check_corrector_initialized()
    # 1字
    candidates_1 = []
    # 2字
    candidates_2 = []
    # 多于2字
    candidates_3 = []

    # same pinyin word
    candidates_1.extend(self._confusion_word_set(word))
    # custom confusion word
    candidates_1.extend(self._confusion_custom_set(word))
    # same pinyin char
    if len(word) == 1:
        # same one char pinyin
        confusion = [i for i in self._confusion_char_set(word[0]) if i]
        candidates_1.extend(confusion)
    if len(word) == 2:
        # same first char pinyin
        confusion = [i + word[1:] for i in self._confusion_char_set(word[0]) if i]
        candidates_2.extend(confusion)
        # same last char pinyin
        confusion = [word[:-1] + i for i in self._confusion_char_set(word[-1]) if i]
        candidates_2.extend(confusion)
    if len(word) > 2:
        # same mid char pinyin
        confusion = [word[0] + i + word[2:] for i in self._confusion_char_set(word[1])]
        candidates_3.extend(confusion)

        # same first word pinyin
        confusion_word = [i + word[-1for i in self._confusion_word_set(word[:-1])]
        candidates_3.extend(confusion_word)

        # same last word pinyin
        confusion_word = [word[0] + i for i in self._confusion_word_set(word[1:])]
        candidates_3.extend(confusion_word)

    # add all confusion word list
    confusion_word_set = set(candidates_1 + candidates_2 + candidates_3)
    confusion_word_list = [item for item in confusion_word_set if is_chinese_string(item)]
    confusion_sorted = sorted(confusion_word_list, key=lambda k: self.word_frequency(k), reverse=True)
    return confusion_sorted[:len(confusion_word_list) // fragment + 1]

可以从这结构上是这样的思路:

  • 首先可以看到好几种召回的长度,1,2,2个以上,看完代码个人认为没有必要分3个,欢迎大家一起讨论。
  • 首先有基于拼音的,这个拼音是针对整个词的。
  • 然后是根据混淆词直接变换的。
  • 然后分成了3步骤,基于单字进行拼音召回。
  • 最后做了一个基于词频的粗排。

好了来看分解动作。

基于整个词的拼音

就是这句:

candidates_1.extend(self._confusion_word_set(word))

它的定义在这里:

def _confusion_word_set(self, word):
    confusion_word_set = set()
    candidate_words = list(self.known(edit_distance_word(word, self.cn_char_set)))
    for candidate_word in candidate_words:
        if lazy_pinyin(candidate_word) == lazy_pinyin(word):
            # same pinyin
            confusion_word_set.add(candidate_word)
    return confusion_word_set

这里有比较深的套娃,在上面第三行,里面是一个基于编辑距离的,外面再来了一个known函数(这个名字起的有点奇怪,看不出来是干啥的)。

edit_distance_word的操作看着很优雅,但是理解起来有点头疼。

def edit_distance_word(word, char_set):
    """
    all edits that are one edit away from 'word'
    :param word:
    :param char_set:
    :return:
    """

    splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]
    transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]
    replaces = [L + c + R[1:] for L, R in splits if R for c in char_set]
    return set(transposes + replaces)

看着说是打印这个word有关的所有编辑距离为1的词汇,不过既然看着难受,那就打印下来看看呗,首先我构造了一些假数据,对一些变量打印下来看看结果:

a='我是天才'
c=set(['不','一','就'])
edit_distance_word(a,c)

打印出来的结果是这样的:

splits=[('''我是天才'), ('我''是天才'), ('我是''天才'), ('我是天''才'), ('我是天才''')]
transposes=['是我天才''我天是才''我是才天']
replaces={['就是天才''不是天才''一是天才''我就天才''我不天才''我一天才''我是就才''我是不才''我是一才''我是天就''我是天不''我是天一']

科普一下,编辑距离不止有一种定义,莱文斯坦距离定义的变化方式只有删除插入和修改,DL距离是允许交换的,这里作者只取了两种编辑距离的变化方式,交换和替换,常规的删除和添加是没有的(提个issue?)。这种比较巧妙的方式还是可以记录下来的。

但这里其实有一些问题,在替换的时候,实质上作者是用了整个字表来对错误位点每个位置进行替换尝试,这个可能会让召回集非常大而导致召回质量下降且复杂度高。

说完就被打脸,作者召回了这么多,是经过过滤的,要保证替换后的新词他能成为词,判断规则就是他得在词典里(已登录):

def known(self, words):
    """
    取得词序列中属于常用词部分
    :param words:
    :return:
    """

    self.check_detector_initialized()
    return set(word for word in words if word in self.word_freq)

有了这个就能过滤大量乱替换产生的候选内容,但是有一说一,召回替换导致的时间和空间复杂度是在有点大,O(MN)了,M是词典大小,N是词大小。

过滤还没结束,后续还需要判断和原句的拼音是否一致:

for candidate_word in candidate_words:
    if lazy_pinyin(candidate_word) == lazy_pinyin(word):
        # same pinyin
        confusion_word_set.add(candidate_word)

说实话,这里我挺迷的,要求拼音一致这个操作没什么大毛病,严格是严格了点,但是编辑距离为1,做了句内交换后拼音基本就不一致了,这个操作到了此步基本就没有能往下传的东西了,意义也就不大了吧,至于换词,能解一些同音别字的问题,尚且可以接受。

基于混淆词典

混淆词典其实还是比较简单的,就是个简单的查词典,直接召回。

def _confusion_custom_set(self, word):
    confusion_word_set = set()
    if word in self.custom_confusion:
        confusion_word_set = {self.custom_confusion[word]}
    return confusion_word_set

这块比较简单,也多说不了啥。

单字拼音召回

这块作者在主函数里面写了很长,看他搞什么名堂。

首先看他的几个核心的条件,实际上是在区分错误词汇的长度:

# same pinyin char
if len(word) == 1:
    pass
if len(word) == 2:
    pass
if len(word) > 2:
    pass

然后是看简单的长度为1的时候怎么处理:

# same one char pinyin
confusion = [i for i in self._confusion_char_set(word[0]) if i]
candidates_1.extend(confusion)

其实就是拿这个单字去召回:

def _confusion_char_set(self, c):
    return self.get_same_pinyin(c).union(self.get_same_stroke(c))

又开始套娃了哈哈。这两个函数直接看字就能明白了。

def get_same_pinyin(self, char):
    """
    取同音字
    :param char:
    :return:
    """

    self.check_corrector_initialized()
    return self.same_pinyin.get(char, set())

def get_same_stroke(self, char):
    """
    取形似字
    :param char:
    :return:
    """

    self.check_corrector_initialized()
    return self.same_stroke.get(char, set())

单字的明白了,那么多字其实就是类似的,取句子中每个字来做对应的操作。

粗排

作者是做了个粗排:

confusion_sorted = sorted(confusion_word_list, key=lambda k: self.word_frequency(k), reverse=True)

简单的,其实就是就是根据词频做一个简单的排序。

小结

召回这一层决定了整个纠错模块的改错能力,能有更多的召回模式,则说明纠错模块能改错的能力越强,pycorrector虽然有一些待改进的地方,但本身还是提供了不少的启发,经过代码研读,也能进一步了解整个纠错是怎么做的。



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存