Source code for sktime.forecasting.model_selection._tune

#!/usr/bin/env python3 -u
# copyright: sktime developers, BSD-3-Clause License (see LICENSE file)
"""Implements grid search functionality to tune forecasters."""

__all__ = [
    "ForecastingGridSearchCV",
    "ForecastingRandomizedSearchCV",
    "ForecastingSkoptSearchCV",
    "ForecastingOptunaSearchCV",
]
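
# Usage sketch (illustrative, not part of the library source): the tuners in
# ``__all__`` are typically imported from ``sktime.forecasting.model_selection``
# and wrap a forecaster together with a splitter and a parameter grid, e.g.:
#
#     from sktime.datasets import load_airline
#     from sktime.forecasting.model_selection import ForecastingGridSearchCV
#     from sktime.forecasting.naive import NaiveForecaster
#     from sktime.split import ExpandingWindowSplitter
#
#     y = load_airline()
#     cv = ExpandingWindowSplitter(fh=[1, 2, 3])
#     gscv = ForecastingGridSearchCV(
#         forecaster=NaiveForecaster(),
#         param_grid={"strategy": ["last", "mean", "drift"]},
#         cv=cv,
#     )
#     gscv.fit(y)
#     y_pred = gscv.predict(fh=[1, 2, 3])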

from collections.abc import Iterable, Mapping, Sequence
from typing import Optional, Union

import numpy as np
import pandas as pd
from sklearn.model_selection import ParameterGrid, ParameterSampler, check_cv

from sktime.datatypes import mtype_to_scitype
from sktime.exceptions import NotFittedError
from sktime.forecasting.base._delegate import _DelegatedForecaster
from sktime.forecasting.model_evaluation import evaluate
from sktime.performance_metrics.base import BaseMetric
from sktime.split.base import BaseSplitter
from sktime.utils.parallel import parallelize
from sktime.utils.validation.forecasting import check_scoring
from sktime.utils.warnings import warn


class BaseGridSearch(_DelegatedForecaster):
    _tags = {
        "authors": ["mloning", "fkiraly", "aiwalter"],
        "scitype:y": "both",
        "requires-fh-in-fit": False,
        "handles-missing-data": False,
        "ignores-exogeneous-X": True,
        "capability:pred_int": True,
        "capability:pred_int:insample": True,
    }

    def __init__(
        self,
        forecaster,
        cv,
        strategy="refit",
        backend="loky",
        refit=False,
        scoring=None,
        verbose=0,
        return_n_best_forecasters=1,
        update_behaviour="full_refit",
        error_score=np.nan,
        tune_by_instance=False,
        tune_by_variable=False,
        backend_params=None,
        n_jobs="deprecated",
    ):
        self.forecaster = forecaster
        self.cv = cv
        self.strategy = strategy
        self.backend = backend
        self.refit = refit
        self.scoring = scoring
        self.verbose = verbose
        self.return_n_best_forecasters = return_n_best_forecasters
        self.update_behaviour = update_behaviour
        self.error_score = error_score
        self.tune_by_instance = tune_by_instance
        self.tune_by_variable = tune_by_variable
        self.backend_params = backend_params
        self.n_jobs = n_jobs

        super().__init__()

        self._set_delegated_tags(forecaster)

        tags_to_clone = ["y_inner_mtype", "X_inner_mtype"]
        self.clone_tags(forecaster, tags_to_clone)
        self._extend_to_all_scitypes("y_inner_mtype")
        self._extend_to_all_scitypes("X_inner_mtype")

        # this ensures univariate broadcasting over variables
        # if tune_by_variable is True
        if tune_by_variable:
            self.set_tags(**{"scitype:y": "univariate"})

        # todo 0.34.0: check if this is still necessary
        # n_jobs is deprecated, left due to use in tutorials, books, blog posts
        if n_jobs != "deprecated":
            warn(
                f"Parameter n_jobs of {self.__class__.__name__} has been removed "
                "in sktime 0.27.0 and is no longer used. It is ignored when passed. "
                "Instead, the backend and backend_params parameters should be used "
                "to pass n_jobs or other parallelization parameters.",
                obj=self,
                stacklevel=2,
            )

    # attribute for _DelegatedForecaster, which then delegates
    #     all non-overridden methods are the same as those of getattr(self, _delegate_name)
    #     see further details in _DelegatedForecaster docstring
    _delegate_name = "best_forecaster_"

    def _extend_to_all_scitypes(self, tagname):
        """Ensure mtypes for all scitypes are in the tag with tagname.

        Mutates self tag with name ``tagname``.
        If no mtypes of a time series scitype are present, adds a pandas-based one.
        If only univariate pandas scitype is present for Series ("pd.Series"),
        also adds the multivariate one ("pd.DataFrame").

        If tune_by_instance is True, only Series mtypes are added,
        and potentially present Panel or Hierarchical mtypes are removed.

        Parameters
        ----------
        tagname : str, name of the tag. Should be "y_inner_mtype" or "X_inner_mtype".

        Returns
        -------
        None (mutates tag in self)
        """
        tagval = self.get_tag(tagname)
        if not isinstance(tagval, list):
            tagval = [tagval]
        scitypes = mtype_to_scitype(tagval, return_unique=True)
        # if no Series mtypes are present, add pd.DataFrame
        if "Series" not in scitypes:
            tagval = tagval + ["pd.DataFrame"]
        # ensure we have a Series mtype capable of multivariate
        elif "pd.Series" in tagval and "pd.DataFrame" not in tagval:
            tagval = ["pd.DataFrame"] + tagval
        # if no Panel mtypes are present, add pd.DataFrame based one
        if "Panel" not in scitypes:
            tagval = tagval + ["pd-multiindex"]
        # if no Hierarchical mtypes are present, add pd.DataFrame based one
        if "Hierarchical" not in scitypes:
            tagval = tagval + ["pd_multiindex_hier"]

        if self.tune_by_instance:
            tagval = [x for x in tagval if mtype_to_scitype(x) == "Series"]

        self.set_tags(**{tagname: tagval})
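
    # Illustrative example (not executed): for a forecaster whose
    # "y_inner_mtype" tag is just "pd.Series", the extension above yields
    # roughly
    #     ["pd.DataFrame", "pd.Series", "pd-multiindex", "pd_multiindex_hier"]
    # so the tuner can broadcast over multivariate, Panel and Hierarchical
    # inputs; with tune_by_instance=True, only the Series mtypes
    # ["pd.DataFrame", "pd.Series"] are kept.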

    def _get_fitted_params(self):
        """Get fitted parameters.

        Returns
        -------
        fitted_params : dict
            A dict containing the best hyper parameters and the parameters of
            the best estimator (if available), merged together with the former
            taking precedence.
        """
        fitted_params = {}
        try:
            fitted_params = self.best_forecaster_.get_fitted_params()
        except NotImplementedError:
            pass
        fitted_params = {**fitted_params, **self.best_params_}
        fitted_params.update(self._get_fitted_params_default())

        return fitted_params
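
    # Illustrative example (values assumed): if best_params_ is
    # {"window_length": 5} and the inner forecaster exposes a fitted parameter
    # {"sigma_": 1.2}, the dict returned above contains both entries, with the
    # tuned hyper-parameters taking precedence on key clashes, plus the tuner's
    # own fitted attributes (e.g., the best forecaster and best score).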

    def _run_search(self, evaluate_candidates):
        raise NotImplementedError("abstract method")

    def _fit(self, y, X, fh):
        """Fit to training data.

        Parameters
        ----------
        y : pd.Series
            Target time series to which to fit the forecaster.
        X : pd.DataFrame, optional (default=None)
            Exogenous variables, passed to the candidate forecasters and to
            ``evaluate``.
        fh : int, list or np.array, optional (default=None)
            The forecaster's horizon with the steps ahead to predict.

        Returns
        -------
        self : returns an instance of self.
        """
        cv = check_cv(self.cv)

        scoring = check_scoring(self.scoring, obj=self)
        scoring_name = f"test_{scoring.name}"

        backend = self.backend
        backend_params = self.backend_params if self.backend_params else {}

        def evaluate_candidates(candidate_params):
            candidate_params = list(candidate_params)

            if self.verbose > 0:
                n_candidates = len(candidate_params)
                n_splits = cv.get_n_splits(y)
                print(
                    f"Fitting {n_splits} folds for each of {n_candidates} candidates,"
                    f" totalling {n_candidates * n_splits} fits"
                )

            # Set meta variables for parallelization.
            meta = {}
            meta["forecaster"] = self.forecaster
            meta["y"] = y
            meta["X"] = X
            meta["cv"] = cv
            meta["strategy"] = self.strategy
            meta["scoring"] = scoring
            meta["error_score"] = self.error_score
            meta["scoring_name"] = scoring_name

            out = parallelize(
                fun=_fit_and_score,
                iter=candidate_params,
                meta=meta,
                backend=backend,
                backend_params=backend_params,
            )

            if len(out) < 1:
                raise ValueError(
                    "No fits were performed. "
                    "Was the CV iterator empty? "
                    "Were there no candidates?"
                )

            return out

        # Run grid-search cross-validation.
        results = self._run_search(evaluate_candidates)

        results = pd.DataFrame(results)

        # Rank results, according to whether greater is better for the given scoring.
        results[f"rank_{scoring_name}"] = results.loc[:, f"mean_{scoring_name}"].rank(
            ascending=scoring.get_tag("lower_is_better")
        )

        self.cv_results_ = results

        # Select best parameters.
        self.best_index_ = results.loc[:, f"rank_{scoring_name}"].argmin()
        # Raise error if all fits in evaluate failed because all score values are NaN.
        if self.best_index_ == -1:
            raise NotFittedError(
                f"""All fits of forecaster failed,
                set error_score='raise' to see the exceptions.
                Failed forecaster: {self.forecaster}"""
            )
        self.best_score_ = results.loc[self.best_index_, f"mean_{scoring_name}"]
        self.best_params_ = results.loc[self.best_index_, "params"]
        self.best_forecaster_ = self.forecaster.clone().set_params(**self.best_params_)

        # Refit model with best parameters.
        if self.refit:
            self.best_forecaster_.fit(y=y, X=X, fh=fh)

        # Sort values according to rank
        results = results.sort_values(
            by=f"rank_{scoring_name}",
            ascending=True,
        )
        # Select n best forecaster
        self.n_best_forecasters_ = []
        self.n_best_scores_ = []
        _forecasters_to_return = min(self.return_n_best_forecasters, len(results.index))
        if _forecasters_to_return == -1:
            _forecasters_to_return = len(results.index)
        for i in range(_forecasters_to_return):
            params = results["params"].iloc[i]
            rank = results[f"rank_{scoring_name}"].iloc[i]
            rank = str(int(rank))
            forecaster = self.forecaster.clone().set_params(**params)
            # Refit model with best parameters.
            if self.refit:
                forecaster.fit(y=y, X=X, fh=fh)
            self.n_best_forecasters_.append((rank, forecaster))
            # Save score
            score = results[f"mean_{scoring_name}"].iloc[i]
            self.n_best_scores_.append(score)

        return self
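
    # Usage sketch (illustrative, attribute values hypothetical): after fit,
    # the search results are exposed on the tuner, e.g.
    #
    #     gscv.fit(y)
    #     gscv.best_params_          # e.g. {"strategy": "mean"}
    #     gscv.best_score_           # mean test score of the best candidate
    #     gscv.cv_results_           # pd.DataFrame, one row per candidate
    #     gscv.n_best_forecasters_   # list of (rank, forecaster) tuples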

    def _predict(self, fh, X):
        """Forecast time series at future horizon.

        private _predict containing the core logic, called from predict

        State required:
            Requires state to be "fitted".

        Accesses in self:
            Fitted model attributes ending in "_"
            self.cutoff

        Parameters
        ----------
        fh : guaranteed to be ForecastingHorizon or None, optional (default=None)
            The forecasting horizon with the steps ahead to predict.
            If not passed in _fit, guaranteed to be passed here
        X : pd.DataFrame, optional (default=None)
            Exogenous time series

        Returns
        -------
        y_pred : pd.Series
            Point predictions
        """
        if not self.refit:
            raise RuntimeError(
                f"In {self.__class__.__name__}, refit must be True to make predictions,"
                f" but found refit=False. If refit=False, {self.__class__.__name__} can"
                " be used only to tune hyper-parameters, as a parameter estimator."
            )
        return super()._predict(fh=fh, X=X)
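
    # Illustrative sketch of the refit=False workflow (names hypothetical):
    # the tuner is then used purely as a parameter estimator, e.g.
    #
    #     gscv = ForecastingGridSearchCV(
    #         forecaster=forecaster, cv=cv, param_grid=param_grid, refit=False
    #     )
    #     gscv.fit(y)
    #     gscv.best_params_          # inspect the tuned hyper-parameters
    #     gscv.get_fitted_params()   # dict including the best parameters
    #
    # calling gscv.predict afterwards raises the RuntimeError above.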

    def _update(self, y, X=None, update_params=True):
        """Update time series to incremental training data.

        Parameters
        ----------
        y : guaranteed to be of a type in self.get_tag("y_inner_mtype")
            Time series with which to update the forecaster.
            if self.get_tag("scitype:y")=="univariate":
                guaranteed to have a single column/variable
            if self.get_tag("scitype:y")=="multivariate":
                guaranteed to have 2 or more columns
            if self.get_tag("scitype:y")=="both": no restrictions apply
        X : optional (default=None)
            guaranteed to be of a type in self.get_tag("X_inner_mtype")
            Exogeneous time series for the forecast
        update_params : bool, optional (default=True)
            whether model parameters should be updated

        Returns
        -------
        self : reference to self
        """
        update_behaviour = self.update_behaviour

        if update_behaviour == "full_refit":
            super()._update(y=y, X=X, update_params=update_params)
        elif update_behaviour == "inner_only":
            self.best_forecaster_.update(y=y, X=X, update_params=update_params)
        elif update_behaviour == "no_update":
            self.best_forecaster_.update(y=y, X=X, update_params=False)
        else:
            raise ValueError(
                'update_behaviour must be one of "full_refit", "inner_only",'
                f' or "no_update", but found {update_behaviour}'
            )
        return self
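
    # Illustrative summary of the branches above (see the class docstrings for
    # the intended semantics):
    #     "full_refit"  -> delegate to the parent update, re-tuning the
    #                      hyper-parameters on all data seen so far
    #     "inner_only"  -> update only best_forecaster_, keeping the tuned
    #                      hyper-parameters fixed
    #     "no_update"   -> call best_forecaster_.update with
    #                      update_params=False, i.e., only the data/cutoff of
    #                      the inner forecaster are advanced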


def _fit_and_score(params, meta):
    """Fit and score forecaster with given parameters.

    Root level function for parallelization, called from
    BaseGridSearchCV._fit, evaluate_candidates, within parallelize.
    """
    meta = meta.copy()
    scoring_name = meta.pop("scoring_name")

    # Set parameters.
    forecaster = meta.pop("forecaster").clone()
    forecaster.set_params(**params)

    # Evaluate.
    out = evaluate(forecaster, **meta)

    # Filter columns.
    out = out.filter(items=[scoring_name, "fit_time", "pred_time"], axis=1)

    # Aggregate results.
    out = out.mean()
    out = out.add_prefix("mean_")

    # Add parameters to output table.
    out["params"] = params

    return out
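
# Illustrative example of the aggregated output (values hypothetical): for
# params={"window_length": 5} and MeanAbsolutePercentageError scoring, the
# returned pd.Series would look roughly like
#
#     mean_test_MeanAbsolutePercentageError     0.21
#     mean_fit_time                             0.03
#     mean_pred_time                            0.01
#     params                  {'window_length': 5}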


class ForecastingGridSearchCV(BaseGridSearch):
    """Perform grid-search cross-validation to find optimal model parameters.

    The forecaster is fit on the initial window and then temporal
    cross-validation is used to find the optimal parameter.

    Grid-search cross-validation is performed based on a cross-validation
    iterator encoding the cross-validation scheme, the parameter grid to
    search over, and (optionally) the evaluation metric for comparing model
    performance. As in scikit-learn, tuning works through the common
    hyper-parameter interface which allows one to repeatedly fit and evaluate
    the same forecaster with different hyper-parameters.

    Parameters
    ----------
    forecaster : sktime forecaster, BaseForecaster instance or interface compatible
        The forecaster to tune, must implement the sktime forecaster interface.
        sklearn regressors can be used, but must first be converted to forecasters
        via one of the reduction compositors, e.g., via ``make_reduction``
    cv : cross-validation generator or an iterable
        e.g. SlidingWindowSplitter()
    strategy : {"refit", "update", "no-update_params"}, optional, default="refit"
        data ingestion strategy in fitting cv, passed to ``evaluate`` internally
        defines the ingestion mode when the forecaster sees new data when window expands
        "refit" = a new copy of the forecaster is fitted to each training window
        "update" = forecaster is updated with training window data, in sequence provided
        "no-update_params" = fit to first training window, re-used without fit or update
    update_behaviour : str, optional, default="full_refit"
        one of {"full_refit", "inner_only", "no_update"}
        behaviour of the forecaster when calling update
        "full_refit" = both tuning parameters and inner estimator refit on all data seen
        "inner_only" = tuning parameters are not re-tuned, inner estimator is updated
        "no_update" = neither tuning parameters nor inner estimator are updated
    param_grid : dict or list of dictionaries
        Model tuning parameters of the forecaster to evaluate
    scoring : sktime metric (BaseMetric), str, or callable, optional (default=None)
        scoring metric to use in tuning the forecaster

        * sktime metric objects (BaseMetric) descendants can be searched
          with the ``registry.all_estimators`` search utility,
          for instance via ``all_estimators("metric", as_dataframe=True)``

        * If callable, must have signature
          ``(y_true: 1D np.ndarray, y_pred: 1D np.ndarray) -> float``,
          assuming np.ndarrays being of the same length, and lower being better.
          Metrics in sktime.performance_metrics.forecasting are all of this form.

        * If str, uses registry.resolve_alias to resolve to one of the above.
          Valid strings are valid registry.craft specs, which include
          string repr-s of any BaseMetric object, e.g., "MeanSquaredError()";
          and keys of registry.ALIAS_DICT referring to metrics.

        * If None, defaults to MeanAbsolutePercentageError()

    refit : bool, optional (default=True)
        True = refit the forecaster with the best parameters on the entire data in fit
        False = no refitting takes place. The forecaster cannot be used to predict.
        This is to be used to tune the hyperparameters, and then use the estimator
        as a parameter estimator, e.g., via get_fitted_params or PluginParamsForecaster.
    verbose : int, optional (default=0)
    return_n_best_forecasters : int, default=1
        In case the n best forecasters should be returned, this value can be set
        and the n best forecasters will be assigned to n_best_forecasters_.
        Set return_n_best_forecasters to -1 to return all forecasters.
    return_train_score : bool, optional (default=False)
    backend : {"dask", "loky", "multiprocessing", "threading"}, by default "loky".
        Runs parallel evaluate if specified and ``strategy`` is set as "refit".

        - "None": executes loop sequentially, simple list comprehension
        - "loky", "multiprocessing" and "threading": uses ``joblib.Parallel`` loops
        - "joblib": custom and 3rd party ``joblib`` backends, e.g., ``spark``
        - "dask": uses ``dask``, requires ``dask`` package in environment

        Recommendation: Use "dask" or "loky" for parallel evaluate.
        "threading" is unlikely to see speed ups due to the GIL and the
        serialization backend (``cloudpickle``) for "dask" and "loky" is
        generally more robust than the standard ``pickle`` library used
        in "multiprocessing".
    error_score : "raise" or numeric, default=np.nan
        The test score returned when a forecaster fails to be fitted, i.e.,
        the value to assign to the score if an exception occurs in estimator fitting.
        If set to "raise", the exception is raised. If a numeric value is given,
        FitFailedWarning is raised.
    tune_by_instance : bool, optional (default=False)
        Whether to tune parameter by each time series instance separately,
        in case of Panel or Hierarchical data passed to the tuning estimator.
        Only applies if time series passed are Panel or Hierarchical.
        If True, clones of the forecaster will be fit to each instance separately,
        and are available in fields of the forecasters_ attribute.
        Has the same effect as applying ForecastByLevel wrapper to self.
        If False, the same best parameter is selected for all instances.
    tune_by_variable : bool, optional (default=False)
        Whether to tune parameter by each time series variable separately,
        in case of multivariate data passed to the tuning estimator.
        Only applies if time series passed are strictly multivariate.
        If True, clones of the forecaster will be fit to each variable separately,
        and are available in fields of the forecasters_ attribute.
        Has the same effect as applying ColumnEnsembleForecaster wrapper to self.
        If False, the same best parameter is selected for all variables.
    backend_params : dict, optional
        additional parameters passed to the backend as config.
        Directly passed to ``utils.parallel.parallelize``.
        Valid keys depend on the value of ``backend``:

        - "None": no additional parameters, ``backend_params`` is ignored
        - "loky", "multiprocessing" and "threading": default ``joblib`` backends
          any valid keys for ``joblib.Parallel`` can be passed here, e.g., ``n_jobs``,
          with the exception of ``backend`` which is directly controlled by ``backend``.
          If ``n_jobs`` is not passed, it will default to ``-1``, other parameters
          will default to ``joblib`` defaults.
        - "joblib": custom and 3rd party ``joblib`` backends, e.g., ``spark``.
          any valid keys for ``joblib.Parallel`` can be passed here, e.g., ``n_jobs``,
          ``backend`` must be passed as a key of ``backend_params`` in this case.
          If ``n_jobs`` is not passed, it will default to ``-1``, other parameters
          will default to ``joblib`` defaults.
        - "dask": any valid keys for ``dask.compute`` can be passed, e.g., ``scheduler``

    Attributes
    ----------
    best_index_ : int
    best_score_ : float
        Score of the best model
    best_params_ : dict
        Best parameter values across the parameter grid
    best_forecaster_ : estimator
        Fitted estimator with the best parameters
    cv_results_ : dict
        Results from grid search cross validation
    n_splits_ : int
        Number of splits in the data for cross validation
    refit_time_ : float
        Time (seconds) to refit the best forecaster
    scorer_ : function
        Function used to score model
    n_best_forecasters_ : list of tuples ("rank", <forecaster>)
        The "rank" is in relation to best_forecaster_
    n_best_scores_ : list of float
        The scores of n_best_forecasters_ sorted from best to worst score of forecasters
    forecasters_ : pd.DataFrame
        DataFrame with all fitted forecasters and their parameters.
        Only present if tune_by_instance=True or tune_by_variable=True,
        and at least one of the two is applicable. In this case,
        the other attributes are not present in self, only in the fields of forecasters_.

    Examples
    --------
    >>> from sktime.datasets import load_shampoo_sales
    >>> from sktime.forecasting.model_selection import ForecastingGridSearchCV
    >>> from sktime.split import ExpandingWindowSplitter
    >>> from sktime.forecasting.naive import NaiveForecaster
    >>> y = load_shampoo_sales()
    >>> fh = [1, 2, 3]
    >>> cv = ExpandingWindowSplitter(fh=fh)
    >>> forecaster = NaiveForecaster()
    >>> param_grid = {"strategy": ["last", "mean", "drift"]}
    >>> gscv = ForecastingGridSearchCV(
    ...     forecaster=forecaster,
    ...     param_grid=param_grid,
    ...     cv=cv)
    >>> gscv.fit(y)
    ForecastingGridSearchCV(...)
    >>> y_pred = gscv.predict(fh)

    Advanced model meta-tuning (model selection) with multiple forecasters
    together with hyper-parameter tuning at the same time, using sklearn notation:

    >>> from sktime.datasets import load_shampoo_sales
    >>> from sktime.forecasting.exp_smoothing import ExponentialSmoothing
    >>> from sktime.forecasting.naive import NaiveForecaster
    >>> from sktime.split import ExpandingWindowSplitter
    >>> from sktime.forecasting.model_selection import ForecastingGridSearchCV
    >>> from sktime.forecasting.compose import TransformedTargetForecaster
    >>> from sktime.forecasting.theta import ThetaForecaster
    >>> from sktime.transformations.series.impute import Imputer
    >>> y = load_shampoo_sales()
    >>> pipe = TransformedTargetForecaster(steps=[
    ...     ("imputer", Imputer()),
    ...     ("forecaster", NaiveForecaster())])
    >>> cv = ExpandingWindowSplitter(
    ...     initial_window=24,
    ...     step_length=12,
    ...     fh=[1, 2, 3])
    >>> gscv = ForecastingGridSearchCV(
    ...     forecaster=pipe,
    ...     param_grid=[{
    ...         "forecaster": [NaiveForecaster(sp=12)],
    ...         "forecaster__strategy": ["drift", "last", "mean"],
    ...     },
    ...     {
    ...         "imputer__method": ["mean", "drift"],
    ...         "forecaster": [ThetaForecaster(sp=12)],
    ...     },
    ...     {
    ...         "imputer__method": ["mean", "median"],
    ...         "forecaster": [ExponentialSmoothing(sp=12)],
    ...         "forecaster__trend": ["add", "mul"],
    ...     },
    ...     ],
    ...     cv=cv,
    ... )  # doctest: +SKIP
    >>> gscv.fit(y)  # doctest: +SKIP
    ForecastingGridSearchCV(...)
    >>> y_pred = gscv.predict(fh=[1, 2, 3])  # doctest: +SKIP
    """

    def __init__(
        self,
        forecaster,
        cv,
        param_grid,
        scoring=None,
        strategy="refit",
        refit=True,
        verbose=0,
        return_n_best_forecasters=1,
        backend="loky",
        update_behaviour="full_refit",
        error_score=np.nan,
        tune_by_instance=False,
        tune_by_variable=False,
        backend_params=None,
        n_jobs="deprecated",
    ):
        super().__init__(
            forecaster=forecaster,
            scoring=scoring,
            refit=refit,
            cv=cv,
            strategy=strategy,
            verbose=verbose,
            return_n_best_forecasters=return_n_best_forecasters,
            backend=backend,
            update_behaviour=update_behaviour,
            error_score=error_score,
            tune_by_instance=tune_by_instance,
            tune_by_variable=tune_by_variable,
            backend_params=backend_params,
            n_jobs=n_jobs,
        )
        self.param_grid = param_grid

    def _check_param_grid(self, param_grid):
        """_check_param_grid from sklearn 1.0.2, before it was removed."""
        if hasattr(param_grid, "items"):
            param_grid = [param_grid]

        for p in param_grid:
            for name, v in p.items():
                if isinstance(v, np.ndarray) and v.ndim > 1:
                    raise ValueError("Parameter array should be one-dimensional.")

                if isinstance(v, str) or not isinstance(v, (np.ndarray, Sequence)):
                    raise ValueError(
                        f"Parameter grid for parameter ({name}) needs to"
                        f" be a list or numpy array, but got ({type(v)})."
                        " Single values need to be wrapped in a list"
                        " with one element."
                    )

                if len(v) == 0:
                    raise ValueError(
                        f"Parameter values for parameter ({name}) need "
                        "to be a non-empty sequence."
                    )

    def _run_search(self, evaluate_candidates):
        """Search all candidates in param_grid."""
        self._check_param_grid(self.param_grid)
        return evaluate_candidates(ParameterGrid(self.param_grid))

    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests. If no
            special parameters are defined for a value, will return ``"default"`` set.

        Returns
        -------
        params : dict or list of dict
        """
        from sktime.forecasting.naive import NaiveForecaster
        from sktime.forecasting.trend import PolynomialTrendForecaster
        from sktime.performance_metrics.forecasting import (
            MeanAbsolutePercentageError,
            mean_absolute_percentage_error,
        )
        from sktime.split import SingleWindowSplitter

        params = {
            "forecaster": NaiveForecaster(strategy="mean"),
            "cv": SingleWindowSplitter(fh=1),
            "param_grid": {"window_length": [2, 5]},
            "scoring": MeanAbsolutePercentageError(symmetric=True),
        }
        params2 = {
            "forecaster": PolynomialTrendForecaster(),
            "cv": SingleWindowSplitter(fh=1),
            "param_grid": {"degree": [1, 2]},
            "scoring": mean_absolute_percentage_error,
            "update_behaviour": "inner_only",
        }
        params3 = {
            "forecaster": NaiveForecaster(strategy="mean"),
            "cv": SingleWindowSplitter(fh=1),
            "param_grid": {"window_length": [3, 4]},
            "scoring": "MeanAbsolutePercentageError(symmetric=True)",
            "update_behaviour": "no_update",
        }
        return [params, params2, params3]

class ForecastingRandomizedSearchCV(BaseGridSearch):
    """Perform randomized-search cross-validation to find optimal model parameters.

    The forecaster is fit on the initial window and then temporal
    cross-validation is used to find the optimal parameter.

    Randomized cross-validation is performed based on a cross-validation
    iterator encoding the cross-validation scheme, the parameter distributions to
    search over, and (optionally) the evaluation metric for comparing model
    performance. As in scikit-learn, tuning works through the common
    hyper-parameter interface which allows one to repeatedly fit and evaluate
    the same forecaster with different hyper-parameters.

    Parameters
    ----------
    forecaster : sktime forecaster, BaseForecaster instance or interface compatible
        The forecaster to tune, must implement the sktime forecaster interface.
        sklearn regressors can be used, but must first be converted to forecasters
        via one of the reduction compositors, e.g., via ``make_reduction``
    cv : cross-validation generator or an iterable
        e.g. SlidingWindowSplitter()
    strategy : {"refit", "update", "no-update_params"}, optional, default="refit"
        data ingestion strategy in fitting cv, passed to ``evaluate`` internally
        defines the ingestion mode when the forecaster sees new data when window expands
        "refit" = a new copy of the forecaster is fitted to each training window
        "update" = forecaster is updated with training window data, in sequence provided
        "no-update_params" = fit to first training window, re-used without fit or update
    update_behaviour : str, optional, default="full_refit"
        one of {"full_refit", "inner_only", "no_update"}
        behaviour of the forecaster when calling update
        "full_refit" = both tuning parameters and inner estimator refit on all data seen
        "inner_only" = tuning parameters are not re-tuned, inner estimator is updated
        "no_update" = neither tuning parameters nor inner estimator are updated
    param_distributions : dict or list of dicts
        Dictionary with parameters names (``str``) as keys and distributions
        or lists of parameters to try. Distributions must provide a ``rvs``
        method for sampling (such as those from scipy.stats.distributions).
        If a list is given, it is sampled uniformly.
        If a list of dicts is given, first a dict is sampled uniformly, and
        then a parameter is sampled using that dict as above.
    n_iter : int, default=10
        Number of parameter settings that are sampled.
        n_iter trades off runtime vs quality of the solution.
    scoring : sktime metric (BaseMetric), str, or callable, optional (default=None)
        scoring metric to use in tuning the forecaster

        * sktime metric objects (BaseMetric) descendants can be searched
          with the ``registry.all_estimators`` search utility,
          for instance via ``all_estimators("metric", as_dataframe=True)``

        * If callable, must have signature
          ``(y_true: 1D np.ndarray, y_pred: 1D np.ndarray) -> float``,
          assuming np.ndarrays being of the same length, and lower being better.
          Metrics in sktime.performance_metrics.forecasting are all of this form.

        * If str, uses registry.resolve_alias to resolve to one of the above.
          Valid strings are valid registry.craft specs, which include
          string repr-s of any BaseMetric object, e.g., "MeanSquaredError()";
          and keys of registry.ALIAS_DICT referring to metrics.

        * If None, defaults to MeanAbsolutePercentageError()

    refit : bool, optional (default=True)
        True = refit the forecaster with the best parameters on the entire data in fit
        False = no refitting takes place. The forecaster cannot be used to predict.
        This is to be used to tune the hyperparameters, and then use the estimator
        as a parameter estimator, e.g., via get_fitted_params or PluginParamsForecaster.
    verbose : int, optional (default=0)
    return_n_best_forecasters : int, default=1
        In case the n best forecasters should be returned, this value can be set
        and the n best forecasters will be assigned to n_best_forecasters_.
        Set return_n_best_forecasters to -1 to return all forecasters.
    random_state : int, RandomState instance or None, default=None
        Pseudo random number generator state used for random uniform sampling
        from lists of possible values instead of scipy.stats distributions.
        Pass an int for reproducible output across multiple function calls.
    backend : {"dask", "loky", "multiprocessing", "threading"}, by default "loky".
        Runs parallel evaluate if specified and ``strategy`` is set as "refit".

        - "None": executes loop sequentially, simple list comprehension
        - "loky", "multiprocessing" and "threading": uses ``joblib.Parallel`` loops
        - "joblib": custom and 3rd party ``joblib`` backends, e.g., ``spark``
        - "dask": uses ``dask``, requires ``dask`` package in environment

        Recommendation: Use "dask" or "loky" for parallel evaluate.
        "threading" is unlikely to see speed ups due to the GIL and the
        serialization backend (``cloudpickle``) for "dask" and "loky" is
        generally more robust than the standard ``pickle`` library used
        in "multiprocessing".
    error_score : "raise" or numeric, default=np.nan
        Value to assign to the score if an exception occurs in estimator fitting.
        If set to "raise", the exception is raised. If a numeric value is given,
        FitFailedWarning is raised.
    tune_by_instance : bool, optional (default=False)
        Whether to tune parameter by each time series instance separately,
        in case of Panel or Hierarchical data passed to the tuning estimator.
        Only applies if time series passed are Panel or Hierarchical.
        If True, clones of the forecaster will be fit to each instance separately,
        and are available in fields of the forecasters_ attribute.
        Has the same effect as applying ForecastByLevel wrapper to self.
        If False, the same best parameter is selected for all instances.
    tune_by_variable : bool, optional (default=False)
        Whether to tune parameter by each time series variable separately,
        in case of multivariate data passed to the tuning estimator.
        Only applies if time series passed are strictly multivariate.
        If True, clones of the forecaster will be fit to each variable separately,
        and are available in fields of the forecasters_ attribute.
        Has the same effect as applying ColumnEnsembleForecaster wrapper to self.
        If False, the same best parameter is selected for all variables.
    backend_params : dict, optional
        additional parameters passed to the backend as config.
        Directly passed to ``utils.parallel.parallelize``.
        Valid keys depend on the value of ``backend``:

        - "None": no additional parameters, ``backend_params`` is ignored
        - "loky", "multiprocessing" and "threading": default ``joblib`` backends
          any valid keys for ``joblib.Parallel`` can be passed here, e.g., ``n_jobs``,
          with the exception of ``backend`` which is directly controlled by ``backend``.
          If ``n_jobs`` is not passed, it will default to ``-1``, other parameters
          will default to ``joblib`` defaults.
        - "joblib": custom and 3rd party ``joblib`` backends, e.g., ``spark``.
          any valid keys for ``joblib.Parallel`` can be passed here, e.g., ``n_jobs``,
          ``backend`` must be passed as a key of ``backend_params`` in this case.
          If ``n_jobs`` is not passed, it will default to ``-1``, other parameters
          will default to ``joblib`` defaults.
        - "dask": any valid keys for ``dask.compute`` can be passed, e.g., ``scheduler``

    Attributes
    ----------
    best_index_ : int
    best_score_ : float
        Score of the best model
    best_params_ : dict
        Best parameter values across the parameter grid
    best_forecaster_ : estimator
        Fitted estimator with the best parameters
    cv_results_ : dict
        Results from grid search cross validation
    n_best_forecasters_ : list of tuples ("rank", <forecaster>)
        The "rank" is in relation to best_forecaster_
    n_best_scores_ : list of float
        The scores of n_best_forecasters_ sorted from best to worst score of forecasters
    forecasters_ : pd.DataFrame
        DataFrame with all fitted forecasters and their parameters.
        Only present if tune_by_instance=True or tune_by_variable=True,
        and at least one of the two is applicable. In this case,
        the other attributes are not present in self, only in the fields of forecasters_.
    """

    def __init__(
        self,
        forecaster,
        cv,
        param_distributions,
        n_iter=10,
        scoring=None,
        strategy="refit",
        refit=True,
        verbose=0,
        return_n_best_forecasters=1,
        random_state=None,
        backend="loky",
        update_behaviour="full_refit",
        error_score=np.nan,
        tune_by_instance=False,
        tune_by_variable=False,
        backend_params=None,
        n_jobs="deprecated",
    ):
        super().__init__(
            forecaster=forecaster,
            scoring=scoring,
            strategy=strategy,
            refit=refit,
            cv=cv,
            verbose=verbose,
            return_n_best_forecasters=return_n_best_forecasters,
            backend=backend,
            update_behaviour=update_behaviour,
            error_score=error_score,
            tune_by_instance=tune_by_instance,
            tune_by_variable=tune_by_variable,
            backend_params=backend_params,
            n_jobs=n_jobs,
        )
        self.param_distributions = param_distributions
        self.n_iter = n_iter
        self.random_state = random_state

    def _run_search(self, evaluate_candidates):
        """Search n_iter candidates from param_distributions."""
        return evaluate_candidates(
            ParameterSampler(
                self.param_distributions, self.n_iter, random_state=self.random_state
            )
        )

    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests. If no
            special parameters are defined for a value, will return ``"default"`` set.

        Returns
        -------
        params : dict or list of dict
        """
        from sktime.forecasting.naive import NaiveForecaster
        from sktime.forecasting.trend import PolynomialTrendForecaster
        from sktime.performance_metrics.forecasting import MeanAbsolutePercentageError
        from sktime.split import SingleWindowSplitter

        params = {
            "forecaster": NaiveForecaster(strategy="mean"),
            "cv": SingleWindowSplitter(fh=1),
            "param_distributions": {"window_length": [2, 5]},
            "scoring": MeanAbsolutePercentageError(symmetric=True),
        }
        params2 = {
            "forecaster": PolynomialTrendForecaster(),
            "cv": SingleWindowSplitter(fh=1),
            "param_distributions": {"degree": [1, 2]},
            "scoring": MeanAbsolutePercentageError(symmetric=True),
            "update_behaviour": "inner_only",
        }
        params3 = {
            "forecaster": NaiveForecaster(strategy="mean"),
            "cv": SingleWindowSplitter(fh=1),
            "param_distributions": {"window_length": [3, 4]},
            "scoring": "MeanAbsolutePercentageError(symmetric=True)",
            "update_behaviour": "no_update",
        }
        return [params, params2, params3]

class ForecastingSkoptSearchCV(BaseGridSearch):
    """Bayesian search over hyperparameters for a forecaster.

    Experimental: this feature is under development and the interface is
    likely to change.

    Parameters
    ----------
    forecaster : sktime forecaster, BaseForecaster instance or interface compatible
        The forecaster to tune, must implement the sktime forecaster interface.
        sklearn regressors can be used, but must first be converted to forecasters
        via one of the reduction compositors, e.g., via ``make_reduction``
    cv : cross-validation generator or an iterable
        Splitter used for generating validation folds.
        e.g. SlidingWindowSplitter()
    param_distributions : dict or a list of dict/tuple. See below for details.

        1. If dict, a dictionary that represents the search space over the
           parameters of the provided estimator. The keys are parameter names
           (strings), and the values follow one of the formats below:
           a list for categorical parameters, or a tuple of the form
           (int/float, int/float, "prior") for integer and real parameters,
           e.g. (1e-6, 1e-1, "log-uniform").
        2. If a list of dict, each dictionary corresponds to a parameter space,
           following the same structure described in case 1 above. The search
           will be performed sequentially for each parameter space, with the
           number of samples set to n_iter.
        3. If a list of tuple, each tuple must contain (dict, int), where the
           int refers to n_iter for that search space. dict must follow the
           same structure as in case 1. This is useful if you want to perform
           a search with a different number of iterations for each parameter
           space.
    n_iter : int, default=10
        Number of parameter settings that are sampled.
        n_iter trades off runtime vs quality of the solution.
        Consider increasing n_points if you want to try more parameter
        settings in parallel.
    n_points : int, default=1
        Number of parameter settings to sample in parallel.
        If this does not align with n_iter, the last iteration will sample
        fewer points.
    scoring : sktime metric (BaseMetric), str, or callable, optional (default=None)
        scoring metric to use in tuning the forecaster

        * sktime metric objects (BaseMetric) descendants can be searched
          with the ``registry.all_estimators`` search utility,
          for instance via ``all_estimators("metric", as_dataframe=True)``

        * If callable, must have signature
          ``(y_true: 1D np.ndarray, y_pred: 1D np.ndarray) -> float``,
          assuming np.ndarrays being of the same length, and lower being better.
          Metrics in sktime.performance_metrics.forecasting are all of this form.

        * If str, uses registry.resolve_alias to resolve to one of the above.
          Valid strings are valid registry.craft specs, which include
          string repr-s of any BaseMetric object, e.g., "MeanSquaredError()";
          and keys of registry.ALIAS_DICT referring to metrics.

        * If None, defaults to MeanAbsolutePercentageError()

    optimizer_kwargs : dict, optional
        Arguments passed to Optimizer to control the behaviour of the bayesian
        search. For example, {'base_estimator': 'RF'} would use a Random Forest
        surrogate instead of the default Gaussian Process. Please refer to the
        ``skopt.Optimizer`` documentation for more information.
    random_state : int, RandomState instance or None, default=None
        Pseudo random number generator state used for random uniform sampling
        from lists of possible values instead of scipy.stats distributions.
        Pass an int for reproducible output across multiple function calls.
    strategy : {"refit", "update", "no-update_params"}, optional, default="refit"
        data ingestion strategy in fitting cv, passed to ``evaluate`` internally
        defines the ingestion mode when the forecaster sees new data when window expands
        "refit" = a new copy of the forecaster is fitted to each training window
        "update" = forecaster is updated with training window data, in sequence provided
        "no-update_params" = fit to first training window, re-used without fit or update
    update_behaviour : str, optional, default="full_refit"
        one of {"full_refit", "inner_only", "no_update"}
        behaviour of the forecaster when calling update
        "full_refit" = both tuning parameters and inner estimator refit on all data seen
        "inner_only" = tuning parameters are not re-tuned, inner estimator is updated
        "no_update" = neither tuning parameters nor inner estimator are updated
    refit : bool, optional (default=True)
        True = refit the forecaster with the best parameters on the entire data in fit
        False = no refitting takes place. The forecaster cannot be used to predict.
        This is to be used to tune the hyperparameters, and then use the estimator
        as a parameter estimator, e.g., via get_fitted_params or PluginParamsForecaster.
    verbose : int, optional (default=0)
    error_score : "raise" or numeric, default=np.nan
        Value to assign to the score if an exception occurs in estimator fitting.
        If set to "raise", the exception is raised. If a numeric value is given,
        FitFailedWarning is raised.
    return_n_best_forecasters : int, default=1
        In case the n best forecasters should be returned, this value can be set
        and the n best forecasters will be assigned to n_best_forecasters_.
        Set return_n_best_forecasters to -1 to return all forecasters.
    backend : {"dask", "loky", "multiprocessing", "threading"}, by default "loky".
        Runs parallel evaluate if specified and ``strategy`` is set as "refit".

        - "None": executes loop sequentially, simple list comprehension
        - "loky", "multiprocessing" and "threading": uses ``joblib.Parallel`` loops
        - "joblib": custom and 3rd party ``joblib`` backends, e.g., ``spark``
        - "dask": uses ``dask``, requires ``dask`` package in environment

        Recommendation: Use "dask" or "loky" for parallel evaluate.
        "threading" is unlikely to see speed ups due to the GIL and the
        serialization backend (``cloudpickle``) for "dask" and "loky" is
        generally more robust than the standard ``pickle`` library used
        in "multiprocessing".
    tune_by_instance : bool, optional (default=False)
        Whether to tune parameter by each time series instance separately,
        in case of Panel or Hierarchical data passed to the tuning estimator.
        Only applies if time series passed are Panel or Hierarchical.
        If True, clones of the forecaster will be fit to each instance separately,
        and are available in fields of the forecasters_ attribute.
        Has the same effect as applying ForecastByLevel wrapper to self.
        If False, the same best parameter is selected for all instances.
    tune_by_variable : bool, optional (default=False)
        Whether to tune parameter by each time series variable separately,
        in case of multivariate data passed to the tuning estimator.
        Only applies if time series passed are strictly multivariate.
        If True, clones of the forecaster will be fit to each variable separately,
        and are available in fields of the forecasters_ attribute.
        Has the same effect as applying ColumnEnsembleForecaster wrapper to self.
        If False, the same best parameter is selected for all variables.
    backend_params : dict, optional
        additional parameters passed to the backend as config.
        Directly passed to ``utils.parallel.parallelize``.
        Valid keys depend on the value of ``backend``:

        - "None": no additional parameters, ``backend_params`` is ignored
        - "loky", "multiprocessing" and "threading": default ``joblib`` backends
          any valid keys for ``joblib.Parallel`` can be passed here, e.g., ``n_jobs``,
          with the exception of ``backend`` which is directly controlled by ``backend``.
          If ``n_jobs`` is not passed, it will default to ``-1``, other parameters
          will default to ``joblib`` defaults.
        - "joblib": custom and 3rd party ``joblib`` backends, e.g., ``spark``.
          any valid keys for ``joblib.Parallel`` can be passed here, e.g., ``n_jobs``,
          ``backend`` must be passed as a key of ``backend_params`` in this case.
          If ``n_jobs`` is not passed, it will default to ``-1``, other parameters
          will default to ``joblib`` defaults.
        - "dask": any valid keys for ``dask.compute`` can be passed, e.g., ``scheduler``

    Attributes
    ----------
    best_index_ : int
    best_score_ : float
        Score of the best model
    best_params_ : dict
        Best parameter values across the parameter grid
    best_forecaster_ : estimator
        Fitted estimator with the best parameters
    cv_results_ : dict
        Results from grid search cross validation
    n_best_forecasters_ : list of tuples ("rank", <forecaster>)
        The "rank" is in relation to best_forecaster_
    n_best_scores_ : list of float
        The scores of n_best_forecasters_ sorted from best to worst score of forecasters
    forecasters_ : pd.DataFrame
        DataFrame with all fitted forecasters and their parameters.
        Only present if tune_by_instance=True or tune_by_variable=True,
        and at least one of the two is applicable. In this case,
        the other attributes are not present in self, only in the fields of forecasters_.

    Examples
    --------
    >>> from sktime.datasets import load_shampoo_sales
    >>> from sktime.forecasting.model_selection import ForecastingSkoptSearchCV
    >>> from sktime.split import ExpandingWindowSplitter
    >>> from sklearn.ensemble import GradientBoostingRegressor
    >>> from sktime.forecasting.compose import make_reduction
    >>> y = load_shampoo_sales()
    >>> fh = [1, 2, 3, 4]
    >>> cv = ExpandingWindowSplitter(fh=fh)
    >>> forecaster = make_reduction(GradientBoostingRegressor(random_state=10))
    >>> param_distributions = {
    ...     "estimator__learning_rate": (1e-4, 1e-1, "log-uniform"),
    ...     "window_length": (1, 10, "uniform"),
    ...     "estimator__criterion": ["friedman_mse", "squared_error"]}
    >>> sscv = ForecastingSkoptSearchCV(
    ...     forecaster=forecaster,
    ...     param_distributions=param_distributions,
    ...     cv=cv,
    ...     n_iter=5,
    ...     random_state=10)  # doctest: +SKIP
    >>> sscv.fit(y)  # doctest: +SKIP
    ForecastingSkoptSearchCV(...)
    >>> y_pred = sscv.predict(fh)  # doctest: +SKIP
    """

    _tags = {
        "authors": ["HazrulAkmal"],
        "maintainers": ["HazrulAkmal"],
        "scitype:y": "both",
        "requires-fh-in-fit": False,
        "handles-missing-data": False,
        "ignores-exogeneous-X": True,
        "capability:pred_int": True,
        "capability:pred_int:insample": True,
        "python_dependencies": ["scikit-optimize"],
        "python_version": ">= 3.6",
    }

    def __init__(
        self,
        forecaster,
        cv: BaseSplitter,
        param_distributions: Union[dict, list[dict]],
        n_iter: int = 10,
        n_points: Optional[int] = 1,
        random_state: Optional[int] = None,
        scoring: Optional[list[BaseMetric]] = None,
        optimizer_kwargs: Optional[dict] = None,
        strategy: Optional[str] = "refit",
        refit: bool = True,
        verbose: int = 0,
        return_n_best_forecasters: int = 1,
        backend: str = "loky",
        update_behaviour: str = "full_refit",
        error_score=np.nan,
        tune_by_instance=False,
        tune_by_variable=False,
        backend_params=None,
        n_jobs="deprecated",
    ):
        self.param_distributions = param_distributions
        self.n_iter = n_iter
        self.n_points = n_points
        self.random_state = random_state
        self.optimizer_kwargs = optimizer_kwargs

        super().__init__(
            forecaster=forecaster,
            scoring=scoring,
            strategy=strategy,
            refit=refit,
            cv=cv,
            verbose=verbose,
            return_n_best_forecasters=return_n_best_forecasters,
            backend=backend,
            update_behaviour=update_behaviour,
            error_score=error_score,
            tune_by_instance=tune_by_instance,
            tune_by_variable=tune_by_variable,
            backend_params=backend_params,
            n_jobs=n_jobs,
        )

    def _fit(self, y, X=None, fh=None):
        """Run fit with all sets of parameters."""
        self._check_cv = check_cv(self.cv)
        self._check_scoring = check_scoring(self.scoring, obj=self)
        scoring_name = f"test_{self._check_scoring.name}"
        self._check_search_space(self.param_distributions)
        self.cv_results_ = pd.DataFrame()

        self._run_search(y, X)

        # Rank results, according to whether greater is better for the given scoring.
        self.cv_results_[f"rank_{scoring_name}"] = self.cv_results_.loc[
            :, f"mean_{scoring_name}"
        ].rank(ascending=self._check_scoring.get_tag("lower_is_better"))

        results = self.cv_results_
        # Select best parameters.
        self.best_index_ = results.loc[:, f"rank_{scoring_name}"].argmin()
        # Raise error if all fits in evaluate failed because all score values are NaN.
        if self.best_index_ == -1:
            raise NotFittedError(
                f"""All fits of forecaster failed,
                set error_score='raise' to see the exceptions.
                Failed forecaster: {self.forecaster}"""
            )
        self.best_score_ = results.loc[self.best_index_, f"mean_{scoring_name}"]
        self.best_params_ = results.loc[self.best_index_, "params"]
        self.best_forecaster_ = self.forecaster.clone().set_params(**self.best_params_)

        # Refit model with best parameters.
        if self.refit:
            self.best_forecaster_.fit(y, X, fh)

        # Sort values according to rank
        results = results.sort_values(
            by=f"rank_{scoring_name}",
            ascending=True,
        )
        # Select n best forecaster
        self.n_best_forecasters_ = []
        self.n_best_scores_ = []
        for i in range(self.return_n_best_forecasters):
            params = results["params"].iloc[i]
            rank = results[f"rank_{scoring_name}"].iloc[i]
            rank = str(int(rank))
            forecaster = self.forecaster.clone().set_params(**params)
            # Refit model with best parameters.
            if self.refit:
                forecaster.fit(y, X, fh)
            self.n_best_forecasters_.append((rank, forecaster))
            # Save score
            score = results[f"mean_{scoring_name}"].iloc[i]
            self.n_best_scores_.append(score)

        return self

    def _run_search(self, y, X=None, fh=None):
        """Search n_iter candidates from param_distributions.

        Parameters
        ----------
        y : time series in sktime compatible data container format
            Target time series to which to fit the forecaster.
        X : time series in sktime compatible format, optional (default=None)
            Exogenous variables.
        fh : int, list, np.array or ForecastingHorizon, optional (default=None)
        """
        # check if space is a single dict, convert to list if so
        param_distributions = self.param_distributions
        if isinstance(param_distributions, dict):
            param_distributions = [param_distributions]

        if self.optimizer_kwargs is None:
            self.optimizer_kwargs_ = {}
        else:
            self.optimizer_kwargs_ = dict(self.optimizer_kwargs)
        self.optimizer_kwargs_["random_state"] = self.random_state

        optimizers = []
        mappings = []
        for search_space in param_distributions:
            if isinstance(search_space, tuple):
                search_space = search_space[0]

            # hacky approach to handle unhashable type objects
            if "forecaster" in search_space:
                forecasters = search_space.get("forecaster")
                mapping = {num: estimator for num, estimator in enumerate(forecasters)}
                search_space["forecaster"] = list(mapping.keys())
                mappings.append(mapping)
            else:
                mappings.append(None)

            optimizers.append(self._create_optimizer(search_space))
        self.optimizers_ = optimizers  # will save the states of the optimizers

        if self.verbose > 0:
            n_candidates = self.n_iter
            n_splits = self.cv.get_n_splits(y)
            print(
                f"Fitting {n_splits} folds for each of {n_candidates} candidates,"
                f" totalling {n_candidates * n_splits} fits"
            )

        # Run sequential search by iterating through each optimizer and evaluate
        # the search space iteratively until all n_iter candidates are evaluated.
        for num, (search_space, optimizer) in enumerate(
            zip(param_distributions, optimizers)
        ):
            # if search subspace n_iter is provided, use it otherwise use self.n_iter
            if isinstance(search_space, tuple):
                n_iter = search_space[1]
            else:
                n_iter = self.n_iter

            # iterations for each search space
            while n_iter > 0:
                # when n_iter < n_points points left for evaluation
                n_points_adjusted = min(n_iter, self.n_points)

                self._evaluate_step(
                    y,
                    X,
                    optimizer,
                    n_points=n_points_adjusted,
                    mapping=mappings[num],
                )
                n_iter -= self.n_points

            # reset n_iter for next search space
            n_iter = self.n_iter

    def _evaluate_step(self, y, X, optimizer, n_points, mapping=None):
        """Evaluate a candidate parameter set at each iteration.

        Parameters
        ----------
        y : time series in sktime compatible data container format
            Target time series to which to fit the forecaster.
        X : time series in sktime compatible format, optional (default=None)
            Exogenous variables.
        optimizer : skopt.Optimizer
            Optimizer instance.
        n_points : int
            Number of candidate parameter combinations to evaluate at each step.
            If n_points=2, then two candidate parameter combinations are evaluated,
            e.g. {'sp': 1, 'strategy': 'last'} and {'sp': 2, 'strategy': 'mean'}.
        mapping : dict, optional (default=None)
            Mapping of forecaster to estimator instance.
        """
        # Get a list of dimension parameter space with name from optimizer
        dimensions = optimizer.space.dimensions
        test_score_name = f"test_{self._check_scoring.name}"

        # Set meta variables for parallelization.
        meta = {}
        meta["forecaster"] = self.forecaster
        meta["y"] = y
        meta["X"] = X
        meta["mapping"] = mapping
        meta["cv"] = self._check_cv
        meta["strategy"] = self.strategy
        meta["scoring"] = self._check_scoring
        meta["error_score"] = self.error_score
        meta["test_score_name"] = test_score_name
        meta["dimensions"] = dimensions

        candidate_params = optimizer.ask(n_points=n_points)

        out = parallelize(
            fun=_fit_and_score_skopt,
            iter=candidate_params,
            meta=meta,
            backend=self.backend,
            backend_params=self.backend_params,
        )

        # fetch the mean evaluation metrics and feed them back to optimizer
        results_df = pd.DataFrame(out)
        # as the optimizer is minimising a score,
        # we need to negate the score if higher_is_better
        mean_test_score = results_df["mean_" + test_score_name]
        if self._check_scoring.get_tag("lower_is_better"):
            scores = list(mean_test_score)
        else:
            scores = list(-mean_test_score)

        # Update optimizer with evaluation metrics.
        optimizer.tell(candidate_params, scores)
        # keep updating the cv_results_ attribute by concatenating the result dataframe
        self.cv_results_ = pd.concat([self.cv_results_, results_df], ignore_index=True)

        try:
            assert len(out) >= 1
        except AssertionError:
            raise ValueError(
                "No fits were performed. "
                "Was the CV iterator empty? "
                "Were there no candidates?"
            )

    def _create_optimizer(self, params_space):
        """Instantiate optimizer for a search parameter space.

        Responsible for initialising the optimizer with the correct parameter
        names and values.

        Parameters
        ----------
        params_space : dict
            Dictionary with parameters names (string) as keys and the values are
            instances of skopt.space.Dimension (Real, Integer, or Categorical)

        Returns
        -------
        optimizer : skopt.Optimizer
        """
        from skopt.optimizer import Optimizer
        from skopt.utils import dimensions_aslist

        kwargs = self.optimizer_kwargs_.copy()
        # convert params space to a list ordered by the key name
        kwargs["dimensions"] = dimensions_aslist(params_space)
        dimensions_name = sorted(params_space.keys())
        optimizer = Optimizer(**kwargs)
        # set the name of the dimensions if not set
        for i in range(len(optimizer.space.dimensions)):
            if optimizer.space.dimensions[i].name is not None:
                continue
            optimizer.space.dimensions[i].name = dimensions_name[i]

        return optimizer

    def _check_search_space(self, search_space):
        """Check whether the search space argument is correct.

        from skopt.BayesSearchCV._check_search_space
        """
        from skopt.space import check_dimension

        if len(search_space) == 0:
            raise ValueError(
                "The search_spaces parameter should contain at least one "
                "non-empty search space, got %s" % search_space
            )

        # check if space is a single dict, convert to list if so
        if isinstance(search_space, dict):
            search_space = [search_space]

        # check if the structure of the space is proper
        if isinstance(search_space, list):
            # convert to just a list of dicts
            dicts_only = []

            # 1. check the case when a tuple of space, n_iter is provided
            for elem in search_space:
                if isinstance(elem, tuple):
                    if len(elem) != 2:
                        raise ValueError(
                            "All tuples in list of search spaces should have "
                            "length 2, and contain (dict, int), got %s" % elem
                        )
                    subspace, n_iter = elem

                    if (not isinstance(n_iter, int)) or n_iter < 0:
                        raise ValueError(
                            "Number of iterations in search space should be a "
                            "positive integer, got %s in tuple %s" % (n_iter, elem)
                        )

                    # save subspaces here for further checking
                    dicts_only.append(subspace)
                elif isinstance(elem, dict):
                    dicts_only.append(elem)
                else:
                    raise TypeError(
                        "A search space should be provided as a dict or "
                        "tuple (dict, int), got %s" % elem
                    )

            # 2. check all the dicts for correctness of contents
            for subspace in dicts_only:
                for params_name, param_value in subspace.items():
                    if params_name != "forecaster":
                        check_dimension(param_value)
        else:
            raise TypeError(
                "Search space should be provided as a dict or list of dict, "
                "got %s" % search_space
            )

    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests. If no
            special parameters are defined for a value, will return ``"default"`` set.

        Returns
        -------
        params : dict or list of dict
        """
        from sktime.forecasting.naive import NaiveForecaster
        from sktime.forecasting.trend import PolynomialTrendForecaster
        from sktime.performance_metrics.forecasting import MeanAbsolutePercentageError
        from sktime.split import SingleWindowSplitter

        params = {
            "forecaster": NaiveForecaster(strategy="mean"),
            "cv": SingleWindowSplitter(fh=1),
            "param_distributions": {"window_length": [2, 5]},
            "scoring": MeanAbsolutePercentageError(symmetric=True),
            "n_iter": 1,
        }
        params2 = {
            "forecaster": PolynomialTrendForecaster(),
            "cv": SingleWindowSplitter(fh=1),
            "param_distributions": {"degree": [1, 2]},
            "scoring": MeanAbsolutePercentageError(symmetric=True),
            "update_behaviour": "inner_only",
            "n_iter": 1,
        }
        return [params, params2]


def _fit_and_score_skopt(params, meta):
    from skopt.utils import use_named_args

    y = meta["y"]
    X = meta["X"]
    cv = meta["cv"]
    mapping = meta["mapping"]
    strategy = meta["strategy"]
    scoring = meta["scoring"]
    error_score = meta["error_score"]
    dimensions = meta["dimensions"]
    test_score_name = meta["test_score_name"]

    # decorator to convert the candidate param list to a dict
    @use_named_args(dimensions)
    def _fit_and_score(**params):
        # Clone forecaster.
        forecaster = meta["forecaster"].clone()

        # Map "forecaster" parameter back to the estimator instance.
        if "forecaster" in params:
            params["forecaster"] = mapping[params["forecaster"]]

        # Set parameters.
        forecaster.set_params(**params)

        # Evaluate.
        out = evaluate(
            forecaster=forecaster,
            cv=cv,
            y=y,
            X=X,
            strategy=strategy,
            scoring=scoring,
            error_score=error_score,
        )

        # Filter columns.
        out = out.filter(
            items=[test_score_name, "fit_time", "pred_time"],
            axis=1,
        )

        # Aggregate results.
        out = out.mean()
        out = out.add_prefix("mean_")

        # Add parameters to output table.
        out["params"] = params

        return out

    return _fit_and_score(params)
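

# Illustrative sketch, not part of the sktime module: how skopt's use_named_args
# turns a positional candidate (as returned by Optimizer.ask) into keyword
# arguments, mirroring the decorator usage in _fit_and_score_skopt above. The
# dimension names and values are assumptions for illustration only.
if __name__ == "__main__":
    from skopt.space import Categorical, Integer
    from skopt.utils import use_named_args

    dims = [
        Integer(1, 12, name="window_length"),
        Categorical(["mean", "last"], name="strategy"),
    ]

    @use_named_args(dims)
    def show(**params):
        # receives {"window_length": ..., "strategy": ...}
        return params

    # a candidate from Optimizer.ask() is a plain list ordered like `dims`
    print(show([3, "mean"]))  # -> {'window_length': 3, 'strategy': 'mean'}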


class ForecastingOptunaSearchCV(BaseGridSearch):
    """Perform Optuna search cross-validation to find optimal model hyperparameters.

    Experimental: This feature is under development and interfaces may change.

    In ``fit``, this estimator uses the ``optuna`` base search algorithm applied to
    the ``sktime`` ``evaluate`` benchmarking output.

    ``param_grid`` is used to parametrize the search space, over parameters of
    the passed ``forecaster``, via ``set_params``.

    The remaining parameters are passed directly to ``evaluate``, to obtain the
    primary optimization outcome as the aggregate ``scoring`` metric specified
    on the evaluation schema.

    Parameters
    ----------
    forecaster : sktime forecaster, BaseForecaster instance or interface compatible
        The forecaster to tune, must implement the sktime forecaster interface.
        sklearn regressors can be used, but must first be converted to forecasters
        via one of the reduction compositors, e.g., via ``make_reduction``
    cv : cross-validation generator or an iterable
        Splitter used for generating validation folds,
        e.g. ExpandingWindowSplitter()
    param_grid : dict of optuna distributions
        Dictionary with parameter names as keys and optuna parameter distributions
        from which to sample parameter values as values, e.g.
        {"forecaster": optuna.distributions.CategoricalDistribution(
        (STLForecaster(), ThetaForecaster()))}
    scoring : sktime metric (BaseMetric), str, or callable, optional (default=None)
        scoring metric to use in tuning the forecaster

        * sktime metric objects (BaseMetric) descendants can be searched
          with the ``registry.all_estimators`` search utility,
          for instance via ``all_estimators("metric", as_dataframe=True)``

        * If callable, must have signature
          ``(y_true: 1D np.ndarray, y_pred: 1D np.ndarray) -> float``,
          assuming np.ndarrays being of the same length, and lower being better.
          Metrics in sktime.performance_metrics.forecasting are all of this form.

        * If str, uses registry.resolve_alias to resolve to one of the above.
          Valid strings are valid registry.craft specs, which include
          string repr-s of any BaseMetric object, e.g., "MeanSquaredError()";
          and keys of registry.ALIAS_DICT referring to metrics.

        * If None, defaults to MeanAbsolutePercentageError()

    strategy : {"refit", "update", "no-update_params"}, optional, default="refit"
        data ingestion strategy in fitting cv, passed to ``evaluate`` internally
        defines the ingestion mode when the forecaster sees new data when window expands
        "refit" = a new copy of the forecaster is fitted to each training window
        "update" = forecaster is updated with training window data, in sequence provided
        "no-update_params" = fit to first training window, re-used without fit or update
    refit : bool, default=True
        Refit an estimator using the best found parameters on the whole dataset.
    verbose : int, default=0
        Controls the verbosity: the higher, the more messages.
    return_n_best_forecasters : int, default=1
        Number of best forecasters to return.
    backend : str, default="loky"
        Backend to use when running the fit.
    update_behaviour : str, default="full_refit"
        Determines how to update the forecaster during fitting.
    error_score : 'raise' or numeric, default=np.nan
        Value to assign to the score if an error occurs in estimator fitting.
    n_evals : int, default=100
        Number of parameter settings that are sampled.
        n_evals trades off runtime vs quality of the solution.
    sampler : Optuna sampler, optional (default=None)
        e.g. optuna.samplers.TPESampler(seed=42)

    Attributes
    ----------
    best_index_ : int
    best_score_ : float
        Score of the best model
    best_params_ : dict
        Best parameter values across the parameter grid
    best_forecaster_ : estimator
        Fitted estimator with the best parameters
    cv_results_ : dict
        Results from grid search cross validation
    n_best_forecasters_ : list of tuples ("rank", <forecaster>)
        The "rank" is in relation to best_forecaster_
    n_best_scores_ : list of float
        The scores of n_best_forecasters_, sorted from best to worst

    Examples
    --------
    >>> from sktime.forecasting.model_selection import (
    ...     ForecastingOptunaSearchCV,
    ... )
    >>> from sktime.datasets import load_shampoo_sales
    >>> import warnings
    >>> warnings.simplefilter(action="ignore", category=FutureWarning)
    >>> from sktime.forecasting.base import ForecastingHorizon
    >>> from sktime.split import ExpandingWindowSplitter
    >>> from sktime.split import temporal_train_test_split
    >>> from sklearn.preprocessing import MinMaxScaler, RobustScaler
    >>> from sktime.forecasting.compose import TransformedTargetForecaster
    >>> from sktime.transformations.series.adapt import TabularToSeriesAdaptor
    >>> from sktime.transformations.series.detrend import Deseasonalizer, Detrender
    >>> from sktime.forecasting.naive import NaiveForecaster
    >>> from sktime.forecasting.theta import ThetaForecaster
    >>> from sktime.forecasting.trend import STLForecaster
    >>> import optuna
    >>> from optuna.distributions import CategoricalDistribution
    >>> y = load_shampoo_sales()
    >>> y_train, y_test = temporal_train_test_split(y=y, test_size=6)
    >>> fh = ForecastingHorizon(y_test.index, is_relative=False).to_relative(
    ...     cutoff=y_train.index[-1]
    ... )
    >>> cv = ExpandingWindowSplitter(fh=fh, initial_window=24, step_length=1)
    >>> forecaster = TransformedTargetForecaster(
    ...     steps=[
    ...         ("detrender", Detrender()),
    ...         ("deseasonalizer", Deseasonalizer()),
    ...         ("scaler", TabularToSeriesAdaptor(RobustScaler())),
    ...         ("minmax2", TabularToSeriesAdaptor(MinMaxScaler((1, 10)))),
    ...         ("forecaster", NaiveForecaster()),
    ...     ]
    ... )
    >>> param_grid = {
    ...     "scaler__transformer__with_scaling": CategoricalDistribution(
    ...         (True, False)
    ...     ),
    ...     "forecaster": CategoricalDistribution(
    ...         (STLForecaster(), ThetaForecaster())
    ...     ),
    ... }
    >>> gscv = ForecastingOptunaSearchCV(
    ...     forecaster=forecaster,
    ...     param_grid=param_grid,
    ...     cv=cv,
    ...     n_evals=10,
    ... )
    >>> gscv.fit(y)
    >>> print(f"{gscv.best_params_=}")
    """

    _tags = {
        "authors": ["gareth-brown-86", "mk406", "bastisar"],
        "maintainers": ["gareth-brown-86", "mk406"],
        "scitype:y": "both",
        "requires-fh-in-fit": False,
        "handles-missing-data": False,
        "ignores-exogeneous-X": True,
        "capability:pred_int": True,
        "capability:pred_int:insample": True,
        "python_dependencies": ["optuna"],
        "python_version": ">= 3.6",
    }

    def __init__(
        self,
        forecaster,
        cv,
        param_grid,
        scoring=None,
        strategy="refit",
        refit=True,
        verbose=0,
        return_n_best_forecasters=1,
        backend="loky",
        update_behaviour="full_refit",
        error_score=np.nan,
        n_evals=100,
        sampler=None,
    ):
        super().__init__(
            forecaster=forecaster,
            scoring=scoring,
            refit=refit,
            cv=cv,
            strategy=strategy,
            verbose=verbose,
            return_n_best_forecasters=return_n_best_forecasters,
            backend=backend,
            update_behaviour=update_behaviour,
            error_score=error_score,
        )
        self.param_grid = param_grid
        self.n_evals = n_evals
        self.sampler = sampler
" "User feedback and suggestions for future development " "are appreciated in issue #6618 here: " "https://github.com/sktime/sktime/issues/6618", obj=self, stacklevel=2, ) def _fit(self, y, X=None, fh=None): cv = check_cv(self.cv) scoring = check_scoring(self.scoring, obj=self) scoring_name = f"test_{scoring.name}" sampler = self.sampler if not isinstance(self.param_grid, (Mapping, Iterable)): raise TypeError( "Parameter distribution is not a dict or a list," f" got: {self.param_grid!r} of type " f"{type(self.param_grid).__name__}" ) if isinstance(self.param_grid, Mapping): # wrap dictionary in a singleton list to support either dict # or list of dicts self._param_grid = [self.param_grid] else: self._param_grid = self.param_grid results = self._run_search( y, X, cv, scoring, scoring_name, sampler, ) results[f"rank_{scoring_name}"] = results[f"mean_{scoring_name}"].rank( ascending=scoring.get_tag("lower_is_better") ) self.cv_results_ = results self.best_index_ = results.loc[:, f"rank_{scoring_name}"].argmin() if self.best_index_ == -1: raise NotFittedError( f"""All fits of forecaster failed, set error_score='raise' to see the exceptions. Failed forecaster: {self.forecaster}""" ) self.best_score_ = results.loc[self.best_index_, f"mean_{scoring_name}"] self.best_params_ = results.loc[self.best_index_, "params"] self.best_forecaster_ = self.forecaster.clone().set_params(**self.best_params_) if self.refit: self.best_forecaster_.fit(y, X, fh) results = results.sort_values( by=f"mean_{scoring_name}", ascending=scoring.get_tag("lower_is_better") ) self.n_best_forecasters_ = [] self.n_best_scores_ = [] for i in range(self.return_n_best_forecasters): params = results["params"].iloc[i] rank = results[f"rank_{scoring_name}"].iloc[i] rank = str(int(rank)) forecaster = self.forecaster.clone().set_params(**params) if self.refit: forecaster.fit(y, X, fh) self.n_best_forecasters_.append((rank, forecaster)) score = results[f"mean_{scoring_name}"].iloc[i] self.n_best_scores_.append(score) return self

    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests. If no
            special parameters are defined for a value, will return ``"default"`` set.

        Returns
        -------
        params : dict or list of dict
        """
        from sktime.utils.dependencies import _check_soft_dependencies

        if not _check_soft_dependencies("optuna", severity="none"):
            return {
                "forecaster": "foo",
                "cv": "bar",
                "param_grid": "foobar",
                "scoring": "barfoo",
            }

        from optuna.distributions import CategoricalDistribution

        from sktime.forecasting.naive import NaiveForecaster
        from sktime.forecasting.trend import PolynomialTrendForecaster
        from sktime.performance_metrics.forecasting import (
            MeanAbsolutePercentageError,
            mean_absolute_percentage_error,
        )
        from sktime.split import SingleWindowSplitter

        params = {
            "forecaster": NaiveForecaster(strategy="mean"),
            "cv": SingleWindowSplitter(fh=1),
            "param_grid": {"window_length": CategoricalDistribution((2, 5))},
            "scoring": MeanAbsolutePercentageError(symmetric=True),
        }
        params2 = {
            "forecaster": PolynomialTrendForecaster(),
            "cv": SingleWindowSplitter(fh=1),
            "param_grid": {"degree": CategoricalDistribution((1, 2))},
            "scoring": mean_absolute_percentage_error,
            "update_behaviour": "inner_only",
        }
        params3 = {
            "forecaster": NaiveForecaster(strategy="mean"),
            "cv": SingleWindowSplitter(fh=1),
            "param_grid": {"window_length": CategoricalDistribution((3, 4))},
            "scoring": "MeanAbsolutePercentageError(symmetric=True)",
            "update_behaviour": "no_update",
        }
        scorer_with_lower_is_better_false = MeanAbsolutePercentageError(symmetric=True)
        scorer_with_lower_is_better_false.set_tags(**{"lower_is_better": False})
        params4 = {
            "forecaster": NaiveForecaster(strategy="mean"),
            "cv": SingleWindowSplitter(fh=1),
            "param_grid": {"window_length": CategoricalDistribution((2, 5))},
            "scoring": scorer_with_lower_is_better_false,
        }
        return [params, params2, params3, params4]

    def _get_score(self, out, scoring_name):
        return out[f"mean_{scoring_name}"]

    def _run_search(self, y, X, cv, scoring, scoring_name, sampler):
        import optuna

        all_results = []  # list to store results from all parameter grids

        # self._param_grid is a list of dicts, see _fit
        for param_grid_dict in self._param_grid:
            scoring_direction = (
                "minimize" if scoring.get_tag("lower_is_better") else "maximize"
            )
            study = optuna.create_study(direction=scoring_direction, sampler=sampler)

            meta = {}
            meta["forecaster"] = self.forecaster
            meta["y"] = y
            meta["X"] = X
            meta["cv"] = cv
            meta["strategy"] = self.strategy
            meta["scoring"] = scoring
            meta["error_score"] = self.error_score
            meta["scoring_name"] = scoring_name

            for _ in range(self.n_evals):
                trial = study.ask(param_grid_dict)
                params = {name: trial.params[name] for name in param_grid_dict}
                out = _fit_and_score(params, meta)

                study.tell(trial, self._get_score(out, scoring_name))

            params_list = [trial.params for trial in study.trials]
            results = study.trials_dataframe()

            # add the parameters as a new column to the DataFrame
            results["params"] = params_list

            all_results.append(results)  # append the results DataFrame to the list

        # combine all results into a single DataFrame
        combined_results = pd.concat(all_results, ignore_index=True)

        return combined_results.rename(columns={"value": f"mean_{scoring_name}"})
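

# Illustrative sketch, not part of the sktime module: the Optuna ask/tell loop
# that _run_search above performs, shown against a toy objective instead of
# sktime's evaluate. The search space and the toy score are assumptions chosen
# only to make the Optuna calls visible in isolation.
if __name__ == "__main__":
    import optuna
    from optuna.distributions import CategoricalDistribution

    space = {"window_length": CategoricalDistribution((2, 5))}
    study = optuna.create_study(direction="minimize")

    for _ in range(3):
        trial = study.ask(space)  # sample a candidate from the space
        score = float(trial.params["window_length"])  # toy score to minimise
        study.tell(trial, score)  # report the score back to the study

    # trials_dataframe flattens parameters into "params_<name>" columns,
    # analogous to the cv_results_ assembled by _run_search
    print(study.trials_dataframe()[["number", "value", "params_window_length"]])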