! [ -e /content ] && pip install -Uqq fastai # 在Colab上升级fastai
from __future__ import annotations
from fastai.basics import *
from fastai.text.core import *
from fastai.text.data import *
from fastai.text.models.core import *
from fastai.text.models.awdlstm import *
from fastai.callback.rnn import *
from fastai.callback.progress import *
from nbdev.showdoc import *
该模块最重要的函数是 `language_model_learner` 和 `text_classifier_learner`。它们将帮助您使用预训练模型定义一个 `Learner`。有关用法示例,请参见文本教程。
def match_embeds(
    old_wgts:dict, # Embedding weights
    old_vocab:list, # Vocabulary of corpus used for pre-training
    new_vocab:list # Current corpus vocabulary
) -> dict:
    "Convert the embedding in `old_wgts` to go from `old_vocab` to `new_vocab`."
    # `old_wgts` is modified in place and also returned.
    bias, wgts = old_wgts.get('1.decoder.bias', None), old_wgts['0.encoder.weight']
    # Rows for tokens absent from `old_vocab` fall back to the mean pretrained row.
    wgts_m = wgts.mean(0)
    new_wgts = wgts.new_zeros((len(new_vocab),wgts.size(1)))
    if bias is not None:
        bias_m = bias.mean(0)
        new_bias = bias.new_zeros((len(new_vocab),))
    # Use the vocab's own token->index map when it has one, otherwise build it.
    old_o2i = old_vocab.o2i if hasattr(old_vocab, 'o2i') else {w:i for i,w in enumerate(old_vocab)}
    for i,w in enumerate(new_vocab):
        idx = old_o2i.get(w, -1)
        new_wgts[i] = wgts[idx] if idx>=0 else wgts_m
        if bias is not None: new_bias[i] = bias[idx] if idx>=0 else bias_m
    old_wgts['0.encoder.weight'] = new_wgts
    # The remaining embedding/decoder entries receive independent copies of the same matrix.
    if '0.encoder_dp.emb.weight' in old_wgts: old_wgts['0.encoder_dp.emb.weight'] = new_wgts.clone()
    old_wgts['1.decoder.weight'] = new_wgts.clone()
    if bias is not None: old_wgts['1.decoder.bias'] = new_bias
    return old_wgts
对于 `new_vocab` 中在 `old_vocab` 里没有对应匹配的词,我们使用所有预训练嵌入的平均值。
# Check `match_embeds` without a decoder bias: known tokens keep their row,
# the unknown token 'd' receives the mean of the pretrained rows.
wgts = {'0.encoder.weight': torch.randn(5,3)}
new_wgts = match_embeds(wgts.copy(), ['a', 'b', 'c'], ['a', 'c', 'd', 'b'])
old,new = wgts['0.encoder.weight'],new_wgts['0.encoder.weight']
test_eq(new[0], old[0])
test_eq(new[1], old[2])
test_eq(new[2], old.mean(0))
test_eq(new[3], old[1])
# Same check with a decoder bias: both the weights and the bias are remapped.
wgts = {'0.encoder.weight': torch.randn(5,3), '1.decoder.bias': torch.randn(5)}
new_wgts = match_embeds(wgts.copy(), ['a', 'b', 'c'], ['a', 'c', 'd', 'b'])
old_w,new_w = wgts['0.encoder.weight'],new_wgts['0.encoder.weight']
old_b,new_b = wgts['1.decoder.bias'], new_wgts['1.decoder.bias']
test_eq(new_w[0], old_w[0])
test_eq(new_w[1], old_w[2])
test_eq(new_w[2], old_w.mean(0))
test_eq(new_w[3], old_w[1])
test_eq(new_b[0], old_b[0])
test_eq(new_b[1], old_b[2])
test_eq(new_b[2], old_b.mean(0))
test_eq(new_b[3], old_b[1])
def _get_text_vocab(dls:DataLoaders) -> list:
    "Get vocabulary from `DataLoaders`"
    vocab = dls.vocab
    # When `dls.vocab` is an `L`, the first element is taken as the vocabulary.
    if isinstance(vocab, L): vocab = vocab[0]
    return vocab
