Vision learner

! [ -e /content ] && pip install -Uqq fastai  # upgrade fastai on Colab
from __future__ import annotations
from packaging.version import parse

from fastai.basics import *
from fastai.vision.core import *
from fastai.vision.data import *
from fastai.vision.augment import *
from fastai.vision import models

import torchvision
try: import timm
except ModuleNotFoundError: pass
from nbdev.showdoc import *

All the functions necessary to build a `Learner` suitable for transfer learning in computer vision

The most important functions in this module are `vision_learner` and `unet_learner`. They will help you define a `Learner` using a pretrained model. See the vision tutorial for examples of use.

Cut a pretrained model

def _is_pool_type(l): return re.search(r'Pool[123]d$', l.__class__.__name__)
m = nn.Sequential(nn.AdaptiveAvgPool2d(5), nn.Linear(2,3), nn.Conv2d(2,3,1), nn.MaxPool3d(5))
test_eq([bool(_is_pool_type(m_)) for m_ in m.children()], [True,False,False,True])

By default, the fastai library cuts a pretrained model at the pooling layer. This function helps detect it.

def has_pool_type(m):
    "Return `True` if `m` is a pooling layer or has one in its children"
    if _is_pool_type(m): return True
    for l in m.children():
        if has_pool_type(l): return True
    return False
m = nn.Sequential(nn.AdaptiveAvgPool2d(5), nn.Linear(2,3), nn.Conv2d(2,3,1), nn.MaxPool3d(5))
assert has_pool_type(m)
test_eq([has_pool_type(m_) for m_ in m.children()], [True,False,False,True])
def _get_first_layer(m):
    "Access first layer of a model"
    c,p,n = m,None,None  # child, parent, name
    for n in next(m.named_parameters())[0].split('.')[:-1]:
        p,c=c,getattr(c,n)
    return c,p,n
def _load_pretrained_weights(new_layer, previous_layer):
    "Load pretrained weights based on number of input channels"
    n_in = getattr(new_layer, 'in_channels')
    if n_in==1:
        # sum the weights over the 3 input channels into one
        new_layer.weight.data = previous_layer.weight.data.sum(dim=1, keepdim=True)
    elif n_in==2:
        # keep the first two channels, scaled by 1.5 to preserve the overall weight magnitude
        new_layer.weight.data = previous_layer.weight.data[:,:2] * 1.5
    else:
        # keep the weights of the first 3 channels and zero out the rest
        new_layer.weight.data[:,:3] = previous_layer.weight.data
        new_layer.weight.data[:,3:].zero_()
def _update_first_layer(model, n_in, pretrained):
    "Change first layer based on number of input channels"
    if n_in == 3: return
    first_layer, parent, name = _get_first_layer(model)
    assert isinstance(first_layer, nn.Conv2d), f'Change of input channels only supported with Conv2d, found {first_layer.__class__.__name__}'
    assert getattr(first_layer, 'in_channels') == 3, f'Unexpected number of input channels, found {getattr(first_layer, "in_channels")} while expecting 3'
    params = {attr:getattr(first_layer, attr) for attr in 'out_channels kernel_size stride padding dilation groups padding_mode'.split()}
    params['bias'] = getattr(first_layer, 'bias') is not None
    params['in_channels'] = n_in
    new_layer = nn.Conv2d(**params)
    if pretrained:
        _load_pretrained_weights(new_layer, first_layer)
    setattr(parent, name, new_layer)
def cut_model(model, cut):
    "Cut an instantiated model"
    if   isinstance(cut, int): return nn.Sequential(*list(model.children())[:cut])
    elif callable(cut): return cut(model)
    raise NameError("cut must be either integer or a function")
def create_body(model, n_in=3, pretrained=True, cut=None):
    "Cut off the body of a typically pretrained `arch` as determined by `cut`"
    _update_first_layer(model, n_in, pretrained)
    if cut is None:
        ll = list(enumerate(model.children()))
        cut = next(i for i,o in reversed(ll) if has_pool_type(o))
    return cut_model(model, cut)

`cut` can either be an integer, in which case we cut the model at the corresponding layer, or a function, in which case this function returns `cut(model)`. By default, the model is cut just before the last layer that contains some pooling (searching the model's children from the end).

def tst(): return nn.Sequential(nn.Conv2d(3,5,3), nn.BatchNorm2d(5), nn.AvgPool2d(1), nn.Linear(3,4))
m = create_body(tst())
test_eq(len(m), 2)

m = create_body(tst(), cut=3)
test_eq(len(m), 3)

m = create_body(tst(), cut=noop)
test_eq(len(m), 4)

for n in range(1,5):    
    m = create_body(tst(), n_in=n)
    test_eq(_get_first_layer(m)[0].in_channels, n)

Head and model

def create_head(nf, n_out, lin_ftrs=None, ps=0.5, pool=True, concat_pool=True, first_bn=True, bn_final=False,
                lin_first=False, y_range=None):
    "Model head that takes `nf` features, runs through `lin_ftrs`, and out `n_out` classes."
    if pool and concat_pool: nf *= 2
    lin_ftrs = [nf, 512, n_out] if lin_ftrs is None else [nf] + lin_ftrs + [n_out]
    bns = [first_bn] + [True]*len(lin_ftrs[1:])
    ps = L(ps)
    if len(ps) == 1: ps = [ps[0]/2] * (len(lin_ftrs)-2) + ps
    actns = [nn.ReLU(inplace=True)] * (len(lin_ftrs)-2) + [None]
    layers = []
    if pool:
        pool = AdaptiveConcatPool2d() if concat_pool else nn.AdaptiveAvgPool2d(1)
        layers += [pool, Flatten()]
    if lin_first: layers.append(nn.Dropout(ps.pop(0)))
    for ni,no,bn,p,actn in zip(lin_ftrs[:-1], lin_ftrs[1:], bns, ps, actns):
        layers += LinBnDrop(ni, no, bn=bn, p=p, act=actn, lin_first=lin_first)
    if lin_first: layers.append(nn.Linear(lin_ftrs[-2], n_out))
    if bn_final: layers.append(nn.BatchNorm1d(lin_ftrs[-1], momentum=0.01))
    if y_range is not None: layers.append(SigmoidRange(*y_range))
    return nn.Sequential(*layers)

The head begins with fastai's `AdaptiveConcatPool2d` if `concat_pool=True`, otherwise with a traditional average pooling. It is then followed by a `Flatten` layer before going on blocks of `BatchNorm`, `Dropout` and `Linear` layers (if `lin_first=True`, the order is `Linear`, `BatchNorm`, `Dropout`).

Those blocks start at `nf`, then go through every element of `lin_ftrs` (defaults to `[512]`) and end at `n_out`. `ps` is a list of probabilities used for the dropouts (if you only pass one value, half of it is used for the intermediate dropouts, repeated as many times as necessary, and the full value for the last one).

If `first_bn=True`, a `BatchNorm` is added just after the pooling operations. If `bn_final=True`, a final `BatchNorm` layer is added. If `y_range` is passed, the function adds a `SigmoidRange` to that range.

tst = create_head(5, 10)
tst
Sequential(
  (0): AdaptiveConcatPool2d(
    (ap): AdaptiveAvgPool2d(output_size=1)
    (mp): AdaptiveMaxPool2d(output_size=1)
  )
  (1): fastai.layers.Flatten(full=False)
  (2): BatchNorm1d(10, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (3): Dropout(p=0.25, inplace=False)
  (4): Linear(in_features=10, out_features=512, bias=False)
  (5): ReLU(inplace=True)
  (6): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (7): Dropout(p=0.5, inplace=False)
  (8): Linear(in_features=512, out_features=10, bias=False)
)
mods = list(tst.children())
test_eq(len(mods), 9)
assert isinstance(mods[2], nn.BatchNorm1d)
assert isinstance(mods[-1], nn.Linear)

tst = create_head(5, 10, lin_first=True)
mods = list(tst.children())
test_eq(len(mods), 8)
assert isinstance(mods[2], nn.Dropout)

tst = create_head(5, 10, first_bn=False)
mods = list(tst.children())
test_eq(len(mods), 8)
assert isinstance(mods[2], nn.Dropout)

tst = create_head(5, 10, concat_pool=True)
mods = list(tst.children())
test_eq(mods[4].in_features, 10)

tst = create_head(5, 10, concat_pool=False)
mods = list(tst.children())
test_eq(mods[4].in_features, 5)
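To customize the hidden sizes and the dropout of each block, you can pass `lin_ftrs` and a full list of probabilities to `ps`. A minimal sketch (the sizes and probabilities below are arbitrary):

# two hidden layers of sizes 256 and 128, with one dropout probability per linear block
tst = create_head(5, 10, lin_ftrs=[256,128], ps=[0.1,0.2,0.3])
mods = list(tst.children())
test_eq(mods[4].out_features, 256)
test_eq(mods[3].p, 0.1)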
from fastai.callback.hook import num_features_model
# TODO: refactor, i.e. something like this?
# class ModelSplitter():
#     def __init__(self, idx): self.idx = idx
#     def split(self, m): return L(m[:self.idx], m[self.idx:]).map(params)
#     def __call__(self,): return {'cut':self.idx, 'split':self.split}
def default_split(m):
    "Default split of a model between body and head"
    return L(m[0], m[1:]).map(params)

To do transfer learning, you need to pass a `splitter` to `Learner`. This should be a function taking the model and returning a collection of parameter groups, e.g. a list of lists of parameters.
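As a minimal sketch of what a splitter produces, here is `default_split` applied to a tiny body/head model (the layers are arbitrary); it returns two parameter groups, one for the body and one for the head:

# default_split yields (body params, head params)
body = create_body(nn.Sequential(nn.Conv2d(3,5,3), nn.BatchNorm2d(5), nn.AvgPool2d(1)), cut=2)
groups = default_split(nn.Sequential(body, create_head(5, 10)))
test_eq(len(groups), 2)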

def _xresnet_split(m): return L(m[0][:3], m[0][3:], m[1:]).map(params)
def  _resnet_split(m): return L(m[0][:6], m[0][6:], m[1:]).map(params)
def _squeezenet_split(m:nn.Module): return L(m[0][0][:5], m[0][0][5:], m[1:]).map(params)
def _densenet_split(m:nn.Module): return L(m[0][0][:7],m[0][0][7:], m[1:]).map(params)
def _vgg_split(m:nn.Module): return L(m[0][0][:22], m[0][0][22:], m[1:]).map(params)
def _alexnet_split(m:nn.Module): return L(m[0][0][:6], m[0][0][6:], m[1:]).map(params)

_default_meta    = {'cut':None, 'split':default_split}
_xresnet_meta    = {'cut':-4, 'split':_xresnet_split, 'stats':imagenet_stats}
_resnet_meta     = {'cut':-2, 'split':_resnet_split, 'stats':imagenet_stats, 'weights':'DEFAULT'}
_squeezenet_meta = {'cut':-1, 'split': _squeezenet_split, 'stats':imagenet_stats, 'weights':'DEFAULT'}
_densenet_meta   = {'cut':-1, 'split':_densenet_split, 'stats':imagenet_stats, 'weights':'DEFAULT'}
_vgg_meta        = {'cut':-2, 'split':_vgg_split, 'stats':imagenet_stats, 'weights':'DEFAULT'}
_alexnet_meta    = {'cut':-2, 'split':_alexnet_split, 'stats':imagenet_stats, 'weights':'DEFAULT'}
model_meta = {
    models.xresnet.xresnet18 :{**_xresnet_meta}, models.xresnet.xresnet34: {**_xresnet_meta},
    models.xresnet.xresnet50 :{**_xresnet_meta}, models.xresnet.xresnet101:{**_xresnet_meta},
    models.xresnet.xresnet152:{**_xresnet_meta},

    models.resnet18 :{**_resnet_meta}, models.resnet34: {**_resnet_meta},
    models.resnet50 :{**_resnet_meta}, models.resnet101:{**_resnet_meta},
    models.resnet152:{**_resnet_meta},

    models.squeezenet1_0:{**_squeezenet_meta},
    models.squeezenet1_1:{**_squeezenet_meta},

    models.densenet121:{**_densenet_meta}, models.densenet169:{**_densenet_meta},
    models.densenet201:{**_densenet_meta}, models.densenet161:{**_densenet_meta},
    models.vgg11_bn:{**_vgg_meta}, models.vgg13_bn:{**_vgg_meta}, models.vgg16_bn:{**_vgg_meta}, models.vgg19_bn:{**_vgg_meta},
    models.alexnet:{**_alexnet_meta}}
def add_head(body, nf, n_out, init=nn.init.kaiming_normal_, head=None, concat_pool=True, pool=True,
                lin_ftrs=None, ps=0.5, first_bn=True, bn_final=False, lin_first=False, y_range=None):
    "Add a head to a vision body"
    if head is None:
        head = create_head(nf, n_out, concat_pool=concat_pool, pool=pool,
                           lin_ftrs=lin_ftrs, ps=ps, first_bn=first_bn, bn_final=bn_final, lin_first=lin_first, y_range=y_range)
    model = nn.Sequential(body, head)
    if init is not None: apply_init(model[1], init)
    return model
def create_vision_model(arch, n_out, pretrained=True, weights=None, cut=None, n_in=3, init=nn.init.kaiming_normal_, custom_head=None,
                        concat_pool=True, pool=True, lin_ftrs=None, ps=0.5, first_bn=True, bn_final=False, lin_first=False, y_range=None):
    "Create custom vision architecture"
    meta = model_meta.get(arch, _default_meta)
    if parse(torchvision.__version__) >= parse('0.13') and 'weights' in meta:
        if weights is not None and not pretrained:
            warn(f'{pretrained=} but `weights` are set {weights=}. To randomly initialize set `pretrained=False` & `weights=None`')
        model = arch(weights=meta['weights'] if (weights is None and pretrained) else weights)
    else:
        model = arch(pretrained=pretrained)
    body = create_body(model, n_in, pretrained, ifnone(cut, meta['cut']))
    nf = num_features_model(nn.Sequential(*body.children())) if custom_head is None else None
    return add_head(body, nf, n_out, init=init, head=custom_head, concat_pool=concat_pool, pool=pool,
                    lin_ftrs=lin_ftrs, ps=ps, first_bn=first_bn, bn_final=bn_final, lin_first=lin_first, y_range=y_range)
show_doc(create_vision_model)


create_vision_model

 create_vision_model (arch, n_out, pretrained=True, weights=None,
                      cut=None, n_in=3, init=<function kaiming_normal_>,
                      custom_head=None, concat_pool=True, pool=True,
                      lin_ftrs=None, ps=0.5, first_bn=True,
                      bn_final=False, lin_first=False, y_range=None)

Create custom vision architecture

The model is cut according to `cut` and may be `pretrained`, in which case the proper set of weights is downloaded then loaded. `init` is applied to the head of the model, which is either created by `create_head` (with `lin_ftrs`, `ps`, `concat_pool`, `bn_final`, `lin_first` and `y_range`) or is `custom_head`.

tst = create_vision_model(models.resnet18, 10, True)
tst = create_vision_model(models.resnet18, 10, True, n_in=1)
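As a hedged sketch, you can skip `create_head` entirely by passing a hypothetical `custom_head` (here a plain pool/flatten/linear stack, relying on the fact that resnet18's body outputs 512 features):

# a minimal sketch with a hypothetical head; resnet18's body outputs 512 features
head = nn.Sequential(nn.AdaptiveAvgPool2d(1), Flatten(), nn.Linear(512, 10))
tst = create_vision_model(models.resnet18, 10, True, custom_head=head)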
class TimmBody(nn.Module):
    def __init__(self, model, pretrained:bool=True, cut=None, n_in:int=3):
        super().__init__()
        self.needs_pool = model.default_cfg.get('pool_size', None) is not None
        self.model = model if cut is None else cut_model(model, cut)
    
    def forward(self,x): return self.model.forward_features(x) if self.needs_pool else self.model(x)
def create_timm_model(arch, n_out, cut=None, pretrained=True, n_in=3, init=nn.init.kaiming_normal_, custom_head=None,
                     concat_pool=True, pool=True, lin_ftrs=None, ps=0.5, first_bn=True, bn_final=False, lin_first=False, y_range=None, **kwargs):
    "Create custom architecture using `arch`, `n_in` and `n_out` from the `timm` library"
    model = timm.create_model(arch, pretrained=pretrained, num_classes=0, in_chans=n_in, **kwargs)
    body = TimmBody(model, pretrained, None, n_in)
    nf = body.model.num_features
    res = add_head(body, nf, n_out, init=init, head=custom_head, concat_pool=concat_pool, pool=body.needs_pool,
                   lin_ftrs=lin_ftrs, ps=ps, first_bn=first_bn, bn_final=bn_final, lin_first=lin_first, y_range=y_range)
    return res,model.default_cfg
# make sure the timm model can be scripted:
tst, _ = create_timm_model('resnet34', 1)
scripted = torch.jit.script(tst)
assert scripted, "model could not be converted to TorchScript"

`Learner` convenience functions

def _add_norm(dls, meta, pretrained, n_in=3):
    if not pretrained: return
    stats = meta.get('stats')
    if stats is None: return
    if n_in != len(stats[0]): return
    if not dls.after_batch.fs.filter(risinstance(Normalize)):
        dls.add_tfms([Normalize.from_stats(*stats)],'after_batch')
path = untar_data(URLs.PETS)
dls = ImageDataLoaders.from_name_re(path, get_image_files(path/"images"), r'^(.*)_\d+.jpg$', item_tfms=Resize(224))
for _ in range(5): _add_norm(dls, model_meta[models.resnet34], True)
test_eq(len(dls.after_batch.fs), 2)
def _timm_norm(dls, cfg, pretrained, n_in=3):
    if not pretrained: return
    if n_in != len(cfg['mean']): return
    if not dls.after_batch.fs.filter(risinstance(Normalize)):
        tfm = Normalize.from_stats(cfg['mean'],cfg['std'])
        dls.add_tfms([tfm],'after_batch')
@delegates(create_vision_model)
def vision_learner(dls, arch, normalize=True, n_out=None, pretrained=True, weights=None,
        # learner args
        loss_func=None, opt_func=Adam, lr=defaults.lr, splitter=None, cbs=None, metrics=None, path=None,
        model_dir='models', wd=None, wd_bn_bias=False, train_bn=True, moms=(0.95,0.85,0.95),
        # model & head args
        cut=None, init=nn.init.kaiming_normal_, custom_head=None, concat_pool=True, pool=True,
        lin_ftrs=None, ps=0.5, first_bn=True, bn_final=False, lin_first=False, y_range=None, **kwargs):
    "Build a vision learner from `dls` and `arch`"
    if n_out is None: n_out = get_c(dls)
    assert n_out, "`n_out` is not defined, and could not be inferred from data, set `dls.c` or pass `n_out`"
    meta = model_meta.get(arch, _default_meta)
    model_args = dict(init=init, custom_head=custom_head, concat_pool=concat_pool, pool=pool, lin_ftrs=lin_ftrs, ps=ps,
                      first_bn=first_bn, bn_final=bn_final, lin_first=lin_first, y_range=y_range, **kwargs)
    n_in = kwargs['n_in'] if 'n_in' in kwargs else 3
    if isinstance(arch, str):
        model,cfg = create_timm_model(arch, n_out, default_split, pretrained, **model_args)
        if normalize: _timm_norm(dls, cfg, pretrained, n_in)
    else:
        if normalize: _add_norm(dls, meta, pretrained, n_in)
        model = create_vision_model(arch, n_out, pretrained=pretrained, weights=weights, **model_args)

    splitter = ifnone(splitter, meta['split'])
    learn = Learner(dls=dls, model=model, loss_func=loss_func, opt_func=opt_func, lr=lr, splitter=splitter, cbs=cbs,
                   metrics=metrics, path=path, model_dir=model_dir, wd=wd, wd_bn_bias=wd_bn_bias, train_bn=train_bn, moms=moms)
    if pretrained: learn.freeze()
    # keep track of args for loggers
    store_attr('arch,normalize,n_out,pretrained', self=learn, **kwargs)
    return learn

The model is built from `arch` with the number of final activations inferred from `dls` if possible (otherwise, pass a value to `n_out`). It might be `pretrained`, and the architecture is cut and split using the default metadata of the model architecture (this can be customized by passing a `cut` or a `splitter`).

如果normalizepretrained都为True,这个函数将使用预训练模型的统计数据向dls中添加一个Normalization变换(如果尚未添加的话)。这样,在迁移学习中,您不会忘记对数据进行标准化。

所有其他参数都将传递给Learner

Starting with version 0.13, TorchVision supports multiple pretrained weights for the same model architecture. The `vision_learner` default of `pretrained=True, weights=None` will use the architecture's default weights, which are currently `IMAGENET1K_V2`. If you are using an older version of TorchVision or creating a timm model, setting `weights` will have no effect.

from torchvision.models import ResNet50_Weights

# Legacy weights with accuracy 76.130%
vision_learner(models.resnet50, pretrained=True, weights=ResNet50_Weights.IMAGENET1K_V1, ...)

# New weights with accuracy 80.858%. Strings are also supported.
vision_learner(models.resnet50, pretrained=True, weights='IMAGENET1K_V2', ...)

# Best available weights (currently an alias for IMAGENET1K_V2).
# These are the defaults used when `weights` is not set in vision_learner.
vision_learner(models.resnet50, pretrained=True, weights=ResNet50_Weights.DEFAULT, ...)

# No weights - random initialization
vision_learner(models.resnet50, pretrained=False, weights=None, ...)

The example above shows how to use the new TorchVision 0.13 multi-weight API with `vision_learner`.

path = untar_data(URLs.PETS)
fnames = get_image_files(path/"images")
pat = r'^(.*)_\d+.jpg$'
dls = ImageDataLoaders.from_name_re(path, fnames, pat, item_tfms=Resize(224))
learn = vision_learner(dls, models.resnet18, loss_func=CrossEntropyLossFlat(), ps=0.25)
if parse(torchvision.__version__) >= parse('0.13'):
    from torchvision.models import ResNet34_Weights
    weights = ResNet34_Weights.IMAGENET1K_V1
else:
    weights = None
learn = vision_learner(dls, models.resnet34, weights=weights, loss_func=CrossEntropyLossFlat(), ps=0.25, concat_pool=False)
test_ne(learn.cbs, None)
test_eq(to_cpu(dls.after_batch[1].mean[0].squeeze()), tensor(imagenet_stats[0]))
test_eq(to_cpu(dls.valid.after_batch[1].mean[0].squeeze()), tensor(imagenet_stats[0]))

If you pass a `str` to `arch`, a timm model will be created:

dls = ImageDataLoaders.from_name_re(path, fnames, pat, item_tfms=Resize(224))
learn = vision_learner(dls, 'convnext_tiny', loss_func=CrossEntropyLossFlat(), ps=0.25)
@delegates(models.unet.DynamicUnet.__init__)
def create_unet_model(arch, n_out, img_size, pretrained=True, weights=None, cut=None, n_in=3, **kwargs):
    "Create custom unet architecture"
    meta = model_meta.get(arch, _default_meta)
    if parse(torchvision.__version__) >= parse('0.13') and 'weights' in meta:
        if weights is not None and not pretrained:
            warn(f'{pretrained=} but `weights` are set {weights=}. To randomly initialize set `pretrained=False` & `weights=None`')
        model = arch(weights=meta['weights'] if (weights is None and pretrained) else weights)
    else:
        model = arch(pretrained=pretrained)
    body = create_body(model, n_in, pretrained, ifnone(cut, meta['cut']))
    model = models.unet.DynamicUnet(body, n_out, img_size, **kwargs)
    return model
show_doc(create_unet_model)


create_unet_model

 create_unet_model (arch, n_out, img_size, pretrained=True, weights=None,
                    cut=None, n_in=3, blur=False, blur_final=True,
                    self_attention=False, y_range=None, last_cross=True,
                    bottle=False, act_cls=<class
                    'torch.nn.modules.activation.ReLU'>, init=<function
                    kaiming_normal_>, norm_type=None)

Create custom unet architecture

tst = create_unet_model(models.resnet18, 10, (24,24), True, n_in=1)
@delegates(create_unet_model)
def unet_learner(dls, arch, normalize=True, n_out=None, pretrained=True, weights=None, config=None,
                 # learner args
                 loss_func=None, opt_func=Adam, lr=defaults.lr, splitter=None, cbs=None, metrics=None, path=None,
                 model_dir='models', wd=None, wd_bn_bias=False, train_bn=True, moms=(0.95,0.85,0.95), **kwargs):
    "Build a unet learner from `dls` and `arch`"

    if config:
        warnings.warn('config param is deprecated. Pass your args directly to unet_learner.')
        kwargs = {**config, **kwargs}

    meta = model_meta.get(arch, _default_meta)
    n_in = kwargs['n_in'] if 'n_in' in kwargs else 3
    if normalize: _add_norm(dls, meta, pretrained, n_in)

    n_out = ifnone(n_out, get_c(dls))
    assert n_out, "`n_out` is not defined, and could not be inferred from data, set `dls.c` or pass `n_out`"
    img_size = dls.one_batch()[0].shape[-2:]
    assert img_size, "image size could not be inferred from data"
    model = create_unet_model(arch, n_out, img_size, pretrained=pretrained, weights=weights, **kwargs)

    splitter = ifnone(splitter, meta['split'])
    learn = Learner(dls=dls, model=model, loss_func=loss_func, opt_func=opt_func, lr=lr, splitter=splitter, cbs=cbs,
                   metrics=metrics, path=path, model_dir=model_dir, wd=wd, wd_bn_bias=wd_bn_bias, train_bn=train_bn,
                   moms=moms)
    if pretrained: learn.freeze()
    # keep track of args for loggers
    store_attr('arch,normalize,n_out,pretrained', self=learn, **kwargs)
    return learn

The model is built from `arch` with the number of final filters inferred from `dls` if possible (otherwise, pass a value to `n_out`). It might be `pretrained`, and the architecture is cut and split using the default metadata of the model architecture (this can be customized by passing a `cut` or a `splitter`).

如果normalizepretrained都为True,则此函数将使用预训练模型的统计信息向dls添加一个Normalization转换(如果还没有的话)。这样,您在迁移学习中就不会忘记对数据进行归一化。

所有其他参数都将传递给Learner

`unet_learner` also supports TorchVision's new multi-weight API via `weights`. See `vision_learner` for more details; a short sketch follows the example below.

path = untar_data(URLs.CAMVID_TINY)
fnames = get_image_files(path/'images')
def label_func(x): return path/'labels'/f'{x.stem}_P{x.suffix}'
codes = np.loadtxt(path/'codes.txt', dtype=str)
dls = SegmentationDataLoaders.from_label_func(path, fnames, label_func, codes=codes)
learn = unet_learner(dls, models.resnet34, loss_func=CrossEntropyLossFlat(axis=1), y_range=(0,1))
test_ne(learn.cbs, None)
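As a hedged sketch reusing the `dls` built above, `weights` can be passed to `unet_learner` exactly as with `vision_learner` (guarded so it only runs on TorchVision >= 0.13):

# a minimal sketch: unet_learner with the TorchVision multi-weight API
if parse(torchvision.__version__) >= parse('0.13'):
    from torchvision.models import ResNet34_Weights
    learn = unet_learner(dls, models.resnet34, weights=ResNet34_Weights.IMAGENET1K_V1,
                         loss_func=CrossEntropyLossFlat(axis=1))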
def create_cnn_model(*args, **kwargs):
    "Deprecated name for `create_vision_model` -- do not use"
    warn("`create_cnn_model` has been renamed to `create_vision_model` -- please update your code")
    return create_vision_model(*args, **kwargs)
def cnn_learner(*args, **kwargs):
    "Deprecated name for `vision_learner` -- do not use"
    warn("`cnn_learner` has been renamed to `vision_learner` -- please update your code")
    return vision_learner(*args, **kwargs)

Show functions -

@typedispatch
def show_results(x:TensorImage, y, samples, outs, ctxs=None, max_n=10, nrows=None, ncols=None, figsize=None, **kwargs):
    if ctxs is None: ctxs = get_grid(min(len(samples), max_n), nrows=nrows, ncols=ncols, figsize=figsize)
    ctxs = show_results[object](x, y, samples, outs, ctxs=ctxs, max_n=max_n, **kwargs)
    return ctxs
@typedispatch
def show_results(x:TensorImage, y:TensorCategory, samples, outs, ctxs=None, max_n=10, nrows=None, ncols=None, figsize=None, **kwargs):
    if ctxs is None: ctxs = get_grid(min(len(samples), max_n), nrows=nrows, ncols=ncols, figsize=figsize)
    for i in range(2):
        ctxs = [b.show(ctx=c, **kwargs) for b,c,_ in zip(samples.itemgot(i),ctxs,range(max_n))]
    ctxs = [r.show(ctx=c, color='green' if b==r else 'red', **kwargs)
            for b,r,c,_ in zip(samples.itemgot(1),outs.itemgot(0),ctxs,range(max_n))]
    return ctxs
@typedispatch
def show_results(x:TensorImage, y:TensorMask|TensorPoint|TensorBBox, samples, outs, ctxs=None, max_n=6,
                 nrows=None, ncols=1, figsize=None, **kwargs):
    if ctxs is None: ctxs = get_grid(min(len(samples), max_n), nrows=nrows, ncols=ncols, figsize=figsize, double=True,
                                     title='Target/Prediction')
    for i in range(2):
        ctxs[::2] = [b.show(ctx=c, **kwargs) for b,c,_ in zip(samples.itemgot(i),ctxs[::2],range(2*max_n))]
    for o in [samples,outs]:
        ctxs[1::2] = [b.show(ctx=c, **kwargs) for b,c,_ in zip(o.itemgot(0),ctxs[1::2],range(2*max_n))]
    return ctxs
@typedispatch
def show_results(x:TensorImage, y:TensorImage, samples, outs, ctxs=None, max_n=10, figsize=None, **kwargs):
    if ctxs is None: ctxs = get_grid(3*min(len(samples), max_n), ncols=3, figsize=figsize, title='Input/Target/Prediction')
    for i in range(2):
        ctxs[i::3] = [b.show(ctx=c, **kwargs) for b,c,_ in zip(samples.itemgot(i),ctxs[i::3],range(max_n))]
    ctxs[2::3] = [b.show(ctx=c, **kwargs) for b,c,_ in zip(outs.itemgot(0),ctxs[2::3],range(max_n))]
    return ctxs
@typedispatch
def plot_top_losses(x: TensorImage, y:TensorCategory, samples, outs, raws, losses, nrows=None, ncols=None, figsize=None, **kwargs):
    axs = get_grid(len(samples), nrows=nrows, ncols=ncols, figsize=figsize, title='Prediction/Actual/Loss/Probability')
    for ax,s,o,r,l in zip(axs, samples, outs, raws, losses):
        s[0].show(ctx=ax, **kwargs)
        ax.set_title(f'{o[0]}/{s[1]} / {l.item():.2f} / {r.max().item():.2f}')
@typedispatch
def plot_top_losses(x: TensorImage, y:TensorMultiCategory, samples, outs, raws, losses, nrows=None, ncols=None, figsize=None, **kwargs):
    axs = get_grid(len(samples), nrows=nrows, ncols=ncols, figsize=figsize)
    for i,(ax,s) in enumerate(zip(axs, samples)): s[0].show(ctx=ax, title=f'Image {i}', **kwargs)
    rows = get_empty_df(len(samples))
    outs = L(s[1:] + o + (TitledStr(r), TitledFloat(l.item())) for s,o,r,l in zip(samples, outs, raws, losses))
    for i,l in enumerate(["target", "predicted", "probabilities", "loss"]):
        rows = [b.show(ctx=r, label=l, **kwargs) for b,r in zip(outs.itemgot(i),rows)]
    display_df(pd.DataFrame(rows))
@typedispatch
def plot_top_losses(x:TensorImage, y:TensorMask, samples, outs, raws, losses, nrows=None, ncols=None, figsize=None, **kwargs):
    axes = get_grid(len(samples)*3, nrows=len(samples), ncols=3, figsize=figsize, flatten=False, title="Input | Target | Prediction")
    if axes.ndim == 1: axes = (axes,)
    titles = ["input", "target", "pred"]
    for axs,s,o,l in zip(axes, samples, outs, losses):
        imgs = (s[0], s[1], o[0])
        for ax,im,title in zip(axs, imgs, titles):
            if title=="pred": title += f"; loss = {l.item():.4f}"
            im.show(ctx=ax, **kwargs)
            ax.set_title(title)

Export -

from nbdev import nbdev_export
nbdev_export()