Source code for mambular.models.sklearn_base_lss

import warnings

import lightning as pl
import numpy as np
import pandas as pd
import properscoring as ps
import torch
from lightning.pytorch.callbacks import EarlyStopping, ModelCheckpoint, ModelSummary
from sklearn.base import BaseEstimator
from sklearn.metrics import accuracy_score, mean_squared_error

from ..base_models.lightning_wrapper import TaskModel
from ..data_utils.datamodule import MambularDataModule
from ..preprocessing import Preprocessor
from ..utils.distributional_metrics import (
    beta_brier_score,
    dirichlet_error,
    gamma_deviance,
    inverse_gamma_loss,
    negative_binomial_deviance,
    poisson_deviance,
    student_t_loss,
)
from ..utils.distributions import (
    BetaDistribution,
    CategoricalDistribution,
    DirichletDistribution,
    GammaDistribution,
    InverseGammaDistribution,
    NegativeBinomialDistribution,
    NormalDistribution,
    PoissonDistribution,
    Quantile,
    StudentTDistribution,
)


class SklearnBaseLSS(BaseEstimator):
    def __init__(self, model, config, **kwargs):
        preprocessor_arg_names = [
            "n_bins",
            "numerical_preprocessing",
            "use_decision_tree_bins",
            "binning_strategy",
            "task",
            "cat_cutoff",
            "treat_all_integers_as_numerical",
            "knots",
            "degree",
        ]

        self.config_kwargs = {
            k: v for k, v in kwargs.items() if k not in preprocessor_arg_names
        }
        # Keep a reference to the config class so set_params can rebuild the config
        self.config_class = config
        self.config = config(**self.config_kwargs)

        preprocessor_kwargs = {
            k: v for k, v in kwargs.items() if k in preprocessor_arg_names
        }
        self.preprocessor = Preprocessor(**preprocessor_kwargs)
        self.task_model = None
        # The model is not built until build_model or fit is called
        self.built = False

        # Warn if the task is set to 'classification': LSS models predict
        # distribution parameters, so this combination may fit poorly.
        if preprocessor_kwargs.get("task") == "classification":
            warnings.warn(
                "The task is set to 'classification'. Depending on the chosen "
                "distribution, this may lead to unsatisfactory results.",
                UserWarning,
            )

        self.base_model = model
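
    # Illustrative sketch (an assumption, not part of the original source):
    # constructor kwargs are routed by name, so preprocessor options and config
    # options can be mixed in one call. Assuming a concrete subclass
    # ``MambularLSS`` whose config accepts ``d_model``:
    #
    #     model = MambularLSS(n_bins=32, task="regression", d_model=64)
    #     # "n_bins" and "task" go to the Preprocessor;
    #     # "d_model" goes into the model config.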

    def get_params(self, deep=True):
        """
        Get parameters for this estimator.

        Parameters
        ----------
        deep : bool, default=True
            If True, will return the parameters for this estimator and
            contained subobjects that are estimators.

        Returns
        -------
        params : dict
            Parameter names mapped to their values.
        """
        params = {}
        params.update(self.config_kwargs)

        if deep:
            preprocessor_params = {
                "preprocessor__" + key: value
                for key, value in self.preprocessor.get_params().items()
            }
            params.update(preprocessor_params)

        return params

    def set_params(self, **parameters):
        """
        Set the parameters of this estimator.

        Parameters
        ----------
        **parameters : dict
            Estimator parameters.

        Returns
        -------
        self : object
            Estimator instance.
        """
        config_params = {
            k: v for k, v in parameters.items() if not k.startswith("preprocessor__")
        }
        preprocessor_params = {
            k.split("__")[1]: v
            for k, v in parameters.items()
            if k.startswith("preprocessor__")
        }

        if config_params:
            self.config_kwargs.update(config_params)
            if self.config is not None:
                for key, value in config_params.items():
                    setattr(self.config, key, value)
            else:
                self.config = self.config_class(**self.config_kwargs)

        if preprocessor_params:
            self.preprocessor.set_params(**preprocessor_params)

        return self
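
    # Usage sketch (hypothetical values): preprocessor parameters use the
    # ``preprocessor__`` prefix, mirroring scikit-learn's nested-parameter
    # convention, while everything else updates the model config:
    #
    #     params = model.get_params(deep=True)
    #     # e.g. {'d_model': 64, ..., 'preprocessor__n_bins': 32, ...}
    #     model.set_params(d_model=128, preprocessor__n_bins=64)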

    def build_model(
        self,
        X,
        y,
        val_size: float = 0.2,
        X_val=None,
        y_val=None,
        random_state: int = 101,
        batch_size: int = 128,
        shuffle: bool = True,
        lr: float = 1e-4,
        lr_patience: int = 10,
        factor: float = 0.1,
        weight_decay: float = 1e-06,
        dataloader_kwargs={},
    ):
        """
        Builds the model using the provided training data.

        Parameters
        ----------
        X : DataFrame or array-like, shape (n_samples, n_features)
            The training input samples.
        y : array-like, shape (n_samples,) or (n_samples, n_targets)
            The target values (real numbers).
        val_size : float, default=0.2
            The proportion of the dataset to include in the validation split if
            `X_val` is None. Ignored if `X_val` is provided.
        X_val : DataFrame or array-like, shape (n_samples, n_features), optional
            The validation input samples. If provided, `X` and `y` are not split
            and this data is used for validation.
        y_val : array-like, shape (n_samples,) or (n_samples, n_targets), optional
            The validation target values. Required if `X_val` is provided.
        random_state : int, default=101
            Controls the shuffling applied to the data before applying the split.
        batch_size : int, default=128
            Number of samples per gradient update.
        shuffle : bool, default=True
            Whether to shuffle the training data before each epoch.
        lr : float, default=1e-4
            Learning rate for the optimizer.
        lr_patience : int, default=10
            Number of epochs with no improvement on the validation loss to wait
            before reducing the learning rate.
        factor : float, default=0.1
            Factor by which the learning rate will be reduced.
        weight_decay : float, default=1e-06
            Weight decay (L2 penalty) coefficient.
        dataloader_kwargs : dict, default={}
            The kwargs for the PyTorch dataloader class.

        Returns
        -------
        self : object
            The built distributional regressor.
        """
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        if isinstance(y, pd.Series):
            y = y.values
        if X_val is not None:
            if not isinstance(X_val, pd.DataFrame):
                X_val = pd.DataFrame(X_val)
            if isinstance(y_val, pd.Series):
                y_val = y_val.values

        # self.family is set in fit(); fail early with a clear message if it
        # is missing, since the TaskModel below needs its parameter count.
        if getattr(self, "family", None) is None:
            raise ValueError(
                "No distribution family set. Call fit() with a `family` argument "
                "before (re)building the model."
            )

        self.data_module = MambularDataModule(
            preprocessor=self.preprocessor,
            batch_size=batch_size,
            shuffle=shuffle,
            X_val=X_val,
            y_val=y_val,
            val_size=val_size,
            random_state=random_state,
            regression=False,
            **dataloader_kwargs
        )

        self.data_module.preprocess_data(
            X, y, X_val, y_val, val_size=val_size, random_state=random_state
        )

        self.task_model = TaskModel(
            model_class=self.base_model,
            num_classes=self.family.param_count,
            config=self.config,
            cat_feature_info=self.data_module.cat_feature_info,
            num_feature_info=self.data_module.num_feature_info,
            lr=lr,
            lr_patience=lr_patience,
            lr_factor=factor,
            weight_decay=weight_decay,
        )

        self.built = True

        return self
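
    # Hypothetical usage sketch: build_model needs a distribution family, so
    # in this implementation it is typically preceded by fit(); the call below
    # assumes ``model.family`` has already been set.
    #
    #     model.build_model(X_train, y_train, val_size=0.2, batch_size=256)
    #     # -> constructs the data module and TaskModel without training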

    def get_number_of_params(self, requires_grad=True):
        """
        Calculate the number of parameters in the model.

        Parameters
        ----------
        requires_grad : bool, optional
            If True, only count the parameters that require gradients
            (trainable parameters). If False, count all parameters.
            Default is True.

        Returns
        -------
        int
            The total number of parameters in the model.

        Raises
        ------
        ValueError
            If the model has not been built prior to calling this method.
        """
        if not self.built:
            raise ValueError(
                "The model must be built before the number of parameters can be estimated"
            )
        if requires_grad:
            return sum(
                p.numel() for p in self.task_model.parameters() if p.requires_grad
            )
        return sum(p.numel() for p in self.task_model.parameters())
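
    # Example (assumed workflow, after build_model or fit has run):
    #
    #     n_trainable = model.get_number_of_params(requires_grad=True)
    #     n_total = model.get_number_of_params(requires_grad=False)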

    def fit(
        self,
        X,
        y,
        family,
        val_size: float = 0.2,
        X_val=None,
        y_val=None,
        max_epochs: int = 100,
        random_state: int = 101,
        batch_size: int = 128,
        shuffle: bool = True,
        patience: int = 15,
        monitor: str = "val_loss",
        mode: str = "min",
        lr: float = 1e-4,
        lr_patience: int = 10,
        factor: float = 0.1,
        weight_decay: float = 1e-06,
        checkpoint_path="model_checkpoints",
        distributional_kwargs=None,
        dataloader_kwargs={},
        **trainer_kwargs
    ):
        """
        Trains the regression model using the provided training data.
        Optionally, a separate validation set can be used.

        Parameters
        ----------
        X : DataFrame or array-like, shape (n_samples, n_features)
            The training input samples.
        y : array-like, shape (n_samples,) or (n_samples, n_targets)
            The target values (real numbers).
        family : str
            The name of the distribution family to use for the loss function.
            Examples include 'normal' for regression tasks.
        val_size : float, default=0.2
            The proportion of the dataset to include in the validation split if
            `X_val` is None. Ignored if `X_val` is provided.
        X_val : DataFrame or array-like, shape (n_samples, n_features), optional
            The validation input samples. If provided, `X` and `y` are not split
            and this data is used for validation.
        y_val : array-like, shape (n_samples,) or (n_samples, n_targets), optional
            The validation target values. Required if `X_val` is provided.
        max_epochs : int, default=100
            Maximum number of epochs for training.
        random_state : int, default=101
            Controls the shuffling applied to the data before applying the split.
        batch_size : int, default=128
            Number of samples per gradient update.
        shuffle : bool, default=True
            Whether to shuffle the training data before each epoch.
        patience : int, default=15
            Number of epochs with no improvement on the validation loss to wait
            before early stopping.
        monitor : str, default="val_loss"
            The metric to monitor for early stopping.
        mode : str, default="min"
            Whether the monitored metric should be minimized (`min`) or
            maximized (`max`).
        lr : float, default=1e-4
            Learning rate for the optimizer.
        lr_patience : int, default=10
            Number of epochs with no improvement on the validation loss to wait
            before reducing the learning rate.
        factor : float, default=0.1
            Factor by which the learning rate will be reduced.
        weight_decay : float, default=1e-06
            Weight decay (L2 penalty) coefficient.
        checkpoint_path : str, default="model_checkpoints"
            Path where the checkpoints are being saved.
        distributional_kwargs : dict, default=None
            Any arguments that are specific to a certain distribution.
        dataloader_kwargs : dict, default={}
            The kwargs for the PyTorch dataloader class.
        **trainer_kwargs
            Additional keyword arguments for PyTorch Lightning's Trainer class.

        Returns
        -------
        self : object
            The fitted regressor.
        """
        distribution_classes = {
            "normal": NormalDistribution,
            "poisson": PoissonDistribution,
            "gamma": GammaDistribution,
            "beta": BetaDistribution,
            "dirichlet": DirichletDistribution,
            "studentt": StudentTDistribution,
            "negativebinom": NegativeBinomialDistribution,
            "inversegamma": InverseGammaDistribution,
            "categorical": CategoricalDistribution,
            "quantile": Quantile,
        }

        if distributional_kwargs is None:
            distributional_kwargs = {}

        if family in distribution_classes:
            self.family = distribution_classes[family](**distributional_kwargs)
        else:
            raise ValueError("Unsupported family: {}".format(family))

        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        if isinstance(y, pd.Series):
            y = y.values
        if X_val is not None:
            if not isinstance(X_val, pd.DataFrame):
                X_val = pd.DataFrame(X_val)
            if isinstance(y_val, pd.Series):
                y_val = y_val.values

        self.data_module = MambularDataModule(
            preprocessor=self.preprocessor,
            batch_size=batch_size,
            shuffle=shuffle,
            X_val=X_val,
            y_val=y_val,
            val_size=val_size,
            random_state=random_state,
            regression=True,
            **dataloader_kwargs
        )

        self.data_module.preprocess_data(
            X, y, X_val, y_val, val_size=val_size, random_state=random_state
        )

        self.task_model = TaskModel(
            model_class=self.base_model,
            num_classes=self.family.param_count,
            family=self.family,
            config=self.config,
            cat_feature_info=self.data_module.cat_feature_info,
            num_feature_info=self.data_module.num_feature_info,
            lr=lr,
            lr_patience=lr_patience,
            lr_factor=factor,
            weight_decay=weight_decay,
            lss=True,
        )
        self.built = True

        early_stop_callback = EarlyStopping(
            monitor=monitor, min_delta=0.00, patience=patience, verbose=False, mode=mode
        )

        checkpoint_callback = ModelCheckpoint(
            monitor="val_loss",  # Adjust according to your validation metric
            mode="min",
            save_top_k=1,
            dirpath=checkpoint_path,  # Specify the directory to save checkpoints
            filename="best_model",
        )

        # Initialize the trainer and train the model
        self.trainer = pl.Trainer(
            max_epochs=max_epochs,
            callbacks=[
                early_stop_callback,
                checkpoint_callback,
                ModelSummary(max_depth=2),
            ],
            **trainer_kwargs
        )
        self.trainer.fit(self.task_model, self.data_module)

        # Restore the weights of the best checkpoint, if one was saved
        best_model_path = checkpoint_callback.best_model_path
        if best_model_path:
            checkpoint = torch.load(best_model_path)
            self.task_model.load_state_dict(checkpoint["state_dict"])

        return self

    def predict(self, X, raw=False):
        """
        Predicts target values for the given input samples.

        Parameters
        ----------
        X : DataFrame or array-like, shape (n_samples, n_features)
            The input samples for which to predict target values.
        raw : bool, default=False
            If True, return the raw network outputs. Otherwise, the outputs are
            transformed into valid distribution parameters via the family.

        Returns
        -------
        predictions : ndarray, shape (n_samples,) or (n_samples, n_outputs)
            The predicted target values.
        """
        # Ensure model and data module are initialized
        if self.task_model is None or self.data_module is None:
            raise ValueError("The model or data module has not been fitted yet.")

        # Preprocess the data using the data module
        cat_tensors, num_tensors = self.data_module.preprocess_test_data(X)

        # Move tensors to the appropriate device
        device = next(self.task_model.parameters()).device
        if isinstance(cat_tensors, list):
            cat_tensors = [tensor.to(device) for tensor in cat_tensors]
        else:
            cat_tensors = cat_tensors.to(device)

        if isinstance(num_tensors, list):
            num_tensors = [tensor.to(device) for tensor in num_tensors]
        else:
            num_tensors = num_tensors.to(device)

        # Set model to evaluation mode
        self.task_model.eval()

        # Perform inference
        with torch.no_grad():
            predictions = self.task_model(
                num_features=num_tensors, cat_features=cat_tensors
            )

        if not raw:
            # Transform raw outputs into distribution parameters and
            # convert to a NumPy array
            return self.task_model.family(predictions).cpu().numpy()
        return predictions.cpu().numpy()
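
    # Example (assumed shapes): for family="normal" the transformed output has
    # one column per distribution parameter; the default metrics below treat
    # column 0 as the mean and column 1 as the variance:
    #
    #     params = model.predict(X_test)                # (n_samples, 2)
    #     mean, variance = params[:, 0], params[:, 1]
    #     raw_out = model.predict(X_test, raw=True)     # untransformed outputs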

    def evaluate(self, X, y_true, metrics=None, distribution_family=None):
        """
        Evaluate the model on the given data using specified metrics.

        Parameters
        ----------
        X : array-like or pd.DataFrame of shape (n_samples, n_features)
            The input samples to predict.
        y_true : array-like of shape (n_samples,)
            The true target values against which to evaluate the predictions.
        metrics : dict, optional
            A dictionary where keys are metric names and values are the metric
            functions, each called as ``metric_func(y_true, predictions)``.
            If None, default metrics for the distribution family are used.
        distribution_family : str, optional
            Specifies the distribution family the model is predicting for.
            If None, it will attempt to infer based on the model's settings.

        Returns
        -------
        scores : dict
            A dictionary with metric names as keys and their corresponding
            scores as values.

        Notes
        -----
        This method calls `predict` and evaluates each metric on the predicted
        distribution parameters.
        """
        # Infer distribution family from model settings if not provided
        if distribution_family is None:
            distribution_family = getattr(
                self.task_model, "distribution_family", "normal"
            )

        # Setup default metrics if none are provided
        if metrics is None:
            metrics = self.get_default_metrics(distribution_family)

        # Make predictions
        predictions = self.predict(X, raw=False)

        # Initialize dictionary to store results
        scores = {}

        # Compute each metric
        for metric_name, metric_func in metrics.items():
            scores[metric_name] = metric_func(y_true, predictions)

        return scores
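
    # Usage sketch with a custom metric (hypothetical values): each dict value
    # is a callable taking (y_true, predictions), where predictions are the
    # transformed distribution parameters returned by predict():
    #
    #     from sklearn.metrics import mean_absolute_error
    #     scores = model.evaluate(
    #         X_test, y_test,
    #         metrics={"MAE": lambda y, pred: mean_absolute_error(y, pred[:, 0])},
    #     )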

    def get_default_metrics(self, distribution_family):
        """
        Provides default metrics based on the distribution family.

        Parameters
        ----------
        distribution_family : str
            The distribution family for which to provide default metrics.

        Returns
        -------
        metrics : dict
            A dictionary of default metric functions.
        """
        default_metrics = {
            "normal": {
                "MSE": lambda y, pred: mean_squared_error(y, pred[:, 0]),
                "CRPS": lambda y, pred: np.mean(
                    [
                        ps.crps_gaussian(y[i], mu=pred[i, 0], sig=np.sqrt(pred[i, 1]))
                        for i in range(len(y))
                    ]
                ),
            },
            "poisson": {"Poisson Deviance": poisson_deviance},
            "gamma": {"Gamma Deviance": gamma_deviance},
            "beta": {"Brier Score": beta_brier_score},
            "dirichlet": {"Dirichlet Error": dirichlet_error},
            "studentt": {"Student-T Loss": student_t_loss},
            "negativebinom": {"Negative Binomial Deviance": negative_binomial_deviance},
            "inversegamma": {"Inverse Gamma Loss": inverse_gamma_loss},
            "categorical": {"Accuracy": accuracy_score},
        }
        return default_metrics.get(distribution_family, {})

    def score(self, X, y, metric="NLL"):
        """
        Calculate the score of the model using the specified metric.

        Parameters
        ----------
        X : array-like or pd.DataFrame of shape (n_samples, n_features)
            The input samples to predict.
        y : array-like of shape (n_samples,) or (n_samples, n_outputs)
            The true target values against which to evaluate the predictions.
        metric : str, default="NLL"
            The metric to use; currently only the negative log-likelihood
            ("NLL") is supported.

        Returns
        -------
        score : float
            The score calculated using the specified metric.
        """
        predictions = self.predict(X)
        score = self.task_model.family.evaluate_nll(y, predictions)

        return score
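
    # Example (relies on the family's evaluate_nll, as called above): lower
    # values indicate a better distributional fit.
    #
    #     nll = model.score(X_test, y_test)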