Source code for dask.bag.random

from __future__ import annotations

import heapq
import math
import random as rnd
from functools import partial
from itertools import islice

from dask.bag.core import Bag


def sample(population, k, split_every=None):
    """Choose k unique random elements from a bag.

    The original population is left unchanged; the selected elements are
    returned in a new bag.

    Parameters
    ----------
    population: Bag
        Elements to sample.
    k: integer
        Number of elements to sample.
    split_every: int (optional)
        Group partitions into groups of this size while performing reduction.
        Defaults to 8.

    Returns
    -------
    Bag
        A bag holding the sampled elements.

    Raises
    ------
    ValueError
        If ``k`` is negative. A ``ValueError`` is also raised when the result
        is computed if fewer than ``k`` elements could be drawn.

    Examples
    --------
    >>> import dask.bag as db
    >>> from dask.bag import random
    >>> b = db.from_sequence(range(5), npartitions=2)
    >>> list(random.sample(b, 3).compute())  # doctest: +SKIP
    [1, 3, 4]
    """
    # The reduction yields a (samples, population_size) pair; the finalizer
    # strips the size and checks that enough elements were drawn.
    return _sample(
        population=population,
        k=k,
        split_every=split_every,
    ).map_partitions(_finalize_sample, k)
def choices(population, k=1, split_every=None):
    """Return a k sized list of elements chosen with replacement.

    Parameters
    ----------
    population: Bag
        Elements to sample.
    k: integer, optional
        Number of elements to sample. Defaults to 1.
    split_every: int (optional)
        Group partitions into groups of this size while performing reduction.
        Defaults to 8.

    Returns
    -------
    Bag
        A bag holding the chosen elements.

    Examples
    --------
    >>> import dask.bag as db
    >>> from dask.bag import random
    >>> b = db.from_sequence(range(5), npartitions=2)
    >>> list(random.choices(b, 3).compute())  # doctest: +SKIP
    [1, 1, 4]
    """
    # The reduction yields a (samples, population_size) pair; the finalizer
    # strips the size and checks that k elements are present.
    return _sample_with_replacement(
        population=population,
        k=k,
        split_every=split_every,
    ).map_partitions(_finalize_sample, k)
def _sample_reduce(reduce_iter, k, replace):
    """
    Reduce function used on the sample and choice functions.

    Parameters
    ----------
    reduce_iter : iterable
        Each element is a ``(samples, stream_length)`` tuple, as returned by
        the map-partition functions or by an earlier call of this function.
    k : int
        Number of elements to draw.
    replace : bool
        If True, sample with replacement. If False, sample without replacement.

    Returns
    -------
    tuple
        ``(samples, n)`` where ``n`` is the summed stream length of all
        inputs. ``samples`` holds every candidate unchanged when sampling
        without replacement and ``k > n``, and is empty when there are no
        candidates at all; otherwise it holds ``k`` drawn elements.
    """
    ns_ks = []
    s = []
    n = 0
    # unfolding reduce outputs
    for s_i, n_i in reduce_iter:
        s.extend(s_i)
        n += n_i
        ns_ks.append((n_i, len(s_i)))

    if k > n and not replace:
        return s, n

    # No candidates (k == 0 or only empty inputs): nothing can be drawn, and
    # rnd.choices would fail on an empty population.
    if not s:
        return s, n

    # creating the probability array: each candidate of input i stands for
    # n_i / k_i elements of the overall population
    p = []
    for n_i, k_i in ns_ks:
        if k_i > 0:
            p += [n_i / (k_i * n)] * k_i

    sample_func = rnd.choices if replace else _weighted_sampling_without_replacement
    return sample_func(population=s, weights=p, k=k), n


def _weighted_sampling_without_replacement(population, weights, k):
    """
    Draw up to ``k`` distinct elements of ``population`` using ``weights``.

    Source:
        Weighted random sampling with a reservoir,
        Pavlos S. Efraimidis, Paul G. Spirakis
    """
    # One random key per element; the k largest keys win. The index is kept
    # in the tuple so the chosen elements can be looked up afterwards.
    elt = [(math.log(rnd.random()) / weight, i) for i, weight in enumerate(weights)]
    return [population[i] for _, i in heapq.nlargest(k, elt)]