def load_ignore_keys(
    model, # Model architecture
    wgts:dict # Model weights
) -> tuple:
    "Load `wgts` in `model` ignoring the names of the keys, just taking parameters in order"
    sd = model.state_dict()
    # Pair entries purely by position; `zip` stops at the shorter of the two mappings.
    for k1,k2 in zip(sd.keys(), wgts.keys()): sd[k1].data = wgts[k2].data.clone()
    return model.load_state_dict(sd)
def _rm_module(n:str):
= n.split('.')
t for i in range(len(t)-1, -1, -1):
if t[i] == 'module':
return '.'.join(t)
def clean_raw_keys(wgts:dict):
    "Delete from `wgts` every key that has a `<name without 'module'>_raw` counterpart, then return `wgts`"
    # Snapshot the keys first: entries are deleted while iterating.
    keys = list(wgts.keys())
    for k in keys:
        if f'{_rm_module(k)}_raw' in keys: del wgts[k]
    return wgts
def load_model_text(
    file:str, # File name of saved text model
    model, # Model architecture
    opt:Optimizer, # `Optimizer` used to fit the model
    with_opt:bool=None, # Enable to load `Optimizer` state
    device:int|str|torch.device=None, # Sets the device, uses 'cpu' if unspecified
    strict:bool=True # Whether to strictly enforce the keys of `file`s state dict match with the model `Module.state_dict`
):
    "Load `model` from `file` along with `opt` (if available, and if `with_opt`)"
    distrib_barrier()
    if isinstance(device, int): device = torch.device('cuda', device)
    elif device is None: device = 'cpu'
    state = torch.load(file, map_location=device)
    # A checkpoint saved with its optimizer is a dict with exactly the keys 'model' and 'opt'.
    hasopt = set(state)=={'model', 'opt'}
    model_state = state['model'] if hasopt else state
    get_model(model).load_state_dict(clean_raw_keys(model_state), strict=strict)
    if hasopt and ifnone(with_opt,True):
        # Best effort: a failure to restore the optimizer only warns, and only
        # when the caller explicitly asked for it.
        try: opt.load_state_dict(state['opt'])
        except Exception:
            if with_opt: warn("Could not load the optimizer state.")
    elif with_opt: warn("Saved file doesn't contain an optimizer state.")
class TextLearner(Learner):
    "Basic class for a `Learner` in NLP."
    def __init__(self,
        dls:DataLoaders, # Text `DataLoaders`
        model, # A standard PyTorch model
        alpha:float=2., # Param for `RNNRegularizer`
        beta:float=1., # Param for `RNNRegularizer`
        moms:tuple=(0.8,0.7,0.8), # Momentum for `Cosine Annealing Scheduler`
        **kwargs # Forwarded to `Learner.__init__`
    ):
        super().__init__(dls, model, moms=moms, **kwargs)
        # NOTE(review): `alpha`/`beta` were otherwise unused; they are passed on so the
        # regularizer is configured as the parameter docs describe — confirm `rnn_cbs`
        # accepts these keywords in the installed fastai version.
        self.add_cbs(rnn_cbs(alpha=alpha, beta=beta))

    def save_encoder(self,
        file:str # Filename for `Encoder`
    ):
        "Save the encoder to `file` in the model directory"
        if rank_distrib(): return # don't save if child proc
        encoder = get_model(self.model)[0]
        # Unwrap a wrapper that exposes the real encoder as `.module`.
        if hasattr(encoder, 'module'): encoder = encoder.module
        torch.save(encoder.state_dict(), join_path_file(file, self.path/self.model_dir, ext='.pth'))

    def load_encoder(self,
        file:str, # Filename of the saved encoder
        device:int|str|torch.device=None # Device used to load, defaults to `dls` device
    ):
        "Load the encoder `file` from the model directory, optionally ensuring it's on `device`"
        encoder = get_model(self.model)[0]
        if device is None: device = self.dls.device
        if hasattr(encoder, 'module'): encoder = encoder.module
        distrib_barrier()
        wgts = torch.load(join_path_file(file,self.path/self.model_dir, ext='.pth'), map_location=device)
        # The loaded weights must actually be applied to the encoder.
        encoder.load_state_dict(clean_raw_keys(wgts))
        # NOTE(review): freezing after loading matches the library's published
        # behaviour; the line was absent from this copy — confirm.
        self.freeze()
        return self

    def load_pretrained(self,
        wgts_fname:str, # Filename of saved weights
        vocab_fname:str, # Saved vocabulary filename in pickle format
        model=None # Model to load parameters from, defaults to `Learner.model`
    ):
        "Load a pretrained model and adapt it to the data vocabulary."
        # NOTE: the vocabulary is read from a pickle file; only load files from a trusted source.
        old_vocab = load_pickle(vocab_fname)
        new_vocab = _get_text_vocab(self.dls)
        distrib_barrier()
        wgts = torch.load(wgts_fname, map_location = lambda storage,loc: storage)
        if 'model' in wgts: wgts = wgts['model'] # Just in case the pretrained model was saved with an optimizer
        wgts = match_embeds(wgts, old_vocab, new_vocab)
        load_ignore_keys(self.model if model is None else model, clean_raw_keys(wgts))
        # NOTE(review): freezing after loading matches the library's published
        # behaviour; the line was absent from this copy — confirm.
        self.freeze()
        return self

    def load(self,
        file:str, # Filename of saved model
        with_opt:bool=None, # Enable to load `Optimizer` state
        device:int|str|torch.device=None, # Device used to load, defaults to `dls` device
        **kwargs # Forwarded to `load_model_text`
    ):
        "Load the model saved as `file` in the model directory, with the optimizer state if `with_opt`"
        if device is None: device = self.dls.device
        if self.opt is None: self.create_opt()
        file = join_path_file(file, self.path/self.model_dir, ext='.pth')
        # `with_opt` is forwarded explicitly; previously it was accepted but never used.
        load_model_text(file, self.model, self.opt, with_opt=with_opt, device=device, **kwargs)
        return self
