Dask Delayed
============
.. meta::
:description: The Dask delayed function decorates your functions so that they operate lazily. Rather than executing your function immediately, it will defer execution, placing the function and its arguments into a task graph.
.. toctree::
:maxdepth: 1
:hidden:
delayed-collections.rst
delayed-best-practices.rst
Sometimes problems don't fit into one of the collections like ``dask.array`` or
``dask.dataframe``. In these cases, users can parallelize custom algorithms
using the simpler ``dask.delayed`` interface. This allows you to create graphs
directly with a light annotation of normal python code:
.. code-block:: python
>>> x = dask.delayed(inc)(1)
>>> y = dask.delayed(inc)(2)
>>> z = dask.delayed(add)(x, y)
>>> z.compute()
5
>>> z.visualize()
.. image:: images/inc-add.svg
:alt: A Dask Delayed task graph with two "inc" functions combined using an "add" function resulting in an output node.
.. raw:: html
Example
-------
Visit http://www.aidoczh.com/dask-examples/ delayed.html to see and run examples using Dask
Delayed.
Sometimes we face problems that are parallelizable, but don't fit into high-level
abstractions like Dask Array or Dask DataFrame. Consider the following example:
.. code-block:: python
def inc(x):
return x + 1
def double(x):
return x * 2
def add(x, y):
return x + y
data = [1, 2, 3, 4, 5]
output = []
for x in data:
a = inc(x)
b = double(x)
c = add(a, b)
output.append(c)
total = sum(output)
There is clearly parallelism in this problem (many of the ``inc``,
``double``, and ``add`` functions can be evaluated independently), but it's not
clear how to convert this to an array or DataFrame computation. As written,
this code runs sequentially in a single thread. However, we see that a
lot of this could be executed in parallel.
The Dask ``delayed`` function decorates your functions so that they operate
*lazily*. Rather than executing your function immediately, it will defer
execution, placing the function and its arguments into a task graph.
.. currentmodule:: dask.delayed
.. autosummary::
delayed
We slightly modify our code by wrapping functions in ``delayed``.
This delays the execution of the function and generates a Dask graph instead:
.. code-block:: python
import dask
output = []
for x in data:
a = dask.delayed(inc)(x)
b = dask.delayed(double)(x)
c = dask.delayed(add)(a, b)
output.append(c)
total = dask.delayed(sum)(output)
We used the ``dask.delayed`` function to wrap the function calls that we want
to turn into tasks. None of the ``inc``, ``double``, ``add``, or ``sum`` calls
have happened yet. Instead, the object ``total`` is a ``Delayed`` result that
contains a task graph of the entire computation. Looking at the graph we see
clear opportunities for parallel execution. The :doc:`Dask schedulers `
will exploit this parallelism, generally improving performance (although not in this
example, because these functions are already very small and fast.)
.. code-block:: python
total.visualize() # see image to the right
.. image:: images/delayed-inc-double-add.svg
:align: right
:alt: A task graph with many nodes for "inc" and "double" that combine with "add" nodes. The output of the "add" nodes finally aggregate with a "sum" node.
We can now compute this lazy result to execute the graph in parallel:
.. code-block:: python
>>> total.compute()
45
Decorator
---------
It is also common to see the delayed function used as a decorator. Here is a
reproduction of our original problem as a parallel code:
.. code-block:: python
import dask
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def double(x):
return x * 2
@dask.delayed
def add(x, y):
return x + y
data = [1, 2, 3, 4, 5]
output = []
for x in data:
a = inc(x)
b = double(x)
c = add(a, b)
output.append(c)
total = dask.delayed(sum)(output)
Real time
---------
Sometimes you want to create and destroy work during execution, launch tasks
from other tasks, etc. For this, see the :doc:`Futures ` interface.
Best Practices
--------------
For a list of common problems and recommendations see
:doc:`Delayed Best Practices `.
Indirect Dependencies
---------------------
Sometimes you might find yourself wanting to add a dependency to a task that does
not take the result of that dependency as an input. For example when a task depends
on the side-effect of another task. In these cases you can use
``dask.graph_manipulation.bind``.
.. code-block:: python
import dask
from dask.graph_manipulation import bind
DATA = []
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def add_data(x):
DATA.append(x)
@dask.delayed
def sum_data(x):
return sum(DATA) + x
a = inc(1)
b = add_data(a)
c = inc(3)
d = add_data(c)
e = inc(5)
f = bind(sum_data, [b, d])(e)
f.compute()
``sum_data`` will operate on DATA only after both the expected items have
been appended to it. ``bind`` can also be used along with direct dependencies
passed through the function arguments.
Execution
---------
By default, Dask Delayed uses the threaded scheduler in order to avoid data
transfer costs.
You should consider using `multi-processing`_ scheduler or `dask.distributed`_ scheduler
on a local machine or on a cluster if your code does not `release the GIL well`_
(computations that are dominated by pure Python code, or computations wrapping external
code and holding onto it).
.. _`dask.distributed`: http://www.aidoczh.com/dask-distributed/
.. _`multi-processing`: http://www.aidoczh.com/dask/en/stable/scheduling.html#local-processes
.. _`release the GIL well`: http://www.aidoczh.com/dask/en/stable/graphs.html#avoid-holding-the-gil