! [ -e /content ] && pip install -Uqq fastai # upgrade fastai on Colab
Text learner
from __future__ import annotations
from fastai.basics import *
from fastai.text.core import *
from fastai.text.data import *
from fastai.text.models.core import *
from fastai.text.models.awdlstm import *
from fastai.callback.rnn import *
from fastai.callback.progress import *
from nbdev.showdoc import *
All the functions necessary to build a `Learner` suitable for transfer learning in NLP.
The most important functions of this module are language_model_learner and text_classifier_learner. They will help you define a `Learner` using a pretrained model. See the text tutorial for examples of use.
Loading a pretrained model
In text, to load a pretrained model, we need to adapt the embeddings of the vocabulary used for the pre-training to the vocabulary of our current corpus.
def match_embeds(
    old_wgts:dict, # Embedding weights
    old_vocab:list, # Vocabulary of corpus used for pre-training
    new_vocab:list # Current corpus vocabulary
) -> dict:
    "Convert the embedding in `old_wgts` to go from `old_vocab` to `new_vocab`."
    bias, wgts = old_wgts.get('1.decoder.bias', None), old_wgts['0.encoder.weight']
    wgts_m = wgts.mean(0)
    new_wgts = wgts.new_zeros((len(new_vocab),wgts.size(1)))
    if bias is not None:
        bias_m = bias.mean(0)
        new_bias = bias.new_zeros((len(new_vocab),))
    old_o2i = old_vocab.o2i if hasattr(old_vocab, 'o2i') else {w:i for i,w in enumerate(old_vocab)}
    for i,w in enumerate(new_vocab):
        idx = old_o2i.get(w, -1)
        new_wgts[i] = wgts[idx] if idx>=0 else wgts_m
        if bias is not None: new_bias[i] = bias[idx] if idx>=0 else bias_m
    old_wgts['0.encoder.weight'] = new_wgts
    if '0.encoder_dp.emb.weight' in old_wgts: old_wgts['0.encoder_dp.emb.weight'] = new_wgts.clone()
    old_wgts['1.decoder.weight'] = new_wgts.clone()
    if bias is not None: old_wgts['1.decoder.bias'] = new_bias
    return old_wgts
For words in `new_vocab` that don't have a corresponding match in `old_vocab`, we use the mean of all pretrained embeddings.
wgts = {'0.encoder.weight': torch.randn(5,3)}
new_wgts = match_embeds(wgts.copy(), ['a', 'b', 'c'], ['a', 'c', 'd', 'b'])
old,new = wgts['0.encoder.weight'],new_wgts['0.encoder.weight']
test_eq(new[0], old[0])
test_eq(new[1], old[2])
test_eq(new[2], old.mean(0))
test_eq(new[3], old[1])
# with bias
wgts = {'0.encoder.weight': torch.randn(5,3), '1.decoder.bias': torch.randn(5)}
new_wgts = match_embeds(wgts.copy(), ['a', 'b', 'c'], ['a', 'c', 'd', 'b'])
old_w,new_w = wgts['0.encoder.weight'],new_wgts['0.encoder.weight']
old_b,new_b = wgts['1.decoder.bias'], new_wgts['1.decoder.bias']
test_eq(new_w[0], old_w[0])
test_eq(new_w[1], old_w[2])
test_eq(new_w[2], old_w.mean(0))
test_eq(new_w[3], old_w[1])
test_eq(new_b[0], old_b[0])
test_eq(new_b[1], old_b[2])
test_eq(new_b[2], old_b.mean(0))
test_eq(new_b[3], old_b[1])
def _get_text_vocab(dls:DataLoaders) -> list:
    "Get vocabulary from `DataLoaders`"
    vocab = dls.vocab
    if isinstance(vocab, L): vocab = vocab[0]
    return vocab
def load_ignore_keys(
    model, # Model architecture
    wgts:dict # Model weights
) -> tuple:
    "Load `wgts` in `model` ignoring the names of the keys, just taking parameters in order"
    sd = model.state_dict()
    for k1,k2 in zip(sd.keys(), wgts.keys()): sd[k1].data = wgts[k2].data.clone()
    return model.load_state_dict(sd)
def _rm_module(n:str):
    t = n.split('.')
    for i in range(len(t)-1, -1, -1):
        if t[i] == 'module':
            t.pop(i)
            break
    return '.'.join(t)
# For compatibility with previous versions, remove at release
def clean_raw_keys(wgts:dict):
    keys = list(wgts.keys())
    for k in keys:
        t = k.split('.module')
        if f'{_rm_module(k)}_raw' in keys: del wgts[k]
    return wgts
# For compatibility with previous versions, remove at release
def load_model_text(
    file:str, # File name of the saved text model
    model, # Model architecture
    opt:Optimizer, # `Optimizer` used to fit the model
    with_opt:bool=None, # Enable to load `Optimizer` state
    device:int|str|torch.device=None, # Sets the device, uses 'cpu' if unspecified
    strict:bool=True # Whether to strictly enforce the keys of `file`'s state dict to match with the keys of the model's `Module.state_dict`
):
    "Load `model` from `file` along with `opt` (if available, and if `with_opt`)"
    distrib_barrier()
    if isinstance(device, int): device = torch.device('cuda', device)
    elif device is None: device = 'cpu'
    state = torch.load(file, map_location=device)
    hasopt = set(state)=={'model', 'opt'}
    model_state = state['model'] if hasopt else state
    get_model(model).load_state_dict(clean_raw_keys(model_state), strict=strict)
    if hasopt and ifnone(with_opt,True):
        try: opt.load_state_dict(state['opt'])
        except:
            if with_opt: warn("Could not load the optimizer state.")
    elif with_opt: warn("Saved file doesn't contain an optimizer state.")
@delegates(Learner.__init__)
class TextLearner(Learner):
    "Basic class for a `Learner` in NLP."
    def __init__(self,
        dls:DataLoaders, # Text `DataLoaders`
        model, # A standard PyTorch model
        alpha:float=2., # Param for `RNNRegularizer`
        beta:float=1., # Param for `RNNRegularizer`
        moms:tuple=(0.8,0.7,0.8), # Momentum for `Cosine Annealing Scheduler`
        **kwargs
    ):
        super().__init__(dls, model, moms=moms, **kwargs)
        self.add_cbs(rnn_cbs())

    def save_encoder(self,
        file:str # Filename for `Encoder`
    ):
        "Save the encoder to `file` in the model directory"
        if rank_distrib(): return # don't save if child proc
        encoder = get_model(self.model)[0]
        if hasattr(encoder, 'module'): encoder = encoder.module
        torch.save(encoder.state_dict(), join_path_file(file, self.path/self.model_dir, ext='.pth'))

    def load_encoder(self,
        file:str, # Filename of the saved encoder
        device:int|str|torch.device=None # Device used to load, defaults to `dls` device
    ):
        "Load the encoder `file` from the model directory, optionally ensuring it's on `device`"
        encoder = get_model(self.model)[0]
        if device is None: device = self.dls.device
        if hasattr(encoder, 'module'): encoder = encoder.module
        distrib_barrier()
        wgts = torch.load(join_path_file(file,self.path/self.model_dir, ext='.pth'), map_location=device)
        encoder.load_state_dict(clean_raw_keys(wgts))
        self.freeze()
        return self

    def load_pretrained(self,
        wgts_fname:str, # Filename of saved weights
        vocab_fname:str, # Saved vocabulary filename in pickle format
        model=None # Model to load parameters from, defaults to `Learner.model`
    ):
        "Load a pretrained model and adapt it to the data vocabulary."
        old_vocab = load_pickle(vocab_fname)
        new_vocab = _get_text_vocab(self.dls)
        distrib_barrier()
        wgts = torch.load(wgts_fname, map_location=lambda storage,loc: storage)
        if 'model' in wgts: wgts = wgts['model'] # just in case the pretrained model was saved with an optimizer
        wgts = match_embeds(wgts, old_vocab, new_vocab)
        load_ignore_keys(self.model if model is None else model, clean_raw_keys(wgts))
        self.freeze()
        return self

    # For compatibility with previous versions, remove at release
    @delegates(load_model_text)
    def load(self,
        file:str, # Filename of the saved model
        with_opt:bool=None, # Enable to load `Optimizer` state
        device:int|str|torch.device=None, # Device used to load, defaults to `dls` device
        **kwargs
    ):
        if device is None: device = self.dls.device
        if self.opt is None: self.create_opt()
        file = join_path_file(file, self.path/self.model_dir, ext='.pth')
        load_model_text(file, self.model, self.opt, device=device, **kwargs)
        return self
Adds a `ModelResetter` and an `RNNRegularizer` with `alpha` and `beta` to the callbacks; the rest is the same as the `Learner` init.
This `Learner` adds functionality to the base class:
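As a quick illustration (a minimal sketch, not from the original notebook), here is how a `TextLearner` could be built by hand on the IMDB sample; in practice you would normally let `text_classifier_learner` or `language_model_learner` do this for you:

from fastai.text.all import *

path = untar_data(URLs.IMDB_SAMPLE)
dls = TextDataLoaders.from_csv(path, 'texts.csv', text_col='text', label_col='label', valid_col='is_valid')
# build a classifier backbone by hand and wrap it in a TextLearner
model = get_text_classifier(AWD_LSTM, len(dls.vocab[0]), get_c(dls))
learn = TextLearner(dls, model, loss_func=CrossEntropyLossFlat())
# the RNN-specific callbacks from rnn_cbs() are now attached alongside the defaults
print([type(cb).__name__ for cb in learn.cbs])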
show_doc(TextLearner.load_pretrained)
TextLearner.load_pretrained
TextLearner.load_pretrained (wgts_fname:str, vocab_fname:str, model=None)
Load a pretrained model and adapt it to the data vocabulary.
 | Type | Default | Details
---|---|---|---
wgts_fname | str |  | Filename of saved weights
vocab_fname | str |  | Saved vocabulary filename in pickle format
model | NoneType | None | Model to load parameters from, defaults to Learner.model
`wgts_fname` should point to the weights of the pretrained model, and `vocab_fname` to the vocabulary used to pretrain it.
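For example, a hedged sketch (the file names below are placeholders, not files shipped with fastai):

# assumes `learn` is a TextLearner/LMLearner and that both files exist on disk:
#   lm_wgts.pth  - state dict of the pretrained language model
#   lm_vocab.pkl - pickled list of tokens used during pre-training
learn = learn.load_pretrained('lm_wgts.pth', 'lm_vocab.pkl')
# the embeddings are remapped to the current dls vocabulary and the model is frozen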
show_doc(TextLearner.save_encoder)
TextLearner.save_encoder
TextLearner.save_encoder (file:str)
Save the encoder to file
in the model directory
 | Type | Details
---|---|---
file | str | Filename for Encoder
The model directory is `Learner.path/Learner.model_dir`.
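A typical ULMFiT-style round trip might look like the sketch below; it assumes two hypothetical learners, `learn_lm` (a language model) and `learn_clas` (a classifier), that share the same `path`/`model_dir` and text vocabulary, and 'finetuned_enc' is just an illustrative name:

# on the language-model learner, after fine-tuning on the target corpus
learn_lm.save_encoder('finetuned_enc')   # writes learn_lm.path/learn_lm.model_dir/finetuned_enc.pth

# later, on the classifier learner built with the same vocabulary
learn_clas = learn_clas.load_encoder('finetuned_enc')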
show_doc(TextLearner.load_encoder)
TextLearner.load_encoder
TextLearner.load_encoder (file:str, device:int|str|torch.device=None)
Load the encoder file
from the model directory, optionally ensuring it’s on device
 | Type | Default | Details
---|---|---|---
file | str |  | Filename of the saved encoder
device | int|str|torch.device | None | Device used to load, defaults to dls device
Language model predictions
For language models, the `predict` method is quite different from the other applications, which is why it needs its own subclass.
def decode_spec_tokens(tokens):
    "Decode the special tokens in `tokens`"
    new_toks,rule,arg = [],None,None
    for t in tokens:
        if t in [TK_MAJ, TK_UP, TK_REP, TK_WREP]: rule = t
        elif rule is None: new_toks.append(t)
        elif rule == TK_MAJ:
            new_toks.append(t[:1].upper() + t[1:].lower())
            rule = None
        elif rule == TK_UP:
            new_toks.append(t.upper())
            rule = None
        elif arg is None:
            try: arg = int(t)
            except: rule = None
        else:
            if rule == TK_REP: new_toks.append(t * arg)
            else: new_toks += [t] * arg
    return new_toks
test_eq(decode_spec_tokens(['xxmaj', 'text']), ['Text'])
test_eq(decode_spec_tokens(['xxup', 'text']), ['TEXT'])
test_eq(decode_spec_tokens(['xxrep', '3', 'a']), ['aaa'])
test_eq(decode_spec_tokens(['xxwrep', '3', 'word']), ['word', 'word', 'word'])
class LMLearner(TextLearner):
    "Add functionality to `TextLearner` when dealing with a language model"
    def predict(self, text, n_words=1, no_unk=True, temperature=1., min_p=None, no_bar=False,
                decoder=decode_spec_tokens, only_last_word=False):
        "Return `text` and the `n_words` that come after"
        self.model.reset()
        idxs = idxs_all = self.dls.test_dl([text]).items[0].to(self.dls.device)
        if no_unk: unk_idx = self.dls.vocab.index(UNK)
        for _ in (range(n_words) if no_bar else progress_bar(range(n_words), leave=False)):
            with self.no_bar(): preds,_ = self.get_preds(dl=[(idxs[None],)])
            res = preds[0][-1]
            if no_unk: res[unk_idx] = 0.
            if min_p is not None:
                if (res >= min_p).float().sum() == 0:
                    warn(f"There is no item with probability >= {min_p}, try a lower value.")
                else: res[res < min_p] = 0.
            if temperature != 1.: res.pow_(1 / temperature)
            idx = torch.multinomial(res, 1).item()
            idxs = idxs_all = torch.cat([idxs_all, idxs.new([idx])])
            if only_last_word: idxs = idxs[-1][None]

        num = self.dls.train_ds.numericalize
        tokens = [num.vocab[i] for i in idxs_all if num.vocab[i] not in [BOS, PAD]]
        sep = self.dls.train_ds.tokenizer.sep
        return sep.join(decoder(tokens))

    @delegates(Learner.get_preds)
    def get_preds(self, concat_dim=1, **kwargs): return super().get_preds(concat_dim=1, **kwargs)
show_doc(LMLearner, title_level=3)
LMLearner
LMLearner (dls:DataLoaders, model, alpha:float=2.0, beta:float=1.0, moms:tuple=(0.8, 0.7, 0.8), loss_func:callable|None=None, opt_func=<function Adam>, lr=0.001, splitter:callable=<function trainable_params>, cbs=None, metrics=None, path=None, model_dir='models', wd=None, wd_bn_bias=False, train_bn=True, default_cbs:bool=True)
Add functionality to TextLearner
when dealing with a language model
 | Type | Default | Details
---|---|---|---
dls | DataLoaders |  | Text DataLoaders
model |  |  | A standard PyTorch model
alpha | float | 2.0 | Param for RNNRegularizer
beta | float | 1.0 | Param for RNNRegularizer
moms | tuple | (0.8, 0.7, 0.8) | Momentum for Cosine Annealing Scheduler
loss_func | callable|None | None | Loss function for training
opt_func | function | Adam | Optimisation function for training
lr | float | 0.001 | Learning rate
splitter | callable | trainable_params | Used to split parameters into layer groups
cbs | NoneType | None | Callbacks
metrics | NoneType | None | Printed after each epoch
path | NoneType | None | Parent directory to save, load, and export models
model_dir | str | models | Subdirectory to save and load models
wd | NoneType | None | Weight decay
wd_bn_bias | bool | False | Apply weight decay to batchnorm bias params?
train_bn | bool | True | Always train batchnorm layers?
default_cbs | bool | True | Include default callbacks?
show_doc(LMLearner.predict)
LMLearner.predict
LMLearner.predict (text, n_words=1, no_unk=True, temperature=1.0, min_p=None, no_bar=False, decoder=<function decode_spec_tokens>, only_last_word=False)
Return text
and the n_words
that come after
The words are picked randomly according to the probability of each index. `no_unk` means we never pick the `UNK` token, `temperature` is applied to the predictions, and if `min_p` is passed, we don't consider the indices with a probability lower than it. Set `no_bar` to `True` if you don't want any progress bar, and you can pass a custom `decoder` to process the predicted tokens.
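A hedged sketch of these keyword arguments (the prompt and the values are arbitrary):

learn.predict('This movie is about', n_words=30,
              temperature=0.75,  # predictions are raised to 1/temperature, so values below 1. sharpen the distribution
              min_p=0.01,        # ignore tokens whose probability is below 1%
              no_unk=True,       # never sample the xxunk token
              no_bar=True)       # silence the progress bar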
Learner convenience functions
from fastai.text.models.core import _model_meta
def _get_text_vocab(dls):
    vocab = dls.vocab
    if isinstance(vocab, L): vocab = vocab[0]
    return vocab
@delegates(Learner.__init__)
def language_model_learner(dls, arch, config=None, drop_mult=1., backwards=False, pretrained=True, pretrained_fnames=None, **kwargs):
    "Create a `Learner` with a language model from `dls` and `arch`."
    vocab = _get_text_vocab(dls)
    model = get_language_model(arch, len(vocab), config=config, drop_mult=drop_mult)
    meta = _model_meta[arch]
    learn = LMLearner(dls, model, loss_func=CrossEntropyLossFlat(), splitter=meta['split_lm'], **kwargs)
    url = 'url_bwd' if backwards else 'url'
    if pretrained or pretrained_fnames:
        if pretrained_fnames is not None:
            fnames = [learn.path/learn.model_dir/f'{fn}.{ext}' for fn,ext in zip(pretrained_fnames, ['pth', 'pkl'])]
        else:
            if url not in meta:
                warn("There are no pretrained weights for that architecture yet!")
                return learn
            model_path = untar_data(meta[url], c_key='model')
            try: fnames = [list(model_path.glob(f'*.{ext}'))[0] for ext in ['pth', 'pkl']]
            except IndexError: print(f'The model in {model_path} is incomplete, download again'); raise
        learn = learn.load_pretrained(*fnames)
    return learn
You can use `config` to customize the architecture used (change the values in `awd_lstm_lm_config` for this), `pretrained` will use fastai's pretrained model for this arch (if available), or you can pass specific `pretrained_fnames` containing your own pretrained model and the corresponding vocabulary. All other arguments are passed to `Learner`.
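For example, a hedged sketch of passing a modified `config`; it assumes a language-model `DataLoaders` `dls` like the one built in the next cell, and the sizes are only illustrative:

config = awd_lstm_lm_config.copy()
config.update({'emb_sz': 200, 'n_hid': 600})   # a smaller AWD-LSTM than the default 400/1152
learn = language_model_learner(dls, AWD_LSTM, config=config,
                               pretrained=False,  # fastai's pretrained weights no longer fit these sizes
                               drop_mult=0.5)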
path = untar_data(URLs.IMDB_SAMPLE)
df = pd.read_csv(path/'texts.csv')
dls = TextDataLoaders.from_df(df, path=path, text_col='text', is_lm=True, valid_col='is_valid')
learn = language_model_learner(dls, AWD_LSTM)
You can then use the `.predict` method to generate new text.
learn.predict('This movie is about', n_words=20)
'This movie is about plans by Tom Cruise to win a loyalty sharing award at the Battle of Christmas'
By default the entire sentence is fed again to the model after each predicted word; this little trick improves the quality of the generated text. If you only want to feed the last word, specify the argument `only_last_word`.
learn.predict('This movie is about', n_words=20, only_last_word=True)
'This movie is about the J. Intelligent , ha - agency . Griffith , and Games on the early after'
@delegates(Learner.__init__)
def text_classifier_learner(dls, arch, seq_len=72, config=None, backwards=False, pretrained=True, drop_mult=0.5, n_out=None,
                            lin_ftrs=None, ps=None, max_len=72*20, y_range=None, **kwargs):
    "Create a `Learner` with a text classifier from `dls` and `arch`."
    vocab = _get_text_vocab(dls)
    if n_out is None: n_out = get_c(dls)
    assert n_out, "`n_out` is not defined, and could not be inferred from data, set `dls.c` or pass `n_out`"
    model = get_text_classifier(arch, len(vocab), n_out, seq_len=seq_len, config=config, y_range=y_range,
                                drop_mult=drop_mult, lin_ftrs=lin_ftrs, ps=ps, max_len=max_len)
    meta = _model_meta[arch]
    learn = TextLearner(dls, model, splitter=meta['split_clas'], **kwargs)
    url = 'url_bwd' if backwards else 'url'
    if pretrained:
        if url not in meta:
            warn("There are no pretrained weights for that architecture yet!")
            return learn
        model_path = untar_data(meta[url], c_key='model')
        try: fnames = [list(model_path.glob(f'*.{ext}'))[0] for ext in ['pth', 'pkl']]
        except IndexError: print(f'The model in {model_path} is incomplete, download again'); raise
        learn = learn.load_pretrained(*fnames, model=learn.model[0])
        learn.freeze()
    return learn
You can use `config` to customize the architecture used (change the values in `awd_lstm_clas_config` for this), and `pretrained` will use fastai's pretrained model for this `arch` (if available). `drop_mult` is a global multiplier applied to control all dropouts. `n_out` is usually inferred from the `dls`, but you can pass it directly.
The model uses a `SentenceEncoder`, which means the texts are passed `seq_len` tokens at a time, and will only compute the gradients on the last `max_len` steps. `lin_ftrs` and `ps` are passed to `get_text_classifier`.
All other arguments are passed to `Learner`.
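As a hedged sketch of these options (it assumes a classification `DataLoaders` `dls` like the one built in the next cell; the values are only illustrative):

config = awd_lstm_clas_config.copy()
config.update({'output_p': 0.5})   # tweak one of the dropout probabilities
learn = text_classifier_learner(dls, AWD_LSTM, config=config,
                                drop_mult=0.7,   # scales every dropout in the encoder
                                lin_ftrs=[50],   # size of the hidden layer in the classifier head
                                max_len=72*10)   # compute gradients on at most the last 720 tokens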
path = untar_data(URLs.IMDB_SAMPLE)
df = pd.read_csv(path/'texts.csv')
dls = TextDataLoaders.from_df(df, path=path, text_col='text', label_col='label', valid_col='is_valid')
learn = text_classifier_learner(dls, AWD_LSTM)
Show methods -
@typedispatch
def show_results(x: LMTensorText, y, samples, outs, ctxs=None, max_n=10, **kwargs):
    if ctxs is None: ctxs = get_empty_df(min(len(samples), max_n))
    for i,l in enumerate(['input', 'target']):
        ctxs = [b.show(ctx=c, label=l, **kwargs) for b,c,_ in zip(samples.itemgot(i),ctxs,range(max_n))]
    ctxs = [b.show(ctx=c, label='pred', **kwargs) for b,c,_ in zip(outs.itemgot(0),ctxs,range(max_n))]
    display_df(pd.DataFrame(ctxs))
    return ctxs
@typedispatch
def show_results(x: TensorText, y, samples, outs, ctxs=None, max_n=10, trunc_at=150, **kwargs):
    if ctxs is None: ctxs = get_empty_df(min(len(samples), max_n))
    samples = L((s[0].truncate(trunc_at),*s[1:]) for s in samples)
    ctxs = show_results[object](x, y, samples, outs, ctxs=ctxs, max_n=max_n, **kwargs)
    display_df(pd.DataFrame(ctxs))
    return ctxs
@typedispatch
def plot_top_losses(x: TensorText, y:TensorCategory, samples, outs, raws, losses, trunc_at=150, **kwargs):
    rows = get_empty_df(len(samples))
    samples = L((s[0].truncate(trunc_at),*s[1:]) for s in samples)
    for i,l in enumerate(['input', 'target']):
        rows = [b.show(ctx=c, label=l, **kwargs) for b,c in zip(samples.itemgot(i),rows)]
    outs = L(o + (TitledFloat(r.max().item()), TitledFloat(l.item())) for o,r,l in zip(outs, raws, losses))
    for i,l in enumerate(['predicted', 'probability', 'loss']):
        rows = [b.show(ctx=c, label=l, **kwargs) for b,c in zip(outs.itemgot(i),rows)]
    display_df(pd.DataFrame(rows))
Export -
from nbdev import nbdev_export
nbdev_export()