向回调中添加了一个 `ModelResetter` 和一个带有 `alpha` 和 `beta` 的 `RNNRegularizer`,其余部分与 `Learner` 的初始化相同。

这个 `Learner` 在基类的基础上增加了以下功能:
TextLearner.load_pretrained (wgts_fname:str, vocab_fname:str, model=None)
Load a pretrained model and adapt it to the data vocabulary.
Type | Default | Details | |
wgts_fname | str | Filename of saved weights | |
vocab_fname | str | Saved vocabulary filename in pickle format | |
model | NoneType | None | Model to load parameters from, defaults to Learner.model |
TextLearner.save_encoder (file:str)
Save the encoder to file
in the model directory
Type | Details | |
file | str | Filename for Encoder |
模型目录为 Learner.path/Learner.model_dir
TextLearner.load_encoder (file:str, device:int|str|torch.device=None)
Load the encoder file
from the model directory, optionally ensuring it’s on device
Type | Default | Details | |
file | str | Filename of the saved encoder | |
device | int | str | torch.device | None | Device used to load, defaults to dls device |
对于语言模型,predict 方法与其他应用有很大的不同,这就是为什么它需要自己的子类。
def decode_spec_tokens(tokens):
    "Decode the special tokens in `tokens`"
    # `rule` is the pending special token; `arg` is the repeat count once it has been read.
    new_toks,rule,arg = [],None,None
    for t in tokens:
        # A new special token starts a fresh rule, discarding any stale repeat count.
        if t in [TK_MAJ, TK_UP, TK_REP, TK_WREP]: rule,arg = t,None
        elif rule is None: new_toks.append(t)
        elif rule == TK_MAJ:
            new_toks.append(t[:1].upper() + t[1:].lower())
            rule = None
        elif rule == TK_UP:
            new_toks.append(t.upper())
            rule = None
        elif arg is None:
            # The token right after a repeat marker must be the count; if it is not
            # an integer the rule is abandoned (and this token is dropped).
            try: arg = int(t)
            except ValueError: rule = None
        else:
            if rule == TK_REP: new_toks.append(t * arg)
            else: new_toks += [t] * arg
            # Reset so the repeat applies to this token only, not to every later one.
            rule,arg = None,None
    return new_toks
