论文地址:https://arxiv.org/abs/2105.07148
代码地址:https://github.com/liuwei1206/LEBERT
模型创新
LEBRT采用句子中的词语对(论文中称为Char-Word Pair)的特征作为输入
作者设计Lexicon adapter,在BERT的中间某一层注入词汇特征
左图是在BERT之后的架构上面引入词汇信息
右图是在BERT底层时引入词汇信息
Char-Word Pair
首先,对于给定的中文句子
sc={c1,c2,...,cn}ci代表句子中的第i个字符
利用词典D匹配出句子中包含的潜在词汇(这里设定最多匹配出相关性最强的三个,不足三个则用PAD填充)。然后,每个字符和包含该字符的词汇组成词汇对,表示为
scw={(c1,ws1),(c2,ws2),...,(cn,wsn)}wsi表示包含ci词汇组成的集合
此时就构成了Char-Words Pair Sequence
Lexicon adapter
将输入数据构建成Char-Words Pair Sequence后,句子中的每个位置包含了字符特征和词汇特征,为了把词汇特征注入到BERT中,作者设计了Lexicon adapter
Char-Words Pair Sequence中的第i个位置char-words pair表示为(h_i^c,x_i^{ws})
h_i^c:第i个位置的字符特征,该特征是BERT的某个Transformer层的输出
x_i^{ws} = \{x_{i1}^w,x_{i2}^w,...,x_{im}^w\} :第i个位置字符对应m个词汇的词向量
对词向量使用非线性变换,以至于和字符向量进行维度对齐
vwij=W2(tanh(W1xwij+b1))+b2j=1,...,m
d_c:字符特征维度
d_w:词向量维度
W_1\in R ^{d_c*d_w}
W_2\in R ^{d_c*d_c}
v_{ij}^w \in R^{d_c}
此时,对于Char-Words Pair Sequence中的第i个位置,进行维度变换后的词向量集合为
Vi=(vwi1,...,vwim)∈Rm∗dc
此时,使用注意力机制对V_i中的m个词向量进行融合
ai=softmax(hciWattnVTi)
h_i^c:为query向量
V_i:为value
a_i:使用双线性变换矩阵计算相似度得分得到
之后,再利用相似度得分对V_i进行加权求和得到融合后词特征:
zwi=m∑j=1aijvwij
最后,再把字符特征和融合后的词特征相加得到:
ˆh=ihci+zwi
Lexicon Enhanced BERT
对于给定的中文句子s_c = \{c_1,c_2,...,c_n\},将其构建成character-words pair sequence形式
scw={(c1,ws1),(c2,ws2),...,(cn,wsn)}
将\{c_1,c_2,...,c_n\}输入到BERT的Input Embedder当中,得到输出E = \{e_1,e_2,...,e_n\},之后将E(H^0 = E )输入到BERT的Transformer encoder中,每个Transformer encoder表示为如下形式:
G &= Layernormalization(H^{l-1 } + Multiheadattention(H^{l-1}))\\ H &= Layernormalization(G + FFN(G))
之后,通过Lexicon Adapter把词汇信息注入到第k层和第k+1层的Transformer层之间
第k层Transformer层的输出为 H^k = \{h_1^k,h_2^k,...,h_n^k\}。将其中的每一个Char-Words Pair(h_i^k,x_i^{ws})利用Lexicon Adapter进行转化得到:
ˆhk=LA(hki,xwsi)
代码讲解
词向量处理
加载词向量(load_word_embedding)
input
word_embed_path:词向量的路径,这里选取腾讯的tencent-ailab-embedding-zh-d200-v0.2.0-s.txt
词的个数为2000000,向量维度为200
max_scan_num:最多加载多少个词向量
output
word_embed_dict:每个词对应的词向量 200000 * 200
word_list:词集合,2000000
word_embed_dim:词的维度,200
@classmethod
def build_trie_tree(cls, word_list, save_path):
"""
# todo 是否不将单字加入字典树中
构建字典树
:return:
"""
logger.info('building trie tree')
trie_tree = Trie()
for word in word_list:
trie_tree.insert(word)
write_pickle(trie_tree, save_path)
return trie_tree
def load_word_embedding(cls, word_embed_path, max_scan_num):
"""
todo 存在许多单字的,考虑是否去掉
"""
logger.info('loading word embedding from pretrain')
word_embed_dict = dict()
word_list = list()
with open(word_embed_path, 'r', encoding='utf8') as f:
for idx, line in tqdm(enumerate(f)):
# 只扫描前max_scan_num个词向量
if idx > max_scan_num:
break
items = line.strip().split()
if idx == 0:
assert len(items) == 2
num_embed, word_embed_dim = items
num_embed, word_embed_dim = int(num_embed), int(word_embed_dim)
else:
assert len(items) == word_embed_dim + 1
word = items[0]
embedding = np.empty([1, word_embed_dim])
embedding[:] = items[1:]
word_embed_dict[word] = embedding
word_list.append(word)
logger.info('word_embed_dim:{}'.format(word_embed_dim))
logger.info('size of word_embed_dict:{}'.format(len(word_embed_dict)))
logger.info('size of word_list:{}'.format(len(word_list)))
return word_embed_dict, word_list, word_embed_dim
构建字典树(build_trie_tree)
input
word_list:word_list:词集合,2000000
save_path:字典树的保存路径(方便复用)
output
trie_tree:字典树
@classmethod
def build_trie_tree(cls, word_list, save_path):
"""
# todo 是否不将单字加入字典树中
"""
logger.info('building trie tree')
trie_tree = Trie()
for word in word_list:
trie_tree.insert(word)
write_pickle(trie_tree, save_path)
return trie_tree
def write_pickle(x, path):
with open(path, 'wb') as f:
pickle.dump(x, f)
找到数据集中的所有单词(get_words_from_corpus)
input
files:训练、验证、测试的文件路径
save_file:文件保存路径
trie_tree:字典树
output
all_matched_words:找到了所有跟我们训练、验证、测试数据有关的所有词
@classmethod
def get_words_from_corpus(cls, files, save_file, trie_tree):
"""
找出文件中所有匹配的单词
"""
logger.info('getting words from corpus')
all_matched_words = set()
for file in files:
with open(file, 'r', encoding='utf8') as f:
lines = f.readlines()
for idx in trange(len(lines)):
line = lines[idx].strip()
data = json.loads(line)
text = data['text']
matched_words = cls.get_words_from_text(text, trie_tree)
_ = [all_matched_words.add(word) for word in matched_words]
all_matched_words = list(all_matched_words)
all_matched_words = sorted(all_matched_words)
write_lines(all_matched_words, save_file)
return all_matched_words
@classmethod
def get_words_from_text(cls, text, trie_tree):
"""
找出text中所有的单词
"""
length = len(text)
matched_words_set = set() # 存储匹配到的单词
for idx in range(length):
sub_text = text[idx:idx + trie_tree.max_depth]
words = trie_tree.enumerateMatch(sub_text)
_ = [matched_words_set.add(word) for word in words]
matched_words_set = list(matched_words_set)
matched_words_set = sorted(matched_words_set)
return matched_words_set
def write_lines(lines, path, encoding='utf8'):
with open(path, 'w', encoding=encoding) as f:
for line in lines:
f.writelines('{}\n'.format(line))
初始化模型的词向量(init_model_word_embedding)
input
corpus_words:所有跟我们训练、验证、测试数据有关的所有词
word_embed_dict:每个词对应的词向量 200000 * 200
save_embed_path:词向量的保存路径
save_word_vocab_path:词表的保存保存
output
model_word_embedding:模型的嵌入向量 --> 20857 * 200
word_vocab:模型的词表 --> 20857
embed_dim:嵌入维度 --> 200
def init_model_word_embedding(self, corpus_words, word_embed_dict, save_embed_path, save_word_vocab_path):
logger.info('initializing model word embedding')
# 构建单词和id的映射
word_vocab = Vocabulary(corpus_words, vocab_type='word')
# embed_dim = len(word_embed_dict.items()[1].size)
embed_dim = next(iter(word_embed_dict.values())).size
scale = np.sqrt(3.0 / embed_dim)
model_word_embedding = np.empty([word_vocab.size, embed_dim])
matched = 0
not_matched = 0
for idx, word in enumerate(word_vocab.idx2token):
if word in word_embed_dict:
model_word_embedding[idx, :] = word_embed_dict[word]
matched += 1
else:
model_word_embedding[idx, :] = np.random.uniform(-scale, scale, [1, embed_dim])
not_matched += 1
logger.info('num of match:{}, num of not_match:{}'.format(matched, not_matched))
write_pickle(model_word_embedding, save_embed_path)
write_pickle(word_vocab, save_word_vocab_path)
return model_word_embedding, word_vocab, embed_dim
数据加载格式
每个汉字对应的单词列表(get_char2words)
input:
text:文本
output:
char_index2words:文本中每个汉字所对应的词
def get_char2words(self, text):
"""
获取每个汉字,对应的单词列表
"""
text_len = len(text)
char_index2words = [[] for _ in range(text_len)]
for idx in range(text_len):
sub_sent = text[idx:idx + self.trie_tree.max_depth] # speed using max depth
words = self.trie_tree.enumerateMatch(sub_sent) # 找到以text[idx]开头的所有单词
for word in words:
start_pos = idx
end_pos = idx + len(word)
for i in range(start_pos, end_pos):
char_index2words[i].append(word)
return char_index2words
数据加载格式(collate)
output
特征输入为:
'text': text,
'input_ids': input_ids,
'attention_mask': input_mask,
'token_type_ids': token_type_ids,
'word_ids': word_ids,
'word_mask': word_mask,
'label_ids': label_ids
def get_input_data(self, file):
lines = load_lines(file)
features = []
cls_token_id = self.tokenizer.cls_token_id
sep_token_id = self.tokenizer.sep_token_id
pad_token_id = self.tokenizer.pad_token_id
o_label_id = self.label_vocab.convert_token_to_id('O')
pad_label_id = self.label_vocab.convert_token_to_id('[PAD]')
for line in tqdm(lines):
data = json.loads(line)
text = data['text']
labels = data['label']
char_index2words = self.get_char2words(text)
# 在开头与结尾分别添加[CLS]与[SEP]
input_ids = [cls_token_id] + self.tokenizer.convert_tokens_to_ids(text) + [sep_token_id]
label_ids = [o_label_id] + self.label_vocab.convert_tokens_to_ids(labels) + [o_label_id]
word_ids_list = []
word_pad_id = self.word_vocab.convert_token_to_id('[PAD]')
for words in char_index2words:
words = words[:self.max_word_num]
word_ids = self.word_vocab.convert_tokens_to_ids(words)
word_pad_num = self.max_word_num - len(words)
word_ids = word_ids + [word_pad_id] * word_pad_num
word_ids_list.append(word_ids)
# 开头和结尾进行padding
word_ids_list = [[word_pad_id]*self.max_word_num] + word_ids_list + [[word_pad_id]*self.max_word_num]
if len(input_ids) > self.max_seq_len:
input_ids = input_ids[: self.max_seq_len]
label_ids = label_ids[: self.max_seq_len]
word_ids_list = word_ids_list[: self.max_seq_len]
input_mask = [1] * len(input_ids)
token_type_ids = [0] * len(input_ids)
assert len(input_ids) == len(label_ids) == len(word_ids_list)
# padding
padding_length = self.max_seq_len - len(input_ids)
input_ids += [pad_token_id] * padding_length
input_mask += [0] * padding_length
token_type_ids += [0] * padding_length
label_ids += [pad_label_id] * padding_length
word_ids_list += [[word_pad_id]*self.max_word_num] * padding_length
text = ''.join(text)
input_ids = torch.LongTensor(input_ids)
label_ids = torch.LongTensor(label_ids)
input_mask = torch.LongTensor(input_mask)
token_type_ids = torch.LongTensor(token_type_ids)
word_ids = torch.LongTensor(word_ids_list)
word_mask = (word_ids != word_pad_id).long()
feature = {
'text': text, 'input_ids': input_ids, 'attention_mask': input_mask, 'token_type_ids': token_type_ids,
'word_ids': word_ids, 'word_mask': word_mask, 'label_ids': label_ids
}
features.append(feature)
return features
模型运行
模型初步加载
config.word_vocab_size:20857
config.word_embed_dim:200
LEBertModel:对其中的BertEncoder模块进行改造,后续会详细介绍
class LEBertSoftmaxForNer(BertPreTrainedModel):
def __init__(self, config):
super(LEBertSoftmaxForNer, self).__init__(config)
self.word_embeddings = nn.Embedding(config.word_vocab_size, config.word_embed_dim)
self.num_labels = config.num_labels
self.bert = LEBertModel(config)
self.dropout = nn.Dropout(config.hidden_dropout_prob)
self.classifier = nn.Linear(config.hidden_size, config.num_labels)
self.loss_type = config.loss_type
self.init_weights()
def forward(self, input_ids, attention_mask, token_type_ids, word_ids, word_mask, ignore_index, labels=None):
word_embeddings = self.word_embeddings(word_ids)
outputs = self.bert(
input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids,
word_embeddings=word_embeddings, word_mask=word_mask
)
sequence_output = outputs[0]
sequence_output = self.dropout(sequence_output)
logits = self.classifier(sequence_output)
outputs = (logits,) + outputs[2:] # add hidden states and attention if they are here
if labels is not None:
assert self.loss_type in ['lsr', 'focal', 'ce']
if self.loss_type == 'lsr':
loss_fct = LabelSmoothingCrossEntropy(ignore_index=ignore_index)
elif self.loss_type == 'focal':
loss_fct = FocalLoss(ignore_index=ignore_index)
else:
loss_fct = CrossEntropyLoss(ignore_index=ignore_index)
# Only keep active parts of the loss
if attention_mask is not None:
active_loss = attention_mask.contiguous().view(-1) == 1
active_logits = logits.contiguous().view(-1, self.num_labels)[active_loss]
active_labels = labels.contiguous().view(-1)[active_loss]
loss = loss_fct(active_logits, active_labels)
else:
loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
outputs = (loss,) + outputs
return outputs # (loss), scores, (hidden_states), (attentions)
model.word_embeddings.weight.data.copy_(torch.from_numpy(processor.word_embedding))
把词向量的word_embedding赋给LEBertSoftmaxForNer
简要概述
class LEBertModel(BertPreTrainedModel):
def __init__(self, config):
self.encoder = BertEncoder(config)
class BertEncoder(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.layer = nn.ModuleList([BertLayer(config) for _ in range(config.num_hidden_layers)])
self.word_embedding_adapter = WordEmbeddingAdapter(config)
def forward():
# 在第i层之后,进行融合
if i == self.config.add_layer:
hidden_states = self.word_embedding_adapter(hidden_states, word_embeddings, word_mask)
核心代码
class WordEmbeddingAdapter(nn.Module):
def __init__(self, config):
super(WordEmbeddingAdapter, self).__init__()
self.dropout = nn.Dropout(config.hidden_dropout_prob)
self.tanh = nn.Tanh()
self.linear1 = nn.Linear(config.word_embed_dim, config.hidden_size)
self.linear2 = nn.Linear(config.hidden_size, config.hidden_size)
attn_W = torch.zeros(config.hidden_size, config.hidden_size)
self.attn_W = nn.Parameter(attn_W)
self.attn_W.data.normal_(mean=0.0, std=config.initializer_range)
self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
def forward(self, layer_output, word_embeddings, word_mask):
"""
:param layer_output:bert layer的输出,[b_size, len_input, d_model]
:param word_embeddings:每个汉字对应的词向量集合,[b_size, len_input, num_word, d_word]
:param word_mask:每个汉字对应的词向量集合的attention mask, [b_size, len_input, num_word]
"""
# transform
# 将词向量,与字符向量进行维度对齐
word_outputs = self.linear1(word_embeddings)
word_outputs = self.tanh(word_outputs)
word_outputs = self.linear2(word_outputs)
word_outputs = self.dropout(word_outputs) # word_outputs:[b_size, len_input, num_word, d_model]
# 计算每个字符向量,与其对应的所有词向量的注意力权重,然后加权求和。采用双线性映射计算注意力权重
# layer_output = layer_output.unsqueeze(2) # layer_output:[b_size, len_input, 1, d_model]
socres = torch.matmul(layer_output.unsqueeze(2), self.attn_W) # [b_size, len_input, 1, d_model]
socres = torch.matmul(socres, torch.transpose(word_outputs, 2, 3)) # [b_size, len_input, 1, num_word]
socres = socres.squeeze(2) # [b_size, len_input, num_word]
socres.masked_fill_(word_mask, -1e9) # 将pad的注意力设为很小的数
socres = F.softmax(socres, dim=-1) # [b_size, len_input, num_word]
attn = socres.unsqueeze(-1) # [b_size, len_input, num_word, 1]
weighted_word_embedding = torch.sum(word_outputs * attn, dim=2) # [N, L, D] # 加权求和,得到每个汉字对应的词向量集合的表示
layer_output = layer_output + weighted_word_embedding
layer_output = self.dropout(layer_output)
layer_output = self.layer_norm(layer_output)
return layer_output
# transform
# 将词向量,与字符向量进行维度对齐
word_outputs = self.linear1(word_embeddings)
word_outputs = self.tanh(word_outputs)
word_outputs = self.linear2(word_outputs)
word_outputs = self.dropout(word_outputs) # word_outputs:[b_size, len_input, num_word, d_model]
# 计算每个字符向量,与其对应的所有词向量的注意力权重,然后加权求和。采用双线性映射计算注意力权重
# layer_output = layer_output.unsqueeze(2) # layer_output:[b_size, len_input, 1, d_model]
socres = torch.matmul(layer_output.unsqueeze(2), self.attn_W) # [b_size, len_input, 1, d_model]
socres = torch.matmul(socres, torch.transpose(word_outputs, 2, 3)) # [b_size, len_input, 1, num_word]
socres = socres.squeeze(2) # [b_size, len_input, num_word]
socres.masked_fill_(word_mask, -1e9) # 将pad的注意力设为很小的数
socres = F.softmax(socres, dim=-1) # [b_size, len_input, num_word]
attn = socres.unsqueeze(-1) # [b_size, len_input, num_word, 1]
weighted_word_embedding = torch.sum(word_outputs * attn, dim=2)
layer_output = layer_output + weighted_word_embedding
参照资料
论文解说:https://zhuanlan.zhihu.com/p/374720213
复现代码:https://github.com/yangjianxin1/LEBERT-NER-Chinese