
Utilities

Special Feature Classes

CategoricalEmbeddingTransformer

Bases: BaseEstimator, TransformerMixin

Source code in src/pytorch_tabular/categorical_encoders.py
class CategoricalEmbeddingTransformer(BaseEstimator, TransformerMixin):
    NAN_CATEGORY = 0

    def __init__(self, tabular_model):
        """初始化Transformer并提取神经嵌入.

Parameters:
    tabular_model (TabularModel): 训练好的TabularModel对象
"""
        self._categorical_encoder = tabular_model.datamodule.categorical_encoder
        self.cols = tabular_model.model.hparams.categorical_cols
        # dict {str: np.ndarray} column name --> mapping from category (index of df) to value (column of df)
        self._mapping = {}

        self._extract_embedding(tabular_model.model)

    def _extract_embedding(self, model):
        try:
            embedding_layer = model.extract_embedding()
        except ValueError as e:
            logger.error(
                f"Extracting embedding layer from model received this error: {e}."
                f" Some models do not support this feature."
            )
            embedding_layer = None
        if embedding_layer is not None:
            for i, col in enumerate(self.cols):
                self._mapping[col] = {}
                embedding = embedding_layer[i]
                self._mapping[col][self.NAN_CATEGORY] = embedding.weight[0, :].detach().cpu().numpy().ravel()
                for key in self._categorical_encoder._mapping[col].index:
                    self._mapping[col][key] = (
                        embedding.weight[self._categorical_encoder._mapping[col].loc[key], :]
                        .detach()
                        .cpu()
                        .numpy()
                        .ravel()
                    )
        else:
            raise ValueError("Passed model doesn't support this feature.")

    def fit(self, X, y=None):
        """只是为了兼容.

不做任何事情
"""
        return self

    def transform(self, X: DataFrame, y=None) -> DataFrame:
        """将指定的分类列转换为模型训练得到的神经嵌入.

Parameters:
    X (DataFrame): 特征的DataFrame,形状为(n_samples, n_features).必须包含要编码的列.
    y ([type], 可选): 仅用于兼容性.未使用.默认为None.

引发:
    ValueError: [描述]

Returns:
    DataFrame: 编码后的DataFrame
"""
        if not self._mapping:
            raise ValueError(
                "Passed model should either have an attribute `embeddng_layers`"
                " or a method `extract_embedding` defined for `transform`."
            )
        assert all(c in X.columns for c in self.cols)

        X_encoded = X.copy(deep=True)
        for col, mapping in track(
            self._mapping.items(),
            description="Encoding the data...",
            total=len(self._mapping.values()),
        ):
            for dim in range(mapping[self.NAN_CATEGORY].shape[0]):
                X_encoded.loc[:, f"{col}_embed_dim_{dim}"] = (
                    X_encoded[col].fillna(self.NAN_CATEGORY).map({k: v[dim] for k, v in mapping.items()})
                )
                # Filling unseen categories also with NAN Embedding
                X_encoded[f"{col}_embed_dim_{dim}"].fillna(mapping[self.NAN_CATEGORY][dim], inplace=True)
        X_encoded.drop(columns=self.cols, inplace=True)
        return X_encoded

    def fit_transform(self, X: DataFrame, y=None) -> DataFrame:
        """    根据学习到的嵌入对给定的X列进行编码.

Parameters:
    X (DataFrame): 特征的DataFrame,形状为(n_samples, n_features).必须包含要编码的列.
    y ([type], 可选): 仅用于兼容性.未使用.默认为None.

Returns:
    DataFrame: 编码后的DataFrame
"""
        self.fit(X, y)
        return self.transform(X)

    def save_as_object_file(self, path):
        if not self._mapping:
            raise ValueError("`fit` method must be called before `save_as_object_file`.")
        pickle.dump(self.__dict__, open(path, "wb"))

    def load_from_object_file(self, path):
        for k, v in pickle.load(open(path, "rb")).items():
            setattr(self, k, v)
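A minimal usage sketch (the names `tabular_model` and `train` are placeholders for a trained TabularModel and a DataFrame containing its categorical columns):

    # Sketch only: `tabular_model` and `train` are placeholders for your own objects.
    from pytorch_tabular.categorical_encoders import CategoricalEmbeddingTransformer

    transformer = CategoricalEmbeddingTransformer(tabular_model)
    # Each categorical column is replaced by `<col>_embed_dim_<i>` numeric columns.
    train_encoded = transformer.fit_transform(train)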

__init__(tabular_model)

Initializes the Transformer and extracts the neural embeddings.

Parameters:

    tabular_model (TabularModel): The trained TabularModel object. Required.
Source code in src/pytorch_tabular/categorical_encoders.py
    def __init__(self, tabular_model):
        """初始化Transformer并提取神经嵌入.

Parameters:
    tabular_model (TabularModel): 训练好的TabularModel对象
"""
        self._categorical_encoder = tabular_model.datamodule.categorical_encoder
        self.cols = tabular_model.model.hparams.categorical_cols
        # dict {str: np.ndarray} column name --> mapping from category (index of df) to value (column of df)
        self._mapping = {}

        self._extract_embedding(tabular_model.model)

fit(X, y=None)

Just for compatibility.

Does not do anything.

Source code in src/pytorch_tabular/categorical_encoders.py
    def fit(self, X, y=None):
        """只是为了兼容.

不做任何事情
"""
        return self

fit_transform(X, y=None)

Encodes the given columns of X based on the learned embeddings.

Parameters:

    X (DataFrame): DataFrame of features, shape (n_samples, n_features). Must contain the columns to encode. Required.
    y ([type], optional): Only for compatibility. Not used. Default: None.

Returns:

    DataFrame: The encoded DataFrame

Source code in src/pytorch_tabular/categorical_encoders.py
    def fit_transform(self, X: DataFrame, y=None) -> DataFrame:
        """    根据学习到的嵌入对给定的X列进行编码.

Parameters:
    X (DataFrame): 特征的DataFrame,形状为(n_samples, n_features).必须包含要编码的列.
    y ([type], 可选): 仅用于兼容性.未使用.默认为None.

Returns:
    DataFrame: 编码后的DataFrame
"""
        self.fit(X, y)
        return self.transform(X)

transform(X, y=None)

Transforms the categorical columns specified to the trained neural embeddings from the model.

Parameters:

    X (DataFrame): DataFrame of features, shape (n_samples, n_features). Must contain the columns to encode. Required.
    y ([type], optional): Only for compatibility. Not used. Default: None.

Raises:

    ValueError: [description]

Returns:

    DataFrame: The encoded DataFrame

Source code in src/pytorch_tabular/categorical_encoders.py
    def transform(self, X: DataFrame, y=None) -> DataFrame:
        """将指定的分类列转换为模型训练得到的神经嵌入.

Parameters:
    X (DataFrame): 特征的DataFrame,形状为(n_samples, n_features).必须包含要编码的列.
    y ([type], 可选): 仅用于兼容性.未使用.默认为None.

引发:
    ValueError: [描述]

Returns:
    DataFrame: 编码后的DataFrame
"""
        if not self._mapping:
            raise ValueError(
                "Passed model should either have an attribute `embeddng_layers`"
                " or a method `extract_embedding` defined for `transform`."
            )
        assert all(c in X.columns for c in self.cols)

        X_encoded = X.copy(deep=True)
        for col, mapping in track(
            self._mapping.items(),
            description="Encoding the data...",
            total=len(self._mapping.values()),
        ):
            for dim in range(mapping[self.NAN_CATEGORY].shape[0]):
                X_encoded.loc[:, f"{col}_embed_dim_{dim}"] = (
                    X_encoded[col].fillna(self.NAN_CATEGORY).map({k: v[dim] for k, v in mapping.items()})
                )
                # Filling unseen categories also with NAN Embedding
                X_encoded[f"{col}_embed_dim_{dim}"].fillna(mapping[self.NAN_CATEGORY][dim], inplace=True)
        X_encoded.drop(columns=self.cols, inplace=True)
        return X_encoded

DeepFeatureExtractor

Bases: BaseEstimator, TransformerMixin

Source code in src/pytorch_tabular/feature_extractor.py
class DeepFeatureExtractor(BaseEstimator, TransformerMixin):
    def __init__(self, tabular_model, extract_keys=["backbone_features"], drop_original=True):
        """初始化Transformer并提取神经特征.

Parameters:
    tabular_model (TabularModel): 训练好的TabularModel对象
    extract_keys (list, 可选): 要提取的特征的键.默认为["backbone_features"].
    drop_original (bool, 可选): 是否删除原始列.默认为True.
"""
        assert not (
            isinstance(tabular_model.model, NODEModel)
            or isinstance(tabular_model.model, TabNetModel)
            or isinstance(tabular_model.model, MDNModel)
        ), "FeatureExtractor doesn't work for Mixture Density Networks, NODE Model, & Tabnet Model"
        self.tabular_model = tabular_model
        self.extract_keys = extract_keys
        self.drop_original = drop_original

    def fit(self, X, y=None):
        """只是为了兼容.

不做任何事情
"""
        return self

    def transform(self, X: pd.DataFrame, y=None) -> pd.DataFrame:
        """将指定的分类列转换为模型训练得到的神经特征.

Parameters:
    X (pd.DataFrame): 特征数据框,形状为 (n_samples, n_features).必须包含需要编码的列.
    y ([type], 可选): 仅用于兼容性.未使用.默认为 None.

Raises:
    ValueError: [描述]

Returns:
    pd.DataFrame: 编码后的数据框
"""

        X_encoded = X.copy(deep=True)
        orig_features = X_encoded.columns
        self.tabular_model.model.eval()
        inference_dataloader = self.tabular_model.datamodule.prepare_inference_dataloader(X_encoded)
        logits_predictions = defaultdict(list)
        for batch in track(inference_dataloader, description="Generating Features..."):
            for k, v in batch.items():
                if isinstance(v, list) and (len(v) == 0):
                    # Skipping empty list
                    continue
                batch[k] = v.to(self.tabular_model.model.device)
            if self.tabular_model.config.task == "ssl":
                ret_value = {"backbone_features": self.tabular_model.model.predict(batch, ret_model_output=True)}
            else:
                _, ret_value = self.tabular_model.model.predict(batch, ret_model_output=True)
            for k in self.extract_keys:
                if k in ret_value.keys():
                    logits_predictions[k].append(ret_value[k].detach().cpu())

        for k, v in logits_predictions.items():
            v = torch.cat(v, dim=0).numpy()
            if v.ndim == 1:
                v = v.reshape(-1, 1)
            for i in range(v.shape[-1]):
                if v.shape[-1] > 1:
                    X_encoded[f"{k}_{i}"] = v[:, i]
                else:
                    X_encoded[f"{k}"] = v[:, i]

        if self.drop_original:
            X_encoded.drop(columns=orig_features, inplace=True)
        return X_encoded

    def fit_transform(self, X: pd.DataFrame, y=None) -> pd.DataFrame:
        """    基于学习到的特征对给定的X列进行编码.

Parameters:
    X (pd.DataFrame): 特征的DataFrame,形状为(n_samples, n_features).必须包含要编码的列.
    y ([type], 可选): 仅用于兼容性.未使用.默认为None.

Returns:
    pd.DataFrame: 编码后的DataFrame
"""
        self.fit(X, y)
        return self.transform(X)

    def save_as_object_file(self, path):
        """保存特征提取器为pickle文件.

Parameters:
    path (str): 保存文件的路径
"""
        # Note: this class has no `_mapping` attribute; guard on the model instead.
        if getattr(self, "tabular_model", None) is None:
            raise ValueError("The transformer must be initialized with a trained model before `save_as_object_file`.")
        pickle.dump(self.__dict__, open(path, "wb"))

    def load_from_object_file(self, path):
        """加载从pickle文件中的特征提取器.

Parameters:
    path (str): 文件的加载路径
"""
        for k, v in pickle.load(open(path, "rb")).items():
            setattr(self, k, v)
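A minimal usage sketch for the feature extractor (again, `tabular_model` and `test` are placeholders for a trained TabularModel and an inference DataFrame):

    # Sketch only: extracts the backbone representations as extra columns.
    from pytorch_tabular.feature_extractor import DeepFeatureExtractor

    extractor = DeepFeatureExtractor(tabular_model, extract_keys=["backbone_features"], drop_original=True)
    # Produces columns backbone_features_0, backbone_features_1, ... (originals dropped).
    test_features = extractor.fit_transform(test)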

__init__(tabular_model, extract_keys=['backbone_features'], drop_original=True)

Initializes the Transformer and extracts the neural features.

Parameters:

    tabular_model (TabularModel): The trained TabularModel object. Required.
    extract_keys (list, optional): The keys of the features to extract. Default: ['backbone_features'].
    drop_original (bool, optional): Whether to drop the original columns. Default: True.
Source code in src/pytorch_tabular/feature_extractor.py
    def __init__(self, tabular_model, extract_keys=["backbone_features"], drop_original=True):
        """初始化Transformer并提取神经特征.

Parameters:
    tabular_model (TabularModel): 训练好的TabularModel对象
    extract_keys (list, 可选): 要提取的特征的键.默认为["backbone_features"].
    drop_original (bool, 可选): 是否删除原始列.默认为True.
"""
        assert not (
            isinstance(tabular_model.model, NODEModel)
            or isinstance(tabular_model.model, TabNetModel)
            or isinstance(tabular_model.model, MDNModel)
        ), "FeatureExtractor doesn't work for Mixture Density Networks, NODE Model, & Tabnet Model"
        self.tabular_model = tabular_model
        self.extract_keys = extract_keys
        self.drop_original = drop_original

fit(X, y=None)

Just for compatibility.

Does not do anything.

Source code in src/pytorch_tabular/feature_extractor.py
    def fit(self, X, y=None):
        """只是为了兼容.

不做任何事情
"""
        return self

fit_transform(X, y=None)

Encodes the given columns of X based on the learned features.

Parameters:

    X (pd.DataFrame): DataFrame of features, shape (n_samples, n_features). Must contain the columns to encode. Required.
    y ([type], optional): Only for compatibility. Not used. Default: None.

Returns:

    pd.DataFrame: The encoded DataFrame

Source code in src/pytorch_tabular/feature_extractor.py
    def fit_transform(self, X: pd.DataFrame, y=None) -> pd.DataFrame:
        """    基于学习到的特征对给定的X列进行编码.

Parameters:
    X (pd.DataFrame): 特征的DataFrame,形状为(n_samples, n_features).必须包含要编码的列.
    y ([type], 可选): 仅用于兼容性.未使用.默认为None.

Returns:
    pd.DataFrame: 编码后的DataFrame
"""
        self.fit(X, y)
        return self.transform(X)

load_from_object_file(path)

Loads the feature extractor from a pickle file.

Parameters:

    path (str): The path to load the file from. Required.
Source code in src/pytorch_tabular/feature_extractor.py
    def load_from_object_file(self, path):
        """加载从pickle文件中的特征提取器.

Parameters:
    path (str): 文件的加载路径
"""
        for k, v in pickle.load(open(path, "rb")).items():
            setattr(self, k, v)

save_as_object_file(path)

Saves the feature extractor as a pickle file.

Parameters:

    path (str): The path to save the file. Required.
Source code in src/pytorch_tabular/feature_extractor.py
    def save_as_object_file(self, path):
        """保存特征提取器为pickle文件.

Parameters:
    path (str): 保存文件的路径
"""
        # Note: this class has no `_mapping` attribute; guard on the model instead.
        if getattr(self, "tabular_model", None) is None:
            raise ValueError("The transformer must be initialized with a trained model before `save_as_object_file`.")
        pickle.dump(self.__dict__, open(path, "wb"))

transform(X, y=None)

Transforms the categorical columns specified to the neural features learned by the model.

Parameters:

    X (pd.DataFrame): DataFrame of features, shape (n_samples, n_features). Must contain the columns to encode. Required.
    y ([type], optional): Only for compatibility. Not used. Default: None.

Raises:

    ValueError: [description]

Returns:

    pd.DataFrame: The encoded DataFrame

Source code in src/pytorch_tabular/feature_extractor.py
    def transform(self, X: pd.DataFrame, y=None) -> pd.DataFrame:
        """将指定的分类列转换为模型训练得到的神经特征.

Parameters:
    X (pd.DataFrame): 特征数据框,形状为 (n_samples, n_features).必须包含需要编码的列.
    y ([type], 可选): 仅用于兼容性.未使用.默认为 None.

Raises:
    ValueError: [描述]

Returns:
    pd.DataFrame: 编码后的数据框
"""

        X_encoded = X.copy(deep=True)
        orig_features = X_encoded.columns
        self.tabular_model.model.eval()
        inference_dataloader = self.tabular_model.datamodule.prepare_inference_dataloader(X_encoded)
        logits_predictions = defaultdict(list)
        for batch in track(inference_dataloader, description="Generating Features..."):
            for k, v in batch.items():
                if isinstance(v, list) and (len(v) == 0):
                    # Skipping empty list
                    continue
                batch[k] = v.to(self.tabular_model.model.device)
            if self.tabular_model.config.task == "ssl":
                ret_value = {"backbone_features": self.tabular_model.model.predict(batch, ret_model_output=True)}
            else:
                _, ret_value = self.tabular_model.model.predict(batch, ret_model_output=True)
            for k in self.extract_keys:
                if k in ret_value.keys():
                    logits_predictions[k].append(ret_value[k].detach().cpu())

        for k, v in logits_predictions.items():
            v = torch.cat(v, dim=0).numpy()
            if v.ndim == 1:
                v = v.reshape(-1, 1)
            for i in range(v.shape[-1]):
                if v.shape[-1] > 1:
                    X_encoded[f"{k}_{i}"] = v[:, i]
                else:
                    X_encoded[f"{k}"] = v[:, i]

        if self.drop_original:
            X_encoded.drop(columns=orig_features, inplace=True)
        return X_encoded

Data Utilities

get_balanced_sampler(y_train)

Source code in src/pytorch_tabular/utils/data_utils.py
def get_balanced_sampler(y_train):
    assert y_train.ndim == 1, "Utility function only works for binary classification"
    y_train = LabelEncoder().fit_transform(y_train)
    class_sample_counts = np.bincount(y_train)
    # compute weight for all the samples in the dataset
    # samples_weights contain the probability for each example in dataset to be sampled
    class_weights = 1.0 / torch.Tensor(class_sample_counts)
    train_samples_weight = [class_weights[class_id] for class_id in y_train]
    # now lets initialize samplers
    train_sampler = torch.utils.data.sampler.WeightedRandomSampler(train_samples_weight, len(y_train))
    return train_sampler
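A short sketch of plugging the sampler into a plain PyTorch DataLoader (toy data, illustrative only):

    import numpy as np
    import torch
    from torch.utils.data import DataLoader, TensorDataset

    X = torch.randn(100, 5)
    y = np.array([0] * 90 + [1] * 10)  # imbalanced binary labels
    sampler = get_balanced_sampler(y)
    # Minority-class rows are drawn more often, so batches are balanced on average.
    loader = DataLoader(TensorDataset(X, torch.as_tensor(y)), batch_size=16, sampler=sampler)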
get_class_weighted_cross_entropy(y_train, mu=1.0)

Source code in src/pytorch_tabular/utils/data_utils.py
def get_class_weighted_cross_entropy(y_train, mu=1.0):
    assert y_train.ndim == 1, "Utility function only works for binary classification"
    y_train = LabelEncoder().fit_transform(y_train)
    weights = _make_smooth_weights_for_balanced_classes(y_train, mu=mu)
    criterion = torch.nn.CrossEntropyLoss(weight=torch.FloatTensor(weights))
    return criterion
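A sketch with an imbalanced label array; `mu` controls the smoothing applied by `_make_smooth_weights_for_balanced_classes`:

    import numpy as np

    y_train = np.array([0] * 900 + [1] * 100)
    criterion = get_class_weighted_cross_entropy(y_train, mu=0.15)
    # `criterion` is a torch.nn.CrossEntropyLoss whose `weight` upweights the rare class.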
get_gaussian_centers(y, n_components)

Source code in src/pytorch_tabular/utils/data_utils.py
def get_gaussian_centers(y, n_components):
    if isinstance(y, Series) or isinstance(y, DataFrame):
        y = y.values
    if y.ndim == 1:
        y = y.reshape(-1, 1)
    cluster = KMeans(n_clusters=n_components, random_state=42).fit(y)
    return cluster.cluster_centers_.ravel().tolist()
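A sketch of the KMeans helper; the returned centers can seed the component means of a mixture head (the skewed target below is illustrative):

    import numpy as np

    y = np.random.lognormal(mean=0.0, sigma=1.0, size=1000)
    centers = get_gaussian_centers(y, n_components=4)  # list of 4 cluster centers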

load_covertype_dataset(download_dir=None)

Predicting forest cover type from cartographic variables only (no remotely sensed data). The actual forest cover type for a given observation (a 30 x 30 metre cell) was determined from US Forest Service (USFS) Region 2 Resource Information System (RIS) data. The independent variables were derived from data originally obtained from the US Geological Survey (USGS) and the USFS. The data is in raw form (not scaled) and contains binary (0 or 1) columns for the qualitative independent variables (wilderness areas and soil types).

The study area includes four wilderness areas located in the Roosevelt National Forest of northern Colorado. These areas represent forests with minimal human-caused disturbances, so the existing forest cover types are more a result of ecological processes than of forest management practices.

The dataset is sourced from the UCI Machine Learning Repository, with minor modifications: the one-hot encoded columns (Soil Type and Wilderness Area) have been converted to categorical variables.

Parameters:

    download_dir (str): The directory to download the data to. Default: None, which downloads to ~/.pytorch_tabular/datasets/.
Source code in src/pytorch_tabular/utils/data_utils.py
def load_covertype_dataset(download_dir=None):
    """预测森林覆盖类型仅基于制图变量(无遥感数据).给定观测值(30 x 30米单元格)的实际森林覆盖类型由美国林业局(USFS)第二区资源信息系统(RIS)数据确定.自变量源自最初从美国地质调查局(USGS)和美国林业局获取的数据.数据为原始形式(未缩放),并包含定性自变量(荒野区域和土壤类型)的二进制(0或1)列数据.

该研究区域包括位于科罗拉多州北部罗斯福国家森林内的四个荒野区域.这些区域代表受人为干扰最小的森林,因此现有森林覆盖类型更多是生态过程的结果,而非森林管理实践的结果.

数据源自[UCI机器学习库](https://archive.ics.uci.edu/ml/datasets/covertype),但进行了小幅改动:
- 独热编码列已转换为分类变量 - 土壤类型和荒野类型

Parameters:
    download_dir (str): 下载数据的目标目录.默认为None,将下载至~/.pytorch_tabular/datasets/
"""
    if download_dir is None:
        download_dir = os.path.join(os.path.expanduser("~"), ".pytorch_tabular", "datasets")
    if not os.path.exists(download_dir):
        os.makedirs(download_dir)
    file_path = os.path.join(download_dir, "covertype.csv")
    if not os.path.exists(file_path):
        logger.info("Downloading Covertype Dataset")
        url = "https://archive.ics.uci.edu/ml/machine-learning-databases/covtype/covtype.data.gz"
        response = requests.get(url)
        with open(os.path.join(download_dir, "covertype.data.gz"), "wb") as f:
            f.write(response.content)
        with gzip.open(os.path.join(download_dir, "covertype.data.gz"), "rb") as f_in:
            with open(os.path.join(download_dir, "covertype.csv"), "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
        os.remove(os.path.join(download_dir, "covertype.data.gz"))
    df = pd.read_csv(file_path, header=None)
    df.columns = (
        [
            "Elevation",
            "Aspect",
            "Slope",
            "Horizontal_Distance_To_Hydrology",
            "Vertical_Distance_To_Hydrology",
            "Horizontal_Distance_To_Roadways",
            "Hillshade_9am",
            "Hillshade_Noon",
            "Hillshade_3pm",
            "Horizontal_Distance_To_Fire_Points",
        ]
        + [f"Wilderness_Area_{i}" for i in range(4)]
        + [f"Soil_Type_{i}" for i in range(40)]
        + ["Cover_Type"]
    )
    # convert one hot encoded columns to categorical
    df["Wilderness_Area"] = df[[f"Wilderness_Area_{i}" for i in range(4)]].idxmax(axis=1).str.split("_").str[-1]
    df["Soil_Type"] = df[[f"Soil_Type_{i}" for i in range(40)]].idxmax(axis=1).str.split("_").str[-1]
    df.drop(
        [f"Wilderness_Area_{i}" for i in range(4)] + [f"Soil_Type_{i}" for i in range(40)],
        axis=1,
        inplace=True,
    )
    continuous_cols = [
        "Elevation",
        "Aspect",
        "Slope",
        "Horizontal_Distance_To_Hydrology",
        "Vertical_Distance_To_Hydrology",
        "Horizontal_Distance_To_Roadways",
        "Hillshade_9am",
        "Hillshade_Noon",
        "Hillshade_3pm",
        "Horizontal_Distance_To_Fire_Points",
    ]
    categorical_cols = ["Wilderness_Area", "Soil_Type"]
    return df, categorical_cols, continuous_cols, "Cover_Type"
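A usage sketch (the first call downloads the data from the UCI repository and caches it locally):

    df, cat_cols, num_cols, target = load_covertype_dataset()
    # cat_cols == ["Wilderness_Area", "Soil_Type"]; target == "Cover_Type"
    print(df.shape, len(num_cols))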

make_mixed_dataset(task, n_samples, n_features=7, n_categories=2, n_informative=5, random_state=42, n_targets=None, **kwargs)

Creates a synthetic dataset with mixed data types.

Parameters:

    task (str): Either "classification" or "regression". Required.
    n_samples (int): Number of samples to generate. Required.
    n_features (int): Total number of features to generate. Default: 7.
    n_categories (int): Number of categorical features to generate. Default: 2.
    n_informative (int): Number of informative features. Default: 5.
    random_state (int): Random seed for reproducibility. Default: 42.
    n_targets (int): Number of targets to generate; n_targets > 1 creates a multi-target dataset for regression and a multi-class dataset for classification. Defaults to 2 classes for classification and 1 target for regression. Default: None.
    kwargs: Additional arguments passed to the make_classification or make_regression functions. Default: {}.
Source code in src/pytorch_tabular/utils/data_utils.py
def make_mixed_dataset(
    task,
    n_samples,
    n_features=7,
    n_categories=2,
    n_informative=5,
    random_state=42,
    n_targets=None,
    **kwargs,
):
    """创建一个包含混合数据类型的合成数据集.

Parameters:
    task (str): 可以是 "classification" 或 "regression"
    n_samples (int): 要生成的样本数量
    n_features (int): 要生成的总特征数量
    n_categories (int): 要生成的分类特征数量
    n_informative (int): 有用的特征数量
    random_state (int): 用于可重复性的随机种子
    n_targets (int): 要生成的目标数量.n_targets>1 将生成回归的多目标数据集和分类的多类数据集.
        分类默认为2个类别,回归默认为1个目标
    kwargs: 传递给 make_classification 或 make_regression 函数的额外参数
"""
    assert n_features >= n_categories, "n_features must be greater than or equal to n_categories"
    assert n_informative <= n_features, "n_informative must be less than or equal to n_features"
    assert task in [
        "classification",
        "regression",
    ], "task must be either classification or regression"
    if n_targets is None:
        n_targets = 1 if task == "regression" else 2
    if task == "classification":
        X, y = make_classification(
            n_samples=n_samples,
            n_features=n_features,
            random_state=random_state,
            n_informative=n_informative,
            n_classes=n_targets,
            **kwargs,
        )
    elif task == "regression":
        X, y = make_regression(
            n_samples=n_samples,
            n_features=n_features,
            random_state=random_state,
            n_informative=n_informative,
            n_targets=n_targets,
            **kwargs,
        )
    # sample without replacement so that exactly `n_categories` distinct columns become categorical
    cat_cols = random.sample(range(X.shape[-1]), k=n_categories)
    num_cols = [i for i in range(X.shape[-1]) if i not in cat_cols]
    for col in cat_cols:
        X[:, col] = pd.qcut(X[:, col], q=4).codes.astype(int)
    col_names = []
    num_col_names = []
    cat_col_names = []
    for i in range(X.shape[-1]):
        if i in cat_cols:
            col_names.append(f"cat_col_{i}")
            cat_col_names.append(f"cat_col_{i}")
        if i in num_cols:
            col_names.append(f"num_col_{i}")
            num_col_names.append(f"num_col_{i}")
    X = pd.DataFrame(X, columns=col_names)
    if n_targets == 1 or task == "classification":
        y = pd.Series(y, name="target")
    else:
        y = pd.DataFrame(y, columns=[f"target_{i}" for i in range(n_targets)])
    if task == "classification":
        y = "class_" + y.astype(str)
    data = X.join(y)
    return data, cat_col_names, num_col_names
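A sketch generating a small mixed-type classification frame:

    data, cat_col_names, num_col_names = make_mixed_dataset(
        task="classification", n_samples=1000, n_features=8, n_categories=2
    )
    # `data` holds num_col_* / cat_col_* feature columns plus a string-valued
    # "target" column ("class_0" / "class_1").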
print_metrics(metrics, y_true, y_pred, tag, return_dict=False)

Source code in src/pytorch_tabular/utils/data_utils.py
def print_metrics(metrics, y_true, y_pred, tag, return_dict=False):
    if isinstance(y_true, pd.DataFrame) or isinstance(y_true, pd.Series):
        y_true = y_true.values
    if isinstance(y_pred, pd.DataFrame) or isinstance(y_pred, pd.Series):
        y_pred = y_pred.values
    if y_true.ndim > 1:
        y_true = y_true.ravel()
    if y_pred.ndim > 1:
        y_pred = y_pred.ravel()
    print_str_l = []
    res_d = {}
    for metric, name, params in metrics:
        score = metric(y_true, y_pred, **params)
        print_str_l.append(f"{tag} {name}: {score}")
        res_d[name] = score
    print((" | ".join(print_str_l)).strip())
    if return_dict:
        return res_d
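A sketch of the expected `metrics` format, a list of (callable, name, params) triples:

    import numpy as np
    from sklearn.metrics import accuracy_score, f1_score

    metrics = [
        (accuracy_score, "Accuracy", {}),
        (f1_score, "F1", {"average": "macro"}),
    ]
    y_true = np.array([0, 1, 1, 0])
    y_pred = np.array([0, 1, 0, 0])
    print_metrics(metrics, y_true, y_pred, tag="Holdout")
    # -> Holdout Accuracy: 0.75 | Holdout F1: 0.7333...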

Neural Network Utilities

_initialize_layers(activation, initialization, layers)

Source code in src/pytorch_tabular/utils/nn_utils.py
def _initialize_layers(activation, initialization, layers):
    if type(layers) is nn.Sequential:
        for layer in layers:
            if hasattr(layer, "weight"):
                _initialize_layers(activation, initialization, layer)
    else:
        if activation == "ReLU":
            nonlinearity = "relu"
        elif activation == "LeakyReLU":
            nonlinearity = "leaky_relu"
        else:
            if initialization == "kaiming":
                logger.warning("Kaiming initialization is only recommended for ReLU and" " LeakyReLU.")
                nonlinearity = "leaky_relu"
            else:
                nonlinearity = "relu"

        if initialization == "kaiming":
            nn.init.kaiming_normal_(layers.weight, nonlinearity=nonlinearity)
        elif initialization == "xavier":
            nn.init.xavier_normal_(
                layers.weight,
                gain=(nn.init.calculate_gain(nonlinearity) if activation in ["ReLU", "LeakyReLU"] else 1),
            )
        elif initialization == "random":
            nn.init.normal_(layers.weight)
_initialize_kaiming(x, initialization, d_sqrt_inv)

Source code in src/pytorch_tabular/utils/nn_utils.py
def _initialize_kaiming(x, initialization, d_sqrt_inv):
    if initialization == "kaiming_uniform":
        nn.init.uniform_(x, a=-d_sqrt_inv, b=d_sqrt_inv)
    elif initialization == "kaiming_normal":
        nn.init.normal_(x, std=d_sqrt_inv)
    elif initialization is None:
        pass
    else:
        raise NotImplementedError("initialization should be either of `kaiming_normal`, `kaiming_uniform`," " `None`")
_linear_dropout_bn(activation, initialization, use_batch_norm, in_units, out_units, dropout)

Source code in src/pytorch_tabular/utils/nn_utils.py
def _linear_dropout_bn(activation, initialization, use_batch_norm, in_units, out_units, dropout):
    if isinstance(activation, str):
        _activation = getattr(nn, activation)
    else:
        _activation = activation
    layers = []
    if use_batch_norm:
        from pytorch_tabular.models.common.layers.batch_norm import BatchNorm1d

        layers.append(BatchNorm1d(num_features=in_units))
    linear = nn.Linear(in_units, out_units)
    _initialize_layers(activation, initialization, linear)
    layers.extend([linear, _activation()])
    if dropout != 0:
        layers.append(nn.Dropout(dropout))
    return layers
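A sketch assembling one MLP block from the helper's returned layer list (use_batch_norm=False keeps the example on plain torch layers):

    import torch
    import torch.nn as nn

    block = _linear_dropout_bn(
        activation="ReLU",
        initialization="kaiming",
        use_batch_norm=False,
        in_units=16,
        out_units=32,
        dropout=0.1,
    )
    mlp = nn.Sequential(*block)    # Linear -> ReLU -> Dropout
    out = mlp(torch.randn(4, 16))  # shape (4, 32)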
_make_ix_like(input, dim=0)

Source code in src/pytorch_tabular/utils/nn_utils.py
def _make_ix_like(input, dim=0):
    d = input.size(dim)
    rho = torch.arange(1, d + 1, device=input.device, dtype=input.dtype)
    view = [1] * input.dim()
    view[0] = -1
    return rho.view(view).transpose(0, dim)

reset_all_weights(model)

Resets all parameters in a network.

Parameters:

    model (Module): The model whose parameters will be reset. Required.

References
  • https://discuss.pytorch.org/t/how-to-re-set-alll-parameters-in-a-network/20819/6
  • https://stackoverflow.com/questions/63627997/reset-parameters-of-a-neural-network-in-pytorch
  • https://pytorch.org/docs/stable/generated/torch.nn.Module.html
Source code in src/pytorch_tabular/utils/nn_utils.py
def reset_all_weights(model: nn.Module) -> None:
    """    重置网络中的所有参数.

Parameters:
    model: 要重置参数的模型.

参考:
    - https://discuss.pytorch.org/t/how-to-re-set-alll-parameters-in-a-network/20819/6
    - https://stackoverflow.com/questions/63627997/reset-parameters-of-a-neural-network-in-pytorch
    - https://pytorch.org/docs/stable/generated/torch.nn.Module.html
"""

    @torch.no_grad()
    def weight_reset(m: nn.Module):
        # if the current module has a callable reset_parameters method, call it on m
        reset_parameters = getattr(m, "reset_parameters", None)
        if callable(reset_parameters):
            m.reset_parameters()

    # Applies fn recursively to every submodule see: https://pytorch.org/docs/stable/generated/torch.nn.Module.html
    model.apply(fn=weight_reset)
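A sketch re-initializing a small network in place:

    import torch.nn as nn

    model = nn.Sequential(nn.Linear(4, 8), nn.ReLU(), nn.Linear(8, 1))
    reset_all_weights(model)  # every submodule exposing reset_parameters() is re-initialized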

to_one_hot(y, depth=None)

Takes an integer tensor with n dimensions and converts it to a one-hot representation with n + 1 dimensions.

The (n + 1)-st dimension is zero everywhere except at the index given by y, where it is 1.

Parameters:
    y: integer input (IntTensor, LongTensor or Variable) of any shape
    depth (int): the size of the one-hot dimension

Source code in src/pytorch_tabular/utils/nn_utils.py
def to_one_hot(y, depth=None):
    r"""    将具有n维的整数转换为具有n+1维的one-hot表示.

n+1维在除第y个索引外的所有位置均为零,而在第y个索引处为1.
Parameters:
    y: 输入整数(IntTensor、LongTensor或Variable),可以是任意形状
    depth (int): one-hot维度的尺寸
"""
    y_flat = y.to(torch.int64).view(-1, 1)
    depth = depth or int(torch.max(y_flat)) + 1
    y_one_hot = torch.zeros(y_flat.size()[0], depth, device=y.device).scatter_(1, y_flat, 1)
    y_one_hot = y_one_hot.view(*(tuple(y.shape) + (-1,)))
    return y_one_hot
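For example:

    import torch

    y = torch.tensor([0, 2, 1])
    to_one_hot(y, depth=3)
    # tensor([[1., 0., 0.],
    #         [0., 0., 1.],
    #         [0., 1., 0.]])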
count_parameters(model)

Source code in src/pytorch_tabular/utils/nn_utils.py
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
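For example:

    import torch.nn as nn

    count_parameters(nn.Linear(10, 2))  # -> 22 (10 * 2 weights + 2 biases)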

Python Utilities

getattr_nested(_module_src, _model_name)

Source code in src/pytorch_tabular/utils/python_utils.py
def getattr_nested(_module_src, _model_name):
    module = root_module
    for m in _module_src.split("."):
        module = getattr(module, m)
    return getattr(module, _model_name)
ifnone(arg, default_arg)

Source code in src/pytorch_tabular/utils/python_utils.py
def ifnone(arg, default_arg):
    return default_arg if arg is None else arg

check_numpy(x)

Makes sure x is a numpy array.

Source code in src/pytorch_tabular/utils/python_utils.py
def check_numpy(x):
    """确保 x 是一个 numpy 数组."""
    if isinstance(x, torch.Tensor):
        x = x.detach().cpu().numpy()
    x = np.asarray(x)
    assert isinstance(x, np.ndarray)
    return x
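For example:

    import torch

    check_numpy(torch.arange(3))  # -> array([0, 1, 2])
    check_numpy([1.0, 2.0])       # -> array([1., 2.])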

pl_load(path_or_url, map_location=None)

Loads a checkpoint.

Parameters:

    path_or_url (Union[IO, _PATH]): Path or URL of the checkpoint. Required.
    map_location (_MAP_LOCATION_TYPE): A function, torch.device, string or a dict specifying how to remap storage locations. Default: None.
Source code in src/pytorch_tabular/utils/python_utils.py
def pl_load(
    path_or_url: Union[IO, _PATH],
    map_location: _MAP_LOCATION_TYPE = None,
) -> Any:
    """加载一个检查点.

Parameters:
    path_or_url: 检查点的路径或URL.
    map_location: 一个函数、``torch.device``、字符串或字典,指定如何重新映射存储位置.
"""
    if not isinstance(path_or_url, (str, Path)):
        # any sort of BytesIO or similar
        return torch.load(path_or_url, map_location=map_location)
    if str(path_or_url).startswith("http"):
        return torch.hub.load_state_dict_from_url(
            str(path_or_url),
            map_location=map_location,  # type: ignore[arg-type] # upstream annotation is not correct
        )
    fs = get_filesystem(path_or_url)
    with fs.open(path_or_url, "rb") as f:
        return torch.load(f, map_location=map_location)
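A sketch (the checkpoint path is hypothetical):

    # "model.ckpt" is a placeholder for a real Lightning checkpoint on disk.
    checkpoint = pl_load("model.ckpt", map_location="cpu")
    state_dict = checkpoint["state_dict"]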
generate_doc_dataclass(dataclass, desc=None, width=100)

Source code in src/pytorch_tabular/utils/python_utils.py
def generate_doc_dataclass(dataclass, desc=None, width=100):
    if desc is not None:
        doc_str = f"{desc}\nArgs:"
    else:
        doc_str = "Args:"
    for key in dataclass.__dataclass_fields__.keys():
        if key.startswith("_"):  # Skipping private fields
            continue
        atr = dataclass.__dataclass_fields__[key]
        if atr.init:
            type = str(atr.type).replace("<class '", "").replace("'>", "").replace("typing.", "")
            help_str = atr.metadata.get("help", "")
            if "choices" in atr.metadata.keys():
                help_str += ". Choices are:" f" [{','.join(['`'+str(ch)+'`' for ch in atr.metadata['choices']])}]."
            # help_str += f'. Defaults to {atr.default}'
            h_str = textwrap.fill(
                f"{key} ({type}): {help_str}",
                width=width,
                subsequent_indent="\t\t",
                initial_indent="\t",
            )
            h_str = f"\n{h_str}\n"
            doc_str += h_str
    return doc_str
suppress_lightning_logs(log_level=None)

Source code in src/pytorch_tabular/utils/python_utils.py
def suppress_lightning_logs(log_level=None):
    import logging

    log_level = log_level or logging.ERROR
    for logger_name in logging.root.manager.loggerDict:
        if logger_name.startswith("pytorch_lightning") or logger_name.startswith("lightning"):
            logging.getLogger(logger_name).setLevel(log_level)
enable_lightning_logs(log_level=None)

Source code in src/pytorch_tabular/utils/python_utils.py
def enable_lightning_logs(log_level=None):
    import logging

    log_level = log_level or logging.INFO

    for logger_name in logging.root.manager.loggerDict:
        if logger_name.startswith("pytorch_lightning") or logger_name.startswith("lightning"):
            logging.getLogger(logger_name).setLevel(log_level)
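A sketch of silencing Lightning's loggers around a noisy run:

    suppress_lightning_logs()  # lightning/pytorch_lightning loggers emit only ERROR and above
    # ... fit / predict ...
    enable_lightning_logs()    # restore INFO-level output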
int_to_human_readable(number, round_number=True)

Source code in src/pytorch_tabular/utils/python_utils.py
def int_to_human_readable(number: int, round_number=True) -> str:
    millnames = ["", " K", " M", " B", " T"]  # thousands, millions, billions, trillions
    n = float(number)
    millidx = max(
        0,
        min(
            len(millnames) - 1,
            int(math.floor(0 if n == 0 else math.log10(abs(n)) / 3)),
        ),
    )
    if round_number:
        return f"{int(n / 10 ** (3 * millidx))}{millnames[millidx]}"
    else:
        return f"{n / 10 ** (3 * millidx):.2f}{millnames[millidx]}"