# One check per special token handled by `decode_spec_tokens`.
test_eq(decode_spec_tokens(['xxmaj', 'text']), ['Text'])
test_eq(decode_spec_tokens(['xxup', 'text']), ['TEXT'])
test_eq(decode_spec_tokens(['xxrep', '3', 'a']), ['aaa'])
test_eq(decode_spec_tokens(['xxwrep', '3', 'word']), ['word', 'word', 'word'])
class LMLearner(TextLearner):
    "Add functionality to `TextLearner` when dealing with a language model"
    def predict(self, text, n_words=1, no_unk=True, temperature=1., min_p=None, no_bar=False,
                decoder=decode_spec_tokens, only_last_word=False):
        "Return `text` and the `n_words` that come after"
        # `idxs` is what gets fed to the model; `idxs_all` accumulates the whole sequence.
        idxs = idxs_all = self.dls.test_dl([text]).items[0].to(self.dls.device)
        if no_unk: unk_idx = self.dls.vocab.index(UNK)
        for _ in (range(n_words) if no_bar else progress_bar(range(n_words), leave=False)):
            with self.no_bar(): preds,_ = self.get_preds(dl=[(idxs[None],)])
            # Scores for the last position of the single item in the batch.
            res = preds[0][-1]
            if no_unk: res[unk_idx] = 0.
            if min_p is not None:
                if (res >= min_p).float().sum() == 0:
                    warn(f"There is no item with probability >= {min_p}, try a lower value.")
                else: res[res < min_p] = 0.
            if temperature != 1.: res.pow_(1 / temperature)
            # Sample the next token index from the (possibly filtered) scores.
            idx = torch.multinomial(res, 1).item()
            idxs = idxs_all = torch.cat([idxs_all, idxs.new([idx])])
            # Optionally feed back only the newest token instead of the whole sequence.
            if only_last_word: idxs = idxs[-1][None]

        num = self.dls.train_ds.numericalize
        tokens = [num.vocab[i] for i in idxs_all if num.vocab[i] not in [BOS, PAD]]
        sep = self.dls.train_ds.tokenizer.sep
        return sep.join(decoder(tokens))

    # NOTE: the `concat_dim` argument is accepted but ignored; 1 is always passed on.
    def get_preds(self, concat_dim=1, **kwargs): return super().get_preds(concat_dim=1, **kwargs)
show_doc(LMLearner, title_level=3)
LMLearner (dls:DataLoaders, model, alpha:float=2.0, beta:float=1.0, moms:tuple=(0.8, 0.7, 0.8), loss_func:callable|None=None, opt_func=<function Adam>, lr=0.001, splitter:callable=<function trainable_params>, cbs=None, metrics=None, path=None, model_dir='models', wd=None, wd_bn_bias=False, train_bn=True, default_cbs:bool=True)
Add functionality to TextLearner
when dealing with a language model
Type | Default | Details | |
dls | DataLoaders | Text DataLoaders |
model | A standard PyTorch model | ||
alpha | float | 2.0 | Param for RNNRegularizer |
beta | float | 1.0 | Param for RNNRegularizer |
moms | tuple | (0.8, 0.7, 0.8) | Momentum for Cosine Annealing Scheduler |
loss_func | callable | None | None | Loss function for training |
opt_func | function | Adam | Optimisation function for training |
lr | float | 0.001 | Learning rate |
splitter | callable | trainable_params | Used to split parameters into layer groups |
cbs | NoneType | None | Callbacks |
metrics | NoneType | None | Printed after each epoch |
path | NoneType | None | Parent directory to save, load, and export models |
model_dir | str | models | Subdirectory to save and load models |
wd | NoneType | None | Weight decay |
wd_bn_bias | bool | False | Apply weight decay to batchnorm bias params? |
train_bn | bool | True | Always train batchnorm layers? |
default_cbs | bool | True | Include default callbacks? |
LMLearner.predict (text, n_words=1, no_unk=True, temperature=1.0, min_p=None, no_bar=False, decoder=<function decode_spec_tokens>, only_last_word=False)
Return text
and the n_words
that come after
from fastai.text.models.core import _model_meta
def _get_text_vocab(dls):
    "Get vocabulary from `dls`"
    # This repeats the helper of the same name defined earlier in the file.
    vocab = dls.vocab
    # When `dls.vocab` is an `L`, the first element is taken as the vocabulary.
    if isinstance(vocab, L): vocab = vocab[0]
    return vocab
