"""Utilities for splitting datasets."""
import itertools
import logging
import numbers
import warnings
import dask
import dask.array as da
import dask.dataframe as dd
import numpy as np
import sklearn.model_selection as ms
from sklearn.model_selection._split import BaseCrossValidator, _validate_shuffle_split
from sklearn.utils import check_random_state
from dask_ml._compat import DASK_2130, DASK_VERSION
from dask_ml.utils import check_array, check_matching_blocks
from .._utils import draw_seed
logger = logging.getLogger(__name__)
_I4MAX = np.iinfo("i4").max
def _check_blockwise(blockwise):
if blockwise not in {True, False}:
        raise ValueError(
            "Expected a boolean for 'blockwise' but got {} instead".format(blockwise)
        )
return blockwise
def _maybe_normalize_split_sizes(train_size, test_size):
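    """Normalize ``train_size`` / ``test_size`` into complementary fractions.

    Only fractional sizes are supported; if one of the pair is None it is
    set to the complement of the other, and the two must sum to 1.

    A quick illustration:

    >>> _maybe_normalize_split_sizes(None, 0.2)
    (0.8, 0.2)
    """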
# adopt scikit-learn's new behavior (complement) now.
if train_size is None and test_size is None:
msg = "test_size and train_size can not both be None"
raise ValueError(msg)
elif any(isinstance(x, numbers.Integral) for x in (train_size, test_size)):
raise ValueError(
"Dask-ML does not support absolute sizes for "
"'train_size' and 'test_size'. Use floats between "
"0 and 1 to specify the fraction of each block "
"that should go to the train and test set."
)
if train_size is not None:
if train_size < 0 or train_size > 1:
            raise ValueError(
                "'train_size' must be between 0 and 1. Got {}".format(train_size)
            )
if test_size is None:
test_size = 1 - train_size
if test_size is not None:
if test_size < 0 or test_size > 1:
            raise ValueError(
                "'test_size' must be between 0 and 1. Got {}".format(test_size)
            )
if train_size is None:
train_size = 1 - test_size
if abs(1 - (train_size + test_size)) > 0.001:
raise ValueError(
"The sum of 'train_size' and 'test_size' must be 1. "
"train_size: {} test_size: {}".format(train_size, test_size)
)
return train_size, test_size
def _generate_idx(n, seed, n_train, n_test):
"""Generate train, test indices for a length-n array.
Parameters
----------
n : int
The length of the array
seed : int
Seed for a RandomState
n_train, n_test : int, 0 < n_train, n_test < n
Number of samples to use for the train or
test index.
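
    Examples
    --------
    Illustrative length check: the train and test indices are disjoint
    slices of a single seeded permutation.

    >>> train, test = _generate_idx(10, seed=0, n_train=7, n_test=3)
    >>> len(train), len(test)
    (7, 3)
    >>> set(train) & set(test)
    set()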
"""
idx = check_random_state(seed).permutation(n)
ind_test = idx[:n_test]
ind_train = idx[n_test : n_train + n_test]
return ind_train, ind_test
class ShuffleSplit(BaseCrossValidator):
"""Random permutation cross-validator.
Yields indices to split data into training and test sets.
.. warning::
By default, this performs a blockwise-shuffle. That is,
each block is shuffled internally, but data are not shuffled
between blocks. If your data is ordered, then set ``blockwise=False``.
Note: contrary to other cross-validation strategies, random splits
do not guarantee that all folds will be different, although this is
still very likely for sizeable datasets.
Parameters
----------
n_splits : int, default 10
Number of re-shuffling & splitting iterations.
    test_size : float or None, default=0.1
        Should be between 0.0 and 1.0 and represent the proportion of the
        dataset to include in the test split. Unlike scikit-learn, absolute
        (integer) sample counts are not supported. If None, the value is
        set to the complement of the train size.
    train_size : float or None, default=None
        Should be between 0.0 and 1.0 and represent the proportion of the
        dataset to include in the train split. If None, the value is
        automatically set to the complement of the test size.
blockwise : bool, default True
Whether to shuffle data only within blocks (True), or allow data to
be shuffled between blocks (False). Shuffling between blocks can
be much more expensive, especially in distributed environments.
random_state : int, RandomState instance or None, optional (default=None)
If int, random_state is the seed used by the random number generator;
If RandomState instance, random_state is the random number generator;
If None, the random number generator is the RandomState instance used
by `np.random`.
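
    Examples
    --------
    An illustrative blockwise split; the per-block train/test sizes are
    validated exactly as in scikit-learn:

    >>> import dask.array as da
    >>> X = da.arange(12, chunks=4)
    >>> shuffler = ShuffleSplit(n_splits=2, test_size=0.25, random_state=0)
    >>> for train_idx, test_idx in shuffler.split(X):
    ...     print(len(train_idx), len(test_idx))
    9 3
    9 3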
"""
    def __init__(
self,
n_splits=10,
test_size=0.1,
train_size=None,
blockwise=True,
random_state=None,
):
self.n_splits = n_splits
self.test_size = test_size
self.train_size = train_size
self.random_state = random_state
self.blockwise = _check_blockwise(blockwise)
def split(self, X, y=None, groups=None):
X = check_array(X, ensure_2d=False, allow_nd=True)
rng = check_random_state(self.random_state)
for i in range(self.n_splits):
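            # Draw one seed per block so each block is shuffled independently;
            # fresh seeds are drawn for every split iteration.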
seeds = draw_seed(rng, 0, _I4MAX, size=len(X.chunks[0]), dtype="uint")
if self.blockwise:
yield self._split_blockwise(X, seeds)
else:
yield self._split(X)
def _split_blockwise(self, X, seeds):
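        # For each block, lazily generate train/test indices of the validated
        # sizes, then shift them by the block's global offset so the
        # concatenated results index into the full array.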
chunks = X.chunks[0]
train_pct, test_pct = _maybe_normalize_split_sizes(
self.train_size, self.test_size
)
sizes = [_validate_shuffle_split(c, test_pct, train_pct) for c in chunks]
objs = [
dask.delayed(_generate_idx, nout=2)(chunksize, seed, n_train, n_test)
for chunksize, seed, (n_train, n_test) in zip(chunks, seeds, sizes)
]
train_objs, test_objs = zip(*objs)
offsets = np.hstack([0, np.cumsum(chunks)])
train_idx = da.concatenate(
[
da.from_delayed(x + offset, (train_size,), np.dtype("int"))
for x, chunksize, (train_size, _), offset in zip(
train_objs, chunks, sizes, offsets
)
]
)
test_idx = da.concatenate(
[
da.from_delayed(x + offset, (test_size,), np.dtype("int"))
for x, chunksize, (_, test_size), offset in zip(
test_objs, chunks, sizes, offsets
)
]
)
return train_idx, test_idx
def _split(self, X):
raise NotImplementedError(
"ShuffleSplit with `blockwise=False` has " "not been implemented yet."
)
def get_n_splits(self, X=None, y=None, groups=None):
return self.n_splits
def _generate_offset_idx(n, start, stop, offset, seed):
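    """Return global indices in ``[start, stop)`` for one block.

    ``n`` is the block length and ``offset`` is the block's global start.
    When ``seed`` is given, the block is permuted first, so the slice is
    taken from a shuffled view of the block; adding ``offset`` maps the
    local indices back to global coordinates.
    """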
if seed is not None:
idx = check_random_state(seed).permutation(n)
else:
idx = np.arange(n)
return idx[start - offset : stop - offset] + offset
class KFold(BaseCrossValidator):
"""K-Folds cross-validator
Provides train/test indices to split data in train/test sets. Split
dataset into k consecutive folds (without shuffling by default).
Each fold is then used once as a validation while the k - 1 remaining
folds form the training set.
Parameters
----------
n_splits : int, default=5
Number of folds. Must be at least 2.
shuffle : boolean, optional
Whether to shuffle the data before splitting into batches.
random_state : int, RandomState instance or None, optional, default=None
If int, random_state is the seed used by the random number generator;
If RandomState instance, random_state is the random number generator;
If None, the random number generator is the RandomState instance used
by `np.random`. Used when ``shuffle`` == True.
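
    Examples
    --------
    An illustrative run over a small chunked array; fold sizes are
    computed exactly as in scikit-learn's ``KFold``:

    >>> import dask.array as da
    >>> X = da.ones((10, 2), chunks=(5, 2))
    >>> kf = KFold(n_splits=5)
    >>> for train_idx, test_idx in kf.split(X):
    ...     print(len(train_idx), len(test_idx))
    8 2
    8 2
    8 2
    8 2
    8 2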
"""
    def __init__(self, n_splits=5, shuffle=False, random_state=None):
self.n_splits = n_splits
self.shuffle = shuffle
self.random_state = random_state
def split(self, X, y=None, groups=None):
X = check_array(X)
n_samples = X.shape[0]
n_splits = self.n_splits
fold_sizes = np.full(n_splits, n_samples // n_splits, dtype=int)
fold_sizes[: n_samples % n_splits] += 1
chunks = X.chunks[0]
seeds = [None] * len(chunks)
if self.shuffle:
rng = check_random_state(self.random_state)
seeds = draw_seed(rng, 0, _I4MAX, size=len(chunks), dtype="uint")
test_current = 0
for fold_size in fold_sizes:
test_start, test_stop = test_current, test_current + fold_size
yield self._split(test_start, test_stop, n_samples, chunks, seeds)
test_current = test_stop
def _split(self, test_start, test_stop, n_samples, chunks, seeds):
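        # Each block contributes up to three contiguous index ranges: its
        # overlap with the test window, the train span before the window,
        # and the train span after it.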
train_objs = []
test_objs = []
train_sizes = []
test_sizes = []
offset = 0
for chunk, seed in zip(chunks, seeds):
start, stop = offset, offset + chunk
test_id_start = max(test_start, start)
test_id_stop = min(test_stop, stop)
if test_id_start < test_id_stop:
test_objs.append(
dask.delayed(_generate_offset_idx)(
chunk, test_id_start, test_id_stop, offset, seed
)
)
test_sizes.append(test_id_stop - test_id_start)
train_id_stop = min(test_id_start, stop)
if train_id_stop > start:
train_objs.append(
dask.delayed(_generate_offset_idx)(
chunk, start, train_id_stop, offset, seed
)
)
train_sizes.append(train_id_stop - start)
train_id_start = max(test_id_stop, start)
if train_id_start < stop:
train_objs.append(
dask.delayed(_generate_offset_idx)(
chunk, train_id_start, stop, offset, seed
)
)
train_sizes.append(stop - train_id_start)
offset = stop
train_idx = da.concatenate(
[
da.from_delayed(obj, (train_size,), np.dtype("int"))
for obj, train_size in zip(train_objs, train_sizes)
]
)
test_idx = da.concatenate(
[
da.from_delayed(obj, (test_size,), np.dtype("int"))
for obj, test_size in zip(test_objs, test_sizes)
]
)
return train_idx, test_idx
def get_n_splits(self, X=None, y=None, groups=None):
return self.n_splits
def _blockwise_slice(arr, idx):
"""Slice an array that is blockwise-aligned with idx.
Parameters
----------
arr : Dask array
idx : Dask array
        Should have the following properties:

        * Same blocks as ``arr`` along the first dimension
        * Contains only integers
        * Each block's values lie within that block's global index range,
          ``[offset, offset + len(block))``; the block offset is subtracted
          before slicing
Returns
-------
sliced : dask.Array
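
    Examples
    --------
    Illustrative only; each block of ``idx`` holds global positions that
    fall within the matching block of ``arr``:

    >>> import numpy as np
    >>> import dask.array as da
    >>> arr = da.arange(6, chunks=3) * 10
    >>> idx = da.from_array(np.array([2, 1, 0, 5, 4, 3]), chunks=3)
    >>> _blockwise_slice(arr, idx).compute()
    array([20, 10,  0, 50, 40, 30])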
"""
objs = []
offsets = np.hstack([0, np.cumsum(arr.chunks[0])[:-1]])
for i, (x, idx2) in enumerate(
zip(arr.to_delayed().ravel(), idx.to_delayed().ravel())
):
idx3 = idx2 - offsets[i]
objs.append(x[idx3])
shapes = idx.chunks[0]
if arr.ndim == 2:
P = arr.shape[1]
shapes = [(x, P) for x in shapes]
else:
shapes = [(x,) for x in shapes]
sliced = da.concatenate(
[
da.from_delayed(x, shape=shape, dtype=arr.dtype)
for x, shape in zip(objs, shapes)
]
)
return sliced
def train_test_split(
*arrays,
test_size=None,
train_size=None,
random_state=None,
shuffle=None,
blockwise=True,
convert_mixed_types=False,
**options,
):
"""Split arrays into random train and test matrices.
Parameters
----------
*arrays : Sequence of Dask Arrays, DataFrames, or Series
Non-dask objects will be passed through to
:func:`sklearn.model_selection.train_test_split`.
    test_size : float or int, default 0.1
        For Dask inputs, must be a float between 0 and 1; absolute
        (integer) sizes are only supported for non-dask inputs, which
        are passed through to scikit-learn.
    train_size : float or int, optional
        If None, set to the complement of ``test_size``. The same
        restriction on integer sizes applies to Dask inputs.
random_state : int, RandomState instance or None, optional (default=None)
If int, random_state is the seed used by the random number generator;
If RandomState instance, random_state is the random number generator;
If None, the random number generator is the RandomState instance used
by `np.random`.
    shuffle : bool, default None
        Whether to shuffle the data before splitting. If None, Dask Arrays
        and non-dask inputs are shuffled; for Dask DataFrames on newer Dask
        versions a ``FutureWarning`` is raised and ``shuffle=False`` is used.
    blockwise : bool, default True
Whether to shuffle data only within blocks (True), or allow data to
be shuffled between blocks (False). Shuffling between blocks can
be much more expensive, especially in distributed environments.
The default is ``True``, data are only shuffled within blocks.
For Dask Arrays, set ``blockwise=False`` to shuffle data between
blocks as well. For Dask DataFrames, ``blockwise=False`` is not
currently supported and a ``ValueError`` will be raised.
convert_mixed_types : bool, default False
Whether to convert dask DataFrames and Series to dask Arrays when
arrays contains a mixture of types. This results in some computation
to determine the length of each block.
Returns
-------
splitting : list, length=2 * len(arrays)
List containing train-test split of inputs
Examples
--------
>>> import dask.array as da
>>> from dask_ml.datasets import make_regression
>>> X, y = make_regression(n_samples=125, n_features=4, chunks=50,
... random_state=0)
>>> X_train, X_test, y_train, y_test = train_test_split(X, y,
... random_state=0)
>>> X_train
dask.array<concatenate, shape=(113, 4), dtype=float64, chunksize=(45, 4)>
>>> X_train.compute()[:2]
array([[ 0.12372191, 0.58222459, 0.92950511, -2.09460307],
[ 0.99439439, -0.70972797, -0.27567053, 1.73887268]])
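
    For Dask DataFrames, pass ``shuffle`` explicitly to avoid the
    ``FutureWarning`` on newer Dask versions (illustrative):

    >>> import dask.dataframe as dd  # doctest: +SKIP
    >>> ddf = dd.from_dask_array(X, columns=list("abcd"))  # doctest: +SKIP
    >>> train, test = train_test_split(ddf, shuffle=True, random_state=0)  # doctest: +SKIP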
"""
if train_size is None and test_size is None:
# all other validation done elsewhere.
test_size = 0.1
if train_size is None and test_size is not None:
train_size = 1 - test_size
if test_size is None and train_size is not None:
test_size = 1 - train_size
if options:
raise TypeError("Unexpected options {}".format(options))
types = set(type(arr) for arr in arrays)
if da.Array in types and types & {dd.Series, dd.DataFrame}:
if convert_mixed_types:
arrays = tuple(
(
x.to_dask_array(lengths=True)
if isinstance(x, (dd.Series, dd.DataFrame))
else x
)
for x in arrays
)
else:
raise TypeError(
"Got mixture of dask DataFrames and Arrays. Specify "
"'convert_mixed_types=True'"
)
if all(isinstance(arr, (dd.Series, dd.DataFrame)) for arr in arrays):
check_matching_blocks(*arrays)
if blockwise is False:
raise NotImplementedError(
"'blockwise=False' is not currently supported for dask DataFrames."
)
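        # Draw a single integer seed so dask.dataframe's random_split is
        # reproducible for a given random_state.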
        rng = check_random_state(random_state)
        seed = draw_seed(rng, 0, _I4MAX, dtype="uint")
if DASK_2130:
if shuffle is None:
shuffle = False
                warnings.warn(
                    message="The value of 'shuffle' was not specified when"
                    " splitting DataFrames. In the future, DataFrames will"
                    " automatically be shuffled within blocks prior to"
                    " splitting. Specify 'shuffle=True' to adopt the future"
                    " behavior now, or 'shuffle=False' to retain the previous"
                    " behavior.",
                    category=FutureWarning,
                )
kwargs = {"shuffle": shuffle}
else:
if shuffle is None:
shuffle = True
if not shuffle:
raise NotImplementedError(
f"'shuffle=False' is not supported for DataFrames in"
f" dask versions<2.13.0. Current version is {DASK_VERSION}."
)
kwargs = {}
return list(
itertools.chain.from_iterable(
                arr.random_split([train_size, test_size], random_state=seed, **kwargs)
for arr in arrays
)
)
elif all(isinstance(arr, da.Array) for arr in arrays):
if shuffle is None:
shuffle = True
if not shuffle:
raise NotImplementedError(
"'shuffle=False' is not currently supported for dask Arrays."
)
splitter = ShuffleSplit(
n_splits=1,
test_size=test_size,
train_size=train_size,
blockwise=blockwise,
random_state=random_state,
)
train_idx, test_idx = next(splitter.split(*arrays))
train_test_pairs = (
(_blockwise_slice(arr, train_idx), _blockwise_slice(arr, test_idx))
for arr in arrays
)
return list(itertools.chain.from_iterable(train_test_pairs))
else:
return ms.train_test_split(
*arrays,
test_size=test_size,
train_size=train_size,
random_state=random_state,
shuffle=shuffle,
)