Corpus Processing Basics

The Corpus Processing Pipeline

Corpus collection > Corpus cleaning > Building question-answer pairs > Sentence vector encoding > Saving the corpus model > Done

Corpus Collection

  • Chat logs

  • Movie dialogue

  • Script excerpts

Corpus Cleaning

What needs cleaning

  • Extra whitespace

  • Irregular symbols

  • Extraneous characters and English text

Cleaning methods

  • Regex-based normalization

  • Splitting

  • Filtering good sentences from bad ones

Building Question-Answer Pairs

  • Processing and splitting the corpus into question-answer pairs

Sentence Vector Encoding

  • Raw text cannot be used for training directly

  • Encode sentences into vectors

  • Decode vectors back into sentences

Saving the Corpus Model

  • Use pickle to save the model

  • This produces .pkl files

  • Train on the corpus loaded from the .pkl files

  • Finally, wrap the trained deep model as a RESTful service (a sketch follows below)
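
The post itself stops at saving the model, so the following is only a minimal sketch of the RESTful step, assuming Flask; predict() is a hypothetical stand-in for the trained seq2seq model's inference code, not something defined in this post.

from flask import Flask, jsonify, request

app = Flask(__name__)

# Hypothetical placeholder: encode the input, run the trained deep model,
# and decode the reply. The real inference code is not part of this post.
def predict(text):
    return '...'

@app.route('/chat', methods=['POST'])
def chat():
    text = request.json.get('text', '')
    return jsonify({'answer': predict(text)})

if __name__ == '__main__':
    app.run(port=5000)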

Hands-On

Collecting the corpus:

About 200 MB of movie subtitle lines (the dgk_shooter_min corpus) were collected. Each line starting with 'M' holds one utterance with its characters separated by '/', and a lone 'E' marks the end of a conversation:


M 你/没/事/吧/?/

M 是/的/,/我/没/事/

M 来/吧/,/我/在/做/早/餐/

M 好/的/

E

M C/h/l/o/e/./

M J/a/c/k/!/

M 没/事/吧/?/

M 发/生/什/么/了/?/

M 杀/死/知/道/你/还/活/着/的/人/?/

M 我/不/知/道/,/

M 现/在/,/ /我/要/进/入/C/T/U/的/档/案/

M 秘/密/的

M 我/没/电/脑/不/行/

M 在/加/州/工/学/院/有/个/研/究/处/

M 我/们/能/进/去/

M 那/是/谁/?/

Corpus cleaning


# Corpus cleaning
import re

# Decide how two sentences get joined: if the first already ends with
# punctuation no separator is needed, otherwise insert a comma
def make_split(line):
    if re.match(r'.*([,···?!\.,!?])$', ''.join(line)):
        return []
    return [', ']

# Filter good sentences from bad ones: judge whether there are
# too many English letters and digits
def good_line(line):
    if len(re.findall(r'[a-zA-Z0-9]', ''.join(line))) > 2:
        return False
    return True

# Normalize extraneous characters
def regular(sen):
    sen = re.sub(r'\.{3,100}', '···', sen)
    sen = re.sub(r'(···){2,100}', '···', sen)  # collapse runs of repeated ellipses
    sen = re.sub(r'[,]{1,100}', ',', sen)
    sen = re.sub(r'[\.]{1,100}', '。', sen)
    sen = re.sub(r'[\?]{1,100}', '?', sen)
    sen = re.sub(r'[!]{1,100}', '!', sen)
    return sen
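
A quick check of the three helpers on made-up inputs (illustrative, not from the corpus):

print(regular('好....真的吗??'))        # -> 好···真的吗?
print(good_line(list('我有2个apple')))   # -> False, too many letters and digits
print(make_split(list('你好')))          # -> [', '], no trailing punctuation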

Building the QA pairs


# The helpers make_split / good_line / regular come from the cleaning step above
from tqdm import tqdm

# Open the conversation file
print('extract lines')  # dgk_shooter_min
fp = open('test.conv', 'r', errors='ignore', encoding='utf-8')

# Strip the 'M ' prefix and the slashes; discard invalid sentences
groups = []
group = []
for line in tqdm(fp):
    if line.startswith('M '):  # this line is an utterance that needs processing
        line = line.replace('\n', '')
        if '/' in line:  # if the line contains '/', split on '/' from the third character on
            line = line[2:].split('/')
        else:
            line = list(line[2:])
        line = line[:-1]  # drop the empty string left by the trailing '/'

        group.append(list(regular(''.join(line))))
    else:  # an 'E' line ends the current conversation
        if group:
            groups.append(group)
            group = []
if group:
    groups.append(group)
    group = []
print('extract group')

# Build the question-answer pairs
x_data = []
y_data = []
for group in tqdm(groups):  # tqdm shows a progress bar
    for i, line in enumerate(group):  # enumerate yields each line with its index
        last_line = None
        if i > 0:  # there is at least one preceding line
            last_line = group[i - 1]
            if not good_line(last_line):
                last_line = None
        next_line = None
        if i < len(group) - 1:
            next_line = group[i + 1]
            if not good_line(next_line):
                next_line = None

        next_next_line = None
        if i < len(group) - 2:
            next_next_line = group[i + 2]
            if not good_line(next_next_line):
                next_next_line = None

        if next_line:
            x_data.append(line)
            y_data.append(next_line)
        if last_line and next_line:
            x_data.append(last_line + make_split(last_line) + line)
            y_data.append(next_line)
        if next_line and next_next_line:
            x_data.append(line)
            y_data.append(next_line + make_split(next_line) + next_next_line)

print(len(x_data), len(y_data))
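
Each utterance can thus contribute up to three training pairs: the plain line -> next-line pair, one with the previous line prepended to the question, and one with the line after next appended to the answer. A quick way to inspect the result after the script runs:

# Print a few of the generated QA pairs
for x, y in list(zip(x_data, y_data))[:6]:
    print(''.join(x), '->', ''.join(y))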

Sentence vector encoding


# Encode sentences as index sequences
import numpy as np

class WordSequence(object):
    # Special tokens in the dictionary
    PAD_TAG = '<pad>'   # padding tag
    UNK_TAG = '<unk>'   # unknown tag
    START_TAG = '<s>'   # start-of-sentence tag
    END_TAG = '</s>'    # end-of-sentence tag

    PAD = 0
    UNK = 1
    START = 2
    END = 3

    def __init__(self):
        self.fited = False  # whether fit() has been run
        self.dict = {WordSequence.PAD_TAG: WordSequence.PAD, WordSequence.UNK_TAG: WordSequence.UNK,
                     WordSequence.START_TAG: WordSequence.START, WordSequence.END_TAG: WordSequence.END}

    # word -> index
    def to_index(self, word):
        assert self.fited, 'WordSequence has not been fitted yet'
        if word in self.dict:
            return self.dict[word]
        return WordSequence.UNK

    # index -> word
    def to_word(self, index):
        assert self.fited, 'WordSequence has not been fitted yet'
        for k, v in self.dict.items():
            if v == index:
                return k
        return WordSequence.UNK_TAG

    def size(self):
        assert self.fited, 'WordSequence has not been fitted yet'
        return len(self.dict) + 1

    def __len__(self):  # dictionary size
        return self.size()

    # Build the dictionary from the corpus
    def fit(self, sentences, min_count=5, max_count=None, max_features=None):
        assert not self.fited, 'WordSequence can only be fitted once'

        count = {}
        for sentence in sentences:
            arr = list(sentence)
            for a in arr:
                if a not in count:
                    count[a] = 0
                count[a] += 1
        if min_count is not None:
            count = {k: v for k, v in count.items() if v >= min_count}
        if max_count is not None:
            count = {k: v for k, v in count.items() if v <= max_count}

        self.dict = {WordSequence.PAD_TAG: WordSequence.PAD, WordSequence.UNK_TAG: WordSequence.UNK,
                     WordSequence.START_TAG: WordSequence.START, WordSequence.END_TAG: WordSequence.END}

        if isinstance(max_features, int):
            count = sorted(list(count.items()), key=lambda x: x[1])  # sort by frequency, keep the most frequent
            if len(count) > max_features:
                count = count[-int(max_features):]
            for w, _ in count:
                self.dict[w] = len(self.dict)
        else:
            for w in sorted(count.keys()):
                self.dict[w] = len(self.dict)

        self.fited = True

    # Encode a sentence into indices
    def transform(self, sentence, max_len=None):
        assert self.fited, 'WordSequence has not been fitted yet'

        if max_len is not None:
            r = [self.PAD] * max_len
        else:
            r = [self.PAD] * len(sentence)

        for index, a in enumerate(sentence):
            if max_len is not None and index >= len(r):
                break
            r[index] = self.to_index(a)

        return np.array(r)

    # Decode indices back into a sentence
    def inverse_transform(self, indices, ignore_pad=False, ignore_unk=False, ignore_start=False, ignore_end=False):
        ret = []
        for i in indices:
            word = self.to_word(i)
            if word == WordSequence.PAD_TAG and ignore_pad:
                continue
            if word == WordSequence.UNK_TAG and ignore_unk:
                continue
            if word == WordSequence.START_TAG and ignore_start:
                continue
            if word == WordSequence.END_TAG and ignore_end:
                continue

            ret.append(word)

        return ret

Result:


ws = WordSequence()
ws.fit([
    ['你', '好', '啊'],
    ['你', '好', '哦']
])

indice = ws.transform(['我', '们', '好'])
print(indice)

back = ws.inverse_transform(indice, ignore_pad=True, ignore_unk=False, ignore_start=True, ignore_end=True)
print(back)

output:


[1 1 1]

['<unk>', '<unk>', '<unk>']
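
Every character maps to <unk> here because fit() defaults to min_count=5, while each character in the two toy sentences appears at most twice, so the whole vocabulary gets filtered out. Refitting with min_count=1 keeps the vocabulary; the indices below follow from the sorted insertion order and are illustrative:

ws = WordSequence()
ws.fit([
    ['你', '好', '啊'],
    ['你', '好', '哦']
], min_count=1)

indice = ws.transform(['我', '们', '好'])
print(indice)  # e.g. [1 1 7] -- '我' and '们' are unseen, '好' now has a real index
print(ws.inverse_transform(indice, ignore_pad=True))  # ['<unk>', '<unk>', '好']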

Training and saving the corpus model


import pickle

from word_sequence import WordSequence

# limit, x_limit and y_limit were undefined in the original snippet; they are
# made parameters here, with illustrative length bounds as defaults.
def dump(x_data, y_data, limit=20, x_limit=3, y_limit=6):
    # Keep only pairs whose question and answer lengths fall inside the bounds,
    # then generate the .pkl files for later use
    data = list(zip(x_data, y_data))
    data = [
        (x, y) for x, y in data if limit > len(x) >= x_limit and limit > len(y) >= y_limit
    ]
    x_data, y_data = zip(*data)

    # Fit the word-sequence model and save everything
    ws_input = WordSequence()
    ws_input.fit(x_data + y_data)
    print('dump')
    pickle.dump((x_data, y_data), open('chatbot_test.pkl', 'wb'))
    pickle.dump(ws_input, open('ws_test.pkl', 'wb'))
    print('done')
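
Loading the artifacts back is the mirror image (a minimal sketch using the file names above):

import pickle

x_data, y_data = pickle.load(open('chatbot_test.pkl', 'rb'))
ws = pickle.load(open('ws_test.pkl', 'rb'))
print(len(x_data), len(ws))  # number of QA pairs, vocabulary size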

