! [ -e /content ] && pip install -Uqq fastai # upgrade fastai on Colab
AWD-LSTM
from __future__ import annotations
from fastai.data.all import *
from fastai.text.core import *
from nbdev.showdoc import *
#|default_exp text.models.awdlstm
#|default_cls_lvl 3
AWD LSTM from Smerity et al.
## Basic NLP modules
On top of the modules provided by PyTorch or the fastai `layers`, the language models use some custom layers specific to NLP.
def dropout_mask(
    x:Tensor, # Source tensor, output will be of the same type as `x`
    sz:list, # Size of the dropout mask as `int`s
    p:float # Dropout probability
) -> Tensor: # Multiplicative dropout mask
    "Return a dropout mask of the same type as `x`, size `sz`, with probability `p` to cancel an element."
    return x.new_empty(*sz).bernoulli_(1-p).div_(1-p)
t = dropout_mask(torch.randn(3,4), [4,3], 0.25)
test_eq(t.shape, [4,3])
assert ((t == 4/3) + (t==0)).all()
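The two values checked above come from the standard "inverted dropout" rescaling: with `p=0.25`, a surviving element is multiplied by `1/(1-p) = 4/3`, so the mask preserves the expected value of whatever it scales. A quick sketch of that property on a larger mask (the size is arbitrary):

mask = dropout_mask(torch.randn(3,4), [1000,10], 0.25)
test_close(mask.mean().item(), 1., eps=0.1)  # E[mask] = (1-p) * 1/(1-p) = 1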
class RNNDropout(Module):
    "Dropout with probability `p` that is consistent on the seq_len dimension."
    def __init__(self, p:float=0.5): self.p=p

    def forward(self, x):
        if not self.training or self.p == 0.: return x
        return x * dropout_mask(x.data, (x.size(0), 1, *x.shape[2:]), self.p)
dp = RNNDropout(0.3)
tst_inp = torch.randn(4,3,7)
tst_out = dp(tst_inp)
for i in range(4):
    for j in range(7):
        if tst_out[i,0,j] == 0: assert (tst_out[i,:,j] == 0).all()
        else: test_close(tst_out[i,:,j], tst_inp[i,:,j]/(1-0.3))
It also supports doing dropout over a sequence of images, where the time dimension is the first axis: here, 10 images of 3 channels and 32 by 32 pixels.
_ = dp(torch.rand(4,10,3,32,32))
class WeightDropout(Module):
    "A module that wraps another layer in which some weights will be replaced by 0 during training."

    def __init__(self,
        module:nn.Module, # Wrapped module
        weight_p:float, # Weight dropout probability
        layer_names:str|MutableSequence='weight_hh_l0' # Name(s) of the parameters to apply dropout to
    ):
        self.module,self.weight_p,self.layer_names = module,weight_p,L(layer_names)
        for layer in self.layer_names:
            # Make a copy of the weights of the selected layers.
            w = getattr(self.module, layer)
            delattr(self.module, layer)
            self.register_parameter(f'{layer}_raw', nn.Parameter(w.data))
            setattr(self.module, layer, w.clone())
        if isinstance(self.module, (nn.RNNBase, nn.modules.rnn.RNNBase)):
            self.module.flatten_parameters = self._do_nothing

    def _setweights(self):
        "Apply dropout to the raw weights."
        for layer in self.layer_names:
            raw_w = getattr(self, f'{layer}_raw')
            if self.training: w = F.dropout(raw_w, p=self.weight_p)
            else: w = raw_w.clone()
            setattr(self.module, layer, w)

    def forward(self, *args):
        self._setweights()
        with warnings.catch_warnings():
            # To avoid the warning that comes because the weights aren't flattened.
            warnings.simplefilter("ignore", category=UserWarning)
            return self.module(*args)

    def reset(self):
        for layer in self.layer_names:
            raw_w = getattr(self, f'{layer}_raw')
            setattr(self.module, layer, raw_w.clone())
        if hasattr(self.module, 'reset'): self.module.reset()

    def _do_nothing(self): pass
module = nn.LSTM(5,7)
dp_module = WeightDropout(module, 0.4)
wgts = dp_module.module.weight_hh_l0
tst_inp = torch.randn(10,20,5)
h = torch.zeros(1,20,7), torch.zeros(1,20,7)
dp_module.reset()
x,h = dp_module(tst_inp,h)
loss = x.sum()
loss.backward()
new_wgts = getattr(dp_module.module, 'weight_hh_l0')
test_eq(wgts, getattr(dp_module, 'weight_hh_l0_raw'))
assert 0.2 <= (new_wgts==0).sum().float()/new_wgts.numel() <= 0.6
assert dp_module.weight_hh_l0_raw.requires_grad
assert dp_module.weight_hh_l0_raw.grad is not None
assert ((dp_module.weight_hh_l0_raw.grad == 0.) & (new_wgts == 0.)).any()
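`layer_names` also accepts a sequence, so one wrapper can apply dropout to several weight matrices at once. A minimal sketch, assuming a two-layer `nn.LSTM` (PyTorch names its hidden-to-hidden parameters `weight_hh_l{k}`):

lstm2 = nn.LSTM(5, 7, num_layers=2)
dp2 = WeightDropout(lstm2, 0.4, layer_names=['weight_hh_l0', 'weight_hh_l1'])
dp2.reset()
x,h = dp2(torch.randn(10,20,5))  # nn.LSTM defaults to a zero hidden state
assert hasattr(dp2, 'weight_hh_l0_raw') and hasattr(dp2, 'weight_hh_l1_raw')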
class EmbeddingDropout(Module):
    "Apply dropout with probability `embed_p` to an embedding layer `emb`."

    def __init__(self,
        emb:nn.Embedding, # Wrapped embedding layer
        embed_p:float # Embedding layer dropout probability
    ):
        self.emb,self.embed_p = emb,embed_p

    def forward(self, words, scale=None):
        if self.training and self.embed_p != 0:
            size = (self.emb.weight.size(0),1)
            mask = dropout_mask(self.emb.weight.data, size, self.embed_p)
            masked_embed = self.emb.weight * mask
        else: masked_embed = self.emb.weight
        if scale: masked_embed.mul_(scale)
        return F.embedding(words, masked_embed, ifnone(self.emb.padding_idx, -1), self.emb.max_norm,
                           self.emb.norm_type, self.emb.scale_grad_by_freq, self.emb.sparse)
enc = nn.Embedding(10, 7, padding_idx=1)
enc_dp = EmbeddingDropout(enc, 0.5)
tst_inp = torch.randint(0,10,(8,))
tst_out = enc_dp(tst_inp)
for i in range(8):
    assert (tst_out[i]==0).all() or torch.allclose(tst_out[i], 2*enc.weight[tst_inp[i]])
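The factor of 2 in this test is the `1/(1-p)` rescaling from `dropout_mask` with `p=0.5`: a row of the embedding matrix is either zeroed entirely (the whole word is dropped) or kept and rescaled. In eval mode nothing is dropped or rescaled, so the wrapper behaves like the plain embedding; a quick check:

enc_dp.eval()
test_close(enc_dp(tst_inp), enc(tst_inp))
enc_dp.train()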
class AWD_LSTM(Module):
    "AWD-LSTM inspired by https://arxiv.org/abs/1708.02182"
    initrange=0.1

    def __init__(self,
        vocab_sz:int, # Size of the vocabulary
        emb_sz:int, # Size of embedding vector
        n_hid:int, # Number of features in hidden state
        n_layers:int, # Number of LSTM layers
        pad_token:int=1, # Padding token id
        hidden_p:float=0.2, # Dropout probability for hidden state between layers
        input_p:float=0.6, # Dropout probability for LSTM stack input
        embed_p:float=0.1, # Embedding layer dropout probability
        weight_p:float=0.5, # Hidden-to-hidden weight dropout probability for LSTM layers
        bidir:bool=False # If set to `True` uses bidirectional LSTM layers
    ):
        store_attr('emb_sz,n_hid,n_layers,pad_token')
        self.bs = 1
        self.n_dir = 2 if bidir else 1
        self.encoder = nn.Embedding(vocab_sz, emb_sz, padding_idx=pad_token)
        self.encoder_dp = EmbeddingDropout(self.encoder, embed_p)
        self.rnns = nn.ModuleList([self._one_rnn(emb_sz if l == 0 else n_hid, (n_hid if l != n_layers - 1 else emb_sz)//self.n_dir,
                                                 bidir, weight_p, l) for l in range(n_layers)])
        self.encoder.weight.data.uniform_(-self.initrange, self.initrange)
        self.input_dp = RNNDropout(input_p)
        self.hidden_dps = nn.ModuleList([RNNDropout(hidden_p) for l in range(n_layers)])
        self.reset()

    def forward(self, inp:Tensor, from_embeds:bool=False):
        bs,sl = inp.shape[:2] if from_embeds else inp.shape
        if bs!=self.bs: self._change_hidden(bs)

        output = self.input_dp(inp if from_embeds else self.encoder_dp(inp))
        new_hidden = []
        for l, (rnn,hid_dp) in enumerate(zip(self.rnns, self.hidden_dps)):
            output, new_h = rnn(output, self.hidden[l])
            new_hidden.append(new_h)
            if l != self.n_layers - 1: output = hid_dp(output)
        self.hidden = to_detach(new_hidden, cpu=False, gather=False)
        return output

    def _change_hidden(self, bs):
        self.hidden = [self._change_one_hidden(l, bs) for l in range(self.n_layers)]
        self.bs = bs

    def _one_rnn(self, n_in, n_out, bidir, weight_p, l):
        "Return one of the inner rnn"
        rnn = nn.LSTM(n_in, n_out, 1, batch_first=True, bidirectional=bidir)
        return WeightDropout(rnn, weight_p)

    def _one_hidden(self, l):
        "Return one hidden state"
        nh = (self.n_hid if l != self.n_layers - 1 else self.emb_sz) // self.n_dir
        return (one_param(self).new_zeros(self.n_dir, self.bs, nh), one_param(self).new_zeros(self.n_dir, self.bs, nh))

    def _change_one_hidden(self, l, bs):
        if self.bs < bs:
            nh = (self.n_hid if l != self.n_layers - 1 else self.emb_sz) // self.n_dir
            return tuple(torch.cat([h, h.new_zeros(self.n_dir, bs-self.bs, nh)], dim=1) for h in self.hidden[l])
        if self.bs > bs: return (self.hidden[l][0][:,:bs].contiguous(), self.hidden[l][1][:,:bs].contiguous())
        return self.hidden[l]

    def reset(self):
        "Reset the hidden states"
        [r.reset() for r in self.rnns if hasattr(r, 'reset')]
        self.hidden = [self._one_hidden(l) for l in range(self.n_layers)]
This is the core of an AWD-LSTM model, with embeddings from `vocab_sz` and `emb_sz`, `n_layers` LSTMs (potentially bidirectional) stacked, the first one going from `emb_sz` to `n_hid`, the last one from `n_hid` to `emb_sz`, and all the inner ones from `n_hid` to `n_hid`. `pad_token` is passed to the PyTorch embedding layer. The dropouts are applied as follows:

- the embeddings are wrapped in an `EmbeddingDropout` of probability `embed_p`;
- the result of this embedding layer goes through an `RNNDropout` of probability `input_p`;
- each LSTM has `WeightDropout` applied with probability `weight_p`;
- between two of the inner LSTMs, an `RNNDropout` is applied with probability `hidden_p`.

The module returns the output of the last LSTM. Since the `hidden_p` dropout is not applied after the last layer, this output can be fed directly to a decoder (in the case of a language model).
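A quick way to check where each of these dropouts lives on an instance (all attribute names are from the class above):

m = AWD_LSTM(vocab_sz=100, emb_sz=20, n_hid=10, n_layers=2)
test_eq(type(m.encoder_dp), EmbeddingDropout)  # embed_p
test_eq(type(m.input_dp), RNNDropout)          # input_p
test_eq(type(m.rnns[0]), WeightDropout)        # weight_p, wraps an nn.LSTM
test_eq(type(m.hidden_dps[0]), RNNDropout)     # hidden_p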
tst = AWD_LSTM(100, 20, 10, 2, hidden_p=0.2, embed_p=0.02, input_p=0.1, weight_p=0.2)
x = torch.randint(0, 100, (10,5))
r = tst(x)
test_eq(tst.bs, 10)
test_eq(len(tst.hidden), 2)
test_eq([h_.shape for h_ in tst.hidden[0]], [[1,10,10], [1,10,10]])
test_eq([h_.shape for h_ in tst.hidden[1]], [[1,10,20], [1,10,20]])

test_eq(r.shape, [10,5,20])
test_eq(r[:,-1], tst.hidden[-1][0][0]) # the hidden state is the last timestep of the raw output

tst.eval()
tst.reset();
tst(x); tst(x)

# test bs (batch size) change
x = torch.randint(0, 100, (6,5))
r = tst(x)
test_eq(tst.bs, 6)
#|cuda
tst = AWD_LSTM(100, 20, 10, 2, bidir=True).to('cuda')
tst.reset()
x = torch.randint(0, 100, (10,5)).to('cuda')
r = tst(x)

x = torch.randint(0, 100, (6,5), device='cuda')
r = tst(x)
def awd_lstm_lm_split(model):
    "Split a RNN `model` in groups for differential learning rates."
    groups = [nn.Sequential(rnn, dp) for rnn, dp in zip(model[0].rnns, model[0].hidden_dps)]
    groups = L(groups + [nn.Sequential(model[0].encoder, model[0].encoder_dp, model[1])])
    return groups.map(params)
awd_lstm_lm_config = dict(emb_sz=400, n_hid=1152, n_layers=3, pad_token=1, bidir=False, output_p=0.1,
                          hidden_p=0.15, input_p=0.25, embed_p=0.02, weight_p=0.2, tie_weights=True, out_bias=True)
def awd_lstm_clas_split(model):
    "Split a RNN `model` in groups for differential learning rates."
    groups = [nn.Sequential(model[0].module.encoder, model[0].module.encoder_dp)]
    groups += [nn.Sequential(rnn, dp) for rnn, dp in zip(model[0].module.rnns, model[0].module.hidden_dps)]
    groups = L(groups + [model[1]])
    return groups.map(params)
awd_lstm_clas_config = dict(emb_sz=400, n_hid=1152, n_layers=3, pad_token=1, bidir=False, output_p=0.4,
                            hidden_p=0.3, input_p=0.4, embed_p=0.05, weight_p=0.5)
## Export -
from nbdev import nbdev_export
nbdev_export()