def _sample(population, k, split_every):
    """
    Build the reduction that samples ``k`` elements without replacement.

    Raises
    ------
    ValueError
        If ``k`` is negative.
    """
    if k < 0:
        raise ValueError("Cannot take a negative number of samples")
    return population.reduction(
        partial(_sample_map_partitions, k=k),
        partial(_sample_reduce, k=k, replace=False),
        out_type=Bag,
        split_every=split_every,
    )


def _finalize_sample(reduce_iter, k):
    """
    Extract the samples from a ``(samples, stream_length)`` pair.

    Raises
    ------
    ValueError
        If fewer than ``k`` samples are present.
    """
    sample = reduce_iter[0]
    if len(sample) < k:
        raise ValueError("Sample larger than population")
    return sample


def _sample_map_partitions(population, k):
    """
    Reservoir sampling strategy based on the L algorithm
    See https://en.wikipedia.org/wiki/Reservoir_sampling#An_optimal_algorithm

    Returns
    -------
    tuple
        ``(reservoir, stream_length)``: at most ``k`` elements of
        ``population`` and the number of elements that were iterated over.
    """
    stream = iter(population)
    # The first k elements fill the reservoir directly.
    reservoir = list(islice(stream, k))
    stream_length = len(reservoir)

    if k == 0:
        # Nothing to keep, but the stream length is still needed by the
        # reduce step; the formulas below would divide by k.
        return reservoir, sum(1 for _ in stream)

    w = math.exp(math.log(rnd.random()) / k)
    nxt = (k - 1) + _geometric(w)

    for i, e in enumerate(stream, k):
        if i == nxt:
            # Replace a random slot, then jump ahead to the next replacement.
            reservoir[rnd.randrange(k)] = e
            w *= math.exp(math.log(rnd.random()) / k)
            nxt += _geometric(w)
        stream_length += 1

    return reservoir, stream_length


def _sample_with_replacement(population, k, split_every):
    """
    Build the reduction that samples ``k`` elements with replacement.

    Raises
    ------
    ValueError
        If ``k`` is negative.
    """
    # Same validation as _sample, so both entry points reject a negative k
    # up front with the same message.
    if k < 0:
        raise ValueError("Cannot take a negative number of samples")
    return population.reduction(
        partial(_sample_with_replacement_map_partitions, k=k),
        partial(_sample_reduce, k=k, replace=True),
        out_type=Bag,
        split_every=split_every,
    )


def _sample_with_replacement_map_partitions(population, k):
    """
    Reservoir sampling with replacement, the main idea is to use k reservoirs of size 1
    See Section Applications in http://utopia.duth.gr/~pefraimi/research/data/2007EncOfAlg.pdf

    Returns
    -------
    tuple
        ``(reservoir, stream_length)``: ``k`` elements of ``population``
        (empty if ``population`` is empty or ``k`` is 0) and the number of
        elements that were iterated over.
    """
    stream = iter(population)
    try:
        e = next(stream)
    except StopIteration:
        # Empty partition: contribute no candidates and zero length.
        return [], 0

    reservoir, stream_length = [e] * k, 1
    if not reservoir:
        # k == 0: nothing to keep, only the length matters (min() below
        # would fail on an empty list).
        return reservoir, stream_length + sum(1 for _ in stream)

    # One independent size-1 reservoir per requested element.
    w = [rnd.random() for _ in range(k)]
    nxt = [_geometric(wi) for wi in w]
    min_nxt = min(nxt)

    for i, e in enumerate(stream, 1):
        if i == min_nxt:
            for j, n in enumerate(nxt):
                if n == min_nxt:
                    reservoir[j] = e
                    w[j] *= rnd.random()
                    nxt[j] += _geometric(w[j])
            min_nxt = min(nxt)

        stream_length += 1

    return reservoir, stream_length


def _geometric(p):
    """
    Draw an integer >= 1 by inverting a geometric distribution with
    parameter ``p``.

    ``p`` must lie strictly between 0 and 1: ``p == 0`` divides by zero and
    ``p == 1`` takes the logarithm of zero.
    """
    return int(math.log(rnd.uniform(0, 1)) / math.log(1 - p)) + 1