Corpus Processing Pipeline
Corpus collection > corpus cleaning > sentence vector encoding > Q&A pair construction > corpus model saving > done
Corpus Collection
Chat logs
Movie dialogues
Script fragments
Corpus Cleaning
What to clean:
Extra spaces
Irregular symbols
Redundant characters and English text
How to clean:
Regular expressions
Splitting
Judging good vs. bad sentences
Building Q&A Pairs
Processing and splitting the corpus into question-answer pairs
Sentence Vector Encoding
Raw text cannot be used for training directly
Convert sentences into vectors
Convert vectors back into sentences
Saving the Corpus Model
Use pickle to save the model
Generate .pkl files
Train the corpus model from the .pkl files
Finally, wrap the trained deep model as a RESTful service (a sketch follows)
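The RESTful step is not shown in the code below, so here is a minimal sketch of what the wrapper might look like. It assumes Flask as the web framework (the text only says "RESTful") and a hypothetical predict() function that loads the trained seq2seq model plus the saved WordSequence and decodes a reply:

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/chat', methods=['POST'])
def chat():
    # predict() is hypothetical: it would load the trained model and
    # ws_test.pkl, encode the question, and decode an answer
    question = request.json.get('question', '')
    return jsonify({'answer': predict(question)})

if __name__ == '__main__':
    app.run(port=5000)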
Hands-On
Collecting the corpus:
About 200 MB of movie dialogue was collected as the corpus, in the dgk_shooter_min format: each utterance line starts with 'M ' and separates characters with '/', while a line starting with 'E' marks the end of one conversation:
M 你/没/事/吧/?/
M 是/的/,/我/没/事/
M 来/吧/,/我/在/做/早/餐/
M 好/的/
E
M C/h/l/o/e/./
M J/a/c/k/!/
M 没/事/吧/?/
M 发/生/什/么/了/?/
M 杀/死/知/道/你/还/活/着/的/人/?/
M 我/不/知/道/,/
M 现/在/,/ /我/要/进/入/C/T/U/的/档/案/
M 秘/密/的
M 我/没/电/脑/不/行/
M 在/加/州/工/学/院/有/个/研/究/处/
M 我/们/能/进/去/
M 那/是/谁/?/
Corpus cleaning
# Corpus cleaning
import re

## Decide the separator when two sentences are merged:
## none if the first sentence already ends with punctuation, otherwise a comma
def make_split(line):
    if re.match(r'.*([,。···?!\.,!?])$', ''.join(line)):
        return []
    return [', ']

## Filter good vs. bad sentences: reject lines containing too many letters or digits
def good_line(line):
    if len(re.findall(r'[a-zA-Z0-9]', ''.join(line))) > 2:
        return False
    return True

## Normalize redundant characters
def regular(sen):
    sen = re.sub(r'\.{3,100}', '···', sen)   # collapse long runs of '.' into '···'
    sen = re.sub(r'···{2,100}', '···', sen)  # collapse repeated '···'
    sen = re.sub(r'[,]{1,100}', ',', sen)   # collapse comma runs into a single ','
    sen = re.sub(r'[\.]{1,100}', '。', sen)  # normalize remaining periods to '。'
    sen = re.sub(r'[\?]{1,100}', '?', sen)  # normalize question-mark runs to '?'
    sen = re.sub(r'[!]{1,100}', '!', sen)   # normalize exclamation runs to '!'
    return sen
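A quick sanity check of the three helpers on made-up example inputs:

print(regular('什么....不会吧!!??'))   # -> 什么···不会吧!?
print(good_line(list('abc123你好')))   # -> False: too many letters/digits
print(make_split(list('走吧!')))       # -> []: already ends with punctuation
print(make_split(list('走吧')))        # -> [', ']: a comma is inserted when joining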
Building the Q&A pairs
# Read and parse the conversation file
from tqdm import tqdm

print('extract lines')  # dgk_shooter_min corpus
fp = open('test.conv', 'r', errors='ignore', encoding='utf-8')
# Strip the 'M ' prefix and the '/' separators, discard invalid sentences,
# and group consecutive sentences into dialogues
groups = []
group = []
for line in tqdm(fp):
    if line.startswith('M '):  # an utterance line that needs processing
        line = line.replace('\n', '')
        if '/' in line:  # drop the leading 'M ' and split the rest on '/'
            line = line[2:].split('/')
        else:
            line = list(line[2:])
        line = line[:-1]  # drop the trailing empty element left by the final '/'
        group.append(list(regular(''.join(line))))
    else:  # an 'E' line marks the end of the current dialogue
        if group:
            groups.append(group)
            group = []
if group:  # flush the last dialogue
    groups.append(group)
    group = []
print('extract group')
# Build the question-answer pairs
x_data = []
y_data = []
for group in tqdm(groups):  # tqdm shows a progress bar
    for i, line in enumerate(group):  # iterate with the line index
        last_line = None
        if i > 0:  # there is a previous line
            last_line = group[i - 1]
            if not good_line(last_line):
                last_line = None
        next_line = None
        if i < len(group) - 1:  # there is a next line
            next_line = group[i + 1]
            if not good_line(next_line):
                next_line = None
        next_next_line = None
        if i < len(group) - 2:  # there is a line after the next one
            next_next_line = group[i + 2]
            if not good_line(next_next_line):
                next_next_line = None
        if next_line:  # line -> next_line
            x_data.append(line)
            y_data.append(next_line)
        if last_line and next_line:  # (last_line + line) -> next_line
            x_data.append(last_line + make_split(last_line) + line)
            y_data.append(next_line)
        if next_line and next_next_line:  # line -> (next_line + next_next_line)
            x_data.append(line)
            y_data.append(next_line + make_split(next_line) + next_next_line)
print(len(x_data), len(y_data))
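To make the pairing concrete, here is a small self-contained sketch that re-implements the loop above for a single, hypothetical three-line dialogue (good_line and make_split come from the cleaning step):

def build_pairs(group):
    # Same pairing logic as the loop above, for one dialogue group
    pairs = []
    for i, line in enumerate(group):
        last_line = group[i - 1] if i > 0 and good_line(group[i - 1]) else None
        next_line = group[i + 1] if i < len(group) - 1 and good_line(group[i + 1]) else None
        next_next_line = group[i + 2] if i < len(group) - 2 and good_line(group[i + 2]) else None
        if next_line:
            pairs.append((line, next_line))
        if last_line and next_line:
            pairs.append((last_line + make_split(last_line) + line, next_line))
        if next_line and next_next_line:
            pairs.append((line, next_line + make_split(next_line) + next_next_line))
    return pairs

for x, y in build_pairs([list('你没事吧'), list('我没事'), list('来吧')]):
    print(''.join(x), '->', ''.join(y))
# 你没事吧 -> 我没事
# 你没事吧 -> 我没事, 来吧
# 我没事 -> 来吧
# 你没事吧, 我没事 -> 来吧

Each line is paired with its successor, and extra pairs add one line of context on either the question side or the answer side.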
Sentence vector encoding
# Encode sentences as index sequences
import numpy as np

class WordSequence(object):
    # Special vocabulary entries
    PAD_TAG = '<pad>'   # padding tag
    UNK_TAG = '<unk>'   # unknown-word tag
    START_TAG = '<s>'   # start-of-sentence tag
    END_TAG = '</s>'    # end-of-sentence tag
    PAD = 0
    UNK = 1
    START = 2
    END = 3

    def __init__(self):
        self.fited = False  # whether fit() has been called
        self.dict = {WordSequence.PAD_TAG: WordSequence.PAD, WordSequence.UNK_TAG: WordSequence.UNK,
                     WordSequence.START_TAG: WordSequence.START, WordSequence.END_TAG: WordSequence.END}

    # Index conversion
    def to_index(self, word):  # word -> index
        assert self.fited, 'WordSequence has not been fitted yet'
        if word in self.dict:
            return self.dict[word]
        return WordSequence.UNK

    def to_word(self, index):  # index -> word
        assert self.fited, 'WordSequence has not been fitted yet'
        for k, v in self.dict.items():
            if v == index:
                return k
        return WordSequence.UNK_TAG

    def size(self):
        assert self.fited, 'WordSequence has not been fitted yet'
        return len(self.dict) + 1

    def __len__(self):  # dictionary size
        return self.size()

    # Build the vocabulary from sentences
    def fit(self, sentences, min_count=5, max_count=None, max_features=None):
        assert not self.fited, 'WordSequence can only be fitted once'
        count = {}
        for sentence in sentences:
            arr = list(sentence)
            for a in arr:
                if a not in count:
                    count[a] = 0
                count[a] += 1
        if min_count is not None:
            count = {k: v for k, v in count.items() if v >= min_count}
        if max_count is not None:
            count = {k: v for k, v in count.items() if v <= max_count}
        self.dict = {WordSequence.PAD_TAG: WordSequence.PAD, WordSequence.UNK_TAG: WordSequence.UNK,
                     WordSequence.START_TAG: WordSequence.START, WordSequence.END_TAG: WordSequence.END}
        if isinstance(max_features, int):
            count = sorted(list(count.items()), key=lambda x: x[1])  # sort by frequency to keep the most frequent entries
            if max_features is not None and len(count) > max_features:
                count = count[-int(max_features):]
            for w, _ in count:
                self.dict[w] = len(self.dict)
        else:
            for w in sorted(count.keys()):
                self.dict[w] = len(self.dict)
        self.fited = True

    # Encode a sentence into an index array
    def transform(self, sentence, max_len=None):
        assert self.fited, 'WordSequence has not been fitted yet'
        if max_len is not None:
            r = [self.PAD] * max_len
        else:
            r = [self.PAD] * len(sentence)
        for index, a in enumerate(sentence):
            if max_len is not None and index >= len(r):
                break
            r[index] = self.to_index(a)
        return np.array(r)

    # Decode an index sequence back into words
    def inverse_transform(self, indices, ignore_pad=False, ignore_unk=False, ignore_start=False, ignore_end=False):
        ret = []
        for i in indices:
            word = self.to_word(i)
            if word == WordSequence.PAD_TAG and ignore_pad:
                continue
            if word == WordSequence.UNK_TAG and ignore_unk:
                continue
            if word == WordSequence.START_TAG and ignore_start:
                continue
            if word == WordSequence.END_TAG and ignore_end:
                continue
            ret.append(word)
        return ret
Result:
ws = WordSequence()
ws.fit([
    ['你', '好', '啊'],
    ['你', '好', '哦'],
])
indice = ws.transform(['我', '们', '好'])
print(indice)
back = ws.inverse_transform(indice, ignore_pad=True, ignore_unk=False, ignore_start=True, ignore_end=True)
print(back)
output:
[1 1 1]
['<unk>', '<unk>', '<unk>']
All three characters map to <unk> (index 1) because the default min_count=5 filters every word out of this tiny fit corpus, leaving only the four special tags in the dictionary.
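Fitting a fresh WordSequence with min_count=1 (an illustrative override; fit() may only be called once per instance) shows the full encode/decode round trip:

ws2 = WordSequence()
ws2.fit([['你', '好', '啊'], ['你', '好', '哦']], min_count=1)
indice = ws2.transform(['你', '好'])
print(indice)                         # e.g. [4 7] -- exact indices depend on character sort order
print(ws2.inverse_transform(indice))  # ['你', '好']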
Training and saving the corpus model
import pickle

def dump(x_data, y_data, limit=20, x_limit=3, y_limit=6):
    # limit/x_limit/y_limit bound the sentence lengths that are kept;
    # these defaults are assumptions for illustration:
    # keep pairs with x_limit <= len(x) < limit and y_limit <= len(y) < limit
    from word_sequence import WordSequence
    # Generate the .pkl files for later use
    data = list(zip(x_data, y_data))
    data = [
        (x, y) for x, y in data
        if limit > len(x) >= x_limit and limit > len(y) >= y_limit
    ]
    x_data, y_data = zip(*data)
    ## Fit the word-sequence model and save everything
    ws_input = WordSequence()
    ws_input.fit(x_data + y_data)
    print('dump')
    pickle.dump((x_data, y_data), open('chatbot_test.pkl', 'wb'))
    pickle.dump(ws_input, open('ws_test.pkl', 'wb'))
    print('done')
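As a quick sanity check, the two dumps can be loaded back (note that unpickling ws_test.pkl requires the word_sequence module to be importable, since pickle stores the class by its module path):

import pickle
x_data, y_data = pickle.load(open('chatbot_test.pkl', 'rb'))
ws_input = pickle.load(open('ws_test.pkl', 'rb'))
print(len(x_data), len(y_data), len(ws_input))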