Core text modules

! [ -e /content ] && pip install -Uqq fastai  # upgrade fastai on Colab
from __future__ import annotations
from fastai.data.all import *
from fastai.text.core import *
from fastai.text.models.awdlstm import *
from nbdev.showdoc import *

::: {#cell-4 .cell}

#| default_exp text.models.core
#| default_cls_lvl 3

:::

Contains the modules common between the different architectures and the generic functions to get models

::: {#cell-6 .cell}

#| export
_model_meta = {AWD_LSTM: {'hid_name':'emb_sz', 'url':URLs.WT103_FWD, 'url_bwd':URLs.WT103_BWD,
                          'config_lm':awd_lstm_lm_config, 'split_lm': awd_lstm_lm_split,
                          'config_clas':awd_lstm_clas_config, 'split_clas': awd_lstm_clas_split},}
              # Transformer: {'hid_name':'d_model', 'url':URLs.OPENAI_TRANSFORMER,
              #               'config_lm':tfmer_lm_config, 'split_lm': tfmer_lm_split,
              #               'config_clas':tfmer_clas_config, 'split_clas': tfmer_clas_split},
              # TransformerXL: {'hid_name':'d_model',
              #                'config_lm':tfmerXL_lm_config, 'split_lm': tfmerXL_lm_split,
              #                'config_clas':tfmerXL_clas_config, 'split_clas': tfmerXL_clas_split}}

:::
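
As a quick orientation (a sketch that relies only on the imports above), the metadata registered for `AWD_LSTM` records the name of the config key holding its hidden size, the URLs of its pretrained weights, and the config/split functions for the language model and the classifier:

meta = _model_meta[AWD_LSTM]
meta['hid_name'], sorted(meta.keys())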

Language models

class LinearDecoder(Module):
    "To go on top of a RNNCore module and create a Language Model."
    initrange=0.1

    def __init__(self, 
        n_out:int, # Number of output channels
        n_hid:int, # Number of features in encoder last layer output
        output_p:float=0.1, # Input dropout probability
        tie_encoder:nn.Module=None, # If a module is supplied, the decoder weight is tied to `tie_encoder.weight`
        bias:bool=True # If `False` the layer will not learn additive bias
    ):
        self.decoder = nn.Linear(n_hid, n_out, bias=bias)
        self.decoder.weight.data.uniform_(-self.initrange, self.initrange)
        self.output_dp = RNNDropout(output_p)
        if bias: self.decoder.bias.data.zero_()
        if tie_encoder: self.decoder.weight = tie_encoder.weight

    def forward(self, input):
        dp_inp = self.output_dp(input)
        return self.decoder(dp_inp), input, dp_inp
from fastai.text.models.awdlstm import *
enc = AWD_LSTM(100, 20, 10, 2)
x = torch.randint(0, 100, (10,5))
r = enc(x)

tst = LinearDecoder(100, 20, 0.1)
y = tst(r)
test_eq(y[1], r)
test_eq(y[2].shape, r.shape)
test_eq(y[0].shape, [10, 5, 100])

tst = LinearDecoder(100, 20, 0.1, tie_encoder=enc.encoder)
test_eq(tst.decoder.weight, enc.encoder.weight)
class SequentialRNN(nn.Sequential):
    "A sequential module that passes the reset call to its children."
    def reset(self):
        for c in self.children(): getcallable(c, 'reset')()
class _TstMod(Module):
    def reset(self): print('reset')

tst = SequentialRNN(_TstMod(), _TstMod())
test_stdout(tst.reset, 'reset\nreset')
def get_language_model(
    arch, # Function or class that can generate a language model architecture
    vocab_sz:int, # Size of the vocabulary
    config:dict=None, # Model configuration dictionary
    drop_mult:float=1. # Multiplicative factor to scale all dropout probabilities in `config`
) -> SequentialRNN: # Language model with `arch` encoder and linear decoder
    "Create a language model from `arch` and its `config`."
    meta = _model_meta[arch]
    config = ifnone(config, meta['config_lm']).copy()
    for k in config.keys():
        if k.endswith('_p'): config[k] *= drop_mult
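    # split out the keys that configure the decoder and weight tying rather than the encoder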
    tie_weights,output_p,out_bias = map(config.pop, ['tie_weights', 'output_p', 'out_bias'])
    init = config.pop('init') if 'init' in config else None
    encoder = arch(vocab_sz, **config)
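    # optionally tie the decoder weights to the encoder embedding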
    enc = encoder.encoder if tie_weights else None
    decoder = LinearDecoder(vocab_sz, config[meta['hid_name']], output_p, tie_encoder=enc, bias=out_bias)
    model = SequentialRNN(encoder, decoder)
    return model if init is None else model.apply(init)

The default `config` can be found in `_model_meta[arch]['config_lm']`. `drop_mult` is applied to all the dropout probabilities in that config.
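
As a small sketch of what that means in practice (nothing beyond the imports above is assumed), these are the dropout probabilities in the default LM config that `drop_mult` rescales; every key ending in `_p` is multiplied by it:

default_cfg = _model_meta[AWD_LSTM]['config_lm']
{k: v for k, v in default_cfg.items() if k.endswith('_p')}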

config = awd_lstm_lm_config.copy()
config.update({'n_hid':10, 'emb_sz':20})

tst = get_language_model(AWD_LSTM, 100, config=config)
x = torch.randint(0, 100, (10,5))
y = tst(x)
test_eq(y[0].shape, [10, 5, 100])
test_eq(y[1].shape, [10, 5, 20])
test_eq(y[2].shape, [10, 5, 20])
test_eq(tst[1].decoder.weight, tst[0].encoder.weight)
# Test drop_mult
tst = get_language_model(AWD_LSTM, 100, config=config, drop_mult=0.5)
test_eq(tst[1].output_dp.p, config['output_p']*0.5)
for rnn in tst[0].rnns: test_eq(rnn.weight_p, config['weight_p']*0.5)
for dp in tst[0].hidden_dps: test_eq(dp.p, config['hidden_p']*0.5)
test_eq(tst[0].encoder_dp.embed_p, config['embed_p']*0.5)
test_eq(tst[0].input_dp.p, config['input_p']*0.5)

Classification models

def _pad_tensor(t:Tensor, bs:int) -> Tensor:
    if t.size(0) < bs: return torch.cat([t, t.new_zeros(bs-t.size(0), *t.shape[1:])])
    return t
class SentenceEncoder(Module):
    "Create an encoder over `module` that can process a full sentence."
    def __init__(self, 
        bptt:int, # Backpropagation through time
        module:nn.Module, # A module that can process up to [`bs`, `bptt`] tokens
        pad_idx:int=1, # Padding token id
        max_len:int=None # Maximum output length
    ): 
        store_attr('bptt,module,pad_idx,max_len')
    
    def reset(self): getcallable(self.module, 'reset')()

    def forward(self, input):
        bs,sl = input.size()
        self.reset()
        mask = input == self.pad_idx
        outs,masks = [],[]
        for i in range(0, sl, self.bptt):
            # Note: this expects that the sequence really begins on a round multiple of bptt
            real_bs = (input[:,i] != self.pad_idx).long().sum()
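            # only feed the rows that have real tokens in this chunk; outputs are padded back to the full batch size below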
            o = self.module(input[:real_bs,i: min(i+self.bptt, sl)])
            if self.max_len is None or sl-i <= self.max_len:
                outs.append(o)
                masks.append(mask[:,i: min(i+self.bptt, sl)])
        outs = torch.cat([_pad_tensor(o, bs) for o in outs], dim=1)
        mask = torch.cat(masks, dim=1)
        return outs,mask
Warning

This module expects the input to be padded with most of the padding first, with the sequence beginning at a round multiple of `bptt` (and the rest of the padding at the end). Use `pad_input_chunk` to get your data in a suitable format.

mod = nn.Embedding(5, 10)
tst = SentenceEncoder(5, mod, pad_idx=0)
x = torch.randint(1, 5, (3, 15))
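# front-pad the third row with a full bptt (5 tokens) of pad_idx, the layout described in the warning above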
x[2,:5]=0
out,mask = tst(x)

test_eq(out[:1], mod(x)[:1])
test_eq(out[2,5:], mod(x)[2,5:])
test_eq(mask, x==0)
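
The `max_len` argument truncates what the encoder returns: chunks that start more than `max_len` tokens from the end of the sequence are still processed (so the hidden state is carried forward) but are not kept in the output. A small sketch reusing the toy embedding and batch above:

tst_trunc = SentenceEncoder(5, mod, pad_idx=0, max_len=10)
out_trunc,mask_trunc = tst_trunc(x)
test_eq(out_trunc.shape, [3, 10, 10]) # only the last 10 positions (two bptt-sized chunks) are returned
test_eq(mask_trunc, (x==0)[:, 5:])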
def masked_concat_pool(
    output:Tensor, # Output of sentence encoder
    mask:Tensor, # Boolean mask as returned by sentence encoder
    bptt:int # Backpropagation through time
) -> Tensor: # Concatenation of [last_hidden, max_pool, avg_pool]
    "Pool `MultiBatchEncoder` outputs into one vector [last_hidden, max_pool, avg_pool]"
    lens = output.shape[1] - mask.long().sum(dim=1)
    last_lens = mask[:,-bptt:].long().sum(dim=1)
    avg_pool = output.masked_fill(mask[:, :, None], 0).sum(dim=1)
    avg_pool.div_(lens.type(avg_pool.dtype)[:,None])
    max_pool = output.masked_fill(mask[:,:,None], -float('inf')).max(dim=1)[0]
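    # last_lens counts the padding in the final bptt-sized chunk, so -last_lens-1 indexes the last real token of each sequence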
    x = torch.cat([output[torch.arange(0, output.size(0)),-last_lens-1], max_pool, avg_pool], 1) # Concat pooling
    return x
out = torch.randn(2,4,5)
mask = tensor([[True,True,False,False], [False,False,False,True]])
x = masked_concat_pool(out, mask, 2)

test_close(x[0,:5], out[0,-1])
test_close(x[1,:5], out[1,-2])
test_close(x[0,5:10], out[0,2:].max(dim=0)[0])
test_close(x[1,5:10], out[1,:3].max(dim=0)[0])
test_close(x[0,10:], out[0,2:].mean(dim=0))
test_close(x[1,10:], out[1,:3].mean(dim=0))
# Test the result is independent of padding by replacing the padded part with random content
out1 = torch.randn(2,4,5)
out1[0,2:] = out[0,2:].clone()
out1[1,:3] = out[1,:3].clone()
x1 = masked_concat_pool(out1, mask, 2)
test_eq(x, x1)
class PoolingLinearClassifier(Module):
    "Create a linear classifier with pooling"
    def __init__(self, 
        dims:list, # List of hidden sizes for MLP as `int`s
        ps:list, # List of dropout probabilities as `float`s
        bptt:int, # Backpropagation through time
        y_range:tuple=None # Tuple of (low, high) output value bounds
     ):
        if len(ps) != len(dims)-1: raise ValueError("Number of layers and dropout values do not match.")
        acts = [nn.ReLU(inplace=True)] * (len(dims) - 2) + [None]
        layers = [LinBnDrop(i, o, p=p, act=a) for i,o,p,a in zip(dims[:-1], dims[1:], ps, acts)]
        if y_range is not None: layers.append(SigmoidRange(*y_range))
        self.layers = nn.Sequential(*layers)
        self.bptt = bptt

    def forward(self, input):
        out,mask = input
        x = masked_concat_pool(out, mask, self.bptt)
        x = self.layers(x)
        return x, out, out
mod = nn.Embedding(5, 10)
tst = nn.Sequential(SentenceEncoder(5, mod, pad_idx=0), PoolingLinearClassifier([10*3,4], [0.], 5))

x = torch.randint(1, 5, (3, 14))
x[2,:5] = 0
res,raw,out = tst(x) 

test_eq(raw[:1], mod(x)[:1])
test_eq(raw[2,5:], mod(x)[2,5:])
test_eq(out[:1], mod(x)[:1])
test_eq(out[2,5:], mod(x)[2,5:])
test_eq(res.shape, [3,4])

x1 = torch.cat([x, tensor([0,0,0])[:,None]], dim=1)
res1,raw1,out1 = tst(x1) 
test_eq(res, res1)
def get_text_classifier(
    arch:callable, # Function or class that can generate a language model architecture
    vocab_sz:int, # Size of the vocabulary
    n_class:int, # Number of classes
    seq_len:int=72, # Backpropagation through time
    config:dict=None, # Encoder configuration dictionary
    drop_mult:float=1., # Multiplicative factor to scale all dropout probabilities in `config`
    lin_ftrs:list=None, # List of hidden sizes for classifier head as `int`s
    ps:list=None, # List of dropout probabilities for classifier head as `float`s
    pad_idx:int=1, # Padding token id
    max_len:int=72*20, # Maximal output length for `SentenceEncoder`
    y_range:tuple=None # Tuple of (low, high) output value bounds
):
    "Create a text classifier from `arch` and its `config`, maybe `pretrained`"
    meta = _model_meta[arch]
    cfg = meta['config_clas'].copy()
    cfg.update(ifnone(config, {}))
    config = cfg
    for k in config.keys():
        if k.endswith('_p'): config[k] *= drop_mult
    if lin_ftrs is None: lin_ftrs = [50]
    if ps is None:  ps = [0.1]*len(lin_ftrs)
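    # the head input is 3 * hidden size because of concat pooling: [last_hidden, max_pool, avg_pool]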
    layers = [config[meta['hid_name']] * 3] + lin_ftrs + [n_class]
    ps = [config.pop('output_p')] + ps
    init = config.pop('init') if 'init' in config else None
    encoder = SentenceEncoder(seq_len, arch(vocab_sz, **config), pad_idx=pad_idx, max_len=max_len)
    model = SequentialRNN(encoder, PoolingLinearClassifier(layers, ps, bptt=seq_len, y_range=y_range))
    return model if init is None else model.apply(init)
config = awd_lstm_clas_config.copy()
config.update({'n_hid':10, 'emb_sz':20})

tst = get_text_classifier(AWD_LSTM, 100, 3, config=config)
x = torch.randint(2, 100, (10,5))
y = tst(x)
test_eq(y[0].shape, [10, 3])
test_eq(y[1].shape, [10, 5, 20])
test_eq(y[2].shape, [10, 5, 20])
# Test padding gives the same results
tst.eval()
y = tst(x)
x1 = torch.cat([x, tensor([2,1,1,1,1,1,1,1,1,1])[:,None]], dim=1)
y1 = tst(x1)
test_close(y[0][1:],y1[0][1:])
# Test drop_mult
tst = get_text_classifier(AWD_LSTM, 100, 3, config=config, drop_mult=0.5)
test_eq(tst[1].layers[1][1].p, 0.1)
test_eq(tst[1].layers[0][1].p, config['output_p']*0.5)
for rnn in tst[0].module.rnns: test_eq(rnn.weight_p, config['weight_p']*0.5)
for dp in tst[0].module.hidden_dps: test_eq(dp.p, config['hidden_p']*0.5)
test_eq(tst[0].module.encoder_dp.embed_p, config['embed_p']*0.5)
test_eq(tst[0].module.input_dp.p, config['input_p']*0.5)
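
Besides the configs, `_model_meta` also stores the splitter functions used for discriminative learning rates (the `split_lm`/`split_clas` entries above). A quick sketch on the classifier just built; the AWD_LSTM splitter yields one parameter group for the embedding, one per RNN layer, and one for the head:

groups = _model_meta[AWD_LSTM]['split_clas'](tst)
len(groups) # embedding + 3 RNN layers + head = 5 groups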

Export -

from nbdev import nbdev_export
nbdev_export()