def language_model_learner(dls, arch, config=None, drop_mult=1., backwards=False, pretrained=True, pretrained_fnames=None, **kwargs):
    "Create a `Learner` with a language model from `dls` and `arch`."
    vocab = _get_text_vocab(dls)
    model = get_language_model(arch, len(vocab), config=config, drop_mult=drop_mult)
    meta = _model_meta[arch]
    learn = LMLearner(dls, model, loss_func=CrossEntropyLossFlat(), splitter=meta['split_lm'], **kwargs)
    # Key in `meta` that holds the download location for the chosen direction.
    url = 'url_bwd' if backwards else 'url'
    if pretrained or pretrained_fnames:
        if pretrained_fnames is not None:
            # User-supplied (weights, vocab) base names, resolved inside the model directory.
            fnames = [learn.path/learn.model_dir/f'{fn}.{ext}' for fn,ext in zip(pretrained_fnames, ['pth', 'pkl'])]
        else:
            if url not in meta:
                warn("There are no pretrained weights for that architecture yet!")
                return learn
            model_path = untar_data(meta[url] , c_key='model')
            # Expect one '.pth' (weights) and one '.pkl' (vocab) file in the download.
            try: fnames = [list(model_path.glob(f'*.{ext}'))[0] for ext in ['pth', 'pkl']]
            except IndexError: print(f'The model in {model_path} is incomplete, download again'); raise
        learn = learn.load_pretrained(*fnames)
    return learn
您可以使用 `config` 来自定义使用的架构(为此更改 `awd_lstm_lm_config` 中的值),`pretrained` 将使用 fastai 的预训练模型(如果可用),或者您可以传递包含您自己的预训练模型和相应词汇表的具体 `pretrained_fnames`。所有其他参数都将传递给 `Learner`。
# Build language-model `DataLoaders` from the IMDB sample and create the learner.
path = untar_data(URLs.IMDB_SAMPLE)
df = pd.read_csv(path/'texts.csv')
dls = TextDataLoaders.from_df(df, path=path, text_col='text', is_lm=True, valid_col='is_valid')
learn = language_model_learner(dls, AWD_LSTM)
然后,您可以使用 `.predict` 方法生成新文本。
learn.predict('This movie is about', n_words=20)
'This movie is about plans by Tom Cruise to win a loyalty sharing award at the Battle of Christmas'
默认情况下,每次预测一个单词后,整个句子会被重新输入到模型中,这个小技巧提升了生成文本的质量。如果你只想输入最后一个单词,请指定参数 `only_last_word=True`。
learn.predict('This movie is about', n_words=20, only_last_word=True)
'This movie is about the J. Intelligent , ha - agency . Griffith , and Games on the early after'
def text_classifier_learner(dls, arch, seq_len=72, config=None, backwards=False, pretrained=True, drop_mult=0.5, n_out=None,
                            lin_ftrs=None, ps=None, max_len=72*20, y_range=None, **kwargs):
    "Create a `Learner` with a text classifier from `dls` and `arch`."
    vocab = _get_text_vocab(dls)
    # Infer the number of outputs from the data unless it was given explicitly.
    if n_out is None: n_out = get_c(dls)
    assert n_out, "`n_out` is not defined, and could not be inferred from data, set `dls.c` or pass `n_out`"
    model = get_text_classifier(arch, len(vocab), n_out, seq_len=seq_len, config=config, y_range=y_range,
                                drop_mult=drop_mult, lin_ftrs=lin_ftrs, ps=ps, max_len=max_len)
    meta = _model_meta[arch]
    learn = TextLearner(dls, model, splitter=meta['split_clas'], **kwargs)
    # Key in `meta` that holds the download location for the chosen direction.
    url = 'url_bwd' if backwards else 'url'
    if pretrained:
        if url not in meta:
            warn("There are no pretrained weights for that architecture yet!")
            return learn
        model_path = untar_data(meta[url], c_key='model')
        # Expect one '.pth' (weights) and one '.pkl' (vocab) file in the download.
        try: fnames = [list(model_path.glob(f'*.{ext}'))[0] for ext in ['pth', 'pkl']]
        except IndexError: print(f'The model in {model_path} is incomplete, download again'); raise
        # Only the first sub-module (`learn.model[0]`) receives the pretrained weights.
        learn = learn.load_pretrained(*fnames, model=learn.model[0])
        learn.freeze()
    return learn
