Source code for dask_ml.wrappers

"""Meta-estimators for parallelizing estimators using the scikit-learn API."""

import logging
import warnings

import dask.array as da
import dask.dataframe as dd
import dask.delayed
import numpy as np
import sklearn.base
import sklearn.metrics
from sklearn.utils.validation import check_is_fitted

from dask_ml.utils import _timer, is_frame_base

from ._partial import fit
from ._utils import copy_learned_attributes
from .metrics import check_scoring, get_scorer

logger = logging.getLogger(__name__)


class ParallelPostFit(sklearn.base.BaseEstimator, sklearn.base.MetaEstimatorMixin):
    """Meta-estimator for parallel predict and transform.

    Parameters
    ----------
    estimator : Estimator
        The underlying estimator that is fit.

    scoring : string or callable, optional
        A single string (see :ref:`scoring_parameter`) or a callable
        (see :ref:`scoring`) to evaluate the predictions on the test set.

        For evaluating multiple metrics, either give a list of (unique)
        strings or a dict with names as keys and callables as values.

        NOTE that when using custom scorers, each scorer should return a
        single value. Metric functions returning a list/array of values
        can be wrapped into multiple scorers that return one value each.

        See :ref:`multimetric_grid_search` for an example.

        .. warning::

           If None, the estimator's default scorer (if available) is used.
           Most scikit-learn estimators will convert large Dask arrays to
           a single NumPy array, which may exhaust the memory of your
           worker. You probably want to always specify `scoring`.

    predict_meta : pd.Series, pd.DataFrame, or np.ndarray, default: None (infer)
        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that
        matches the output type of the estimator's ``predict`` call.
        This meta is necessary for some estimators to work with
        ``dask.dataframe`` and ``dask.array``.

    predict_proba_meta : pd.Series, pd.DataFrame, or np.ndarray, default: None (infer)
        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that
        matches the output type of the estimator's ``predict_proba`` call.
        This meta is necessary for some estimators to work with
        ``dask.dataframe`` and ``dask.array``.

    transform_meta : pd.Series, pd.DataFrame, or np.ndarray, default: None (infer)
        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that
        matches the output type of the estimator's ``transform`` call.
        This meta is necessary for some estimators to work with
        ``dask.dataframe`` and ``dask.array``.

    Notes
    -----

    .. warning::

       This class is not appropriate for parallel or distributed *training*
       on large datasets. For that, see :class:`Incremental`, which provides
       distributed (but sequential) training. If you're doing distributed
       hyperparameter optimization on larger-than-memory datasets, see
       :class:`dask_ml.model_selection.IncrementalSearch`.

    This estimator does not parallelize the training step. It simply calls
    the underlying estimator's ``fit`` method and copies over the learned
    attributes to ``self`` afterwards.

    It is helpful for situations where your training dataset is relatively
    small (fits on a single machine) but you need to predict or transform
    a much larger dataset. ``predict``, ``predict_proba`` and ``transform``
    will be done in parallel (potentially distributed if you've connected
    to a ``dask.distributed.Client``).

    Note that many scikit-learn estimators already predict and transform in
    parallel. This meta-estimator may still be useful in those cases when
    your dataset is larger than memory, as the distributed scheduler will
    ensure the data isn't all read into memory at once.

    See Also
    --------
    Incremental
    dask_ml.model_selection.IncrementalSearch

    Examples
    --------
    >>> from sklearn.ensemble import GradientBoostingClassifier
    >>> import sklearn.datasets
    >>> import dask_ml.datasets

    Make a small 1,000-sample training dataset and fit normally.

    >>> X, y = sklearn.datasets.make_classification(n_samples=1000,
    ...                                             random_state=0)
    >>> clf = ParallelPostFit(estimator=GradientBoostingClassifier(),
    ...                       scoring='accuracy')
    >>> clf.fit(X, y)
    ParallelPostFit(estimator=GradientBoostingClassifier(...))

    >>> clf.classes_
    array([0, 1])

    Transform and predict return dask outputs for dask inputs.

    >>> X_big, y_big = dask_ml.datasets.make_classification(n_samples=100000,
    ...                                                     random_state=0)
    >>> clf.predict(X_big)
    dask.array<predict, shape=(100000,), dtype=int64, chunksize=(1000,)>

    Which can be computed in parallel.

    >>> clf.predict_proba(X_big).compute()
    array([[0.99141094, 0.00858906],
           [0.93178389, 0.06821611],
           [0.99129105, 0.00870895],
           ...,
           [0.97996652, 0.02003348],
           [0.98087444, 0.01912556],
           [0.99407016, 0.00592984]])
    """
    def __init__(
        self,
        estimator=None,
        scoring=None,
        predict_meta=None,
        predict_proba_meta=None,
        transform_meta=None,
    ):
        self.estimator = estimator
        self.scoring = scoring
        self.predict_meta = predict_meta
        self.predict_proba_meta = predict_proba_meta
        self.transform_meta = transform_meta
    def _check_array(self, X):
        """Validate an array for post-fit tasks.

        Parameters
        ----------
        X : Union[Array, DataFrame]

        Returns
        -------
        same type as 'X'

        Notes
        -----
        The following checks are applied.

        - Ensure that the array is blocked only along the samples.
        """
        if isinstance(X, da.Array):
            if X.ndim == 2 and X.numblocks[1] > 1:
                logger.debug("auto-rechunking 'X'")
                if not np.isnan(X.chunks[0]).any():
                    X = X.rechunk({0: "auto", 1: -1})
                else:
                    X = X.rechunk({1: -1})
        return X

    @property
    def _postfit_estimator(self):
        # The estimator instance to use for postfit tasks like score
        return self.estimator

    def fit(self, X, y=None, **kwargs):
        """Fit the underlying estimator.

        Parameters
        ----------
        X, y : array-like
        **kwargs
            Additional fit-kwargs for the underlying estimator.

        Returns
        -------
        self : object
        """
        logger.info("Starting fit")
        with _timer("fit", _logger=logger):
            result = self.estimator.fit(X, y, **kwargs)

        # Copy over learned attributes
        copy_learned_attributes(result, self)
        copy_learned_attributes(result, self.estimator)
        return self

    def partial_fit(self, X, y=None, **kwargs):
        logger.info("Starting partial_fit")
        with _timer("fit", _logger=logger):
            result = self.estimator.partial_fit(X, y, **kwargs)

        # Copy over learned attributes
        copy_learned_attributes(result, self)
        copy_learned_attributes(result, self.estimator)
        return self

    def transform(self, X):
        """Transform block or partition-wise for dask inputs.

        For dask inputs, a dask array or dataframe is returned. For other
        inputs (NumPy array, pandas dataframe, scipy sparse matrix), the
        regular return value is returned.

        If the underlying estimator does not have a ``transform`` method,
        then an ``AttributeError`` is raised.

        Parameters
        ----------
        X : array-like

        Returns
        -------
        transformed : array-like
        """
        self._check_method("transform")
        X = self._check_array(X)
        meta = self.transform_meta

        if isinstance(X, da.Array):
            if meta is None:
                meta = _get_output_dask_ar_meta_for_estimator(
                    _transform, self._postfit_estimator, X
                )
            return X.map_blocks(
                _transform, estimator=self._postfit_estimator, meta=meta
            )
        elif is_frame_base(X):
            if meta is None:
                # dask-dataframe relies on dd.core.no_default
                # for inferring meta
                meta = dd.core.no_default
            return X.map_partitions(
                _transform, estimator=self._postfit_estimator, meta=meta
            )
        else:
            return _transform(X, estimator=self._postfit_estimator)

    def score(self, X, y, compute=True):
        """Returns the score on the given data.

        Parameters
        ----------
        X : array-like, shape = [n_samples, n_features]
            Input data, where n_samples is the number of samples and
            n_features is the number of features.

        y : array-like, shape = [n_samples] or [n_samples, n_output], optional
            Target relative to X for classification or regression;
            None for unsupervised learning.

        Returns
        -------
        score : float
            The score as defined by ``scoring``, or by the underlying
            estimator's default ``score`` method when ``scoring`` is None.
        """
        scoring = self.scoring
        X = self._check_array(X)
        y = self._check_array(y)

        if not scoring:
            if type(self._postfit_estimator).score == sklearn.base.RegressorMixin.score:
                scoring = "r2"
            elif (
                type(self._postfit_estimator).score
                == sklearn.base.ClassifierMixin.score
            ):
                scoring = "accuracy"
        else:
            scoring = self.scoring

        if scoring:
            if not dask.is_dask_collection(X) and not dask.is_dask_collection(y):
                scorer = sklearn.metrics.get_scorer(scoring)
            else:
                scorer = get_scorer(scoring, compute=compute)
            return scorer(self, X, y)
        else:
            return self._postfit_estimator.score(X, y)

    def predict(self, X):
        """Predict for X.

        For dask inputs, a dask array or dataframe is returned. For other
        inputs (NumPy array, pandas dataframe, scipy sparse matrix), the
        regular return value is returned.

        Parameters
        ----------
        X : array-like

        Returns
        -------
        y : array-like
        """
        self._check_method("predict")
        X = self._check_array(X)
        meta = self.predict_meta

        if isinstance(X, da.Array):
            if meta is None:
                meta = _get_output_dask_ar_meta_for_estimator(
                    _predict, self._postfit_estimator, X
                )

            result = X.map_blocks(
                _predict, estimator=self._postfit_estimator, drop_axis=1, meta=meta
            )
            return result

        elif is_frame_base(X):
            if meta is None:
                meta = dd.core.no_default
            return X.map_partitions(
                _predict, estimator=self._postfit_estimator, meta=meta
            )
        else:
            return _predict(X, estimator=self._postfit_estimator)

    def predict_proba(self, X):
        """Probability estimates.

        For dask inputs, a dask array or dataframe is returned. For other
        inputs (NumPy array, pandas dataframe, scipy sparse matrix), the
        regular return value is returned.

        If the underlying estimator does not have a ``predict_proba``
        method, then an ``AttributeError`` is raised.

        Parameters
        ----------
        X : array or dataframe

        Returns
        -------
        y : array-like
        """
        X = self._check_array(X)

        self._check_method("predict_proba")
        meta = self.predict_proba_meta

        if isinstance(X, da.Array):
            if meta is None:
                meta = _get_output_dask_ar_meta_for_estimator(
                    _predict_proba, self._postfit_estimator, X
                )
            # XXX: multiclass
            return X.map_blocks(
                _predict_proba,
                estimator=self._postfit_estimator,
                meta=meta,
                chunks=(X.chunks[0], len(self._postfit_estimator.classes_)),
            )
        elif is_frame_base(X):
            if meta is None:
                meta = dd.core.no_default
            return X.map_partitions(
                _predict_proba, estimator=self._postfit_estimator, meta=meta
            )
        else:
            return _predict_proba(X, estimator=self._postfit_estimator)

    def predict_log_proba(self, X):
        """Log of probability estimates.

        For dask inputs, a dask array or dataframe is returned. For other
        inputs (NumPy array, pandas dataframe, scipy sparse matrix), the
        regular return value is returned.

        If the underlying estimator does not have a ``predict_log_proba``
        method, then an ``AttributeError`` is raised.

        Parameters
        ----------
        X : array or dataframe

        Returns
        -------
        y : array-like
        """
        self._check_method("predict_log_proba")
        return da.log(self.predict_proba(X))

    def _check_method(self, method):
        """Check if self.estimator has 'method'.

        Raises
        ------
        AttributeError
        """
        estimator = self._postfit_estimator
        if not hasattr(estimator, method):
            msg = "The wrapped estimator '{}' does not have a '{}' method.".format(
                estimator, method
            )
            raise AttributeError(msg)
        return getattr(estimator, method)
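
# A minimal usage sketch of ParallelPostFit with an explicit ``predict_meta``,
# so the meta-inference fallback (and its warning) never runs. This is an
# illustrative example under assumed data shapes, not part of the module; the
# names (clf, X_big) are placeholders. If uncommented, it should run as-is:
#
#   import numpy as np
#   import dask.array as da
#   import sklearn.datasets
#   from sklearn.linear_model import LogisticRegression
#   from dask_ml.wrappers import ParallelPostFit
#
#   X, y = sklearn.datasets.make_classification(n_samples=1000, random_state=0)
#   clf = ParallelPostFit(
#       LogisticRegression(),
#       scoring="accuracy",
#       # empty array matching the dtype of ``predict``'s output
#       predict_meta=np.array([], dtype=np.int64),
#   )
#   clf.fit(X, y)                                  # plain in-memory training
#   X_big = da.from_array(X, chunks=(100, X.shape[1]))
#   y_hat = clf.predict(X_big)                     # lazy; one task per block
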
class Incremental(ParallelPostFit):
    """Meta-estimator for feeding Dask Arrays to an estimator blockwise.

    This wrapper provides a bridge between Dask objects and estimators
    implementing the ``partial_fit`` API. These *incremental learners* can
    train on batches of data. This fits well with Dask's blocked data
    structures.

    .. note::

       This meta-estimator is not appropriate for hyperparameter optimization
       on larger-than-memory datasets. For that, see
       :class:`~dask_ml.model_selection.IncrementalSearchCV` or
       :class:`~dask_ml.model_selection.HyperbandSearchCV`.

    See the `list of incremental learners`_ in the scikit-learn documentation
    for a list of estimators that implement the ``partial_fit`` API. Note
    that `Incremental` is not limited to just these classes; it will work on
    any estimator implementing ``partial_fit``, including those defined
    outside of scikit-learn itself.

    Calling :meth:`Incremental.fit` with a Dask Array will pass each block of
    the Dask array or arrays to ``estimator.partial_fit`` *sequentially*.

    Like :class:`ParallelPostFit`, the methods available after fitting (e.g.
    :meth:`Incremental.predict`, etc.) are all parallel and delayed.

    The ``estimator_`` attribute is a clone of `estimator` that was actually
    used during the call to ``fit``. All attributes learned during training
    are available on ``Incremental`` directly.

    .. _list of incremental learners: https://scikit-learn.org/stable/modules/computing.html#incremental-learning  # noqa

    Parameters
    ----------
    estimator : Estimator
        Any object supporting the scikit-learn ``partial_fit`` API.

    scoring : string or callable, optional
        A single string (see :ref:`scoring_parameter`) or a callable
        (see :ref:`scoring`) to evaluate the predictions on the test set.

        For evaluating multiple metrics, either give a list of (unique)
        strings or a dict with names as keys and callables as values.

        NOTE that when using custom scorers, each scorer should return a
        single value. Metric functions returning a list/array of values
        can be wrapped into multiple scorers that return one value each.

        See :ref:`multimetric_grid_search` for an example.

        .. warning::

           If None, the estimator's default scorer (if available) is used.
           Most scikit-learn estimators will convert large Dask arrays to
           a single NumPy array, which may exhaust the memory of your
           worker. You probably want to always specify `scoring`.

    random_state : int or numpy.random.RandomState, optional
        Random object that determines how to shuffle blocks.

    shuffle_blocks : bool, default True
        Determines whether to call ``partial_fit`` on a randomly selected
        chunk of the Dask arrays (default), or to fit in sequential order.
        This does not control shuffle between blocks or shuffling each block.

    predict_meta : pd.Series, pd.DataFrame, or np.ndarray, default: None (infer)
        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that
        matches the output type of the estimator's ``predict`` call.
        This meta is necessary for some estimators to work with
        ``dask.dataframe`` and ``dask.array``.

    predict_proba_meta : pd.Series, pd.DataFrame, or np.ndarray, default: None (infer)
        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that
        matches the output type of the estimator's ``predict_proba`` call.
        This meta is necessary for some estimators to work with
        ``dask.dataframe`` and ``dask.array``.

    transform_meta : pd.Series, pd.DataFrame, or np.ndarray, default: None (infer)
        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that
        matches the output type of the estimator's ``transform`` call.
        This meta is necessary for some estimators to work with
        ``dask.dataframe`` and ``dask.array``.

    Attributes
    ----------
    estimator_ : Estimator
        A clone of `estimator` that was actually fit during the ``.fit`` call.

    See Also
    --------
    ParallelPostFit
    dask_ml.model_selection.IncrementalSearchCV

    Examples
    --------
    >>> from dask_ml.wrappers import Incremental
    >>> from dask_ml.datasets import make_classification
    >>> import sklearn.linear_model
    >>> X, y = make_classification(chunks=25)
    >>> est = sklearn.linear_model.SGDClassifier()
    >>> clf = Incremental(est, scoring='accuracy')
    >>> clf.fit(X, y, classes=[0, 1])

    When used inside a grid search, prefix the underlying estimator's
    parameter names with ``estimator__``.

    >>> from sklearn.model_selection import GridSearchCV
    >>> param_grid = {"estimator__alpha": [0.1, 1.0, 10.0]}
    >>> gs = GridSearchCV(clf, param_grid)
    >>> gs.fit(X, y, classes=[0, 1])
    """
    def __init__(
        self,
        estimator=None,
        scoring=None,
        shuffle_blocks=True,
        random_state=None,
        assume_equal_chunks=True,
        predict_meta=None,
        predict_proba_meta=None,
        transform_meta=None,
    ):
        self.shuffle_blocks = shuffle_blocks
        self.random_state = random_state
        self.assume_equal_chunks = assume_equal_chunks

        super(Incremental, self).__init__(
            estimator=estimator,
            scoring=scoring,
            predict_meta=predict_meta,
            predict_proba_meta=predict_proba_meta,
            transform_meta=transform_meta,
        )
    @property
    def _postfit_estimator(self):
        check_is_fitted(self, "estimator_")
        return self.estimator_

    def _fit_for_estimator(self, estimator, X, y, **fit_kwargs):
        check_scoring(estimator, self.scoring)
        if not dask.is_dask_collection(X) and not dask.is_dask_collection(y):
            result = estimator.partial_fit(X=X, y=y, **fit_kwargs)
        else:
            result = fit(
                estimator,
                X,
                y,
                random_state=self.random_state,
                shuffle_blocks=self.shuffle_blocks,
                assume_equal_chunks=self.assume_equal_chunks,
                **fit_kwargs,
            )

        copy_learned_attributes(result, self)
        self.estimator_ = result
        return self

    def fit(self, X, y=None, **fit_kwargs):
        estimator = sklearn.base.clone(self.estimator)
        self._fit_for_estimator(estimator, X, y, **fit_kwargs)
        return self

    def partial_fit(self, X, y=None, **fit_kwargs):
        """Fit the underlying estimator.

        If this estimator has not been previously fit, this is identical to
        :meth:`Incremental.fit`. If it has been previously fit,
        ``self.estimator_`` is used as the starting point.

        Parameters
        ----------
        X, y : array-like
        **kwargs
            Additional fit-kwargs for the underlying estimator.

        Returns
        -------
        self : object
        """
        estimator = getattr(self, "estimator_", None)
        if estimator is None:
            estimator = sklearn.base.clone(self.estimator)
        return self._fit_for_estimator(estimator, X, y, **fit_kwargs)
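
# A minimal sketch of the blockwise training that Incremental.fit wraps:
# each chunk of a dask array is computed and fed to ``partial_fit`` in turn.
# This is a hand-rolled illustration under assumed chunk sizes, not the
# library's internal implementation (which also supports block shuffling).
# If uncommented, it should run as-is:
#
#   import sklearn.linear_model
#   from dask_ml.datasets import make_classification
#
#   X, y = make_classification(n_samples=100, chunks=25)
#   est = sklearn.linear_model.SGDClassifier()
#   for xb, yb in zip(X.to_delayed().flatten(), y.to_delayed().flatten()):
#       # one block at a time: sequential training, bounded memory
#       est.partial_fit(xb.compute(), yb.compute(), classes=[0, 1])
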
def _first_block(dask_object):
    """Extract the first block / partition from a dask object"""
    if isinstance(dask_object, da.Array):
        if dask_object.ndim > 1 and dask_object.numblocks[-1] != 1:
            raise NotImplementedError(
                "IID estimators require that the array "
                "be blocked only along the first axis. "
                "Rechunk your array before fitting."
            )
        shape = (dask_object.chunks[0][0],)
        if dask_object.ndim > 1:
            shape = shape + (dask_object.chunks[1][0],)

        return da.from_delayed(
            dask_object.to_delayed().flatten()[0], shape, dask_object.dtype
        )

    if is_frame_base(dask_object):
        return dask_object.get_partition(0)
    else:
        return dask_object


def _predict(part, estimator):
    return estimator.predict(part)


def _predict_proba(part, estimator):
    return estimator.predict_proba(part)


def _transform(part, estimator):
    return estimator.transform(part)


def _get_output_dask_ar_meta_for_estimator(model_fn, estimator, input_dask_ar):
    """
    Returns the output metadata array for the model function (predict,
    transform, etc.) by running the appropriate function on dummy data
    of shape (1, n_features)

    Parameters
    ----------

    model_fn: Model function
        _predict, _transform etc.
    estimator : Estimator
        The underlying estimator that is fit.
    input_dask_ar: The input dask array

    Returns
    -------
    metadata: metadata of the output dask array
    """
    # sklearn fails if the input array has size 0.
    # It requires at least 1 sample to run successfully.
    input_meta = input_dask_ar._meta
    if hasattr(input_meta, "__array_function__"):
        ar = np.zeros(
            shape=(1, input_dask_ar.shape[1]),
            dtype=input_dask_ar.dtype,
            like=input_meta,
        )
    elif "scipy.sparse" in type(input_meta).__module__:
        # sparse matrices don't support `like`
        # because __array_function__ is not implemented.
        # Refer https://github.com/scipy/scipy/issues/10362
        # Note: the below works for both cupy and scipy sparse matrices
        ar = type(input_meta)((1, input_dask_ar.shape[1]), dtype=input_dask_ar.dtype)
    else:
        func_name = model_fn.__name__.strip("_")
        msg = (
            f"Metadata for {func_name} is not provided, so Dask is "
            f"running the {func_name} "
            "function on a small dataset to guess output metadata. "
            "As a result, it is possible that Dask will guess incorrectly.\n"
            "To silence this warning, provide an explicit "
            f"`{func_name}_meta` to the dask_ml wrapper."
            "\nExample: \n"
            "wrap_clf = dask_ml.wrappers.Incremental(GradientBoostingClassifier(), "
            f"{func_name}_meta = np.array([1], dtype=np.int8))"
        )
        warnings.warn(msg)
        ar = np.zeros(shape=(1, input_dask_ar.shape[1]), dtype=input_dask_ar.dtype)
    return model_fn(ar, estimator)
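
# A minimal sketch of what the meta inference above amounts to for a
# NumPy-backed dask array: run the estimator on a (1, n_features) dummy
# block and hand the result to ``map_blocks`` as meta. An illustrative
# example with assumed shapes, mirroring (not reusing) the helper above.
# If uncommented, it should run as-is:
#
#   import numpy as np
#   import dask.array as da
#   from sklearn.linear_model import LogisticRegression
#
#   X = da.random.random((100, 4), chunks=(25, 4))
#   est = LogisticRegression().fit(X.compute(), np.repeat([0, 1], 50))
#   dummy = np.zeros(shape=(1, X.shape[1]), dtype=X.dtype)
#   meta = est.predict(dummy)              # dtype/type drive the output meta
#   y_hat = X.map_blocks(est.predict, drop_axis=1, meta=meta)
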