Common Modules

Embeddings

Embedding1dLayer

Bases: Module

Enables different values in a categorical feature to have different embeddings.

Source code in src/pytorch_tabular/models/common/layers/embeddings.py
class Embedding1dLayer(nn.Module):
    """允许类别特征中的不同值拥有不同的嵌入."""

    def __init__(
        self,
        continuous_dim: int,
        categorical_embedding_dims: Tuple[int, int],
        embedding_dropout: float = 0.0,
        batch_norm_continuous_input: bool = False,
        virtual_batch_size: Optional[int] = None,
    ):
        super().__init__()
        self.continuous_dim = continuous_dim
        self.categorical_embedding_dims = categorical_embedding_dims
        self.batch_norm_continuous_input = batch_norm_continuous_input

        # Embedding layers
        self.cat_embedding_layers = nn.ModuleList([nn.Embedding(x, y) for x, y in categorical_embedding_dims])
        if embedding_dropout > 0:
            self.embd_dropout = nn.Dropout(embedding_dropout)
        else:
            self.embd_dropout = None
        # Continuous Layers
        if batch_norm_continuous_input:
            self.normalizing_batch_norm = BatchNorm1d(continuous_dim, virtual_batch_size)

    def forward(self, x: Dict[str, Any]) -> torch.Tensor:
        assert "continuous" in x or "categorical" in x, "x must contain either continuous and categorical features"
        # (B, N)
        continuous_data, categorical_data = (
            x.get("continuous", torch.empty(0, 0)),
            x.get("categorical", torch.empty(0, 0)),
        )
        assert categorical_data.shape[1] == len(
            self.cat_embedding_layers
        ), "categorical_data must have same number of columns as categorical embedding layers"
        assert (
            continuous_data.shape[1] == self.continuous_dim
        ), "continuous_data must have same number of columns as continuous dim"
        embed = None
        if continuous_data.shape[1] > 0:
            if self.batch_norm_continuous_input:
                embed = self.normalizing_batch_norm(continuous_data)
            else:
                embed = continuous_data
            # (B, N, C)
        if categorical_data.shape[1] > 0:
            categorical_embed = torch.cat(
                [
                    embedding_layer(categorical_data[:, i])
                    for i, embedding_layer in enumerate(self.cat_embedding_layers)
                ],
                dim=1,
            )
            # (B, N, C + C)
            if embed is None:
                embed = categorical_embed
            else:
                embed = torch.cat([embed, categorical_embed], dim=1)
        if self.embd_dropout is not None:
            embed = self.embd_dropout(embed)
        return embed
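
A minimal usage sketch (assuming Embedding1dLayer is importable from pytorch_tabular.models.common.layers.embeddings, matching the source path above; shapes and cardinalities are illustrative):

import torch
from pytorch_tabular.models.common.layers.embeddings import Embedding1dLayer

# Two categorical columns with cardinalities 5 and 10, embedded into 3 and 4 dims,
# plus two continuous columns that are batch-normalized before concatenation.
layer = Embedding1dLayer(
    continuous_dim=2,
    categorical_embedding_dims=[(5, 3), (10, 4)],
    embedding_dropout=0.1,
    batch_norm_continuous_input=True,
)
batch = {
    "continuous": torch.randn(32, 2),
    "categorical": torch.randint(0, 5, (32, 2)),
}
out = layer(batch)
print(out.shape)  # torch.Size([32, 9]) -> 2 continuous + 3 + 4 embedding dims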

Embedding2dLayer

Bases: Module

Embeds categorical and continuous features into a 2D tensor.

Source code in src/pytorch_tabular/models/common/layers/embeddings.py
class Embedding2dLayer(nn.Module):
    """将分类和连续特征嵌入到一个二维张量中."""

    def __init__(
        self,
        continuous_dim: int,
        categorical_cardinality: List[int],
        embedding_dim: int,
        shared_embedding_strategy: Optional[str] = None,
        frac_shared_embed: float = 0.25,
        embedding_bias: bool = False,
        batch_norm_continuous_input: bool = False,
        virtual_batch_size: Optional[int] = None,
        embedding_dropout: float = 0.0,
        initialization: Optional[str] = None,
    ):
        """    Args:
    continuous_dim: 连续特征的数量
    categorical_cardinality: 分类特征的基数列表
    embedding_dim: 嵌入维度
    shared_embedding_strategy: 共享嵌入的策略
    frac_shared_embed: 共享嵌入的比例
    embedding_bias: 嵌入层是否使用偏置
    batch_norm_continuous_input: 是否对连续特征使用批量归一化
    embedding_dropout: 应用于嵌入的丢弃率
    initialization: 嵌入层的初始化策略
"""
        super().__init__()
        self.continuous_dim = continuous_dim
        self.categorical_cardinality = categorical_cardinality
        self.embedding_dim = embedding_dim
        self.batch_norm_continuous_input = batch_norm_continuous_input
        self.shared_embedding_strategy = shared_embedding_strategy
        self.frac_shared_embed = frac_shared_embed
        self.embedding_bias = embedding_bias
        self.initialization = initialization
        d_sqrt_inv = 1 / math.sqrt(embedding_dim)
        if initialization is not None:
            assert initialization in [
                "kaiming_uniform",
                "kaiming_normal",
            ], "initialization should be either of `kaiming` or `uniform`"
            self._do_kaiming_initialization = True
            self._initialize_kaiming = partial(
                _initialize_kaiming,
                initialization=initialization,
                d_sqrt_inv=d_sqrt_inv,
            )
        else:
            self._do_kaiming_initialization = False

        # cat Embedding layers
        if self.shared_embedding_strategy is not None:
            self.cat_embedding_layers = nn.ModuleList(
                [
                    SharedEmbeddings(
                        c,
                        self.embedding_dim,
                        add_shared_embed=(self.shared_embedding_strategy == "add"),
                        frac_shared_embed=self.frac_shared_embed,
                    )
                    for c in categorical_cardinality
                ]
            )
            if self._do_kaiming_initialization:
                for embedding_layer in self.cat_embedding_layers:
                    self._initialize_kaiming(embedding_layer.embed.weight)
                    self._initialize_kaiming(embedding_layer.shared_embed)
        else:
            self.cat_embedding_layers = nn.ModuleList(
                [nn.Embedding(c, self.embedding_dim) for c in categorical_cardinality]
            )
            if self._do_kaiming_initialization:
                for embedding_layer in self.cat_embedding_layers:
                    self._initialize_kaiming(embedding_layer.weight)
        if embedding_bias:
            self.cat_embedding_bias = nn.Parameter(torch.Tensor(len(self.categorical_cardinality), self.embedding_dim))
            if self._do_kaiming_initialization:
                self._initialize_kaiming(self.cat_embedding_bias)
        # Continuous Embedding Layer
        self.cont_embedding_layer = nn.Embedding(self.continuous_dim, self.embedding_dim)
        if self._do_kaiming_initialization:
            self._initialize_kaiming(self.cont_embedding_layer.weight)
        if embedding_bias:
            self.cont_embedding_bias = nn.Parameter(torch.Tensor(self.continuous_dim, self.embedding_dim))
            if self._do_kaiming_initialization:
                self._initialize_kaiming(self.cont_embedding_bias)
        if batch_norm_continuous_input:
            self.normalizing_batch_norm = BatchNorm1d(continuous_dim, virtual_batch_size)
        if embedding_dropout > 0:
            self.embd_dropout = nn.Dropout(embedding_dropout)
        else:
            self.embd_dropout = None

    def forward(self, x: Dict[str, Any]) -> torch.Tensor:
        assert "continuous" in x or "categorical" in x, "x must contain either continuous and categorical features"
        # (B, N)
        continuous_data, categorical_data = (
            x.get("continuous", torch.empty(0, 0)),
            x.get("categorical", torch.empty(0, 0)),
        )
        assert categorical_data.shape[1] == len(
            self.cat_embedding_layers
        ), "categorical_data must have same number of columns as categorical embedding layers"
        assert (
            continuous_data.shape[1] == self.continuous_dim
        ), "continuous_data must have same number of columns as continuous dim"
        embed = None
        if continuous_data.shape[1] > 0:
            cont_idx = torch.arange(self.continuous_dim, device=continuous_data.device).expand(
                continuous_data.size(0), -1
            )
            if self.batch_norm_continuous_input:
                continuous_data = self.normalizing_batch_norm(continuous_data)
            embed = torch.mul(
                continuous_data.unsqueeze(2),
                self.cont_embedding_layer(cont_idx),
            )
            if self.embedding_bias:
                embed += self.cont_embedding_bias
            # (B, N, C)
        if categorical_data.shape[1] > 0:
            categorical_embed = torch.cat(
                [
                    embedding_layer(categorical_data[:, i]).unsqueeze(1)
                    for i, embedding_layer in enumerate(self.cat_embedding_layers)
                ],
                dim=1,
            )
            if self.embedding_bias:
                categorical_embed += self.cat_embedding_bias
            # (B, N, C + C)
            if embed is None:
                embed = categorical_embed
            else:
                embed = torch.cat([embed, categorical_embed], dim=1)
        if self.embd_dropout is not None:
            embed = self.embd_dropout(embed)
        return embed
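
A minimal usage sketch (same import-path assumption as above; shapes are illustrative). In contrast to Embedding1dLayer, each sample is embedded into a 2D tensor of shape (num_features, embedding_dim), i.e. (B, N, D) for a batch:

import torch
from pytorch_tabular.models.common.layers.embeddings import Embedding2dLayer

layer = Embedding2dLayer(
    continuous_dim=3,
    categorical_cardinality=[4, 7],
    embedding_dim=16,
    embedding_bias=True,
    initialization="kaiming_uniform",  # also initializes the bias parameters
)
batch = {
    "continuous": torch.randn(8, 3),
    "categorical": torch.randint(0, 4, (8, 2)),
}
out = layer(batch)
print(out.shape)  # torch.Size([8, 5, 16]) -> one 16-dim token per feature (3 continuous + 2 categorical)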

__init__(continuous_dim, categorical_cardinality, embedding_dim, shared_embedding_strategy=None, frac_shared_embed=0.25, embedding_bias=False, batch_norm_continuous_input=False, virtual_batch_size=None, embedding_dropout=0.0, initialization=None)

Args:
    continuous_dim: number of continuous features
    categorical_cardinality: list of cardinalities of the categorical features
    embedding_dim: embedding dimension
    shared_embedding_strategy: strategy to use for shared embeddings
    frac_shared_embed: fraction of embeddings to share
    embedding_bias: whether to use bias in the embedding layers
    batch_norm_continuous_input: whether to apply batch normalization to the continuous features
    embedding_dropout: dropout probability applied to the embeddings
    initialization: initialization strategy for the embedding layers

Source code in src/pytorch_tabular/models/common/layers/embeddings.py
    def __init__(
        self,
        continuous_dim: int,
        categorical_cardinality: List[int],
        embedding_dim: int,
        shared_embedding_strategy: Optional[str] = None,
        frac_shared_embed: float = 0.25,
        embedding_bias: bool = False,
        batch_norm_continuous_input: bool = False,
        virtual_batch_size: Optional[int] = None,
        embedding_dropout: float = 0.0,
        initialization: Optional[str] = None,
    ):
        """    Args:
    continuous_dim: 连续特征的数量
    categorical_cardinality: 分类特征的基数列表
    embedding_dim: 嵌入维度
    shared_embedding_strategy: 共享嵌入的策略
    frac_shared_embed: 共享嵌入的比例
    embedding_bias: 嵌入层是否使用偏置
    batch_norm_continuous_input: 是否对连续特征使用批量归一化
    embedding_dropout: 应用于嵌入的丢弃率
    initialization: 嵌入层的初始化策略
"""
        super().__init__()
        self.continuous_dim = continuous_dim
        self.categorical_cardinality = categorical_cardinality
        self.embedding_dim = embedding_dim
        self.batch_norm_continuous_input = batch_norm_continuous_input
        self.shared_embedding_strategy = shared_embedding_strategy
        self.frac_shared_embed = frac_shared_embed
        self.embedding_bias = embedding_bias
        self.initialization = initialization
        d_sqrt_inv = 1 / math.sqrt(embedding_dim)
        if initialization is not None:
            assert initialization in [
                "kaiming_uniform",
                "kaiming_normal",
            ], "initialization should be either of `kaiming` or `uniform`"
            self._do_kaiming_initialization = True
            self._initialize_kaiming = partial(
                _initialize_kaiming,
                initialization=initialization,
                d_sqrt_inv=d_sqrt_inv,
            )
        else:
            self._do_kaiming_initialization = False

        # cat Embedding layers
        if self.shared_embedding_strategy is not None:
            self.cat_embedding_layers = nn.ModuleList(
                [
                    SharedEmbeddings(
                        c,
                        self.embedding_dim,
                        add_shared_embed=(self.shared_embedding_strategy == "add"),
                        frac_shared_embed=self.frac_shared_embed,
                    )
                    for c in categorical_cardinality
                ]
            )
            if self._do_kaiming_initialization:
                for embedding_layer in self.cat_embedding_layers:
                    self._initialize_kaiming(embedding_layer.embed.weight)
                    self._initialize_kaiming(embedding_layer.shared_embed)
        else:
            self.cat_embedding_layers = nn.ModuleList(
                [nn.Embedding(c, self.embedding_dim) for c in categorical_cardinality]
            )
            if self._do_kaiming_initialization:
                for embedding_layer in self.cat_embedding_layers:
                    self._initialize_kaiming(embedding_layer.weight)
        if embedding_bias:
            self.cat_embedding_bias = nn.Parameter(torch.Tensor(len(self.categorical_cardinality), self.embedding_dim))
            if self._do_kaiming_initialization:
                self._initialize_kaiming(self.cat_embedding_bias)
        # Continuous Embedding Layer
        self.cont_embedding_layer = nn.Embedding(self.continuous_dim, self.embedding_dim)
        if self._do_kaiming_initialization:
            self._initialize_kaiming(self.cont_embedding_layer.weight)
        if embedding_bias:
            self.cont_embedding_bias = nn.Parameter(torch.Tensor(self.continuous_dim, self.embedding_dim))
            if self._do_kaiming_initialization:
                self._initialize_kaiming(self.cont_embedding_bias)
        if batch_norm_continuous_input:
            self.normalizing_batch_norm = BatchNorm1d(continuous_dim, virtual_batch_size)
        if embedding_dropout > 0:
            self.embd_dropout = nn.Dropout(embedding_dropout)
        else:
            self.embd_dropout = None

PreEncoded1dLayer

Bases: Module

Takes pre-encoded categorical variables and simply concatenates them with the continuous variables. Has no learnable component.

Source code in src/pytorch_tabular/models/common/layers/embeddings.py
class PreEncoded1dLayer(nn.Module):
    """接受预先编码的分类变量,并仅与连续变量连接.没有可学习的组件."""

    def __init__(
        self,
        continuous_dim: int,
        categorical_dim: Tuple[int, int],
        embedding_dropout: float = 0.0,
        batch_norm_continuous_input: bool = False,
        virtual_batch_size: Optional[int] = None,
    ):
        super().__init__()
        self.continuous_dim = continuous_dim
        self.categorical_dim = categorical_dim
        self.batch_norm_continuous_input = batch_norm_continuous_input

        if embedding_dropout > 0:
            self.embd_dropout = nn.Dropout(embedding_dropout)
        else:
            self.embd_dropout = None
        # Continuous Layers
        if batch_norm_continuous_input:
            self.normalizing_batch_norm = BatchNorm1d(continuous_dim, virtual_batch_size)

    def forward(self, x: Dict[str, Any]) -> torch.Tensor:
        assert "continuous" in x or "categorical" in x, "x must contain either continuous and categorical features"
        # (B, N)
        continuous_data, categorical_data = (
            x.get("continuous", torch.empty(0, 0)),
            x.get("categorical", torch.empty(0, 0)),
        )
        assert (
            categorical_data.shape[1] == self.categorical_dim
        ), "categorical_data must have same number of columns as categorical embedding layers"
        assert (
            continuous_data.shape[1] == self.continuous_dim
        ), "continuous_data must have same number of columns as continuous dim"
        embed = None
        if continuous_data.shape[1] > 0:
            if self.batch_norm_continuous_input:
                embed = self.normalizing_batch_norm(continuous_data)
            else:
                embed = continuous_data
            # (B, N, C)
        if categorical_data.shape[1] > 0:
            # (B, N, C)
            if embed is None:
                embed = categorical_data
            else:
                embed = torch.cat([embed, categorical_data], dim=1)
        if self.embd_dropout is not None:
            embed = self.embd_dropout(embed)
        return embed

SharedEmbeddings

Bases: Module

Enables different values in a categorical feature to share some embeddings.

Source code in src/pytorch_tabular/models/common/layers/embeddings.py
class SharedEmbeddings(nn.Module):
    """使分类特征中的不同值能够共享一些嵌入."""

    def __init__(
        self,
        num_embed: int,
        embed_dim: int,
        add_shared_embed: bool = False,
        frac_shared_embed: float = 0.25,
    ):
        super().__init__()
        assert frac_shared_embed < 1, "'frac_shared_embed' must be less than 1"

        self.add_shared_embed = add_shared_embed
        self.embed = nn.Embedding(num_embed, embed_dim, padding_idx=0)
        self.embed.weight.data.clamp_(-2, 2)
        if add_shared_embed:
            col_embed_dim = embed_dim
        else:
            col_embed_dim = int(embed_dim * frac_shared_embed)
        self.shared_embed = nn.Parameter(torch.empty(1, col_embed_dim).uniform_(-1, 1))

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        out = self.embed(X)
        shared_embed = self.shared_embed.expand(out.shape[0], -1)
        if self.add_shared_embed:
            out += shared_embed
        else:
            out[:, : shared_embed.shape[1]] = shared_embed
        return out

    @property
    def weight(self):
        w = self.embed.weight.detach()
        if self.add_shared_embed:
            w += self.shared_embed
        else:
            w[:, : self.shared_embed.shape[1]] = self.shared_embed
        return w
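
A small sketch of the sharing behaviour (import path assumed from the source location above): with frac_shared_embed=0.25 and embed_dim=8, the first 2 embedding dimensions are overwritten by the shared vector for every category value.

import torch
from pytorch_tabular.models.common.layers.embeddings import SharedEmbeddings

emb = SharedEmbeddings(num_embed=10, embed_dim=8, add_shared_embed=False, frac_shared_embed=0.25)
idx = torch.randint(0, 10, (4,))
out = emb(idx)
print(out.shape)  # torch.Size([4, 8])
# The first int(8 * 0.25) = 2 columns are identical across all rows:
print(torch.allclose(out[:, :2], emb.shared_embed.expand(4, -1)))  # True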

Gated Units

GatedFeatureLearningUnit

Bases: Module

Source code in src/pytorch_tabular/models/common/layers/gated_units.py
class GatedFeatureLearningUnit(nn.Module):
    def __init__(
        self,
        n_features_in: int,
        n_stages: int,
        feature_mask_function: Callable = entmax15,
        feature_sparsity: float = 0.3,
        learnable_sparsity: bool = True,
        dropout: float = 0.0,
    ):
        super().__init__()
        self.n_features_in = n_features_in
        self.n_features_out = n_features_in
        self.feature_mask_function = feature_mask_function
        self._dropout = dropout
        self.n_stages = n_stages
        self.feature_sparsity = feature_sparsity
        self.learnable_sparsity = learnable_sparsity
        self._build_network()

    def _create_feature_mask(self):
        feature_masks = torch.cat(
            [
                torch.distributions.Beta(
                    torch.tensor([random.uniform(0.5, 10.0)]),
                    torch.tensor([random.uniform(0.5, 10.0)]),
                )
                .sample((self.n_features_in,))
                .squeeze(-1)
                for _ in range(self.n_stages)
            ]
        ).reshape(self.n_stages, self.n_features_in)
        return nn.Parameter(
            feature_masks,
            requires_grad=True,
        )

    def _build_network(self):
        self.W_in = nn.ModuleList(
            [nn.Linear(2 * self.n_features_in, 2 * self.n_features_in) for _ in range(self.n_stages)]
        )
        self.W_out = nn.ModuleList(
            [nn.Linear(2 * self.n_features_in, self.n_features_in) for _ in range(self.n_stages)]
        )

        self.feature_masks = self._create_feature_mask()
        if self.feature_mask_function.__name__ == "t_softmax":
            t = RSoftmax.calculate_t(self.feature_masks, r=torch.tensor([self.feature_sparsity]), dim=-1)
            self.t = nn.Parameter(t, requires_grad=self.learnable_sparsity)
        self.dropout = nn.Dropout(self._dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        h = x
        t = torch.relu(self.t) if self.feature_mask_function.__name__ == "t_softmax" else None
        for d in range(self.n_stages):
            if self.feature_mask_function.__name__ == "t_softmax":
                feature = self.feature_mask_function(self.feature_masks[d], t[d]) * x
            else:
                feature = self.feature_mask_function(self.feature_masks[d]) * x
            h_in = self.W_in[d](torch.cat([feature, h], dim=-1))
            z = torch.sigmoid(h_in[:, : self.n_features_in])
            r = torch.sigmoid(h_in[:, self.n_features_in :])
            h_out = torch.tanh(self.W_out[d](torch.cat([r * h, x], dim=-1)))
            h = self.dropout((1 - z) * h + z * h_out)
        return h
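
A minimal usage sketch (assuming GatedFeatureLearningUnit is importable from pytorch_tabular.models.common.layers.gated_units, matching the source path above). The block is shape-preserving: it learns to mask, reweight, and gate the input features over n_stages steps.

import torch
from pytorch_tabular.models.common.layers.gated_units import GatedFeatureLearningUnit

gflu = GatedFeatureLearningUnit(n_features_in=16, n_stages=3, dropout=0.1)
x = torch.randn(32, 16)
print(gflu(x).shape)  # torch.Size([32, 16]) -- same width as the input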

GEGLU

Bases: Module

Gated Exponential Linear Unit (GEGLU).

Source code in src/pytorch_tabular/models/common/layers/gated_units.py
class GEGLU(nn.Module):
    """门控指数线性单元 (GEGLU)"""

    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        """    Args:
    d_model: 模型的维度
    d_ff: 前馈层的维度
    dropout: 丢弃概率
"""
        super().__init__()
        self.ffn = PositionWiseFeedForward(d_model, d_ff, dropout, nn.GELU(), True, False, False, False)

    def forward(self, x: torch.Tensor):
        return self.ffn(x)

__init__(d_model, d_ff, dropout=0.1)

Args:
    d_model: dimension of the model
    d_ff: dimension of the feed-forward layer
    dropout: dropout probability

Source code in src/pytorch_tabular/models/common/layers/gated_units.py
    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        """    Args:
    d_model: 模型的维度
    d_ff: 前馈层的维度
    dropout: 丢弃概率
"""
        super().__init__()
        self.ffn = PositionWiseFeedForward(d_model, d_ff, dropout, nn.GELU(), True, False, False, False)

ReGLU

Bases: Module

ReGLU.

Source code in src/pytorch_tabular/models/common/layers/gated_units.py
class ReGLU(nn.Module):
    """    ReGLU."""

    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        """    Args:
    d_model: 模型的维度
    d_ff: 前馈层的维度
    dropout: 丢弃概率
"""
        super().__init__()
        self.ffn = PositionWiseFeedForward(d_model, d_ff, dropout, nn.ReLU(), True, False, False, False)

    def forward(self, x: torch.Tensor):
        return self.ffn(x)

__init__(d_model, d_ff, dropout=0.1)

Args:
    d_model: dimension of the model
    d_ff: dimension of the feed-forward layer
    dropout: dropout probability

Source code in src/pytorch_tabular/models/common/layers/gated_units.py
    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        """    Args:
    d_model: 模型的维度
    d_ff: 前馈层的维度
    dropout: 丢弃概率
"""
        super().__init__()
        self.ffn = PositionWiseFeedForward(d_model, d_ff, dropout, nn.ReLU(), True, False, False, False)

SwiGLU

Bases: Module

Source code in src/pytorch_tabular/models/common/layers/gated_units.py
class SwiGLU(nn.Module):
    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        """    Args:
    d_model: 模型的维度
    d_ff: 前馈层的维度
    dropout: 丢弃概率
"""
        super().__init__()
        self.ffn = PositionWiseFeedForward(d_model, d_ff, dropout, nn.SiLU(), True, False, False, False)

    def forward(self, x: torch.Tensor):
        return self.ffn(x)

__init__(d_model, d_ff, dropout=0.1)

Args:
    d_model: dimension of the model
    d_ff: dimension of the feed-forward layer
    dropout: dropout probability

Source code in src/pytorch_tabular/models/common/layers/gated_units.py
    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        """    Args:
    d_model: 模型的维度
    d_ff: 前馈层的维度
    dropout: 丢弃概率
"""
        super().__init__()
        self.ffn = PositionWiseFeedForward(d_model, d_ff, dropout, nn.SiLU(), True, False, False, False)

PositionWiseFeedForward

Bases: Module

title: Position-wise Feed-Forward Network (FFN) summary: Documented, reusable implementation of the position-wise feed-forward network.

Position-wise Feed-Forward Network (FFN)

This is a PyTorch implementation of the position-wise feed-forward network used in Transformer models. The FFN consists of two fully connected layers. The dimension of the hidden layer, $d_{ff}$, is generally set to around four times the token-embedding dimension $d_{model}$, so it is sometimes also called the expand-and-contract network. The hidden layer has an activation function, usually ReLU (Rectified Linear Unit), i.e. $$\max(0, x)$$ That is, the FFN function is $$FFN(x, W_1, W_2, b_1, b_2) = \max(0, x W_1 + b_1) W_2 + b_2$$ where $W_1$, $W_2$, $b_1$ and $b_2$ are learnable parameters. Sometimes the GELU (Gaussian Error Linear Unit) activation is used instead of ReLU: $$x \Phi(x)$$ where $\Phi(x) = P(X \le x), X \sim \mathcal{N}(0,1)$.

Gated Linear Units

This is a generic implementation that supports different variants, including Gated Linear Units (GLU).

Source code in src/pytorch_tabular/models/common/layers/gated_units.py
class PositionWiseFeedForward(nn.Module):
    r"""title: 逐位置前馈网络 (FFN)
summary: 逐位置前馈网络的可重用实现文档.

# 逐位置前馈网络 (FFN)
这是 [PyTorch](https://pytorch.org) 实现的逐位置前馈网络,用于Transformer模型.
FFN由两个全连接层组成.隐藏层的维度 $d_{ff}$ 通常设置为词嵌入维度 $d_{model}$ 的四倍左右.
因此,它有时也被称为扩展-收缩网络.隐藏层有一个激活函数,通常设置为ReLU(修正线性单元)激活,即 $$\\max(0, x)$$
也就是说,FFN函数为,
$$FFN(x, W_1, W_2, b_1, b_2) = \\max(0, x W_1 + b_1) W_2 + b_2$$
其中 $W_1$、$W_2$、$b_1$ 和 $b_2$ 是可学习的参数.有时也会使用GELU(高斯误差线性单元)激活代替ReLU.
$$x \\Phi(x)$$ 其中 $\\Phi(x) = P(X \\le x), X \\sim \\mathcal{N}(0,1)$
### 门控线性单元
这是一个支持不同变体的通用实现,包括 [门控线性单元](https://arxiv.org/abs/2002.05202) (GLU)."""

    def __init__(
        self,
        d_model: int,
        d_ff: int,
        dropout: float = 0.1,
        activation=nn.ReLU(),
        is_gated: bool = False,
        bias1: bool = True,
        bias2: bool = True,
        bias_gate: bool = True,
    ):
        """* `d_model` 是词嵌入中的特征数量
* `d_ff` 是前馈网络隐藏层中的特征数量
* `dropout` 是隐藏层的dropout概率
* `is_gated` 指定隐藏层是否为门控
* `bias1` 指定第一个全连接层是否应具有可学习的偏置
* `bias2` 指定第二个全连接层是否应具有可学习的偏置
* `bias_gate` 指定门控的全连接层是否应具有可学习的偏置
"""
        super().__init__()
        # Layer one parameterized by weight $W_1$ and bias $b_1$
        self.layer1 = nn.Linear(d_model, d_ff, bias=bias1)
        # Layer two parameterized by weight $W_2$ and bias $b_2$
        self.layer2 = nn.Linear(d_ff, d_model, bias=bias2)
        # Hidden layer dropout
        self.dropout = nn.Dropout(dropout)
        # Activation function $f$
        self.activation = activation
        # Whether there is a gate
        self.is_gated = is_gated
        if is_gated:
            # If there is a gate the linear layer to transform inputs to
            # be multiplied by the gate, parameterized by weight $V$ and bias $c$
            self.linear_v = nn.Linear(d_model, d_ff, bias=bias_gate)

    def forward(self, x: torch.Tensor):
        # $f(x W_1 + b_1)$
        g = self.activation(self.layer1(x))
        # If gated, $f(x W_1 + b_1) \otimes (x V + b) $
        if self.is_gated:
            x = g * self.linear_v(x)
        # Otherwise
        else:
            x = g
        # Apply dropout
        x = self.dropout(x)
        # $(f(x W_1 + b_1) \otimes (x V + b)) W_2 + b_2$ or $f(x W_1 + b_1) W_2 + b_2$
        # depending on whether it is gated
        return self.layer2(x)
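
A short sketch contrasting the plain FFN with its gated variant (import path assumed from the source location above; both blocks operate on the last dimension, so any (..., d_model) tensor works):

import torch
from pytorch_tabular.models.common.layers.gated_units import GEGLU, PositionWiseFeedForward

x = torch.randn(4, 10, 32)  # (batch, tokens, d_model)

# Plain expand-and-contract FFN with ReLU: max(0, x W1 + b1) W2 + b2
ffn = PositionWiseFeedForward(d_model=32, d_ff=128, dropout=0.0)
print(ffn(x).shape)    # torch.Size([4, 10, 32])

# GEGLU: the same block with a GELU-activated gate, GELU(x W1) * (x V), followed by W2
geglu = GEGLU(d_model=32, d_ff=128, dropout=0.0)
print(geglu(x).shape)  # torch.Size([4, 10, 32])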

__init__(d_model, d_ff, dropout=0.1, activation=nn.ReLU(), is_gated=False, bias1=True, bias2=True, bias_gate=True)

  • d_model is the number of features in a token embedding
  • d_ff is the number of features in the hidden layer of the FFN
  • dropout is the dropout probability for the hidden layer
  • is_gated specifies whether the hidden layer is gated
  • bias1 specifies whether the first fully connected layer should have a learnable bias
  • bias2 specifies whether the second fully connected layer should have a learnable bias
  • bias_gate specifies whether the fully connected layer for the gate should have a learnable bias
Source code in src/pytorch_tabular/models/common/layers/gated_units.py
    def __init__(
        self,
        d_model: int,
        d_ff: int,
        dropout: float = 0.1,
        activation=nn.ReLU(),
        is_gated: bool = False,
        bias1: bool = True,
        bias2: bool = True,
        bias_gate: bool = True,
    ):
        """* `d_model` 是词嵌入中的特征数量
* `d_ff` 是前馈网络隐藏层中的特征数量
* `dropout` 是隐藏层的dropout概率
* `is_gated` 指定隐藏层是否为门控
* `bias1` 指定第一个全连接层是否应具有可学习的偏置
* `bias2` 指定第二个全连接层是否应具有可学习的偏置
* `bias_gate` 指定门控的全连接层是否应具有可学习的偏置
"""
        super().__init__()
        # Layer one parameterized by weight $W_1$ and bias $b_1$
        self.layer1 = nn.Linear(d_model, d_ff, bias=bias1)
        # Layer two parameterized by weight $W_2$ and bias $b_2$
        self.layer2 = nn.Linear(d_ff, d_model, bias=bias2)
        # Hidden layer dropout
        self.dropout = nn.Dropout(dropout)
        # Activation function $f$
        self.activation = activation
        # Whether there is a gate
        self.is_gated = is_gated
        if is_gated:
            # If there is a gate the linear layer to transform inputs to
            # be multiplied by the gate, parameterized by weight $V$ and bias $c$
            self.linear_v = nn.Linear(d_model, d_ff, bias=bias_gate)

Soft Trees

NeuralDecisionTree

Bases: Module

Source code in src/pytorch_tabular/models/common/layers/soft_trees.py
class NeuralDecisionTree(nn.Module):
    def __init__(
        self,
        depth: int,
        n_features: int,
        dropout: float = 0,
        binning_activation: Callable = entmax15,
        feature_mask_function: Callable = entmax15,
        feature_sparsity: float = 0.8,
        learnable_sparsity: bool = True,
    ):
        super().__init__()
        self.depth = depth
        self._num_cutpoints = 1
        self.n_features = n_features
        self._dropout = dropout
        self.binning_activation = binning_activation
        self.feature_mask_function = feature_mask_function
        self.feature_sparsity = feature_sparsity
        self.learnable_sparsity = learnable_sparsity
        self._build_network()

    def _build_network(self):
        for d in range(self.depth):
            for n in range(max(2 ** (d), 1)):
                self.add_module(
                    f"decision_stump_{d}_{n}",
                    NeuralDecisionStump(
                        self.n_features + (2 ** (d) if d > 0 else 0),
                        self.binning_activation,
                        self.feature_mask_function,
                        self.feature_sparsity,
                        self.learnable_sparsity,
                    ),
                )
        self.dropout = nn.Dropout(self._dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        tree_input = x
        feature_masks = []
        for d in range(self.depth):
            layer_nodes = []
            layer_feature_masks = []
            for n in range(max(2 ** (d), 1)):
                leaf_nodes, feature_mask = self._modules[f"decision_stump_{d}_{n}"](tree_input)
                layer_nodes.append(leaf_nodes)
                layer_feature_masks.append(feature_mask)
            layer_nodes = torch.cat(layer_nodes, dim=1)
            tree_input = torch.cat([x, layer_nodes], dim=1)
            feature_masks.append(layer_feature_masks)
        return self.dropout(layer_nodes), feature_masks

ODST

Bases: ModuleWithInit

Source code in src/pytorch_tabular/models/common/layers/soft_trees.py
class ODST(ModuleWithInit):
    def __init__(
        self,
        in_features,
        num_trees,
        depth=6,
        tree_output_dim=1,
        flatten_output=True,
        choice_function=sparsemax,
        bin_function=sparsemoid,
        initialize_response_=nn.init.normal_,
        initialize_selection_logits_=nn.init.uniform_,
        threshold_init_beta=1.0,
        threshold_init_cutoff=1.0,
    ):
        """    Oblivious Differentiable Sparsemax Trees. http://tinyurl.com/odst-readmore 可以将此模块替换为nn.Linear,随处使用.

:param in_features: 输入张量的特征数量
:param num_trees: 该层中的树的数量
:param tree_dim: 单棵树响应中的响应通道数量
:param depth: 每棵树中的分割数量
:param flatten_output: 如果为False,返回[..., num_trees, tree_dim],
    默认返回[..., num_trees * tree_dim]
:param choice_function: f(tensor, dim) -> R_simplex 计算特征权重,使得 f(tensor, dim).sum(dim) == 1
:param bin_function: f(tensor) -> R[0, 1],计算树叶子权重

:param initialize_response_: 树输出张量的就地初始化器
:param initialize_selection_logits_: 选择树特征的对数概率的就地初始化器
阈值和尺度均使用数据感知初始化(或通过.load_state_dict加载)
:param threshold_init_beta: 将阈值初始化为数据点的q分位数
    其中 q ~ Beta(:threshold_init_beta:, :threshold_init_beta:)
    如果此参数设置为1,初始阈值将具有与数据点相同的分布
    如果大于1(例如10),阈值将更接近中位数数据值
    如果小于1(例如0.1),阈值将接近数据的最小/最大值.

:param threshold_init_cutoff: 阈值对数温度的初始化器,取值范围为(0, inf)
    默认情况下(1.0),对数温度被初始化为使得所有二进制选择器
    最终位于稀疏S形函数的线性区域.然后温度将按此参数进行缩放.
    设置此值 > 1.0 将在数据点和稀疏S形函数的截止值之间产生一些余量
    设置此值 < 1.0 将导致 (1 - value) 部分数据点最终位于稀疏S形函数的平坦区域
    例如,threshold_init_cutoff = 0.9 将设置10%的点等于0.0或1.0
    设置此值 > 1.0 将在数据点和稀疏S形函数的截止值之间产生余量
    所有点将介于 (0.5 - 0.5 / threshold_init_cutoff) 和 (0.5 + 0.5 / threshold_init_cutoff) 之间
"""
        super().__init__()
        self.depth, self.num_trees, self.tree_dim, self.flatten_output = (
            depth,
            num_trees,
            tree_output_dim,
            flatten_output,
        )
        self.choice_function, self.bin_function = choice_function, bin_function
        self.threshold_init_beta, self.threshold_init_cutoff = (
            threshold_init_beta,
            threshold_init_cutoff,
        )

        self.response = nn.Parameter(torch.zeros([num_trees, tree_output_dim, 2**depth]), requires_grad=True)
        initialize_response_(self.response)

        self.feature_selection_logits = nn.Parameter(torch.zeros([in_features, num_trees, depth]), requires_grad=True)
        initialize_selection_logits_(self.feature_selection_logits)

        self.feature_thresholds = nn.Parameter(
            torch.full([num_trees, depth], float("nan"), dtype=torch.float32),
            requires_grad=True,
        )  # nan values will be initialized on first batch (data-aware init)
        self.log_temperatures = nn.Parameter(
            torch.full([num_trees, depth], float("nan"), dtype=torch.float32),
            requires_grad=True,
        )

        # binary codes for mapping between 1-hot vectors and bin indices
        with torch.no_grad():
            indices = torch.arange(2**self.depth)
            offsets = 2 ** torch.arange(self.depth)
            bin_codes = (indices.view(1, -1) // offsets.view(-1, 1) % 2).to(torch.float32)
            bin_codes_1hot = torch.stack([bin_codes, 1.0 - bin_codes], dim=-1)
            self.bin_codes_1hot = nn.Parameter(bin_codes_1hot, requires_grad=False)
            # ^-- [depth, 2 ** depth, 2]

    def forward(self, input):
        assert len(input.shape) >= 2
        if len(input.shape) > 2:
            return self.forward(input.view(-1, input.shape[-1])).view(*input.shape[:-1], -1)
        # new input shape: [batch_size, in_features]

        feature_logits = self.feature_selection_logits
        feature_selectors = self.choice_function(feature_logits, dim=0)
        # ^--[in_features, num_trees, depth]

        feature_values = torch.einsum("bi,ind->bnd", input, feature_selectors)
        # ^--[batch_size, num_trees, depth]

        threshold_logits = (feature_values - self.feature_thresholds) * torch.exp(-self.log_temperatures)

        threshold_logits = torch.stack([-threshold_logits, threshold_logits], dim=-1)
        # ^--[batch_size, num_trees, depth, 2]

        bins = self.bin_function(threshold_logits)
        # ^--[batch_size, num_trees, depth, 2], approximately binary

        bin_matches = torch.einsum("btds,dcs->btdc", bins, self.bin_codes_1hot)
        # ^--[batch_size, num_trees, depth, 2 ** depth]

        response_weights = torch.prod(bin_matches, dim=-2)
        # ^-- [batch_size, num_trees, 2 ** depth]

        response = torch.einsum("bnd,ncd->bnc", response_weights, self.response)
        # ^-- [batch_size, num_trees, tree_dim]

        return response.flatten(1, 2) if self.flatten_output else response

    def initialize(self, input, eps=1e-6):
        # data-aware initializer
        assert len(input.shape) == 2
        if input.shape[0] < 1000:
            warn(
                "Data-aware initialization is performed on less than 1000 data points. This may cause instability."
                "To avoid potential problems, run this model on a data batch with at least 1000 data samples."
                "You can do so manually before training. Use with torch.no_grad() for memory efficiency."
            )
        with torch.no_grad():
            feature_selectors = self.choice_function(self.feature_selection_logits, dim=0)
            # ^--[in_features, num_trees, depth]

            feature_values = torch.einsum("bi,ind->bnd", input, feature_selectors)
            # ^--[batch_size, num_trees, depth]

            # initialize thresholds: sample random percentiles of data
            percentiles_q = 100 * np.random.beta(
                self.threshold_init_beta,
                self.threshold_init_beta,
                size=[self.num_trees, self.depth],
            )
            self.feature_thresholds.data[...] = torch.as_tensor(
                list(
                    map(
                        np.percentile,
                        check_numpy(feature_values.flatten(1, 2).t()),
                        percentiles_q.flatten(),
                    )
                ),
                dtype=feature_values.dtype,
                device=feature_values.device,
            ).view(self.num_trees, self.depth)

            # init temperatures: make sure enough data points are in the linear region of sparse-sigmoid
            temperatures = np.percentile(
                check_numpy(abs(feature_values - self.feature_thresholds)),
                q=100 * min(1.0, self.threshold_init_cutoff),
                axis=0,
            )

            # if threshold_init_cutoff > 1, scale everything down by it
            temperatures /= max(1.0, self.threshold_init_cutoff)
            self.log_temperatures.data[...] = torch.log(torch.as_tensor(temperatures) + eps)

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(in_features={self.feature_selection_logits.shape[0]},"
            f" num_trees={self.num_trees},"
            f" depth={self.depth},"
            f" tree_dim={self.tree_dim},"
            f" flatten_output={self.flatten_output})"
        )
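
A minimal usage sketch (assuming ODST is importable from pytorch_tabular.models.common.layers.soft_trees, matching the source path above). The thresholds and temperatures use data-aware initialization, which runs automatically on the first forward pass and ideally sees at least 1000 rows:

import torch
from pytorch_tabular.models.common.layers.soft_trees import ODST

odst = ODST(in_features=20, num_trees=8, depth=4, tree_output_dim=3)
x = torch.randn(2048, 20)   # the first batch also performs the data-aware init
out = odst(x)
print(out.shape)  # torch.Size([2048, 24]) -> num_trees * tree_output_dim (flatten_output=True)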

__init__(in_features, num_trees, depth=6, tree_output_dim=1, flatten_output=True, choice_function=sparsemax, bin_function=sparsemoid, initialize_response_=nn.init.normal_, initialize_selection_logits_=nn.init.uniform_, threshold_init_beta=1.0, threshold_init_cutoff=1.0)

Oblivious Differentiable Sparsemax Trees. http://tinyurl.com/odst-readmore This module can be used as a drop-in replacement for nn.Linear anywhere.

:param in_features: number of features in the input tensor
:param num_trees: number of trees in this layer
:param tree_dim: number of response channels in the response of an individual tree
:param depth: number of splits in every tree
:param flatten_output: if False, returns [..., num_trees, tree_dim]; by default returns [..., num_trees * tree_dim]
:param choice_function: f(tensor, dim) -> R_simplex, computes feature weights such that f(tensor, dim).sum(dim) == 1
:param bin_function: f(tensor) -> R[0, 1], computes tree leaf weights

:param initialize_response_: in-place initializer for the tree output tensor
:param initialize_selection_logits_: in-place initializer for the logits that select features for the tree. Both thresholds and scales are initialized with data-aware init (or via .load_state_dict)
:param threshold_init_beta: initializes thresholds to the q-th quantile of the data points, where q ~ Beta(:threshold_init_beta:, :threshold_init_beta:). If this parameter is set to 1, the initial thresholds will have the same distribution as the data points. If greater than 1 (e.g. 10), thresholds will be closer to the median data value. If less than 1 (e.g. 0.1), thresholds will approach the min/max data values.

:param threshold_init_cutoff: initializer for threshold log-temperatures, in (0, inf). By default (1.0), log-temperatures are initialized so that all binary selectors end up in the linear region of the sparse sigmoid; the temperatures are then scaled by this parameter. Setting this value > 1.0 leaves some margin between the data points and the sparse-sigmoid cutoff value. Setting this value < 1.0 causes a (1 - value) fraction of the data points to end up in the flat region of the sparse sigmoid. For instance, threshold_init_cutoff = 0.9 will set 10% of the points to exactly 0.0 or 1.0. All points will lie between (0.5 - 0.5 / threshold_init_cutoff) and (0.5 + 0.5 / threshold_init_cutoff).

Source code in src/pytorch_tabular/models/common/layers/soft_trees.py
    def __init__(
        self,
        in_features,
        num_trees,
        depth=6,
        tree_output_dim=1,
        flatten_output=True,
        choice_function=sparsemax,
        bin_function=sparsemoid,
        initialize_response_=nn.init.normal_,
        initialize_selection_logits_=nn.init.uniform_,
        threshold_init_beta=1.0,
        threshold_init_cutoff=1.0,
    ):
        """    Oblivious Differentiable Sparsemax Trees. http://tinyurl.com/odst-readmore 可以将此模块替换为nn.Linear,随处使用.

:param in_features: 输入张量的特征数量
:param num_trees: 该层中的树的数量
:param tree_dim: 单棵树响应中的响应通道数量
:param depth: 每棵树中的分割数量
:param flatten_output: 如果为False,返回[..., num_trees, tree_dim],
    默认返回[..., num_trees * tree_dim]
:param choice_function: f(tensor, dim) -> R_simplex 计算特征权重,使得 f(tensor, dim).sum(dim) == 1
:param bin_function: f(tensor) -> R[0, 1],计算树叶子权重

:param initialize_response_: 树输出张量的就地初始化器
:param initialize_selection_logits_: 选择树特征的对数概率的就地初始化器
阈值和尺度均使用数据感知初始化(或通过.load_state_dict加载)
:param threshold_init_beta: 将阈值初始化为数据点的q分位数
    其中 q ~ Beta(:threshold_init_beta:, :threshold_init_beta:)
    如果此参数设置为1,初始阈值将具有与数据点相同的分布
    如果大于1(例如10),阈值将更接近中位数数据值
    如果小于1(例如0.1),阈值将接近数据的最小/最大值.

:param threshold_init_cutoff: 阈值对数温度的初始化器,取值范围为(0, inf)
    默认情况下(1.0),对数温度被初始化为使得所有二进制选择器
    最终位于稀疏S形函数的线性区域.然后温度将按此参数进行缩放.
    设置此值 > 1.0 将在数据点和稀疏S形函数的截止值之间产生一些余量
    设置此值 < 1.0 将导致 (1 - value) 部分数据点最终位于稀疏S形函数的平坦区域
    例如,threshold_init_cutoff = 0.9 将设置10%的点等于0.0或1.0
    设置此值 > 1.0 将在数据点和稀疏S形函数的截止值之间产生余量
    所有点将介于 (0.5 - 0.5 / threshold_init_cutoff) 和 (0.5 + 0.5 / threshold_init_cutoff) 之间
"""
        super().__init__()
        self.depth, self.num_trees, self.tree_dim, self.flatten_output = (
            depth,
            num_trees,
            tree_output_dim,
            flatten_output,
        )
        self.choice_function, self.bin_function = choice_function, bin_function
        self.threshold_init_beta, self.threshold_init_cutoff = (
            threshold_init_beta,
            threshold_init_cutoff,
        )

        self.response = nn.Parameter(torch.zeros([num_trees, tree_output_dim, 2**depth]), requires_grad=True)
        initialize_response_(self.response)

        self.feature_selection_logits = nn.Parameter(torch.zeros([in_features, num_trees, depth]), requires_grad=True)
        initialize_selection_logits_(self.feature_selection_logits)

        self.feature_thresholds = nn.Parameter(
            torch.full([num_trees, depth], float("nan"), dtype=torch.float32),
            requires_grad=True,
        )  # nan values will be initialized on first batch (data-aware init)
        self.log_temperatures = nn.Parameter(
            torch.full([num_trees, depth], float("nan"), dtype=torch.float32),
            requires_grad=True,
        )

        # binary codes for mapping between 1-hot vectors and bin indices
        with torch.no_grad():
            indices = torch.arange(2**self.depth)
            offsets = 2 ** torch.arange(self.depth)
            bin_codes = (indices.view(1, -1) // offsets.view(-1, 1) % 2).to(torch.float32)
            bin_codes_1hot = torch.stack([bin_codes, 1.0 - bin_codes], dim=-1)
            self.bin_codes_1hot = nn.Parameter(bin_codes_1hot, requires_grad=False)

Transformers

AddNorm

Bases: Module

Applies LayerNorm and Dropout, then adds the result to the input.

The standard AddNorm operation in Transformers.

Source code in src/pytorch_tabular/models/common/layers/transformers.py
class AddNorm(nn.Module):
    """    Applies LayerNorm, Dropout 并加到输入上.

    标准 Transformer 中的 AddNorm 操作"""

    def __init__(self, input_dim: int, dropout: float):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(input_dim)

    def forward(self, X: torch.Tensor, Y: torch.Tensor) -> torch.Tensor:
        return self.ln(self.dropout(Y) + X)

AppendCLSToken

Bases: Module

Appends the [CLS] token for BERT-like inference.

Source code in src/pytorch_tabular/models/common/layers/transformers.py
class AppendCLSToken(nn.Module):
    """附加用于BERT推理的[CLS]标记."""

    def __init__(self, d_token: int, initialization: str) -> None:
        """初始化 self."""
        super().__init__()
        self.weight = nn.Parameter(torch.Tensor(d_token))
        d_sqrt_inv = 1 / math.sqrt(d_token)
        _initialize_kaiming(self.weight, initialization, d_sqrt_inv)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """执行前向传播."""
        assert x.ndim == 3
        return torch.cat([x, self.weight.view(1, 1, -1).repeat(len(x), 1, 1)], dim=1)

__init__(d_token, initialization)

Initialize self.

Source code in src/pytorch_tabular/models/common/layers/transformers.py
def __init__(self, d_token: int, initialization: str) -> None:
    """初始化 self."""
    super().__init__()
    self.weight = nn.Parameter(torch.Tensor(d_token))
    d_sqrt_inv = 1 / math.sqrt(d_token)
    _initialize_kaiming(self.weight, initialization, d_sqrt_inv)

forward(x)

Performs the forward pass.

Source code in src/pytorch_tabular/models/common/layers/transformers.py
def forward(self, x: torch.Tensor) -> torch.Tensor:
    """执行前向传播."""
    assert x.ndim == 3
    return torch.cat([x, self.weight.view(1, 1, -1).repeat(len(x), 1, 1)], dim=1)
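
A small sketch (import path assumed from the source location above): the token is appended at the end of the sequence dimension, so a BERT-style readout would take the last token of the output.

import torch
from pytorch_tabular.models.common.layers.transformers import AppendCLSToken

append_cls = AppendCLSToken(d_token=32, initialization="kaiming_uniform")
x = torch.randn(4, 10, 32)   # (batch, tokens, d_token)
print(append_cls(x).shape)   # torch.Size([4, 11, 32]) -- one extra learnable token per sample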

MultiHeadedAttention

Bases: Module

Multi-headed attention block in Transformers.

Source code in src/pytorch_tabular/models/common/layers/transformers.py
class MultiHeadedAttention(nn.Module):
    """多头注意力块在变压器中."""

    def __init__(
        self,
        input_dim: int,
        num_heads: int = 8,
        head_dim: int = 16,
        dropout: int = 0.1,
        keep_attn: bool = True,
    ):
        super().__init__()
        assert input_dim % num_heads == 0, "'input_dim' must be multiples of 'num_heads'"
        inner_dim = head_dim * num_heads
        self.n_heads = num_heads
        self.scale = head_dim**-0.5
        self.keep_attn = keep_attn

        self.to_qkv = nn.Linear(input_dim, inner_dim * 3, bias=False)
        self.to_out = nn.Linear(inner_dim, input_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        h = self.n_heads
        q, k, v = self.to_qkv(x).chunk(3, dim=-1)
        q, k, v = (rearrange(t, "b n (h d) -> b h n d", h=h) for t in (q, k, v))
        sim = einsum("b h i d, b h j d -> b h i j", q, k) * self.scale

        attn = sim.softmax(dim=-1)
        attn = self.dropout(attn)
        if self.keep_attn:
            self.attn_weights = attn
        out = einsum("b h i j, b h j d -> b h i d", attn, v)
        out = rearrange(out, "b h n d -> b n (h d)", h=h)
        return self.to_out(out)
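
A minimal usage sketch (import path assumed from the source location above). With keep_attn=True the attention map of the most recent forward pass is stored on the module:

import torch
from pytorch_tabular.models.common.layers.transformers import MultiHeadedAttention

mha = MultiHeadedAttention(input_dim=32, num_heads=8, head_dim=16, keep_attn=True)
x = torch.randn(4, 10, 32)     # (batch, tokens, input_dim)
print(mha(x).shape)            # torch.Size([4, 10, 32])
print(mha.attn_weights.shape)  # torch.Size([4, 8, 10, 10]) -- (batch, heads, tokens, tokens)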

TransformerEncoderBlock

Bases: Module

A single Transformer encoder block.

Source code in src/pytorch_tabular/models/common/layers/transformers.py
class TransformerEncoderBlock(nn.Module):
    """单个变压器编码器块."""

    def __init__(
        self,
        input_embed_dim: int,
        num_heads: int = 8,
        ff_hidden_multiplier: int = 4,
        ff_activation: str = "GEGLU",
        attn_dropout: float = 0.1,
        keep_attn: bool = True,
        ff_dropout: float = 0.1,
        add_norm_dropout: float = 0.1,
        transformer_head_dim: Optional[int] = None,
    ):
        """    Args:
    input_embed_dim: 输入嵌入维度
    num_heads: 注意力头数
    ff_hidden_multiplier: 逐位置前馈层的隐藏维度乘数
    ff_activation: 逐位置前馈层的激活函数
    attn_dropout: 注意力层的dropout概率
    keep_attn: 是否保留注意力权重
    ff_dropout: 逐位置前馈层的dropout概率
    add_norm_dropout: 残差连接的dropout概率
    transformer_head_dim: 注意力头的维度.如果为None,将默认为input_embed_dim
"""
        super().__init__()
        self.mha = MultiHeadedAttention(
            input_embed_dim,
            num_heads,
            head_dim=input_embed_dim if transformer_head_dim is None else transformer_head_dim,
            dropout=attn_dropout,
            keep_attn=keep_attn,
        )

        try:
            self.pos_wise_ff = GATED_UNITS[ff_activation](
                d_model=input_embed_dim,
                d_ff=input_embed_dim * ff_hidden_multiplier,
                dropout=ff_dropout,
            )
        except (AttributeError, KeyError):
            self.pos_wise_ff = PositionWiseFeedForward(
                d_model=input_embed_dim,
                d_ff=input_embed_dim * ff_hidden_multiplier,
                dropout=ff_dropout,
                activation=getattr(nn, ff_activation)(),
            )
        self.attn_add_norm = AddNorm(input_embed_dim, add_norm_dropout)
        self.ff_add_norm = AddNorm(input_embed_dim, add_norm_dropout)

    def forward(self, x):
        y = self.mha(x)
        x = self.attn_add_norm(x, y)
        y = self.pos_wise_ff(y)
        return self.ff_add_norm(x, y)
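
A minimal usage sketch (import path assumed from the source location above). The block is shape-preserving, so encoder blocks can be stacked freely:

import torch
from pytorch_tabular.models.common.layers.transformers import TransformerEncoderBlock

block = TransformerEncoderBlock(input_embed_dim=32, num_heads=4, ff_activation="GEGLU")
x = torch.randn(16, 12, 32)   # (batch, tokens, embed_dim)
print(block(x).shape)         # torch.Size([16, 12, 32])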

__init__(input_embed_dim, num_heads=8, ff_hidden_multiplier=4, ff_activation='GEGLU', attn_dropout=0.1, keep_attn=True, ff_dropout=0.1, add_norm_dropout=0.1, transformer_head_dim=None)

Args:
    input_embed_dim: input embedding dimension
    num_heads: number of attention heads
    ff_hidden_multiplier: multiplier for the hidden dimension of the position-wise feed-forward layer
    ff_activation: activation function for the position-wise feed-forward layer
    attn_dropout: dropout probability for the attention layer
    keep_attn: whether to keep the attention weights
    ff_dropout: dropout probability for the position-wise feed-forward layer
    add_norm_dropout: dropout probability for the residual connections
    transformer_head_dim: dimension of the attention heads. Defaults to input_embed_dim if None

Source code in src/pytorch_tabular/models/common/layers/transformers.py
    def __init__(
        self,
        input_embed_dim: int,
        num_heads: int = 8,
        ff_hidden_multiplier: int = 4,
        ff_activation: str = "GEGLU",
        attn_dropout: float = 0.1,
        keep_attn: bool = True,
        ff_dropout: float = 0.1,
        add_norm_dropout: float = 0.1,
        transformer_head_dim: Optional[int] = None,
    ):
        """    Args:
    input_embed_dim: 输入嵌入维度
    num_heads: 注意力头数
    ff_hidden_multiplier: 逐位置前馈层的隐藏维度乘数
    ff_activation: 逐位置前馈层的激活函数
    attn_dropout: 注意力层的dropout概率
    keep_attn: 是否保留注意力权重
    ff_dropout: 逐位置前馈层的dropout概率
    add_norm_dropout: 残差连接的dropout概率
    transformer_head_dim: 注意力头的维度.如果为None,将默认为input_embed_dim
"""
        super().__init__()
        self.mha = MultiHeadedAttention(
            input_embed_dim,
            num_heads,
            head_dim=input_embed_dim if transformer_head_dim is None else transformer_head_dim,
            dropout=attn_dropout,
            keep_attn=keep_attn,
        )

        try:
            self.pos_wise_ff = GATED_UNITS[ff_activation](
                d_model=input_embed_dim,
                d_ff=input_embed_dim * ff_hidden_multiplier,
                dropout=ff_dropout,
            )
        except (AttributeError, KeyError):
            self.pos_wise_ff = PositionWiseFeedForward(
                d_model=input_embed_dim,
                d_ff=input_embed_dim * ff_hidden_multiplier,
                dropout=ff_dropout,
                activation=getattr(nn, ff_activation)(),
            )
        self.attn_add_norm = AddNorm(input_embed_dim, add_norm_dropout)
        self.ff_add_norm = AddNorm(input_embed_dim, add_norm_dropout)

Miscellaneous

Lambda

Bases: Module

A wrapper for a lambda function as a PyTorch module.

Source code in src/pytorch_tabular/models/common/layers/misc.py
class Lambda(nn.Module):
    """A wrapper for a lambda function as a PyTorch module."""

    def __init__(self, func: Callable):
        """初始化lambda模块
Parameters:
    func: 任意函数/可调用对象
"""
        super().__init__()
        self.func = func

    def forward(self, *args, **kwargs):
        return self.func(*args, **kwargs)
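
A trivial sketch (import path assumed from the source location above): Lambda lets an arbitrary callable participate in an nn.Sequential or any other module tree.

import torch
from torch import nn
from pytorch_tabular.models.common.layers.misc import Lambda

model = nn.Sequential(nn.Linear(4, 4), Lambda(lambda x: x ** 2))
print(model(torch.randn(2, 4)).shape)  # torch.Size([2, 4]); outputs are squared element-wise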

__init__(func)

Initialize the lambda module.

Parameters:
    func: any function / callable

Source code in src/pytorch_tabular/models/common/layers/misc.py
    def __init__(self, func: Callable):
        """初始化lambda模块
Parameters:
    func: 任意函数/可调用对象
"""
        super().__init__()
        self.func = func

ModuleWithInit

Bases: Module

Base class for PyTorch modules with a data-aware initializer that runs on the first batch.

Source code in src/pytorch_tabular/models/common/layers/misc.py
class ModuleWithInit(nn.Module):
    """PyTorch模块的基类,在第一个批次上具有数据感知初始化器."""

    def __init__(self):
        super().__init__()
        self._is_initialized_tensor = nn.Parameter(torch.tensor(0, dtype=torch.uint8), requires_grad=False)
        self._is_initialized_bool = None
        # Note: this module uses a separate flag self._is_initialized so as to achieve both
        # * persistence: is_initialized is saved alongside model in state_dict
        # * speed: model doesn't need to cache
        # please DO NOT use these flags in child modules

    def initialize(self, *args, **kwargs):
        """使用第一批数据初始化模块张量."""
        raise NotImplementedError("Please implement ")

    def __call__(self, *args, **kwargs):
        if self._is_initialized_bool is None:
            self._is_initialized_bool = bool(self._is_initialized_tensor.item())
        if not self._is_initialized_bool:
            self.initialize(*args, **kwargs)
            self._is_initialized_tensor.data[...] = 1
            self._is_initialized_bool = True
        return super().__call__(*args, **kwargs)
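
A sketch of how a subclass might use the hook (the CenteredBias layer below is hypothetical and only illustrates the mechanism): initialize() is called exactly once, with the first batch, before the first forward pass.

import torch
from torch import nn
from pytorch_tabular.models.common.layers.misc import ModuleWithInit

class CenteredBias(ModuleWithInit):
    """Hypothetical layer: learns a data-dependent offset from the first batch it sees."""

    def __init__(self, dim: int):
        super().__init__()
        self.bias = nn.Parameter(torch.zeros(dim))

    def initialize(self, x):
        # Data-aware init: run automatically on the first batch only.
        with torch.no_grad():
            self.bias.data = -x.mean(dim=0)

    def forward(self, x):
        return x + self.bias

layer = CenteredBias(4)
out = layer(torch.randn(256, 4))   # initialize() runs here, then forward()
print(out.mean(dim=0))             # approximately zero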

initialize(*args, **kwargs)

Initializes the module's tensors using the first batch of data.

Source code in src/pytorch_tabular/models/common/layers/misc.py
def initialize(self, *args, **kwargs):
    """使用第一批数据初始化模块张量."""
    raise NotImplementedError("Please implement ")

Residual

Bases: Module

Source code in src/pytorch_tabular/models/common/layers/misc.py
class Residual(nn.Module):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn

    def forward(self, x, **kwargs):
        return self.fn(x, **kwargs) + x

Activation Functions

Entmoid15

Bases: Function

A highly optimized equivalent of lambda x: Entmax15([x, 0]).

Source code in src/pytorch_tabular/models/common/layers/activations.py
class Entmoid15(Function):
    """等价于`lambda x: Entmax15([x, 0])`的高效优化版本."""

    @staticmethod
    def forward(ctx, input):
        output = Entmoid15._forward(input)
        ctx.save_for_backward(output)
        return output

    @staticmethod
    @script
    def _forward(input):
        input, is_pos = abs(input), input >= 0
        tau = (input + torch.sqrt(F.relu(8 - input**2))) / 2
        tau.masked_fill_(tau <= input, 2.0)
        y_neg = 0.25 * F.relu(tau - input, inplace=True) ** 2
        return torch.where(is_pos, 1 - y_neg, y_neg)

    @staticmethod
    def backward(ctx, grad_output):
        return Entmoid15._backward(ctx.saved_tensors[0], grad_output)

    @staticmethod
    @script
    def _backward(output, grad_output):
        gppr0, gppr1 = output.sqrt(), (1 - output).sqrt()
        grad_input = grad_output * gppr0
        q = grad_input / (gppr0 + gppr1)
        grad_input -= q * gppr0
        return grad_input
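
A quick numeric illustration (assuming Entmoid15 is importable from pytorch_tabular.models.common.layers.activations, matching the source path above): unlike a sigmoid, the output saturates to exactly 0 or 1 once |x| >= 2.

import torch
from pytorch_tabular.models.common.layers.activations import Entmoid15

x = torch.linspace(-3.0, 3.0, 7)
print(Entmoid15.apply(x))
# ~tensor([0.000, 0.000, 0.169, 0.500, 0.831, 1.000, 1.000])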

entmax15

1.5-entmax: normalizing sparse transform (a la softmax).

Solves the optimization problem:

max_p <x, p> - H_1.5(p)    s.t.    p >= 0, sum(p) == 1.

where H_1.5(p) is the Tsallis alpha-entropy with alpha=1.5.

Parameters

X : torch.Tensor
    The input tensor.

dim : int
    The dimension along which to apply 1.5-entmax.

k : int or None
    The number of largest elements to partial-sort over. For best performance, this should be slightly larger than the expected number of nonzeros in the solution. If the solution is more than k-sparse, this function is called recursively with a 2*k schedule. If None, full sorting is performed from the beginning.

Returns

P : torch tensor, same shape as X
    The projection result, such that P.sum(dim=dim) == 1 elementwise.

sparsemax

sparsemax: normalizing sparse transform (a la softmax).

Solves the projection problem:

min_p ||x - p||_2    s.t.    p >= 0, sum(p) == 1.

Parameters

X : torch.Tensor
    The input tensor.

dim : int
    The dimension along which to apply sparsemax.

k : int or None
    The number of largest elements to partial-sort over. For best performance, this should be slightly larger than the expected number of nonzeros in the solution. If the solution is more than k-sparse, this function is called recursively with a 2*k schedule. If None, full sorting is performed from the beginning.

Returns

P : torch tensor, same shape as X
    The projection result, such that P.sum(dim=dim) == 1 elementwise.
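
To see how the two transforms compare to a plain softmax on the same logits (import path assumed from the source location above; outputs rounded):

import torch
from pytorch_tabular.models.common.layers.activations import entmax15, sparsemax

logits = torch.tensor([[2.0, 1.0, 0.1, -2.0]])
print(torch.softmax(logits, dim=-1))  # ~[0.65, 0.24, 0.10, 0.01] -- dense, every entry > 0
print(entmax15(logits, dim=-1))       # ~[0.83, 0.17, 0.00, 0.00] -- small logits driven to exactly 0
print(sparsemax(logits, dim=-1))      # ~[1.00, 0.00, 0.00, 0.00] -- even sparser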

Source code in src/pytorch_tabular/models/common/layers/activations.py
def sparsemoid(input):
    return (0.5 * input + 0.5).clamp_(0, 1)

Source code in src/pytorch_tabular/models/common/layers/activations.py
def t_softmax(input: Tensor, t: Tensor = None, dim: int = -1) -> Tensor:
    if t is None:
        t = torch.tensor(0.5, device=input.device)
    assert (t >= 0.0).all()
    maxes = torch.max(input, dim=dim, keepdim=True).values
    input_minus_maxes = input - maxes

    w = torch.relu(input_minus_maxes + t) + 1e-8
    return torch.softmax(input_minus_maxes + torch.log(w), dim=dim)
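
A small numeric example of t_softmax: entries more than t below the row maximum receive (numerically) zero probability, while the remaining entries are softmax-normalized.

import torch
from pytorch_tabular.models.common.layers.activations import t_softmax

x = torch.tensor([[3.0, 2.9, 0.0, -1.0]])
print(t_softmax(x, t=torch.tensor(0.5)))
# ~tensor([[0.58, 0.42, 0.00, 0.00]])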

TSoftmax

Bases: Module

Source code in src/pytorch_tabular/models/common/layers/activations.py
class TSoftmax(torch.nn.Module):
    def __init__(self, dim: int = -1):
        super().__init__()
        self.dim = dim

    def forward(self, input: Tensor, t: Tensor) -> Tensor:
        return t_softmax(input, t, self.dim)

RSoftmax

Bases: Module

Source code in src/pytorch_tabular/models/common/layers/activations.py
class RSoftmax(torch.nn.Module):
    def __init__(self, dim: int = -1, eps: float = 1e-8):
        super().__init__()
        self.dim = dim
        self.eps = eps
        self.tsoftmax = TSoftmax(dim=dim)

    @classmethod
    def calculate_t(cls, input: Tensor, r: Tensor, dim: int = -1, eps: float = 1e-8):
        # r represents what is the fraction of zero values that we want to have
        assert ((0.0 <= r) & (r <= 1.0)).all()

        maxes = torch.max(input, dim=dim, keepdim=True).values
        input_minus_maxes = input - maxes

        zeros_mask = torch.exp(input_minus_maxes) == 0.0
        zeros_frac = zeros_mask.sum(dim=dim, keepdim=True).float() / input_minus_maxes.shape[dim]

        q = torch.clamp((r - zeros_frac) / (1 - zeros_frac), min=0.0, max=1.0)
        x_minus_maxes = input_minus_maxes * (~zeros_mask).float()
        if q.ndim > 1:
            t = -torch.quantile(x_minus_maxes, q.view(-1), dim=dim, keepdim=True).detach()
            t = t.squeeze(dim).diagonal(dim1=-2, dim2=-1).unsqueeze(-1) + eps
        else:
            t = -torch.quantile(x_minus_maxes, q, dim=dim).detach() + eps
        return t

    def forward(self, input: Tensor, r: Tensor):
        t = RSoftmax.calculate_t(input, r, self.dim, self.eps)
        return self.tsoftmax(input, t)