Source code for sktime.classification.early_classification._probability_threshold

"""Probability Threshold Early Classifier.

An early classifier using a prediction probability threshold with a time series
classifier.
"""

__author__ = ["MatthewMiddlehurst"]
__all__ = ["ProbabilityThresholdEarlyClassifier"]

import copy

import numpy as np
from joblib import Parallel, delayed
from sklearn.ensemble import RandomForestClassifier
from sklearn.utils import check_random_state

from sktime.base._base import _clone_estimator
from sktime.classification.base import BaseClassifier
from sktime.classification.interval_based import CanonicalIntervalForest
from sktime.utils.validation.panel import check_X


# TODO: fix this in 0.34.0
# base class should have been changed to BaseEarlyClassifier
class ProbabilityThresholdEarlyClassifier(BaseClassifier):
    """Probability Threshold Early Classifier.

    An early classifier which uses a threshold of prediction probability to
    determine whether an early prediction is safe or not.

    Overview:
        Build n classifiers, where n is the number of classification_points.
        While a prediction is still deemed unsafe:
            Make a prediction using the series length at classification point i.
            Decide whether the prediction is safe or not using
            decide_prediction_safety.

    Parameters
    ----------
    probability_threshold : float, default=0.85
        The class prediction probability required to deem a prediction as safe.
    consecutive_predictions : int, default=1
        The number of consecutive predictions for a class above the threshold
        required to deem a prediction as safe.
    estimator : sktime classifier, default=None
        An sktime estimator to be built at each classification point, using the
        series truncated to that length. Defaults to a CanonicalIntervalForest.
    classification_points : List or None, default=None
        List of integer time series time stamps to build classifiers and allow
        predictions at. Early predictions must have a series length that matches
        a value in the _classification_points List. Duplicate values will be
        removed, and the full series length will be appended if not present.
        If None, will use 20 classification points computed as
        ``round(series_length / i)`` for i = 1..20, with duplicates removed and
        points shorter than 3 time stamps dropped.
    n_jobs : int, default=1
        The number of jobs to run in parallel for both ``fit`` and ``predict``.
        ``-1`` means using all processors.
    random_state : int or None, default=None
        Seed for random number generation.

    Attributes
    ----------
    n_classes_ : int
        The number of classes.
    classes_ : list
        The unique class labels.

    Examples
    --------
    >>> from sktime.classification.early_classification import (
    ...     ProbabilityThresholdEarlyClassifier
    ... )
    >>> from sktime.classification.interval_based import TimeSeriesForestClassifier
    >>> from sktime.datasets import load_unit_test
    >>> X_train, y_train = load_unit_test(split="train", return_X_y=True)
    >>> X_test, y_test = load_unit_test(split="test", return_X_y=True)
    >>> clf = ProbabilityThresholdEarlyClassifier(
    ...     classification_points=[6, 16, 24],
    ...     estimator=TimeSeriesForestClassifier(n_estimators=10)
    ... )
    >>> clf.fit(X_train, y_train)
    ProbabilityThresholdEarlyClassifier(...)
    >>> y_pred = clf.predict(X_test)
    """

    _tags = {
        "capability:multivariate": True,
        "capability:multithreading": True,
    }

    def __init__(
        self,
        probability_threshold=0.85,
        consecutive_predictions=1,
        estimator=None,
        classification_points=None,
        n_jobs=1,
        random_state=None,
    ):
        self.probability_threshold = probability_threshold
        self.consecutive_predictions = consecutive_predictions
        self.estimator = estimator
        self.classification_points = classification_points

        self.n_jobs = n_jobs
        self.random_state = random_state

        self._estimators = []
        self._classification_points = []

        super().__init__()

    def _fit(self, X, y):
        m = getattr(self.estimator, "predict_proba", None)
        if not callable(m):
            raise ValueError("Base estimator must have a predict_proba method.")

        _, _, series_length = X.shape

        self._classification_points = (
            copy.deepcopy(self.classification_points)
            if self.classification_points is not None
            else [round(series_length / i) for i in range(1, 21)]
        )
        # remove duplicates
        self._classification_points = list(set(self._classification_points))
        self._classification_points.sort()
        # remove classification points that are less than 3 time stamps
        self._classification_points = [
            i for i in self._classification_points if i >= 3
        ]
        # make sure the full series length is included
        if self._classification_points[-1] != series_length:
            self._classification_points.append(series_length)
        # create dictionary of classification point indices
        self._classification_point_dictionary = {}
        for index, classification_point in enumerate(self._classification_points):
            self._classification_point_dictionary[classification_point] = index

        # if the base estimator threads internally (its n_jobs is set in
        # _fit_estimator), fit the per-point estimators sequentially
        m = getattr(self.estimator, "n_jobs", None)
        threads = self._threads_to_use if m is None else 1

        self._estimators = Parallel(n_jobs=threads)(
            delayed(self._fit_estimator)(
                X,
                y,
                i,
            )
            for i in range(len(self._classification_points))
        )

        return self

    def _predict(self, X) -> np.ndarray:
        rng = check_random_state(self.random_state)
        return np.array(
            [
                self.classes_[int(rng.choice(np.flatnonzero(prob == prob.max())))]
                for prob in self._predict_proba(X)
            ]
        )

    def _predict_proba(self, X) -> np.ndarray:
        _, _, series_length = X.shape
        idx = self._classification_point_dictionary.get(series_length, -1)
        if idx == -1:
            raise ValueError(
                f"Input series length does not match the classification points produced"
                f" in fit. Current classification points: {self._classification_points}"
            )
        return self._estimators[idx].predict_proba(X)
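
    # A worked example of the default classification points chosen in _fit
    # (illustrative only, assuming series_length=100 rather than any real
    # dataset): [round(100 / i) for i in range(1, 21)] gives
    # [100, 50, 33, 25, 20, 17, 14, 12, 11, 10, 9, 8, 8, 7, 7, 6, 6, 6, 5, 5];
    # after deduplication, sorting and dropping points below 3 this becomes
    # [5, 6, 7, 8, 9, 10, 11, 12, 14, 17, 20, 25, 33, 50, 100], and one
    # estimator is fitted per remaining point.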
    def decide_prediction_safety(self, X, X_probabilities, state_info):
        """Decide on the safety of an early classification.

        Parameters
        ----------
        X : 3D np.array (any number of dimensions, equal length series)
                of shape = [n_instances, n_dimensions, series_length]
            or pd.DataFrame with each column a dimension, each cell a pd.Series
                (any number of dimensions, equal or unequal length series).
            The prediction time series data.
        X_probabilities : 2D numpy array of shape = [n_instances, n_classes].
            The predicted probabilities for X.
        state_info : List or None
            A List containing the state info for each decision in X. Contains
            information for future decisions on the data. Inputs should be None
            for the first decision made, the returned List new_state_info for
            subsequent decisions.

        Returns
        -------
        decisions : List
            A List of booleans, containing the decision of whether a prediction
            is safe to use or not.
        new_state_info : List
            A List containing the state info for each decision in X, contains
            information for future decisions on the data.
        """
        X = check_X(X, coerce_to_numpy=True)

        n_instances, _, series_length = X.shape
        idx = self._classification_point_dictionary.get(series_length, -1)

        if idx == -1:
            raise ValueError(
                f"Input series length does not match the classification points produced"
                f" in fit. Current classification points: {self._classification_points}"
            )

        # If this is the smallest dataset, there should be no state_info, else we
        # should have state info for each, and they should all be the same length
        if state_info is None and (
            idx == 0 or idx == len(self._classification_points) - 1
        ):
            state_info = [(0, 0, 0) for _ in range(n_instances)]
        elif isinstance(state_info, list) and idx > 0:
            if not all(si[0] == idx for si in state_info):
                raise ValueError("All input instances must be of the same length.")
        else:
            raise ValueError(
                "state_info should be None for first time input, and a list of "
                "state_info outputs from the previous decision making for later inputs."
            )

        # if we have the full series, always return true
        if idx == len(self._classification_points) - 1:
            return [True for _ in range(n_instances)], None

        # find predicted class for each instance
        rng = check_random_state(self.random_state)
        preds = [
            int(rng.choice(np.flatnonzero(prob == prob.max())))
            for prob in X_probabilities
        ]

        # make a decision based on probability threshold, record consecutive class
        # decisions
        decisions = [
            X_probabilities[i][preds[i]] >= self.probability_threshold
            for i in range(n_instances)
        ]
        new_state_info = [
            (
                # next classification point index
                idx + 1,
                # consecutive predictions, add one if positive decision and same class
                (
                    state_info[i][1] + 1
                    if decisions[i] and preds[i] == state_info[i][2]
                    # set to 0 if the decision is negative,
                    # 1 if it is positive but for a different class
                    else 1 if decisions[i] else 0
                ),
                # predicted class index
                preds[i],
            )
            for i in range(n_instances)
        ]

        # return the safety decisions and new state information for the instances
        if self.consecutive_predictions < 2:
            return decisions, new_state_info
        else:
            return [
                new_state_info[i][1] >= self.consecutive_predictions
                for i in range(n_instances)
            ], new_state_info
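
    # A minimal sketch of the intended decision loop (illustrative only;
    # ``clf`` is a fitted instance and ``X_test`` a 3D numpy array, both
    # assumed rather than defined in this module). The caller truncates the
    # series to each classification point in turn, obtains probabilities, and
    # threads ``state_info`` through successive calls until every prediction
    # is deemed safe:
    #
    #     state_info = None
    #     for cp in clf._classification_points:
    #         X_part = X_test[:, :, :cp]
    #         probas = clf.predict_proba(X_part)
    #         decisions, state_info = clf.decide_prediction_safety(
    #             X_part, probas, state_info
    #         )
    #         if all(decisions):
    #             break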
    def _fit_estimator(self, X, y, i):
        # derive a distinct seed for each estimator from random_state
        rs = 255 if self.random_state == 0 else self.random_state
        rs = None if self.random_state is None else rs * 37 * (i + 1) % 2**31
        rng = check_random_state(rs)

        estimator = _clone_estimator(
            CanonicalIntervalForest() if self.estimator is None else self.estimator,
            rng,
        )

        m = getattr(estimator, "n_jobs", None)
        if m is not None:
            estimator.n_jobs = self._threads_to_use

        # fit on the series truncated to the i-th classification point
        estimator.fit(X[:, :, : self._classification_points[i]], y)

        return estimator
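
    # Worked example of the seed derivation in _fit_estimator (illustrative):
    # with random_state=1, estimators 0, 1 and 2 are cloned with seeds
    # 1 * 37 * 1 = 37, 1 * 37 * 2 = 74 and 1 * 37 * 3 = 111 (all mod 2**31),
    # so each classification-point classifier draws from a distinct but
    # reproducible random stream.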
    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests. If
            no special parameters are defined for a value, will return
            ``"default"`` set.

        Returns
        -------
        params : dict or list of dict, default = {}
            Parameters to create testing instances of the class.
        """
        from sktime.classification.dummy import DummyClassifier
        from sktime.classification.feature_based import Catch22Classifier
        from sktime.utils.dependencies import _check_soft_dependencies

        if _check_soft_dependencies("numba", severity="none"):
            est = Catch22Classifier(estimator=RandomForestClassifier(n_estimators=2))
        else:
            est = DummyClassifier()

        params1 = {"classification_points": [3], "estimator": est}
        params2 = {"probability_threshold": 0.9, "estimator": est}
        return [params1, params2]