您可以使用 `config` 自定义所使用的架构(通过更改 `awd_lstm_clas_config` 中的值),`pretrained` 将使用 fastai 为该 `arch` 提供的预训练模型(如果可用)。`n_out` 通常从 `dls` 推断得出,但您也可以显式传入。

该模型使用 `SentenceEncoder`,这意味着文本一次传递 `seq_len` 个标记,并且只会在最后的 `max_len` 步上计算梯度。`lin_ftrs` 和 `ps` 被传递给 `get_text_classifier`。

所有其他参数都被传递给 `Learner`。
# Build classification `DataLoaders` from the IMDB sample and create the learner.
path = untar_data(URLs.IMDB_SAMPLE)
df = pd.read_csv(path/'texts.csv')
dls = TextDataLoaders.from_df(df, path=path, text_col='text', label_col='label', valid_col='is_valid')
learn = text_classifier_learner(dls, AWD_LSTM)
显示方法 -
# Registered through type dispatch so this overload is selected by the type of `x`.
@typedispatch
def show_results(x: LMTensorText, y, samples, outs, ctxs=None, max_n=10, **kwargs):
    "Show up to `max_n` rows with 'input', 'target' and 'pred' columns, display them and return the rows"
    if ctxs is None: ctxs = get_empty_df(min(len(samples), max_n))
    # `range(max_n)` in the zip caps the number of rows that are filled in.
    for i,l in enumerate(['input', 'target']):
        ctxs = [b.show(ctx=c, label=l, **kwargs) for b,c,_ in zip(samples.itemgot(i),ctxs,range(max_n))]
    ctxs = [b.show(ctx=c, label='pred', **kwargs) for b,c,_ in zip(outs.itemgot(0),ctxs,range(max_n))]
    display_df(pd.DataFrame(ctxs))
    return ctxs
# Registered through type dispatch; the `show_results[object]` lookup below needs it.
@typedispatch
def show_results(x: TensorText, y, samples, outs, ctxs=None, max_n=10, trunc_at=150, **kwargs):
    "Truncate each sample's text to `trunc_at`, delegate to the generic `show_results`, display and return the rows"
    if ctxs is None: ctxs = get_empty_df(min(len(samples), max_n))
    # Only the first element of each sample (the text) is truncated.
    samples = L((s[0].truncate(trunc_at),*s[1:]) for s in samples)
    ctxs = show_results[object](x, y, samples, outs, ctxs=ctxs, max_n=max_n, **kwargs)
    display_df(pd.DataFrame(ctxs))
    return ctxs
# Registered through type dispatch, like the `show_results` overloads in this file.
@typedispatch
def plot_top_losses(x: TensorText, y:TensorCategory, samples, outs, raws, losses, trunc_at=150, **kwargs):
    "Display a table of input, target, predicted, probability and loss for `samples`"
    rows = get_empty_df(len(samples))
    # Only the first element of each sample (the text) is truncated.
    samples = L((s[0].truncate(trunc_at),*s[1:]) for s in samples)
    for i,l in enumerate(['input', 'target']):
        rows = [b.show(ctx=c, label=l, **kwargs) for b,c in zip(samples.itemgot(i),rows)]
    # Extend each output with the max of its raw values (shown as 'probability') and its loss.
    outs = L(o + (TitledFloat(r.max().item()), TitledFloat(l.item())) for o,r,l in zip(outs, raws, losses))
    for i,l in enumerate(['predicted', 'probability', 'loss']):
        rows = [b.show(ctx=c, label=l, **kwargs) for b,c in zip(outs.itemgot(i),rows)]
    display_df(pd.DataFrame(rows))
导出 -
from nbdev import nbdev_export